11
11
import org .apache .logging .log4j .LogManager ;
12
12
import org .apache .logging .log4j .Logger ;
13
13
import org .apache .logging .log4j .message .ParameterizedMessage ;
14
- import org .apache .lucene .store .IndexInput ;
15
14
import org .opensearch .action .LatchedActionListener ;
16
- import org .opensearch .cluster .ClusterState ;
17
15
import org .opensearch .cluster .DiffableUtils ;
18
16
import org .opensearch .cluster .routing .IndexRoutingTable ;
19
17
import org .opensearch .cluster .routing .RoutingTable ;
20
18
import org .opensearch .common .CheckedRunnable ;
21
- import org .opensearch .common .blobstore .AsyncMultiStreamBlobContainer ;
22
- import org .opensearch .common .blobstore .BlobContainer ;
23
19
import org .opensearch .common .blobstore .BlobPath ;
24
- import org .opensearch .common .blobstore .stream .write .WritePriority ;
25
- import org .opensearch .common .blobstore .transfer .RemoteTransferContainer ;
26
- import org .opensearch .common .blobstore .transfer .stream .OffsetRangeIndexInputStream ;
27
- import org .opensearch .common .io .stream .BytesStreamOutput ;
28
20
import org .opensearch .common .lifecycle .AbstractLifecycleComponent ;
29
- import org .opensearch .common .lucene . store . ByteArrayIndexInput ;
21
+ import org .opensearch .common .remote . RemoteWritableEntityStore ;
30
22
import org .opensearch .common .settings .ClusterSettings ;
31
- import org .opensearch .common .settings .Setting ;
32
23
import org .opensearch .common .settings .Settings ;
33
24
import org .opensearch .common .util .io .IOUtils ;
34
25
import org .opensearch .core .action .ActionListener ;
35
- import org .opensearch .core .common .bytes .BytesReference ;
36
- import org .opensearch .core .index .Index ;
26
+ import org .opensearch .core .compress .Compressor ;
37
27
import org .opensearch .gateway .remote .ClusterMetadataManifest ;
38
28
import org .opensearch .gateway .remote .RemoteStateTransferException ;
29
+ import org .opensearch .gateway .remote .model .RemoteRoutingTableBlobStore ;
39
30
import org .opensearch .gateway .remote .routingtable .RemoteIndexRoutingTable ;
40
- import org .opensearch .index .remote .RemoteStoreEnums ;
41
- import org .opensearch .index .remote .RemoteStorePathStrategy ;
42
- import org .opensearch .index .remote .RemoteStoreUtils ;
31
+ import org .opensearch .index .translog .transfer .BlobStoreTransferService ;
43
32
import org .opensearch .node .Node ;
44
33
import org .opensearch .node .remotestore .RemoteStoreNodeAttribute ;
45
34
import org .opensearch .repositories .RepositoriesService ;
52
41
import java .util .List ;
53
42
import java .util .Map ;
54
43
import java .util .Optional ;
55
- import java .util .concurrent .ExecutorService ;
56
44
import java .util .function .Function ;
57
45
import java .util .function .Supplier ;
58
46
import java .util .stream .Collectors ;
59
47
60
- import static org .opensearch .gateway .remote .RemoteClusterStateUtils .DELIMITER ;
61
48
import static org .opensearch .node .remotestore .RemoteStoreNodeAttribute .isRemoteRoutingTableEnabled ;
62
49
63
50
/**
67
54
*/
68
55
public class InternalRemoteRoutingTableService extends AbstractLifecycleComponent implements RemoteRoutingTableService {
69
56
70
- /**
71
- * This setting is used to set the remote routing table store blob store path type strategy.
72
- */
73
- public static final Setting <RemoteStoreEnums .PathType > REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING = new Setting <>(
74
- "cluster.remote_store.routing_table.path_type" ,
75
- RemoteStoreEnums .PathType .HASHED_PREFIX .toString (),
76
- RemoteStoreEnums .PathType ::parseString ,
77
- Setting .Property .NodeScope ,
78
- Setting .Property .Dynamic
79
- );
80
-
81
- /**
82
- * This setting is used to set the remote routing table store blob store path hash algorithm strategy.
83
- * This setting will come to effect if the {@link #REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING}
84
- * is either {@code HASHED_PREFIX} or {@code HASHED_INFIX}.
85
- */
86
- public static final Setting <RemoteStoreEnums .PathHashAlgorithm > REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING = new Setting <>(
87
- "cluster.remote_store.routing_table.path_hash_algo" ,
88
- RemoteStoreEnums .PathHashAlgorithm .FNV_1A_BASE64 .toString (),
89
- RemoteStoreEnums .PathHashAlgorithm ::parseString ,
90
- Setting .Property .NodeScope ,
91
- Setting .Property .Dynamic
92
- );
93
-
94
- public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing" ;
95
- public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing" ;
96
- public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--" ;
97
-
98
57
private static final Logger logger = LogManager .getLogger (InternalRemoteRoutingTableService .class );
99
58
private final Settings settings ;
100
59
private final Supplier <RepositoriesService > repositoriesService ;
60
+ private Compressor compressor ;
61
+ private RemoteWritableEntityStore <IndexRoutingTable , RemoteIndexRoutingTable > remoteIndexRoutingTableStore ;
62
+ private final ClusterSettings clusterSettings ;
101
63
private BlobStoreRepository blobStoreRepository ;
102
- private RemoteStoreEnums .PathType pathType ;
103
- private RemoteStoreEnums .PathHashAlgorithm pathHashAlgo ;
104
- private ThreadPool threadPool ;
64
+ private final ThreadPool threadPool ;
65
+ private final String clusterName ;
105
66
106
67
public InternalRemoteRoutingTableService (
107
68
Supplier <RepositoriesService > repositoriesService ,
108
69
Settings settings ,
109
70
ClusterSettings clusterSettings ,
110
- ThreadPool threadpool
71
+ ThreadPool threadpool ,
72
+ String clusterName
111
73
) {
112
74
assert isRemoteRoutingTableEnabled (settings ) : "Remote routing table is not enabled" ;
113
75
this .repositoriesService = repositoriesService ;
114
76
this .settings = settings ;
115
- this .pathType = clusterSettings .get (REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING );
116
- this .pathHashAlgo = clusterSettings .get (REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING );
117
- clusterSettings .addSettingsUpdateConsumer (REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING , this ::setPathTypeSetting );
118
- clusterSettings .addSettingsUpdateConsumer (REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING , this ::setPathHashAlgoSetting );
119
77
this .threadPool = threadpool ;
120
- }
121
-
122
- private void setPathTypeSetting (RemoteStoreEnums .PathType pathType ) {
123
- this .pathType = pathType ;
124
- }
125
-
126
- private void setPathHashAlgoSetting (RemoteStoreEnums .PathHashAlgorithm pathHashAlgo ) {
127
- this .pathHashAlgo = pathHashAlgo ;
78
+ this .clusterName = clusterName ;
79
+ this .clusterSettings = clusterSettings ;
128
80
}
129
81
130
82
public List <IndexRoutingTable > getIndicesRouting (RoutingTable routingTable ) {
@@ -151,43 +103,32 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting
151
103
152
104
/**
153
105
* Create async action for writing one {@code IndexRoutingTable} to remote store
154
- * @param clusterState current cluster state
106
+ * @param term current term
107
+ * @param version current version
108
+ * @param clusterUUID current cluster UUID
155
109
* @param indexRouting indexRoutingTable to write to remote store
156
110
* @param latchedActionListener listener for handling async action response
157
- * @param clusterBasePath base path for remote file
158
111
* @return returns runnable async action
159
112
*/
160
- public CheckedRunnable <IOException > getIndexRoutingAsyncAction (
161
- ClusterState clusterState ,
113
+ @ Override
114
+ public CheckedRunnable <IOException > getAsyncIndexRoutingWriteAction (
115
+ String clusterUUID ,
116
+ long term ,
117
+ long version ,
162
118
IndexRoutingTable indexRouting ,
163
- LatchedActionListener <ClusterMetadataManifest .UploadedMetadata > latchedActionListener ,
164
- BlobPath clusterBasePath
119
+ LatchedActionListener <ClusterMetadataManifest .UploadedMetadata > latchedActionListener
165
120
) {
166
121
167
- BlobPath indexRoutingPath = clusterBasePath .add (INDEX_ROUTING_PATH_TOKEN );
168
- BlobPath path = pathType .path (
169
- RemoteStorePathStrategy .PathInput .builder ().basePath (indexRoutingPath ).indexUUID (indexRouting .getIndex ().getUUID ()).build (),
170
- pathHashAlgo
171
- );
172
- final BlobContainer blobContainer = blobStoreRepository .blobStore ().blobContainer (path );
173
-
174
- final String fileName = getIndexRoutingFileName (clusterState .term (), clusterState .version ());
122
+ RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable (indexRouting , clusterUUID , compressor , term , version );
175
123
176
124
ActionListener <Void > completionListener = ActionListener .wrap (
177
- resp -> latchedActionListener .onResponse (
178
- new ClusterMetadataManifest .UploadedIndexMetadata (
179
- indexRouting .getIndex ().getName (),
180
- indexRouting .getIndex ().getUUID (),
181
- path .buildAsString () + fileName ,
182
- INDEX_ROUTING_METADATA_PREFIX
183
- )
184
- ),
125
+ resp -> latchedActionListener .onResponse (remoteIndexRoutingTable .getUploadedMetadata ()),
185
126
ex -> latchedActionListener .onFailure (
186
127
new RemoteStateTransferException ("Exception in writing index to remote store: " + indexRouting .getIndex ().toString (), ex )
187
128
)
188
129
);
189
130
190
- return () -> uploadIndex ( indexRouting , fileName , blobContainer , completionListener );
131
+ return () -> remoteIndexRoutingTableStore . writeAsync ( remoteIndexRoutingTable , completionListener );
191
132
}
192
133
193
134
/**
@@ -214,111 +155,21 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
214
155
return new ArrayList <>(allUploadedIndicesRouting .values ());
215
156
}
216
157
217
- private void uploadIndex (
218
- IndexRoutingTable indexRouting ,
219
- String fileName ,
220
- BlobContainer blobContainer ,
221
- ActionListener <Void > completionListener
222
- ) {
223
- RemoteIndexRoutingTable indexRoutingInput = new RemoteIndexRoutingTable (indexRouting );
224
- BytesReference bytesInput = null ;
225
- try (BytesStreamOutput streamOutput = new BytesStreamOutput ()) {
226
- indexRoutingInput .writeTo (streamOutput );
227
- bytesInput = streamOutput .bytes ();
228
- } catch (IOException e ) {
229
- logger .error ("Failed to serialize IndexRoutingTable for [{}]: [{}]" , indexRouting , e );
230
- completionListener .onFailure (e );
231
- return ;
232
- }
233
-
234
- if (blobContainer instanceof AsyncMultiStreamBlobContainer == false ) {
235
- try {
236
- blobContainer .writeBlob (fileName , bytesInput .streamInput (), bytesInput .length (), true );
237
- completionListener .onResponse (null );
238
- } catch (IOException e ) {
239
- logger .error ("Failed to write IndexRoutingTable to remote store for indexRouting [{}]: [{}]" , indexRouting , e );
240
- completionListener .onFailure (e );
241
- }
242
- return ;
243
- }
244
-
245
- try (IndexInput input = new ByteArrayIndexInput ("indexrouting" , BytesReference .toBytes (bytesInput ))) {
246
- try (
247
- RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer (
248
- fileName ,
249
- fileName ,
250
- input .length (),
251
- true ,
252
- WritePriority .URGENT ,
253
- (size , position ) -> new OffsetRangeIndexInputStream (input , size , position ),
254
- null ,
255
- false
256
- )
257
- ) {
258
- ((AsyncMultiStreamBlobContainer ) blobContainer ).asyncBlobUpload (
259
- remoteTransferContainer .createWriteContext (),
260
- completionListener
261
- );
262
- } catch (IOException e ) {
263
- logger .error ("Failed to write IndexRoutingTable to remote store for indexRouting [{}]: [{}]" , indexRouting , e );
264
- completionListener .onFailure (e );
265
- }
266
- } catch (IOException e ) {
267
- logger .error (
268
- "Failed to create transfer object for IndexRoutingTable for remote store upload for indexRouting [{}]: [{}]" ,
269
- indexRouting ,
270
- e
271
- );
272
- completionListener .onFailure (e );
273
- }
274
- }
275
-
276
158
@ Override
277
159
public CheckedRunnable <IOException > getAsyncIndexRoutingReadAction (
160
+ String clusterUUID ,
278
161
String uploadedFilename ,
279
- Index index ,
280
162
LatchedActionListener <IndexRoutingTable > latchedActionListener
281
163
) {
282
- int idx = uploadedFilename .lastIndexOf ("/" );
283
- String blobFileName = uploadedFilename .substring (idx + 1 );
284
- BlobContainer blobContainer = blobStoreRepository .blobStore ()
285
- .blobContainer (BlobPath .cleanPath ().add (uploadedFilename .substring (0 , idx )));
286
164
287
- return () -> readAsync (
288
- blobContainer ,
289
- blobFileName ,
290
- index ,
291
- threadPool .executor (ThreadPool .Names .REMOTE_STATE_READ ),
292
- ActionListener .wrap (
293
- response -> latchedActionListener .onResponse (response .getIndexRoutingTable ()),
294
- latchedActionListener ::onFailure
295
- )
165
+ ActionListener <IndexRoutingTable > actionListener = ActionListener .wrap (
166
+ latchedActionListener ::onResponse ,
167
+ latchedActionListener ::onFailure
296
168
);
297
- }
298
169
299
- private void readAsync (
300
- BlobContainer blobContainer ,
301
- String name ,
302
- Index index ,
303
- ExecutorService executorService ,
304
- ActionListener <RemoteIndexRoutingTable > listener
305
- ) {
306
- executorService .execute (() -> {
307
- try {
308
- listener .onResponse (read (blobContainer , name , index ));
309
- } catch (Exception e ) {
310
- listener .onFailure (e );
311
- }
312
- });
313
- }
170
+ RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable (uploadedFilename , clusterUUID , compressor );
314
171
315
- private RemoteIndexRoutingTable read (BlobContainer blobContainer , String path , Index index ) {
316
- try {
317
- return new RemoteIndexRoutingTable (blobContainer .readBlob (path ), index );
318
- } catch (IOException | AssertionError e ) {
319
- logger .error (() -> new ParameterizedMessage ("RoutingTable read failed for path {}" , path ), e );
320
- throw new RemoteStateTransferException ("Failed to read RemoteRoutingTable from Manifest with error " , e );
321
- }
172
+ return () -> remoteIndexRoutingTableStore .readAsync (remoteIndexRoutingTable , actionListener );
322
173
}
323
174
324
175
@ Override
@@ -335,16 +186,6 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getUpdatedIndexRoutin
335
186
}).collect (Collectors .toList ());
336
187
}
337
188
338
- private String getIndexRoutingFileName (long term , long version ) {
339
- return String .join (
340
- DELIMITER ,
341
- INDEX_ROUTING_FILE_PREFIX ,
342
- RemoteStoreUtils .invertLong (term ),
343
- RemoteStoreUtils .invertLong (version ),
344
- RemoteStoreUtils .invertLong (System .currentTimeMillis ())
345
- );
346
- }
347
-
348
189
@ Override
349
190
protected void doClose () throws IOException {
350
191
if (blobStoreRepository != null ) {
@@ -362,6 +203,16 @@ protected void doStart() {
362
203
final Repository repository = repositoriesService .get ().repository (remoteStoreRepo );
363
204
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository" ;
364
205
blobStoreRepository = (BlobStoreRepository ) repository ;
206
+ compressor = blobStoreRepository .getCompressor ();
207
+
208
+ this .remoteIndexRoutingTableStore = new RemoteRoutingTableBlobStore <>(
209
+ new BlobStoreTransferService (blobStoreRepository .blobStore (), threadPool ),
210
+ blobStoreRepository ,
211
+ clusterName ,
212
+ threadPool ,
213
+ ThreadPool .Names .REMOTE_STATE_READ ,
214
+ clusterSettings
215
+ );
365
216
}
366
217
367
218
@ Override
@@ -377,5 +228,4 @@ public void deleteStaleIndexRoutingPaths(List<String> stalePaths) throws IOExcep
377
228
throw e ;
378
229
}
379
230
}
380
-
381
231
}
0 commit comments