Skip to content

Commit 5121409

Browse files
authored
[Remote Routing Table] Initial commit for RemoteRoutingTableService setup (opensearch-project#13304)
* Initial commit for RemoteRoutingTableService setup Signed-off-by: Himshikha Gupta <himshikh@amazon.com>
1 parent 10ae4cb commit 5121409

File tree

13 files changed

+551
-9
lines changed

13 files changed

+551
-9
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1414
- Add getMetadataFields to MapperService ([#13819](https://github.com/opensearch-project/OpenSearch/pull/13819))
1515
- [Remote State] Add async remote state deletion task running on an interval, configurable by a setting ([#13131](https://github.com/opensearch-project/OpenSearch/pull/13131))
1616
- Allow setting query parameters on requests ([#13776](https://github.com/opensearch-project/OpenSearch/issues/13776))
17+
- Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304))
1718

1819
### Dependencies
1920
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))

server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java

+29-7
Original file line numberDiff line numberDiff line change
@@ -511,11 +511,27 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod
511511
assert existingNodes.isEmpty() == false;
512512

513513
CompatibilityMode remoteStoreCompatibilityMode = REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(metadata.settings());
514-
if (STRICT.equals(remoteStoreCompatibilityMode)) {
515514

516-
DiscoveryNode existingNode = existingNodes.get(0);
515+
List<String> reposToSkip = new ArrayList<>(1);
516+
Optional<DiscoveryNode> remoteRoutingTableNode = existingNodes.stream()
517+
.filter(
518+
node -> node.getAttributes().get(RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY) != null
519+
)
520+
.findFirst();
521+
// If none of the existing nodes have routing table repo, then we skip this repo check if present in joining node.
522+
// This ensures a new node with remote routing table repo is able to join the cluster.
523+
if (remoteRoutingTableNode.isEmpty()) {
524+
String joiningNodeRepoName = joiningNode.getAttributes()
525+
.get(RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY);
526+
if (joiningNodeRepoName != null) {
527+
reposToSkip.add(joiningNodeRepoName);
528+
}
529+
}
530+
531+
if (STRICT.equals(remoteStoreCompatibilityMode)) {
532+
DiscoveryNode existingNode = remoteRoutingTableNode.orElseGet(() -> existingNodes.get(0));
517533
if (joiningNode.isRemoteStoreNode()) {
518-
ensureRemoteStoreNodesCompatibility(joiningNode, existingNode);
534+
ensureRemoteStoreNodesCompatibility(joiningNode, existingNode, reposToSkip);
519535
} else {
520536
if (existingNode.isRemoteStoreNode()) {
521537
throw new IllegalStateException(
@@ -537,19 +553,25 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod
537553
throw new IllegalStateException(reason);
538554
}
539555
if (joiningNode.isRemoteStoreNode()) {
540-
Optional<DiscoveryNode> remoteDN = existingNodes.stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst();
541-
remoteDN.ifPresent(discoveryNode -> ensureRemoteStoreNodesCompatibility(joiningNode, discoveryNode));
556+
Optional<DiscoveryNode> remoteDN = remoteRoutingTableNode.isPresent()
557+
? remoteRoutingTableNode
558+
: existingNodes.stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst();
559+
remoteDN.ifPresent(discoveryNode -> ensureRemoteStoreNodesCompatibility(joiningNode, discoveryNode, reposToSkip));
542560
}
543561
}
544562
}
545563
}
546564

547-
private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNode, DiscoveryNode existingNode) {
565+
private static void ensureRemoteStoreNodesCompatibility(
566+
DiscoveryNode joiningNode,
567+
DiscoveryNode existingNode,
568+
List<String> reposToSkip
569+
) {
548570
if (joiningNode.isRemoteStoreNode()) {
549571
if (existingNode.isRemoteStoreNode()) {
550572
RemoteStoreNodeAttribute joiningRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(joiningNode);
551573
RemoteStoreNodeAttribute existingRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(existingNode);
552-
if (existingRemoteStoreNodeAttribute.equals(joiningRemoteStoreNodeAttribute) == false) {
574+
if (existingRemoteStoreNodeAttribute.equalsWithRepoSkip(joiningRemoteStoreNodeAttribute, reposToSkip) == false) {
553575
throw new IllegalStateException(
554576
"a remote store node ["
555577
+ joiningNode

server/src/main/java/org/opensearch/cluster/metadata/RepositoriesMetadata.java

+36
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,10 @@
5151
import java.io.IOException;
5252
import java.util.ArrayList;
5353
import java.util.Collections;
54+
import java.util.Comparator;
5455
import java.util.EnumSet;
5556
import java.util.List;
57+
import java.util.stream.Collectors;
5658

5759
import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING;
5860

@@ -164,6 +166,40 @@ public boolean equalsIgnoreGenerations(@Nullable RepositoriesMetadata other) {
164166
return true;
165167
}
166168

169+
/**
170+
* Checks if this instance and the give instance share the same repositories, with option to skip checking for a list of repos.
171+
* This will support
172+
* @param other other repositories metadata
173+
* @param reposToSkip list of repos to skip check for equality
174+
* @return {@code true} iff both instances contain the same repositories apart from differences in generations, not including repos provided in reposToSkip.
175+
*/
176+
public boolean equalsIgnoreGenerationsWithRepoSkip(@Nullable RepositoriesMetadata other, List<String> reposToSkip) {
177+
if (other == null) {
178+
return false;
179+
}
180+
List<RepositoryMetadata> currentRepositories = repositories.stream()
181+
.filter(repo -> !reposToSkip.contains(repo.name()))
182+
.collect(Collectors.toList());
183+
List<RepositoryMetadata> otherRepositories = other.repositories.stream()
184+
.filter(repo -> !reposToSkip.contains(repo.name()))
185+
.collect(Collectors.toList());
186+
187+
if (otherRepositories.size() != currentRepositories.size()) {
188+
return false;
189+
}
190+
// Sort repos by name for ordered comparison
191+
Comparator<RepositoryMetadata> compareByName = (o1, o2) -> o1.name().compareTo(o2.name());
192+
currentRepositories.sort(compareByName);
193+
otherRepositories.sort(compareByName);
194+
195+
for (int i = 0; i < currentRepositories.size(); i++) {
196+
if (currentRepositories.get(i).equalsIgnoreGenerations(otherRepositories.get(i)) == false) {
197+
return false;
198+
}
199+
}
200+
return true;
201+
}
202+
167203
@Override
168204
public int hashCode() {
169205
return repositories.hashCode();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.routing.remote;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
14+
import org.opensearch.common.settings.Settings;
15+
import org.opensearch.common.util.io.IOUtils;
16+
import org.opensearch.node.Node;
17+
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
18+
import org.opensearch.repositories.RepositoriesService;
19+
import org.opensearch.repositories.Repository;
20+
import org.opensearch.repositories.blobstore.BlobStoreRepository;
21+
22+
import java.io.IOException;
23+
import java.util.function.Supplier;
24+
25+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
26+
27+
/**
28+
* A Service which provides APIs to upload and download routing table from remote store.
29+
*
30+
* @opensearch.internal
31+
*/
32+
public class RemoteRoutingTableService extends AbstractLifecycleComponent {
33+
34+
private static final Logger logger = LogManager.getLogger(RemoteRoutingTableService.class);
35+
private final Settings settings;
36+
private final Supplier<RepositoriesService> repositoriesService;
37+
private BlobStoreRepository blobStoreRepository;
38+
39+
public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesService, Settings settings) {
40+
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
41+
this.repositoriesService = repositoriesService;
42+
this.settings = settings;
43+
}
44+
45+
@Override
46+
protected void doClose() throws IOException {
47+
if (blobStoreRepository != null) {
48+
IOUtils.close(blobStoreRepository);
49+
}
50+
}
51+
52+
@Override
53+
protected void doStart() {
54+
assert isRemoteRoutingTableEnabled(settings) == true : "Remote routing table is not enabled";
55+
final String remoteStoreRepo = settings.get(
56+
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY
57+
);
58+
assert remoteStoreRepo != null : "Remote routing table repository is not configured";
59+
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
60+
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
61+
blobStoreRepository = (BlobStoreRepository) repository;
62+
}
63+
64+
@Override
65+
protected void doStop() {}
66+
67+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/** Package containing class to perform operations on remote routing table */
10+
package org.opensearch.cluster.routing.remote;

server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ protected FeatureFlagSettings(
3636
FeatureFlags.DATETIME_FORMATTER_CACHING_SETTING,
3737
FeatureFlags.TIERED_REMOTE_INDEX_SETTING,
3838
FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING,
39-
FeatureFlags.PLUGGABLE_CACHE_SETTING
39+
FeatureFlags.PLUGGABLE_CACHE_SETTING,
40+
FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL_SETTING
4041
);
4142
}

server/src/main/java/org/opensearch/common/util/FeatureFlags.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ public class FeatureFlags {
6767
*/
6868
public static final String PLUGGABLE_CACHE = "opensearch.experimental.feature.pluggable.caching.enabled";
6969

70+
/**
71+
* Gates the functionality of remote routing table.
72+
*/
73+
public static final String REMOTE_PUBLICATION_EXPERIMENTAL = "opensearch.experimental.feature.remote_store.publication.enabled";
74+
7075
public static final Setting<Boolean> REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING = Setting.boolSetting(
7176
REMOTE_STORE_MIGRATION_EXPERIMENTAL,
7277
false,
@@ -89,14 +94,21 @@ public class FeatureFlags {
8994

9095
public static final Setting<Boolean> PLUGGABLE_CACHE_SETTING = Setting.boolSetting(PLUGGABLE_CACHE, false, Property.NodeScope);
9196

97+
public static final Setting<Boolean> REMOTE_PUBLICATION_EXPERIMENTAL_SETTING = Setting.boolSetting(
98+
REMOTE_PUBLICATION_EXPERIMENTAL,
99+
false,
100+
Property.NodeScope
101+
);
102+
92103
private static final List<Setting<Boolean>> ALL_FEATURE_FLAG_SETTINGS = List.of(
93104
REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING,
94105
EXTENSIONS_SETTING,
95106
IDENTITY_SETTING,
96107
TELEMETRY_SETTING,
97108
DATETIME_FORMATTER_CACHING_SETTING,
98109
TIERED_REMOTE_INDEX_SETTING,
99-
PLUGGABLE_CACHE_SETTING
110+
PLUGGABLE_CACHE_SETTING,
111+
REMOTE_PUBLICATION_EXPERIMENTAL_SETTING
100112
);
101113
/**
102114
* Should store the settings from opensearch.yml.

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

+15
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.opensearch.cluster.metadata.IndexMetadata;
1919
import org.opensearch.cluster.metadata.Metadata;
2020
import org.opensearch.cluster.metadata.TemplatesMetadata;
21+
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
2122
import org.opensearch.common.CheckedRunnable;
2223
import org.opensearch.common.Nullable;
2324
import org.opensearch.common.blobstore.BlobContainer;
@@ -68,6 +69,7 @@
6869

6970
import static java.util.Objects.requireNonNull;
7071
import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD;
72+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
7173
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;
7274

7375
/**
@@ -202,6 +204,7 @@ public class RemoteClusterStateService implements Closeable {
202204
private final List<IndexMetadataUploadListener> indexMetadataUploadListeners;
203205
private BlobStoreRepository blobStoreRepository;
204206
private BlobStoreTransferService blobStoreTransferService;
207+
private Optional<RemoteRoutingTableService> remoteRoutingTableService;
205208
private volatile TimeValue slowWriteLoggingThreshold;
206209

207210
private volatile TimeValue indexMetadataUploadTimeout;
@@ -253,6 +256,9 @@ public RemoteClusterStateService(
253256
clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout);
254257
this.remoteStateStats = new RemotePersistenceStats();
255258
this.indexMetadataUploadListeners = indexMetadataUploadListeners;
259+
this.remoteRoutingTableService = isRemoteRoutingTableEnabled(settings)
260+
? Optional.of(new RemoteRoutingTableService(repositoriesService, settings))
261+
: Optional.empty();
256262
}
257263

258264
private BlobStoreTransferService getBlobStoreTransferService() {
@@ -749,6 +755,9 @@ public void close() throws IOException {
749755
if (blobStoreRepository != null) {
750756
IOUtils.close(blobStoreRepository);
751757
}
758+
if (this.remoteRoutingTableService.isPresent()) {
759+
this.remoteRoutingTableService.get().close();
760+
}
752761
}
753762

754763
public void start() {
@@ -760,6 +769,7 @@ public void start() {
760769
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
761770
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
762771
blobStoreRepository = (BlobStoreRepository) repository;
772+
this.remoteRoutingTableService.ifPresent(RemoteRoutingTableService::start);
763773
}
764774

765775
private ClusterMetadataManifest uploadManifest(
@@ -933,6 +943,11 @@ public TimeValue getMetadataManifestUploadTimeout() {
933943
return this.metadataManifestUploadTimeout;
934944
}
935945

946+
// Package private for unit test
947+
Optional<RemoteRoutingTableService> getRemoteRoutingTableService() {
948+
return this.remoteRoutingTableService;
949+
}
950+
936951
static String getManifestFileName(long term, long version, boolean committed, int codecVersion) {
937952
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest__<inverted_term>__<inverted_version>__C/P__<inverted__timestamp>__<codec_version>
938953
return String.join(

server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java

+33
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.opensearch.cluster.metadata.RepositoryMetadata;
1414
import org.opensearch.cluster.node.DiscoveryNode;
1515
import org.opensearch.common.settings.Settings;
16+
import org.opensearch.common.util.FeatureFlags;
1617
import org.opensearch.gateway.remote.RemoteClusterStateService;
1718
import org.opensearch.node.Node;
1819
import org.opensearch.repositories.blobstore.BlobStoreRepository;
@@ -28,6 +29,8 @@
2829
import java.util.Set;
2930
import java.util.stream.Collectors;
3031

32+
import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL;
33+
3134
/**
3235
* This is an abstraction for validating and storing information specific to remote backed storage nodes.
3336
*
@@ -46,6 +49,8 @@ public class RemoteStoreNodeAttribute {
4649
+ "."
4750
+ CryptoMetadata.SETTINGS_KEY;
4851
public static final String REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX = "remote_store.repository.%s.settings.";
52+
public static final String REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.routing_table.repository";
53+
4954
private final RepositoriesMetadata repositoriesMetadata;
5055

5156
public static List<String> SUPPORTED_DATA_REPO_NAME_ATTRIBUTES = List.of(
@@ -157,6 +162,10 @@ private Set<String> getValidatedRepositoryNames(DiscoveryNode node) {
157162
} else if (node.getAttributes().containsKey(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)) {
158163
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY));
159164
}
165+
if (node.getAttributes().containsKey(REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY)) {
166+
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY));
167+
}
168+
160169
return repositoryNames;
161170
}
162171

@@ -187,6 +196,15 @@ public static boolean isRemoteStoreClusterStateEnabled(Settings settings) {
187196
&& isRemoteClusterStateAttributePresent(settings);
188197
}
189198

199+
private static boolean isRemoteRoutingTableAttributePresent(Settings settings) {
200+
return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY)
201+
.isEmpty() == false;
202+
}
203+
204+
public static boolean isRemoteRoutingTableEnabled(Settings settings) {
205+
return FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL) && isRemoteRoutingTableAttributePresent(settings);
206+
}
207+
190208
public RepositoriesMetadata getRepositoriesMetadata() {
191209
return this.repositoriesMetadata;
192210
}
@@ -231,6 +249,21 @@ public int hashCode() {
231249
return hashCode;
232250
}
233251

252+
/**
253+
* Checks if 2 instances are equal, with option to skip check for a list of repos.
254+
* *
255+
* @param o other instance
256+
* @param reposToSkip list of repos to skip check for equality
257+
* @return {@code true} iff both instances are equal, not including the repositories in both instances if they are part of reposToSkip.
258+
*/
259+
public boolean equalsWithRepoSkip(Object o, List<String> reposToSkip) {
260+
if (this == o) return true;
261+
if (o == null || getClass() != o.getClass()) return false;
262+
263+
RemoteStoreNodeAttribute that = (RemoteStoreNodeAttribute) o;
264+
return this.getRepositoriesMetadata().equalsIgnoreGenerationsWithRepoSkip(that.getRepositoriesMetadata(), reposToSkip);
265+
}
266+
234267
@Override
235268
public boolean equals(Object o) {
236269
if (this == o) return true;

0 commit comments

Comments
 (0)