33
33
import org .opensearch .common .util .io .IOUtils ;
34
34
import org .opensearch .core .action .ActionListener ;
35
35
import org .opensearch .core .common .bytes .BytesReference ;
36
+ import org .opensearch .core .index .Index ;
36
37
import org .opensearch .gateway .remote .ClusterMetadataManifest ;
37
38
import org .opensearch .gateway .remote .RemoteStateTransferException ;
38
39
import org .opensearch .gateway .remote .routingtable .RemoteIndexRoutingTable ;
44
45
import org .opensearch .repositories .RepositoriesService ;
45
46
import org .opensearch .repositories .Repository ;
46
47
import org .opensearch .repositories .blobstore .BlobStoreRepository ;
48
+ import org .opensearch .threadpool .ThreadPool ;
47
49
48
50
import java .io .IOException ;
49
51
import java .util .ArrayList ;
50
52
import java .util .List ;
51
53
import java .util .Map ;
54
+ import java .util .Optional ;
55
+ import java .util .concurrent .ExecutorService ;
52
56
import java .util .function .Function ;
53
57
import java .util .function .Supplier ;
54
58
import java .util .stream .Collectors ;
@@ -97,11 +101,13 @@ public class InternalRemoteRoutingTableService extends AbstractLifecycleComponen
97
101
private BlobStoreRepository blobStoreRepository ;
98
102
private RemoteStoreEnums .PathType pathType ;
99
103
private RemoteStoreEnums .PathHashAlgorithm pathHashAlgo ;
104
+ private ThreadPool threadPool ;
100
105
101
106
public InternalRemoteRoutingTableService (
102
107
Supplier <RepositoriesService > repositoriesService ,
103
108
Settings settings ,
104
- ClusterSettings clusterSettings
109
+ ClusterSettings clusterSettings ,
110
+ ThreadPool threadpool
105
111
) {
106
112
assert isRemoteRoutingTableEnabled (settings ) : "Remote routing table is not enabled" ;
107
113
this .repositoriesService = repositoriesService ;
@@ -110,6 +116,7 @@ public InternalRemoteRoutingTableService(
110
116
this .pathHashAlgo = clusterSettings .get (REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING );
111
117
clusterSettings .addSettingsUpdateConsumer (REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING , this ::setPathTypeSetting );
112
118
clusterSettings .addSettingsUpdateConsumer (REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING , this ::setPathHashAlgoSetting );
119
+ this .threadPool = threadpool ;
113
120
}
114
121
115
122
private void setPathTypeSetting (RemoteStoreEnums .PathType pathType ) {
@@ -266,6 +273,68 @@ private void uploadIndex(
266
273
}
267
274
}
268
275
276
+ @ Override
277
+ public CheckedRunnable <IOException > getAsyncIndexRoutingReadAction (
278
+ String uploadedFilename ,
279
+ Index index ,
280
+ LatchedActionListener <IndexRoutingTable > latchedActionListener
281
+ ) {
282
+ int idx = uploadedFilename .lastIndexOf ("/" );
283
+ String blobFileName = uploadedFilename .substring (idx + 1 );
284
+ BlobContainer blobContainer = blobStoreRepository .blobStore ()
285
+ .blobContainer (BlobPath .cleanPath ().add (uploadedFilename .substring (0 , idx )));
286
+
287
+ return () -> readAsync (
288
+ blobContainer ,
289
+ blobFileName ,
290
+ index ,
291
+ threadPool .executor (ThreadPool .Names .REMOTE_STATE_READ ),
292
+ ActionListener .wrap (
293
+ response -> latchedActionListener .onResponse (response .getIndexRoutingTable ()),
294
+ latchedActionListener ::onFailure
295
+ )
296
+ );
297
+ }
298
+
299
+ private void readAsync (
300
+ BlobContainer blobContainer ,
301
+ String name ,
302
+ Index index ,
303
+ ExecutorService executorService ,
304
+ ActionListener <RemoteIndexRoutingTable > listener
305
+ ) {
306
+ executorService .execute (() -> {
307
+ try {
308
+ listener .onResponse (read (blobContainer , name , index ));
309
+ } catch (Exception e ) {
310
+ listener .onFailure (e );
311
+ }
312
+ });
313
+ }
314
+
315
+ private RemoteIndexRoutingTable read (BlobContainer blobContainer , String path , Index index ) {
316
+ try {
317
+ return new RemoteIndexRoutingTable (blobContainer .readBlob (path ), index );
318
+ } catch (IOException | AssertionError e ) {
319
+ logger .error (() -> new ParameterizedMessage ("RoutingTable read failed for path {}" , path ), e );
320
+ throw new RemoteStateTransferException ("Failed to read RemoteRoutingTable from Manifest with error " , e );
321
+ }
322
+ }
323
+
324
+ @ Override
325
+ public List <ClusterMetadataManifest .UploadedIndexMetadata > getUpdatedIndexRoutingTableMetadata (
326
+ List <String > updatedIndicesRouting ,
327
+ List <ClusterMetadataManifest .UploadedIndexMetadata > allIndicesRouting
328
+ ) {
329
+ return updatedIndicesRouting .stream ().map (idx -> {
330
+ Optional <ClusterMetadataManifest .UploadedIndexMetadata > uploadedIndexMetadataOptional = allIndicesRouting .stream ()
331
+ .filter (idx2 -> idx2 .getIndexName ().equals (idx ))
332
+ .findFirst ();
333
+ assert uploadedIndexMetadataOptional .isPresent () == true ;
334
+ return uploadedIndexMetadataOptional .get ();
335
+ }).collect (Collectors .toList ());
336
+ }
337
+
269
338
private String getIndexRoutingFileName (long term , long version ) {
270
339
return String .join (
271
340
DELIMITER ,
0 commit comments