Skip to content

Commit bd5041f

Browse files
authored
Relax the join validation for nodes that have remote store disabled but remote publication enabled (#15471) (#15667)
Signed-off-by: Rajiv Kumar Vaidyanathan <rajivkv@amazon.com>
1 parent 4532afe commit bd5041f

File tree

12 files changed

+235
-23
lines changed

12 files changed

+235
-23
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4545
- Add support for pluggable deciders for concurrent search ([#15363](https://github.com/opensearch-project/OpenSearch/pull/15363))
4646
- [Remote Publication] Added checksum validation for cluster state behind a cluster setting ([#15218](https://github.com/opensearch-project/OpenSearch/pull/15218))
4747
- Add support for comma-separated list of index names to be used with Snapshot Status API; [SnapshotV2] Snapshot Status API changes ([#15409](https://github.com/opensearch-project/OpenSearch/pull/15409))
48+
- Relax the join validation for Remote State publication ([#15471](https://github.com/opensearch-project/OpenSearch/pull/15471))
4849

4950
### Dependencies
5051
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))

server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java

+44-1
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969

7070
import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned;
7171
import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
72+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES;
7273
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode;
7374
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.MIXED;
7475
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.STRICT;
@@ -519,7 +520,7 @@ public static void ensureNodesCompatibility(
519520
);
520521
}
521522

522-
ensureRemoteStoreNodesCompatibility(joiningNode, currentNodes, metadata);
523+
ensureRemoteRepositoryCompatibility(joiningNode, currentNodes, metadata);
523524
}
524525

525526
/**
@@ -552,6 +553,30 @@ public static void ensureNodeCommissioned(DiscoveryNode node, Metadata metadata)
552553
}
553554
}
554555

556+
public static void ensureRemoteRepositoryCompatibility(DiscoveryNode joiningNode, DiscoveryNodes currentNodes, Metadata metadata) {
557+
List<DiscoveryNode> existingNodes = new ArrayList<>(currentNodes.getNodes().values());
558+
559+
boolean isClusterRemoteStoreEnabled = existingNodes.stream().anyMatch(DiscoveryNode::isRemoteStoreNode);
560+
if (isClusterRemoteStoreEnabled || joiningNode.isRemoteStoreNode()) {
561+
ensureRemoteStoreNodesCompatibility(joiningNode, currentNodes, metadata);
562+
} else {
563+
ensureRemoteClusterStateNodesCompatibility(joiningNode, currentNodes);
564+
}
565+
}
566+
567+
private static void ensureRemoteClusterStateNodesCompatibility(DiscoveryNode joiningNode, DiscoveryNodes currentNodes) {
568+
List<DiscoveryNode> existingNodes = new ArrayList<>(currentNodes.getNodes().values());
569+
570+
assert existingNodes.isEmpty() == false;
571+
Optional<DiscoveryNode> remotePublicationNode = existingNodes.stream()
572+
.filter(DiscoveryNode::isRemoteStatePublicationEnabled)
573+
.findFirst();
574+
575+
if (remotePublicationNode.isPresent() && joiningNode.isRemoteStatePublicationEnabled()) {
576+
ensureRepositoryCompatibility(joiningNode, remotePublicationNode.get(), REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES);
577+
}
578+
}
579+
555580
/**
556581
* The method ensures homogeneity -
557582
* 1. The joining node has to be a remote store backed if it's joining a remote store backed cluster. Validates
@@ -567,6 +592,7 @@ public static void ensureNodeCommissioned(DiscoveryNode node, Metadata metadata)
567592
* needs to be modified.
568593
*/
569594
private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNode, DiscoveryNodes currentNodes, Metadata metadata) {
595+
570596
List<DiscoveryNode> existingNodes = new ArrayList<>(currentNodes.getNodes().values());
571597

572598
assert existingNodes.isEmpty() == false;
@@ -648,6 +674,23 @@ private static void ensureRemoteStoreNodesCompatibility(
648674
}
649675
}
650676

677+
private static void ensureRepositoryCompatibility(DiscoveryNode joiningNode, DiscoveryNode existingNode, List<String> reposToValidate) {
678+
679+
RemoteStoreNodeAttribute joiningRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(joiningNode);
680+
RemoteStoreNodeAttribute existingRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(existingNode);
681+
682+
if (existingRemoteStoreNodeAttribute.equalsForRepositories(joiningRemoteStoreNodeAttribute, reposToValidate) == false) {
683+
throw new IllegalStateException(
684+
"a remote store node ["
685+
+ joiningNode
686+
+ "] is trying to join a remote store cluster with incompatible node attributes in "
687+
+ "comparison with existing node ["
688+
+ existingNode
689+
+ "]"
690+
);
691+
}
692+
}
693+
651694
public static Collection<BiConsumer<DiscoveryNode, ClusterState>> addBuiltInJoinValidators(
652695
Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators
653696
) {

server/src/main/java/org/opensearch/cluster/metadata/RepositoriesMetadata.java

+18
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,24 @@ public boolean equalsIgnoreGenerationsWithRepoSkip(@Nullable RepositoriesMetadat
184184
.filter(repo -> !reposToSkip.contains(repo.name()))
185185
.collect(Collectors.toList());
186186

187+
return equalsRepository(currentRepositories, otherRepositories);
188+
}
189+
190+
public boolean equalsIgnoreGenerationsForRepo(@Nullable RepositoriesMetadata other, List<String> reposToValidate) {
191+
if (other == null) {
192+
return false;
193+
}
194+
List<RepositoryMetadata> currentRepositories = repositories.stream()
195+
.filter(repo -> reposToValidate.contains(repo.name()))
196+
.collect(Collectors.toList());
197+
List<RepositoryMetadata> otherRepositories = other.repositories.stream()
198+
.filter(repo -> reposToValidate.contains(repo.name()))
199+
.collect(Collectors.toList());
200+
201+
return equalsRepository(currentRepositories, otherRepositories);
202+
}
203+
204+
public static boolean equalsRepository(List<RepositoryMetadata> currentRepositories, List<RepositoryMetadata> otherRepositories) {
187205
if (otherRepositories.size() != currentRepositories.size()) {
188206
return false;
189207
}

server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
6868
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX;
6969
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
70+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
7071

7172
/**
7273
* A discovery node represents a node that is part of the cluster.
@@ -554,7 +555,8 @@ public boolean isSearchNode() {
554555
* @return true if the node contains remote store node attributes, false otherwise
555556
*/
556557
public boolean isRemoteStoreNode() {
557-
return this.getAttributes().keySet().stream().anyMatch(key -> key.startsWith(REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX));
558+
return this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY))
559+
&& this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY));
558560
}
559561

560562
/**

server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java

+13
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@ public class RemoteStoreNodeAttribute {
5555
REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY
5656
);
5757

58+
public static List<String> REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES = List.of(
59+
REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY,
60+
REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY
61+
);
62+
5863
/**
5964
* Creates a new {@link RemoteStoreNodeAttribute}
6065
*/
@@ -261,6 +266,14 @@ public boolean equalsWithRepoSkip(Object o, List<String> reposToSkip) {
261266
return this.getRepositoriesMetadata().equalsIgnoreGenerationsWithRepoSkip(that.getRepositoriesMetadata(), reposToSkip);
262267
}
263268

269+
public boolean equalsForRepositories(Object otherNode, List<String> repositoryToValidate) {
270+
if (this == otherNode) return true;
271+
if (otherNode == null || getClass() != otherNode.getClass()) return false;
272+
273+
RemoteStoreNodeAttribute other = (RemoteStoreNodeAttribute) otherNode;
274+
return this.getRepositoriesMetadata().equalsIgnoreGenerationsForRepo(other.repositoriesMetadata, repositoryToValidate);
275+
}
276+
264277
@Override
265278
public boolean equals(Object o) {
266279
if (this == o) return true;

server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java

+2
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
import java.util.concurrent.atomic.AtomicBoolean;
8686

8787
import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdaterTests.createIndexMetadataWithRemoteStoreSettings;
88+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
8889
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
8990
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
9091
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
@@ -901,6 +902,7 @@ public void testDontAllowSwitchingCompatibilityModeForClusterWithMultipleVersion
901902

902903
private Map<String, String> getRemoteStoreNodeAttributes() {
903904
Map<String, String> remoteStoreNodeAttributes = new HashMap<>();
905+
remoteStoreNodeAttributes.put(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-cluster-repo-1");
904906
remoteStoreNodeAttributes.put(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-segment-repo-1");
905907
remoteStoreNodeAttributes.put(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-translog-repo-1");
906908
return remoteStoreNodeAttributes;

server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java

+137-20
Original file line numberDiff line numberDiff line change
@@ -689,6 +689,100 @@ public void testJoinClusterWithRemoteStateNodeJoiningRemoteStateCluster() {
689689
JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata());
690690
}
691691

692+
public void testJoinRemotePublicationClusterWithNonRemoteNodes() {
693+
final DiscoveryNode existingNode = new DiscoveryNode(
694+
UUIDs.base64UUID(),
695+
buildNewFakeTransportAddress(),
696+
remotePublicationNodeAttributes(),
697+
DiscoveryNodeRole.BUILT_IN_ROLES,
698+
Version.CURRENT
699+
);
700+
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
701+
.nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build())
702+
.build();
703+
704+
DiscoveryNode joiningNode = newDiscoveryNode(new HashMap<>());
705+
JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata());
706+
}
707+
708+
public void testJoinRemotePublicationCluster() {
709+
final DiscoveryNode existingNode = new DiscoveryNode(
710+
UUIDs.base64UUID(),
711+
buildNewFakeTransportAddress(),
712+
remotePublicationNodeAttributes(),
713+
DiscoveryNodeRole.BUILT_IN_ROLES,
714+
Version.CURRENT
715+
);
716+
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
717+
.nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build())
718+
.build();
719+
720+
DiscoveryNode joiningNode = newDiscoveryNode(remotePublicationNodeAttributes());
721+
JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata());
722+
}
723+
724+
public void testJoinRemotePubClusterWithRemoteStoreNodes() {
725+
final DiscoveryNode existingNode = new DiscoveryNode(
726+
UUIDs.base64UUID(),
727+
buildNewFakeTransportAddress(),
728+
remotePublicationNodeAttributes(),
729+
DiscoveryNodeRole.BUILT_IN_ROLES,
730+
Version.CURRENT
731+
);
732+
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
733+
.nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build())
734+
.build();
735+
736+
Map<String, String> newNodeAttributes = new HashMap<>();
737+
newNodeAttributes.putAll(remoteStateNodeAttributes(CLUSTER_STATE_REPO));
738+
newNodeAttributes.putAll(remoteRoutingTableAttributes(ROUTING_TABLE_REPO));
739+
newNodeAttributes.putAll(remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO));
740+
741+
DiscoveryNode joiningNode = newDiscoveryNode(newNodeAttributes);
742+
Exception e = assertThrows(
743+
IllegalStateException.class,
744+
() -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata())
745+
);
746+
assertTrue(e.getMessage().equals("a remote store node [" + joiningNode + "] is trying to join a non remote store cluster"));
747+
}
748+
749+
public void testPreventJoinRemotePublicationClusterWithIncompatibleAttributes() {
750+
Map<String, String> existingNodeAttributes = remotePublicationNodeAttributes();
751+
Map<String, String> remoteStoreNodeAttributes = remotePublicationNodeAttributes();
752+
final DiscoveryNode existingNode = new DiscoveryNode(
753+
UUIDs.base64UUID(),
754+
buildNewFakeTransportAddress(),
755+
existingNodeAttributes,
756+
DiscoveryNodeRole.BUILT_IN_ROLES,
757+
Version.CURRENT
758+
);
759+
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
760+
.nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build())
761+
.build();
762+
763+
for (Map.Entry<String, String> nodeAttribute : existingNodeAttributes.entrySet()) {
764+
remoteStoreNodeAttributes.put(nodeAttribute.getKey(), null);
765+
DiscoveryNode joiningNode = newDiscoveryNode(remoteStoreNodeAttributes);
766+
Exception e = assertThrows(
767+
IllegalStateException.class,
768+
() -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata())
769+
);
770+
assertTrue(
771+
e.getMessage().equals("joining node [" + joiningNode + "] doesn't have the node attribute [" + nodeAttribute.getKey() + "]")
772+
|| e.getMessage()
773+
.equals(
774+
"a remote store node ["
775+
+ joiningNode
776+
+ "] is trying to join a remote store cluster with incompatible node attributes in comparison with existing node ["
777+
+ currentState.getNodes().getNodes().values().stream().findFirst().get()
778+
+ "]"
779+
)
780+
);
781+
782+
remoteStoreNodeAttributes.put(nodeAttribute.getKey(), nodeAttribute.getValue());
783+
}
784+
}
785+
692786
public void testPreventJoinClusterWithRemoteStateNodeJoiningRemoteStoreCluster() {
693787
Map<String, String> existingNodeAttributes = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO);
694788
final DiscoveryNode existingNode = new DiscoveryNode(
@@ -706,16 +800,7 @@ public void testPreventJoinClusterWithRemoteStateNodeJoiningRemoteStoreCluster()
706800
IllegalStateException.class,
707801
() -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata())
708802
);
709-
assertTrue(
710-
e.getMessage()
711-
.equals(
712-
"a remote store node ["
713-
+ joiningNode
714-
+ "] is trying to join a remote store cluster with incompatible node attributes in comparison with existing node ["
715-
+ currentState.getNodes().getNodes().values().stream().findFirst().get()
716-
+ "]"
717-
)
718-
);
803+
assertTrue(e.getMessage().equals("a non remote store node [" + joiningNode + "] is trying to join a remote store cluster"));
719804
}
720805

