Skip to content

Commit 00d4b71

Browse files
authored
Decouple remote state configuration (opensearch-project#11858)
* Decouple remote state configuration Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
1 parent b4da802 commit 00d4b71

File tree

4 files changed

+132
-20
lines changed

4 files changed

+132
-20
lines changed

server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@
138138
import static org.opensearch.cluster.metadata.Metadata.DEFAULT_REPLICA_COUNT_SETTING;
139139
import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
140140
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
141-
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreAttributePresent;
141+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent;
142142

143143
/**
144144
* Service responsible for submitting create index requests
@@ -971,7 +971,7 @@ private static void updateReplicationStrategy(
971971
indexReplicationType = INDEX_REPLICATION_TYPE_SETTING.get(combinedTemplateSettings);
972972
} else if (CLUSTER_REPLICATION_TYPE_SETTING.exists(clusterSettings)) {
973973
indexReplicationType = CLUSTER_REPLICATION_TYPE_SETTING.get(clusterSettings);
974-
} else if (isRemoteStoreAttributePresent(clusterSettings)) {
974+
} else if (isRemoteDataAttributePresent(clusterSettings)) {
975975
indexReplicationType = ReplicationType.SEGMENT;
976976
} else {
977977
indexReplicationType = CLUSTER_REPLICATION_TYPE_SETTING.getDefault(clusterSettings);
@@ -985,7 +985,7 @@ private static void updateReplicationStrategy(
985985
* @param clusterSettings cluster level settings
986986
*/
987987
private static void updateRemoteStoreSettings(Settings.Builder settingsBuilder, Settings clusterSettings) {
988-
if (isRemoteStoreAttributePresent(clusterSettings)) {
988+
if (isRemoteDataAttributePresent(clusterSettings)) {
989989
settingsBuilder.put(SETTING_REMOTE_STORE_ENABLED, true)
990990
.put(
991991
SETTING_REMOTE_SEGMENT_STORE_REPOSITORY,
@@ -1577,7 +1577,7 @@ public static void validateRefreshIntervalSettings(Settings requestSettings, Clu
15771577
* @param clusterSettings cluster setting
15781578
*/
15791579
static void validateTranslogDurabilitySettings(Settings requestSettings, ClusterSettings clusterSettings, Settings settings) {
1580-
if (isRemoteStoreAttributePresent(settings) == false
1580+
if (isRemoteDataAttributePresent(settings) == false
15811581
|| IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.exists(requestSettings) == false
15821582
|| clusterSettings.get(IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING) == false) {
15831583
return;

server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java

+26-6
Original file line numberDiff line numberDiff line change
@@ -131,12 +131,8 @@ private RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String na
131131
}
132132

133133
private RepositoriesMetadata buildRepositoriesMetadata(DiscoveryNode node) {
134+
Set<String> repositoryNames = getValidatedRepositoryNames(node);
134135
List<RepositoryMetadata> repositoryMetadataList = new ArrayList<>();
135-
Set<String> repositoryNames = new HashSet<>();
136-
137-
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY));
138-
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY));
139-
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY));
140136

141137
for (String repositoryName : repositoryNames) {
142138
repositoryMetadataList.add(buildRepositoryMetadata(node, repositoryName));
@@ -145,12 +141,36 @@ private RepositoriesMetadata buildRepositoriesMetadata(DiscoveryNode node) {
145141
return new RepositoriesMetadata(repositoryMetadataList);
146142
}
147143

144+
private Set<String> getValidatedRepositoryNames(DiscoveryNode node) {
145+
Set<String> repositoryNames = new HashSet<>();
146+
if (node.getAttributes().containsKey(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY)
147+
|| node.getAttributes().containsKey(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY)) {
148+
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY));
149+
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY));
150+
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY));
151+
} else if (node.getAttributes().containsKey(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)) {
152+
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY));
153+
}
154+
return repositoryNames;
155+
}
156+
148157
public static boolean isRemoteStoreAttributePresent(Settings settings) {
149158
return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX).isEmpty() == false;
150159
}
151160

