From 5f4b6db5052da13f052a427ca9287ed3e93ae43b Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Mon, 27 May 2024 07:54:47 +0530 Subject: [PATCH 1/7] Add remote state publication transport call Signed-off-by: Sooraj Sinha --- .../coordination/CoordinationState.java | 5 + .../cluster/coordination/Coordinator.java | 9 +- .../PublicationTransportHandler.java | 106 +++++++++++++++++- .../coordination/RemotePublishRequest.java | 48 ++++++++ 4 files changed, 160 insertions(+), 8 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index 987a3e3ffa7d3..d8e48279a0db6 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -104,6 +104,10 @@ public CoordinationState( this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings); } + public boolean isRemoteStateEnabled() { + return isRemoteStateEnabled; + } + public long getCurrentTerm() { return persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).getCurrentTerm(); } @@ -449,6 +453,7 @@ public PublishResponse handlePublishRequest(PublishRequest publishRequest) { clusterState.version(), clusterState.term() ); + logger.info("Setting last accepted state : term - {}, version - {}", clusterState.term(), clusterState.version()); persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).setLastAcceptedState(clusterState); assert getLastAcceptedState() == clusterState; diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index f53e6837a67f5..faa44be8b0456 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -85,6 +85,7 @@ import org.opensearch.discovery.PeerFinder; import org.opensearch.discovery.SeedHostsProvider; import org.opensearch.discovery.SeedHostsResolver; +import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.monitor.NodeHealthService; import org.opensearch.monitor.StatusInfo; import org.opensearch.node.remotestore.RemoteStoreNodeService; @@ -209,7 +210,8 @@ public Coordinator( NodeHealthService nodeHealthService, PersistedStateRegistry persistedStateRegistry, RemoteStoreNodeService remoteStoreNodeService, - ClusterManagerMetrics clusterManagerMetrics + ClusterManagerMetrics clusterManagerMetrics, + RemoteClusterStateService remoteClusterStateService ) { this.settings = settings; this.transportService = transportService; @@ -261,7 +263,8 @@ public Coordinator( transportService, namedWriteableRegistry, this::handlePublishRequest, - this::handleApplyCommit + this::handleApplyCommit, + remoteClusterStateService ); this.leaderChecker = new LeaderChecker( settings, @@ -1330,7 +1333,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId()) + clusterState; final PublicationTransportHandler.PublicationContext publicationContext = publicationHandler.newPublicationContext( - clusterChangedEvent + clusterChangedEvent, coordinationState.get().isRemoteStateEnabled() ); final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index 1fdaeead0d28d..aa2faab5ffd01 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -31,6 +31,8 @@ package org.opensearch.cluster.coordination; +import java.util.Locale; +import java.util.Optional; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -47,6 +49,8 @@ import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.BytesTransportRequest; import org.opensearch.transport.TransportChannel; @@ -74,6 +78,7 @@ public class PublicationTransportHandler { private static final Logger logger = LogManager.getLogger(PublicationTransportHandler.class); public static final String PUBLISH_STATE_ACTION_NAME = "internal:cluster/coordination/publish_state"; + public static final String PUBLISH_REMOTE_STATE_ACTION_NAME = "internal:cluster/coordination/publish_remote_state"; public static final String COMMIT_STATE_ACTION_NAME = "internal:cluster/coordination/commit_state"; private final TransportService transportService; @@ -97,16 +102,19 @@ public class PublicationTransportHandler { private final TransportRequestOptions stateRequestOptions = TransportRequestOptions.builder() .withType(TransportRequestOptions.Type.STATE) .build(); + private final RemoteClusterStateService remoteClusterStateService; public PublicationTransportHandler( TransportService transportService, NamedWriteableRegistry namedWriteableRegistry, Function handlePublishRequest, - BiConsumer> handleApplyCommit + BiConsumer> handleApplyCommit, + RemoteClusterStateService remoteClusterStateService ) { this.transportService = transportService; this.namedWriteableRegistry = namedWriteableRegistry; this.handlePublishRequest = handlePublishRequest; + this.remoteClusterStateService = remoteClusterStateService; transportService.registerRequestHandler( PUBLISH_STATE_ACTION_NAME, @@ -117,6 +125,15 @@ public PublicationTransportHandler( (request, channel, task) -> channel.sendResponse(handleIncomingPublishRequest(request)) ); + transportService.registerRequestHandler( + PUBLISH_REMOTE_STATE_ACTION_NAME, + ThreadPool.Names.GENERIC, + false, + false, + RemotePublishRequest::new, + (request, channel, task) -> channel.sendResponse(handleIncomingRemotePublishRequest(request)) + ); + transportService.registerRequestHandler( COMMIT_STATE_ACTION_NAME, ThreadPool.Names.GENERIC, @@ -211,6 +228,44 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque } } + private PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException { + final Optional manifestOptional = remoteClusterStateService.getClusterMetadataManifestByTermVersion(request.getClusterName(), request.getClusterUUID(), request.term, request.version); + if (manifestOptional.isPresent() == false) { + throw new IllegalStateException( + String.format(Locale.ROOT, "Manifest is not present for term - %s version - %s", request.term, request.version) + ); + } + ClusterMetadataManifest manifest = manifestOptional.get(); + boolean applyFullState = false; + final ClusterState lastSeen = lastSeenClusterState.get(); + if (lastSeen == null) { + logger.debug("Diff cannot be applied as there is no last cluster state"); + applyFullState = true; + } else if (manifest.getDiffManifest() == null) { + logger.debug("There is no diff in the manifest"); + applyFullState = true; + } else if (manifest.getDiffManifest().getFromStateUUID().equals(lastSeen.stateUUID()) == false) { + logger.debug("Last cluster state not compatible with the diff"); + applyFullState = true; + } + + if (applyFullState == true) { + ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(request.getClusterName(), manifest, transportService.getLocalNode().getId()); + logger.debug("Downloaded full cluster state [{}]", clusterState); + fullClusterStateReceivedCount.incrementAndGet(); + final PublishWithJoinResponse response = acceptState(clusterState); + lastSeenClusterState.set(clusterState); + return response; + } else { + ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(request.getClusterName(), manifest, lastSeenClusterState.get(), transportService.getLocalNode().getId()); + logger.debug("Downloaded full cluster state from diff [{}]", clusterState); + compatibleClusterStateDiffReceivedCount.incrementAndGet(); + final PublishWithJoinResponse response = acceptState(clusterState); + lastSeenClusterState.compareAndSet(lastSeen, clusterState); + return response; + } + } + private PublishWithJoinResponse acceptState(ClusterState incomingState) { // if the state is coming from the current node, use original request instead (see currentPublishRequestToSelf for explanation) if (transportService.getLocalNode().equals(incomingState.nodes().getClusterManagerNode())) { @@ -224,8 +279,8 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) { return handlePublishRequest.apply(new PublishRequest(incomingState)); } - public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent) { - final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent); + public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemoteStateEnabled) { + final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent, isRemoteStateEnabled); // Build the serializations we expect to need now, early in the process, so that an error during serialization fails the publication // straight away. This isn't watertight since we send diffs on a best-effort basis and may fall back to sending a full state (and @@ -270,12 +325,14 @@ public class PublicationContext { private final boolean sendFullVersion; private final Map serializedStates = new HashMap<>(); private final Map serializedDiffs = new HashMap<>(); + private final boolean sendRemoteState; - PublicationContext(ClusterChangedEvent clusterChangedEvent) { + PublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemoteStateEnabled) { discoveryNodes = clusterChangedEvent.state().nodes(); newState = clusterChangedEvent.state(); previousState = clusterChangedEvent.previousState(); sendFullVersion = previousState.getBlocks().disableStatePersistence(); + sendRemoteState = isRemoteStateEnabled; } void buildDiffAndSerializeStates() { @@ -339,7 +396,9 @@ public void onFailure(Exception e) { } else { responseActionListener = listener; } - if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) { + if (sendRemoteState && destination.isRemoteStateNode()) { + sendRemoteClusterState(destination, publishRequest.getAcceptedState(), responseActionListener); + } else if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) { logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination); sendFullClusterState(destination, responseActionListener); } else { @@ -384,6 +443,43 @@ public String executor() { ); } + private void sendRemoteClusterState(DiscoveryNode destination, ClusterState clusterState, ActionListener listener) { + try { + final RemotePublishRequest remotePublishRequest = new RemotePublishRequest(discoveryNodes.getLocalNode(), clusterState.term(), clusterState.getVersion(), clusterState.getClusterName().value(), clusterState.metadata().clusterUUID()); + final Consumer transportExceptionHandler = exp -> { + logger.debug(() -> new ParameterizedMessage("failed to send remote cluster state to {}", destination), exp); + listener.onFailure(exp); + }; + final TransportResponseHandler responseHandler = new TransportResponseHandler< + PublishWithJoinResponse>() { + + @Override + public PublishWithJoinResponse read(StreamInput in) throws IOException { + return new PublishWithJoinResponse(in); + } + + @Override + public void handleResponse(PublishWithJoinResponse response) { + listener.onResponse(response); + } + + @Override + public void handleException(TransportException exp) { + transportExceptionHandler.accept(exp); + } + + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } + }; + transportService.sendRequest(destination, PUBLISH_REMOTE_STATE_ACTION_NAME, remotePublishRequest, stateRequestOptions, responseHandler); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("error sending remote cluster state to {}", destination), e); + listener.onFailure(e); + } + } + private void sendFullClusterState(DiscoveryNode destination, ActionListener listener) { BytesReference bytes = serializedStates.get(destination.getVersion()); if (bytes == null) { diff --git a/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java b/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java new file mode 100644 index 0000000000000..b1981c0b4b1a0 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.coordination; + +import java.io.IOException; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +public class RemotePublishRequest extends TermVersionRequest { + + // todo Do we need cluster name and UUID ? + private final String clusterName; + private final String clusterUUID; + + public RemotePublishRequest(DiscoveryNode sourceNode, long term, long version, String clusterName, String clusterUUID) { + super(sourceNode, term, version); + this.clusterName = clusterName; + this.clusterUUID = clusterUUID; + } + + public RemotePublishRequest(StreamInput in) throws IOException { + super(in); + this.clusterName = in.readString(); + this.clusterUUID = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(clusterName); + out.writeString(clusterUUID); + } + + public String getClusterName() { + return clusterName; + } + + public String getClusterUUID() { + return clusterUUID; + } +} From d520c9127b97edb642e619b83bdb22b7134efeac Mon Sep 17 00:00:00 2001 From: Himshikha Gupta Date: Thu, 6 Jun 2024 14:53:19 +0530 Subject: [PATCH 2/7] Add publication flag and remote routing table check Signed-off-by: Himshikha Gupta --- .../coordination/CoordinationState.java | 13 ++--- .../cluster/coordination/Coordinator.java | 3 +- .../PublicationTransportHandler.java | 54 ++++++++++++++----- .../coordination/RemotePublishRequest.java | 3 +- .../cluster/node/DiscoveryNode.java | 21 ++++++++ 5 files changed, 74 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index d8e48279a0db6..c0ed49c0bc0c2 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -52,6 +52,7 @@ import java.util.Set; import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** @@ -78,7 +79,7 @@ public class CoordinationState { private long lastPublishedVersion; private VotingConfiguration lastPublishedConfiguration; private VoteCollection publishVotes; - private final boolean isRemoteStateEnabled; + private final boolean isRemotePublicationEnabled; public CoordinationState( DiscoveryNode localNode, @@ -101,11 +102,11 @@ public CoordinationState( .getLastAcceptedState() .getLastAcceptedConfiguration(); this.publishVotes = new VoteCollection(); - this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings); + this.isRemotePublicationEnabled = isRemoteStoreClusterStateEnabled(settings) && isRemoteRoutingTableEnabled(settings); } - public boolean isRemoteStateEnabled() { - return isRemoteStateEnabled; + public boolean isRemotePublicationEnabled() { + return isRemotePublicationEnabled; } public long getCurrentTerm() { @@ -579,7 +580,7 @@ public void handlePrePublish(ClusterState clusterState) { // This is to ensure the remote store is the single source of truth for current state. Even if the current node // goes down after sending the cluster state to other nodes, we should be able to read the remote state and // recover the cluster. - if (isRemoteStateEnabled) { + if (isRemotePublicationEnabled) { assert persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null : "Remote state has not been initialized"; persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(clusterState); } @@ -590,7 +591,7 @@ public void handlePrePublish(ClusterState clusterState) { */ public void handlePreCommit() { // Publishing the committed state to remote store before sending apply commit to other nodes. - if (isRemoteStateEnabled) { + if (isRemotePublicationEnabled) { assert persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null : "Remote state has not been initialized"; persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).markLastAcceptedStateAsCommitted(); } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index faa44be8b0456..8b0da49e93fb0 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -1333,7 +1333,8 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId()) + clusterState; final PublicationTransportHandler.PublicationContext publicationContext = publicationHandler.newPublicationContext( - clusterChangedEvent, coordinationState.get().isRemoteStateEnabled() + clusterChangedEvent, + coordinationState.get().isRemotePublicationEnabled() ); final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index aa2faab5ffd01..79a7f32ce2ca0 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -31,8 +31,6 @@ package org.opensearch.cluster.coordination; -import java.util.Locale; -import java.util.Optional; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -61,7 +59,9 @@ import java.io.IOException; import java.util.HashMap; +import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -229,7 +229,12 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque } private PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException { - final Optional manifestOptional = remoteClusterStateService.getClusterMetadataManifestByTermVersion(request.getClusterName(), request.getClusterUUID(), request.term, request.version); + final Optional manifestOptional = remoteClusterStateService.getClusterMetadataManifestByTermVersion( + request.getClusterName(), + request.getClusterUUID(), + request.term, + request.version + ); if (manifestOptional.isPresent() == false) { throw new IllegalStateException( String.format(Locale.ROOT, "Manifest is not present for term - %s version - %s", request.term, request.version) @@ -250,14 +255,23 @@ private PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublish } if (applyFullState == true) { - ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(request.getClusterName(), manifest, transportService.getLocalNode().getId()); + ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest( + request.getClusterName(), + manifest, + transportService.getLocalNode().getId() + ); logger.debug("Downloaded full cluster state [{}]", clusterState); fullClusterStateReceivedCount.incrementAndGet(); final PublishWithJoinResponse response = acceptState(clusterState); lastSeenClusterState.set(clusterState); return response; } else { - ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(request.getClusterName(), manifest, lastSeenClusterState.get(), transportService.getLocalNode().getId()); + ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff( + request.getClusterName(), + manifest, + lastSeenClusterState.get(), + transportService.getLocalNode().getId() + ); logger.debug("Downloaded full cluster state from diff [{}]", clusterState); compatibleClusterStateDiffReceivedCount.incrementAndGet(); final PublishWithJoinResponse response = acceptState(clusterState); @@ -279,8 +293,8 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) { return handlePublishRequest.apply(new PublishRequest(incomingState)); } - public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemoteStateEnabled) { - final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent, isRemoteStateEnabled); + public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemotePublicationEnabled) { + final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent, isRemotePublicationEnabled); // Build the serializations we expect to need now, early in the process, so that an error during serialization fails the publication // straight away. This isn't watertight since we send diffs on a best-effort basis and may fall back to sending a full state (and @@ -327,12 +341,12 @@ public class PublicationContext { private final Map serializedDiffs = new HashMap<>(); private final boolean sendRemoteState; - PublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemoteStateEnabled) { + PublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemotePublicationEnabled) { discoveryNodes = clusterChangedEvent.state().nodes(); newState = clusterChangedEvent.state(); previousState = clusterChangedEvent.previousState(); sendFullVersion = previousState.getBlocks().disableStatePersistence(); - sendRemoteState = isRemoteStateEnabled; + sendRemoteState = isRemotePublicationEnabled; } void buildDiffAndSerializeStates() { @@ -443,9 +457,19 @@ public String executor() { ); } - private void sendRemoteClusterState(DiscoveryNode destination, ClusterState clusterState, ActionListener listener) { + private void sendRemoteClusterState( + DiscoveryNode destination, + ClusterState clusterState, + ActionListener listener + ) { try { - final RemotePublishRequest remotePublishRequest = new RemotePublishRequest(discoveryNodes.getLocalNode(), clusterState.term(), clusterState.getVersion(), clusterState.getClusterName().value(), clusterState.metadata().clusterUUID()); + final RemotePublishRequest remotePublishRequest = new RemotePublishRequest( + discoveryNodes.getLocalNode(), + clusterState.term(), + clusterState.getVersion(), + clusterState.getClusterName().value(), + clusterState.metadata().clusterUUID() + ); final Consumer transportExceptionHandler = exp -> { logger.debug(() -> new ParameterizedMessage("failed to send remote cluster state to {}", destination), exp); listener.onFailure(exp); @@ -473,7 +497,13 @@ public String executor() { return ThreadPool.Names.GENERIC; } }; - transportService.sendRequest(destination, PUBLISH_REMOTE_STATE_ACTION_NAME, remotePublishRequest, stateRequestOptions, responseHandler); + transportService.sendRequest( + destination, + PUBLISH_REMOTE_STATE_ACTION_NAME, + remotePublishRequest, + stateRequestOptions, + responseHandler + ); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("error sending remote cluster state to {}", destination), e); listener.onFailure(e); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java b/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java index b1981c0b4b1a0..dc770a30a1990 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java @@ -8,11 +8,12 @@ package org.opensearch.cluster.coordination; -import java.io.IOException; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; +import java.io.IOException; + public class RemotePublishRequest extends TermVersionRequest { // todo Do we need cluster name and UUID ? diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java index 5226e9570ac14..5ed5294687c40 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java @@ -61,7 +61,9 @@ import java.util.stream.Stream; import static org.opensearch.node.NodeRoleSettings.NODE_ROLES_SETTING; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; /** * A discovery node represents a node that is part of the cluster. @@ -470,6 +472,25 @@ public boolean isRemoteStoreNode() { return this.getAttributes().keySet().stream().anyMatch(key -> key.startsWith(REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX)); } + /** + * Returns whether the node is a remote cluster state enabled node. + * @return true if the node contains remote cluster state and remote routing table node attributes, false otherwise + */ + public boolean isRemoteStateNode() { + return isRemoteClusterStateEnabled() && isRemoteRoutingTableEnabled(); + } + + private boolean isRemoteClusterStateEnabled() { + return this.getAttributes() + .keySet() + .stream() + .anyMatch(key -> (key.equals(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY))); + } + + private boolean isRemoteRoutingTableEnabled() { + return this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY)); + } + /** * Returns a set of all the roles that the node has. The roles are returned in sorted order by the role name. *

From f4c2a04d152c3eb670f81060ee3baaa154d50164 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Fri, 7 Jun 2024 22:34:51 +0530 Subject: [PATCH 3/7] Send manifest file name in remote publish Signed-off-by: Sooraj Sinha --- .../coordination/CoordinationState.java | 5 +- .../cluster/coordination/Coordinator.java | 4 +- .../PublicationTransportHandler.java | 94 +++++++++---------- .../coordination/RemotePublishRequest.java | 17 +++- .../cluster/node/DiscoveryNode.java | 16 ++-- .../common/settings/ClusterSettings.java | 4 +- .../opensearch/discovery/DiscoveryModule.java | 7 +- .../opensearch/gateway/GatewayMetaState.java | 23 +++-- .../remote/RemoteClusterStateService.java | 29 +++--- .../remote/model/RemoteUploadDetails.java | 30 ++++++ .../main/java/org/opensearch/node/Node.java | 3 +- .../remotestore/RemoteStoreNodeAttribute.java | 5 + .../coordination/CoordinationStateTests.java | 4 +- .../cluster/coordination/NodeJoinTests.java | 3 +- .../PublicationTransportHandlerTests.java | 5 +- .../discovery/DiscoveryModuleTests.java | 3 +- .../GatewayMetaStatePersistedStateTests.java | 11 ++- .../AbstractCoordinatorTestCase.java | 3 +- 18 files changed, 166 insertions(+), 100 deletions(-) create mode 100644 server/src/main/java/org/opensearch/gateway/remote/model/RemoteUploadDetails.java diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index c0ed49c0bc0c2..f20ea13e7668a 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -50,6 +50,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; @@ -79,6 +80,7 @@ public class CoordinationState { private long lastPublishedVersion; private VotingConfiguration lastPublishedConfiguration; private VoteCollection publishVotes; + private final boolean isRemoteStateEnabled; private final boolean isRemotePublicationEnabled; public CoordinationState( @@ -102,7 +104,8 @@ public CoordinationState( .getLastAcceptedState() .getLastAcceptedConfiguration(); this.publishVotes = new VoteCollection(); - this.isRemotePublicationEnabled = isRemoteStoreClusterStateEnabled(settings) && isRemoteRoutingTableEnabled(settings); + this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings); + this.isRemotePublicationEnabled = RemoteStoreNodeAttribute.isRemotePublicationEnabled(settings); } public boolean isRemotePublicationEnabled() { diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 8b0da49e93fb0..914db31886850 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -49,6 +49,7 @@ import org.opensearch.cluster.coordination.CoordinationState.VoteCollection; import org.opensearch.cluster.coordination.FollowersChecker.FollowerCheckRequest; import org.opensearch.cluster.coordination.JoinHelper.InitialJoinAccumulator; +import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; @@ -1334,7 +1335,8 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId()) final PublicationTransportHandler.PublicationContext publicationContext = publicationHandler.newPublicationContext( clusterChangedEvent, - coordinationState.get().isRemotePublicationEnabled() + coordinationState.get().isRemotePublicationEnabled(), + persistedStateRegistry ); final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index 79a7f32ce2ca0..0aea421ca1dc3 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -31,6 +31,9 @@ package org.opensearch.cluster.coordination; +import java.util.Locale; +import java.util.Optional; +import java.util.function.Supplier; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -40,6 +43,8 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.Diff; import org.opensearch.cluster.IncompatibleClusterStateVersionException; +import org.opensearch.cluster.coordination.CoordinationState.PersistedState; +import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.core.action.ActionListener; @@ -47,6 +52,7 @@ import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.gateway.GatewayMetaState.RemotePersistedState; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.threadpool.ThreadPool; @@ -229,50 +235,35 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque } private PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException { - final Optional manifestOptional = remoteClusterStateService.getClusterMetadataManifestByTermVersion( - request.getClusterName(), - request.getClusterUUID(), - request.term, - request.version - ); - if (manifestOptional.isPresent() == false) { - throw new IllegalStateException( - String.format(Locale.ROOT, "Manifest is not present for term - %s version - %s", request.term, request.version) - ); + if (transportService.getLocalNode().equals(request.getSourceNode())) { + return acceptStateOnLocalNode(request); } - ClusterMetadataManifest manifest = manifestOptional.get(); + ClusterMetadataManifest manifest = remoteClusterStateService.getClusterMetadataManifestByFileName(request.getClusterUUID(), request.getManifestFile()); boolean applyFullState = false; final ClusterState lastSeen = lastSeenClusterState.get(); if (lastSeen == null) { - logger.debug("Diff cannot be applied as there is no last cluster state"); + logger.debug(() -> "Diff cannot be applied as there is no last cluster state"); applyFullState = true; } else if (manifest.getDiffManifest() == null) { - logger.debug("There is no diff in the manifest"); + logger.trace(() -> "There is no diff in the manifest"); applyFullState = true; } else if (manifest.getDiffManifest().getFromStateUUID().equals(lastSeen.stateUUID()) == false) { - logger.debug("Last cluster state not compatible with the diff"); + logger.debug(() -> "Last cluster state not compatible with the diff"); applyFullState = true; } if (applyFullState == true) { - ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest( - request.getClusterName(), - manifest, - transportService.getLocalNode().getId() - ); - logger.debug("Downloaded full cluster state [{}]", clusterState); + logger.debug(() -> new ParameterizedMessage("Downloading full cluster state for term {}, version {}, stateUUID {}", manifest.getClusterTerm(), manifest.getStateVersion(), + manifest.getStateUUID())); + ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(request.getClusterName(), manifest, transportService.getLocalNode().getId(), true); fullClusterStateReceivedCount.incrementAndGet(); final PublishWithJoinResponse response = acceptState(clusterState); lastSeenClusterState.set(clusterState); return response; } else { - ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff( - request.getClusterName(), - manifest, - lastSeenClusterState.get(), - transportService.getLocalNode().getId() - ); - logger.debug("Downloaded full cluster state from diff [{}]", clusterState); + logger.debug(() -> new ParameterizedMessage("Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}", manifest.getClusterTerm(), + manifest.getStateVersion(), manifest.getDiffManifest().getFromStateUUID(), manifest.getStateUUID())); + ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(request.getClusterName(), manifest, lastSeen, transportService.getLocalNode().getId()); compatibleClusterStateDiffReceivedCount.incrementAndGet(); final PublishWithJoinResponse response = acceptState(clusterState); lastSeenClusterState.compareAndSet(lastSeen, clusterState); @@ -293,8 +284,20 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) { return handlePublishRequest.apply(new PublishRequest(incomingState)); } - public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemotePublicationEnabled) { - final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent, isRemotePublicationEnabled); + private PublishWithJoinResponse acceptStateOnLocalNode(RemotePublishRequest remotePublishRequest) { + final PublishRequest publishRequest = currentPublishRequestToSelf.get(); + if (publishRequest == null || publishRequest.getAcceptedState().coordinationMetadata().term() != remotePublishRequest.term + || publishRequest.getAcceptedState().version() != remotePublishRequest.version) { + throw new IllegalStateException("publication to self failed for " + remotePublishRequest); + } + PublishWithJoinResponse publishWithJoinResponse = handlePublishRequest.apply(publishRequest); + lastSeenClusterState.set(publishRequest.getAcceptedState()); + return publishWithJoinResponse; + } + + public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemotePublicationEnabled, + PersistedStateRegistry persistedStateRegistry) { + final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent, isRemotePublicationEnabled, persistedStateRegistry); // Build the serializations we expect to need now, early in the process, so that an error during serialization fails the publication // straight away. This isn't watertight since we send diffs on a best-effort basis and may fall back to sending a full state (and @@ -340,13 +343,15 @@ public class PublicationContext { private final Map serializedStates = new HashMap<>(); private final Map serializedDiffs = new HashMap<>(); private final boolean sendRemoteState; + private final PersistedStateRegistry persistedStateRegistry; - PublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemotePublicationEnabled) { + PublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemotePublicationEnabled, PersistedStateRegistry persistedStateRegistry) { discoveryNodes = clusterChangedEvent.state().nodes(); newState = clusterChangedEvent.state(); previousState = clusterChangedEvent.previousState(); sendFullVersion = previousState.getBlocks().disableStatePersistence(); sendRemoteState = isRemotePublicationEnabled; + this.persistedStateRegistry = persistedStateRegistry; } void buildDiffAndSerializeStates() { @@ -410,7 +415,7 @@ public void onFailure(Exception e) { } else { responseActionListener = listener; } - if (sendRemoteState && destination.isRemoteStateNode()) { + if (sendRemoteState && destination.isRemoteClusterStateEnabled() && destination.isRemoteRoutingTableEnabled()) { sendRemoteClusterState(destination, publishRequest.getAcceptedState(), responseActionListener); } else if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) { logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination); @@ -457,25 +462,16 @@ public String executor() { ); } - private void sendRemoteClusterState( - DiscoveryNode destination, - ClusterState clusterState, - ActionListener listener - ) { + private void sendRemoteClusterState(final DiscoveryNode destination, final ClusterState clusterState, final ActionListener listener) { try { - final RemotePublishRequest remotePublishRequest = new RemotePublishRequest( - discoveryNodes.getLocalNode(), - clusterState.term(), - clusterState.getVersion(), - clusterState.getClusterName().value(), - clusterState.metadata().clusterUUID() - ); + final String manifestFileName = ((RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE)).getLastUploadedManifestFile(); + final RemotePublishRequest remotePublishRequest = new RemotePublishRequest(discoveryNodes.getLocalNode(), clusterState.term(), + clusterState.getVersion(), clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifestFileName); final Consumer transportExceptionHandler = exp -> { logger.debug(() -> new ParameterizedMessage("failed to send remote cluster state to {}", destination), exp); listener.onFailure(exp); }; - final TransportResponseHandler responseHandler = new TransportResponseHandler< - PublishWithJoinResponse>() { + final TransportResponseHandler responseHandler = new TransportResponseHandler<>() { @Override public PublishWithJoinResponse read(StreamInput in) throws IOException { @@ -497,13 +493,7 @@ public String executor() { return ThreadPool.Names.GENERIC; } }; - transportService.sendRequest( - destination, - PUBLISH_REMOTE_STATE_ACTION_NAME, - remotePublishRequest, - stateRequestOptions, - responseHandler - ); + transportService.sendRequest(destination, PUBLISH_REMOTE_STATE_ACTION_NAME, remotePublishRequest, stateRequestOptions, responseHandler); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("error sending remote cluster state to {}", destination), e); listener.onFailure(e); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java b/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java index dc770a30a1990..68dea53372c45 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java @@ -16,20 +16,22 @@ public class RemotePublishRequest extends TermVersionRequest { - // todo Do we need cluster name and UUID ? private final String clusterName; private final String clusterUUID; + private final String manifestFile; - public RemotePublishRequest(DiscoveryNode sourceNode, long term, long version, String clusterName, String clusterUUID) { + public RemotePublishRequest(DiscoveryNode sourceNode, long term, long version, String clusterName, String clusterUUID, String manifestFile) { super(sourceNode, term, version); this.clusterName = clusterName; this.clusterUUID = clusterUUID; + this.manifestFile = manifestFile; } public RemotePublishRequest(StreamInput in) throws IOException { super(in); this.clusterName = in.readString(); this.clusterUUID = in.readString(); + this.manifestFile = in.readString(); } @Override @@ -37,6 +39,13 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(clusterName); out.writeString(clusterUUID); + out.writeString(manifestFile); + } + + @Override + public String toString() { + return "RemotePublishRequest{" + "term=" + term + ", version=" + version + ", clusterName=" + clusterName + ", clusterUUID=" + clusterUUID + + ", sourceNode=" + sourceNode + ", manifestFile=" + manifestFile + '}'; } public String getClusterName() { @@ -46,4 +55,8 @@ public String getClusterName() { public String getClusterUUID() { return clusterUUID; } + + public String getManifestFile() { + return manifestFile; + } } diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java index 5ed5294687c40..6cf1f654e3939 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java @@ -33,6 +33,7 @@ package org.opensearch.cluster.node; import org.opensearch.Version; +import org.opensearch.cluster.metadata.Metadata; import org.opensearch.common.UUIDs; import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.settings.Setting; @@ -43,6 +44,7 @@ import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; import org.opensearch.node.Node; import java.io.IOException; @@ -474,20 +476,20 @@ public boolean isRemoteStoreNode() { /** * Returns whether the node is a remote cluster state enabled node. - * @return true if the node contains remote cluster state and remote routing table node attributes, false otherwise + * @return true if the node contains remote cluster state node attribute, false otherwise */ - public boolean isRemoteStateNode() { - return isRemoteClusterStateEnabled() && isRemoteRoutingTableEnabled(); - } - - private boolean isRemoteClusterStateEnabled() { + public boolean isRemoteClusterStateEnabled() { return this.getAttributes() .keySet() .stream() .anyMatch(key -> (key.equals(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY))); } - private boolean isRemoteRoutingTableEnabled() { + /** + * Returns whether remote routing table is enabled on the node + * @return true if the node contains remote routing table node attributes, false otherwise + */ + public boolean isRemoteRoutingTableEnabled() { return this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY)); } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 09f32884e0ae1..3ba6cebbe7350 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -77,6 +77,7 @@ import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; +import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.cluster.service.ClusterManagerService; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; @@ -715,9 +716,6 @@ public void apply(Settings value, Settings current, Settings previous) { // Remote cluster state settings RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING, - RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, - RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, - RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING, RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING, IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING, diff --git a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java index 538dea5b2e60b..922e23b849d49 100644 --- a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java @@ -53,6 +53,7 @@ import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.gateway.GatewayMetaState; +import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.monitor.NodeHealthService; import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.plugins.DiscoveryPlugin; @@ -135,7 +136,8 @@ public DiscoveryModule( NodeHealthService nodeHealthService, PersistedStateRegistry persistedStateRegistry, RemoteStoreNodeService remoteStoreNodeService, - ClusterManagerMetrics clusterManagerMetrics + ClusterManagerMetrics clusterManagerMetrics, + RemoteClusterStateService remoteClusterStateService ) { final Collection> joinValidators = new ArrayList<>(); final Map> hostProviders = new HashMap<>(); @@ -214,7 +216,8 @@ public DiscoveryModule( nodeHealthService, persistedStateRegistry, remoteStoreNodeService, - clusterManagerMetrics + clusterManagerMetrics, + remoteClusterStateService ); } else { throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]"); diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index c3056276706a0..1cc0c9cbb6642 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -64,6 +64,7 @@ import org.opensearch.env.NodeMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteClusterStateService; +import org.opensearch.gateway.remote.model.RemoteUploadDetails; import org.opensearch.index.recovery.RemoteStoreRestoreService; import org.opensearch.index.recovery.RemoteStoreRestoreService.RemoteRestoreResult; import org.opensearch.node.Node; @@ -665,6 +666,8 @@ public static class RemotePersistedState implements PersistedState { private ClusterState lastAcceptedState; private ClusterMetadataManifest lastAcceptedManifest; + + private String lastUploadedManifestFile; private final RemoteClusterStateService remoteClusterStateService; private String previousClusterUUID; @@ -690,10 +693,14 @@ public void setCurrentTerm(long currentTerm) { // But for RemotePersistedState, the state is only pushed by the active cluster. So this method is not required. } + public String getLastUploadedManifestFile() { + return lastUploadedManifestFile; + } + @Override public void setLastAcceptedState(ClusterState clusterState) { try { - final ClusterMetadataManifest manifest; + final RemoteUploadDetails manifestDetails; if (shouldWriteFullClusterState(clusterState)) { final Optional latestManifest = remoteClusterStateService.getLatestClusterMetadataManifest( clusterState.getClusterName().value(), @@ -711,15 +718,16 @@ public void setLastAcceptedState(ClusterState clusterState) { clusterState.metadata().clusterUUID() ); } - manifest = remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID); + manifestDetails = remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID); } else { assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) == true : "Previous manifest and previous ClusterState are not in sync"; - manifest = remoteClusterStateService.writeIncrementalMetadata(lastAcceptedState, clusterState, lastAcceptedManifest); + manifestDetails = remoteClusterStateService.writeIncrementalMetadata(lastAcceptedState, clusterState, lastAcceptedManifest); } - assert verifyManifestAndClusterState(manifest, clusterState) == true : "Manifest and ClusterState are not in sync"; - lastAcceptedManifest = manifest; + assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest(), clusterState) == true : "Manifest and ClusterState are not in sync"; + lastAcceptedManifest = manifestDetails.getClusterMetadataManifest(); lastAcceptedState = clusterState; + lastUploadedManifestFile = manifestDetails.getManifestFileName(); } catch (Exception e) { remoteClusterStateService.writeMetadataFailed(); handleExceptionOnWrite(e); @@ -767,12 +775,13 @@ public void markLastAcceptedStateAsCommitted() { metadataBuilder.clusterUUIDCommitted(true); clusterState = ClusterState.builder(lastAcceptedState).metadata(metadataBuilder).build(); } - final ClusterMetadataManifest committedManifest = remoteClusterStateService.markLastStateAsCommitted( + final RemoteUploadDetails committedManifestDetails = remoteClusterStateService.markLastStateAsCommitted( clusterState, lastAcceptedManifest ); - lastAcceptedManifest = committedManifest; + lastAcceptedManifest = committedManifestDetails.getClusterMetadataManifest(); lastAcceptedState = clusterState; + lastUploadedManifestFile = committedManifestDetails.getManifestFileName(); } catch (Exception e) { handleExceptionOnWrite(e); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index d0593dcd51475..798553b72fef0 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -36,6 +36,7 @@ import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.model.RemoteUploadDetails; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.node.Node; @@ -268,7 +269,7 @@ public RemoteClusterStateService( * @return A manifest object which contains the details of uploaded entity metadata. */ @Nullable - public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, String previousClusterUUID) throws IOException { + public RemoteUploadDetails writeFullMetadata(ClusterState clusterState, String previousClusterUUID) throws IOException { final long startTimeNanos = relativeTimeNanosSupplier.getAsLong(); if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { logger.error("Local node is not elected cluster manager. Exiting"); @@ -284,7 +285,7 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri true, true ); - final ClusterMetadataManifest manifest = uploadManifest( + final RemoteUploadDetails manifestDetails = uploadManifest( clusterState, uploadedMetadataResults.uploadedIndexMetadata, previousClusterUUID, @@ -311,7 +312,7 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri uploadedMetadataResults.uploadedIndexMetadata.size() ); } - return manifest; + return manifestDetails; } /** @@ -322,7 +323,7 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri * @return The uploaded ClusterMetadataManifest file */ @Nullable - public ClusterMetadataManifest writeIncrementalMetadata( + public RemoteUploadDetails writeIncrementalMetadata( ClusterState previousClusterState, ClusterState clusterState, ClusterMetadataManifest previousManifest @@ -404,7 +405,7 @@ public ClusterMetadataManifest writeIncrementalMetadata( customsToBeDeletedFromRemote.keySet().forEach(allUploadedCustomMap::remove); indicesToBeDeletedFromRemote.keySet().forEach(allUploadedIndexMetadata::remove); - final ClusterMetadataManifest manifest = uploadManifest( + final RemoteUploadDetails manifestDetails = uploadManifest( clusterState, new ArrayList<>(allUploadedIndexMetadata.values()), previousManifest.getPreviousClusterUUID(), @@ -422,7 +423,7 @@ public ClusterMetadataManifest writeIncrementalMetadata( remoteStateStats.stateTook(durationMillis); ParameterizedMessage clusterStateUploadTimeMessage = new ParameterizedMessage( CLUSTER_STATE_UPLOAD_TIME_LOG_STRING, - manifest.getStateVersion(), + manifestDetails.getClusterMetadataManifest().getStateVersion(), durationMillis ); ParameterizedMessage metadataUpdateMessage = new ParameterizedMessage( @@ -444,7 +445,7 @@ public ClusterMetadataManifest writeIncrementalMetadata( } else { logger.info("{}; {}", clusterStateUploadTimeMessage, metadataUpdateMessage); } - return manifest; + return manifestDetails; } private UploadedMetadataResults writeMetadataInParallel( @@ -724,7 +725,7 @@ public RemoteClusterStateCleanupManager getCleanupManager() { } @Nullable - public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataManifest previousManifest) + public RemoteUploadDetails markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataManifest previousManifest) throws IOException { assert clusterState != null : "Last accepted cluster state is not set"; if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { @@ -732,7 +733,7 @@ public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterStat return null; } assert previousManifest != null : "Last cluster metadata manifest is not set"; - ClusterMetadataManifest committedManifest = uploadManifest( + RemoteUploadDetails committedManifestDetails = uploadManifest( clusterState, previousManifest.getIndices(), previousManifest.getPreviousClusterUUID(), @@ -742,11 +743,11 @@ public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterStat previousManifest.getCustomMetadataMap(), true ); - if (!previousManifest.isClusterUUIDCommitted() && committedManifest.isClusterUUIDCommitted()) { - remoteClusterStateCleanupManager.deleteStaleClusterUUIDs(clusterState, committedManifest); + if (!previousManifest.isClusterUUIDCommitted() && committedManifestDetails.getClusterMetadataManifest().isClusterUUIDCommitted()) { + remoteClusterStateCleanupManager.deleteStaleClusterUUIDs(clusterState, committedManifestDetails.getClusterMetadataManifest()); } - return committedManifest; + return committedManifestDetails; } @Override @@ -773,7 +774,7 @@ public void start() { this.remoteRoutingTableService.ifPresent(RemoteRoutingTableService::start); } - private ClusterMetadataManifest uploadManifest( + private RemoteUploadDetails uploadManifest( ClusterState clusterState, List uploadedIndexMetadata, String previousClusterUUID, @@ -812,7 +813,7 @@ private ClusterMetadataManifest uploadManifest( new ArrayList<>() ); writeMetadataManifest(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifest, manifestFileName); - return manifest; + return new RemoteUploadDetails(manifest, manifestFileName); } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteUploadDetails.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteUploadDetails.java new file mode 100644 index 0000000000000..100280c454e43 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteUploadDetails.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.gateway.remote.ClusterMetadataManifest; + +public class RemoteUploadDetails { + + private final ClusterMetadataManifest clusterMetadataManifest; + private final String manifestFileName; + + public RemoteUploadDetails(final ClusterMetadataManifest manifest, final String manifestFileName) { + this.clusterMetadataManifest = manifest; + this.manifestFileName = manifestFileName; + } + + public ClusterMetadataManifest getClusterMetadataManifest() { + return clusterMetadataManifest; + } + + public String getManifestFileName() { + return manifestFileName; + } +} diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index f7a901335f34a..49c467181cb7b 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1200,7 +1200,8 @@ protected Node( fsHealthService, persistedStateRegistry, remoteStoreNodeService, - clusterManagerMetrics + clusterManagerMetrics, + remoteClusterStateService ); final SearchPipelineService searchPipelineService = new SearchPipelineService( clusterService, diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java index a0f745a4270c4..05714b4791d06 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java @@ -201,6 +201,11 @@ private static boolean isRemoteRoutingTableAttributePresent(Settings settings) { .isEmpty() == false; } + public static boolean isRemotePublicationEnabled(Settings settings) { + return FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL) && isRemoteRoutingTableAttributePresent(settings) + && isRemoteStoreClusterStateEnabled(settings); + } + public static boolean isRemoteRoutingTableEnabled(Settings settings) { return FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL) && isRemoteRoutingTableAttributePresent(settings); } diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java index bd71aecf89101..4ade19b3d7219 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java @@ -48,6 +48,7 @@ import org.opensearch.gateway.GatewayMetaState.RemotePersistedState; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteClusterStateService; +import org.opensearch.gateway.remote.model.RemoteUploadDetails; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.EqualsHashCodeTestUtils; import org.opensearch.test.OpenSearchTestCase; @@ -944,7 +945,8 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep .previousClusterUUID(randomAlphaOfLength(10)) .clusterUUIDCommitted(true) .build(); - Mockito.when(remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID)).thenReturn(manifest); + Mockito.when(remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID)) + .thenReturn(new RemoteUploadDetails(manifest, "path/to/manifest")); final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, ps1); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java index f84f0326f4a9d..3a7988bcd2bda 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java @@ -274,7 +274,8 @@ protected void onSendRequest( nodeHealthService, persistedStateRegistry, Mockito.mock(RemoteStoreNodeService.class), - new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE), + null ); transportService.start(); transportService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java index 6d94054afdea2..a9950d82bfac8 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java @@ -76,7 +76,8 @@ public void testDiffSerializationFailure() { transportService, writableRegistry(), pu -> null, - (pu, l) -> {} + (pu, l) -> {}, + null ); transportService.start(); transportService.acceptIncomingRequests(); @@ -111,7 +112,7 @@ public void writeTo(StreamOutput out) throws IOException { OpenSearchException e = expectThrows( OpenSearchException.class, - () -> handler.newPublicationContext(new ClusterChangedEvent("test", unserializableClusterState, clusterState)) + () -> handler.newPublicationContext(new ClusterChangedEvent("test", unserializableClusterState, clusterState), false, null) ); assertNotNull(e.getCause()); assertThat(e.getCause(), instanceOf(IOException.class)); diff --git a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java index 5539b3237c2bf..cd3e8af2a3cd1 100644 --- a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java +++ b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java @@ -131,7 +131,8 @@ private DiscoveryModule newModule(Settings settings, List plugi null, new PersistedStateRegistry(), remoteStoreNodeService, - new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE), + null ); } diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index 418e6d8de6adb..1d63caafbb4dd 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -69,6 +69,7 @@ import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.gateway.remote.RemotePersistenceStats; +import org.opensearch.gateway.remote.model.RemoteUploadDetails; import org.opensearch.index.recovery.RemoteStoreRestoreService; import org.opensearch.index.recovery.RemoteStoreRestoreService.RemoteRestoreResult; import org.opensearch.index.remote.RemoteIndexPathUploader; @@ -723,9 +724,10 @@ public void testRemotePersistedState() throws IOException { final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); final ClusterMetadataManifest manifest = ClusterMetadataManifest.builder().clusterTerm(1L).stateVersion(5L).build(); final String previousClusterUUID = "prev-cluster-uuid"; - Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any())).thenReturn(manifest); + Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any())).thenReturn(new RemoteUploadDetails(manifest, "path/to/manifest")); - Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(manifest); + Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any())) + .thenReturn(new RemoteUploadDetails(manifest, "path/to/manifest")); CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService, previousClusterUUID); assertThat(remotePersistedState.getLastAcceptedState(), nullValue()); @@ -779,9 +781,10 @@ public void testRemotePersistedStateNotCommitted() throws IOException { .build(); Mockito.when(remoteClusterStateService.getLatestClusterMetadataManifest(Mockito.any(), Mockito.any())) .thenReturn(Optional.of(manifest)); - Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any())).thenReturn(manifest); + Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any())).thenReturn(new RemoteUploadDetails(manifest, "path/to/manifest")); - Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(manifest); + Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any())) + .thenReturn(new RemoteUploadDetails(manifest, "path/to/manifest")); CoordinationState.PersistedState remotePersistedState = new RemotePersistedState( remoteClusterStateService, ClusterState.UNKNOWN_UUID diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java index 1c2270bab1260..b432e5411404e 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -1184,7 +1184,8 @@ protected Optional getDisruptableMockTransport(Transpo nodeHealthService, persistedStateRegistry, remoteStoreNodeService, - new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE), + null ); clusterManagerService.setClusterStatePublisher(coordinator); final GatewayService gatewayService = new GatewayService( From afc033cd93f6145290c3a070da799353a2e712fe Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Sun, 9 Jun 2024 10:21:59 +0530 Subject: [PATCH 4/7] Add dummy methods in RemoteClusterStateService Signed-off-by: Sooraj Sinha --- .../coordination/CoordinationState.java | 8 +- .../cluster/coordination/Coordinator.java | 1 - .../PublicationTransportHandler.java | 99 ++++++++++++++----- .../coordination/RemotePublishRequest.java | 29 +++++- .../cluster/node/DiscoveryNode.java | 2 - .../common/settings/ClusterSettings.java | 4 +- .../opensearch/gateway/GatewayMetaState.java | 15 ++- .../remote/ClusterMetadataManifest.java | 12 +++ .../remote/RemoteClusterStateService.java | 43 ++++++-- ...va => RemoteClusterStateManifestInfo.java} | 4 +- .../remotestore/RemoteStoreNodeAttribute.java | 3 +- .../coordination/CoordinationStateTests.java | 6 +- .../GatewayMetaStatePersistedStateTests.java | 15 ++- .../RemoteClusterStateServiceTests.java | 58 +++++++---- .../snapshots/SnapshotResiliencyTests.java | 3 +- 15 files changed, 220 insertions(+), 82 deletions(-) rename server/src/main/java/org/opensearch/gateway/remote/model/{RemoteUploadDetails.java => RemoteClusterStateManifestInfo.java} (81%) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index f20ea13e7668a..b326b57a051bb 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -40,6 +40,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import java.io.Closeable; import java.io.IOException; @@ -50,10 +51,8 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** @@ -457,7 +456,6 @@ public PublishResponse handlePublishRequest(PublishRequest publishRequest) { clusterState.version(), clusterState.term() ); - logger.info("Setting last accepted state : term - {}, version - {}", clusterState.term(), clusterState.version()); persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).setLastAcceptedState(clusterState); assert getLastAcceptedState() == clusterState; @@ -583,7 +581,7 @@ public void handlePrePublish(ClusterState clusterState) { // This is to ensure the remote store is the single source of truth for current state. Even if the current node // goes down after sending the cluster state to other nodes, we should be able to read the remote state and // recover the cluster. - if (isRemotePublicationEnabled) { + if (isRemoteStateEnabled) { assert persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null : "Remote state has not been initialized"; persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(clusterState); } @@ -594,7 +592,7 @@ public void handlePrePublish(ClusterState clusterState) { */ public void handlePreCommit() { // Publishing the committed state to remote store before sending apply commit to other nodes. - if (isRemotePublicationEnabled) { + if (isRemoteStateEnabled) { assert persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null : "Remote state has not been initialized"; persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).markLastAcceptedStateAsCommitted(); } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 914db31886850..87f02c6891be6 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -49,7 +49,6 @@ import org.opensearch.cluster.coordination.CoordinationState.VoteCollection; import org.opensearch.cluster.coordination.FollowersChecker.FollowerCheckRequest; import org.opensearch.cluster.coordination.JoinHelper.InitialJoinAccumulator; -import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index 0aea421ca1dc3..80dcc0c8511c9 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -31,9 +31,6 @@ package org.opensearch.cluster.coordination; -import java.util.Locale; -import java.util.Optional; -import java.util.function.Supplier; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -43,7 +40,6 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.Diff; import org.opensearch.cluster.IncompatibleClusterStateVersionException; -import org.opensearch.cluster.coordination.CoordinationState.PersistedState; import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; @@ -65,9 +61,7 @@ import java.io.IOException; import java.util.HashMap; -import java.util.Locale; import java.util.Map; -import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -236,9 +230,12 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque private PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException { if (transportService.getLocalNode().equals(request.getSourceNode())) { - return acceptStateOnLocalNode(request); + return acceptRemoteStateOnLocalNode(request); } - ClusterMetadataManifest manifest = remoteClusterStateService.getClusterMetadataManifestByFileName(request.getClusterUUID(), request.getManifestFile()); + ClusterMetadataManifest manifest = remoteClusterStateService.getClusterMetadataManifestByFileName( + request.getClusterUUID(), + request.getManifestFile() + ); boolean applyFullState = false; final ClusterState lastSeen = lastSeenClusterState.get(); if (lastSeen == null) { @@ -253,17 +250,40 @@ private PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublish } if (applyFullState == true) { - logger.debug(() -> new ParameterizedMessage("Downloading full cluster state for term {}, version {}, stateUUID {}", manifest.getClusterTerm(), manifest.getStateVersion(), - manifest.getStateUUID())); - ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(request.getClusterName(), manifest, transportService.getLocalNode().getId(), true); + logger.debug( + () -> new ParameterizedMessage( + "Downloading full cluster state for term {}, version {}, stateUUID {}", + manifest.getClusterTerm(), + manifest.getStateVersion(), + manifest.getStateUUID() + ) + ); + ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest( + request.getClusterName(), + manifest, + transportService.getLocalNode().getId(), + true + ); fullClusterStateReceivedCount.incrementAndGet(); final PublishWithJoinResponse response = acceptState(clusterState); lastSeenClusterState.set(clusterState); return response; } else { - logger.debug(() -> new ParameterizedMessage("Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}", manifest.getClusterTerm(), - manifest.getStateVersion(), manifest.getDiffManifest().getFromStateUUID(), manifest.getStateUUID())); - ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(request.getClusterName(), manifest, lastSeen, transportService.getLocalNode().getId()); + logger.debug( + () -> new ParameterizedMessage( + "Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}", + manifest.getClusterTerm(), + manifest.getStateVersion(), + manifest.getDiffManifest().getFromStateUUID(), + manifest.getStateUUID() + ) + ); + ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff( + request.getClusterName(), + manifest, + lastSeen, + transportService.getLocalNode().getId() + ); compatibleClusterStateDiffReceivedCount.incrementAndGet(); final PublishWithJoinResponse response = acceptState(clusterState); lastSeenClusterState.compareAndSet(lastSeen, clusterState); @@ -284,9 +304,10 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) { return handlePublishRequest.apply(new PublishRequest(incomingState)); } - private PublishWithJoinResponse acceptStateOnLocalNode(RemotePublishRequest remotePublishRequest) { + private PublishWithJoinResponse acceptRemoteStateOnLocalNode(RemotePublishRequest remotePublishRequest) { final PublishRequest publishRequest = currentPublishRequestToSelf.get(); - if (publishRequest == null || publishRequest.getAcceptedState().coordinationMetadata().term() != remotePublishRequest.term + if (publishRequest == null + || publishRequest.getAcceptedState().coordinationMetadata().term() != remotePublishRequest.term || publishRequest.getAcceptedState().version() != remotePublishRequest.version) { throw new IllegalStateException("publication to self failed for " + remotePublishRequest); } @@ -295,9 +316,16 @@ private PublishWithJoinResponse acceptStateOnLocalNode(RemotePublishRequest remo return publishWithJoinResponse; } - public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemotePublicationEnabled, - PersistedStateRegistry persistedStateRegistry) { - final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent, isRemotePublicationEnabled, persistedStateRegistry); + public PublicationContext newPublicationContext( + ClusterChangedEvent clusterChangedEvent, + boolean isRemotePublicationEnabled, + PersistedStateRegistry persistedStateRegistry + ) { + final PublicationContext publicationContext = new PublicationContext( + clusterChangedEvent, + isRemotePublicationEnabled, + persistedStateRegistry + ); // Build the serializations we expect to need now, early in the process, so that an error during serialization fails the publication // straight away. This isn't watertight since we send diffs on a best-effort basis and may fall back to sending a full state (and @@ -345,7 +373,11 @@ public class PublicationContext { private final boolean sendRemoteState; private final PersistedStateRegistry persistedStateRegistry; - PublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemotePublicationEnabled, PersistedStateRegistry persistedStateRegistry) { + PublicationContext( + ClusterChangedEvent clusterChangedEvent, + boolean isRemotePublicationEnabled, + PersistedStateRegistry persistedStateRegistry + ) { discoveryNodes = clusterChangedEvent.state().nodes(); newState = clusterChangedEvent.state(); previousState = clusterChangedEvent.previousState(); @@ -462,11 +494,22 @@ public String executor() { ); } - private void sendRemoteClusterState(final DiscoveryNode destination, final ClusterState clusterState, final ActionListener listener) { + private void sendRemoteClusterState( + final DiscoveryNode destination, + final ClusterState clusterState, + final ActionListener listener + ) { try { - final String manifestFileName = ((RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE)).getLastUploadedManifestFile(); - final RemotePublishRequest remotePublishRequest = new RemotePublishRequest(discoveryNodes.getLocalNode(), clusterState.term(), - clusterState.getVersion(), clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifestFileName); + final String manifestFileName = ((RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE)) + .getLastUploadedManifestFile(); + final RemotePublishRequest remotePublishRequest = new RemotePublishRequest( + discoveryNodes.getLocalNode(), + clusterState.term(), + clusterState.getVersion(), + clusterState.getClusterName().value(), + clusterState.metadata().clusterUUID(), + manifestFileName + ); final Consumer transportExceptionHandler = exp -> { logger.debug(() -> new ParameterizedMessage("failed to send remote cluster state to {}", destination), exp); listener.onFailure(exp); @@ -493,7 +536,13 @@ public String executor() { return ThreadPool.Names.GENERIC; } }; - transportService.sendRequest(destination, PUBLISH_REMOTE_STATE_ACTION_NAME, remotePublishRequest, stateRequestOptions, responseHandler); + transportService.sendRequest( + destination, + PUBLISH_REMOTE_STATE_ACTION_NAME, + remotePublishRequest, + stateRequestOptions, + responseHandler + ); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("error sending remote cluster state to {}", destination), e); listener.onFailure(e); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java b/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java index 68dea53372c45..9461c5ee63627 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java @@ -14,13 +14,24 @@ import java.io.IOException; +/** + * Send the publish request with the remote cluster state details + * @opensearch.internal + */ public class RemotePublishRequest extends TermVersionRequest { private final String clusterName; private final String clusterUUID; private final String manifestFile; - public RemotePublishRequest(DiscoveryNode sourceNode, long term, long version, String clusterName, String clusterUUID, String manifestFile) { + public RemotePublishRequest( + DiscoveryNode sourceNode, + long term, + long version, + String clusterName, + String clusterUUID, + String manifestFile + ) { super(sourceNode, term, version); this.clusterName = clusterName; this.clusterUUID = clusterUUID; @@ -44,8 +55,20 @@ public void writeTo(StreamOutput out) throws IOException { @Override public String toString() { - return "RemotePublishRequest{" + "term=" + term + ", version=" + version + ", clusterName=" + clusterName + ", clusterUUID=" + clusterUUID - + ", sourceNode=" + sourceNode + ", manifestFile=" + manifestFile + '}'; + return "RemotePublishRequest{" + + "term=" + + term + + ", version=" + + version + + ", clusterName=" + + clusterName + + ", clusterUUID=" + + clusterUUID + + ", sourceNode=" + + sourceNode + + ", manifestFile=" + + manifestFile + + '}'; } public String getClusterName() { diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java index 6cf1f654e3939..f322106f2fe3b 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java @@ -33,7 +33,6 @@ package org.opensearch.cluster.node; import org.opensearch.Version; -import org.opensearch.cluster.metadata.Metadata; import org.opensearch.common.UUIDs; import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.settings.Setting; @@ -44,7 +43,6 @@ import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.core.xcontent.XContentParser; import org.opensearch.node.Node; import java.io.IOException; diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 3ba6cebbe7350..09f32884e0ae1 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -77,7 +77,6 @@ import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; -import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.cluster.service.ClusterManagerService; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; @@ -716,6 +715,9 @@ public void apply(Settings value, Settings current, Settings previous) { // Remote cluster state settings RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING, + RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, + RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, + RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING, RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING, IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING, diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index 1cc0c9cbb6642..80ba57b7db4a9 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -64,7 +64,7 @@ import org.opensearch.env.NodeMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteClusterStateService; -import org.opensearch.gateway.remote.model.RemoteUploadDetails; +import org.opensearch.gateway.remote.model.RemoteClusterStateManifestInfo; import org.opensearch.index.recovery.RemoteStoreRestoreService; import org.opensearch.index.recovery.RemoteStoreRestoreService.RemoteRestoreResult; import org.opensearch.node.Node; @@ -700,7 +700,7 @@ public String getLastUploadedManifestFile() { @Override public void setLastAcceptedState(ClusterState clusterState) { try { - final RemoteUploadDetails manifestDetails; + final RemoteClusterStateManifestInfo manifestDetails; if (shouldWriteFullClusterState(clusterState)) { final Optional latestManifest = remoteClusterStateService.getLatestClusterMetadataManifest( clusterState.getClusterName().value(), @@ -722,9 +722,14 @@ public void setLastAcceptedState(ClusterState clusterState) { } else { assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) == true : "Previous manifest and previous ClusterState are not in sync"; - manifestDetails = remoteClusterStateService.writeIncrementalMetadata(lastAcceptedState, clusterState, lastAcceptedManifest); + manifestDetails = remoteClusterStateService.writeIncrementalMetadata( + lastAcceptedState, + clusterState, + lastAcceptedManifest + ); } - assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest(), clusterState) == true : "Manifest and ClusterState are not in sync"; + assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest(), clusterState) == true + : "Manifest and ClusterState are not in sync"; lastAcceptedManifest = manifestDetails.getClusterMetadataManifest(); lastAcceptedState = clusterState; lastUploadedManifestFile = manifestDetails.getManifestFileName(); @@ -775,7 +780,7 @@ public void markLastAcceptedStateAsCommitted() { metadataBuilder.clusterUUIDCommitted(true); clusterState = ClusterState.builder(lastAcceptedState).metadata(metadataBuilder).build(); } - final RemoteUploadDetails committedManifestDetails = remoteClusterStateService.markLastStateAsCommitted( + final RemoteClusterStateManifestInfo committedManifestDetails = remoteClusterStateService.markLastStateAsCommitted( clusterState, lastAcceptedManifest ); diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index b3b1bf37f8696..d82ba0b859f10 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -332,6 +332,11 @@ public Map getCustomMetadataMap() { return uploadedCustomMetadataMap; } + // TODO https://github.com/opensearch-project/OpenSearch/pull/14089 + public ClusterStateDiffManifest getDiffManifest() { + return new ClusterStateDiffManifest(); + } + public boolean hasMetadataAttributesFiles() { return uploadedCoordinationMetadata != null || uploadedSettingsMetadata != null @@ -991,4 +996,11 @@ public String toString() { + '}'; } } + + // TODO https://github.com/opensearch-project/OpenSearch/pull/14089 + public static class ClusterStateDiffManifest { + public String getFromStateUUID() { + return null; + } + } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 798553b72fef0..8128921de6d3f 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -36,7 +36,7 @@ import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; -import org.opensearch.gateway.remote.model.RemoteUploadDetails; +import org.opensearch.gateway.remote.model.RemoteClusterStateManifestInfo; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.node.Node; @@ -269,7 +269,7 @@ public RemoteClusterStateService( * @return A manifest object which contains the details of uploaded entity metadata. */ @Nullable - public RemoteUploadDetails writeFullMetadata(ClusterState clusterState, String previousClusterUUID) throws IOException { + public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterState, String previousClusterUUID) throws IOException { final long startTimeNanos = relativeTimeNanosSupplier.getAsLong(); if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { logger.error("Local node is not elected cluster manager. Exiting"); @@ -285,7 +285,7 @@ public RemoteUploadDetails writeFullMetadata(ClusterState clusterState, String p true, true ); - final RemoteUploadDetails manifestDetails = uploadManifest( + final RemoteClusterStateManifestInfo manifestDetails = uploadManifest( clusterState, uploadedMetadataResults.uploadedIndexMetadata, previousClusterUUID, @@ -323,7 +323,7 @@ public RemoteUploadDetails writeFullMetadata(ClusterState clusterState, String p * @return The uploaded ClusterMetadataManifest file */ @Nullable - public RemoteUploadDetails writeIncrementalMetadata( + public RemoteClusterStateManifestInfo writeIncrementalMetadata( ClusterState previousClusterState, ClusterState clusterState, ClusterMetadataManifest previousManifest @@ -405,7 +405,7 @@ public RemoteUploadDetails writeIncrementalMetadata( customsToBeDeletedFromRemote.keySet().forEach(allUploadedCustomMap::remove); indicesToBeDeletedFromRemote.keySet().forEach(allUploadedIndexMetadata::remove); - final RemoteUploadDetails manifestDetails = uploadManifest( + final RemoteClusterStateManifestInfo manifestDetails = uploadManifest( clusterState, new ArrayList<>(allUploadedIndexMetadata.values()), previousManifest.getPreviousClusterUUID(), @@ -725,7 +725,7 @@ public RemoteClusterStateCleanupManager getCleanupManager() { } @Nullable - public RemoteUploadDetails markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataManifest previousManifest) + public RemoteClusterStateManifestInfo markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataManifest previousManifest) throws IOException { assert clusterState != null : "Last accepted cluster state is not set"; if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { @@ -733,7 +733,7 @@ public RemoteUploadDetails markLastStateAsCommitted(ClusterState clusterState, C return null; } assert previousManifest != null : "Last cluster metadata manifest is not set"; - RemoteUploadDetails committedManifestDetails = uploadManifest( + RemoteClusterStateManifestInfo committedManifestDetails = uploadManifest( clusterState, previousManifest.getIndices(), previousManifest.getPreviousClusterUUID(), @@ -774,7 +774,7 @@ public void start() { this.remoteRoutingTableService.ifPresent(RemoteRoutingTableService::start); } - private RemoteUploadDetails uploadManifest( + private RemoteClusterStateManifestInfo uploadManifest( ClusterState clusterState, List uploadedIndexMetadata, String previousClusterUUID, @@ -813,7 +813,7 @@ private RemoteUploadDetails uploadManifest( new ArrayList<>() ); writeMetadataManifest(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifest, manifestFileName); - return new RemoteUploadDetails(manifest, manifestFileName); + return new RemoteClusterStateManifestInfo(manifest, manifestFileName); } } @@ -1092,6 +1092,31 @@ public ClusterState getLatestClusterState(String clusterName, String clusterUUID .build(); } + public ClusterState getClusterStateForManifest( + String clusterName, + ClusterMetadataManifest manifest, + String localNodeId, + boolean includeEphemeral + ) { + // TODO https://github.com/opensearch-project/OpenSearch/pull/14089 + return null; + } + + public ClusterState getClusterStateUsingDiff( + String clusterName, + ClusterMetadataManifest manifest, + ClusterState previousClusterState, + String localNodeId + ) { + // TODO https://github.com/opensearch-project/OpenSearch/pull/14089 + return null; + } + + public ClusterMetadataManifest getClusterMetadataManifestByFileName(String clusterUUID, String manifestFileName) { + // TODO https://github.com/opensearch-project/OpenSearch/pull/14089 + return null; + } + private Metadata getGlobalMetadata(String clusterName, String clusterUUID, ClusterMetadataManifest clusterMetadataManifest) { String globalMetadataFileName = clusterMetadataManifest.getGlobalMetadataFileName(); try { diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteUploadDetails.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateManifestInfo.java similarity index 81% rename from server/src/main/java/org/opensearch/gateway/remote/model/RemoteUploadDetails.java rename to server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateManifestInfo.java index 100280c454e43..47c1dbf59f938 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteUploadDetails.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateManifestInfo.java @@ -10,12 +10,12 @@ import org.opensearch.gateway.remote.ClusterMetadataManifest; -public class RemoteUploadDetails { +public class RemoteClusterStateManifestInfo { private final ClusterMetadataManifest clusterMetadataManifest; private final String manifestFileName; - public RemoteUploadDetails(final ClusterMetadataManifest manifest, final String manifestFileName) { + public RemoteClusterStateManifestInfo(final ClusterMetadataManifest manifest, final String manifestFileName) { this.clusterMetadataManifest = manifest; this.manifestFileName = manifestFileName; } diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java index 05714b4791d06..46d1d07db7eb6 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java @@ -202,7 +202,8 @@ private static boolean isRemoteRoutingTableAttributePresent(Settings settings) { } public static boolean isRemotePublicationEnabled(Settings settings) { - return FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL) && isRemoteRoutingTableAttributePresent(settings) + return FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL) + && isRemoteRoutingTableAttributePresent(settings) && isRemoteStoreClusterStateEnabled(settings); } diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java index 4ade19b3d7219..e74962dcbba1b 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java @@ -48,7 +48,7 @@ import org.opensearch.gateway.GatewayMetaState.RemotePersistedState; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteClusterStateService; -import org.opensearch.gateway.remote.model.RemoteUploadDetails; +import org.opensearch.gateway.remote.model.RemoteClusterStateManifestInfo; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.EqualsHashCodeTestUtils; import org.opensearch.test.OpenSearchTestCase; @@ -946,7 +946,7 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep .clusterUUIDCommitted(true) .build(); Mockito.when(remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID)) - .thenReturn(new RemoteUploadDetails(manifest, "path/to/manifest")); + .thenReturn(new RemoteClusterStateManifestInfo(manifest, "path/to/manifest")); final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, ps1); @@ -979,6 +979,8 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep Mockito.verify(remoteClusterStateService, Mockito.times(1)).writeFullMetadata(clusterState, previousClusterUUID); assertThat(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState(), equalTo(clusterState)); + Mockito.when(remoteClusterStateService.markLastStateAsCommitted(any(), any())) + .thenReturn(new RemoteClusterStateManifestInfo(manifest, "path/to/manifest")); coordinationState.handlePreCommit(); ClusterState committedClusterState = ClusterState.builder(clusterState) .metadata(Metadata.builder(clusterState.metadata()).clusterUUIDCommitted(true).build()) diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index 1d63caafbb4dd..7b113961fc2c7 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -69,7 +69,7 @@ import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.gateway.remote.RemotePersistenceStats; -import org.opensearch.gateway.remote.model.RemoteUploadDetails; +import org.opensearch.gateway.remote.model.RemoteClusterStateManifestInfo; import org.opensearch.index.recovery.RemoteStoreRestoreService; import org.opensearch.index.recovery.RemoteStoreRestoreService.RemoteRestoreResult; import org.opensearch.index.remote.RemoteIndexPathUploader; @@ -724,10 +724,11 @@ public void testRemotePersistedState() throws IOException { final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); final ClusterMetadataManifest manifest = ClusterMetadataManifest.builder().clusterTerm(1L).stateVersion(5L).build(); final String previousClusterUUID = "prev-cluster-uuid"; - Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any())).thenReturn(new RemoteUploadDetails(manifest, "path/to/manifest")); + Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any())) + .thenReturn(new RemoteClusterStateManifestInfo(manifest, "path/to/manifest")); Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any())) - .thenReturn(new RemoteUploadDetails(manifest, "path/to/manifest")); + .thenReturn(new RemoteClusterStateManifestInfo(manifest, "path/to/manifest")); CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService, previousClusterUUID); assertThat(remotePersistedState.getLastAcceptedState(), nullValue()); @@ -756,6 +757,9 @@ public void testRemotePersistedState() throws IOException { assertThat(remotePersistedState.getLastAcceptedState(), equalTo(secondClusterState)); assertThat(remotePersistedState.getCurrentTerm(), equalTo(clusterTerm)); + when(remoteClusterStateService.markLastStateAsCommitted(Mockito.any(), Mockito.any())).thenReturn( + new RemoteClusterStateManifestInfo(manifest, "path/to/manifest") + ); remotePersistedState.markLastAcceptedStateAsCommitted(); Mockito.verify(remoteClusterStateService, times(1)).markLastStateAsCommitted(Mockito.any(), Mockito.any()); @@ -781,10 +785,11 @@ public void testRemotePersistedStateNotCommitted() throws IOException { .build(); Mockito.when(remoteClusterStateService.getLatestClusterMetadataManifest(Mockito.any(), Mockito.any())) .thenReturn(Optional.of(manifest)); - Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any())).thenReturn(new RemoteUploadDetails(manifest, "path/to/manifest")); + Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any())) + .thenReturn(new RemoteClusterStateManifestInfo(manifest, "path/to/manifest")); Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any())) - .thenReturn(new RemoteUploadDetails(manifest, "path/to/manifest")); + .thenReturn(new RemoteClusterStateManifestInfo(manifest, "path/to/manifest")); CoordinationState.PersistedState remotePersistedState = new RemotePersistedState( remoteClusterStateService, ClusterState.UNKNOWN_UUID diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 4a53770c76d88..890a4b478b502 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -43,6 +43,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.model.RemoteClusterStateManifestInfo; import org.opensearch.index.remote.RemoteIndexPathUploader; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.indices.IndicesModule; @@ -182,8 +183,11 @@ public void teardown() throws Exception { public void testFailWriteFullMetadataNonClusterManagerNode() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().build(); - final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10)); - Assert.assertThat(manifest, nullValue()); + final RemoteClusterStateManifestInfo manifestDetails = remoteClusterStateService.writeFullMetadata( + clusterState, + randomAlphaOfLength(10) + ); + Assert.assertThat(manifestDetails, nullValue()); } public void testFailInitializationWhenRemoteStateDisabled() { @@ -218,7 +222,8 @@ public void testWriteFullMetadataSuccess() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); mockBlobStoreObjects(); remoteClusterStateService.start(); - final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid"); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid") + .getClusterMetadataManifest(); final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); List indices = List.of(uploadedIndexMetadata); @@ -262,7 +267,8 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException { }).when(container).asyncBlobUpload(writeContextArgumentCaptor.capture(), actionListenerArgumentCaptor.capture()); remoteClusterStateService.start(); - final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid"); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid") + .getClusterMetadataManifest(); final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); List indices = List.of(uploadedIndexMetadata); @@ -401,8 +407,12 @@ public void testWriteFullMetadataInParallelFailureForIndexMetadata() throws IOEx public void testFailWriteIncrementalMetadataNonClusterManagerNode() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().build(); remoteClusterStateService.start(); - final ClusterMetadataManifest manifest = remoteClusterStateService.writeIncrementalMetadata(clusterState, clusterState, null); - Assert.assertThat(manifest, nullValue()); + final RemoteClusterStateManifestInfo manifestDetails = remoteClusterStateService.writeIncrementalMetadata( + clusterState, + clusterState, + null + ); + Assert.assertThat(manifestDetails, nullValue()); assertEquals(0, remoteClusterStateService.getStats().getSuccessCount()); } @@ -433,7 +443,7 @@ public void testWriteIncrementalMetadataSuccess() throws IOException { previousClusterState, clusterState, previousManifest - ); + ).getClusterMetadataManifest(); final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); final List indices = List.of(uploadedIndexMetadata); @@ -508,7 +518,7 @@ private void verifyCodecMigrationManifest(int previousCodec) throws IOException previousClusterState, newClusterState, previousManifest - ); + ).getClusterMetadataManifest(); // global metadata is updated assertThat(manifestAfterUpdate.hasMetadataAttributesFiles(), is(true)); @@ -545,7 +555,7 @@ private void verifyWriteIncrementalGlobalMetadataFromOlderCodecSuccess(ClusterMe previousClusterState, clusterState, previousManifest - ); + ).getClusterMetadataManifest(); final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() .codecVersion(3) @@ -736,7 +746,8 @@ public void testCustomMetadataDeletedUpdatedAndAdded() throws IOException { // Initial cluster state with index. final ClusterState initialClusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); remoteClusterStateService.start(); - final ClusterMetadataManifest initialManifest = remoteClusterStateService.writeFullMetadata(initialClusterState, "_na_"); + final ClusterMetadataManifest initialManifest = remoteClusterStateService.writeFullMetadata(initialClusterState, "_na_") + .getClusterMetadataManifest(); ClusterState clusterState1 = ClusterState.builder(initialClusterState) .metadata( @@ -751,7 +762,7 @@ public void testCustomMetadataDeletedUpdatedAndAdded() throws IOException { initialClusterState, clusterState1, initialManifest - ); + ).getClusterMetadataManifest(); // remove custom1 from the cluster state, update custom2, custom3 is at it is, added custom4 ClusterState clusterState2 = ClusterState.builder(initialClusterState) .metadata( @@ -761,7 +772,8 @@ public void testCustomMetadataDeletedUpdatedAndAdded() throws IOException { .putCustom("custom4", new CustomMetadata1("mock_custom_metadata4")) ) .build(); - ClusterMetadataManifest manifest2 = remoteClusterStateService.writeIncrementalMetadata(clusterState1, clusterState2, manifest1); + ClusterMetadataManifest manifest2 = remoteClusterStateService.writeIncrementalMetadata(clusterState1, clusterState2, manifest1) + .getClusterMetadataManifest(); // custom1 is removed assertFalse(manifest2.getCustomMetadataMap().containsKey("custom1")); // custom2 is updated @@ -811,7 +823,8 @@ public void testIndexMetadataDeletedUpdatedAndAdded() throws IOException { // Initial cluster state with index. final ClusterState initialClusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); remoteClusterStateService.start(); - final ClusterMetadataManifest initialManifest = remoteClusterStateService.writeFullMetadata(initialClusterState, "_na_"); + final ClusterMetadataManifest initialManifest = remoteClusterStateService.writeFullMetadata(initialClusterState, "_na_") + .getClusterMetadataManifest(); String initialIndex = "test-index"; Index index1 = new Index("test-index-1", "index-uuid-1"); Index index2 = new Index("test-index-2", "index-uuid-2"); @@ -844,7 +857,7 @@ public void testIndexMetadataDeletedUpdatedAndAdded() throws IOException { initialClusterState, clusterState1, initialManifest - ); + ).getClusterMetadataManifest(); // verify that initial index is removed, and new index are added assertEquals(1, initialManifest.getIndices().size()); assertEquals(2, manifest1.getIndices().size()); @@ -855,7 +868,8 @@ public void testIndexMetadataDeletedUpdatedAndAdded() throws IOException { ClusterState clusterState2 = ClusterState.builder(clusterState1) .metadata(Metadata.builder(clusterState1.getMetadata()).put(indexMetadata1, true).build()) .build(); - ClusterMetadataManifest manifest2 = remoteClusterStateService.writeIncrementalMetadata(clusterState1, clusterState2, manifest1); + ClusterMetadataManifest manifest2 = remoteClusterStateService.writeIncrementalMetadata(clusterState1, clusterState2, manifest1) + .getClusterMetadataManifest(); // index1 is updated assertEquals(2, manifest2.getIndices().size()); assertEquals( @@ -888,7 +902,8 @@ private void verifyMetadataAttributeOnlyUpdated( // Initial cluster state with index. final ClusterState initialClusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); remoteClusterStateService.start(); - final ClusterMetadataManifest initialManifest = remoteClusterStateService.writeFullMetadata(initialClusterState, "_na_"); + final ClusterMetadataManifest initialManifest = remoteClusterStateService.writeFullMetadata(initialClusterState, "_na_") + .getClusterMetadataManifest(); ClusterState newClusterState = clusterStateUpdater.apply(initialClusterState); @@ -899,9 +914,10 @@ private void verifyMetadataAttributeOnlyUpdated( initialClusterState, newClusterState, initialManifest - ); + ).getClusterMetadataManifest(); } else { - manifestAfterMetadataUpdate = remoteClusterStateService.writeFullMetadata(newClusterState, initialClusterState.stateUUID()); + manifestAfterMetadataUpdate = remoteClusterStateService.writeFullMetadata(newClusterState, initialClusterState.stateUUID()) + .getClusterMetadataManifest(); } assertions.accept(initialManifest, manifestAfterMetadataUpdate); @@ -1182,7 +1198,8 @@ public void testMarkLastStateAsCommittedSuccess() throws IOException { List indices = List.of(uploadedIndexMetadata); final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder().indices(indices).build(); - final ClusterMetadataManifest manifest = remoteClusterStateService.markLastStateAsCommitted(clusterState, previousManifest); + final ClusterMetadataManifest manifest = remoteClusterStateService.markLastStateAsCommitted(clusterState, previousManifest) + .getClusterMetadataManifest(); final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() .indices(indices) @@ -1287,7 +1304,8 @@ public void testRemoteStateStats() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); mockBlobStoreObjects(); remoteClusterStateService.start(); - final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid"); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid") + .getClusterMetadataManifest(); assertTrue(remoteClusterStateService.getStats() != null); assertEquals(1, remoteClusterStateService.getStats().getSuccessCount()); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 622507f885814..e6fa149c26b22 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2562,7 +2562,8 @@ public void start(ClusterState initialState) { () -> new StatusInfo(HEALTHY, "healthy-info"), persistedStateRegistry, remoteStoreNodeService, - new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE), + null ); clusterManagerService.setClusterStatePublisher(coordinator); coordinator.start(); From f480d2413ac94df7c9b68edc9e226dc288dc9c44 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Sun, 9 Jun 2024 19:31:23 +0530 Subject: [PATCH 5/7] Add remote publish unit tests Signed-off-by: Sooraj Sinha --- .../PublicationTransportHandler.java | 27 ++- .../cluster/node/DiscoveryNode.java | 8 + .../PublicationTransportHandlerTests.java | 218 ++++++++++++++++-- 3 files changed, 238 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index 80dcc0c8511c9..e1b6d1adbfe2a 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -228,14 +228,19 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque } } - private PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException { + // package private for testing + PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException { if (transportService.getLocalNode().equals(request.getSourceNode())) { return acceptRemoteStateOnLocalNode(request); } + // TODO Make cluster state download non-blocking: https://github.com/opensearch-project/OpenSearch/issues/14102 ClusterMetadataManifest manifest = remoteClusterStateService.getClusterMetadataManifestByFileName( request.getClusterUUID(), request.getManifestFile() ); + if (manifest == null) { + throw new IllegalStateException("Publication failed as manifest was not found for " + request); + } boolean applyFullState = false; final ClusterState lastSeen = lastSeenClusterState.get(); if (lastSeen == null) { @@ -309,6 +314,13 @@ private PublishWithJoinResponse acceptRemoteStateOnLocalNode(RemotePublishReques if (publishRequest == null || publishRequest.getAcceptedState().coordinationMetadata().term() != remotePublishRequest.term || publishRequest.getAcceptedState().version() != remotePublishRequest.version) { + logger.debug( + () -> new ParameterizedMessage( + "Publication failure for current publish request : {} and remote publish request: {}", + publishRequest, + remotePublishRequest + ) + ); throw new IllegalStateException("publication to self failed for " + remotePublishRequest); } PublishWithJoinResponse publishWithJoinResponse = handlePublishRequest.apply(publishRequest); @@ -334,6 +346,16 @@ public PublicationContext newPublicationContext( return publicationContext; } + // package private for testing + void setCurrentPublishRequestToSelf(PublishRequest publishRequest) { + this.currentPublishRequestToSelf.set(publishRequest); + } + + // package private for testing + void setLastSeenClusterState(ClusterState clusterState) { + this.lastSeenClusterState.set(clusterState); + } + private static BytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException { final BytesReference serializedState = CompressedStreamUtils.createCompressedStream(nodeVersion, stream -> { stream.writeBoolean(true); @@ -447,7 +469,8 @@ public void onFailure(Exception e) { } else { responseActionListener = listener; } - if (sendRemoteState && destination.isRemoteClusterStateEnabled() && destination.isRemoteRoutingTableEnabled()) { + if (sendRemoteState && destination.isRemoteStatePublicationEnabled()) { + logger.trace("sending remote cluster state version [{}] to [{}]", newState.version(), destination); sendRemoteClusterState(destination, publishRequest.getAcceptedState(), responseActionListener); } else if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) { logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination); diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java index f322106f2fe3b..3a299a6d20e4e 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java @@ -491,6 +491,14 @@ public boolean isRemoteRoutingTableEnabled() { return this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY)); } + /** + * Returns whether remote cluster state publication is enabled on this node + * @return true if the node contains remote cluster state node attribute and remote routing table node attribute + */ + public boolean isRemoteStatePublicationEnabled() { + return isRemoteClusterStateEnabled() && isRemoteRoutingTableEnabled(); + } + /** * Returns a set of all the roles that the node has. The roles are returned in sorted order by the role name. *

diff --git a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java index a9950d82bfac8..4893601f28f80 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java @@ -37,33 +37,59 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.Diff; import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfiguration; +import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.node.Node; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.transport.CapturingTransport; import org.opensearch.transport.TransportService; +import org.junit.Before; import java.io.IOException; import java.util.Collections; +import java.util.Optional; +import java.util.function.Function; + +import org.mockito.Mockito; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; public class PublicationTransportHandlerTests extends OpenSearchTestCase { - public void testDiffSerializationFailure() { - DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue( + private static final long TERM = 5; + private static final long VERSION = 5; + private static final String CLUSTER_NAME = "test-cluster"; + private static final String CLUSTER_UUID = "test-cluster-UUID"; + private static final String MANIFEST_FILE = "path/to/manifest"; + private static final String LOCAL_NODE_ID = "localNode"; + + private DeterministicTaskQueue deterministicTaskQueue; + private TransportService transportService; + private DiscoveryNode localNode; + private DiscoveryNode secondNode; + + @Before + public void setup() { + deterministicTaskQueue = new DeterministicTaskQueue( Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build(), random() ); final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - final DiscoveryNode localNode = new DiscoveryNode("localNode", buildNewFakeTransportAddress(), Version.CURRENT); - final TransportService transportService = new CapturingTransport().createTransportService( + localNode = new DiscoveryNode(LOCAL_NODE_ID, buildNewFakeTransportAddress(), Version.CURRENT); + secondNode = new DiscoveryNode("secondNode", buildNewFakeTransportAddress(), Version.CURRENT); + transportService = new CapturingTransport().createTransportService( Settings.EMPTY, deterministicTaskQueue.getThreadPool(), TransportService.NOOP_TRANSPORT_INTERCEPTOR, @@ -72,15 +98,10 @@ public void testDiffSerializationFailure() { Collections.emptySet(), NoopTracer.INSTANCE ); - final PublicationTransportHandler handler = new PublicationTransportHandler( - transportService, - writableRegistry(), - pu -> null, - (pu, l) -> {}, - null - ); - transportService.start(); - transportService.acceptIncomingRequests(); + } + + public void testDiffSerializationFailure() { + final PublicationTransportHandler handler = getPublicationTransportHandler(p -> null, null); final DiscoveryNode otherNode = new DiscoveryNode("otherNode", buildNewFakeTransportAddress(), Version.CURRENT); final ClusterState clusterState = CoordinationStateTests.clusterState( @@ -118,4 +139,175 @@ public void writeTo(StreamOutput out) throws IOException { assertThat(e.getCause(), instanceOf(IOException.class)); assertThat(e.getCause().getMessage(), containsString("Simulated failure of diff serialization")); } + + public void testHandleRemoteIncomingPublishRequestWhenNoCurrentPublishRequest() { + RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); + + PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty()); + Function handlePublishRequest = p -> expectedPublishResponse; + final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService); + RemotePublishRequest remotePublishRequest = new RemotePublishRequest( + localNode, + TERM, + VERSION, + CLUSTER_NAME, + CLUSTER_UUID, + MANIFEST_FILE + ); + + IllegalStateException e = expectThrows( + IllegalStateException.class, + () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest) + ); + assertThat(e.getMessage(), containsString("publication to self failed")); + Mockito.verifyNoInteractions(remoteClusterStateService); + } + + public void testHandleRemoteIncomingPublishRequestWhenTermMismatch() { + RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); + + PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty()); + Function handlePublishRequest = p -> expectedPublishResponse; + final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService); + RemotePublishRequest remotePublishRequest = new RemotePublishRequest( + localNode, + TERM, + VERSION, + CLUSTER_NAME, + CLUSTER_UUID, + MANIFEST_FILE + ); + ClusterState clusterState = buildClusterState(6L, VERSION); + PublishRequest publishRequest = new PublishRequest(clusterState); + handler.setCurrentPublishRequestToSelf(publishRequest); + IllegalStateException e = expectThrows( + IllegalStateException.class, + () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest) + ); + assertThat(e.getMessage(), containsString("publication to self failed")); + Mockito.verifyNoInteractions(remoteClusterStateService); + } + + public void testHandleRemoteIncomingPublishRequestWhenVersionMismatch() { + RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); + + PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty()); + Function handlePublishRequest = p -> expectedPublishResponse; + final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService); + RemotePublishRequest remotePublishRequest = new RemotePublishRequest( + localNode, + TERM, + VERSION, + CLUSTER_NAME, + CLUSTER_UUID, + MANIFEST_FILE + ); + ClusterState clusterState = buildClusterState(TERM, 11L); + PublishRequest publishRequest = new PublishRequest(clusterState); + handler.setCurrentPublishRequestToSelf(publishRequest); + IllegalStateException e = expectThrows( + IllegalStateException.class, + () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest) + ); + assertThat(e.getMessage(), containsString("publication to self failed")); + Mockito.verifyNoInteractions(remoteClusterStateService); + } + + public void testHandleRemoteIncomingPublishRequestForLocalNode() throws IOException { + RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); + + PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty()); + Function handlePublishRequest = p -> expectedPublishResponse; + final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService); + RemotePublishRequest remotePublishRequest = new RemotePublishRequest( + localNode, + TERM, + VERSION, + CLUSTER_NAME, + CLUSTER_UUID, + MANIFEST_FILE + ); + ClusterState clusterState = buildClusterState(TERM, VERSION); + PublishRequest publishRequest = new PublishRequest(clusterState); + handler.setCurrentPublishRequestToSelf(publishRequest); + PublishWithJoinResponse publishWithJoinResponse = handler.handleIncomingRemotePublishRequest(remotePublishRequest); + assertThat(publishWithJoinResponse, is(expectedPublishResponse)); + Mockito.verifyNoInteractions(remoteClusterStateService); + } + + public void testHandleRemoteIncomingPublishRequestWhenManifestNotFound() throws IOException { + RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); + + PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty()); + Function handlePublishRequest = p -> expectedPublishResponse; + final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService); + RemotePublishRequest remotePublishRequest = new RemotePublishRequest( + secondNode, + TERM, + VERSION, + CLUSTER_NAME, + CLUSTER_UUID, + MANIFEST_FILE + ); + when(remoteClusterStateService.getClusterMetadataManifestByFileName(CLUSTER_UUID, MANIFEST_FILE)).thenReturn(null); + ClusterState clusterState = buildClusterState(TERM, VERSION); + PublishRequest publishRequest = new PublishRequest(clusterState); + handler.setCurrentPublishRequestToSelf(publishRequest); + IllegalStateException e = expectThrows( + IllegalStateException.class, + () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest) + ); + assertThat(e.getMessage(), containsString("Publication failed as manifest was not found for")); + Mockito.verify(remoteClusterStateService, times(1)).getClusterMetadataManifestByFileName(Mockito.any(), Mockito.any()); + } + + public void testHandleRemoteIncomingPublishRequestWhenNoLastSeenState() throws IOException { + RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); + + PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty()); + Function handlePublishRequest = p -> expectedPublishResponse; + final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService); + RemotePublishRequest remotePublishRequest = new RemotePublishRequest( + secondNode, + TERM, + VERSION, + CLUSTER_NAME, + CLUSTER_UUID, + MANIFEST_FILE + ); + ClusterMetadataManifest manifest = ClusterMetadataManifest.builder().clusterTerm(TERM).stateVersion(VERSION).build(); + when(remoteClusterStateService.getClusterMetadataManifestByFileName(CLUSTER_UUID, MANIFEST_FILE)).thenReturn(manifest); + when(remoteClusterStateService.getClusterStateForManifest(CLUSTER_NAME, manifest, LOCAL_NODE_ID, true)).thenReturn( + buildClusterState(TERM, VERSION) + ); + ClusterState clusterState = buildClusterState(TERM, VERSION); + PublishRequest publishRequest = new PublishRequest(clusterState); + handler.setCurrentPublishRequestToSelf(publishRequest); + PublishWithJoinResponse publishWithJoinResponse = handler.handleIncomingRemotePublishRequest(remotePublishRequest); + assertThat(publishWithJoinResponse, is(expectedPublishResponse)); + Mockito.verify(remoteClusterStateService, times(1)).getClusterMetadataManifestByFileName(Mockito.any(), Mockito.any()); + } + + private PublicationTransportHandler getPublicationTransportHandler( + Function handlePublishRequest, + RemoteClusterStateService remoteClusterStateService + ) { + final PublicationTransportHandler handler = new PublicationTransportHandler( + transportService, + writableRegistry(), + handlePublishRequest, + (pu, l) -> {}, + remoteClusterStateService + ); + transportService.start(); + transportService.acceptIncomingRequests(); + return handler; + } + + private ClusterState buildClusterState(long term, long version) { + CoordinationMetadata.Builder coordMetadataBuilder = CoordinationMetadata.builder().term(term); + Metadata newMetadata = Metadata.builder().coordinationMetadata(coordMetadataBuilder.build()).build(); + DiscoveryNodes nodes = DiscoveryNodes.builder().add(localNode).add(secondNode).localNodeId(LOCAL_NODE_ID).build(); + return ClusterState.builder(ClusterState.EMPTY_STATE).version(version).metadata(newMetadata).nodes(nodes).build(); + } } From 9feb4be80793a5364f48e656ce166f2c52230da1 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Mon, 10 Jun 2024 13:13:23 +0530 Subject: [PATCH 6/7] Remove duplicate methods Signed-off-by: Sooraj Sinha --- .../coordination/CoordinationState.java | 6 +++-- .../PublicationTransportHandler.java | 1 + .../cluster/node/DiscoveryNode.java | 25 ++++--------------- .../remotestore/RemoteStoreNodeAttribute.java | 6 ----- .../PublicationTransportHandlerTests.java | 12 ++++----- 5 files changed, 16 insertions(+), 34 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index b326b57a051bb..7fa63ae8abc62 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -39,8 +39,8 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.io.IOUtils; -import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import java.io.Closeable; import java.io.IOException; @@ -53,6 +53,7 @@ import java.util.Set; import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM; +import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** @@ -104,7 +105,8 @@ public CoordinationState( .getLastAcceptedConfiguration(); this.publishVotes = new VoteCollection(); this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings); - this.isRemotePublicationEnabled = RemoteStoreNodeAttribute.isRemotePublicationEnabled(settings); + this.isRemotePublicationEnabled = FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL) + && localNode.isRemoteStatePublicationEnabled(); } public boolean isRemotePublicationEnabled() { diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index e1b6d1adbfe2a..36eabd51ffda1 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -469,6 +469,7 @@ public void onFailure(Exception e) { } else { responseActionListener = listener; } + // TODO Decide to send remote state before starting publication by checking remote publication on all nodes if (sendRemoteState && destination.isRemoteStatePublicationEnabled()) { logger.trace("sending remote cluster state version [{}] to [{}]", newState.version(), destination); sendRemoteClusterState(destination, publishRequest.getAcceptedState(), responseActionListener); diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java index 3a299a6d20e4e..690621c2e7bca 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java @@ -472,31 +472,16 @@ public boolean isRemoteStoreNode() { return this.getAttributes().keySet().stream().anyMatch(key -> key.startsWith(REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX)); } - /** - * Returns whether the node is a remote cluster state enabled node. - * @return true if the node contains remote cluster state node attribute, false otherwise - */ - public boolean isRemoteClusterStateEnabled() { - return this.getAttributes() - .keySet() - .stream() - .anyMatch(key -> (key.equals(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY))); - } - - /** - * Returns whether remote routing table is enabled on the node - * @return true if the node contains remote routing table node attributes, false otherwise - */ - public boolean isRemoteRoutingTableEnabled() { - return this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY)); - } - /** * Returns whether remote cluster state publication is enabled on this node * @return true if the node contains remote cluster state node attribute and remote routing table node attribute */ public boolean isRemoteStatePublicationEnabled() { - return isRemoteClusterStateEnabled() && isRemoteRoutingTableEnabled(); + return this.getAttributes() + .keySet() + .stream() + .anyMatch(key -> (key.equals(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY))) + && this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY)); } /** diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java index 46d1d07db7eb6..a0f745a4270c4 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java @@ -201,12 +201,6 @@ private static boolean isRemoteRoutingTableAttributePresent(Settings settings) { .isEmpty() == false; } - public static boolean isRemotePublicationEnabled(Settings settings) { - return FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL) - && isRemoteRoutingTableAttributePresent(settings) - && isRemoteStoreClusterStateEnabled(settings); - } - public static boolean isRemoteRoutingTableEnabled(Settings settings) { return FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL) && isRemoteRoutingTableAttributePresent(settings); } diff --git a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java index 4893601f28f80..08e3f47100d8c 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java @@ -140,7 +140,7 @@ public void writeTo(StreamOutput out) throws IOException { assertThat(e.getCause().getMessage(), containsString("Simulated failure of diff serialization")); } - public void testHandleRemoteIncomingPublishRequestWhenNoCurrentPublishRequest() { + public void testHandleIncomingRemotePublishRequestWhenNoCurrentPublishRequest() { RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty()); @@ -163,7 +163,7 @@ public void testHandleRemoteIncomingPublishRequestWhenNoCurrentPublishRequest() Mockito.verifyNoInteractions(remoteClusterStateService); } - public void testHandleRemoteIncomingPublishRequestWhenTermMismatch() { + public void testHandleIncomingRemotePublishRequestWhenTermMismatch() { RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty()); @@ -188,7 +188,7 @@ public void testHandleRemoteIncomingPublishRequestWhenTermMismatch() { Mockito.verifyNoInteractions(remoteClusterStateService); } - public void testHandleRemoteIncomingPublishRequestWhenVersionMismatch() { + public void testHandleIncomingRemotePublishRequestWhenVersionMismatch() { RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty()); @@ -213,7 +213,7 @@ public void testHandleRemoteIncomingPublishRequestWhenVersionMismatch() { Mockito.verifyNoInteractions(remoteClusterStateService); } - public void testHandleRemoteIncomingPublishRequestForLocalNode() throws IOException { + public void testHandleIncomingRemotePublishRequestForLocalNode() throws IOException { RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty()); @@ -235,7 +235,7 @@ public void testHandleRemoteIncomingPublishRequestForLocalNode() throws IOExcept Mockito.verifyNoInteractions(remoteClusterStateService); } - public void testHandleRemoteIncomingPublishRequestWhenManifestNotFound() throws IOException { + public void testHandleIncomingRemotePublishRequestWhenManifestNotFound() throws IOException { RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty()); @@ -261,7 +261,7 @@ public void testHandleRemoteIncomingPublishRequestWhenManifestNotFound() throws Mockito.verify(remoteClusterStateService, times(1)).getClusterMetadataManifestByFileName(Mockito.any(), Mockito.any()); } - public void testHandleRemoteIncomingPublishRequestWhenNoLastSeenState() throws IOException { + public void testHandleIncomingRemotePublishRequestWhenNoLastSeenState() throws IOException { RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty()); From 5d2f154210bd256237d80fdee7ba51789439990f Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Mon, 10 Jun 2024 16:22:33 +0530 Subject: [PATCH 7/7] Add javadoc Signed-off-by: Sooraj Sinha --- .../remote/ClusterMetadataManifest.java | 7 ------ .../remote/ClusterStateDiffManifest.java | 22 +++++++++++++++++++ .../model/RemoteClusterStateManifestInfo.java | 3 +++ 3 files changed, 25 insertions(+), 7 deletions(-) create mode 100644 server/src/main/java/org/opensearch/gateway/remote/ClusterStateDiffManifest.java diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index d82ba0b859f10..503d86dbe5b7e 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -996,11 +996,4 @@ public String toString() { + '}'; } } - - // TODO https://github.com/opensearch-project/OpenSearch/pull/14089 - public static class ClusterStateDiffManifest { - public String getFromStateUUID() { - return null; - } - } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateDiffManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateDiffManifest.java new file mode 100644 index 0000000000000..d59c6042e7834 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateDiffManifest.java @@ -0,0 +1,22 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +/** + * Manifest of diff between two cluster states + * + * @opensearch.internal + */ +public class ClusterStateDiffManifest { + + // TODO https://github.com/opensearch-project/OpenSearch/pull/14089 + public String getFromStateUUID() { + return null; + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateManifestInfo.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateManifestInfo.java index 47c1dbf59f938..5d987e5e21e1a 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateManifestInfo.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateManifestInfo.java @@ -10,6 +10,9 @@ import org.opensearch.gateway.remote.ClusterMetadataManifest; +/** + * A class encapsulating the cluster state manifest and its remote uploaded path + */ public class RemoteClusterStateManifestInfo { private final ClusterMetadataManifest clusterMetadataManifest;