40
40
import org .opensearch .cluster .ClusterState ;
41
41
import org .opensearch .cluster .Diff ;
42
42
import org .opensearch .cluster .IncompatibleClusterStateVersionException ;
43
+ import org .opensearch .cluster .coordination .PersistedStateRegistry .PersistedStateType ;
43
44
import org .opensearch .cluster .node .DiscoveryNode ;
44
45
import org .opensearch .cluster .node .DiscoveryNodes ;
45
46
import org .opensearch .core .action .ActionListener ;
46
47
import org .opensearch .core .common .bytes .BytesReference ;
47
48
import org .opensearch .core .common .io .stream .NamedWriteableRegistry ;
48
49
import org .opensearch .core .common .io .stream .StreamInput ;
49
50
import org .opensearch .core .transport .TransportResponse ;
51
+ import org .opensearch .gateway .GatewayMetaState .RemotePersistedState ;
52
+ import org .opensearch .gateway .remote .ClusterMetadataManifest ;
53
+ import org .opensearch .gateway .remote .RemoteClusterStateService ;
50
54
import org .opensearch .threadpool .ThreadPool ;
51
55
import org .opensearch .transport .BytesTransportRequest ;
52
56
import org .opensearch .transport .TransportChannel ;
@@ -74,6 +78,7 @@ public class PublicationTransportHandler {
74
78
private static final Logger logger = LogManager .getLogger (PublicationTransportHandler .class );
75
79
76
80
public static final String PUBLISH_STATE_ACTION_NAME = "internal:cluster/coordination/publish_state" ;
81
+ public static final String PUBLISH_REMOTE_STATE_ACTION_NAME = "internal:cluster/coordination/publish_remote_state" ;
77
82
public static final String COMMIT_STATE_ACTION_NAME = "internal:cluster/coordination/commit_state" ;
78
83
79
84
private final TransportService transportService ;
@@ -97,16 +102,19 @@ public class PublicationTransportHandler {
97
102
private final TransportRequestOptions stateRequestOptions = TransportRequestOptions .builder ()
98
103
.withType (TransportRequestOptions .Type .STATE )
99
104
.build ();
105
+ private final RemoteClusterStateService remoteClusterStateService ;
100
106
101
107
public PublicationTransportHandler (
102
108
TransportService transportService ,
103
109
NamedWriteableRegistry namedWriteableRegistry ,
104
110
Function <PublishRequest , PublishWithJoinResponse > handlePublishRequest ,
105
- BiConsumer <ApplyCommitRequest , ActionListener <Void >> handleApplyCommit
111
+ BiConsumer <ApplyCommitRequest , ActionListener <Void >> handleApplyCommit ,
112
+ RemoteClusterStateService remoteClusterStateService
106
113
) {
107
114
this .transportService = transportService ;
108
115
this .namedWriteableRegistry = namedWriteableRegistry ;
109
116
this .handlePublishRequest = handlePublishRequest ;
117
+ this .remoteClusterStateService = remoteClusterStateService ;
110
118
111
119
transportService .registerRequestHandler (
112
120
PUBLISH_STATE_ACTION_NAME ,
@@ -117,6 +125,15 @@ public PublicationTransportHandler(
117
125
(request , channel , task ) -> channel .sendResponse (handleIncomingPublishRequest (request ))
118
126
);
119
127
128
+ transportService .registerRequestHandler (
129
+ PUBLISH_REMOTE_STATE_ACTION_NAME ,
130
+ ThreadPool .Names .GENERIC ,
131
+ false ,
132
+ false ,
133
+ RemotePublishRequest ::new ,
134
+ (request , channel , task ) -> channel .sendResponse (handleIncomingRemotePublishRequest (request ))
135
+ );
136
+
120
137
transportService .registerRequestHandler (
121
138
COMMIT_STATE_ACTION_NAME ,
122
139
ThreadPool .Names .GENERIC ,
@@ -211,6 +228,74 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
211
228
}
212
229
}
213
230
231
+ // package private for testing
232
+ PublishWithJoinResponse handleIncomingRemotePublishRequest (RemotePublishRequest request ) throws IOException {
233
+ if (transportService .getLocalNode ().equals (request .getSourceNode ())) {
234
+ return acceptRemoteStateOnLocalNode (request );
235
+ }
236
+ // TODO Make cluster state download non-blocking: https://github.com/opensearch-project/OpenSearch/issues/14102
237
+ ClusterMetadataManifest manifest = remoteClusterStateService .getClusterMetadataManifestByFileName (
238
+ request .getClusterUUID (),
239
+ request .getManifestFile ()
240
+ );
241
+ if (manifest == null ) {
242
+ throw new IllegalStateException ("Publication failed as manifest was not found for " + request );
243
+ }
244
+ boolean applyFullState = false ;
245
+ final ClusterState lastSeen = lastSeenClusterState .get ();
246
+ if (lastSeen == null ) {
247
+ logger .debug (() -> "Diff cannot be applied as there is no last cluster state" );
248
+ applyFullState = true ;
249
+ } else if (manifest .getDiffManifest () == null ) {
250
+ logger .trace (() -> "There is no diff in the manifest" );
251
+ applyFullState = true ;
252
+ } else if (manifest .getDiffManifest ().getFromStateUUID ().equals (lastSeen .stateUUID ()) == false ) {
253
+ logger .debug (() -> "Last cluster state not compatible with the diff" );
254
+ applyFullState = true ;
255
+ }
256
+
257
+ if (applyFullState == true ) {
258
+ logger .debug (
259
+ () -> new ParameterizedMessage (
260
+ "Downloading full cluster state for term {}, version {}, stateUUID {}" ,
261
+ manifest .getClusterTerm (),
262
+ manifest .getStateVersion (),
263
+ manifest .getStateUUID ()
264
+ )
265
+ );
266
+ ClusterState clusterState = remoteClusterStateService .getClusterStateForManifest (
267
+ request .getClusterName (),
268
+ manifest ,
269
+ transportService .getLocalNode ().getId (),
270
+ true
271
+ );
272
+ fullClusterStateReceivedCount .incrementAndGet ();
273
+ final PublishWithJoinResponse response = acceptState (clusterState );
274
+ lastSeenClusterState .set (clusterState );
275
+ return response ;
276
+ } else {
277
+ logger .debug (
278
+ () -> new ParameterizedMessage (
279
+ "Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}" ,
280
+ manifest .getClusterTerm (),
281
+ manifest .getStateVersion (),
282
+ manifest .getDiffManifest ().getFromStateUUID (),
283
+ manifest .getStateUUID ()
284
+ )
285
+ );
286
+ ClusterState clusterState = remoteClusterStateService .getClusterStateUsingDiff (
287
+ request .getClusterName (),
288
+ manifest ,
289
+ lastSeen ,
290
+ transportService .getLocalNode ().getId ()
291
+ );
292
+ compatibleClusterStateDiffReceivedCount .incrementAndGet ();
293
+ final PublishWithJoinResponse response = acceptState (clusterState );
294
+ lastSeenClusterState .compareAndSet (lastSeen , clusterState );
295
+ return response ;
296
+ }
297
+ }
298
+
214
299
private PublishWithJoinResponse acceptState (ClusterState incomingState ) {
215
300
// if the state is coming from the current node, use original request instead (see currentPublishRequestToSelf for explanation)
216
301
if (transportService .getLocalNode ().equals (incomingState .nodes ().getClusterManagerNode ())) {
@@ -224,8 +309,35 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) {
224
309
return handlePublishRequest .apply (new PublishRequest (incomingState ));
225
310
}
226
311
227
- public PublicationContext newPublicationContext (ClusterChangedEvent clusterChangedEvent ) {
228
- final PublicationContext publicationContext = new PublicationContext (clusterChangedEvent );
312
+ private PublishWithJoinResponse acceptRemoteStateOnLocalNode (RemotePublishRequest remotePublishRequest ) {
313
+ final PublishRequest publishRequest = currentPublishRequestToSelf .get ();
314
+ if (publishRequest == null
315
+ || publishRequest .getAcceptedState ().coordinationMetadata ().term () != remotePublishRequest .term
316
+ || publishRequest .getAcceptedState ().version () != remotePublishRequest .version ) {
317
+ logger .debug (
318
+ () -> new ParameterizedMessage (
319
+ "Publication failure for current publish request : {} and remote publish request: {}" ,
320
+ publishRequest ,
321
+ remotePublishRequest
322
+ )
323
+ );
324
+ throw new IllegalStateException ("publication to self failed for " + remotePublishRequest );
325
+ }
326
+ PublishWithJoinResponse publishWithJoinResponse = handlePublishRequest .apply (publishRequest );
327
+ lastSeenClusterState .set (publishRequest .getAcceptedState ());
328
+ return publishWithJoinResponse ;
329
+ }
330
+
331
+ public PublicationContext newPublicationContext (
332
+ ClusterChangedEvent clusterChangedEvent ,
333
+ boolean isRemotePublicationEnabled ,
334
+ PersistedStateRegistry persistedStateRegistry
335
+ ) {
336
+ final PublicationContext publicationContext = new PublicationContext (
337
+ clusterChangedEvent ,
338
+ isRemotePublicationEnabled ,
339
+ persistedStateRegistry
340
+ );
229
341
230
342
// Build the serializations we expect to need now, early in the process, so that an error during serialization fails the publication
231
343
// straight away. This isn't watertight since we send diffs on a best-effort basis and may fall back to sending a full state (and
@@ -234,6 +346,16 @@ public PublicationContext newPublicationContext(ClusterChangedEvent clusterChang
234
346
return publicationContext ;
235
347
}
236
348
349
+ // package private for testing
350
+ void setCurrentPublishRequestToSelf (PublishRequest publishRequest ) {
351
+ this .currentPublishRequestToSelf .set (publishRequest );
352
+ }
353
+
354
+ // package private for testing
355
+ void setLastSeenClusterState (ClusterState clusterState ) {
356
+ this .lastSeenClusterState .set (clusterState );
357
+ }
358
+
237
359
private static BytesReference serializeFullClusterState (ClusterState clusterState , Version nodeVersion ) throws IOException {
238
360
final BytesReference serializedState = CompressedStreamUtils .createCompressedStream (nodeVersion , stream -> {
239
361
stream .writeBoolean (true );
@@ -270,12 +392,20 @@ public class PublicationContext {
270
392
private final boolean sendFullVersion ;
271
393
private final Map <Version , BytesReference > serializedStates = new HashMap <>();
272
394
private final Map <Version , BytesReference > serializedDiffs = new HashMap <>();
395
+ private final boolean sendRemoteState ;
396
+ private final PersistedStateRegistry persistedStateRegistry ;
273
397
274
- PublicationContext (ClusterChangedEvent clusterChangedEvent ) {
398
+ PublicationContext (
399
+ ClusterChangedEvent clusterChangedEvent ,
400
+ boolean isRemotePublicationEnabled ,
401
+ PersistedStateRegistry persistedStateRegistry
402
+ ) {
275
403
discoveryNodes = clusterChangedEvent .state ().nodes ();
276
404
newState = clusterChangedEvent .state ();
277
405
previousState = clusterChangedEvent .previousState ();
278
406
sendFullVersion = previousState .getBlocks ().disableStatePersistence ();
407
+ sendRemoteState = isRemotePublicationEnabled ;
408
+ this .persistedStateRegistry = persistedStateRegistry ;
279
409
}
280
410
281
411
void buildDiffAndSerializeStates () {
@@ -339,7 +469,11 @@ public void onFailure(Exception e) {
339
469
} else {
340
470
responseActionListener = listener ;
341
471
}
342
- if (sendFullVersion || previousState .nodes ().nodeExists (destination ) == false ) {
472
+ // TODO Decide to send remote state before starting publication by checking remote publication on all nodes
473
+ if (sendRemoteState && destination .isRemoteStatePublicationEnabled ()) {
474
+ logger .trace ("sending remote cluster state version [{}] to [{}]" , newState .version (), destination );
475
+ sendRemoteClusterState (destination , publishRequest .getAcceptedState (), responseActionListener );
476
+ } else if (sendFullVersion || previousState .nodes ().nodeExists (destination ) == false ) {
343
477
logger .trace ("sending full cluster state version [{}] to [{}]" , newState .version (), destination );
344
478
sendFullClusterState (destination , responseActionListener );
345
479
} else {
@@ -384,6 +518,61 @@ public String executor() {
384
518
);
385
519
}
386
520
521
+ private void sendRemoteClusterState (
522
+ final DiscoveryNode destination ,
523
+ final ClusterState clusterState ,
524
+ final ActionListener <PublishWithJoinResponse > listener
525
+ ) {
526
+ try {
527
+ final String manifestFileName = ((RemotePersistedState ) persistedStateRegistry .getPersistedState (PersistedStateType .REMOTE ))
528
+ .getLastUploadedManifestFile ();
529
+ final RemotePublishRequest remotePublishRequest = new RemotePublishRequest (
530
+ discoveryNodes .getLocalNode (),
531
+ clusterState .term (),
532
+ clusterState .getVersion (),
533
+ clusterState .getClusterName ().value (),
534
+ clusterState .metadata ().clusterUUID (),
535
+ manifestFileName
536
+ );
537
+ final Consumer <TransportException > transportExceptionHandler = exp -> {
538
+ logger .debug (() -> new ParameterizedMessage ("failed to send remote cluster state to {}" , destination ), exp );
539
+ listener .onFailure (exp );
540
+ };
541
+ final TransportResponseHandler <PublishWithJoinResponse > responseHandler = new TransportResponseHandler <>() {
542
+
543
+ @ Override
544
+ public PublishWithJoinResponse read (StreamInput in ) throws IOException {
545
+ return new PublishWithJoinResponse (in );
546
+ }
547
+
548
+ @ Override
549
+ public void handleResponse (PublishWithJoinResponse response ) {
550
+ listener .onResponse (response );
551
+ }
552
+
553
+ @ Override
554
+ public void handleException (TransportException exp ) {
555
+ transportExceptionHandler .accept (exp );
556
+ }
557
+
558
+ @ Override
559
+ public String executor () {
560
+ return ThreadPool .Names .GENERIC ;
561
+ }
562
+ };
563
+ transportService .sendRequest (
564
+ destination ,
565
+ PUBLISH_REMOTE_STATE_ACTION_NAME ,
566
+ remotePublishRequest ,
567
+ stateRequestOptions ,
568
+ responseHandler
569
+ );
570
+ } catch (Exception e ) {
571
+ logger .warn (() -> new ParameterizedMessage ("error sending remote cluster state to {}" , destination ), e );
572
+ listener .onFailure (e );
573
+ }
574
+ }
575
+
387
576
private void sendFullClusterState (DiscoveryNode destination , ActionListener <PublishWithJoinResponse > listener ) {
388
577
BytesReference bytes = serializedStates .get (destination .getVersion ());
389
578
if (bytes == null ) {
0 commit comments