Skip to content

Commit 5a95e22

Browse files
ajaymovvaAjay Kumar Movva
authored and
Ajay Kumar Movva
committed
Added changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting (#9286)
* Changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting (#9286) Signed-off-by: Ajay Kumar Movva <movvaam@amazon.com>
1 parent c305a4c commit 5a95e22

28 files changed

+1594
-19
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
5353
- Remove concurrent segment search feature flag for GA launch ([#12074](https://github.com/opensearch-project/OpenSearch/pull/12074))
5454
- Enable Fuzzy codec for doc id fields using a bloom filter ([#11022](https://github.com/opensearch-project/OpenSearch/pull/11022))
5555
- [Metrics Framework] Adds support for Histogram metric ([#12062](https://github.com/opensearch-project/OpenSearch/pull/12062))
56+
- [AdmissionControl] Added changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286))
5657

5758
### Dependencies
5859
- Bumps jetty version to 9.4.52.v20230823 to fix GMS-2023-1857 ([#9822](https://github.com/opensearch-project/OpenSearch/pull/9822))

server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,12 @@ public abstract class TransportReplicationAction<
134134
Setting.Property.NodeScope
135135
);
136136

137+
/**
138+
* Making primary and replica actions suffixes as constant
139+
*/
140+
public static final String PRIMARY_ACTION_SUFFIX = "[p]";
141+
public static final String REPLICA_ACTION_SUFFIX = "[r]";
142+
137143
protected final ThreadPool threadPool;
138144
protected final TransportService transportService;
139145
protected final ClusterService clusterService;
@@ -204,8 +210,8 @@ protected TransportReplicationAction(
204210
this.shardStateAction = shardStateAction;
205211
this.executor = executor;
206212

207-
this.transportPrimaryAction = actionName + "[p]";
208-
this.transportReplicaAction = actionName + "[r]";
213+
this.transportPrimaryAction = actionName + PRIMARY_ACTION_SUFFIX;
214+
this.transportReplicaAction = actionName + REPLICA_ACTION_SUFFIX;
209215

210216
this.initialRetryBackoffBound = REPLICATION_INITIAL_RETRY_BACKOFF_BOUND.get(settings);
211217
this.retryTimeout = REPLICATION_RETRY_TIMEOUT.get(settings);

server/src/main/java/org/opensearch/common/network/NetworkModule.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ public final class NetworkModule {
131131

132132
private final Map<String, Supplier<Transport>> transportFactories = new HashMap<>();
133133
private final Map<String, Supplier<HttpServerTransport>> transportHttpFactories = new HashMap<>();
134-
private final List<TransportInterceptor> transportIntercetors = new ArrayList<>();
134+
private final List<TransportInterceptor> transportInterceptors = new ArrayList<>();
135135

136136
/**
137137
* Creates a network module that custom networking classes can be plugged into.
@@ -149,9 +149,13 @@ public NetworkModule(
149149
NetworkService networkService,
150150
HttpServerTransport.Dispatcher dispatcher,
151151
ClusterSettings clusterSettings,
152-
Tracer tracer
152+
Tracer tracer,
153+
List<TransportInterceptor> transportInterceptors
153154
) {
154155
this.settings = settings;
156+
if (transportInterceptors != null) {
157+
transportInterceptors.forEach(this::registerTransportInterceptor);
158+
}
155159
for (NetworkPlugin plugin : plugins) {
156160
Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(
157161
settings,
@@ -180,11 +184,11 @@ public NetworkModule(
180184
for (Map.Entry<String, Supplier<Transport>> entry : transportFactory.entrySet()) {
181185
registerTransport(entry.getKey(), entry.getValue());
182186
}
183-
List<TransportInterceptor> transportInterceptors = plugin.getTransportInterceptors(
187+
List<TransportInterceptor> pluginTransportInterceptors = plugin.getTransportInterceptors(
184188
namedWriteableRegistry,
185189
threadPool.getThreadContext()
186190
);
187-
for (TransportInterceptor interceptor : transportInterceptors) {
191+
for (TransportInterceptor interceptor : pluginTransportInterceptors) {
188192
registerTransportInterceptor(interceptor);
189193
}
190194
}
@@ -264,15 +268,15 @@ public Supplier<Transport> getTransportSupplier() {
264268
* Registers a new {@link TransportInterceptor}
265269
*/
266270
private void registerTransportInterceptor(TransportInterceptor interceptor) {
267-
this.transportIntercetors.add(Objects.requireNonNull(interceptor, "interceptor must not be null"));
271+
this.transportInterceptors.add(Objects.requireNonNull(interceptor, "interceptor must not be null"));
268272
}
269273

270274
/**
271275
* Returns a composite {@link TransportInterceptor} containing all registered interceptors
272276
* @see #registerTransportInterceptor(TransportInterceptor)
273277
*/
274278
public TransportInterceptor getTransportInterceptor() {
275-
return new CompositeTransportInterceptor(this.transportIntercetors);
279+
return new CompositeTransportInterceptor(this.transportInterceptors);
276280
}
277281

278282
static final class CompositeTransportInterceptor implements TransportInterceptor {

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@
136136
import org.opensearch.persistent.PersistentTasksClusterService;
137137
import org.opensearch.persistent.decider.EnableAssignmentDecider;
138138
import org.opensearch.plugins.PluginsService;
139+
import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings;
140+
import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings;
139141
import org.opensearch.repositories.fs.FsRepository;
140142
import org.opensearch.rest.BaseRestHandler;
141143
import org.opensearch.script.ScriptService;
@@ -703,7 +705,12 @@ public void apply(Settings value, Settings current, Settings previous) {
703705

704706
// Concurrent segment search settings
705707
SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING,
706-
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING
708+
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING,
709+
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
710+
AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE,
711+
CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
712+
CPUBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT,
713+
CPUBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT
707714
)
708715
)
709716
);

server/src/main/java/org/opensearch/node/Node.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,8 @@
198198
import org.opensearch.plugins.SearchPlugin;
199199
import org.opensearch.plugins.SystemIndexPlugin;
200200
import org.opensearch.plugins.TelemetryPlugin;
201+
import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService;
202+
import org.opensearch.ratelimitting.admissioncontrol.transport.AdmissionControlTransportInterceptor;
201203
import org.opensearch.repositories.RepositoriesModule;
202204
import org.opensearch.repositories.RepositoriesService;
203205
import org.opensearch.rest.RestController;
@@ -896,6 +898,17 @@ protected Node(
896898

897899
final RestController restController = actionModule.getRestController();
898900

901+
final AdmissionControlService admissionControlService = new AdmissionControlService(
902+
settings,
903+
clusterService.getClusterSettings(),
904+
threadPool
905+
);
906+
907+
AdmissionControlTransportInterceptor admissionControlTransportInterceptor = new AdmissionControlTransportInterceptor(
908+
admissionControlService
909+
);
910+
911+
List<TransportInterceptor> transportInterceptors = List.of(admissionControlTransportInterceptor);
899912
final NetworkModule networkModule = new NetworkModule(
900913
settings,
901914
pluginsService.filterPlugins(NetworkPlugin.class),
@@ -908,7 +921,8 @@ protected Node(
908921
networkService,
909922
restController,
910923
clusterService.getClusterSettings(),
911-
tracer
924+
tracer,
925+
transportInterceptors
912926
);
913927
Collection<UnaryOperator<Map<String, IndexTemplateMetadata>>> indexTemplateMetadataUpgraders = pluginsService.filterPlugins(
914928
Plugin.class
@@ -1186,6 +1200,7 @@ protected Node(
11861200
b.bind(IndexingPressureService.class).toInstance(indexingPressureService);
11871201
b.bind(TaskResourceTrackingService.class).toInstance(taskResourceTrackingService);
11881202
b.bind(SearchBackpressureService.class).toInstance(searchBackpressureService);
1203+
b.bind(AdmissionControlService.class).toInstance(admissionControlService);
11891204
b.bind(UsageService.class).toInstance(usageService);
11901205
b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService());
11911206
b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.ratelimitting.admissioncontrol;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.common.settings.ClusterSettings;
14+
import org.opensearch.common.settings.Settings;
15+
import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController;
16+
import org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController;
17+
import org.opensearch.threadpool.ThreadPool;
18+
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
import java.util.concurrent.ConcurrentHashMap;
22+
import java.util.concurrent.ConcurrentMap;
23+
24+
import static org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER;
25+
26+
/**
27+
* Admission control Service that bootstraps and manages all the Admission Controllers in OpenSearch.
28+
*/
29+
public class AdmissionControlService {
30+
private final ThreadPool threadPool;
31+
public final AdmissionControlSettings admissionControlSettings;
32+
private final ConcurrentMap<String, AdmissionController> ADMISSION_CONTROLLERS;
33+
private static final Logger logger = LogManager.getLogger(AdmissionControlService.class);
34+
private final ClusterSettings clusterSettings;
35+
private final Settings settings;
36+
37+
/**
38+
*
39+
* @param settings Immutable settings instance
40+
* @param clusterSettings ClusterSettings Instance
41+
* @param threadPool ThreadPool Instance
42+
*/
43+
public AdmissionControlService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
44+
this.threadPool = threadPool;
45+
this.admissionControlSettings = new AdmissionControlSettings(clusterSettings, settings);
46+
this.ADMISSION_CONTROLLERS = new ConcurrentHashMap<>();
47+
this.clusterSettings = clusterSettings;
48+
this.settings = settings;
49+
this.initialise();
50+
}
51+
52+
/**
53+
* Initialise and Register all the admissionControllers
54+
*/
55+
private void initialise() {
56+
// Initialise different type of admission controllers
57+
registerAdmissionController(CPU_BASED_ADMISSION_CONTROLLER);
58+
}
59+
60+
/**
61+
* Handler to trigger registered admissionController
62+
*/
63+
public void applyTransportAdmissionControl(String action) {
64+
this.ADMISSION_CONTROLLERS.forEach((name, admissionController) -> { admissionController.apply(action); });
65+
}
66+
67+
/**
68+
*
69+
* @param admissionControllerName admissionControllerName to register into the service.
70+
*/
71+
public void registerAdmissionController(String admissionControllerName) {
72+
AdmissionController admissionController = this.controllerFactory(admissionControllerName);
73+
this.ADMISSION_CONTROLLERS.put(admissionControllerName, admissionController);
74+
}
75+
76+
/**
77+
* @return AdmissionController Instance
78+
*/
79+
private AdmissionController controllerFactory(String admissionControllerName) {
80+
switch (admissionControllerName) {
81+
case CPU_BASED_ADMISSION_CONTROLLER:
82+
return new CPUBasedAdmissionController(admissionControllerName, this.settings, this.clusterSettings);
83+
default:
84+
throw new IllegalArgumentException("Not Supported AdmissionController : " + admissionControllerName);
85+
}
86+
}
87+
88+
/**
89+
*
90+
* @return list of the registered admissionControllers
91+
*/
92+
public List<AdmissionController> getAdmissionControllers() {
93+
return new ArrayList<>(this.ADMISSION_CONTROLLERS.values());
94+
}
95+
96+
/**
97+
*
98+
* @param controllerName name of the admissionController
99+
* @return instance of the AdmissionController Instance
100+
*/
101+
public AdmissionController getAdmissionController(String controllerName) {
102+
return this.ADMISSION_CONTROLLERS.getOrDefault(controllerName, null);
103+
}
104+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.ratelimitting.admissioncontrol;
10+
11+
import org.opensearch.common.settings.ClusterSettings;
12+
import org.opensearch.common.settings.Setting;
13+
import org.opensearch.common.settings.Settings;
14+
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
15+
16+
/**
17+
* Settings related to admission control.
18+
* @opensearch.internal
19+
*/
20+
public final class AdmissionControlSettings {
21+
22+
/**
23+
* Default parameters for the AdmissionControlSettings
24+
*/
25+
public static class Defaults {
26+
public static final String MODE = "disabled";
27+
}
28+
29+
/**
30+
* Feature level setting to operate in shadow-mode or in enforced-mode. If enforced field is set
31+
* rejection will be performed, otherwise only rejection metrics will be populated.
32+
*/
33+
public static final Setting<AdmissionControlMode> ADMISSION_CONTROL_TRANSPORT_LAYER_MODE = new Setting<>(
34+
"admission_control.transport.mode",
35+
Defaults.MODE,
36+
AdmissionControlMode::fromName,
37+
Setting.Property.Dynamic,
38+
Setting.Property.NodeScope
39+
);
40+
41+
private volatile AdmissionControlMode transportLayeradmissionControlMode;
42+
43+
/**
44+
* @param clusterSettings clusterSettings Instance
45+
* @param settings settings instance
46+
*/
47+
public AdmissionControlSettings(ClusterSettings clusterSettings, Settings settings) {
48+
this.transportLayeradmissionControlMode = ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.get(settings);
49+
clusterSettings.addSettingsUpdateConsumer(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE, this::setAdmissionControlTransportLayerMode);
50+
}
51+
52+
/**
53+
*
54+
* @param admissionControlMode update the mode of admission control feature
55+
*/
56+
private void setAdmissionControlTransportLayerMode(AdmissionControlMode admissionControlMode) {
57+
this.transportLayeradmissionControlMode = admissionControlMode;
58+
}
59+
60+
/**
61+
*
62+
* @return return the default mode of the admissionControl
63+
*/
64+
public AdmissionControlMode getAdmissionControlTransportLayerMode() {
65+
return this.transportLayeradmissionControlMode;
66+
}
67+
68+
/**
69+
*
70+
* @return true based on the admission control feature is enforced else false
71+
*/
72+
public Boolean isTransportLayerAdmissionControlEnforced() {
73+
return this.transportLayeradmissionControlMode == AdmissionControlMode.ENFORCED;
74+
}
75+
76+
/**
77+
*
78+
* @return true based on the admission control feature is enabled else false
79+
*/
80+
public Boolean isTransportLayerAdmissionControlEnabled() {
81+
return this.transportLayeradmissionControlMode != AdmissionControlMode.DISABLED;
82+
}
83+
}

0 commit comments

Comments
 (0)