721806
public void testPreventJoinClusterWithRemoteStoreNodeJoiningRemoteStateCluster() {
@@ -735,16 +820,7 @@ public void testPreventJoinClusterWithRemoteStoreNodeJoiningRemoteStateCluster()
735820
IllegalStateException.class,
736821
() -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata())
737822
);
738-
assertTrue(
739-
e.getMessage()
740-
.equals(
741-
"a remote store node ["
742-
+ joiningNode
743-
+ "] is trying to join a remote store cluster with incompatible node attributes in comparison with existing node ["
744-
+ currentState.getNodes().getNodes().values().stream().findFirst().get()
745-
+ "]"
746-
)
747-
);
823+
assertTrue(e.getMessage().equals("a remote store node [" + joiningNode + "] is trying to join a non remote store cluster"));
748824
}
749825

750826
public void testUpdatesClusterStateWithSingleNodeCluster() throws Exception {
@@ -1159,6 +1235,39 @@ public void testRemoteRoutingTableNodeJoinNodeWithRemoteAndRoutingRepoDifference
11591235
JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata());
11601236
}
11611237

1238+
public void testJoinRemoteStoreClusterWithRemotePublicationNodeInMixedMode() {
1239+
final DiscoveryNode remoteStoreNode = new DiscoveryNode(
1240+
UUIDs.base64UUID(),
1241+
buildNewFakeTransportAddress(),
1242+
remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO),
1243+
DiscoveryNodeRole.BUILT_IN_ROLES,
1244+
Version.CURRENT
1245+
);
1246+
1247+
final DiscoveryNode nonRemoteStoreNode = new DiscoveryNode(
1248+
UUIDs.base64UUID(),
1249+
buildNewFakeTransportAddress(),
1250+
new HashMap<>(),
1251+
DiscoveryNodeRole.BUILT_IN_ROLES,
1252+
Version.CURRENT
1253+
);
1254+
1255+
final Settings settings = Settings.builder()
1256+
.put(MIGRATION_DIRECTION_SETTING.getKey(), RemoteStoreNodeService.Direction.REMOTE_STORE)
1257+
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")
1258+
.build();
1259+
final Settings nodeSettings = Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build();
1260+
FeatureFlags.initializeFeatureFlags(nodeSettings);
1261+
Metadata metadata = Metadata.builder().persistentSettings(settings).build();
1262+
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
1263+
.nodes(DiscoveryNodes.builder().add(remoteStoreNode).add(nonRemoteStoreNode).localNodeId(remoteStoreNode.getId()).build())
1264+
.metadata(metadata)
1265+
.build();
1266+
1267+
DiscoveryNode joiningNode = newDiscoveryNode(remotePublicationNodeAttributes());
1268+
JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata());
1269+
}
1270+
11621271
private void validateRepositoryMetadata(ClusterState updatedState, DiscoveryNode existingNode, int expectedRepositories)
11631272
throws Exception {
11641273

@@ -1197,6 +1306,7 @@ private DiscoveryNode newDiscoveryNode(Map<String, String> attributes) {
11971306
}
11981307

11991308
private static final String SEGMENT_REPO = "segment-repo";
1309+
12001310
private static final String TRANSLOG_REPO = "translog-repo";
12011311
private static final String CLUSTER_STATE_REPO = "cluster-state-repo";
12021312
private static final String COMMON_REPO = "remote-repo";
@@ -1243,6 +1353,13 @@ private Map<String, String> remoteStoreNodeAttributes(String segmentRepoName, St
12431353
};
12441354
}
12451355

1356+
private Map<String, String> remotePublicationNodeAttributes() {
1357+
Map<String, String> existingNodeAttributes = new HashMap<>();
1358+
existingNodeAttributes.putAll(remoteStateNodeAttributes(CLUSTER_STATE_REPO));
1359+
existingNodeAttributes.putAll(remoteRoutingTableAttributes(ROUTING_TABLE_REPO));
1360+
return existingNodeAttributes;
1361+
}
1362+
12461363
private Map<String, String> remoteStateNodeAttributes(String clusterStateRepo) {
12471364
String clusterStateRepositoryTypeAttributeKey = String.format(
12481365
Locale.getDefault(),

0 commit comments

Comments
 (0)