Skip to content

Commit 12ff5ed

Browse files
authored
using remote cluster-state as fallback (opensearch-project#15424)
Signed-off-by: Rajiv Kumar Vaidyanathan <rajivkv@amazon.com>
1 parent 2e98df3 commit 12ff5ed

20 files changed

+853
-21
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
77
### Added
88
- MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637))
99
- [Workload Management] QueryGroup resource cancellation framework changes ([#15651](https://github.com/opensearch-project/OpenSearch/pull/15651))
10+
- Fallback to Remote cluster-state on Term-Version check mismatch - ([#15424](https://github.com/opensearch-project/OpenSearch/pull/15424))
1011

1112
### Dependencies
1213
- Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.gateway.remote;
10+
11+
import org.opensearch.action.admin.cluster.state.ClusterStateAction;
12+
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
13+
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
14+
import org.opensearch.action.support.clustermanager.term.GetTermVersionAction;
15+
import org.opensearch.action.support.clustermanager.term.GetTermVersionResponse;
16+
import org.opensearch.cluster.ClusterState;
17+
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
18+
import org.opensearch.cluster.coordination.PublicationTransportHandler;
19+
import org.opensearch.cluster.metadata.IndexMetadata;
20+
import org.opensearch.common.blobstore.BlobPath;
21+
import org.opensearch.common.settings.Settings;
22+
import org.opensearch.core.transport.TransportResponse;
23+
import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore;
24+
import org.opensearch.index.mapper.MapperService;
25+
import org.opensearch.index.remote.RemoteStoreEnums;
26+
import org.opensearch.plugins.Plugin;
27+
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
28+
import org.opensearch.repositories.RepositoriesService;
29+
import org.opensearch.repositories.blobstore.BlobStoreRepository;
30+
import org.opensearch.test.OpenSearchIntegTestCase;
31+
import org.opensearch.test.transport.MockTransportService;
32+
import org.opensearch.transport.TransportService;
33+
import org.junit.Before;
34+
35+
import java.nio.file.Path;
36+
import java.util.Collection;
37+
import java.util.List;
38+
import java.util.Map;
39+
import java.util.Optional;
40+
import java.util.concurrent.atomic.AtomicInteger;
41+
42+
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
43+
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY;
44+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
45+
import static org.hamcrest.Matchers.is;
46+
47+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
48+
public class RemoteClusterStateTermVersionIT extends RemoteStoreBaseIntegTestCase {
49+
private static final String INDEX_NAME = "test-index";
50+
private static final String INDEX_NAME_1 = "test-index-1";
51+
List<BlobPath> indexRoutingPaths;
52+
AtomicInteger indexRoutingFiles = new AtomicInteger();
53+
private final RemoteStoreEnums.PathType pathType = RemoteStoreEnums.PathType.HASHED_PREFIX;
54+
55+
@Before
56+
public void setup() {
57+
asyncUploadMockFsRepo = false;
58+
}
59+
60+
protected Collection<Class<? extends Plugin>> nodePlugins() {
61+
return List.of(MockTransportService.TestPlugin.class);
62+
}
63+
64+
@Override
65+
protected Settings nodeSettings(int nodeOrdinal) {
66+
return Settings.builder()
67+
.put(super.nodeSettings(nodeOrdinal))
68+
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
69+
.put(
70+
RemoteRoutingTableBlobStore.REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING.getKey(),
71+
RemoteStoreEnums.PathType.HASHED_PREFIX.toString()
72+
)
73+
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, REMOTE_ROUTING_TABLE_REPO)
74+
.put(REMOTE_PUBLICATION_SETTING_KEY, true)
75+
.build();
76+
}
77+
78+
public void testRemoteClusterStateFallback() throws Exception {
79+
BlobStoreRepository repository = prepareClusterAndVerifyRepository();
80+
81+
RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(
82+
RemoteClusterStateService.class
83+
);
84+
85+
RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
86+
Optional<ClusterMetadataManifest> latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
87+
getClusterState().getClusterName().value(),
88+
getClusterState().getMetadata().clusterUUID()
89+
);
90+
91+
String[] dataNodes = internalCluster().getDataNodeNames().toArray(String[]::new);
92+
MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, dataNodes[0]);
93+
94+
String cm = internalCluster().getClusterManagerName();
95+
primaryService.addRequestHandlingBehavior(
96+
PublicationTransportHandler.COMMIT_STATE_ACTION_NAME,
97+
(handler, request, channel, task) -> {
98+
// not committing the state
99+
logger.info("ignoring the commit from cluster-manager {}", request);
100+
channel.sendResponse(TransportResponse.Empty.INSTANCE);
101+
}
102+
);
103+
104+
String index = "index_1";
105+
createIndex(
106+
index,
107+
Settings.builder()
108+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
109+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
110+
.put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), Long.MAX_VALUE)
111+
.build()
112+
);
113+
logger.info("created index {}", index);
114+
Map<String, AtomicInteger> callCounters = Map.ofEntries(
115+
Map.entry(ClusterStateAction.NAME, new AtomicInteger()),
116+
Map.entry(GetTermVersionAction.NAME, new AtomicInteger())
117+
);
118+
119+
addCallCountInterceptor(cm, callCounters);
120+
121+
ClusterStateResponse stateResponseM = client(cm).admin().cluster().state(new ClusterStateRequest()).actionGet();
122+
123+
ClusterStateResponse stateResponseD = client(dataNodes[0]).admin().cluster().state(new ClusterStateRequest()).actionGet();
124+
assertEquals(stateResponseM, stateResponseD);
125+
assertThat(callCounters.get(ClusterStateAction.NAME).get(), is(0));
126+
assertThat(callCounters.get(GetTermVersionAction.NAME).get(), is(1));
127+
128+
}
129+
130+
public void testNoRemoteClusterStateFound() throws Exception {
131+
BlobStoreRepository repository = prepareClusterAndVerifyRepository();
132+
133+
RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(
134+
RemoteClusterStateService.class
135+
);
136+
137+
RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
138+
Optional<ClusterMetadataManifest> latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
139+
getClusterState().getClusterName().value(),
140+
getClusterState().getMetadata().clusterUUID()
141+
);
142+
143+
String[] dataNodes = internalCluster().getDataNodeNames().toArray(String[]::new);
144+
MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, dataNodes[0]);
145+
primaryService.addRequestHandlingBehavior(
146+
PublicationTransportHandler.COMMIT_STATE_ACTION_NAME,
147+
(handler, request, channel, task) -> {
148+
// not committing the state
149+
logger.info("ignoring the commit from cluster-manager {}", request);
150+
channel.sendResponse(TransportResponse.Empty.INSTANCE);
151+
}
152+
);
153+
154+
ClusterState state = internalCluster().clusterService().state();
155+
String cm = internalCluster().getClusterManagerName();
156+
MockTransportService cmservice = (MockTransportService) internalCluster().getInstance(TransportService.class, cm);
157+
cmservice.addRequestHandlingBehavior(GetTermVersionAction.NAME, (handler, request, channel, task) -> {
158+
channel.sendResponse(
159+
new GetTermVersionResponse(new ClusterStateTermVersion(state.getClusterName(), state.stateUUID(), -1, -1), true)
160+
);
161+
});
162+
163+
Map<String, AtomicInteger> callCounters = Map.ofEntries(
164+
Map.entry(ClusterStateAction.NAME, new AtomicInteger()),
165+
Map.entry(GetTermVersionAction.NAME, new AtomicInteger())
166+
);
167+
168+
addCallCountInterceptor(cm, callCounters);
169+
170+
ClusterStateResponse stateResponseM = client(cm).admin().cluster().state(new ClusterStateRequest()).actionGet();
171+
ClusterStateResponse stateResponseD = client(dataNodes[0]).admin().cluster().state(new ClusterStateRequest()).actionGet();
172+
assertEquals(stateResponseM, stateResponseD);
173+
assertThat(callCounters.get(ClusterStateAction.NAME).get(), is(1));
174+
assertThat(callCounters.get(GetTermVersionAction.NAME).get(), is(1));
175+
176+
}
177+
178+
private void addCallCountInterceptor(String nodeName, Map<String, AtomicInteger> callCounters) {
179+
MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, nodeName);
180+
for (var ctrEnty : callCounters.entrySet()) {
181+
primaryService.addRequestHandlingBehavior(ctrEnty.getKey(), (handler, request, channel, task) -> {
182+
ctrEnty.getValue().incrementAndGet();
183+
logger.info("--> {} response redirect", ctrEnty.getKey());
184+
handler.messageReceived(request, channel, task);
185+
});
186+
}
187+
}
188+
189+
private BlobStoreRepository prepareClusterAndVerifyRepository() throws Exception {
190+
clusterSettingsSuppliedByTest = true;
191+
Path segmentRepoPath = randomRepoPath();
192+
Path translogRepoPath = randomRepoPath();
193+
Path remoteRoutingTableRepoPath = randomRepoPath();
194+
Settings settings = buildRemoteStoreNodeAttributes(
195+
REPOSITORY_NAME,
196+
segmentRepoPath,
197+
REPOSITORY_2_NAME,
198+
translogRepoPath,
199+
REMOTE_ROUTING_TABLE_REPO,
200+
remoteRoutingTableRepoPath,
201+
false
202+
);
203+
prepareCluster(1, 3, INDEX_NAME, 1, 5, settings);
204+
ensureGreen(INDEX_NAME);
205+
206+
RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
207+
BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REMOTE_ROUTING_TABLE_REPO);
208+
209+
return repository;
210+
}
211+
212+
}

