Skip to content

Commit 75b1dbc

Browse files
soosinhakkewwei
authored andcommitted
Add remote state publication transport call (opensearch-project#13835) (opensearch-project#14137)
* Add remote state publication transport call Signed-off-by: Sooraj Sinha <soosinha@amazon.com> * Add publication flag and remote routing table check Signed-off-by: Himshikha Gupta <himshikh@amazon.com> Signed-off-by: kkewwei <kkewwei@163.com>
1 parent 097cbdb commit 75b1dbc

20 files changed

+707
-74
lines changed

server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java

+9
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.opensearch.cluster.metadata.Metadata;
4040
import org.opensearch.cluster.node.DiscoveryNode;
4141
import org.opensearch.common.settings.Settings;
42+
import org.opensearch.common.util.FeatureFlags;
4243
import org.opensearch.common.util.io.IOUtils;
4344

4445
import java.io.Closeable;
@@ -52,6 +53,7 @@
5253
import java.util.Set;
5354

5455
import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM;
56+
import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL;
5557
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;
5658

5759
/**
@@ -79,6 +81,7 @@ public class CoordinationState {
7981
private VotingConfiguration lastPublishedConfiguration;
8082
private VoteCollection publishVotes;
8183
private final boolean isRemoteStateEnabled;
84+
private final boolean isRemotePublicationEnabled;
8285

8386
public CoordinationState(
8487
DiscoveryNode localNode,
@@ -102,6 +105,12 @@ public CoordinationState(
102105
.getLastAcceptedConfiguration();
103106
this.publishVotes = new VoteCollection();
104107
this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings);
108+
this.isRemotePublicationEnabled = FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL)
109+
&& localNode.isRemoteStatePublicationEnabled();
110+
}
111+
112+
public boolean isRemotePublicationEnabled() {
113+
return isRemotePublicationEnabled;
105114
}
106115

107116
public long getCurrentTerm() {

server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
import org.opensearch.discovery.PeerFinder;
8787
import org.opensearch.discovery.SeedHostsProvider;
8888
import org.opensearch.discovery.SeedHostsResolver;
89+
import org.opensearch.gateway.remote.RemoteClusterStateService;
8990
import org.opensearch.monitor.NodeHealthService;
9091
import org.opensearch.monitor.StatusInfo;
9192
import org.opensearch.node.remotestore.RemoteStoreNodeService;
@@ -210,7 +211,8 @@ public Coordinator(
210211
NodeHealthService nodeHealthService,
211212
PersistedStateRegistry persistedStateRegistry,
212213
RemoteStoreNodeService remoteStoreNodeService,
213-
ClusterManagerMetrics clusterManagerMetrics
214+
ClusterManagerMetrics clusterManagerMetrics,
215+
RemoteClusterStateService remoteClusterStateService
214216
) {
215217
this.settings = settings;
216218
this.transportService = transportService;
@@ -262,7 +264,8 @@ public Coordinator(
262264
transportService,
263265
namedWriteableRegistry,
264266
this::handlePublishRequest,
265-
this::handleApplyCommit
267+
this::handleApplyCommit,
268+
remoteClusterStateService
266269
);
267270
this.leaderChecker = new LeaderChecker(
268271
settings,
@@ -1331,7 +1334,9 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
13311334
+ clusterState;
13321335

13331336
final PublicationTransportHandler.PublicationContext publicationContext = publicationHandler.newPublicationContext(
1334-
clusterChangedEvent
1337+
clusterChangedEvent,
1338+
coordinationState.get().isRemotePublicationEnabled(),
1339+
persistedStateRegistry
13351340
);
13361341

13371342
final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState);

server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java

+194-5
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,17 @@
4040
import org.opensearch.cluster.ClusterState;
4141
import org.opensearch.cluster.Diff;
4242
import org.opensearch.cluster.IncompatibleClusterStateVersionException;
43+
import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType;
4344
import org.opensearch.cluster.node.DiscoveryNode;
4445
import org.opensearch.cluster.node.DiscoveryNodes;
4546
import org.opensearch.core.action.ActionListener;
4647
import org.opensearch.core.common.bytes.BytesReference;
4748
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
4849
import org.opensearch.core.common.io.stream.StreamInput;
4950
import org.opensearch.core.transport.TransportResponse;
51+
import org.opensearch.gateway.GatewayMetaState.RemotePersistedState;
52+
import org.opensearch.gateway.remote.ClusterMetadataManifest;
53+
import org.opensearch.gateway.remote.RemoteClusterStateService;
5054
import org.opensearch.threadpool.ThreadPool;
5155
import org.opensearch.transport.BytesTransportRequest;
5256
import org.opensearch.transport.TransportChannel;
@@ -74,6 +78,7 @@ public class PublicationTransportHandler {
7478
private static final Logger logger = LogManager.getLogger(PublicationTransportHandler.class);
7579

7680
public static final String PUBLISH_STATE_ACTION_NAME = "internal:cluster/coordination/publish_state";
81+
public static final String PUBLISH_REMOTE_STATE_ACTION_NAME = "internal:cluster/coordination/publish_remote_state";
7782
public static final String COMMIT_STATE_ACTION_NAME = "internal:cluster/coordination/commit_state";
7883

7984
private final TransportService transportService;
@@ -97,16 +102,19 @@ public class PublicationTransportHandler {
97102
private final TransportRequestOptions stateRequestOptions = TransportRequestOptions.builder()
98103
.withType(TransportRequestOptions.Type.STATE)
99104
.build();
105+
private final RemoteClusterStateService remoteClusterStateService;
100106

101107
public PublicationTransportHandler(
102108
TransportService transportService,
103109
NamedWriteableRegistry namedWriteableRegistry,
104110
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest,
105-
BiConsumer<ApplyCommitRequest, ActionListener<Void>> handleApplyCommit
111+
BiConsumer<ApplyCommitRequest, ActionListener<Void>> handleApplyCommit,
112+
RemoteClusterStateService remoteClusterStateService
106113
) {
107114
this.transportService = transportService;
108115
this.namedWriteableRegistry = namedWriteableRegistry;
109116
this.handlePublishRequest = handlePublishRequest;
117+
this.remoteClusterStateService = remoteClusterStateService;
110118

111119
transportService.registerRequestHandler(
112120
PUBLISH_STATE_ACTION_NAME,
@@ -117,6 +125,15 @@ public PublicationTransportHandler(
117125
(request, channel, task) -> channel.sendResponse(handleIncomingPublishRequest(request))
118126
);
119127

128+
transportService.registerRequestHandler(
129+
PUBLISH_REMOTE_STATE_ACTION_NAME,
130+
ThreadPool.Names.GENERIC,
131+
false,
132+
false,
133+
RemotePublishRequest::new,
134+
(request, channel, task) -> channel.sendResponse(handleIncomingRemotePublishRequest(request))
135+
);
136+
120137
transportService.registerRequestHandler(
121138
COMMIT_STATE_ACTION_NAME,
122139
ThreadPool.Names.GENERIC,
@@ -211,6 +228,74 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
211228
}
212229
}
213230

231+
// package private for testing
232+
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException {
233+
if (transportService.getLocalNode().equals(request.getSourceNode())) {
234+
return acceptRemoteStateOnLocalNode(request);
235+
}
236+
// TODO Make cluster state download non-blocking: https://github.com/opensearch-project/OpenSearch/issues/14102
237+
ClusterMetadataManifest manifest = remoteClusterStateService.getClusterMetadataManifestByFileName(
238+
request.getClusterUUID(),
239+
request.getManifestFile()
240+
);
241+
if (manifest == null) {
242+
throw new IllegalStateException("Publication failed as manifest was not found for " + request);
243+
}
244+
boolean applyFullState = false;
245+
final ClusterState lastSeen = lastSeenClusterState.get();
246+
if (lastSeen == null) {
247+
logger.debug(() -> "Diff cannot be applied as there is no last cluster state");
248+
applyFullState = true;
249+
} else if (manifest.getDiffManifest() == null) {
250+
logger.trace(() -> "There is no diff in the manifest");
251+
applyFullState = true;
252+
} else if (manifest.getDiffManifest().getFromStateUUID().equals(lastSeen.stateUUID()) == false) {
253+
logger.debug(() -> "Last cluster state not compatible with the diff");
254+
applyFullState = true;
255+
}
256+
257+
if (applyFullState == true) {
258+
logger.debug(
259+
() -> new ParameterizedMessage(
260+
"Downloading full cluster state for term {}, version {}, stateUUID {}",
261+
manifest.getClusterTerm(),
262+
manifest.getStateVersion(),
263+
manifest.getStateUUID()
264+
)
265+
);
266+
ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(
267+
request.getClusterName(),
268+
manifest,
269+
transportService.getLocalNode().getId(),
270+
true
271+
);
272+
fullClusterStateReceivedCount.incrementAndGet();
273+
final PublishWithJoinResponse response = acceptState(clusterState);
274+
lastSeenClusterState.set(clusterState);
275+
return response;
276+
} else {
277+
logger.debug(
278+
() -> new ParameterizedMessage(
279+
"Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}",
280+
manifest.getClusterTerm(),
281+
manifest.getStateVersion(),
282+
manifest.getDiffManifest().getFromStateUUID(),
283+
manifest.getStateUUID()
284+
)
285+
);
286+
ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(
287+
request.getClusterName(),
288+
manifest,
289+
lastSeen,
290+
transportService.getLocalNode().getId()
291+
);
292+
compatibleClusterStateDiffReceivedCount.incrementAndGet();
293+
final PublishWithJoinResponse response = acceptState(clusterState);
294+
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
295+
return response;
296+
}
297+
}
298+
214299
private PublishWithJoinResponse acceptState(ClusterState incomingState) {
215300
// if the state is coming from the current node, use original request instead (see currentPublishRequestToSelf for explanation)
216301
if (transportService.getLocalNode().equals(incomingState.nodes().getClusterManagerNode())) {
@@ -224,8 +309,35 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) {
224309
return handlePublishRequest.apply(new PublishRequest(incomingState));
225310
}
226311

227-
public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent) {
228-
final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent);
312+
private PublishWithJoinResponse acceptRemoteStateOnLocalNode(RemotePublishRequest remotePublishRequest) {
313+
final PublishRequest publishRequest = currentPublishRequestToSelf.get();
314+
if (publishRequest == null
315+
|| publishRequest.getAcceptedState().coordinationMetadata().term() != remotePublishRequest.term
316+
|| publishRequest.getAcceptedState().version() != remotePublishRequest.version) {
317+
logger.debug(
318+
() -> new ParameterizedMessage(
319+
"Publication failure for current publish request : {} and remote publish request: {}",
320+
publishRequest,
321+
remotePublishRequest
322+
)
323+
);
324+
throw new IllegalStateException("publication to self failed for " + remotePublishRequest);
325+
}
326+
PublishWithJoinResponse publishWithJoinResponse = handlePublishRequest.apply(publishRequest);
327+
lastSeenClusterState.set(publishRequest.getAcceptedState());
328+
return publishWithJoinResponse;
329+
}
330+
331+
public PublicationContext newPublicationContext(
332+
ClusterChangedEvent clusterChangedEvent,
333+
boolean isRemotePublicationEnabled,
334+
PersistedStateRegistry persistedStateRegistry
335+
) {
336+
final PublicationContext publicationContext = new PublicationContext(
337+
clusterChangedEvent,
338+
isRemotePublicationEnabled,
339+
persistedStateRegistry
340+
);
229341

230342
// Build the serializations we expect to need now, early in the process, so that an error during serialization fails the publication
231343
// straight away. This isn't watertight since we send diffs on a best-effort basis and may fall back to sending a full state (and
@@ -234,6 +346,16 @@ public PublicationContext newPublicationContext(ClusterChangedEvent clusterChang
234346
return publicationContext;
235347
}
236348

349+
// package private for testing
350+
void setCurrentPublishRequestToSelf(PublishRequest publishRequest) {
351+
this.currentPublishRequestToSelf.set(publishRequest);
352+
}
353+
354+
// package private for testing
355+
void setLastSeenClusterState(ClusterState clusterState) {
356+
this.lastSeenClusterState.set(clusterState);
357+
}
358+
237359
private static BytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException {
238360
final BytesReference serializedState = CompressedStreamUtils.createCompressedStream(nodeVersion, stream -> {
239361
stream.writeBoolean(true);
@@ -270,12 +392,20 @@ public class PublicationContext {
270392
private final boolean sendFullVersion;
271393
private final Map<Version, BytesReference> serializedStates = new HashMap<>();
272394
private final Map<Version, BytesReference> serializedDiffs = new HashMap<>();
395+
private final boolean sendRemoteState;
396+
private final PersistedStateRegistry persistedStateRegistry;
273397

274-
PublicationContext(ClusterChangedEvent clusterChangedEvent) {
398+
PublicationContext(
399+
ClusterChangedEvent clusterChangedEvent,
400+
boolean isRemotePublicationEnabled,
401+
PersistedStateRegistry persistedStateRegistry
402+
) {
275403
discoveryNodes = clusterChangedEvent.state().nodes();
276404
newState = clusterChangedEvent.state();
277405
previousState = clusterChangedEvent.previousState();
278406
sendFullVersion = previousState.getBlocks().disableStatePersistence();
407+
sendRemoteState = isRemotePublicationEnabled;
408+
this.persistedStateRegistry = persistedStateRegistry;
279409
}
280410

281411
void buildDiffAndSerializeStates() {
@@ -339,7 +469,11 @@ public void onFailure(Exception e) {
339469
} else {
340470
responseActionListener = listener;
341471
}
342-
if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) {
472+
// TODO Decide to send remote state before starting publication by checking remote publication on all nodes
473+
if (sendRemoteState && destination.isRemoteStatePublicationEnabled()) {
474+
logger.trace("sending remote cluster state version [{}] to [{}]", newState.version(), destination);
475+
sendRemoteClusterState(destination, publishRequest.getAcceptedState(), responseActionListener);
476+
} else if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) {
343477
logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination);
344478
sendFullClusterState(destination, responseActionListener);
345479
} else {
@@ -384,6 +518,61 @@ public String executor() {
384518
);
385519
}
386520

521+
private void sendRemoteClusterState(
522+
final DiscoveryNode destination,
523+
final ClusterState clusterState,
524+
final ActionListener<PublishWithJoinResponse> listener
525+
) {
526+
try {
527+
final String manifestFileName = ((RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE))
528+
.getLastUploadedManifestFile();
529+
final RemotePublishRequest remotePublishRequest = new RemotePublishRequest(
530+
discoveryNodes.getLocalNode(),
531+
clusterState.term(),
532+
clusterState.getVersion(),
533+
clusterState.getClusterName().value(),
534+
clusterState.metadata().clusterUUID(),
535+
manifestFileName
536+
);
537+
final Consumer<TransportException> transportExceptionHandler = exp -> {
538+
logger.debug(() -> new ParameterizedMessage("failed to send remote cluster state to {}", destination), exp);
539+
listener.onFailure(exp);
540+
};
541+
final TransportResponseHandler<PublishWithJoinResponse> responseHandler = new TransportResponseHandler<>() {
542+
543+
@Override
544+
public PublishWithJoinResponse read(StreamInput in) throws IOException {
545+
return new PublishWithJoinResponse(in);
546+
}
547+
548+
@Override
549+
public void handleResponse(PublishWithJoinResponse response) {
550+
listener.onResponse(response);
551+
}
552+
553+
@Override
554+
public void handleException(TransportException exp) {
555+
transportExceptionHandler.accept(exp);
556+
}
557+
558+
@Override
559+
public String executor() {
560+
return ThreadPool.Names.GENERIC;
561+
}
562+
};
563+
transportService.sendRequest(
564+
destination,
565+
PUBLISH_REMOTE_STATE_ACTION_NAME,
566+
remotePublishRequest,
567+
stateRequestOptions,
568+
responseHandler
569+
);
570+
} catch (Exception e) {
571+
logger.warn(() -> new ParameterizedMessage("error sending remote cluster state to {}", destination), e);
572+
listener.onFailure(e);
573+
}
574+
}
575+
387576
private void sendFullClusterState(DiscoveryNode destination, ActionListener<PublishWithJoinResponse> listener) {
388577
BytesReference bytes = serializedStates.get(destination.getVersion());
389578
if (bytes == null) {

0 commit comments

Comments
 (0)