Skip to content

Commit 1467c4d

Browse files
ajaymovva and bharath-techie
authored and
Ajay Kumar Movva
committed
Integrate CPU AC with ResourceUsageCollector and add CPU AC stats to nodes/stats (#10887)
* Added changes to integrate CPU AC with ResourceUsageCollector and emit stats. Signed-off-by: Ajay Kumar Movva <movvaam@amazon.com> Co-authored-by: Bharathwaj G <bharath78910@gmail.com>
1 parent 5a95e22 commit 1467c4d

File tree

42 files changed

+1655
-328
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1655
-328
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
5454
- Enable Fuzzy codec for doc id fields using a bloom filter ([#11022](https://github.com/opensearch-project/OpenSearch/pull/11022))
5555
- [Metrics Framework] Adds support for Histogram metric ([#12062](https://github.com/opensearch-project/OpenSearch/pull/12062))
5656
- [AdmissionControl] Added changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286))
57+
- [Admission Control] Integrate CPU AC with ResourceUsageCollector and add CPU AC stats to nodes/stats ([#10887](https://github.com/opensearch-project/OpenSearch/pull/10887))
5758

5859
### Dependencies
5960
- Bumps jetty version to 9.4.52.v20230823 to fix GMS-2023-1857 ([#9822](https://github.com/opensearch-project/OpenSearch/pull/9822))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.ratelimitting.admissioncontrol;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
14+
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
15+
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
16+
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
17+
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
18+
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
19+
import org.opensearch.action.admin.indices.stats.ShardStats;
20+
import org.opensearch.action.bulk.BulkRequest;
21+
import org.opensearch.action.bulk.BulkResponse;
22+
import org.opensearch.action.index.IndexRequest;
23+
import org.opensearch.action.search.SearchPhaseExecutionException;
24+
import org.opensearch.action.search.SearchResponse;
25+
import org.opensearch.cluster.metadata.IndexMetadata;
26+
import org.opensearch.cluster.node.DiscoveryNodes;
27+
import org.opensearch.cluster.routing.ShardRouting;
28+
import org.opensearch.common.UUIDs;
29+
import org.opensearch.common.collect.Tuple;
30+
import org.opensearch.common.settings.Settings;
31+
import org.opensearch.common.unit.TimeValue;
32+
import org.opensearch.node.resource.tracker.ResourceTrackerSettings;
33+
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
34+
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
35+
import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings;
36+
import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats;
37+
import org.opensearch.test.OpenSearchIntegTestCase;
38+
import org.junit.After;
39+
import org.junit.Before;
40+
41+
import java.util.Arrays;
42+
import java.util.Collections;
43+
import java.util.concurrent.ExecutionException;
44+
import java.util.concurrent.atomic.AtomicLong;
45+
import java.util.stream.Stream;
46+
47+
import static org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE;
48+
import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT;
49+
import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT;
50+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
51+
52+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1)
53+
public class AdmissionControlMultiNodeIT extends OpenSearchIntegTestCase {
54+
55+
public static final Settings settings = Settings.builder()
56+
.put(ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500))
57+
.put(ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500))
58+
.put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED)
59+
.put(SEARCH_CPU_USAGE_LIMIT.getKey(), 0)
60+
.put(INDEXING_CPU_USAGE_LIMIT.getKey(), 0)
61+
.build();
62+
63+
private static final Logger LOGGER = LogManager.getLogger(AdmissionControlMultiNodeIT.class);
64+
65+
public static final String INDEX_NAME = "test_index";
66+
67+
@Before
68+
public void init() {
69+
assertAcked(
70+
prepareCreate(
71+
INDEX_NAME,
72+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
73+
)
74+
);
75+
ensureGreen(INDEX_NAME);
76+
}
77+
78+
@After
79+
public void cleanup() {
80+
client().admin().indices().prepareDelete(INDEX_NAME).get();
81+
}
82+
83+
@Override
84+
protected Settings nodeSettings(int nodeOrdinal) {
85+
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(settings).build();
86+
}
87+
88+
public void testAdmissionControlRejectionOnEnforced() {
89+
Tuple<String, String> primaryReplicaNodeNames = getPrimaryReplicaNodeNames(INDEX_NAME);
90+
String primaryName = primaryReplicaNodeNames.v1();
91+
String replicaName = primaryReplicaNodeNames.v2();
92+
String coordinatingOnlyNode = getCoordinatingOnlyNode();
93+
AdmissionControlService admissionControlServicePrimary = internalCluster().getInstance(AdmissionControlService.class, primaryName);
94+
AdmissionControlService admissionControlServiceReplica = internalCluster().getInstance(AdmissionControlService.class, replicaName);
95+
final BulkRequest bulkRequest = new BulkRequest();
96+
for (int i = 0; i < 3; ++i) {
97+
IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
98+
.source(Collections.singletonMap("key", randomAlphaOfLength(50)));
99+
bulkRequest.add(request);
100+
}
101+
BulkResponse res = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet();
102+
assertEquals(429, res.getItems()[0].getFailure().getStatus().getStatus());
103+
AdmissionControllerStats admissionControlPrimaryStats = admissionControlServicePrimary.stats()
104+
.getAdmissionControllerStatsList()
105+
.get(0);
106+
assertEquals(admissionControlPrimaryStats.rejectionCount.get(AdmissionControlActionType.INDEXING.getType()).longValue(), 1);
107+
Arrays.stream(res.getItems()).forEach(bulkItemResponse -> {
108+
assertTrue(bulkItemResponse.getFailureMessage().contains("OpenSearchRejectedExecutionException"));
109+
});
110+
SearchResponse searchResponse;
111+
try {
112+
searchResponse = client(coordinatingOnlyNode).prepareSearch(INDEX_NAME).get();
113+
} catch (Exception exception) {
114+
assertTrue(((SearchPhaseExecutionException) exception).getDetailedMessage().contains("OpenSearchRejectedExecutionException"));
115+
}
116+
AdmissionControllerStats primaryStats = admissionControlServicePrimary.stats().getAdmissionControllerStatsList().get(0);
117+
assertEquals(primaryStats.rejectionCount.get(AdmissionControlActionType.SEARCH.getType()).longValue(), 1);
118+
}
119+
120+
public void testAdmissionControlEnforcedOnNonACEnabledActions() throws ExecutionException, InterruptedException {
121+
String coordinatingOnlyNode = getCoordinatingOnlyNode();
122+
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
123+
124+
updateSettingsRequest.transientSettings(
125+
Settings.builder()
126+
.put(
127+
CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
128+
AdmissionControlMode.ENFORCED.getMode()
129+
)
130+
);
131+
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
132+
NodesStatsRequest nodesStatsRequest = new NodesStatsRequest();
133+
nodesStatsRequest.clear()
134+
.indices(true)
135+
.addMetrics(
136+
NodesStatsRequest.Metric.JVM.metricName(),
137+
NodesStatsRequest.Metric.OS.metricName(),
138+
NodesStatsRequest.Metric.FS.metricName(),
139+
NodesStatsRequest.Metric.PROCESS.metricName(),
140+
NodesStatsRequest.Metric.ADMISSION_CONTROL.metricName()
141+
);
142+
NodesStatsResponse nodesStatsResponse = client(coordinatingOnlyNode).admin().cluster().nodesStats(nodesStatsRequest).actionGet();
143+
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().health(new ClusterHealthRequest()).actionGet();
144+
assertEquals(200, clusterHealthResponse.status().getStatus());
145+
assertFalse(nodesStatsResponse.hasFailures());
146+
}
147+
148+
public void testAdmissionControlRejectionOnMonitor() {
149+
Tuple<String, String> primaryReplicaNodeNames = getPrimaryReplicaNodeNames(INDEX_NAME);
150+
String primaryName = primaryReplicaNodeNames.v1();
151+
String replicaName = primaryReplicaNodeNames.v2();
152+
String coordinatingOnlyNode = getCoordinatingOnlyNode();
153+
154+
AdmissionControlService admissionControlServicePrimary = internalCluster().getInstance(AdmissionControlService.class, primaryName);
155+
AdmissionControlService admissionControlServiceReplica = internalCluster().getInstance(AdmissionControlService.class, replicaName);
156+
157+
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
158+
159+
updateSettingsRequest.transientSettings(
160+
Settings.builder()
161+
.put(
162+
CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
163+
AdmissionControlMode.MONITOR.getMode()
164+
)
165+
);
166+
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
167+
168+
final BulkRequest bulkRequest = new BulkRequest();
169+
for (int i = 0; i < 3; ++i) {
170+
IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
171+
.source(Collections.singletonMap("key", randomAlphaOfLength(50)));
172+
bulkRequest.add(request);
173+
}
174+
BulkResponse res = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet();
175+
assertFalse(res.hasFailures());
176+
AdmissionControllerStats admissionControlPrimaryStats = admissionControlServicePrimary.stats()
177+
.getAdmissionControllerStatsList()
178+
.get(0);
179+
AdmissionControllerStats admissionControlReplicaStats = admissionControlServiceReplica.stats()
180+
.getAdmissionControllerStatsList()
181+
.get(0);
182+
long primaryRejectionCount = admissionControlPrimaryStats.rejectionCount.getOrDefault(
183+
AdmissionControlActionType.INDEXING.getType(),
184+
new AtomicLong(0).longValue()
185+
);
186+
long replicaRejectionCount = admissionControlReplicaStats.rejectionCount.getOrDefault(
187+
AdmissionControlActionType.INDEXING.getType(),
188+
new AtomicLong(0).longValue()
189+
);
190+
assertEquals(primaryRejectionCount, 1);
191+
assertEquals(replicaRejectionCount, 0);
192+
SearchResponse searchResponse;
193+
searchResponse = client(coordinatingOnlyNode).prepareSearch(INDEX_NAME).get();
194+
admissionControlPrimaryStats = admissionControlServicePrimary.stats().getAdmissionControllerStatsList().get(0);
195+
admissionControlReplicaStats = admissionControlServiceReplica.stats().getAdmissionControllerStatsList().get(0);
196+
primaryRejectionCount = admissionControlPrimaryStats.getRejectionCount()
197+
.getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue());
198+
replicaRejectionCount = admissionControlReplicaStats.getRejectionCount()
199+
.getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue());
200+
assertTrue(primaryRejectionCount == 1 || replicaRejectionCount == 1);
201+
assertFalse(primaryRejectionCount == 1 && replicaRejectionCount == 1);
202+
}
203+
204+
public void testAdmissionControlRejectionOnDisabled() {
205+
Tuple<String, String> primaryReplicaNodeNames = getPrimaryReplicaNodeNames(INDEX_NAME);
206+
String primaryName = primaryReplicaNodeNames.v1();
207+
String replicaName = primaryReplicaNodeNames.v2();
208+
String coordinatingOnlyNode = getCoordinatingOnlyNode();
209+
210+
AdmissionControlService admissionControlServicePrimary = internalCluster().getInstance(AdmissionControlService.class, primaryName);
211+
AdmissionControlService admissionControlServiceReplica = internalCluster().getInstance(AdmissionControlService.class, replicaName);
212+
213+
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
214+
215+
updateSettingsRequest.transientSettings(
216+
Settings.builder()
217+
.put(
218+
CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
219+
AdmissionControlMode.DISABLED.getMode()
220+
)
221+
);
222+
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
223+
224+
final BulkRequest bulkRequest = new BulkRequest();
225+
for (int i = 0; i < 3; ++i) {
226+
IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
227+
.source(Collections.singletonMap("key", randomAlphaOfLength(50)));
228+
bulkRequest.add(request);
229+
}
230+
BulkResponse res = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet();
231+
assertFalse(res.hasFailures());
232+
AdmissionControllerStats admissionControlPrimaryStats = admissionControlServicePrimary.stats()
233+
.getAdmissionControllerStatsList()
234+
.get(0);
235+
AdmissionControllerStats admissionControlReplicaStats = admissionControlServiceReplica.stats()
236+
.getAdmissionControllerStatsList()
237+
.get(0);
238+
long primaryRejectionCount = admissionControlPrimaryStats.rejectionCount.getOrDefault(
239+
AdmissionControlActionType.INDEXING.getType(),
240+
new AtomicLong(0).longValue()
241+
);
242+
long replicaRejectionCount = admissionControlReplicaStats.rejectionCount.getOrDefault(
243+
AdmissionControlActionType.INDEXING.getType(),
244+
new AtomicLong(0).longValue()
245+
);
246+
assertEquals(primaryRejectionCount, 0);
247+
assertEquals(replicaRejectionCount, 0);
248+
SearchResponse searchResponse;
249+
searchResponse = client(coordinatingOnlyNode).prepareSearch(INDEX_NAME).get();
250+
admissionControlPrimaryStats = admissionControlServicePrimary.stats().getAdmissionControllerStatsList().get(0);
251+
admissionControlReplicaStats = admissionControlServiceReplica.stats().getAdmissionControllerStatsList().get(0);
252+
primaryRejectionCount = admissionControlPrimaryStats.getRejectionCount()
253+
.getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue());
254+
replicaRejectionCount = admissionControlReplicaStats.getRejectionCount()
255+
.getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue());
256+
assertTrue(primaryRejectionCount == 0 && replicaRejectionCount == 0);
257+
}
258+
259+
private Tuple<String, String> getPrimaryReplicaNodeNames(String indexName) {
260+
IndicesStatsResponse response = client().admin().indices().prepareStats(indexName).get();
261+
String primaryId = Stream.of(response.getShards())
262+
.map(ShardStats::getShardRouting)
263+
.filter(ShardRouting::primary)
264+
.findAny()
265+
.get()
266+
.currentNodeId();
267+
String replicaId = Stream.of(response.getShards())
268+
.map(ShardStats::getShardRouting)
269+
.filter(sr -> sr.primary() == false)
270+
.findAny()
271+
.get()
272+
.currentNodeId();
273+
DiscoveryNodes nodes = client().admin().cluster().prepareState().get().getState().nodes();
274+
String primaryName = nodes.get(primaryId).getName();
275+
String replicaName = nodes.get(replicaId).getName();
276+
return new Tuple<>(primaryName, replicaName);
277+
}
278+
279+
private String getCoordinatingOnlyNode() {
280+
return client().admin()
281+
.cluster()
282+
.prepareState()
283+
.get()
284+
.getState()
285+
.nodes()
286+
.getCoordinatingOnlyNodes()
287+
.values()
288+
.iterator()
289+
.next()
290+
.getName();
291+
}
292+
}

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java