server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,12 @@
4646
import org.opensearch.cluster.metadata.Metadata.Custom;
4747
import org.opensearch.cluster.routing.RoutingTable;
4848
import org.opensearch.cluster.service.ClusterService;
49+
import org.opensearch.common.Nullable;
4950
import org.opensearch.common.inject.Inject;
5051
import org.opensearch.common.unit.TimeValue;
5152
import org.opensearch.core.action.ActionListener;
5253
import org.opensearch.core.common.io.stream.StreamInput;
54+
import org.opensearch.gateway.remote.RemoteClusterStateService;
5355
import org.opensearch.node.NodeClosedException;
5456
import org.opensearch.threadpool.ThreadPool;
5557
import org.opensearch.transport.TransportService;
@@ -80,7 +82,8 @@ public TransportClusterStateAction(
8082
ClusterService clusterService,
8183
ThreadPool threadPool,
8284
ActionFilters actionFilters,
83-
IndexNameExpressionResolver indexNameExpressionResolver
85+
IndexNameExpressionResolver indexNameExpressionResolver,
86+
@Nullable RemoteClusterStateService remoteClusterStateService
8487
) {
8588
super(
8689
ClusterStateAction.NAME,
@@ -93,6 +96,7 @@ public TransportClusterStateAction(
9396
indexNameExpressionResolver
9497
);
9598
this.localExecuteSupported = true;
99+
this.remoteClusterStateService = remoteClusterStateService;
96100
}
97101

98102
@Override

server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java

+57-2
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.opensearch.cluster.ClusterStateObserver;
5050
import org.opensearch.cluster.NotClusterManagerException;
5151
import org.opensearch.cluster.block.ClusterBlockException;
52+
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
5253
import org.opensearch.cluster.coordination.FailedToCommitClusterStateException;
5354
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
5455
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
@@ -63,6 +64,8 @@
6364
import org.opensearch.core.common.io.stream.StreamInput;
6465
import org.opensearch.core.common.io.stream.Writeable;
6566
import org.opensearch.discovery.ClusterManagerNotDiscoveredException;
67+
import org.opensearch.gateway.remote.ClusterMetadataManifest;
68+
import org.opensearch.gateway.remote.RemoteClusterStateService;
6669
import org.opensearch.node.NodeClosedException;
6770
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
6871
import org.opensearch.tasks.Task;
@@ -74,6 +77,7 @@
7477
import org.opensearch.transport.TransportService;
7578

7679
import java.io.IOException;
80+
import java.util.Optional;
7781
import java.util.function.BiConsumer;
7882
import java.util.function.Consumer;
7983
import java.util.function.Predicate;
@@ -95,6 +99,8 @@ public abstract class TransportClusterManagerNodeAction<Request extends ClusterM
9599
protected final ClusterService clusterService;
96100
protected final IndexNameExpressionResolver indexNameExpressionResolver;
97101

102+
protected RemoteClusterStateService remoteClusterStateService;
103+
98104
private final String executor;
99105

100106
protected TransportClusterManagerNodeAction(
@@ -378,9 +384,12 @@ public void handleResponse(GetTermVersionResponse response) {
378384
response.getClusterStateTermVersion(),
379385
isLatestClusterStatePresentOnLocalNode
380386
);
381-
if (isLatestClusterStatePresentOnLocalNode) {
382-
onLatestLocalState.accept(clusterState);
387+
388+
ClusterState stateFromNode = getStateFromLocalNode(response);
389+
if (stateFromNode != null) {
390+
onLatestLocalState.accept(stateFromNode);
383391
} else {
392+
// fallback to clusterManager
384393
onStaleLocalState.accept(clusterManagerNode, clusterState);
385394
}
386395
}
@@ -405,6 +414,52 @@ public GetTermVersionResponse read(StreamInput in) throws IOException {
405414
};
406415
}
407416

417+
private ClusterState getStateFromLocalNode(GetTermVersionResponse termVersionResponse) {
418+
ClusterStateTermVersion termVersion = termVersionResponse.getClusterStateTermVersion();
419+
ClusterState appliedState = clusterService.state();
420+
if (termVersion.equals(new ClusterStateTermVersion(appliedState))) {
421+
logger.trace("Using the applied State from local, ClusterStateTermVersion {}", termVersion);
422+
return appliedState;
423+
}
424+
425+
ClusterState preCommitState = clusterService.preCommitState();
426+
if (preCommitState != null && termVersion.equals(new ClusterStateTermVersion(preCommitState))) {
427+
logger.trace("Using the published state from local, ClusterStateTermVersion {}", termVersion);
428+
return preCommitState;
429+
}
430+
431+
if (remoteClusterStateService != null && termVersionResponse.isStatePresentInRemote()) {
432+
try {
433+
ClusterStateTermVersion clusterStateTermVersion = termVersionResponse.getClusterStateTermVersion();
434+
Optional<ClusterMetadataManifest> clusterMetadataManifest = remoteClusterStateService
435+
.getClusterMetadataManifestByTermVersion(
436+
clusterStateTermVersion.getClusterName().value(),
437+
clusterStateTermVersion.getClusterUUID(),
438+
clusterStateTermVersion.getTerm(),
439+
clusterStateTermVersion.getVersion()
440+
);
441+
if (clusterMetadataManifest.isEmpty()) {
442+
logger.trace("could not find manifest in remote-store for ClusterStateTermVersion {}", termVersion);
443+
return null;
444+
}
445+
ClusterState clusterStateFromRemote = remoteClusterStateService.getClusterStateForManifest(
446+
appliedState.getClusterName().value(),
447+
clusterMetadataManifest.get(),
448+
appliedState.nodes().getLocalNode().getId(),
449+
true
450+
);
451+
452+
if (clusterStateFromRemote != null) {
453+
logger.trace("Using the remote cluster-state fetched from local node, ClusterStateTermVersion {}", termVersion);
454+
return clusterStateFromRemote;
455+
}
456+
} catch (Exception e) {
457+
logger.trace("Error while fetching from remote cluster state", e);
458+
}
459+
}
460+
return null;
461+
}
462+
408463
private boolean checkForBlock(Request request, ClusterState localClusterState) {
409464
final ClusterBlockException blockException = checkBlock(request, localClusterState);
410465
if (blockException != null) {

0 commit comments

Comments
 (0)