Skip to content

Commit 2793634

Browse files
authored
relaxing the join validation for nodes which have only store disabled but only publication enabled (opensearch-project#15471)
Signed-off-by: Rajiv Kumar Vaidyanathan <rajivkv@amazon.com>
1 parent d28fbc3 commit 2793634

File tree

12 files changed

+235
-23
lines changed

12 files changed

+235
-23
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
5252
- Add canRemain method to TargetPoolAllocationDecider to move shards from local to remote pool for hot to warm tiering ([#15010](https://github.com/opensearch-project/OpenSearch/pull/15010))
5353
- ClusterManagerTaskThrottler Improvements ([#15508](https://github.com/opensearch-project/OpenSearch/pull/15508))
5454
- Reset DiscoveryNodes in all transport node actions request ([#15131](https://github.com/opensearch-project/OpenSearch/pull/15131))
55+
- Relax the join validation for Remote State publication ([#15471](https://github.com/opensearch-project/OpenSearch/pull/15471))
5556

5657
### Dependencies
5758
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))

server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java

+44-1
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767

6868
import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned;
6969
import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
70+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES;
7071
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode;
7172
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.MIXED;
7273
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.STRICT;
@@ -458,7 +459,7 @@ public static void ensureNodesCompatibility(
458459
);
459460
}
460461

461-
ensureRemoteStoreNodesCompatibility(joiningNode, currentNodes, metadata);
462+
ensureRemoteRepositoryCompatibility(joiningNode, currentNodes, metadata);
462463
}
463464

464465
/**
@@ -491,6 +492,30 @@ public static void ensureNodeCommissioned(DiscoveryNode node, Metadata metadata)
491492
}
492493
}
493494

495+
public static void ensureRemoteRepositoryCompatibility(DiscoveryNode joiningNode, DiscoveryNodes currentNodes, Metadata metadata) {
496+
List<DiscoveryNode> existingNodes = new ArrayList<>(currentNodes.getNodes().values());
497+
498+
boolean isClusterRemoteStoreEnabled = existingNodes.stream().anyMatch(DiscoveryNode::isRemoteStoreNode);
499+
if (isClusterRemoteStoreEnabled || joiningNode.isRemoteStoreNode()) {
500+
ensureRemoteStoreNodesCompatibility(joiningNode, currentNodes, metadata);
501+
} else {
502+
ensureRemoteClusterStateNodesCompatibility(joiningNode, currentNodes);
503+
}
504+
}
505+
506+
private static void ensureRemoteClusterStateNodesCompatibility(DiscoveryNode joiningNode, DiscoveryNodes currentNodes) {
507+
List<DiscoveryNode> existingNodes = new ArrayList<>(currentNodes.getNodes().values());
508+
509+
assert existingNodes.isEmpty() == false;
510+
Optional<DiscoveryNode> remotePublicationNode = existingNodes.stream()
511+
.filter(DiscoveryNode::isRemoteStatePublicationEnabled)
512+
.findFirst();
513+
514+
if (remotePublicationNode.isPresent() && joiningNode.isRemoteStatePublicationEnabled()) {
515+
ensureRepositoryCompatibility(joiningNode, remotePublicationNode.get(), REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES);
516+
}
517+
}
518+
494519
/**
495520
* The method ensures homogeneity -
496521
* 1. The joining node has to be a remote store backed if it's joining a remote store backed cluster. Validates
@@ -506,6 +531,7 @@ public static void ensureNodeCommissioned(DiscoveryNode node, Metadata metadata)
506531
* needs to be modified.
507532
*/
508533
private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNode, DiscoveryNodes currentNodes, Metadata metadata) {
534+
509535
List<DiscoveryNode> existingNodes = new ArrayList<>(currentNodes.getNodes().values());
510536

511537
assert existingNodes.isEmpty() == false;
@@ -587,6 +613,23 @@ private static void ensureRemoteStoreNodesCompatibility(
587613
}
588614
}
589615

616+
private static void ensureRepositoryCompatibility(DiscoveryNode joiningNode, DiscoveryNode existingNode, List<String> reposToValidate) {
617+
618+
RemoteStoreNodeAttribute joiningRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(joiningNode);
619+
RemoteStoreNodeAttribute existingRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(existingNode);
620+
621+
if (existingRemoteStoreNodeAttribute.equalsForRepositories(joiningRemoteStoreNodeAttribute, reposToValidate) == false) {
622+
throw new IllegalStateException(
623+
"a remote store node ["
624+
+ joiningNode
625+
+ "] is trying to join a remote store cluster with incompatible node attributes in "
626+
+ "comparison with existing node ["
627+
+ existingNode
628+
+ "]"
629+
);
630+
}
631+
}
632+
590633
public static Collection<BiConsumer<DiscoveryNode, ClusterState>> addBuiltInJoinValidators(
591634
Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators
592635
) {

server/src/main/java/org/opensearch/cluster/metadata/RepositoriesMetadata.java

+18
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,24 @@ public boolean equalsIgnoreGenerationsWithRepoSkip(@Nullable RepositoriesMetadat
184184
.filter(repo -> !reposToSkip.contains(repo.name()))
185185
.collect(Collectors.toList());
186186

187+
return equalsRepository(currentRepositories, otherRepositories);
188+
}
189+
190+
public boolean equalsIgnoreGenerationsForRepo(@Nullable RepositoriesMetadata other, List<String> reposToValidate) {
191+
if (other == null) {
192+
return false;
193+
}
194+
List<RepositoryMetadata> currentRepositories = repositories.stream()
195+
.filter(repo -> reposToValidate.contains(repo.name()))
196+
.collect(Collectors.toList());
197+
List<RepositoryMetadata> otherRepositories = other.repositories.stream()
198+
.filter(repo -> reposToValidate.contains(repo.name()))
199+
.collect(Collectors.toList());
200+
201+
return equalsRepository(currentRepositories, otherRepositories);
202+
}
203+
204+
public static boolean equalsRepository(List<RepositoryMetadata> currentRepositories, List<RepositoryMetadata> otherRepositories) {
187205
if (otherRepositories.size() != currentRepositories.size()) {
188206
return false;
189207
}

server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
6666
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX;
6767
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
68+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
6869

6970
/**
7071
* A discovery node represents a node that is part of the cluster.
@@ -509,7 +510,8 @@ public boolean isSearchNode() {
509510
* @return true if the node contains remote store node attributes, false otherwise
510511
*/
511512
public boolean isRemoteStoreNode() {
512-
return this.getAttributes().keySet().stream().anyMatch(key -> key.startsWith(REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX));
513+
return this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY))
514+
&& this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY));
513515
}
514516

515517
/**

server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java

+13
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@ public class RemoteStoreNodeAttribute {
5555
REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY
5656
);
5757

58+
public static List<String> REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES = List.of(
59+
REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY,
60+
REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY
61+
);
62+
5863
/**
5964
* Creates a new {@link RemoteStoreNodeAttribute}
6065
*/
@@ -261,6 +266,14 @@ public boolean equalsWithRepoSkip(Object o, List<String> reposToSkip) {
261266
return this.getRepositoriesMetadata().equalsIgnoreGenerationsWithRepoSkip(that.getRepositoriesMetadata(), reposToSkip);
262267
}
263268

269+
public boolean equalsForRepositories(Object otherNode, List<String> repositoryToValidate) {
270+
if (this == otherNode) return true;
271+
if (otherNode == null || getClass() != otherNode.getClass()) return false;
272+
273+
RemoteStoreNodeAttribute other = (RemoteStoreNodeAttribute) otherNode;
274+
return this.getRepositoriesMetadata().equalsIgnoreGenerationsForRepo(other.repositoriesMetadata, repositoryToValidate);
275+
}
276+
264277
@Override
265278
public boolean equals(Object o) {
266279
if (this == o) return true;

server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java

+2
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
import java.util.concurrent.atomic.AtomicBoolean;
8686

8787
import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdaterTests.createIndexMetadataWithRemoteStoreSettings;
88+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
8889
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
8990
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
9091
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
@@ -901,6 +902,7 @@ public void testDontAllowSwitchingCompatibilityModeForClusterWithMultipleVersion
901902

902903
private Map<String, String> getRemoteStoreNodeAttributes() {
903904
Map<String, String> remoteStoreNodeAttributes = new HashMap<>();
905+
remoteStoreNodeAttributes.put(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-cluster-repo-1");
904906
remoteStoreNodeAttributes.put(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-segment-repo-1");
905907
remoteStoreNodeAttributes.put(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-translog-repo-1");
906908
return remoteStoreNodeAttributes;

server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java

+137-20
Original file line numberDiff line numberDiff line change
@@ -611,6 +611,100 @@ public void testJoinClusterWithRemoteStateNodeJoiningRemoteStateCluster() {
611611
JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata());
612612
}
613613

614+
public void testJoinRemotePublicationClusterWithNonRemoteNodes() {
615+
final DiscoveryNode existingNode = new DiscoveryNode(
616+
UUIDs.base64UUID(),
617+
buildNewFakeTransportAddress(),
618+
remotePublicationNodeAttributes(),
619+
DiscoveryNodeRole.BUILT_IN_ROLES,
620+
Version.CURRENT
621+
);
622+
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
623+
.nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build())
624+
.build();
625+
626+
DiscoveryNode joiningNode = newDiscoveryNode(new HashMap<>());
627+
JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata());
628+
}
629+
630+
public void testJoinRemotePublicationCluster() {
631+
final DiscoveryNode existingNode = new DiscoveryNode(
632+
UUIDs.base64UUID(),
633+
buildNewFakeTransportAddress(),
634+
remotePublicationNodeAttributes(),
635+
DiscoveryNodeRole.BUILT_IN_ROLES,
636+
Version.CURRENT
637+
);
638+
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
639+
.nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build())
640+
.build();
641+
642+
DiscoveryNode joiningNode = newDiscoveryNode(remotePublicationNodeAttributes());
643+
JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata());
644+
}
645+
646+
public void testJoinRemotePubClusterWithRemoteStoreNodes() {
647+
final DiscoveryNode existingNode = new DiscoveryNode(
648+
UUIDs.base64UUID(),
649+
buildNewFakeTransportAddress(),
650+
remotePublicationNodeAttributes(),
651+
DiscoveryNodeRole.BUILT_IN_ROLES,
652+
Version.CURRENT
653+
);
654+
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
655+
.nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build())
656+
.build();
657+
658+
Map<String, String> newNodeAttributes = new HashMap<>();
659+
newNodeAttributes.putAll(remoteStateNodeAttributes(CLUSTER_STATE_REPO));
660+
newNodeAttributes.putAll(remoteRoutingTableAttributes(ROUTING_TABLE_REPO));
661+
newNodeAttributes.putAll(remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO));
662+
663+
DiscoveryNode joiningNode = newDiscoveryNode(newNodeAttributes);
664+
Exception e = assertThrows(
665+
IllegalStateException.class,
666+
() -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata())
667+
);
668+
assertTrue(e.getMessage().equals("a remote store node [" + joiningNode + "] is trying to join a non remote store cluster"));
669+
}
670+
671+
public void testPreventJoinRemotePublicationClusterWithIncompatibleAttributes() {
672+
Map<String, String> existingNodeAttributes = remotePublicationNodeAttributes();
673+
Map<String, String> remoteStoreNodeAttributes = remotePublicationNodeAttributes();
674+
final DiscoveryNode existingNode = new DiscoveryNode(
675+
UUIDs.base64UUID(),
676+
buildNewFakeTransportAddress(),
677+
existingNodeAttributes,
678+
DiscoveryNodeRole.BUILT_IN_ROLES,
679+
Version.CURRENT
680+
);
681+
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
682+
.nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build())
683+
.build();
684+
685+
for (Map.Entry<String, String> nodeAttribute : existingNodeAttributes.entrySet()) {
686+
remoteStoreNodeAttributes.put(nodeAttribute.getKey(), null);
687+
DiscoveryNode joiningNode = newDiscoveryNode(remoteStoreNodeAttributes);
688+
Exception e = assertThrows(
689+
IllegalStateException.class,
690+
() -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata())
691+
);
692+
assertTrue(
693+
e.getMessage().equals("joining node [" + joiningNode + "] doesn't have the node attribute [" + nodeAttribute.getKey() + "]")
694+
|| e.getMessage()
695+
.equals(
696+
"a remote store node ["
697+
+ joiningNode
698+
+ "] is trying to join a remote store cluster with incompatible node attributes in comparison with existing node ["
699+
+ currentState.getNodes().getNodes().values().stream().findFirst().get()
700+
+ "]"
701+
)
702+
);
703+
704+
remoteStoreNodeAttributes.put(nodeAttribute.getKey(), nodeAttribute.getValue());
705+
}
706+
}
707+
614708
public void testPreventJoinClusterWithRemoteStateNodeJoiningRemoteStoreCluster() {
615709
Map<String, String> existingNodeAttributes = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO);
616710
final DiscoveryNode existingNode = new DiscoveryNode(
@@ -628,16 +722,7 @@ public void testPreventJoinClusterWithRemoteStateNodeJoiningRemoteStoreCluster()
628722
IllegalStateException.class,
629723
() -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata())
630724
);
631-
assertTrue(
632-
e.getMessage()
633-
.equals(
634-
"a remote store node ["
635-
+ joiningNode
636-
+ "] is trying to join a remote store cluster with incompatible node attributes in comparison with existing node ["
637-
+ currentState.getNodes().getNodes().values().stream().findFirst().get()
638-
+ "]"
639-
)
640-
);
725+
assertTrue(e.getMessage().equals("a non remote store node [" + joiningNode + "] is trying to join a remote store cluster"));
641726
}
642727

