Skip to content

Commit 7ad3017

Browse files
authored
Light weight Transport action to verify local term before fetching cluster-state from remote (opensearch-project#12252)
Signed-off-by: Rajiv Kumar Vaidyanathan <rajivkv@amazon.com>
1 parent 2908621 commit 7ad3017

16 files changed

+1111
-62
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
120120
- [Admission Control] Integrated IO Based AdmissionController to AdmissionControl Framework ([#12583](https://github.com/opensearch-project/OpenSearch/pull/12583))
121121
- Introduce a new setting `index.check_pending_flush.enabled` to expose the ability to disable the check for pending flushes by write threads ([#12710](https://github.com/opensearch-project/OpenSearch/pull/12710))
122122
- Built-in secure transports support ([#12435](https://github.com/opensearch-project/OpenSearch/pull/12435))
123+
- Lightweight Transport action to verify local term before fetching cluster-state from remote ([#12252](https://github.com/opensearch-project/OpenSearch/pull/12252/))
123124

124125
### Dependencies
125126
- Bump `peter-evans/find-comment` from 2 to 3 ([#12288](https://github.com/opensearch-project/OpenSearch/pull/12288))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.state;
10+
11+
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
12+
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
13+
import org.opensearch.action.admin.cluster.state.term.GetTermVersionAction;
14+
import org.opensearch.action.admin.cluster.state.term.GetTermVersionResponse;
15+
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
16+
import org.opensearch.cluster.ClusterName;
17+
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
18+
import org.opensearch.cluster.metadata.IndexMetadata;
19+
import org.opensearch.common.settings.Settings;
20+
import org.opensearch.index.mapper.MapperService;
21+
import org.opensearch.plugins.Plugin;
22+
import org.opensearch.test.OpenSearchIntegTestCase;
23+
import org.opensearch.test.transport.MockTransportService;
24+
import org.opensearch.transport.TransportService;
25+
26+
import java.util.ArrayList;
27+
import java.util.Collection;
28+
import java.util.Collections;
29+
import java.util.List;
30+
import java.util.Map;
31+
import java.util.concurrent.atomic.AtomicBoolean;
32+
import java.util.stream.IntStream;
33+
34+
import static org.hamcrest.Matchers.is;
35+
36+
@SuppressWarnings("unchecked")
37+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
38+
public class FetchByTermVersionIT extends OpenSearchIntegTestCase {
39+
40+
AtomicBoolean isTermVersionCheckEnabled = new AtomicBoolean();
41+
42+
protected Collection<Class<? extends Plugin>> nodePlugins() {
43+
return List.of(MockTransportService.TestPlugin.class);
44+
}
45+
46+
AtomicBoolean forceFetchFromCM = new AtomicBoolean();
47+
48+
public void testClusterStateResponseFromDataNode() throws Exception {
49+
String cm = internalCluster().startClusterManagerOnlyNode();
50+
List<String> dns = internalCluster().startDataOnlyNodes(5);
51+
int numberOfShards = dns.size();
52+
stubClusterTermResponse(cm);
53+
54+
ensureClusterSizeConsistency();
55+
ensureGreen();
56+
57+
List<String> indices = new ArrayList<>();
58+
59+
// Create a large sized cluster-state by creating field mappings
60+
IntStream.range(0, 20).forEachOrdered(n -> {
61+
String index = "index_" + n;
62+
createIndex(
63+
index,
64+
Settings.builder()
65+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)
66+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
67+
.put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), Long.MAX_VALUE)
68+
.build()
69+
);
70+
indices.add(index);
71+
});
72+
IntStream.range(0, 5).forEachOrdered(n -> {
73+
List<String> mappings = new ArrayList<>();
74+
for (int i = 0; i < 2000; i++) {
75+
mappings.add("t-123456789-123456789-" + n + "-" + i);
76+
mappings.add("type=keyword");
77+
}
78+
PutMappingRequest request = new PutMappingRequest().source(mappings.toArray(new String[0]))
79+
.indices(indices.toArray(new String[0]));
80+
internalCluster().dataNodeClient().admin().indices().putMapping(request).actionGet();
81+
});
82+
ensureGreen();
83+
84+
ClusterStateResponse stateResponseM = internalCluster().clusterManagerClient()
85+
.admin()
86+
.cluster()
87+
.state(new ClusterStateRequest())
88+
.actionGet();
89+
90+
waitUntil(() -> {
91+
ClusterStateResponse stateResponseD = internalCluster().dataNodeClient()
92+
.admin()
93+
.cluster()
94+
.state(new ClusterStateRequest())
95+
.actionGet();
96+
return stateResponseD.getState().stateUUID().equals(stateResponseM.getState().stateUUID());
97+
});
98+
// cluster state response time with term check enabled on datanode
99+
isTermVersionCheckEnabled.set(true);
100+
{
101+
List<Long> latencies = new ArrayList<>();
102+
IntStream.range(0, 50).forEachOrdered(n1 -> {
103+
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
104+
long start = System.currentTimeMillis();
105+
ClusterStateResponse stateResponse = dataNodeClient().admin().cluster().state(clusterStateRequest).actionGet();
106+
latencies.add(System.currentTimeMillis() - start);
107+
assertThat(stateResponse.getClusterName().value(), is(internalCluster().getClusterName()));
108+
assertThat(stateResponse.getState().nodes().getSize(), is(internalCluster().getNodeNames().length));
109+
assertThat(stateResponse.getState().metadata().indices().size(), is(indices.size()));
110+
Map<String, Object> fieldMappings = (Map<String, Object>) stateResponse.getState()
111+
.metadata()
112+
.index(indices.get(0))
113+
.mapping()
114+
.sourceAsMap()
115+
.get("properties");
116+
117+
assertThat(fieldMappings.size(), is(10000));
118+
});
119+
Collections.sort(latencies);
120+
121+
logger.info("cluster().state() fetch with Term Version enabled took {} milliseconds", (latencies.get(latencies.size() / 2)));
122+
}
123+
// cluster state response time with term check disabled on datanode
124+
isTermVersionCheckEnabled.set(false);
125+
{
126+
List<Long> latencies = new ArrayList<>();
127+
IntStream.range(0, 50).forEachOrdered(n1 -> {
128+
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
129+
long start = System.currentTimeMillis();
130+
ClusterStateResponse stateResponse = dataNodeClient().admin().cluster().state(clusterStateRequest).actionGet();
131+
latencies.add(System.currentTimeMillis() - start);
132+
assertThat(stateResponse.getClusterName().value(), is(internalCluster().getClusterName()));
133+
assertThat(stateResponse.getState().nodes().getSize(), is(internalCluster().getNodeNames().length));
134+
assertThat(stateResponse.getState().metadata().indices().size(), is(indices.size()));
135+
Map<String, Object> typeProperties = (Map<String, Object>) stateResponse.getState()
136+
.metadata()
137+
.index(indices.get(0))
138+
.mapping()
139+
.sourceAsMap()
140+
.get("properties");
141+
assertThat(typeProperties.size(), is(10000));
142+
143+
});
144+
Collections.sort(latencies);
145+
logger.info("cluster().state() fetch with Term Version disabled took {} milliseconds", (latencies.get(latencies.size() / 2)));
146+
}
147+
148+
}
149+
150+
private void stubClusterTermResponse(String master) {
151+
MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, master);
152+
primaryService.addRequestHandlingBehavior(GetTermVersionAction.NAME, (handler, request, channel, task) -> {
153+
if (isTermVersionCheckEnabled.get()) {
154+
handler.messageReceived(request, channel, task);
155+
} else {
156+
// always return response that does not match
157+
channel.sendResponse(new GetTermVersionResponse(new ClusterStateTermVersion(new ClusterName("test"), "1", -1, -1)));
158+
}
159+
});
160+
}
161+
}

server/src/main/java/org/opensearch/action/ActionModule.java

+3
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@
107107
import org.opensearch.action.admin.cluster.snapshots.status.TransportSnapshotsStatusAction;
108108
import org.opensearch.action.admin.cluster.state.ClusterStateAction;
109109
import org.opensearch.action.admin.cluster.state.TransportClusterStateAction;
110+
import org.opensearch.action.admin.cluster.state.term.GetTermVersionAction;
111+
import org.opensearch.action.admin.cluster.state.term.TransportGetTermVersionAction;
110112
import org.opensearch.action.admin.cluster.stats.ClusterStatsAction;
111113
import org.opensearch.action.admin.cluster.stats.TransportClusterStatsAction;
112114
import org.opensearch.action.admin.cluster.storedscripts.DeleteStoredScriptAction;
@@ -614,6 +616,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
614616
actions.register(ClusterAllocationExplainAction.INSTANCE, TransportClusterAllocationExplainAction.class);
615617
actions.register(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class);
616618
actions.register(ClusterStateAction.INSTANCE, TransportClusterStateAction.class);
619+
actions.register(GetTermVersionAction.INSTANCE, TransportGetTermVersionAction.class);
617620
actions.register(ClusterHealthAction.INSTANCE, TransportClusterHealthAction.class);
618621
actions.register(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class);
619622
actions.register(ClusterRerouteAction.INSTANCE, TransportClusterRerouteAction.class);

server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,12 @@ protected void clusterManagerOperation(
125125
? clusterState -> true
126126
: clusterState -> clusterState.metadata().version() >= request.waitForMetadataVersion();
127127

128+
// action will be executed on local node, if either the request is local only (or) the local node has the same cluster-state as
129+
// ClusterManager
128130
final Predicate<ClusterState> acceptableClusterStateOrNotMasterPredicate = request.local()
129-
? acceptableClusterStatePredicate
130-
: acceptableClusterStatePredicate.or(clusterState -> clusterState.nodes().isLocalNodeElectedClusterManager() == false);
131+
|| !state.nodes().isLocalNodeElectedClusterManager()
132+
? acceptableClusterStatePredicate
133+
: acceptableClusterStatePredicate.or(clusterState -> clusterState.nodes().isLocalNodeElectedClusterManager() == false);
131134

132135
if (acceptableClusterStatePredicate.test(state)) {
133136
ActionListener.completeWith(listener, () -> buildResponse(request, state));
@@ -231,4 +234,8 @@ private ClusterStateResponse buildResponse(final ClusterStateRequest request, fi
231234
return new ClusterStateResponse(currentState.getClusterName(), builder.build(), false);
232235
}
233236

237+
@Override
238+
protected boolean localExecuteSupportedByAction() {
239+
return true;
240+
}
234241
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.cluster.state.term;
10+
11+
import org.opensearch.action.ActionType;
12+
13+
/**
14+
* Transport action for fetching cluster term and version
15+
*
16+
* @opensearch.internal
17+
*/
18+
public class GetTermVersionAction extends ActionType<GetTermVersionResponse> {
19+
20+
public static final GetTermVersionAction INSTANCE = new GetTermVersionAction();
21+
public static final String NAME = "cluster:monitor/term";
22+
23+
private GetTermVersionAction() {
24+
super(NAME, GetTermVersionResponse::new);
25+
}
26+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.cluster.state.term;
10+
11+
import org.opensearch.action.ActionRequestValidationException;
12+
import org.opensearch.action.support.clustermanager.ClusterManagerNodeReadRequest;
13+
import org.opensearch.core.common.io.stream.StreamInput;
14+
15+
import java.io.IOException;
16+
17+
/**
18+
* Request object to get cluster term and version
19+
*
20+
* @opensearch.internal
21+
*/
22+
public class GetTermVersionRequest extends ClusterManagerNodeReadRequest<GetTermVersionRequest> {
23+
24+
public GetTermVersionRequest() {}
25+
26+
public GetTermVersionRequest(StreamInput in) throws IOException {
27+
super(in);
28+
}
29+
30+
@Override
31+
public ActionRequestValidationException validate() {
32+
return null;
33+
}
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.cluster.state.term;
10+
11+
import org.opensearch.cluster.ClusterState;
12+
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
13+
import org.opensearch.core.action.ActionResponse;
14+
import org.opensearch.core.common.io.stream.StreamInput;
15+
import org.opensearch.core.common.io.stream.StreamOutput;
16+
17+
import java.io.IOException;
18+
19+
/**
20+
* Response object of cluster term
21+
*
22+
* @opensearch.internal
23+
*/
24+
public class GetTermVersionResponse extends ActionResponse {
25+
26+
private final ClusterStateTermVersion clusterStateTermVersion;
27+
28+
public GetTermVersionResponse(ClusterStateTermVersion clusterStateTermVersion) {
29+
this.clusterStateTermVersion = clusterStateTermVersion;
30+
}
31+
32+
public GetTermVersionResponse(StreamInput in) throws IOException {
33+
super(in);
34+
this.clusterStateTermVersion = new ClusterStateTermVersion(in);
35+
}
36+
37+
@Override
38+
public void writeTo(StreamOutput out) throws IOException {
39+
clusterStateTermVersion.writeTo(out);
40+
}
41+
42+
public ClusterStateTermVersion getClusterStateTermVersion() {
43+
return clusterStateTermVersion;
44+
}
45+
46+
public boolean matches(ClusterState clusterState) {
47+
return clusterStateTermVersion != null && clusterStateTermVersion.equals(new ClusterStateTermVersion(clusterState));
48+
}
49+
50+
}

0 commit comments

Comments
 (0)