Skip to content

Commit 8cf895b

Browse files
authored
[Backport 2.x] Initial commit for RemoteRoutingTableService setup (#13304) (#14026)
* [Remote Routing Table] Introducing RemoteRoutingTableService (#13304) Signed-off-by: Himshikha Gupta <himshikh@amazon.com>
1 parent fe61816 commit 8cf895b

File tree

13 files changed

+551
-9
lines changed

13 files changed

+551
-9
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1616
- [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982))
1717
- [Query Insights] Add X-Opaque-Id to search request metadata for top n queries ([#13374](https://github.com/opensearch-project/OpenSearch/pull/13374))
1818
- [Remote State] Add async remote state deletion task running on an interval, configurable by a setting ([#13995](https://github.com/opensearch-project/OpenSearch/pull/13995))
19+
- Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304))
1920

2021
### Dependencies
2122
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))

server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java

+29-7
Original file line numberDiff line numberDiff line change
@@ -572,11 +572,27 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod
572572
assert existingNodes.isEmpty() == false;
573573

574574
CompatibilityMode remoteStoreCompatibilityMode = REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(metadata.settings());
575-
if (STRICT.equals(remoteStoreCompatibilityMode)) {
576575

577-
DiscoveryNode existingNode = existingNodes.get(0);
576+
List<String> reposToSkip = new ArrayList<>(1);
577+
Optional<DiscoveryNode> remoteRoutingTableNode = existingNodes.stream()
578+
.filter(
579+
node -> node.getAttributes().get(RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY) != null
580+
)
581+
.findFirst();
582+
// If none of the existing nodes have routing table repo, then we skip this repo check if present in joining node.
583+
// This ensures a new node with remote routing table repo is able to join the cluster.
584+
if (remoteRoutingTableNode.isEmpty()) {
585+
String joiningNodeRepoName = joiningNode.getAttributes()
586+
.get(RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY);
587+
if (joiningNodeRepoName != null) {
588+
reposToSkip.add(joiningNodeRepoName);
589+
}
590+
}
591+
592+
if (STRICT.equals(remoteStoreCompatibilityMode)) {
593+
DiscoveryNode existingNode = remoteRoutingTableNode.orElseGet(() -> existingNodes.get(0));
578594
if (joiningNode.isRemoteStoreNode()) {
579-
ensureRemoteStoreNodesCompatibility(joiningNode, existingNode);
595+
ensureRemoteStoreNodesCompatibility(joiningNode, existingNode, reposToSkip);
580596
} else {
581597
if (existingNode.isRemoteStoreNode()) {
582598
throw new IllegalStateException(
@@ -598,19 +614,25 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod
598614
throw new IllegalStateException(reason);
599615
}
600616
if (joiningNode.isRemoteStoreNode()) {
601-
Optional<DiscoveryNode> remoteDN = existingNodes.stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst();
602-
remoteDN.ifPresent(discoveryNode -> ensureRemoteStoreNodesCompatibility(joiningNode, discoveryNode));
617+
Optional<DiscoveryNode> remoteDN = remoteRoutingTableNode.isPresent()
618+
? remoteRoutingTableNode
619+
: existingNodes.stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst();
620+
remoteDN.ifPresent(discoveryNode -> ensureRemoteStoreNodesCompatibility(joiningNode, discoveryNode, reposToSkip));
603621
}
604622
}
605623
}
606624
}
607625

608-
private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNode, DiscoveryNode existingNode) {
626+
private static void ensureRemoteStoreNodesCompatibility(
627+
DiscoveryNode joiningNode,
628+
DiscoveryNode existingNode,
629+
List<String> reposToSkip
630+
) {
609631
if (joiningNode.isRemoteStoreNode()) {
610632
if (existingNode.isRemoteStoreNode()) {
611633
RemoteStoreNodeAttribute joiningRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(joiningNode);
612634
RemoteStoreNodeAttribute existingRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(existingNode);
613-
if (existingRemoteStoreNodeAttribute.equals(joiningRemoteStoreNodeAttribute) == false) {
635+
if (existingRemoteStoreNodeAttribute.equalsWithRepoSkip(joiningRemoteStoreNodeAttribute, reposToSkip) == false) {
614636
throw new IllegalStateException(
615637
"a remote store node ["
616638
+ joiningNode

server/src/main/java/org/opensearch/cluster/metadata/RepositoriesMetadata.java

+36
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,10 @@
5151
import java.io.IOException;
5252
import java.util.ArrayList;
5353
import java.util.Collections;
54+
import java.util.Comparator;
5455
import java.util.EnumSet;
5556
import java.util.List;
57+
import java.util.stream.Collectors;
5658

5759
import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING;
5860

@@ -164,6 +166,40 @@ public boolean equalsIgnoreGenerations(@Nullable RepositoriesMetadata other) {
164166
return true;
165167
}
166168

169+
/**
170+
* Checks if this instance and the give instance share the same repositories, with option to skip checking for a list of repos.
171+
* This will support
172+
* @param other other repositories metadata
173+
* @param reposToSkip list of repos to skip check for equality
174+
* @return {@code true} iff both instances contain the same repositories apart from differences in generations, not including repos provided in reposToSkip.
175+
*/
176+
public boolean equalsIgnoreGenerationsWithRepoSkip(@Nullable RepositoriesMetadata other, List<String> reposToSkip) {
177+
if (other == null) {
178+
return false;
179+
}
180+
List<RepositoryMetadata> currentRepositories = repositories.stream()
181+
.filter(repo -> !reposToSkip.contains(repo.name()))
182+
.collect(Collectors.toList());
183+
List<RepositoryMetadata> otherRepositories = other.repositories.stream()
184+
.filter(repo -> !reposToSkip.contains(repo.name()))
185+
.collect(Collectors.toList());
186+
187+
if (otherRepositories.size() != currentRepositories.size()) {
188+
return false;
189+
}
190+
// Sort repos by name for ordered comparison
191+
Comparator<RepositoryMetadata> compareByName = (o1, o2) -> o1.name().compareTo(o2.name());
192+
currentRepositories.sort(compareByName);
193+
otherRepositories.sort(compareByName);
194+
195+
for (int i = 0; i < currentRepositories.size(); i++) {
196+
if (currentRepositories.get(i).equalsIgnoreGenerations(otherRepositories.get(i)) == false) {
197+
return false;
198+
}
199+
}
200+
return true;
201+
}
202+
167203
@Override
168204
public int hashCode() {
169205
return repositories.hashCode();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.routing.remote;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
14+
import org.opensearch.common.settings.Settings;
15+
import org.opensearch.common.util.io.IOUtils;
16+
import org.opensearch.node.Node;
17+
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
18+
import org.opensearch.repositories.RepositoriesService;
19+
import org.opensearch.repositories.Repository;
20+
import org.opensearch.repositories.blobstore.BlobStoreRepository;
21+
22+
import java.io.IOException;
23+
import java.util.function.Supplier;
24+
25+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
26+
27+
/**
28+
* A Service which provides APIs to upload and download routing table from remote store.
29+
*
30+
* @opensearch.internal
31+
*/
32+
public class RemoteRoutingTableService extends AbstractLifecycleComponent {
33+
34+
private static final Logger logger = LogManager.getLogger(RemoteRoutingTableService.class);
35+
private final Settings settings;
36+
private final Supplier<RepositoriesService> repositoriesService;
37+
private BlobStoreRepository blobStoreRepository;
38+
39+
public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesService, Settings settings) {
40+
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
41+
this.repositoriesService = repositoriesService;
42+
this.settings = settings;
43+
}
44+
45+
@Override
46+
protected void doClose() throws IOException {
47+
if (blobStoreRepository != null) {
48+
IOUtils.close(blobStoreRepository);
49+
}
50+
}
51+
52+
@Override
53+
protected void doStart() {
54+
assert isRemoteRoutingTableEnabled(settings) == true : "Remote routing table is not enabled";
55+
final String remoteStoreRepo = settings.get(
56+
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY
57+
);
58+
assert remoteStoreRepo != null : "Remote routing table repository is not configured";
59+
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
60+
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
61+
blobStoreRepository = (BlobStoreRepository) repository;
62+
}
63+
64+
@Override
65+
protected void doStop() {}
66+
67+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/** Package containing class to perform operations on remote routing table */
10+
package org.opensearch.cluster.routing.remote;

server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ protected FeatureFlagSettings(
3636
FeatureFlags.DATETIME_FORMATTER_CACHING_SETTING,
3737
FeatureFlags.TIERED_REMOTE_INDEX_SETTING,
3838
FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING,
39-
FeatureFlags.PLUGGABLE_CACHE_SETTING
39+
FeatureFlags.PLUGGABLE_CACHE_SETTING,
40+
FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL_SETTING
4041
);
4142
}

server/src/main/java/org/opensearch/common/util/FeatureFlags.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ public class FeatureFlags {
6767
*/
6868
public static final String PLUGGABLE_CACHE = "opensearch.experimental.feature.pluggable.caching.enabled";
6969

70+
/**
71+
* Gates the functionality of remote routing table.
72+
*/
73+
public static final String REMOTE_PUBLICATION_EXPERIMENTAL = "opensearch.experimental.feature.remote_store.publication.enabled";
74+
7075
public static final Setting<Boolean> REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING = Setting.boolSetting(
7176
REMOTE_STORE_MIGRATION_EXPERIMENTAL,
7277
false,
@@ -89,14 +94,21 @@ public class FeatureFlags {
8994

9095
public static final Setting<Boolean> PLUGGABLE_CACHE_SETTING = Setting.boolSetting(PLUGGABLE_CACHE, false, Property.NodeScope);
9196

97+
public static final Setting<Boolean> REMOTE_PUBLICATION_EXPERIMENTAL_SETTING = Setting.boolSetting(
98+
REMOTE_PUBLICATION_EXPERIMENTAL,
99+
false,
100+
Property.NodeScope
101+
);
102+
92103
private static final List<Setting<Boolean>> ALL_FEATURE_FLAG_SETTINGS = List.of(
93104
REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING,
94105
EXTENSIONS_SETTING,
95106
IDENTITY_SETTING,
96107
TELEMETRY_SETTING,
97108
DATETIME_FORMATTER_CACHING_SETTING,
98109
TIERED_REMOTE_INDEX_SETTING,
99-
PLUGGABLE_CACHE_SETTING
110+
PLUGGABLE_CACHE_SETTING,
111+
REMOTE_PUBLICATION_EXPERIMENTAL_SETTING
100112
);
101113
/**
102114
* Should store the settings from opensearch.yml.

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

+15
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.opensearch.cluster.metadata.IndexMetadata;
1919
import org.opensearch.cluster.metadata.Metadata;
2020
import org.opensearch.cluster.metadata.TemplatesMetadata;
21+
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
2122
import org.opensearch.cluster.service.ClusterService;
2223
import org.opensearch.common.CheckedRunnable;
2324
import org.opensearch.common.Nullable;
@@ -69,6 +70,7 @@
6970

7071
import static java.util.Objects.requireNonNull;
7172
import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD;
73+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
7274
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;
7375

7476
/**
@@ -201,6 +203,7 @@ public class RemoteClusterStateService implements Closeable {
201203
private final List<IndexMetadataUploadListener> indexMetadataUploadListeners;
202204
private BlobStoreRepository blobStoreRepository;
203205
private BlobStoreTransferService blobStoreTransferService;
206+
private Optional<RemoteRoutingTableService> remoteRoutingTableService;
204207
private volatile TimeValue slowWriteLoggingThreshold;
205208

206209
private volatile TimeValue indexMetadataUploadTimeout;
@@ -253,6 +256,9 @@ public RemoteClusterStateService(
253256
this.remoteStateStats = new RemotePersistenceStats();
254257
this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService);
255258
this.indexMetadataUploadListeners = indexMetadataUploadListeners;
259+
this.remoteRoutingTableService = isRemoteRoutingTableEnabled(settings)
260+
? Optional.of(new RemoteRoutingTableService(repositoriesService, settings))
261+
: Optional.empty();
256262
}
257263

258264
/**
@@ -749,6 +755,9 @@ public void close() throws IOException {
749755
if (blobStoreRepository != null) {
750756
IOUtils.close(blobStoreRepository);
751757
}
758+
if (this.remoteRoutingTableService.isPresent()) {
759+
this.remoteRoutingTableService.get().close();
760+
}
752761
}
753762

754763
public void start() {
@@ -761,6 +770,7 @@ public void start() {
761770
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
762771
blobStoreRepository = (BlobStoreRepository) repository;
763772
remoteClusterStateCleanupManager.start();
773+
this.remoteRoutingTableService.ifPresent(RemoteRoutingTableService::start);
764774
}
765775

766776
private ClusterMetadataManifest uploadManifest(
@@ -945,6 +955,11 @@ public TimeValue getMetadataManifestUploadTimeout() {
945955
return this.metadataManifestUploadTimeout;
946956
}
947957

958+
// Package private for unit test
959+
Optional<RemoteRoutingTableService> getRemoteRoutingTableService() {
960+
return this.remoteRoutingTableService;
961+
}
962+
948963
static String getManifestFileName(long term, long version, boolean committed, int codecVersion) {
949964
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest__<inverted_term>__<inverted_version>__C/P__<inverted__timestamp>__<codec_version>
950965
return String.join(

server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java

+33
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.opensearch.cluster.metadata.RepositoryMetadata;
1414
import org.opensearch.cluster.node.DiscoveryNode;
1515
import org.opensearch.common.settings.Settings;
16+
import org.opensearch.common.util.FeatureFlags;
1617
import org.opensearch.gateway.remote.RemoteClusterStateService;
1718
import org.opensearch.node.Node;
1819
import org.opensearch.repositories.blobstore.BlobStoreRepository;
@@ -28,6 +29,8 @@
2829
import java.util.Set;
2930
import java.util.stream.Collectors;
3031

32+
import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL;
33+
3134
/**
3235
* This is an abstraction for validating and storing information specific to remote backed storage nodes.
3336
*
@@ -46,6 +49,8 @@ public class RemoteStoreNodeAttribute {
4649
+ "."
4750
+ CryptoMetadata.SETTINGS_KEY;
4851
public static final String REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX = "remote_store.repository.%s.settings.";
52+
public static final String REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.routing_table.repository";
53+
4954
private final RepositoriesMetadata repositoriesMetadata;
5055

5156
public static List<String> SUPPORTED_DATA_REPO_NAME_ATTRIBUTES = List.of(
@@ -157,6 +162,10 @@ private Set<String> getValidatedRepositoryNames(DiscoveryNode node) {
157162
} else if (node.getAttributes().containsKey(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)) {
158163
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY));
159164
}
165+
if (node.getAttributes().containsKey(REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY)) {
166+
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY));
167+
}
168+
160169
return repositoryNames;
161170
}
162171

@@ -187,6 +196,15 @@ public static boolean isRemoteStoreClusterStateEnabled(Settings settings) {
187196
&& isRemoteClusterStateAttributePresent(settings);
188197
}
189198

199+
private static boolean isRemoteRoutingTableAttributePresent(Settings settings) {
200+
return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY)
201+
.isEmpty() == false;
202+
}
203+
204+
public static boolean isRemoteRoutingTableEnabled(Settings settings) {
205+
return FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL) && isRemoteRoutingTableAttributePresent(settings);
206+
}
207+
190208
public RepositoriesMetadata getRepositoriesMetadata() {
191209
return this.repositoriesMetadata;
192210
}
@@ -231,6 +249,21 @@ public int hashCode() {
231249
return hashCode;
232250
}
233251

252+
/**
253+
* Checks if 2 instances are equal, with option to skip check for a list of repos.
254+
* *
255+
* @param o other instance
256+
* @param reposToSkip list of repos to skip check for equality
257+
* @return {@code true} iff both instances are equal, not including the repositories in both instances if they are part of reposToSkip.
258+
*/
259+
public boolean equalsWithRepoSkip(Object o, List<String> reposToSkip) {
260+
if (this == o) return true;
261+
if (o == null || getClass() != o.getClass()) return false;
262+
263+
RemoteStoreNodeAttribute that = (RemoteStoreNodeAttribute) o;
264+
return this.getRepositoriesMetadata().equalsIgnoreGenerationsWithRepoSkip(that.getRepositoriesMetadata(), reposToSkip);
265+
}
266+
234267
@Override
235268
public boolean equals(Object o) {
236269
if (this == o) return true;

0 commit comments

Comments
 (0)