+25-1
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.opensearch.monitor.process.ProcessStats;
6060
import org.opensearch.node.AdaptiveSelectionStats;
6161
import org.opensearch.node.NodesResourceUsageStats;
62+
import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats;
6263
import org.opensearch.repositories.RepositoriesStats;
6364
import org.opensearch.script.ScriptCacheStats;
6465
import org.opensearch.script.ScriptStats;
@@ -155,6 +156,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
155156
@Nullable
156157
private RepositoriesStats repositoriesStats;
157158

159+
@Nullable
160+
private AdmissionControlStats admissionControlStats;
161+
158162
public NodeStats(StreamInput in) throws IOException {
159163
super(in);
160164
timestamp = in.readVLong();
@@ -238,6 +242,12 @@ public NodeStats(StreamInput in) throws IOException {
238242
} else {
239243
repositoriesStats = null;
240244
}
245+
// TODO: change to V_2_12_0 on main after backport to 2.x
246+
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
247+
admissionControlStats = in.readOptionalWriteable(AdmissionControlStats::new);
248+
} else {
249+
admissionControlStats = null;
250+
}
241251
}
242252

243253
public NodeStats(
@@ -267,7 +277,8 @@ public NodeStats(
267277
@Nullable TaskCancellationStats taskCancellationStats,
268278
@Nullable SearchPipelineStats searchPipelineStats,
269279
@Nullable SegmentReplicationRejectionStats segmentReplicationRejectionStats,
270-
@Nullable RepositoriesStats repositoriesStats
280+
@Nullable RepositoriesStats repositoriesStats,
281+
@Nullable AdmissionControlStats admissionControlStats
271282
) {
272283
super(node);
273284
this.timestamp = timestamp;
@@ -296,6 +307,7 @@ public NodeStats(
296307
this.searchPipelineStats = searchPipelineStats;
297308
this.segmentReplicationRejectionStats = segmentReplicationRejectionStats;
298309
this.repositoriesStats = repositoriesStats;
310+
this.admissionControlStats = admissionControlStats;
299311
}
300312

301313
public long getTimestamp() {
@@ -448,6 +460,11 @@ public RepositoriesStats getRepositoriesStats() {
448460
return repositoriesStats;
449461
}
450462

463+
@Nullable
464+
public AdmissionControlStats getAdmissionControlStats() {
465+
return admissionControlStats;
466+
}
467+
451468
@Override
452469
public void writeTo(StreamOutput out) throws IOException {
453470
super.writeTo(out);
@@ -506,6 +523,10 @@ public void writeTo(StreamOutput out) throws IOException {
506523
if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
507524
out.writeOptionalWriteable(repositoriesStats);
508525
}
526+
// TODO: change to V_2_12_0 on main after backport to 2.x
527+
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
528+
out.writeOptionalWriteable(admissionControlStats);
529+
}
509530
}
510531

511532
@Override
@@ -605,6 +626,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
605626
if (getRepositoriesStats() != null) {
606627
getRepositoriesStats().toXContent(builder, params);
607628
}
629+
if (getAdmissionControlStats() != null) {
630+
getAdmissionControlStats().toXContent(builder, params);
631+
}
608632
return builder;
609633
}
610634
}

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,8 @@ public enum Metric {
249249
SEARCH_PIPELINE("search_pipeline"),
250250
RESOURCE_USAGE_STATS("resource_usage_stats"),
251251
SEGMENT_REPLICATION_BACKPRESSURE("segment_replication_backpressure"),
252-
REPOSITORIES("repositories");
252+
REPOSITORIES("repositories"),
253+
ADMISSION_CONTROL("admission_control");
253254

254255
private String metricName;
255256

0 commit comments

Comments
 (0)