Skip to content

Commit feaa115

Browse files
Optimise TransportNodesAction to not send DiscoveryNodes for NodeStat… (#14749) (#14855)
* Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call (cherry picked from commit 0040f4b) Signed-off-by: Pranshu Shukla <pranshushukla06@gmail.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 11fd0dd commit feaa115

15 files changed

+528
-11
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1919
- Add matchesPluginSystemIndexPattern to SystemIndexRegistry ([#14750](https://github.com/opensearch-project/OpenSearch/pull/14750))
2020
- Add Plugin interface for loading application based configuration templates (([#14659](https://github.com/opensearch-project/OpenSearch/issues/14659)))
2121
- Add prefix mode verification setting for repository verification (([#14790](https://github.com/opensearch-project/OpenSearch/pull/14790)))
22+
- Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([14749](https://github.com/opensearch-project/OpenSearch/pull/14749))
2223

2324
### Dependencies
2425
- Update to Apache Lucene 9.11.1 ([#14042](https://github.com/opensearch-project/OpenSearch/pull/14042), [#14576](https://github.com/opensearch-project/OpenSearch/pull/14576))

server/src/main/java/org/opensearch/action/admin/cluster/node/info/TransportNodesInfoAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ protected NodeInfo nodeOperation(NodeInfoRequest nodeRequest) {
129129
*/
130130
public static class NodeInfoRequest extends BaseNodeRequest {
131131

132-
NodesInfoRequest request;
132+
protected NodesInfoRequest request;
133133

134134
public NodeInfoRequest(StreamInput in) throws IOException {
135135
super(in);

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
140140
*/
141141
public static class NodeStatsRequest extends BaseNodeRequest {
142142

143-
NodesStatsRequest request;
143+
protected NodesStatsRequest request;
144144

145145
public NodeStatsRequest(StreamInput in) throws IOException {
146146
super(in);

server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
229229
*/
230230
public static class ClusterStatsNodeRequest extends BaseNodeRequest {
231231

232-
ClusterStatsRequest request;
232+
protected ClusterStatsRequest request;
233233

234234
public ClusterStatsNodeRequest(StreamInput in) throws IOException {
235235
super(in);

server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java

+16
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,14 @@ public abstract class BaseNodesRequest<Request extends BaseNodesRequest<Request>
6565
* will be ignored and this will be used.
6666
* */
6767
private DiscoveryNode[] concreteNodes;
68+
69+
/**
70+
* Since do not use the discovery nodes coming from the request in all code paths following a request extended off from
71+
* BaseNodeRequest, we do not require it to sent around across all nodes.
72+
*
73+
* Setting default behavior as `true` but can be explicitly changed in requests that do not require.
74+
*/
75+
private boolean includeDiscoveryNodes = true;
6876
private final TimeValue DEFAULT_TIMEOUT_SECS = TimeValue.timeValueSeconds(30);
6977

7078
private TimeValue timeout;
@@ -119,6 +127,14 @@ public void setConcreteNodes(DiscoveryNode[] concreteNodes) {
119127
this.concreteNodes = concreteNodes;
120128
}
121129

130+
public void setIncludeDiscoveryNodes(boolean value) {
131+
includeDiscoveryNodes = value;
132+
}
133+
134+
public boolean getIncludeDiscoveryNodes() {
135+
return includeDiscoveryNodes;
136+
}
137+
122138
@Override
123139
public ActionRequestValidationException validate() {
124140
return null;

server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ class AsyncAction {
226226
private final NodesRequest request;
227227
private final ActionListener<NodesResponse> listener;
228228
private final AtomicReferenceArray<Object> responses;
229+
private final DiscoveryNode[] concreteNodes;
229230
private final AtomicInteger counter = new AtomicInteger();
230231
private final Task task;
231232

@@ -238,10 +239,18 @@ class AsyncAction {
238239
assert request.concreteNodes() != null;
239240
}
240241
this.responses = new AtomicReferenceArray<>(request.concreteNodes().length);
242+
this.concreteNodes = request.concreteNodes();
243+
244+
if (request.getIncludeDiscoveryNodes() == false) {
245+
// As we transfer the ownership of discovery nodes to route the request to into the AsyncAction class, we
246+
// remove the list of DiscoveryNodes from the request. This reduces the payload of the request and improves
247+
// the number of concrete nodes in the memory.
248+
request.setConcreteNodes(null);
249+
}
241250
}
242251

243252
void start() {
244-
final DiscoveryNode[] nodes = request.concreteNodes();
253+
final DiscoveryNode[] nodes = this.concreteNodes;
245254
if (nodes.length == 0) {
246255
// nothing to notify
247256
threadPool.generic().execute(() -> listener.onResponse(newResponse(request, responses)));
@@ -260,7 +269,6 @@ void start() {
260269
if (task != null) {
261270
nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
262271
}
263-
264272
transportService.sendRequest(
265273
node,
266274
getTransportNodeAction(node),

server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java

+1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public String getName() {
6666
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
6767
ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest().nodesIds(request.paramAsStringArray("nodeId", null));
6868
clusterStatsRequest.timeout(request.param("timeout"));
69+
clusterStatsRequest.setIncludeDiscoveryNodes(false);
6970
return channel -> client.admin().cluster().clusterStats(clusterStatsRequest, new NodesResponseRestListener<>(channel));
7071
}
7172

server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
8888
final NodesInfoRequest nodesInfoRequest = prepareRequest(request);
8989
nodesInfoRequest.timeout(request.param("timeout"));
9090
settingsFilter.addFilterSettingParams(request);
91-
91+
nodesInfoRequest.setIncludeDiscoveryNodes(false);
9292
return channel -> client.admin().cluster().nodesInfo(nodesInfoRequest, new NodesResponseRestListener<>(channel));
9393
}
9494

server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java

+1
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
232232
// If no levels are passed in this results in an empty array.
233233
String[] levels = Strings.splitStringByCommaToArray(request.param("level"));
234234
nodesStatsRequest.indices().setLevels(levels);
235+
nodesStatsRequest.setIncludeDiscoveryNodes(false);
235236

236237
return channel -> client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
237238
}

server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java

+2
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli
125125
public void processResponse(final ClusterStateResponse clusterStateResponse) {
126126
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
127127
nodesInfoRequest.timeout(request.param("timeout"));
128+
nodesInfoRequest.setIncludeDiscoveryNodes(false);
128129
nodesInfoRequest.clear()
129130
.addMetrics(
130131
NodesInfoRequest.Metric.JVM.metricName(),
@@ -137,6 +138,7 @@ public void processResponse(final ClusterStateResponse clusterStateResponse) {
137138
public void processResponse(final NodesInfoResponse nodesInfoResponse) {
138139
NodesStatsRequest nodesStatsRequest = new NodesStatsRequest();
139140
nodesStatsRequest.timeout(request.param("timeout"));
141+
nodesStatsRequest.setIncludeDiscoveryNodes(false);
140142
nodesStatsRequest.clear()
141143
.indices(true)
142144
.addMetrics(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action;
10+
11+
import org.opensearch.client.node.NodeClient;
12+
import org.opensearch.common.settings.Settings;
13+
import org.opensearch.common.settings.SettingsFilter;
14+
import org.opensearch.rest.action.admin.cluster.RestClusterStatsAction;
15+
import org.opensearch.rest.action.admin.cluster.RestNodesInfoAction;
16+
import org.opensearch.rest.action.admin.cluster.RestNodesStatsAction;
17+
import org.opensearch.test.OpenSearchTestCase;
18+
import org.opensearch.test.rest.FakeRestRequest;
19+
import org.opensearch.threadpool.TestThreadPool;
20+
import org.junit.After;
21+
22+
import java.util.Collections;
23+
24+
public class RestStatsActionTests extends OpenSearchTestCase {
25+
private final TestThreadPool threadPool = new TestThreadPool(RestStatsActionTests.class.getName());
26+
private final NodeClient client = new NodeClient(Settings.EMPTY, threadPool);
27+
28+
@After
29+
public void terminateThreadPool() {
30+
terminate(threadPool);
31+
}
32+
33+
public void testClusterStatsActionPrepareRequestNoError() {
34+
RestClusterStatsAction action = new RestClusterStatsAction();
35+
try {
36+
action.prepareRequest(new FakeRestRequest(), client);
37+
} catch (Throwable t) {
38+
fail(t.getMessage());
39+
}
40+
}
41+
42+
public void testNodesStatsActionPrepareRequestNoError() {
43+
RestNodesStatsAction action = new RestNodesStatsAction();
44+
try {
45+
action.prepareRequest(new FakeRestRequest(), client);
46+
} catch (Throwable t) {
47+
fail(t.getMessage());
48+
}
49+
}
50+
51+
public void testNodesInfoActionPrepareRequestNoError() {
52+
RestNodesInfoAction action = new RestNodesInfoAction(new SettingsFilter(Collections.singleton("foo.filtered")));
53+
try {
54+
action.prepareRequest(new FakeRestRequest(), client);
55+
} catch (Throwable t) {
56+
fail(t.getMessage());
57+
}
58+
}
59+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.support.nodes;
10+
11+
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
12+
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest;
13+
import org.opensearch.action.admin.cluster.stats.TransportClusterStatsAction;
14+
import org.opensearch.action.support.ActionFilters;
15+
import org.opensearch.action.support.PlainActionFuture;
16+
import org.opensearch.cluster.node.DiscoveryNode;
17+
import org.opensearch.cluster.service.ClusterService;
18+
import org.opensearch.common.io.stream.BytesStreamOutput;
19+
import org.opensearch.core.common.io.stream.StreamInput;
20+
import org.opensearch.indices.IndicesService;
21+
import org.opensearch.node.NodeService;
22+
import org.opensearch.test.transport.CapturingTransport;
23+
import org.opensearch.threadpool.ThreadPool;
24+
import org.opensearch.transport.TransportService;
25+
26+
import java.io.IOException;
27+
import java.util.ArrayList;
28+
import java.util.Collection;
29+
import java.util.Collections;
30+
import java.util.HashMap;
31+
import java.util.List;
32+
import java.util.Map;
33+
34+
public class TransportClusterStatsActionTests extends TransportNodesActionTests {
35+
36+
/**
37+
* By default, we send discovery nodes list to each request that is sent across from the coordinator node. This
38+
* behavior is asserted in this test.
39+
*/
40+
public void testClusterStatsActionWithRetentionOfDiscoveryNodesList() {
41+
ClusterStatsRequest request = new ClusterStatsRequest();
42+
request.setIncludeDiscoveryNodes(true);
43+
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);
44+
45+
assertNotNull(combinedSentRequest);
46+
combinedSentRequest.forEach((node, capturedRequestList) -> {
47+
assertNotNull(capturedRequestList);
48+
capturedRequestList.forEach(sentRequest -> {
49+
assertNotNull(sentRequest.getDiscoveryNodes());
50+
assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize());
51+
});
52+
});
53+
}
54+
55+
public void testClusterStatsActionWithPreFilledConcreteNodesAndWithRetentionOfDiscoveryNodesList() {
56+
ClusterStatsRequest request = new ClusterStatsRequest();
57+
Collection<DiscoveryNode> discoveryNodes = clusterService.state().getNodes().getNodes().values();
58+
request.setConcreteNodes(discoveryNodes.toArray(DiscoveryNode[]::new));
59+
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);
60+
61+
assertNotNull(combinedSentRequest);
62+
combinedSentRequest.forEach((node, capturedRequestList) -> {
63+
assertNotNull(capturedRequestList);
64+
capturedRequestList.forEach(sentRequest -> {
65+
assertNotNull(sentRequest.getDiscoveryNodes());
66+
assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize());
67+
});
68+
});
69+
}
70+
71+
/**
72+
* In the optimized ClusterStats Request, we do not send the DiscoveryNodes List to each node. This behavior is
73+
* asserted in this test.
74+
*/
75+
public void testClusterStatsActionWithoutRetentionOfDiscoveryNodesList() {
76+
ClusterStatsRequest request = new ClusterStatsRequest();
77+
request.setIncludeDiscoveryNodes(false);
78+
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);
79+
80+
assertNotNull(combinedSentRequest);
81+
combinedSentRequest.forEach((node, capturedRequestList) -> {
82+
assertNotNull(capturedRequestList);
83+
capturedRequestList.forEach(sentRequest -> { assertNull(sentRequest.getDiscoveryNodes()); });
84+
});
85+
}
86+
87+
public void testClusterStatsActionWithPreFilledConcreteNodesAndWithoutRetentionOfDiscoveryNodesList() {
88+
ClusterStatsRequest request = new ClusterStatsRequest();
89+
Collection<DiscoveryNode> discoveryNodes = clusterService.state().getNodes().getNodes().values();
90+
request.setConcreteNodes(discoveryNodes.toArray(DiscoveryNode[]::new));
91+
request.setIncludeDiscoveryNodes(false);
92+
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);
93+
94+
assertNotNull(combinedSentRequest);
95+
combinedSentRequest.forEach((node, capturedRequestList) -> {
96+
assertNotNull(capturedRequestList);
97+
capturedRequestList.forEach(sentRequest -> { assertNull(sentRequest.getDiscoveryNodes()); });
98+
});
99+
}
100+
101+
private Map<String, List<MockClusterStatsNodeRequest>> performNodesInfoAction(ClusterStatsRequest request) {
102+
TransportNodesAction action = getTestTransportClusterStatsAction();
103+
PlainActionFuture<NodesStatsRequest> listener = new PlainActionFuture<>();
104+
action.new AsyncAction(null, request, listener).start();
105+
Map<String, List<CapturingTransport.CapturedRequest>> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear();
106+
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = new HashMap<>();
107+
108+
capturedRequests.forEach((node, capturedRequestList) -> {
109+
List<MockClusterStatsNodeRequest> sentRequestList = new ArrayList<>();
110+
111+
capturedRequestList.forEach(preSentRequest -> {
112+
BytesStreamOutput out = new BytesStreamOutput();
113+
try {
114+
TransportClusterStatsAction.ClusterStatsNodeRequest clusterStatsNodeRequestFromCoordinator =
115+
(TransportClusterStatsAction.ClusterStatsNodeRequest) preSentRequest.request;
116+
clusterStatsNodeRequestFromCoordinator.writeTo(out);
117+
StreamInput in = out.bytes().streamInput();
118+
MockClusterStatsNodeRequest mockClusterStatsNodeRequest = new MockClusterStatsNodeRequest(in);
119+
sentRequestList.add(mockClusterStatsNodeRequest);
120+
} catch (IOException e) {
121+
throw new RuntimeException(e);
122+
}
123+
});
124+
125+
combinedSentRequest.put(node, sentRequestList);
126+
});
127+
128+
return combinedSentRequest;
129+
}
130+
131+
private TestTransportClusterStatsAction getTestTransportClusterStatsAction() {
132+
return new TestTransportClusterStatsAction(
133+
THREAD_POOL,
134+
clusterService,
135+
transportService,
136+
nodeService,
137+
indicesService,
138+
new ActionFilters(Collections.emptySet())
139+
);
140+
}
141+
142+
private static class TestTransportClusterStatsAction extends TransportClusterStatsAction {
143+
public TestTransportClusterStatsAction(
144+
ThreadPool threadPool,
145+
ClusterService clusterService,
146+
TransportService transportService,
147+
NodeService nodeService,
148+
IndicesService indicesService,
149+
ActionFilters actionFilters
150+
) {
151+
super(threadPool, clusterService, transportService, nodeService, indicesService, actionFilters);
152+
}
153+
}
154+
155+
private static class MockClusterStatsNodeRequest extends TransportClusterStatsAction.ClusterStatsNodeRequest {
156+
157+
public MockClusterStatsNodeRequest(StreamInput in) throws IOException {
158+
super(in);
159+
}
160+
161+
public DiscoveryNode[] getDiscoveryNodes() {
162+
return this.request.concreteNodes();
163+
}
164+
}
165+
}

server/src/test/java/org/opensearch/action/support/nodes/TransportNodesActionTests.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import org.opensearch.core.common.io.stream.StreamInput;
4747
import org.opensearch.core.common.io.stream.StreamOutput;
4848
import org.opensearch.core.common.io.stream.Writeable;
49+
import org.opensearch.indices.IndicesService;
50+
import org.opensearch.node.NodeService;
4951
import org.opensearch.telemetry.tracing.noop.NoopTracer;
5052
import org.opensearch.test.OpenSearchTestCase;
5153
import org.opensearch.test.transport.CapturingTransport;
@@ -75,11 +77,12 @@
7577

7678
public class TransportNodesActionTests extends OpenSearchTestCase {
7779

78-
private static ThreadPool THREAD_POOL;
79-
80-
private ClusterService clusterService;
81-
private CapturingTransport transport;
82-
private TransportService transportService;
80+
protected static ThreadPool THREAD_POOL;
81+
protected ClusterService clusterService;
82+
protected CapturingTransport transport;
83+
protected TransportService transportService;
84+
protected NodeService nodeService;
85+
protected IndicesService indicesService;
8386

8487
public void testRequestIsSentToEachNode() throws Exception {
8588
TransportNodesAction action = getTestTransportNodesAction();

0 commit comments

Comments
 (0)