Skip to content

Commit 2069bd8

Browse files
authored
Integrate with CPU admission controller for cluster-manager Read API's. (opensearch-project#12496)
* Integrate with CPU admission controller for cluster-manager Read API's. The admission control is enforced at the transport layer. Signed-off-by: Rajiv Kumar Vaidyanathan <rajivkv@amazon.com>
1 parent f3d2bee commit 2069bd8

File tree

14 files changed

+405
-13
lines changed

14 files changed

+405
-13
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
121121
- Introduce a new setting `index.check_pending_flush.enabled` to expose the ability to disable the check for pending flushes by write threads ([#12710](https://github.com/opensearch-project/OpenSearch/pull/12710))
122122
- Built-in secure transports support ([#12435](https://github.com/opensearch-project/OpenSearch/pull/12435))
123123
- Lightweight Transport action to verify local term before fetching cluster-state from remote ([#12252](https://github.com/opensearch-project/OpenSearch/pull/12252/))
124+
- Integrate with admission controller for cluster-manager Read API. ([#12496](https://github.com/opensearch-project/OpenSearch/pull/12496))
124125

125126
### Dependencies
126127
- Bump `peter-evans/find-comment` from 2 to 3 ([#12288](https://github.com/opensearch-project/OpenSearch/pull/12288))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.ratelimitting.admissioncontrol;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest;
14+
import org.opensearch.action.admin.indices.alias.get.GetAliasesResponse;
15+
import org.opensearch.client.node.NodeClient;
16+
import org.opensearch.common.settings.Settings;
17+
import org.opensearch.common.unit.TimeValue;
18+
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
19+
import org.opensearch.core.rest.RestStatus;
20+
import org.opensearch.node.IoUsageStats;
21+
import org.opensearch.node.ResourceUsageCollectorService;
22+
import org.opensearch.node.resource.tracker.ResourceTrackerSettings;
23+
import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController;
24+
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
25+
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
26+
import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats;
27+
import org.opensearch.rest.AbstractRestChannel;
28+
import org.opensearch.rest.RestResponse;
29+
import org.opensearch.rest.action.admin.indices.RestGetAliasesAction;
30+
import org.opensearch.test.OpenSearchIntegTestCase;
31+
import org.opensearch.test.rest.FakeRestRequest;
32+
import org.junit.Before;
33+
34+
import java.util.HashMap;
35+
import java.util.Map;
36+
import java.util.concurrent.CountDownLatch;
37+
import java.util.concurrent.atomic.AtomicReference;
38+
39+
import static org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE;
40+
import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.CLUSTER_ADMIN_CPU_USAGE_LIMIT;
41+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
42+
import static org.hamcrest.Matchers.equalTo;
43+
44+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
45+
public class AdmissionForClusterManagerIT extends OpenSearchIntegTestCase {
46+
47+
private static final Logger LOGGER = LogManager.getLogger(AdmissionForClusterManagerIT.class);
48+
49+
public static final String INDEX_NAME = "test_index";
50+
51+
private String clusterManagerNodeId;
52+
private String datanode;
53+
private ResourceUsageCollectorService cMResourceCollector;
54+
55+
private static final Settings DISABLE_ADMISSION_CONTROL = Settings.builder()
56+
.put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.DISABLED.getMode())
57+
.build();
58+
59+
private static final Settings ENFORCE_ADMISSION_CONTROL = Settings.builder()
60+
.put(ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500))
61+
.put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED)
62+
.put(CLUSTER_ADMIN_CPU_USAGE_LIMIT.getKey(), 50)
63+
.build();
64+
65+
@Before
66+
public void init() {
67+
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(
68+
Settings.builder().put(DISABLE_ADMISSION_CONTROL).build()
69+
);
70+
datanode = internalCluster().startDataOnlyNode(Settings.builder().put(DISABLE_ADMISSION_CONTROL).build());
71+
72+
ensureClusterSizeConsistency();
73+
ensureGreen();
74+
75+
// Disable the automatic resource collection
76+
clusterManagerNodeId = internalCluster().clusterService(clusterManagerNode).localNode().getId();
77+
cMResourceCollector = internalCluster().getClusterManagerNodeInstance(ResourceUsageCollectorService.class);
78+
cMResourceCollector.stop();
79+
80+
// Enable admission control
81+
client().admin().cluster().prepareUpdateSettings().setTransientSettings(ENFORCE_ADMISSION_CONTROL).execute().actionGet();
82+
}
83+
84+
public void testAdmissionControlEnforced() throws Exception {
85+
cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 99, new IoUsageStats(98));
86+
87+
// Write API on ClusterManager
88+
assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}"));
89+
90+
// Read API on ClusterManager
91+
GetAliasesRequest aliasesRequest = new GetAliasesRequest();
92+
aliasesRequest.aliases("alias1");
93+
try {
94+
dataNodeClient().admin().indices().getAliases(aliasesRequest).actionGet();
95+
fail("expected failure");
96+
} catch (Exception e) {
97+
assertTrue(e instanceof OpenSearchRejectedExecutionException);
98+
assertTrue(e.getMessage().contains("CPU usage admission controller rejected the request"));
99+
assertTrue(e.getMessage().contains("[indices:admin/aliases/get]"));
100+
assertTrue(e.getMessage().contains("action-type [CLUSTER_ADMIN]"));
101+
}
102+
103+
client().admin().cluster().prepareUpdateSettings().setTransientSettings(DISABLE_ADMISSION_CONTROL).execute().actionGet();
104+
GetAliasesResponse getAliasesResponse = dataNodeClient().admin().indices().getAliases(aliasesRequest).actionGet();
105+
assertThat(getAliasesResponse.getAliases().get("test").size(), equalTo(1));
106+
107+
AdmissionControlService admissionControlServiceCM = internalCluster().getClusterManagerNodeInstance(AdmissionControlService.class);
108+
109+
AdmissionControllerStats admissionStats = getAdmissionControlStats(admissionControlServiceCM).get(
110+
CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER
111+
);
112+
113+
assertEquals(admissionStats.rejectionCount.get(AdmissionControlActionType.CLUSTER_ADMIN.getType()).longValue(), 1);
114+
assertNull(admissionStats.rejectionCount.get(AdmissionControlActionType.SEARCH.getType()));
115+
assertNull(admissionStats.rejectionCount.get(AdmissionControlActionType.INDEXING.getType()));
116+
}
117+
118+
public void testAdmissionControlEnabledOnNoBreach() throws InterruptedException {
119+
// CPU usage is less than threshold 50%
120+
cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 35, new IoUsageStats(98));
121+
122+
// Write API on ClusterManager
123+
assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}").execute().actionGet());
124+
125+
// Read API on ClusterManager
126+
GetAliasesRequest aliasesRequest = new GetAliasesRequest();
127+
aliasesRequest.aliases("alias1");
128+
GetAliasesResponse getAliasesResponse = dataNodeClient().admin().indices().getAliases(aliasesRequest).actionGet();
129+
assertThat(getAliasesResponse.getAliases().get("test").size(), equalTo(1));
130+
}
131+
132+
public void testAdmissionControlMonitorOnBreach() throws InterruptedException {
133+
admissionControlDisabledOnBreach(
134+
Settings.builder().put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.MONITOR.getMode()).build()
135+
);
136+
}
137+
138+
public void testAdmissionControlDisabledOnBreach() throws InterruptedException {
139+
admissionControlDisabledOnBreach(DISABLE_ADMISSION_CONTROL);
140+
}
141+
142+
public void admissionControlDisabledOnBreach(Settings admission) throws InterruptedException {
143+
client().admin().cluster().prepareUpdateSettings().setTransientSettings(admission).execute().actionGet();
144+
145+
cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 97, new IoUsageStats(98));
146+
147+
// Write API on ClusterManager
148+
assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}").execute().actionGet());
149+
150+
// Read API on ClusterManager
151+
GetAliasesRequest aliasesRequest = new GetAliasesRequest();
152+
aliasesRequest.aliases("alias1");
153+
GetAliasesResponse getAliasesResponse = dataNodeClient().admin().indices().getAliases(aliasesRequest).actionGet();
154+
assertThat(getAliasesResponse.getAliases().get("test").size(), equalTo(1));
155+
156+
}
157+
158+
public void testAdmissionControlResponseStatus() throws Exception {
159+
cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 99, new IoUsageStats(98));
160+
161+
// Write API on ClusterManager
162+
assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}"));
163+
164+
// Read API on ClusterManager
165+
FakeRestRequest aliasesRequest = new FakeRestRequest();
166+
aliasesRequest.params().put("name", "alias1");
167+
CountDownLatch waitForResponse = new CountDownLatch(1);
168+
AtomicReference<RestResponse> aliasResponse = new AtomicReference<>();
169+
AbstractRestChannel channel = new AbstractRestChannel(aliasesRequest, true) {
170+
171+
@Override
172+
public void sendResponse(RestResponse response) {
173+
waitForResponse.countDown();
174+
aliasResponse.set(response);
175+
}
176+
};
177+
178+
RestGetAliasesAction restHandler = internalCluster().getInstance(RestGetAliasesAction.class, datanode);
179+
restHandler.handleRequest(aliasesRequest, channel, internalCluster().getInstance(NodeClient.class, datanode));
180+
181+
waitForResponse.await();
182+
assertEquals(RestStatus.TOO_MANY_REQUESTS, aliasResponse.get().status());
183+
}
184+
185+
@Override
186+
public void tearDown() throws Exception {
187+
client().admin().cluster().prepareUpdateSettings().setTransientSettings(DISABLE_ADMISSION_CONTROL).execute().actionGet();
188+
super.tearDown();
189+
}
190+
191+
Map<String, AdmissionControllerStats> getAdmissionControlStats(AdmissionControlService admissionControlService) {
192+
Map<String, AdmissionControllerStats> acStats = new HashMap<>();
193+
for (AdmissionControllerStats admissionControllerStats : admissionControlService.stats().getAdmissionControllerStatsList()) {
194+
acStats.put(admissionControllerStats.getAdmissionControllerName(), admissionControllerStats);
195+
}
196+
return acStats;
197+
}
198+
}