643728
public void testPreventJoinClusterWithRemoteStoreNodeJoiningRemoteStateCluster() {
@@ -657,16 +742,7 @@ public void testPreventJoinClusterWithRemoteStoreNodeJoiningRemoteStateCluster()
657742
IllegalStateException.class,
658743
() -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata())
659744
);
660-
assertTrue(
661-
e.getMessage()
662-
.equals(
663-
"a remote store node ["
664-
+ joiningNode
665-
+ "] is trying to join a remote store cluster with incompatible node attributes in comparison with existing node ["
666-
+ currentState.getNodes().getNodes().values().stream().findFirst().get()
667-
+ "]"
668-
)
669-
);
745+
assertTrue(e.getMessage().equals("a remote store node [" + joiningNode + "] is trying to join a non remote store cluster"));
670746
}
671747

672748
public void testUpdatesClusterStateWithSingleNodeCluster() throws Exception {
@@ -1077,6 +1153,39 @@ public void testRemoteRoutingTableNodeJoinNodeWithRemoteAndRoutingRepoDifference
10771153
JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata());
10781154
}
10791155

1156+
public void testJoinRemoteStoreClusterWithRemotePublicationNodeInMixedMode() {
1157+
final DiscoveryNode remoteStoreNode = new DiscoveryNode(
1158+
UUIDs.base64UUID(),
1159+
buildNewFakeTransportAddress(),
1160+
remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO),
1161+
DiscoveryNodeRole.BUILT_IN_ROLES,
1162+
Version.CURRENT
1163+
);
1164+
1165+
final DiscoveryNode nonRemoteStoreNode = new DiscoveryNode(
1166+
UUIDs.base64UUID(),
1167+
buildNewFakeTransportAddress(),
1168+
new HashMap<>(),
1169+
DiscoveryNodeRole.BUILT_IN_ROLES,
1170+
Version.CURRENT
1171+
);
1172+
1173+
final Settings settings = Settings.builder()
1174+
.put(MIGRATION_DIRECTION_SETTING.getKey(), RemoteStoreNodeService.Direction.REMOTE_STORE)
1175+
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")
1176+
.build();
1177+
final Settings nodeSettings = Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build();
1178+
FeatureFlags.initializeFeatureFlags(nodeSettings);
1179+
Metadata metadata = Metadata.builder().persistentSettings(settings).build();
1180+
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
1181+
.nodes(DiscoveryNodes.builder().add(remoteStoreNode).add(nonRemoteStoreNode).localNodeId(remoteStoreNode.getId()).build())
1182+
.metadata(metadata)
1183+
.build();
1184+
1185+
DiscoveryNode joiningNode = newDiscoveryNode(remotePublicationNodeAttributes());
1186+
JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata());
1187+
}
1188+
10801189
private void validateRepositoryMetadata(ClusterState updatedState, DiscoveryNode existingNode, int expectedRepositories)
10811190
throws Exception {
10821191

@@ -1115,6 +1224,7 @@ private DiscoveryNode newDiscoveryNode(Map<String, String> attributes) {
11151224
}
11161225

11171226
private static final String SEGMENT_REPO = "segment-repo";
1227+
11181228
private static final String TRANSLOG_REPO = "translog-repo";
11191229
private static final String CLUSTER_STATE_REPO = "cluster-state-repo";
11201230
private static final String COMMON_REPO = "remote-repo";
@@ -1161,6 +1271,13 @@ private Map<String, String> remoteStoreNodeAttributes(String segmentRepoName, St
11611271
};
11621272
}
11631273

1274+
private Map<String, String> remotePublicationNodeAttributes() {
1275+
Map<String, String> existingNodeAttributes = new HashMap<>();
1276+
existingNodeAttributes.putAll(remoteStateNodeAttributes(CLUSTER_STATE_REPO));
1277+
existingNodeAttributes.putAll(remoteRoutingTableAttributes(ROUTING_TABLE_REPO));
1278+
return existingNodeAttributes;
1279+
}
1280+
11641281
private Map<String, String> remoteStateNodeAttributes(String clusterStateRepo) {
11651282
String clusterStateRepositoryTypeAttributeKey = String.format(
11661283
Locale.getDefault(),

0 commit comments

Comments
 (0)