31
31
32
32
package org .opensearch .cluster .coordination ;
33
33
34
- import java .util .Locale ;
35
- import java .util .Optional ;
36
- import java .util .function .Supplier ;
37
34
import org .apache .logging .log4j .LogManager ;
38
35
import org .apache .logging .log4j .Logger ;
39
36
import org .apache .logging .log4j .message .ParameterizedMessage ;
43
40
import org .opensearch .cluster .ClusterState ;
44
41
import org .opensearch .cluster .Diff ;
45
42
import org .opensearch .cluster .IncompatibleClusterStateVersionException ;
46
- import org .opensearch .cluster .coordination .CoordinationState .PersistedState ;
47
43
import org .opensearch .cluster .coordination .PersistedStateRegistry .PersistedStateType ;
48
44
import org .opensearch .cluster .node .DiscoveryNode ;
49
45
import org .opensearch .cluster .node .DiscoveryNodes ;
65
61
66
62
import java .io .IOException ;
67
63
import java .util .HashMap ;
68
- import java .util .Locale ;
69
64
import java .util .Map ;
70
- import java .util .Optional ;
71
65
import java .util .concurrent .atomic .AtomicLong ;
72
66
import java .util .concurrent .atomic .AtomicReference ;
73
67
import java .util .function .BiConsumer ;
@@ -238,7 +232,10 @@ private PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublish
238
232
if (transportService .getLocalNode ().equals (request .getSourceNode ())) {
239
233
return acceptStateOnLocalNode (request );
240
234
}
241
- ClusterMetadataManifest manifest = remoteClusterStateService .getClusterMetadataManifestByFileName (request .getClusterUUID (), request .getManifestFile ());
235
+ ClusterMetadataManifest manifest = remoteClusterStateService .getClusterMetadataManifestByFileName (
236
+ request .getClusterUUID (),
237
+ request .getManifestFile ()
238
+ );
242
239
boolean applyFullState = false ;
243
240
final ClusterState lastSeen = lastSeenClusterState .get ();
244
241
if (lastSeen == null ) {
@@ -253,17 +250,40 @@ private PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublish
253
250
}
254
251
255
252
if (applyFullState == true ) {
256
- logger .debug (() -> new ParameterizedMessage ("Downloading full cluster state for term {}, version {}, stateUUID {}" , manifest .getClusterTerm (), manifest .getStateVersion (),
257
- manifest .getStateUUID ()));
258
- ClusterState clusterState = remoteClusterStateService .getClusterStateForManifest (request .getClusterName (), manifest , transportService .getLocalNode ().getId (), true );
253
+ logger .debug (
254
+ () -> new ParameterizedMessage (
255
+ "Downloading full cluster state for term {}, version {}, stateUUID {}" ,
256
+ manifest .getClusterTerm (),
257
+ manifest .getStateVersion (),
258
+ manifest .getStateUUID ()
259
+ )
260
+ );
261
+ ClusterState clusterState = remoteClusterStateService .getClusterStateForManifest (
262
+ request .getClusterName (),
263
+ manifest ,
264
+ transportService .getLocalNode ().getId (),
265
+ true
266
+ );
259
267
fullClusterStateReceivedCount .incrementAndGet ();
260
268
final PublishWithJoinResponse response = acceptState (clusterState );
261
269
lastSeenClusterState .set (clusterState );
262
270
return response ;
263
271
} else {
264
- logger .debug (() -> new ParameterizedMessage ("Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}" , manifest .getClusterTerm (),
265
- manifest .getStateVersion (), manifest .getDiffManifest ().getFromStateUUID (), manifest .getStateUUID ()));
266
- ClusterState clusterState = remoteClusterStateService .getClusterStateUsingDiff (request .getClusterName (), manifest , lastSeen , transportService .getLocalNode ().getId ());
272
+ logger .debug (
273
+ () -> new ParameterizedMessage (
274
+ "Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}" ,
275
+ manifest .getClusterTerm (),
276
+ manifest .getStateVersion (),
277
+ manifest .getDiffManifest ().getFromStateUUID (),
278
+ manifest .getStateUUID ()
279
+ )
280
+ );
281
+ ClusterState clusterState = remoteClusterStateService .getClusterStateUsingDiff (
282
+ request .getClusterName (),
283
+ manifest ,
284
+ lastSeen ,
285
+ transportService .getLocalNode ().getId ()
286
+ );
267
287
compatibleClusterStateDiffReceivedCount .incrementAndGet ();
268
288
final PublishWithJoinResponse response = acceptState (clusterState );
269
289
lastSeenClusterState .compareAndSet (lastSeen , clusterState );
@@ -286,7 +306,8 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) {
286
306
287
307
private PublishWithJoinResponse acceptStateOnLocalNode (RemotePublishRequest remotePublishRequest ) {
288
308
final PublishRequest publishRequest = currentPublishRequestToSelf .get ();
289
- if (publishRequest == null || publishRequest .getAcceptedState ().coordinationMetadata ().term () != remotePublishRequest .term
309
+ if (publishRequest == null
310
+ || publishRequest .getAcceptedState ().coordinationMetadata ().term () != remotePublishRequest .term
290
311
|| publishRequest .getAcceptedState ().version () != remotePublishRequest .version ) {
291
312
throw new IllegalStateException ("publication to self failed for " + remotePublishRequest );
292
313
}
@@ -295,9 +316,16 @@ private PublishWithJoinResponse acceptStateOnLocalNode(RemotePublishRequest remo
295
316
return publishWithJoinResponse ;
296
317
}
297
318
298
- public PublicationContext newPublicationContext (ClusterChangedEvent clusterChangedEvent , boolean isRemotePublicationEnabled ,
299
- PersistedStateRegistry persistedStateRegistry ) {
300
- final PublicationContext publicationContext = new PublicationContext (clusterChangedEvent , isRemotePublicationEnabled , persistedStateRegistry );
319
+ public PublicationContext newPublicationContext (
320
+ ClusterChangedEvent clusterChangedEvent ,
321
+ boolean isRemotePublicationEnabled ,
322
+ PersistedStateRegistry persistedStateRegistry
323
+ ) {
324
+ final PublicationContext publicationContext = new PublicationContext (
325
+ clusterChangedEvent ,
326
+ isRemotePublicationEnabled ,
327
+ persistedStateRegistry
328
+ );
301
329
302
330
// Build the serializations we expect to need now, early in the process, so that an error during serialization fails the publication
303
331
// straight away. This isn't watertight since we send diffs on a best-effort basis and may fall back to sending a full state (and
@@ -345,7 +373,11 @@ public class PublicationContext {
345
373
private final boolean sendRemoteState ;
346
374
private final PersistedStateRegistry persistedStateRegistry ;
347
375
348
- PublicationContext (ClusterChangedEvent clusterChangedEvent , boolean isRemotePublicationEnabled , PersistedStateRegistry persistedStateRegistry ) {
376
+ PublicationContext (
377
+ ClusterChangedEvent clusterChangedEvent ,
378
+ boolean isRemotePublicationEnabled ,
379
+ PersistedStateRegistry persistedStateRegistry
380
+ ) {
349
381
discoveryNodes = clusterChangedEvent .state ().nodes ();
350
382
newState = clusterChangedEvent .state ();
351
383
previousState = clusterChangedEvent .previousState ();
@@ -462,11 +494,22 @@ public String executor() {
462
494
);
463
495
}
464
496
465
- private void sendRemoteClusterState (final DiscoveryNode destination , final ClusterState clusterState , final ActionListener <PublishWithJoinResponse > listener ) {
497
+ private void sendRemoteClusterState (
498
+ final DiscoveryNode destination ,
499
+ final ClusterState clusterState ,
500
+ final ActionListener <PublishWithJoinResponse > listener
501
+ ) {
466
502
try {
467
- final String manifestFileName = ((RemotePersistedState ) persistedStateRegistry .getPersistedState (PersistedStateType .REMOTE )).getLastUploadedManifestFile ();
468
- final RemotePublishRequest remotePublishRequest = new RemotePublishRequest (discoveryNodes .getLocalNode (), clusterState .term (),
469
- clusterState .getVersion (), clusterState .getClusterName ().value (), clusterState .metadata ().clusterUUID (), manifestFileName );
503
+ final String manifestFileName = ((RemotePersistedState ) persistedStateRegistry .getPersistedState (PersistedStateType .REMOTE ))
504
+ .getLastUploadedManifestFile ();
505
+ final RemotePublishRequest remotePublishRequest = new RemotePublishRequest (
506
+ discoveryNodes .getLocalNode (),
507
+ clusterState .term (),
508
+ clusterState .getVersion (),
509
+ clusterState .getClusterName ().value (),
510
+ clusterState .metadata ().clusterUUID (),
511
+ manifestFileName
512
+ );
470
513
final Consumer <TransportException > transportExceptionHandler = exp -> {
471
514
logger .debug (() -> new ParameterizedMessage ("failed to send remote cluster state to {}" , destination ), exp );
472
515
listener .onFailure (exp );
@@ -493,7 +536,13 @@ public String executor() {
493
536
return ThreadPool .Names .GENERIC ;
494
537
}
495
538
};
496
- transportService .sendRequest (destination , PUBLISH_REMOTE_STATE_ACTION_NAME , remotePublishRequest , stateRequestOptions , responseHandler );
539
+ transportService .sendRequest (
540
+ destination ,
541
+ PUBLISH_REMOTE_STATE_ACTION_NAME ,
542
+ remotePublishRequest ,
543
+ stateRequestOptions ,
544
+ responseHandler
545
+ );
497
546
} catch (Exception e ) {
498
547
logger .warn (() -> new ParameterizedMessage ("error sending remote cluster state to {}" , destination ), e );
499
548
listener .onFailure (e );
0 commit comments