server/src/main/java/org/opensearch/action/support/HandledTransportAction.java

+34-3
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.opensearch.action.ActionRequest;
3535
import org.opensearch.core.action.ActionResponse;
3636
import org.opensearch.core.common.io.stream.Writeable;
37+
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
3738
import org.opensearch.tasks.Task;
3839
import org.opensearch.threadpool.ThreadPool;
3940
import org.opensearch.transport.TransportChannel;
@@ -65,7 +66,7 @@ protected HandledTransportAction(
6566
Writeable.Reader<Request> requestReader,
6667
String executor
6768
) {
68-
this(actionName, true, transportService, actionFilters, requestReader, executor);
69+
this(actionName, true, null, transportService, actionFilters, requestReader, executor);
6970
}
7071

7172
protected HandledTransportAction(
@@ -75,19 +76,49 @@ protected HandledTransportAction(
7576
ActionFilters actionFilters,
7677
Writeable.Reader<Request> requestReader
7778
) {
78-
this(actionName, canTripCircuitBreaker, transportService, actionFilters, requestReader, ThreadPool.Names.SAME);
79+
this(actionName, canTripCircuitBreaker, null, transportService, actionFilters, requestReader, ThreadPool.Names.SAME);
7980
}
8081

8182
protected HandledTransportAction(
8283
String actionName,
8384
boolean canTripCircuitBreaker,
85+
AdmissionControlActionType admissionControlActionType,
86+
TransportService transportService,
87+
ActionFilters actionFilters,
88+
Writeable.Reader<Request> requestReader
89+
) {
90+
this(
91+
actionName,
92+
canTripCircuitBreaker,
93+
admissionControlActionType,
94+
transportService,
95+
actionFilters,
96+
requestReader,
97+
ThreadPool.Names.SAME
98+
);
99+
}
100+
101+
protected HandledTransportAction(
102+
String actionName,
103+
boolean canTripCircuitBreaker,
104+
AdmissionControlActionType admissionControlActionType,
84105
TransportService transportService,
85106
ActionFilters actionFilters,
86107
Writeable.Reader<Request> requestReader,
87108
String executor
88109
) {
89110
super(actionName, actionFilters, transportService.getTaskManager());
90-
transportService.registerRequestHandler(actionName, executor, false, canTripCircuitBreaker, requestReader, new TransportHandler());
111+
112+
transportService.registerRequestHandler(
113+
actionName,
114+
executor,
115+
false,
116+
canTripCircuitBreaker,
117+
admissionControlActionType,
118+
requestReader,
119+
new TransportHandler()
120+
);
121+
91122
}
92123

93124
/**

server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java

+27-2
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import org.opensearch.core.common.io.stream.Writeable;
6565
import org.opensearch.discovery.ClusterManagerNotDiscoveredException;
6666
import org.opensearch.node.NodeClosedException;
67+
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
6768
import org.opensearch.tasks.Task;
6869
import org.opensearch.threadpool.ThreadPool;
6970
import org.opensearch.transport.ConnectTransportException;
@@ -105,7 +106,7 @@ protected TransportClusterManagerNodeAction(
105106
Writeable.Reader<Request> request,
106107
IndexNameExpressionResolver indexNameExpressionResolver
107108
) {
108-
this(actionName, true, transportService, clusterService, threadPool, actionFilters, request, indexNameExpressionResolver);
109+
this(actionName, true, null, transportService, clusterService, threadPool, actionFilters, request, indexNameExpressionResolver);
109110
}
110111

111112
protected TransportClusterManagerNodeAction(
@@ -118,7 +119,31 @@ protected TransportClusterManagerNodeAction(
118119
Writeable.Reader<Request> request,
119120
IndexNameExpressionResolver indexNameExpressionResolver
120121
) {
121-
super(actionName, canTripCircuitBreaker, transportService, actionFilters, request);
122+
this(
123+
actionName,
124+
canTripCircuitBreaker,
125+
null,
126+
transportService,
127+
clusterService,
128+
threadPool,
129+
actionFilters,
130+
request,
131+
indexNameExpressionResolver
132+
);
133+
}
134+
135+
protected TransportClusterManagerNodeAction(
136+
String actionName,
137+
boolean canTripCircuitBreaker,
138+
AdmissionControlActionType admissionControlActionType,
139+
TransportService transportService,
140+
ClusterService clusterService,
141+
ThreadPool threadPool,
142+
ActionFilters actionFilters,
143+
Writeable.Reader<Request> request,
144+
IndexNameExpressionResolver indexNameExpressionResolver
145+
) {
146+
super(actionName, canTripCircuitBreaker, admissionControlActionType, transportService, actionFilters, request);
122147
this.transportService = transportService;
123148
this.clusterService = clusterService;
124149
this.threadPool = threadPool;

server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeReadAction.java

+37-1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.opensearch.cluster.service.ClusterService;
3838
import org.opensearch.core.action.ActionResponse;
3939
import org.opensearch.core.common.io.stream.Writeable;
40+
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
4041
import org.opensearch.threadpool.ThreadPool;
4142
import org.opensearch.transport.TransportService;
4243

@@ -59,12 +60,46 @@ protected TransportClusterManagerNodeReadAction(
5960
Writeable.Reader<Request> request,
6061
IndexNameExpressionResolver indexNameExpressionResolver
6162
) {
62-
this(actionName, true, transportService, clusterService, threadPool, actionFilters, request, indexNameExpressionResolver);
63+
this(
64+
actionName,
65+
true,
66+
AdmissionControlActionType.CLUSTER_ADMIN,
67+
transportService,
68+
clusterService,
69+
threadPool,
70+
actionFilters,
71+
request,
72+
indexNameExpressionResolver
73+
);
74+
}
75+
76+
protected TransportClusterManagerNodeReadAction(
77+
String actionName,
78+
boolean checkSizeLimit,
79+
TransportService transportService,
80+
ClusterService clusterService,
81+
ThreadPool threadPool,
82+
ActionFilters actionFilters,
83+
Writeable.Reader<Request> request,
84+
IndexNameExpressionResolver indexNameExpressionResolver
85+
) {
86+
super(
87+
actionName,
88+
checkSizeLimit,
89+
null,
90+
transportService,
91+
clusterService,
92+
threadPool,
93+
actionFilters,
94+
request,
95+
indexNameExpressionResolver
96+
);
6397
}
6498

6599
protected TransportClusterManagerNodeReadAction(
66100
String actionName,
67101
boolean checkSizeLimit,
102+
AdmissionControlActionType admissionControlActionType,
68103
TransportService transportService,
69104
ClusterService clusterService,
70105
ThreadPool threadPool,
@@ -75,6 +110,7 @@ protected TransportClusterManagerNodeReadAction(
75110
super(
76111
actionName,
77112
checkSizeLimit,
113+
admissionControlActionType,
78114
transportService,
79115
clusterService,
80116
threadPool,

0 commit comments

Comments
 (0)