10
10
11
11
import org .apache .logging .log4j .LogManager ;
12
12
import org .apache .logging .log4j .Logger ;
13
- import org .opensearch . Version ;
13
+ import org .apache . lucene . store . IndexInput ;
14
14
import org .opensearch .action .LatchedActionListener ;
15
15
import org .opensearch .cluster .ClusterState ;
16
16
import org .opensearch .cluster .routing .IndexRoutingTable ;
17
17
import org .opensearch .cluster .routing .RoutingTable ;
18
18
import org .opensearch .common .CheckedRunnable ;
19
+ import org .opensearch .common .blobstore .AsyncMultiStreamBlobContainer ;
19
20
import org .opensearch .common .blobstore .BlobContainer ;
20
21
import org .opensearch .common .blobstore .BlobPath ;
21
- import org .opensearch .common .io .stream .BytesStreamOutput ;
22
+ import org .opensearch .common .blobstore .stream .write .WritePriority ;
23
+ import org .opensearch .common .blobstore .transfer .RemoteTransferContainer ;
24
+ import org .opensearch .common .blobstore .transfer .stream .OffsetRangeIndexInputStream ;
22
25
23
- import org .opensearch .common .blobstore .BlobContainer ;
24
- import org .opensearch .common .settings .ClusterSettings ;
26
+ import org .opensearch .common .lucene .store .ByteArrayIndexInput ;
25
27
import org .opensearch .common .settings .Setting ;
26
28
import org .opensearch .common .settings .Settings ;
27
29
import org .opensearch .common .util .io .IOUtils ;
28
30
import org .opensearch .core .action .ActionListener ;
29
- import org .opensearch .core .common .io .stream .StreamOutput ;
30
31
import org .opensearch .core .index .Index ;
31
- import org .opensearch .core .xcontent .NamedXContentRegistry ;
32
- import org .opensearch .core .xcontent .ToXContent ;
33
32
import org .opensearch .gateway .remote .ClusterMetadataManifest ;
34
- import org .opensearch .gateway .remote .RemoteClusterStateService ;
35
33
import org .opensearch .gateway .remote .RemoteClusterStateUtils ;
36
34
import org .opensearch .gateway .remote .routingtable .IndexRoutingTableInputStream ;
37
35
import org .opensearch .gateway .remote .routingtable .IndexRoutingTableInputStreamReader ;
36
+ import org .opensearch .index .remote .RemoteStoreEnums ;
37
+ import org .opensearch .index .remote .RemoteStorePathStrategy ;
38
38
import org .opensearch .index .remote .RemoteStoreUtils ;
39
39
import org .opensearch .node .Node ;
40
40
import org .opensearch .node .remotestore .RemoteStoreNodeAttribute ;
41
41
import org .opensearch .repositories .RepositoriesService ;
42
42
import org .opensearch .repositories .Repository ;
43
43
import org .opensearch .repositories .blobstore .BlobStoreRepository ;
44
- import org .opensearch .repositories .blobstore .ChecksumBlobStoreFormat ;
45
44
import org .opensearch .threadpool .ThreadPool ;
46
45
47
- import java .io .*;
48
46
import java .io .Closeable ;
49
47
import java .io .IOException ;
50
48
53
51
import java .util .HashMap ;
54
52
import java .util .List ;
55
53
import java .util .Map ;
54
+ import java .util .Set ;
56
55
import java .util .concurrent .ExecutorService ;
57
56
import java .util .function .Function ;
58
57
import java .util .function .Supplier ;
59
58
import java .util .stream .Collectors ;
60
59
60
+ import static org .opensearch .common .blobstore .transfer .RemoteTransferContainer .checksumOfChecksum ;
61
61
import static org .opensearch .gateway .remote .RemoteClusterStateUtils .getCusterMetadataBasePath ;
62
62
import static org .opensearch .node .remotestore .RemoteStoreNodeAttribute .isRemoteRoutingTableEnabled ;
63
63
@@ -78,100 +78,157 @@ public class RemoteRoutingTableService implements Closeable {
78
78
Setting .Property .Final
79
79
);
80
80
public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing" ;
81
+ public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing" ;
81
82
public static final String DELIMITER = "__" ;
82
-
83
+ public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--" ;
83
84
private static final Logger logger = LogManager .getLogger (RemoteRoutingTableService .class );
84
85
private final Settings settings ;
85
86
private final Supplier <RepositoriesService > repositoriesService ;
86
- private final ClusterSettings clusterSettings ;
87
87
private BlobStoreRepository blobStoreRepository ;
88
88
private final ThreadPool threadPool ;
89
89
90
90
public RemoteRoutingTableService (Supplier <RepositoriesService > repositoriesService ,
91
91
Settings settings ,
92
- ClusterSettings clusterSettings , ThreadPool threadPool ) {
92
+ ThreadPool threadPool ) {
93
93
assert isRemoteRoutingTableEnabled (settings ) : "Remote routing table is not enabled" ;
94
94
this .repositoriesService = repositoriesService ;
95
95
this .settings = settings ;
96
- this .clusterSettings = clusterSettings ;
97
96
this .threadPool = threadPool ;
98
97
}
99
98
100
- public List <ClusterMetadataManifest .UploadedIndexMetadata > writeFullRoutingTable (ClusterState clusterState , String previousClusterUUID ) {
99
+ public List <IndexRoutingTable > getChangedIndicesRouting ( ClusterState previousClusterState ,
100
+ ClusterState clusterState ) {
101
+ Map <String , IndexRoutingTable > previousIndexRoutingTable = previousClusterState .getRoutingTable ().getIndicesRouting ();
102
+ List <IndexRoutingTable > changedIndicesRouting = new ArrayList <>();
103
+ for (IndexRoutingTable indexRouting : clusterState .getRoutingTable ().getIndicesRouting ().values ()) {
104
+ if (!(previousIndexRoutingTable .containsKey (indexRouting .getIndex ().getName ()) && indexRouting .equals (previousIndexRoutingTable .get (indexRouting .getIndex ().getName ())))) {
105
+ changedIndicesRouting .add (indexRouting );
106
+ logger .info ("changedIndicesRouting {}" , indexRouting .prettyPrint ());
107
+ }
108
+ }
101
109
110
+ return changedIndicesRouting ;
111
+ }
112
+
113
+ public CheckedRunnable <IOException > getIndexRoutingAsyncAction (
114
+ ClusterState clusterState ,
115
+ IndexRoutingTable indexRouting ,
116
+ LatchedActionListener <ClusterMetadataManifest .UploadedMetadata > latchedActionListener
117
+ ) throws IOException {
102
118
103
- //batch index count and parallelize
104
- RoutingTable currentRoutingTable = clusterState .getRoutingTable ();
105
- List <ClusterMetadataManifest .UploadedIndexMetadata > uploadedIndices = new ArrayList <>();
106
119
BlobPath custerMetadataBasePath = getCusterMetadataBasePath (blobStoreRepository , clusterState .getClusterName ().value (),
107
- clusterState .metadata ().clusterUUID ());
108
- for (IndexRoutingTable indexRouting : currentRoutingTable .getIndicesRouting ().values ()) {
109
- uploadedIndices .add (uploadIndex (indexRouting , custerMetadataBasePath ));
120
+ clusterState .metadata ().clusterUUID ()).add (INDEX_ROUTING_PATH_TOKEN );
121
+ logger .info ("custerMetadataBasePath {}" , custerMetadataBasePath );
122
+
123
+ BlobPath path = RemoteStoreEnums .PathType .HASHED_PREFIX .path (RemoteStorePathStrategy .PathInput .builder ()
124
+ .basePath (custerMetadataBasePath )
125
+ .indexUUID (indexRouting .getIndex ().getUUID ())
126
+ .build (),
127
+ RemoteStoreEnums .PathHashAlgorithm .FNV_1A_BASE64 );
128
+ logger .info ("path from prefix hasd {}" , path );
129
+ final BlobContainer blobContainer = blobStoreRepository .blobStore ().blobContainer (path );
130
+
131
+ final String fileName = getIndexRoutingFileName ();
132
+ logger .info ("fileName {}" , fileName );
133
+
134
+ ActionListener <Void > completionListener = ActionListener .wrap (
135
+ resp -> latchedActionListener .onResponse (
136
+ new ClusterMetadataManifest .UploadedIndexMetadata (
137
+
138
+ indexRouting .getIndex ().getName (),
139
+ indexRouting .getIndex ().getUUID (),
140
+ path .buildAsString () + fileName ,
141
+ INDEX_ROUTING_METADATA_PREFIX
142
+ )
143
+ ),
144
+ ex -> latchedActionListener .onFailure (new RemoteClusterStateUtils .RemoteStateTransferException (indexRouting .getIndex ().toString (), ex ))
145
+ );
146
+
147
+ if (blobContainer instanceof AsyncMultiStreamBlobContainer == false ) {
148
+ logger .info ("TRYING FILE UPLOAD" );
149
+
150
+ return () -> {
151
+ logger .info ("Going to upload {}" , indexRouting .prettyPrint ());
152
+
153
+ uploadIndex (indexRouting , fileName , blobContainer );
154
+ logger .info ("upload done {}" , indexRouting .prettyPrint ());
155
+
156
+ completionListener .onResponse (null );
157
+ logger .info ("response done {}" , indexRouting .prettyPrint ());
158
+
159
+ };
110
160
}
111
- logger .info ("uploadedIndices {}" , uploadedIndices );
112
161
113
- return uploadedIndices ;
162
+ logger .info ("TRYING S3 UPLOAD" );
163
+
164
+ //TODO: Integrate with S3AsyncCrtClient for using buffered stream directly with putObject.
165
+ try (
166
+ InputStream indexRoutingStream = new IndexRoutingTableInputStream (indexRouting );
167
+ IndexInput input = new ByteArrayIndexInput ("indexrouting" , indexRoutingStream .readAllBytes ())) {
168
+ long expectedChecksum ;
169
+ try {
170
+ expectedChecksum = checksumOfChecksum (input .clone (), 8 );
171
+ } catch (Exception e ) {
172
+ throw e ;
173
+ }
174
+ try (
175
+ RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer (
176
+ fileName ,
177
+ fileName ,
178
+ input .length (),
179
+ true ,
180
+ WritePriority .URGENT ,
181
+ (size , position ) -> new OffsetRangeIndexInputStream (input , size , position ),
182
+ expectedChecksum ,
183
+ ((AsyncMultiStreamBlobContainer ) blobContainer ).remoteIntegrityCheckSupported ()
184
+ )
185
+ ) {
186
+ return () -> ((AsyncMultiStreamBlobContainer ) blobContainer ).asyncBlobUpload (remoteTransferContainer .createWriteContext (), completionListener );
187
+ } catch (IOException e ) {
188
+ e .printStackTrace ();
189
+ return null ;
190
+ }
191
+ }
114
192
}
115
193
116
- public List <ClusterMetadataManifest .UploadedIndexMetadata > writeIncrementalRoutingTable (
117
- ClusterState previousClusterState ,
118
- ClusterState clusterState ,
119
- ClusterMetadataManifest previousManifest ) {
120
194
195
+ public List <ClusterMetadataManifest .UploadedIndexMetadata > getAllUploadedIndicesRouting (ClusterMetadataManifest previousManifest , List <ClusterMetadataManifest .UploadedIndexMetadata > indicesRoutingToUpload , Set <String > indicesRoutingToDelete ) {
121
196
final Map <String , ClusterMetadataManifest .UploadedIndexMetadata > allUploadedIndicesRouting = previousManifest .getIndicesRouting ()
122
197
.stream ()
123
198
.collect (Collectors .toMap (ClusterMetadataManifest .UploadedIndexMetadata ::getIndexName , Function .identity ()));
199
+
200
+ indicesRoutingToUpload .forEach (
201
+ uploadedIndexRouting -> allUploadedIndicesRouting .put (uploadedIndexRouting .getIndexName (), uploadedIndexRouting )
202
+ );
203
+
204
+ indicesRoutingToDelete .forEach (index -> allUploadedIndicesRouting .remove (index ));
205
+
124
206
logger .info ("allUploadedIndicesRouting ROUTING {}" , allUploadedIndicesRouting );
125
207
126
- Map <String , IndexRoutingTable > previousIndexRoutingTable = previousClusterState .getRoutingTable ().getIndicesRouting ();
127
- List <ClusterMetadataManifest .UploadedIndexMetadata > uploadedIndices = new ArrayList <>();
128
- BlobPath custerMetadataBasePath = getCusterMetadataBasePath (blobStoreRepository , clusterState .getClusterName ().value (),
129
- clusterState .metadata ().clusterUUID ());
130
- for (IndexRoutingTable indexRouting : clusterState .getRoutingTable ().getIndicesRouting ().values ()) {
131
- if (previousIndexRoutingTable .containsKey (indexRouting .getIndex ().getName ()) && indexRouting .equals (previousIndexRoutingTable .get (indexRouting .getIndex ().getName ()))) {
132
- logger .info ("index exists {}" , indexRouting .getIndex ().getName ());
133
- //existing index with no shard change.
134
- uploadedIndices .add (allUploadedIndicesRouting .get (indexRouting .getIndex ().getName ()));
135
- } else {
136
- // new index or shards changed, in both cases we upload new index file.
137
- uploadedIndices .add (uploadIndex (indexRouting , custerMetadataBasePath ));
138
- }
139
- }
140
- return uploadedIndices ;
208
+ return new ArrayList <>(allUploadedIndicesRouting .values ());
141
209
}
142
210
143
- private ClusterMetadataManifest .UploadedIndexMetadata uploadIndex (IndexRoutingTable indexRouting , BlobPath custerMetadataBasePath ) {
211
+ private void uploadIndex (IndexRoutingTable indexRouting , String fileName , BlobContainer container ) {
212
+ logger .info ("Starting write" );
213
+
144
214
try {
145
215
InputStream indexRoutingStream = new IndexRoutingTableInputStream (indexRouting );
146
- BlobContainer container = blobStoreRepository .blobStore ().blobContainer (custerMetadataBasePath .add (INDEX_ROUTING_PATH_TOKEN ).add (indexRouting .getIndex ().getUUID ()));
147
- String indexRoutingFileName = getIndexRoutingFileName ();
148
- container .writeBlob (indexRoutingFileName , indexRoutingStream , 4096 , true );
149
- return new ClusterMetadataManifest .UploadedIndexMetadata (indexRouting .getIndex ().getName (), indexRouting .getIndex ().getUUID (), container .path ().buildAsString () + indexRoutingFileName );
150
-
216
+ container .writeBlob (fileName , indexRoutingStream , 4096 , true );
217
+ logger .info ("SUccessful write" );
151
218
} catch (IOException e ) {
152
219
logger .error ("Failed to write {}" , e );
153
220
}
154
- logger .info ("SUccessful write" );
155
- return null ;
156
221
}
157
222
158
223
private String getIndexRoutingFileName () {
159
224
return String .join (
160
225
DELIMITER ,
161
- //RemoteStoreUtils.invertLong(indexMetadata.getVersion()),
162
- RemoteStoreUtils .invertLong (System .currentTimeMillis ()),
163
- String .valueOf ("CODEC1" ) // Keep the codec version at last place only, during read we reads last
164
- // place to determine codec version.
226
+ INDEX_ROUTING_FILE_PREFIX ,
227
+ RemoteStoreUtils .invertLong (System .currentTimeMillis ())
165
228
);
166
229
167
230
}
168
- public RoutingTable getLatestRoutingTable (String clusterName , String clusterUUID ) {
169
- return null ;
170
- }
171
231
172
- public RoutingTable getIncrementalRoutingTable (ClusterState previousClusterState , ClusterMetadataManifest previousManifest , String clusterName , String clusterUUID ) {
173
- return null ;
174
- }
175
232
176
233
public RoutingTable getIncrementalRoutingTable (ClusterState previousClusterState , ClusterMetadataManifest manifest ){
177
234
List <String > indicesRoutingDeleted = manifest .getDiffManifest ().getIndicesRoutingDeleted ();
@@ -232,9 +289,10 @@ public CheckedRunnable<IOException> getAsyncIndexMetadataReadAction(
232
289
String uploadedFilename ,
233
290
Index index ,
234
291
LatchedActionListener <RemoteIndexRoutingResult > latchedActionListener ) {
235
- BlobContainer blobContainer = blobStoreRepository .blobStore ().blobContainer (getCusterMetadataBasePath (blobStoreRepository , clusterName , clusterUUID ).add (INDEX_ROUTING_PATH_TOKEN ).add (index .getUUID ()));
236
- String [] fileNameTokens = uploadedFilename .split ("/" );
237
- String blobFileName = fileNameTokens [fileNameTokens .length -1 ];
292
+ int idx = uploadedFilename .lastIndexOf ("/" );
293
+ String blobFileName = uploadedFilename .substring (idx +1 );
294
+ BlobContainer blobContainer = blobStoreRepository .blobStore ().blobContainer ( BlobPath .cleanPath ().add (uploadedFilename .substring (0 ,idx )));
295
+
238
296
return () -> readAsync (
239
297
blobContainer ,
240
298
blobFileName ,
@@ -262,8 +320,6 @@ public IndexRoutingTableInputStreamReader read(BlobContainer blobContainer, Stri
262
320
}
263
321
return null ;
264
322
}
265
- private void deleteStaleRoutingTable (String clusterName , String clusterUUID , int manifestsToRetain ) {
266
- }
267
323
268
324
@ Override
269
325
public void close () throws IOException {
@@ -300,4 +356,5 @@ public IndexRoutingTable getIndexRoutingTable() {
300
356
return indexRoutingTable ;
301
357
}
302
358
}
359
+
303
360
}
0 commit comments