Skip to content

Commit 4423166

Browse files
andrrossrayshrey
authored andcommitted
Revert "Integrate IO Based AdmissionController to AdmissionControl Framework (opensearch-project#12583)" (opensearch-project#12670)
This reverts commit b6b16d8. Reverting as this introduced test failures detailed in opensearch-project#12664. Signed-off-by: Andrew Ross <andrross@amazon.com>
1 parent 59e5642 commit 4423166

File tree

14 files changed

+388
-874
lines changed

14 files changed

+388
-874
lines changed

CHANGELOG.md

-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1919
- Remote reindex: Add support for configurable retry mechanism ([#12561](https://github.com/opensearch-project/OpenSearch/pull/12561))
2020
- [Admission Control] Integrate IO Usage Tracker to the Resource Usage Collector Service and Emit IO Usage Stats ([#11880](https://github.com/opensearch-project/OpenSearch/pull/11880))
2121
- Tracing for deep search path ([#12103](https://github.com/opensearch-project/OpenSearch/pull/12103))
22-
- [Admission Control] Integrated IO Based AdmissionController to AdmissionControl Framework ([#12583](https://github.com/opensearch-project/OpenSearch/pull/12583))
2322

2423
### Dependencies
2524
- Bump `log4j-core` from 2.18.0 to 2.19.0
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.ratelimitting.admissioncontrol;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
14+
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
15+
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
16+
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
17+
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
18+
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
19+
import org.opensearch.action.admin.indices.stats.ShardStats;
20+
import org.opensearch.action.bulk.BulkRequest;
21+
import org.opensearch.action.bulk.BulkResponse;
22+
import org.opensearch.action.index.IndexRequest;
23+
import org.opensearch.action.search.SearchPhaseExecutionException;
24+
import org.opensearch.action.search.SearchResponse;
25+
import org.opensearch.cluster.metadata.IndexMetadata;
26+
import org.opensearch.cluster.node.DiscoveryNodes;
27+
import org.opensearch.cluster.routing.ShardRouting;
28+
import org.opensearch.common.UUIDs;
29+
import org.opensearch.common.collect.Tuple;
30+
import org.opensearch.common.settings.Settings;
31+
import org.opensearch.common.unit.TimeValue;
32+
import org.opensearch.node.resource.tracker.ResourceTrackerSettings;
33+
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
34+
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
35+
import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings;
36+
import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats;
37+
import org.opensearch.test.OpenSearchIntegTestCase;
38+
import org.junit.After;
39+
import org.junit.Before;
40+
41+
import java.util.Arrays;
42+
import java.util.Collections;
43+
import java.util.concurrent.ExecutionException;
44+
import java.util.concurrent.atomic.AtomicLong;
45+
import java.util.stream.Stream;
46+
47+
import static org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE;
48+
import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT;
49+
import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT;
50+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
51+
52+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1)
53+
public class AdmissionControlMultiNodeIT extends OpenSearchIntegTestCase {
54+
55+
public static final Settings settings = Settings.builder()
56+
.put(ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500))
57+
.put(ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500))
58+
.put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED)
59+
.put(SEARCH_CPU_USAGE_LIMIT.getKey(), 0)
60+
.put(INDEXING_CPU_USAGE_LIMIT.getKey(), 0)
61+
.build();
62+
63+
private static final Logger LOGGER = LogManager.getLogger(AdmissionControlMultiNodeIT.class);
64+
65+
public static final String INDEX_NAME = "test_index";
66+
67+
@Before
68+
public void init() {
69+
assertAcked(
70+
prepareCreate(
71+
INDEX_NAME,
72+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
73+
)
74+
);
75+
ensureGreen(INDEX_NAME);
76+
}
77+
78+
@After
79+
public void cleanup() {
80+
client().admin().indices().prepareDelete(INDEX_NAME).get();
81+
}
82+
83+
@Override
84+
protected Settings nodeSettings(int nodeOrdinal) {
85+
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(settings).build();
86+
}
87+
88+
public void testAdmissionControlRejectionOnEnforced() {
89+
Tuple<String, String> primaryReplicaNodeNames = getPrimaryReplicaNodeNames(INDEX_NAME);
90+
String primaryName = primaryReplicaNodeNames.v1();
91+
String replicaName = primaryReplicaNodeNames.v2();
92+
String coordinatingOnlyNode = getCoordinatingOnlyNode();
93+
AdmissionControlService admissionControlServicePrimary = internalCluster().getInstance(AdmissionControlService.class, primaryName);
94+
AdmissionControlService admissionControlServiceReplica = internalCluster().getInstance(AdmissionControlService.class, replicaName);
95+
final BulkRequest bulkRequest = new BulkRequest();
96+
for (int i = 0; i < 3; ++i) {
97+
IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
98+
.source(Collections.singletonMap("key", randomAlphaOfLength(50)));
99+
bulkRequest.add(request);
100+
}
101+
BulkResponse res = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet();
102+
assertEquals(429, res.getItems()[0].getFailure().getStatus().getStatus());
103+
AdmissionControllerStats admissionControlPrimaryStats = admissionControlServicePrimary.stats()
104+
.getAdmissionControllerStatsList()
105+
.get(0);
106+
assertEquals(admissionControlPrimaryStats.rejectionCount.get(AdmissionControlActionType.INDEXING.getType()).longValue(), 1);
107+
Arrays.stream(res.getItems()).forEach(bulkItemResponse -> {
108+
assertTrue(bulkItemResponse.getFailureMessage().contains("OpenSearchRejectedExecutionException"));
109+
});
110+
SearchResponse searchResponse;
111+
try {
112+
searchResponse = client(coordinatingOnlyNode).prepareSearch(INDEX_NAME).get();
113+
} catch (Exception exception) {
114+
assertTrue(((SearchPhaseExecutionException) exception).getDetailedMessage().contains("OpenSearchRejectedExecutionException"));
115+
}
116+
AdmissionControllerStats primaryStats = admissionControlServicePrimary.stats().getAdmissionControllerStatsList().get(0);
117+
assertEquals(primaryStats.rejectionCount.get(AdmissionControlActionType.SEARCH.getType()).longValue(), 1);
118+
}
119+
120+
public void testAdmissionControlEnforcedOnNonACEnabledActions() throws ExecutionException, InterruptedException {
121+
String coordinatingOnlyNode = getCoordinatingOnlyNode();
122+
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
123+
124+
updateSettingsRequest.transientSettings(
125+
Settings.builder()
126+
.put(
127+
CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
128+
AdmissionControlMode.ENFORCED.getMode()
129+
)
130+
);
131+
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
132+
NodesStatsRequest nodesStatsRequest = new NodesStatsRequest();
133+
nodesStatsRequest.clear()
134+
.indices(true)
135+
.addMetrics(
136+
NodesStatsRequest.Metric.JVM.metricName(),
137+
NodesStatsRequest.Metric.OS.metricName(),
138+
NodesStatsRequest.Metric.FS.metricName(),
139+
NodesStatsRequest.Metric.PROCESS.metricName(),
140+
NodesStatsRequest.Metric.ADMISSION_CONTROL.metricName()
141+
);
142+
NodesStatsResponse nodesStatsResponse = client(coordinatingOnlyNode).admin().cluster().nodesStats(nodesStatsRequest).actionGet();
143+
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().health(new ClusterHealthRequest()).actionGet();
144+
assertEquals(200, clusterHealthResponse.status().getStatus());
145+
assertFalse(nodesStatsResponse.hasFailures());
146+
}
147+
148+
public void testAdmissionControlRejectionOnMonitor() {
149+
Tuple<String, String> primaryReplicaNodeNames = getPrimaryReplicaNodeNames(INDEX_NAME);
150+
String primaryName = primaryReplicaNodeNames.v1();
151+
String replicaName = primaryReplicaNodeNames.v2();
152+
String coordinatingOnlyNode = getCoordinatingOnlyNode();
153+
154+
AdmissionControlService admissionControlServicePrimary = internalCluster().getInstance(AdmissionControlService.class, primaryName);
155+
AdmissionControlService admissionControlServiceReplica = internalCluster().getInstance(AdmissionControlService.class, replicaName);
156+
157+
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
158+
159+
updateSettingsRequest.transientSettings(
160+
Settings.builder()
161+
.put(
162+
CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
163+
AdmissionControlMode.MONITOR.getMode()
164+
)
165+
);
166+
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
167+
168+
final BulkRequest bulkRequest = new BulkRequest();
169+
for (int i = 0; i < 3; ++i) {
170+
IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
171+
.source(Collections.singletonMap("key", randomAlphaOfLength(50)));
172+
bulkRequest.add(request);
173+
}
174+
BulkResponse res = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet();
175+
assertFalse(res.hasFailures());
176+
AdmissionControllerStats admissionControlPrimaryStats = admissionControlServicePrimary.stats()
177+
.getAdmissionControllerStatsList()
178+
.get(0);
179+
AdmissionControllerStats admissionControlReplicaStats = admissionControlServiceReplica.stats()
180+
.getAdmissionControllerStatsList()
181+
.get(0);
182+
long primaryRejectionCount = admissionControlPrimaryStats.rejectionCount.getOrDefault(
183+
AdmissionControlActionType.INDEXING.getType(),
184+
new AtomicLong(0).longValue()
185+
);
186+
long replicaRejectionCount = admissionControlReplicaStats.rejectionCount.getOrDefault(
187+
AdmissionControlActionType.INDEXING.getType(),
188+
new AtomicLong(0).longValue()
189+
);
190+
assertEquals(primaryRejectionCount, 1);
191+
assertEquals(replicaRejectionCount, 0);
192+
SearchResponse searchResponse;
193+
searchResponse = client(coordinatingOnlyNode).prepareSearch(INDEX_NAME).get();
194+
admissionControlPrimaryStats = admissionControlServicePrimary.stats().getAdmissionControllerStatsList().get(0);
195+
admissionControlReplicaStats = admissionControlServiceReplica.stats().getAdmissionControllerStatsList().get(0);
196+
primaryRejectionCount = admissionControlPrimaryStats.getRejectionCount()
197+
.getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue());
198+
replicaRejectionCount = admissionControlReplicaStats.getRejectionCount()
199+
.getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue());
200+
assertTrue(primaryRejectionCount == 1 || replicaRejectionCount == 1);
201+
assertFalse(primaryRejectionCount == 1 && replicaRejectionCount == 1);
202+
}
203+
204+
public void testAdmissionControlRejectionOnDisabled() {
205+
Tuple<String, String> primaryReplicaNodeNames = getPrimaryReplicaNodeNames(INDEX_NAME);
206+
String primaryName = primaryReplicaNodeNames.v1();
207+
String replicaName = primaryReplicaNodeNames.v2();
208+
String coordinatingOnlyNode = getCoordinatingOnlyNode();
209+
210+
AdmissionControlService admissionControlServicePrimary = internalCluster().getInstance(AdmissionControlService.class, primaryName);
211+
AdmissionControlService admissionControlServiceReplica = internalCluster().getInstance(AdmissionControlService.class, replicaName);
212+
213+
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
214+
215+
updateSettingsRequest.transientSettings(
216+
Settings.builder()
217+
.put(
218+
CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
219+
AdmissionControlMode.DISABLED.getMode()
220+
)
221+
);
222+
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
223+
224+
final BulkRequest bulkRequest = new BulkRequest();
225+
for (int i = 0; i < 3; ++i) {
226+
IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
227+
.source(Collections.singletonMap("key", randomAlphaOfLength(50)));
228+
bulkRequest.add(request);
229+
}
230+
BulkResponse res = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet();
231+
assertFalse(res.hasFailures());
232+
AdmissionControllerStats admissionControlPrimaryStats = admissionControlServicePrimary.stats()
233+
.getAdmissionControllerStatsList()
234+
.get(0);
235+
AdmissionControllerStats admissionControlReplicaStats = admissionControlServiceReplica.stats()
236+
.getAdmissionControllerStatsList()
237+
.get(0);
238+
long primaryRejectionCount = admissionControlPrimaryStats.rejectionCount.getOrDefault(
239+
AdmissionControlActionType.INDEXING.getType(),
240+
new AtomicLong(0).longValue()
241+
);
242+
long replicaRejectionCount = admissionControlReplicaStats.rejectionCount.getOrDefault(
243+
AdmissionControlActionType.INDEXING.getType(),
244+
new AtomicLong(0).longValue()
245+
);
246+
assertEquals(primaryRejectionCount, 0);
247+
assertEquals(replicaRejectionCount, 0);
248+
SearchResponse searchResponse;
249+
searchResponse = client(coordinatingOnlyNode).prepareSearch(INDEX_NAME).get();
250+
admissionControlPrimaryStats = admissionControlServicePrimary.stats().getAdmissionControllerStatsList().get(0);
251+
admissionControlReplicaStats = admissionControlServiceReplica.stats().getAdmissionControllerStatsList().get(0);
252+
primaryRejectionCount = admissionControlPrimaryStats.getRejectionCount()
253+
.getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue());
254+
replicaRejectionCount = admissionControlReplicaStats.getRejectionCount()
255+
.getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue());
256+
assertTrue(primaryRejectionCount == 0 && replicaRejectionCount == 0);
257+
}
258+
259+
private Tuple<String, String> getPrimaryReplicaNodeNames(String indexName) {
260+
IndicesStatsResponse response = client().admin().indices().prepareStats(indexName).get();
261+
String primaryId = Stream.of(response.getShards())
262+
.map(ShardStats::getShardRouting)
263+
.filter(ShardRouting::primary)
264+
.findAny()
265+
.get()
266+
.currentNodeId();
267+
String replicaId = Stream.of(response.getShards())
268+
.map(ShardStats::getShardRouting)
269+
.filter(sr -> sr.primary() == false)
270+
.findAny()
271+
.get()
272+
.currentNodeId();
273+
DiscoveryNodes nodes = client().admin().cluster().prepareState().get().getState().nodes();
274+
String primaryName = nodes.get(primaryId).getName();
275+
String replicaName = nodes.get(replicaId).getName();
276+
return new Tuple<>(primaryName, replicaName);
277+
}
278+
279+
private String getCoordinatingOnlyNode() {
280+
return client().admin()
281+
.cluster()
282+
.prepareState()
283+
.get()
284+
.getState()
285+
.nodes()
286+
.getCoordinatingOnlyNodes()
287+
.values()
288+
.iterator()
289+
.next()
290+
.getName();
291+
}
292+
}

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

-4
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,6 @@
140140
import org.opensearch.plugins.PluginsService;
141141
import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings;
142142
import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings;
143-
import org.opensearch.ratelimitting.admissioncontrol.settings.IoBasedAdmissionControllerSettings;
144143
import org.opensearch.repositories.fs.FsRepository;
145144
import org.opensearch.rest.BaseRestHandler;
146145
import org.opensearch.script.ScriptService;
@@ -709,9 +708,6 @@ public void apply(Settings value, Settings current, Settings previous) {
709708
CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
710709
CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT,
711710
CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT,
712-
IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
713-
IoBasedAdmissionControllerSettings.SEARCH_IO_USAGE_LIMIT,
714-
IoBasedAdmissionControllerSettings.INDEXING_IO_USAGE_LIMIT,
715711
IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING,
716712

717713
// Concurrent segment search settings

server/src/main/java/org/opensearch/node/resource/tracker/NodeResourceUsageTracker.java

-4
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
package org.opensearch.node.resource.tracker;
1010

11-
import org.apache.lucene.util.Constants;
1211
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
1312
import org.opensearch.common.settings.ClusterSettings;
1413
import org.opensearch.common.settings.Settings;
@@ -70,9 +69,6 @@ public IoUsageStats getIoUsageStats() {
7069
* Checks if all of the resource usage trackers are ready
7170
*/
7271
public boolean isReady() {
73-
if (Constants.LINUX) {
74-
return memoryUsageTracker.isReady() && cpuUsageTracker.isReady() && ioUsageTracker.isReady();
75-
}
7672
return memoryUsageTracker.isReady() && cpuUsageTracker.isReady();
7773
}
7874

0 commit comments

Comments
 (0)