Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Decouple remote state configuration #12712

Merged
merged 1 commit into from
Mar 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -139,7 +139,7 @@
import static org.opensearch.cluster.metadata.Metadata.DEFAULT_REPLICA_COUNT_SETTING;
import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent;

/**
* Service responsible for submitting create index requests
@@ -978,7 +978,7 @@ private static void updateReplicationStrategy(
indexReplicationType = INDEX_REPLICATION_TYPE_SETTING.get(combinedTemplateSettings);
} else if (CLUSTER_REPLICATION_TYPE_SETTING.exists(clusterSettings)) {
indexReplicationType = CLUSTER_REPLICATION_TYPE_SETTING.get(clusterSettings);
} else if (isRemoteStoreAttributePresent(clusterSettings)) {
} else if (isRemoteDataAttributePresent(clusterSettings)) {
indexReplicationType = ReplicationType.SEGMENT;
} else {
indexReplicationType = CLUSTER_REPLICATION_TYPE_SETTING.getDefault(clusterSettings);
@@ -992,7 +992,7 @@ private static void updateReplicationStrategy(
* @param clusterSettings cluster level settings
*/
private static void updateRemoteStoreSettings(Settings.Builder settingsBuilder, Settings clusterSettings) {
if (isRemoteStoreAttributePresent(clusterSettings)) {
if (isRemoteDataAttributePresent(clusterSettings)) {
settingsBuilder.put(SETTING_REMOTE_STORE_ENABLED, true)
.put(
SETTING_REMOTE_SEGMENT_STORE_REPOSITORY,
@@ -1603,7 +1603,7 @@ public static void validateRefreshIntervalSettings(Settings requestSettings, Clu
* @param clusterSettings cluster setting
*/
static void validateTranslogDurabilitySettings(Settings requestSettings, ClusterSettings clusterSettings, Settings settings) {
if (isRemoteStoreAttributePresent(settings) == false
if (isRemoteDataAttributePresent(settings) == false
|| IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.exists(requestSettings) == false
|| clusterSettings.get(IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING) == false) {
return;
Original file line number Diff line number Diff line change
@@ -131,12 +131,8 @@ private RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String na
}

private RepositoriesMetadata buildRepositoriesMetadata(DiscoveryNode node) {
Set<String> repositoryNames = getValidatedRepositoryNames(node);
List<RepositoryMetadata> repositoryMetadataList = new ArrayList<>();
Set<String> repositoryNames = new HashSet<>();

repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY));
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY));
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY));

for (String repositoryName : repositoryNames) {
repositoryMetadataList.add(buildRepositoryMetadata(node, repositoryName));
@@ -145,12 +141,36 @@ private RepositoriesMetadata buildRepositoriesMetadata(DiscoveryNode node) {
return new RepositoriesMetadata(repositoryMetadataList);
}

private Set<String> getValidatedRepositoryNames(DiscoveryNode node) {
Set<String> repositoryNames = new HashSet<>();
if (node.getAttributes().containsKey(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY)
|| node.getAttributes().containsKey(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY)) {
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY));
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY));
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY));
} else if (node.getAttributes().containsKey(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)) {
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY));
}
return repositoryNames;
}

public static boolean isRemoteStoreAttributePresent(Settings settings) {
return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX).isEmpty() == false;
}

public static boolean isRemoteDataAttributePresent(Settings settings) {
return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY).isEmpty() == false
|| settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY).isEmpty() == false;
}

public static boolean isRemoteClusterStateAttributePresent(Settings settings) {
return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)
.isEmpty() == false;
}

public static boolean isRemoteStoreClusterStateEnabled(Settings settings) {
return RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) && isRemoteStoreAttributePresent(settings);
return RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings)
&& isRemoteClusterStateAttributePresent(settings);
}

public RepositoriesMetadata getRepositoriesMetadata() {
Original file line number Diff line number Diff line change
@@ -451,7 +451,8 @@ public void testJoinClusterWithNonRemoteStoreNodeJoining() {
}

public void testJoinClusterWithRemoteStoreNodeJoining() {
DiscoveryNode joiningNode = newDiscoveryNode(remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO));
Map<String, String> map = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO);
DiscoveryNode joiningNode = newDiscoveryNode(map);
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(joiningNode).build())
.build();
@@ -599,12 +600,94 @@ public void testPreventJoinClusterWithRemoteStoreNodeWithPartialAttributesJoinin
);
assertTrue(
e.getMessage().equals("joining node [" + joiningNode + "] doesn't have the node attribute [" + nodeAttribute.getKey() + "]")
|| e.getMessage()
.equals(
"a remote store node ["
+ joiningNode
+ "] is trying to join a remote store cluster with incompatible node attributes in comparison with existing node ["
+ currentState.getNodes().getNodes().values().stream().findFirst().get()
+ "]"
)
);

