13
13
import org .apache .lucene .store .IndexInput ;
14
14
import org .opensearch .action .LatchedActionListener ;
15
15
import org .opensearch .cluster .ClusterState ;
16
+ import org .opensearch .cluster .Diff ;
17
+ import org .opensearch .cluster .DiffableUtils ;
16
18
import org .opensearch .cluster .routing .IndexRoutingTable ;
17
19
import org .opensearch .cluster .routing .RoutingTable ;
18
20
import org .opensearch .common .CheckedRunnable ;
28
30
import org .opensearch .common .settings .Settings ;
29
31
import org .opensearch .common .util .io .IOUtils ;
30
32
import org .opensearch .core .action .ActionListener ;
33
+ import org .opensearch .core .common .io .stream .StreamInput ;
34
+ import org .opensearch .core .common .io .stream .StreamOutput ;
31
35
import org .opensearch .core .index .Index ;
32
36
import org .opensearch .gateway .remote .ClusterMetadataManifest ;
33
37
import org .opensearch .gateway .remote .RemoteClusterStateUtils ;
@@ -88,6 +92,19 @@ public class RemoteRoutingTableService implements Closeable {
88
92
private BlobStoreRepository blobStoreRepository ;
89
93
private final ThreadPool threadPool ;
90
94
95
+ private static final DiffableUtils .NonDiffableValueSerializer <String , IndexRoutingTable > CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER = new DiffableUtils .NonDiffableValueSerializer <String , IndexRoutingTable >() {
96
+ @ Override
97
+ public void write (IndexRoutingTable value , StreamOutput out ) throws IOException {
98
+ value .writeTo (out );
99
+ }
100
+
101
+ @ Override
102
+ public IndexRoutingTable read (StreamInput in , String key ) throws IOException {
103
+ return IndexRoutingTable .readFrom (in );
104
+ }
105
+ };
106
+
107
+
91
108
public RemoteRoutingTableService (Supplier <RepositoriesService > repositoriesService ,
92
109
Settings settings ,
93
110
ThreadPool threadPool ) {
@@ -97,19 +114,6 @@ public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesServi
97
114
this .threadPool = threadPool ;
98
115
}
99
116
100
- public List <IndexRoutingTable > getChangedIndicesRouting ( ClusterState previousClusterState ,
101
- ClusterState clusterState ) {
102
- Map <String , IndexRoutingTable > previousIndexRoutingTable = previousClusterState .getRoutingTable ().getIndicesRouting ();
103
- List <IndexRoutingTable > changedIndicesRouting = new ArrayList <>();
104
- for (IndexRoutingTable indexRouting : clusterState .getRoutingTable ().getIndicesRouting ().values ()) {
105
- if (!(previousIndexRoutingTable .containsKey (indexRouting .getIndex ().getName ()) && indexRouting .equals (previousIndexRoutingTable .get (indexRouting .getIndex ().getName ())))) {
106
- changedIndicesRouting .add (indexRouting );
107
- logger .info ("changedIndicesRouting {}" , indexRouting .prettyPrint ());
108
- }
109
- }
110
-
111
- return changedIndicesRouting ;
112
- }
113
117
114
118
public CheckedRunnable <IOException > getIndexRoutingAsyncAction (
115
119
ClusterState clusterState ,
@@ -202,7 +206,7 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
202
206
uploadedIndexRouting -> allUploadedIndicesRouting .put (uploadedIndexRouting .getIndexName (), uploadedIndexRouting )
203
207
);
204
208
205
- indicesRoutingToDelete .forEach (index -> allUploadedIndicesRouting . remove ( index ) );
209
+ indicesRoutingToDelete .forEach (allUploadedIndicesRouting :: remove );
206
210
207
211
logger .info ("allUploadedIndicesRouting ROUTING {}" , allUploadedIndicesRouting );
208
212
@@ -274,34 +278,17 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getUpdatedIndexRoutin
274
278
}).collect (Collectors .toList ());
275
279
}
276
280
277
- public static List <String > getIndicesRoutingDeleted (RoutingTable previousRoutingTable , RoutingTable currentRoutingTable ) {
278
- List <String > deletedIndicesRouting = new ArrayList <>();
279
- for (IndexRoutingTable previousIndexRouting : previousRoutingTable .getIndicesRouting ().values ()) {
280
- if (!currentRoutingTable .getIndicesRouting ().containsKey (previousIndexRouting .getIndex ().getName ())) {
281
- // Latest Routing Table does not have entry for the index which means the index is deleted
282
- deletedIndicesRouting .add (previousIndexRouting .getIndex ().getName ());
283
- }
284
- }
285
- return deletedIndicesRouting ;
286
- }
287
281
288
- public static List <String > getIndicesRoutingUpdated (RoutingTable previousRoutingTable , RoutingTable currentRoutingTable ) {
289
- List <String > updatedIndicesRouting = new ArrayList <>();
290
- for (IndexRoutingTable currentIndicesRouting : currentRoutingTable .getIndicesRouting ().values ()) {
291
- if (!previousRoutingTable .getIndicesRouting ().containsKey (currentIndicesRouting .getIndex ().getName ())) {
292
- // Latest Routing Table does not have entry for the index which means the index is created
293
- updatedIndicesRouting .add (currentIndicesRouting .getIndex ().getName ());
294
- } else {
295
- if (previousRoutingTable .getIndicesRouting ().get (currentIndicesRouting .getIndex ().getName ()).equals (currentIndicesRouting )) {
296
- // if the latest routing table has the same routing table as the previous routing table, then the index is not updated
297
- continue ;
298
- }
299
- updatedIndicesRouting .add (currentIndicesRouting .getIndex ().getName ());
300
- }
301
- }
302
- return updatedIndicesRouting ;
282
+ public static DiffableUtils .MapDiff <String , IndexRoutingTable , Map <String , IndexRoutingTable >> getIndicesRoutingMapDiff (RoutingTable before , RoutingTable after ) {
283
+ return DiffableUtils .diff (
284
+ before .getIndicesRouting (),
285
+ after .getIndicesRouting (),
286
+ DiffableUtils .getStringKeySerializer (),
287
+ CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER
288
+ );
303
289
}
304
290
291
+
305
292
@ Override
306
293
public void close () throws IOException {
307
294
if (blobStoreRepository != null ) {
0 commit comments