Skip to content

Commit 80e7844

Browse files
author
Himshikha Gupta
committed
Add publication flag and remote routing table check
Signed-off-by: Himshikha Gupta <himshikh@amazon.com>
1 parent 2040363 commit 80e7844

File tree

5 files changed

+36
-12
lines changed

5 files changed

+36
-12
lines changed

server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.util.Set;
5353

5454
import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM;
55+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
5556
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;
5657

5758
/**
@@ -78,7 +79,7 @@ public class CoordinationState {
7879
private long lastPublishedVersion;
7980
private VotingConfiguration lastPublishedConfiguration;
8081
private VoteCollection publishVotes;
81-
private final boolean isRemoteStateEnabled;
82+
private final boolean isRemotePublicationEnabled;
8283

8384
public CoordinationState(
8485
DiscoveryNode localNode,
@@ -101,11 +102,11 @@ public CoordinationState(
101102
.getLastAcceptedState()
102103
.getLastAcceptedConfiguration();
103104
this.publishVotes = new VoteCollection();
104-
this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings);
105+
this.isRemotePublicationEnabled = isRemoteStoreClusterStateEnabled(settings) && isRemoteRoutingTableEnabled(settings);
105106
}
106107

107-
public boolean isRemoteStateEnabled() {
108-
return isRemoteStateEnabled;
108+
public boolean isRemotePublicationEnabled() {
109+
return isRemotePublicationEnabled;
109110
}
110111

111112
public long getCurrentTerm() {
@@ -579,7 +580,7 @@ public void handlePrePublish(ClusterState clusterState) {
579580
// This is to ensure the remote store is the single source of truth for current state. Even if the current node
580581
// goes down after sending the cluster state to other nodes, we should be able to read the remote state and
581582
// recover the cluster.
582-
if (isRemoteStateEnabled) {
583+
if (isRemotePublicationEnabled) {
583584
assert persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null : "Remote state has not been initialized";
584585
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(clusterState);
585586
}
@@ -590,7 +591,7 @@ public void handlePrePublish(ClusterState clusterState) {
590591
*/
591592
public void handlePreCommit() {
592593
// Publishing the committed state to remote store before sending apply commit to other nodes.
593-
if (isRemoteStateEnabled) {
594+
if (isRemotePublicationEnabled) {
594595
assert persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null : "Remote state has not been initialized";
595596
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).markLastAcceptedStateAsCommitted();
596597
}

server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -1323,7 +1323,8 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
13231323
+ clusterState;
13241324

13251325
final PublicationTransportHandler.PublicationContext publicationContext = publicationHandler.newPublicationContext(
1326-
clusterChangedEvent, coordinationState.get().isRemoteStateEnabled()
1326+
clusterChangedEvent,
1327+
coordinationState.get().isRemotePublicationEnabled()
13271328
);
13281329

13291330
final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState);

server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -279,8 +279,8 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) {
279279
return handlePublishRequest.apply(new PublishRequest(incomingState));
280280
}
281281

282-
public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemoteStateEnabled) {
283-
final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent, isRemoteStateEnabled);
282+
public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemotePublicationEnabled) {
283+
final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent, isRemotePublicationEnabled);
284284

285285
// Build the serializations we expect to need now, early in the process, so that an error during serialization fails the publication
286286
// straight away. This isn't watertight since we send diffs on a best-effort basis and may fall back to sending a full state (and
@@ -327,12 +327,12 @@ public class PublicationContext {
327327
private final Map<Version, BytesReference> serializedDiffs = new HashMap<>();
328328
private final boolean sendRemoteState;
329329

330-
PublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemoteStateEnabled) {
330+
PublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemotePublicationEnabled) {
331331
discoveryNodes = clusterChangedEvent.state().nodes();
332332
newState = clusterChangedEvent.state();
333333
previousState = clusterChangedEvent.previousState();
334334
sendFullVersion = previousState.getBlocks().disableStatePersistence();
335-
sendRemoteState = isRemoteStateEnabled;
335+
sendRemoteState = isRemotePublicationEnabled;
336336
}
337337

338338
void buildDiffAndSerializeStates() {

server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,12 @@
88

99
package org.opensearch.cluster.coordination;
1010

11-
import java.io.IOException;
1211
import org.opensearch.cluster.node.DiscoveryNode;
1312
import org.opensearch.core.common.io.stream.StreamInput;
1413
import org.opensearch.core.common.io.stream.StreamOutput;
1514

15+
import java.io.IOException;
16+
1617
public class RemotePublishRequest extends TermVersionRequest {
1718

1819
// todo Do we need cluster name and UUID ?

server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java

+21
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@
6161
import java.util.stream.Stream;
6262

6363
import static org.opensearch.node.NodeRoleSettings.NODE_ROLES_SETTING;
64+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
6465
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX;
66+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
6567

6668
/**
6769
* A discovery node represents a node that is part of the cluster.
@@ -470,6 +472,25 @@ public boolean isRemoteStoreNode() {
470472
return this.getAttributes().keySet().stream().anyMatch(key -> key.startsWith(REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX));
471473
}
472474

475+
/**
476+
* Returns whether the node is a remote cluster state enabled node.
477+
* @return true if the node contains remote cluster state and remote routing table node attributes, false otherwise
478+
*/
479+
public boolean isRemoteStateNode() {
480+
return isRemoteClusterStateEnabled() && isRemoteRoutingTableEnabled();
481+
}
482+
483+
private boolean isRemoteClusterStateEnabled() {
484+
return this.getAttributes()
485+
.keySet()
486+
.stream()
487+
.anyMatch(key -> (key.equals(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)));
488+
}
489+
490+
private boolean isRemoteRoutingTableEnabled() {
491+
return this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY));
492+
}
493+
473494
/**
474495
* Returns a set of all the roles that the node has. The roles are returned in sorted order by the role name.
475496
* <p>

0 commit comments

Comments
 (0)