161+
public static boolean isRemoteDataAttributePresent(Settings settings) {
162+
return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY).isEmpty() == false
163+
|| settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY).isEmpty() == false;
164+
}
165+
166+
public static boolean isRemoteClusterStateAttributePresent(Settings settings) {
167+
return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)
168+
.isEmpty() == false;
169+
}
170+
152171
public static boolean isRemoteStoreClusterStateEnabled(Settings settings) {
153-
return RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) && isRemoteStoreAttributePresent(settings);
172+
return RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings)
173+
&& isRemoteClusterStateAttributePresent(settings);
154174
}
155175

156176
public RepositoriesMetadata getRepositoriesMetadata() {

server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java

+101-9
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,8 @@ public void testJoinClusterWithNonRemoteStoreNodeJoining() {
377377
}
378378

379379
public void testJoinClusterWithRemoteStoreNodeJoining() {
380-
DiscoveryNode joiningNode = newDiscoveryNode(remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO));
380+
Map<String, String> map = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO);
381+
DiscoveryNode joiningNode = newDiscoveryNode(map);
381382
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
382383
.nodes(DiscoveryNodes.builder().add(joiningNode).build())
383384
.build();
@@ -582,12 +583,94 @@ public void testPreventJoinClusterWithRemoteStoreNodeWithPartialAttributesJoinin
582583
);
583584
assertTrue(
584585
e.getMessage().equals("joining node [" + joiningNode + "] doesn't have the node attribute [" + nodeAttribute.getKey() + "]")
586+
|| e.getMessage()
587+
.equals(
588+
"a remote store node ["
589+
+ joiningNode
590+
+ "] is trying to join a remote store cluster with incompatible node attributes in comparison with existing node ["
591+
+ currentState.getNodes().getNodes().values().stream().findFirst().get()
592+
+ "]"
593+
)
585594
);
586595

587596
remoteStoreNodeAttributes.put(nodeAttribute.getKey(), nodeAttribute.getValue());
588597
}
589598
}
590599

