Skip to content

Commit c5d12e2

Browse files
committed
Cancellation support for _cat/nodes and optimize it
Signed-off-by: kkewwei <kewei.11@bytedance.com> Signed-off-by: kkewwei <kkewwei@163.com>
1 parent abe2333 commit c5d12e2

File tree

13 files changed

+605
-47
lines changed

13 files changed

+605
-47
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2626
### Changed
2727
- Convert transport-reactor-netty4 to use gradle version catalog [#17233](https://github.com/opensearch-project/OpenSearch/pull/17233)
2828
- Increase force merge threads to 1/8th of cores [#17255](https://github.com/opensearch-project/OpenSearch/pull/17255)
29+
- Cancellation support for cat/nodes and optimize it ([#14853](https://github.com/opensearch-project/OpenSearch/pull/14853))
2930

3031
### Deprecated
3132

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.http;
10+
11+
import org.apache.hc.core5.http.ParseException;
12+
import org.apache.hc.core5.http.io.entity.EntityUtils;
13+
import org.opensearch.client.Request;
14+
import org.opensearch.client.Response;
15+
import org.opensearch.client.RestClient;
16+
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
17+
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
18+
19+
import java.io.IOException;
20+
21+
import static org.apache.hc.core5.http.HttpStatus.SC_OK;
22+
import static org.hamcrest.Matchers.containsString;
23+
24+
@ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 5, numClientNodes = 0)
25+
public class HttpCatIT extends HttpSmokeTestCase {
26+
27+
public void testdoCatRequest() throws IOException {
28+
try (RestClient restClient = getRestClient()) {
29+
int nodesCount = restClient.getNodes().size();
30+
assertEquals(5, nodesCount);
31+
32+
// to make sure the timeout is working
33+
for (int i = 0; i < 5; i++) {
34+
sendRequest(restClient, 30, nodesCount);
35+
}
36+
37+
// no timeout
38+
for (int i = 0; i < 5; i++) {
39+
sendRequest(restClient, -1, nodesCount);
40+
}
41+
42+
for (int i = 1; i < 5; i++) {
43+
long timeout = randomInt(300);
44+
sendRequest(restClient, timeout, nodesCount);
45+
}
46+
}
47+
}
48+
49+
private void sendRequest(RestClient restClient, long timeout, int nodesCount) {
50+
Request nodesRequest;
51+
if (timeout < 0) {
52+
nodesRequest = new Request("GET", "/_cat/nodes");
53+
} else {
54+
nodesRequest = new Request("GET", "/_cat/nodes?timeout=" + timeout + "ms");
55+
}
56+
try {
57+
Response response = restClient.performRequest(nodesRequest);
58+
assertEquals(SC_OK, response.getStatusLine().getStatusCode());
59+
String result = EntityUtils.toString(response.getEntity());
60+
String[] NodeInfos = result.split("\n");
61+
assertEquals(nodesCount, NodeInfos.length);
62+
} catch (IOException | ParseException e) {
63+
// it means that it costs too long to get ClusterState from the master.
64+
assertThat(e.getMessage(), containsString("There is not enough time to obtain nodesInfo metric from the cluster manager"));
65+
}
66+
}
67+
68+
}

server/src/main/java/org/opensearch/action/ActionModule.java

+3
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
import org.opensearch.action.admin.cluster.decommission.awareness.put.TransportDecommissionAction;
4949
import org.opensearch.action.admin.cluster.health.ClusterHealthAction;
5050
import org.opensearch.action.admin.cluster.health.TransportClusterHealthAction;
51+
import org.opensearch.action.admin.cluster.node.Nodes.CatNodesAction;
52+
import org.opensearch.action.admin.cluster.node.Nodes.TransportCatNodesAction;
5153
import org.opensearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAction;
5254
import org.opensearch.action.admin.cluster.node.hotthreads.TransportNodesHotThreadsAction;
5355
import org.opensearch.action.admin.cluster.node.info.NodesInfoAction;
@@ -664,6 +666,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
664666
actions.register(ClusterDeleteWeightedRoutingAction.INSTANCE, TransportDeleteWeightedRoutingAction.class);
665667
actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
666668
actions.register(CatShardsAction.INSTANCE, TransportCatShardsAction.class);
669+
actions.register(CatNodesAction.INSTANCE, TransportCatNodesAction.class);
667670
actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);
668671
actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
669672
actions.register(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.cluster.node.Nodes;
10+
11+
import org.opensearch.action.ActionType;
12+
13+
/**
14+
* Transport action for cat nodes
15+
*
16+
* @opensearch.internal
17+
*/
18+
public class CatNodesAction extends ActionType<CatNodesResponse> {
19+
public static final CatNodesAction INSTANCE = new CatNodesAction();
20+
public static final String NAME = "cluster:monitor/nodes/cat";
21+
22+
public CatNodesAction() {
23+
super(NAME, CatNodesResponse::new);
24+
}
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.cluster.node.Nodes;
10+
11+
import org.opensearch.action.ActionRequestValidationException;
12+
import org.opensearch.action.support.clustermanager.ClusterManagerNodeReadRequest;
13+
import org.opensearch.common.unit.TimeValue;
14+
import org.opensearch.core.common.io.stream.StreamInput;
15+
import org.opensearch.core.tasks.TaskId;
16+
import org.opensearch.rest.action.admin.cluster.ClusterAdminTask;
17+
18+
import java.io.IOException;
19+
import java.util.Map;
20+
21+
/**
22+
* A request of _cat/nodes.
23+
*
24+
* @opensearch.api
25+
*/
26+
public class CatNodesRequest extends ClusterManagerNodeReadRequest<CatNodesRequest> {
27+
28+
private TimeValue cancelAfterTimeInterval;
29+
private long timeout = -1;
30+
31+
public CatNodesRequest() {}
32+
33+
public CatNodesRequest(StreamInput in) throws IOException {
34+
super(in);
35+
}
36+
37+
public void setCancelAfterTimeInterval(TimeValue timeout) {
38+
this.cancelAfterTimeInterval = timeout;
39+
}
40+
41+
public TimeValue getCancelAfterTimeInterval() {
42+
return cancelAfterTimeInterval;
43+
}
44+
45+
public void setTimeout(long timeout) {
46+
this.timeout = timeout;
47+
}
48+
49+
public long getTimeout() {
50+
return timeout;
51+
}
52+
53+
@Override
54+
public ActionRequestValidationException validate() {
55+
return null;
56+
}
57+
58+
@Override
59+
public ClusterAdminTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
60+
return new ClusterAdminTask(id, type, action, parentTaskId, headers, this.cancelAfterTimeInterval);
61+
}
62+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.cluster.node.Nodes;
10+
11+
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
12+
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
13+
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
14+
import org.opensearch.core.action.ActionResponse;
15+
import org.opensearch.core.common.io.stream.StreamInput;
16+
import org.opensearch.core.common.io.stream.StreamOutput;
17+
18+
import java.io.IOException;
19+
20+
/**
21+
* A response of a cat shards request.
22+
*
23+
* @opensearch.api
24+
*/
25+
public class CatNodesResponse extends ActionResponse {
26+
27+
private ClusterStateResponse clusterStateResponse;
28+
private NodesInfoResponse nodesInfoResponse;
29+
private NodesStatsResponse nodesStatsResponse;
30+
31+
public CatNodesResponse(
32+
ClusterStateResponse clusterStateResponse,
33+
NodesInfoResponse nodesInfoResponse,
34+
NodesStatsResponse nodesStatsResponse
35+
) {
36+
this.clusterStateResponse = clusterStateResponse;
37+
this.nodesInfoResponse = nodesInfoResponse;
38+
this.nodesStatsResponse = nodesStatsResponse;
39+
}
40+
41+
public CatNodesResponse(StreamInput in) throws IOException {
42+
super(in);
43+
}
44+
45+
@Override
46+
public void writeTo(StreamOutput out) throws IOException {
47+
clusterStateResponse.writeTo(out);
48+
nodesInfoResponse.writeTo(out);
49+
nodesStatsResponse.writeTo(out);
50+
}
51+
52+
public NodesStatsResponse getNodesStatsResponse() {
53+
return nodesStatsResponse;
54+
}
55+
56+
public NodesInfoResponse getNodesInfoResponse() {
57+
return nodesInfoResponse;
58+
}
59+
60+
public ClusterStateResponse getClusterStateResponse() {
61+
return clusterStateResponse;
62+
}
63+
}

0 commit comments

Comments
 (0)