31
31
32
32
package org .opensearch .cluster .coordination ;
33
33
34
+ import java .util .Locale ;
35
+ import java .util .Optional ;
36
+ import java .util .function .Supplier ;
34
37
import org .apache .logging .log4j .LogManager ;
35
38
import org .apache .logging .log4j .Logger ;
36
39
import org .apache .logging .log4j .message .ParameterizedMessage ;
40
43
import org .opensearch .cluster .ClusterState ;
41
44
import org .opensearch .cluster .Diff ;
42
45
import org .opensearch .cluster .IncompatibleClusterStateVersionException ;
46
+ import org .opensearch .cluster .coordination .CoordinationState .PersistedState ;
47
+ import org .opensearch .cluster .coordination .PersistedStateRegistry .PersistedStateType ;
43
48
import org .opensearch .cluster .node .DiscoveryNode ;
44
49
import org .opensearch .cluster .node .DiscoveryNodes ;
45
50
import org .opensearch .core .action .ActionListener ;
46
51
import org .opensearch .core .common .bytes .BytesReference ;
47
52
import org .opensearch .core .common .io .stream .NamedWriteableRegistry ;
48
53
import org .opensearch .core .common .io .stream .StreamInput ;
49
54
import org .opensearch .core .transport .TransportResponse ;
55
+ import org .opensearch .gateway .GatewayMetaState .RemotePersistedState ;
50
56
import org .opensearch .gateway .remote .ClusterMetadataManifest ;
51
57
import org .opensearch .gateway .remote .RemoteClusterStateService ;
52
58
import org .opensearch .threadpool .ThreadPool ;
@@ -229,50 +235,35 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
229
235
}
230
236
231
237
private PublishWithJoinResponse handleIncomingRemotePublishRequest (RemotePublishRequest request ) throws IOException {
232
- final Optional <ClusterMetadataManifest > manifestOptional = remoteClusterStateService .getClusterMetadataManifestByTermVersion (
233
- request .getClusterName (),
234
- request .getClusterUUID (),
235
- request .term ,
236
- request .version
237
- );
238
- if (manifestOptional .isPresent () == false ) {
239
- throw new IllegalStateException (
240
- String .format (Locale .ROOT , "Manifest is not present for term - %s version - %s" , request .term , request .version )
241
- );
238
+ if (transportService .getLocalNode ().equals (request .getSourceNode ())) {
239
+ return acceptStateOnLocalNode (request );
242
240
}
243
- ClusterMetadataManifest manifest = manifestOptional . get ( );
241
+ ClusterMetadataManifest manifest = remoteClusterStateService . getClusterMetadataManifestByFileName ( request . getClusterUUID (), request . getManifestFile () );
244
242
boolean applyFullState = false ;
245
243
final ClusterState lastSeen = lastSeenClusterState .get ();
246
244
if (lastSeen == null ) {
247
- logger .debug ("Diff cannot be applied as there is no last cluster state" );
245
+ logger .debug (() -> "Diff cannot be applied as there is no last cluster state" );
248
246
applyFullState = true ;
249
247
} else if (manifest .getDiffManifest () == null ) {
250
- logger .debug ( "There is no diff in the manifest" );
248
+ logger .trace (() -> "There is no diff in the manifest" );
251
249
applyFullState = true ;
252
250
} else if (manifest .getDiffManifest ().getFromStateUUID ().equals (lastSeen .stateUUID ()) == false ) {
253
- logger .debug ("Last cluster state not compatible with the diff" );
251
+ logger .debug (() -> "Last cluster state not compatible with the diff" );
254
252
applyFullState = true ;
255
253
}
256
254
257
255
if (applyFullState == true ) {
258
- ClusterState clusterState = remoteClusterStateService .getClusterStateForManifest (
259
- request .getClusterName (),
260
- manifest ,
261
- transportService .getLocalNode ().getId ()
262
- );
263
- logger .debug ("Downloaded full cluster state [{}]" , clusterState );
256
+ logger .debug (() -> new ParameterizedMessage ("Downloading full cluster state for term {}, version {}, stateUUID {}" , manifest .getClusterTerm (), manifest .getStateVersion (),
257
+ manifest .getStateUUID ()));
258
+ ClusterState clusterState = remoteClusterStateService .getClusterStateForManifest (request .getClusterName (), manifest , transportService .getLocalNode ().getId (), true );
264
259
fullClusterStateReceivedCount .incrementAndGet ();
265
260
final PublishWithJoinResponse response = acceptState (clusterState );
266
261
lastSeenClusterState .set (clusterState );
267
262
return response ;
268
263
} else {
269
- ClusterState clusterState = remoteClusterStateService .getClusterStateUsingDiff (
270
- request .getClusterName (),
271
- manifest ,
272
- lastSeenClusterState .get (),
273
- transportService .getLocalNode ().getId ()
274
- );
275
- logger .debug ("Downloaded full cluster state from diff [{}]" , clusterState );
264
+ logger .debug (() -> new ParameterizedMessage ("Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}" , manifest .getClusterTerm (),
265
+ manifest .getStateVersion (), manifest .getDiffManifest ().getFromStateUUID (), manifest .getStateUUID ()));
266
+ ClusterState clusterState = remoteClusterStateService .getClusterStateUsingDiff (request .getClusterName (), manifest , lastSeen , transportService .getLocalNode ().getId ());
276
267
compatibleClusterStateDiffReceivedCount .incrementAndGet ();
277
268
final PublishWithJoinResponse response = acceptState (clusterState );
278
269
lastSeenClusterState .compareAndSet (lastSeen , clusterState );
@@ -293,8 +284,20 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) {
293
284
return handlePublishRequest .apply (new PublishRequest (incomingState ));
294
285
}
295
286
296
- public PublicationContext newPublicationContext (ClusterChangedEvent clusterChangedEvent , boolean isRemotePublicationEnabled ) {
297
- final PublicationContext publicationContext = new PublicationContext (clusterChangedEvent , isRemotePublicationEnabled );
287
+ private PublishWithJoinResponse acceptStateOnLocalNode (RemotePublishRequest remotePublishRequest ) {
288
+ final PublishRequest publishRequest = currentPublishRequestToSelf .get ();
289
+ if (publishRequest == null || publishRequest .getAcceptedState ().coordinationMetadata ().term () != remotePublishRequest .term
290
+ || publishRequest .getAcceptedState ().version () != remotePublishRequest .version ) {
291
+ throw new IllegalStateException ("publication to self failed for " + remotePublishRequest );
292
+ }
293
+ PublishWithJoinResponse publishWithJoinResponse = handlePublishRequest .apply (publishRequest );
294
+ lastSeenClusterState .set (publishRequest .getAcceptedState ());
295
+ return publishWithJoinResponse ;
296
+ }
297
+
298
+ public PublicationContext newPublicationContext (ClusterChangedEvent clusterChangedEvent , boolean isRemotePublicationEnabled ,
299
+ PersistedStateRegistry persistedStateRegistry ) {
300
+ final PublicationContext publicationContext = new PublicationContext (clusterChangedEvent , isRemotePublicationEnabled , persistedStateRegistry );
298
301
299
302
// Build the serializations we expect to need now, early in the process, so that an error during serialization fails the publication
300
303
// straight away. This isn't watertight since we send diffs on a best-effort basis and may fall back to sending a full state (and
@@ -340,13 +343,15 @@ public class PublicationContext {
340
343
private final Map <Version , BytesReference > serializedStates = new HashMap <>();
341
344
private final Map <Version , BytesReference > serializedDiffs = new HashMap <>();
342
345
private final boolean sendRemoteState ;
346
+ private final PersistedStateRegistry persistedStateRegistry ;
343
347
344
/**
 * Captures the cluster states and publication settings for one publication.
 *
 * @param clusterChangedEvent        event supplying the new state, its nodes and the previous state
 * @param isRemotePublicationEnabled stored as {@code sendRemoteState}
 * @param persistedStateRegistry     registry retained so the remote persisted state can be looked up
 *                                   when sending remote cluster state
 */
PublicationContext(
    ClusterChangedEvent clusterChangedEvent,
    boolean isRemotePublicationEnabled,
    PersistedStateRegistry persistedStateRegistry
) {
    this.discoveryNodes = clusterChangedEvent.state().nodes();
    this.newState = clusterChangedEvent.state();
    this.previousState = clusterChangedEvent.previousState();
    // sendFullVersion mirrors whether the previous state's blocks disable state persistence.
    this.sendFullVersion = this.previousState.getBlocks().disableStatePersistence();
    this.sendRemoteState = isRemotePublicationEnabled;
    this.persistedStateRegistry = persistedStateRegistry;
}
351
356
352
357
void buildDiffAndSerializeStates () {
@@ -410,7 +415,7 @@ public void onFailure(Exception e) {
410
415
} else {
411
416
responseActionListener = listener ;
412
417
}
413
- if (sendRemoteState && destination .isRemoteStateNode ()) {
418
+ if (sendRemoteState && destination .isRemoteClusterStateEnabled () && destination . isRemoteRoutingTableEnabled ()) {
414
419
sendRemoteClusterState (destination , publishRequest .getAcceptedState (), responseActionListener );
415
420
} else if (sendFullVersion || previousState .nodes ().nodeExists (destination ) == false ) {
416
421
logger .trace ("sending full cluster state version [{}] to [{}]" , newState .version (), destination );
@@ -457,25 +462,16 @@ public String executor() {
457
462
);
458
463
}
459
464
460
- private void sendRemoteClusterState (
461
- DiscoveryNode destination ,
462
- ClusterState clusterState ,
463
- ActionListener <PublishWithJoinResponse > listener
464
- ) {
465
+ private void sendRemoteClusterState (final DiscoveryNode destination , final ClusterState clusterState , final ActionListener <PublishWithJoinResponse > listener ) {
465
466
try {
466
- final RemotePublishRequest remotePublishRequest = new RemotePublishRequest (
467
- discoveryNodes .getLocalNode (),
468
- clusterState .term (),
469
- clusterState .getVersion (),
470
- clusterState .getClusterName ().value (),
471
- clusterState .metadata ().clusterUUID ()
472
- );
467
+ final String manifestFileName = ((RemotePersistedState ) persistedStateRegistry .getPersistedState (PersistedStateType .REMOTE )).getLastUploadedManifestFile ();
468
+ final RemotePublishRequest remotePublishRequest = new RemotePublishRequest (discoveryNodes .getLocalNode (), clusterState .term (),
469
+ clusterState .getVersion (), clusterState .getClusterName ().value (), clusterState .metadata ().clusterUUID (), manifestFileName );
473
470
final Consumer <TransportException > transportExceptionHandler = exp -> {
474
471
logger .debug (() -> new ParameterizedMessage ("failed to send remote cluster state to {}" , destination ), exp );
475
472
listener .onFailure (exp );
476
473
};
477
- final TransportResponseHandler <PublishWithJoinResponse > responseHandler = new TransportResponseHandler <
478
- PublishWithJoinResponse >() {
474
+ final TransportResponseHandler <PublishWithJoinResponse > responseHandler = new TransportResponseHandler <>() {
479
475
480
476
@ Override
481
477
public PublishWithJoinResponse read (StreamInput in ) throws IOException {
@@ -497,13 +493,7 @@ public String executor() {
497
493
return ThreadPool .Names .GENERIC ;
498
494
}
499
495
};
500
- transportService .sendRequest (
501
- destination ,
502
- PUBLISH_REMOTE_STATE_ACTION_NAME ,
503
- remotePublishRequest ,
504
- stateRequestOptions ,
505
- responseHandler
506
- );
496
+ transportService .sendRequest (destination , PUBLISH_REMOTE_STATE_ACTION_NAME , remotePublishRequest , stateRequestOptions , responseHandler );
507
497
} catch (Exception e ) {
508
498
logger .warn (() -> new ParameterizedMessage ("error sending remote cluster state to {}" , destination ), e );
509
499
listener .onFailure (e );
0 commit comments