600+
public void testJoinClusterWithRemoteStateNodeJoiningRemoteStateCluster() {
601+
Map<String, String> existingNodeAttributes = remoteStateNodeAttributes(CLUSTER_STATE_REPO);
602+
final DiscoveryNode existingNode = new DiscoveryNode(
603+
UUIDs.base64UUID(),
604+
buildNewFakeTransportAddress(),
605+
existingNodeAttributes,
606+
DiscoveryNodeRole.BUILT_IN_ROLES,
607+
Version.CURRENT
608+
);
609+
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
610+
.nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build())
611+
.build();
612+
DiscoveryNode joiningNode = newDiscoveryNode(remoteStateNodeAttributes(CLUSTER_STATE_REPO));
613+
JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata());
614+
}
615+
616+
public void testPreventJoinClusterWithRemoteStateNodeJoiningRemoteStoreCluster() {
617+
Map<String, String> existingNodeAttributes = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO);
618+
final DiscoveryNode existingNode = new DiscoveryNode(
619+
UUIDs.base64UUID(),
620+
buildNewFakeTransportAddress(),
621+
existingNodeAttributes,
622+
DiscoveryNodeRole.BUILT_IN_ROLES,
623+
Version.CURRENT
624+
);
625+
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
626+
.nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build())
627+
.build();
628+
DiscoveryNode joiningNode = newDiscoveryNode(remoteStateNodeAttributes(CLUSTER_STATE_REPO));
629+
Exception e = assertThrows(
630+
IllegalStateException.class,
631+
() -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata())
632+
);
633+
assertTrue(
634+
e.getMessage()
635+
.equals(
636+
"a remote store node ["
637+
+ joiningNode
638+
+ "] is trying to join a remote store cluster with incompatible node attributes in comparison with existing node ["
639+
+ currentState.getNodes().getNodes().values().stream().findFirst().get()
640+
+ "]"
641+
)
642+
);
643+
}
644+
645+
public void testPreventJoinClusterWithRemoteStoreNodeJoiningRemoteStateCluster() {
646+
Map<String, String> existingNodeAttributes = remoteStateNodeAttributes(CLUSTER_STATE_REPO);
647+
final DiscoveryNode existingNode = new DiscoveryNode(
648+
UUIDs.base64UUID(),
649+
buildNewFakeTransportAddress(),
650+
existingNodeAttributes,
651+
DiscoveryNodeRole.BUILT_IN_ROLES,
652+
Version.CURRENT
653+
);
654+
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
655+
.nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build())
656+
.build();
657+
DiscoveryNode joiningNode = newDiscoveryNode(remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO));
658+
Exception e = assertThrows(
659+
IllegalStateException.class,
660+
() -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata())
661+
);
662+
assertTrue(
663+
e.getMessage()
664+
.equals(
665+
"a remote store node ["
666+
+ joiningNode
667+
+ "] is trying to join a remote store cluster with incompatible node attributes in comparison with existing node ["
668+
+ currentState.getNodes().getNodes().values().stream().findFirst().get()
669+
+ "]"
670+
)
671+
);
672+
}
673+
591674
public void testUpdatesClusterStateWithSingleNodeCluster() throws Exception {
592675
Map<String, String> remoteStoreNodeAttributes = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO);
593676
final AllocationService allocationService = mock(AllocationService.class);
@@ -869,6 +952,23 @@ private Map<String, String> remoteStoreNodeAttributes(String segmentRepoName, St
869952
REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
870953
translogRepoName
871954
);
955+
956+
return new HashMap<>() {
957+
{
958+
put(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName);
959+
put(segmentRepositoryTypeAttributeKey, "s3");
960+
put(segmentRepositorySettingsAttributeKeyPrefix + "bucket", "segment_bucket");
961+
put(segmentRepositorySettingsAttributeKeyPrefix + "base_path", "/segment/path");
962+
put(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, translogRepoName);
963+
putIfAbsent(translogRepositoryTypeAttributeKey, "s3");
964+
putIfAbsent(translogRepositorySettingsAttributeKeyPrefix + "bucket", "translog_bucket");
965+
putIfAbsent(translogRepositorySettingsAttributeKeyPrefix + "base_path", "/translog/path");
966+
putAll(remoteStateNodeAttributes(clusterStateRepo));
967+
}
968+
};
969+
}
970+
971+
private Map<String, String> remoteStateNodeAttributes(String clusterStateRepo) {
872972
String clusterStateRepositoryTypeAttributeKey = String.format(
873973
Locale.getDefault(),
874974
REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
@@ -882,14 +982,6 @@ private Map<String, String> remoteStoreNodeAttributes(String segmentRepoName, St
882982

883983
return new HashMap<>() {
884984
{
885-
put(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName);
886-
put(segmentRepositoryTypeAttributeKey, "s3");
887-
put(segmentRepositorySettingsAttributeKeyPrefix + "bucket", "segment_bucket");
888-
put(segmentRepositorySettingsAttributeKeyPrefix + "base_path", "/segment/path");
889-
put(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, translogRepoName);
890-
putIfAbsent(translogRepositoryTypeAttributeKey, "s3");
891-
putIfAbsent(translogRepositorySettingsAttributeKeyPrefix + "bucket", "translog_bucket");
892-
putIfAbsent(translogRepositorySettingsAttributeKeyPrefix + "base_path", "/translog/path");
893985
put(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, clusterStateRepo);
894986
putIfAbsent(clusterStateRepositoryTypeAttributeKey, "s3");
895987
putIfAbsent(clusterStateRepositorySettingsAttributeKeyPrefix + "bucket", "state_bucket");

server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1901,7 +1901,7 @@ public void testAsyncDurabilityThrowsExceptionWhenRestrictSettingTrue() {
19011901
request,
19021902
Settings.EMPTY,
19031903
null,
1904-
Settings.builder().put("node.attr.remote_store.setting", "test").build(),
1904+
Settings.builder().put("node.attr.remote_store.segment.repository", "test").build(),
19051905
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
19061906
randomShardLimitService(),
19071907
Collections.emptySet(),

0 commit comments

Comments
 (0)