remoteStoreNodeAttributes.put(nodeAttribute.getKey(), nodeAttribute.getValue());
}
}

public void testJoinClusterWithRemoteStateNodeJoiningRemoteStateCluster() {
Map<String, String> existingNodeAttributes = remoteStateNodeAttributes(CLUSTER_STATE_REPO);
final DiscoveryNode existingNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
existingNodeAttributes,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build())
.build();
DiscoveryNode joiningNode = newDiscoveryNode(remoteStateNodeAttributes(CLUSTER_STATE_REPO));
JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata());
}

public void testPreventJoinClusterWithRemoteStateNodeJoiningRemoteStoreCluster() {
Map<String, String> existingNodeAttributes = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO);
final DiscoveryNode existingNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
existingNodeAttributes,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build())
.build();
DiscoveryNode joiningNode = newDiscoveryNode(remoteStateNodeAttributes(CLUSTER_STATE_REPO));
Exception e = assertThrows(
IllegalStateException.class,
() -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata())
);
assertTrue(
e.getMessage()
.equals(
"a remote store node ["
+ joiningNode
+ "] is trying to join a remote store cluster with incompatible node attributes in comparison with existing node ["
+ currentState.getNodes().getNodes().values().stream().findFirst().get()
+ "]"
)
);
}

public void testPreventJoinClusterWithRemoteStoreNodeJoiningRemoteStateCluster() {
Map<String, String> existingNodeAttributes = remoteStateNodeAttributes(CLUSTER_STATE_REPO);
final DiscoveryNode existingNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
existingNodeAttributes,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build())
.build();
DiscoveryNode joiningNode = newDiscoveryNode(remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO));
Exception e = assertThrows(
IllegalStateException.class,
() -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata())
);
assertTrue(
e.getMessage()
.equals(
"a remote store node ["
+ joiningNode
+ "] is trying to join a remote store cluster with incompatible node attributes in comparison with existing node ["
+ currentState.getNodes().getNodes().values().stream().findFirst().get()
+ "]"
)
);
}

public void testUpdatesClusterStateWithSingleNodeCluster() throws Exception {
Map<String, String> remoteStoreNodeAttributes = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO);
final AllocationService allocationService = mock(AllocationService.class);
@@ -890,6 +973,23 @@ private Map<String, String> remoteStoreNodeAttributes(String segmentRepoName, St
REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
translogRepoName
);

return new HashMap<>() {
{
put(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName);
put(segmentRepositoryTypeAttributeKey, "s3");
put(segmentRepositorySettingsAttributeKeyPrefix + "bucket", "segment_bucket");
put(segmentRepositorySettingsAttributeKeyPrefix + "base_path", "/segment/path");
put(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, translogRepoName);
putIfAbsent(translogRepositoryTypeAttributeKey, "s3");
putIfAbsent(translogRepositorySettingsAttributeKeyPrefix + "bucket", "translog_bucket");
putIfAbsent(translogRepositorySettingsAttributeKeyPrefix + "base_path", "/translog/path");
putAll(remoteStateNodeAttributes(clusterStateRepo));
}
};
}

private Map<String, String> remoteStateNodeAttributes(String clusterStateRepo) {
String clusterStateRepositoryTypeAttributeKey = String.format(
Locale.getDefault(),
REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
@@ -903,14 +1003,6 @@ private Map<String, String> remoteStoreNodeAttributes(String segmentRepoName, St

return new HashMap<>() {
{
put(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName);
put(segmentRepositoryTypeAttributeKey, "s3");
put(segmentRepositorySettingsAttributeKeyPrefix + "bucket", "segment_bucket");
put(segmentRepositorySettingsAttributeKeyPrefix + "base_path", "/segment/path");
put(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, translogRepoName);
putIfAbsent(translogRepositoryTypeAttributeKey, "s3");
putIfAbsent(translogRepositorySettingsAttributeKeyPrefix + "bucket", "translog_bucket");
putIfAbsent(translogRepositorySettingsAttributeKeyPrefix + "base_path", "/translog/path");
put(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, clusterStateRepo);
putIfAbsent(clusterStateRepositoryTypeAttributeKey, "s3");
putIfAbsent(clusterStateRepositorySettingsAttributeKeyPrefix + "bucket", "state_bucket");
Original file line number Diff line number Diff line change
@@ -1910,7 +1910,7 @@ public void testAsyncDurabilityThrowsExceptionWhenRestrictSettingTrue() {
request,
Settings.EMPTY,
null,
Settings.builder().put("node.attr.remote_store.setting", "test").build(),
Settings.builder().put("node.attr.remote_store.segment.repository", "test").build(),
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
randomShardLimitService(),
Collections.emptySet(),