Skip to content

Commit def9db1

Browse files
[Backport 2.x] In-flight cancellation of SearchShardTask based on resource consumption (#5039) (#5058)
* [Backport 2.x] Added in-flight cancellation of SearchShardTask based on resource consumption (#4575) This feature aims to identify and cancel resource intensive SearchShardTasks if they have breached certain thresholds. This will help in terminating problematic queries which can put nodes in duress and degrade the cluster performance. * [Backport 2.x] Added resource usage trackers for in-flight cancellation of SearchShardTask (#4805) 1. CpuUsageTracker: cancels tasks if they consume too much CPU 2. ElapsedTimeTracker: cancels tasks if they consume too much time 3. HeapUsageTracker: cancels tasks if they consume too much heap * [Backport 2.x] Added search backpressure stats API Added search backpressure stats to the existing node/stats API to describe: 1. the number of cancellations (currently for SearchShardTask only) 2. the current state of TaskResourceUsageTracker Signed-off-by: Ketan Verma <ketan9495@gmail.com> (cherry picked from commit 7c521b9) Co-authored-by: Ketan Verma <ketanv3@users.noreply.github.com>
1 parent 1cc0c9c commit def9db1

File tree

49 files changed

+2970
-12
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+2970
-12
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
2626
- Update GeoGrid base class access modifier to support extensibility ([#4921](https://github.com/opensearch-project/OpenSearch/pull/4921))
2727
- Build no-jdk distributions as part of release build ([#4902](https://github.com/opensearch-project/OpenSearch/pull/4902))
2828
- Use getParameterCount instead of getParameterTypes ([#4821](https://github.com/opensearch-project/OpenSearch/pull/4821))
29+
- Added in-flight cancellation of SearchShardTask based on resource consumption ([#4565](https://github.com/opensearch-project/OpenSearch/pull/4565))
30+
- Added resource usage trackers for in-flight cancellation of SearchShardTask ([#4805](https://github.com/opensearch-project/OpenSearch/pull/4805))
31+
- Added search backpressure stats API ([#4932](https://github.com/opensearch-project/OpenSearch/pull/4932))
2932

3033
### Dependencies
3134
- Bumps `com.diffplug.spotless` from 6.9.1 to 6.10.0

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java

+23-1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.opensearch.node.AdaptiveSelectionStats;
5757
import org.opensearch.script.ScriptCacheStats;
5858
import org.opensearch.script.ScriptStats;
59+
import org.opensearch.search.backpressure.stats.SearchBackpressureStats;
5960
import org.opensearch.threadpool.ThreadPoolStats;
6061
import org.opensearch.transport.TransportStats;
6162

@@ -119,6 +120,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
119120
@Nullable
120121
private ShardIndexingPressureStats shardIndexingPressureStats;
121122

123+
@Nullable
124+
private SearchBackpressureStats searchBackpressureStats;
125+
122126
public NodeStats(StreamInput in) throws IOException {
123127
super(in);
124128
timestamp = in.readVLong();
@@ -156,6 +160,11 @@ public NodeStats(StreamInput in) throws IOException {
156160
shardIndexingPressureStats = null;
157161
}
158162

163+
if (in.getVersion().onOrAfter(Version.V_2_4_0)) {
164+
searchBackpressureStats = in.readOptionalWriteable(SearchBackpressureStats::new);
165+
} else {
166+
searchBackpressureStats = null;
167+
}
159168
}
160169

161170
public NodeStats(
@@ -176,7 +185,8 @@ public NodeStats(
176185
@Nullable AdaptiveSelectionStats adaptiveSelectionStats,
177186
@Nullable ScriptCacheStats scriptCacheStats,
178187
@Nullable IndexingPressureStats indexingPressureStats,
179-
@Nullable ShardIndexingPressureStats shardIndexingPressureStats
188+
@Nullable ShardIndexingPressureStats shardIndexingPressureStats,
189+
@Nullable SearchBackpressureStats searchBackpressureStats
180190
) {
181191
super(node);
182192
this.timestamp = timestamp;
@@ -196,6 +206,7 @@ public NodeStats(
196206
this.scriptCacheStats = scriptCacheStats;
197207
this.indexingPressureStats = indexingPressureStats;
198208
this.shardIndexingPressureStats = shardIndexingPressureStats;
209+
this.searchBackpressureStats = searchBackpressureStats;
199210
}
200211

201212
public long getTimestamp() {
@@ -305,6 +316,11 @@ public ShardIndexingPressureStats getShardIndexingPressureStats() {
305316
return shardIndexingPressureStats;
306317
}
307318

319+
@Nullable
320+
public SearchBackpressureStats getSearchBackpressureStats() {
321+
return searchBackpressureStats;
322+
}
323+
308324
@Override
309325
public void writeTo(StreamOutput out) throws IOException {
310326
super.writeTo(out);
@@ -336,6 +352,9 @@ public void writeTo(StreamOutput out) throws IOException {
336352
if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
337353
out.writeOptionalWriteable(shardIndexingPressureStats);
338354
}
355+
if (out.getVersion().onOrAfter(Version.V_2_4_0)) {
356+
out.writeOptionalWriteable(searchBackpressureStats);
357+
}
339358
}
340359

341360
@Override
@@ -408,6 +427,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
408427
if (getShardIndexingPressureStats() != null) {
409428
getShardIndexingPressureStats().toXContent(builder, params);
410429
}
430+
if (getSearchBackpressureStats() != null) {
431+
getSearchBackpressureStats().toXContent(builder, params);
432+
}
411433
return builder;
412434
}
413435
}

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,8 @@ public enum Metric {
237237
ADAPTIVE_SELECTION("adaptive_selection"),
238238
SCRIPT_CACHE("script_cache"),
239239
INDEXING_PRESSURE("indexing_pressure"),
240-
SHARD_INDEXING_PRESSURE("shard_indexing_pressure");
240+
SHARD_INDEXING_PRESSURE("shard_indexing_pressure"),
241+
SEARCH_BACKPRESSURE("search_backpressure");
241242

242243
private String metricName;
243244

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
118118
NodesStatsRequest.Metric.ADAPTIVE_SELECTION.containedIn(metrics),
119119
NodesStatsRequest.Metric.SCRIPT_CACHE.containedIn(metrics),
120120
NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics),
121-
NodesStatsRequest.Metric.SHARD_INDEXING_PRESSURE.containedIn(metrics)
121+
NodesStatsRequest.Metric.SHARD_INDEXING_PRESSURE.containedIn(metrics),
122+
NodesStatsRequest.Metric.SEARCH_BACKPRESSURE.containedIn(metrics)
122123
);
123124
}
124125

server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java

+1
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
162162
false,
163163
false,
164164
false,
165+
false,
165166
false
166167
);
167168
List<ShardStats> shardsStats = new ArrayList<>();

server/src/main/java/org/opensearch/cluster/ClusterModule.java

-2
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@
9696
import org.opensearch.script.ScriptMetadata;
9797
import org.opensearch.snapshots.SnapshotsInfoService;
9898
import org.opensearch.tasks.Task;
99-
import org.opensearch.tasks.TaskResourceTrackingService;
10099
import org.opensearch.tasks.TaskResultsService;
101100

102101
import java.util.ArrayList;
@@ -402,7 +401,6 @@ protected void configure() {
402401
bind(NodeMappingRefreshAction.class).asEagerSingleton();
403402
bind(MappingUpdatedAction.class).asEagerSingleton();
404403
bind(TaskResultsService.class).asEagerSingleton();
405-
bind(TaskResourceTrackingService.class).asEagerSingleton();
406404
bind(AllocationDeciders.class).toInstance(allocationDeciders);
407405
bind(ShardsAllocator.class).toInstance(shardsAllocator);
408406
}

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

+22-1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@
4242
import org.opensearch.index.ShardIndexingPressureMemoryManager;
4343
import org.opensearch.index.ShardIndexingPressureSettings;
4444
import org.opensearch.index.ShardIndexingPressureStore;
45+
import org.opensearch.search.backpressure.settings.NodeDuressSettings;
46+
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
47+
import org.opensearch.search.backpressure.settings.SearchShardTaskSettings;
48+
import org.opensearch.search.backpressure.trackers.CpuUsageTracker;
49+
import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker;
50+
import org.opensearch.search.backpressure.trackers.HeapUsageTracker;
4551
import org.opensearch.tasks.TaskManager;
4652
import org.opensearch.tasks.TaskResourceTrackingService;
4753
import org.opensearch.watcher.ResourceWatcherService;
@@ -582,7 +588,22 @@ public void apply(Settings value, Settings current, Settings previous) {
582588
ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS,
583589
IndexingPressure.MAX_INDEXING_BYTES,
584590
TaskResourceTrackingService.TASK_RESOURCE_TRACKING_ENABLED,
585-
TaskManager.TASK_RESOURCE_CONSUMERS_ENABLED
591+
TaskManager.TASK_RESOURCE_CONSUMERS_ENABLED,
592+
593+
// Settings related to search backpressure
594+
SearchBackpressureSettings.SETTING_MODE,
595+
SearchBackpressureSettings.SETTING_CANCELLATION_RATIO,
596+
SearchBackpressureSettings.SETTING_CANCELLATION_RATE,
597+
SearchBackpressureSettings.SETTING_CANCELLATION_BURST,
598+
NodeDuressSettings.SETTING_NUM_SUCCESSIVE_BREACHES,
599+
NodeDuressSettings.SETTING_CPU_THRESHOLD,
600+
NodeDuressSettings.SETTING_HEAP_THRESHOLD,
601+
SearchShardTaskSettings.SETTING_TOTAL_HEAP_PERCENT_THRESHOLD,
602+
HeapUsageTracker.SETTING_HEAP_PERCENT_THRESHOLD,
603+
HeapUsageTracker.SETTING_HEAP_VARIANCE_THRESHOLD,
604+
HeapUsageTracker.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE,
605+
CpuUsageTracker.SETTING_CPU_TIME_MILLIS_THRESHOLD,
606+
ElapsedTimeTracker.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD
586607
)
587608
)
588609
);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.util;
10+
11+
/**
 * MovingAverage is used to calculate the moving average of last 'n' observations.
 *
 * <p>Observations are kept in a fixed-size circular buffer. Writes go through the synchronized
 * {@link #record(long)} method; the read-only accessors are lock-free and rely on volatile fields
 * so that they always observe the most recently published values.
 *
 * @opensearch.internal
 */
public class MovingAverage {
    private final int windowSize;
    private final long[] observations;

    // Written only inside the synchronized record() method, but read without a lock by the accessors
    // below. They are volatile so that readers never see a stale or torn 64-bit value.
    private volatile long count = 0;
    private volatile double average = 0;

    // Only touched while holding the monitor in record(), so it needs no extra visibility guarantee.
    private long sum = 0;

    /**
     * Creates a moving average over the last {@code windowSize} observations.
     *
     * @param windowSize number of observations to average over
     * @throws IllegalArgumentException if {@code windowSize} is not greater than zero
     */
    public MovingAverage(int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("window size must be greater than zero");
        }

        this.windowSize = windowSize;
        this.observations = new long[windowSize];
    }

    /**
     * Records a new observation and evicts the n-th last observation.
     *
     * @param value the observation to record
     * @return the moving average after the observation has been recorded
     */
    public synchronized double record(long value) {
        long recorded = count;
        int slot = (int) (recorded % observations.length);

        // Replacing the evicted value changes the running sum by exactly this difference. Unused
        // slots still hold zero, so the same formula works while the window is filling up.
        long delta = value - observations[slot];
        observations[slot] = value;

        recorded++;
        sum += delta;
        double updatedAverage = (double) sum / Math.min(recorded, observations.length);

        count = recorded;
        average = updatedAverage;
        return updatedAverage;
    }

    /**
     * Returns the average computed by the latest call to {@link #record(long)}, or zero if nothing
     * has been recorded yet.
     */
    public double getAverage() {
        return average;
    }

    /**
     * Returns the total number of observations recorded so far (not capped at the window size).
     */
    public long getCount() {
        return count;
    }

    /**
     * Returns true once at least {@code windowSize} observations have been recorded.
     */
    public boolean isReady() {
        return count >= windowSize;
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.util;
10+
11+
import java.util.concurrent.atomic.AtomicInteger;
12+
13+
/**
 * Streak is a data structure that keeps track of the number of successive successful events.
 *
 * <p>The counter is held in an {@link AtomicInteger}, so individual calls are atomic.
 *
 * @opensearch.internal
 */
public class Streak {
    private final AtomicInteger successiveSuccessfulEvents = new AtomicInteger();

    /**
     * Records the outcome of an event.
     *
     * @param isSuccessful whether the event was successful
     * @return the streak length after recording: the incremented count on success, zero on failure
     */
    public int record(boolean isSuccessful) {
        if (isSuccessful) {
            // Saturate instead of wrapping around: a plain increment past Integer.MAX_VALUE would
            // turn a very long streak into a negative number and make it look like no streak at all.
            return successiveSuccessfulEvents.updateAndGet(current -> current == Integer.MAX_VALUE ? current : current + 1);
        }

        successiveSuccessfulEvents.set(0);
        return 0;
    }

    /**
     * Returns the current number of successive successful events.
     */
    public int length() {
        return successiveSuccessfulEvents.get();
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.util;
10+
11+
import java.util.Objects;
12+
import java.util.concurrent.atomic.AtomicReference;
13+
import java.util.function.LongSupplier;
14+
15+
/**
 * TokenBucket is used to limit the number of operations at a constant rate while allowing for short bursts.
 *
 * <p>The bucket state is an immutable {@link State} held in an {@link AtomicReference} and is only
 * ever replaced through compare-and-set, so no locks are taken.
 *
 * @opensearch.internal
 */
public class TokenBucket {
    /**
     * Defines a monotonically increasing counter.
     *
     * Usage examples:
     * 1. clock = System::nanoTime can be used to perform rate-limiting per unit time
     * 2. clock = AtomicLong::get can be used to perform rate-limiting per unit number of operations
     */
    private final LongSupplier clock;

    /**
     * Defines the number of tokens added to the bucket per clock cycle.
     */
    private final double rate;

    /**
     * Defines the capacity and the maximum number of operations that can be performed per clock cycle before
     * the bucket runs out of tokens.
     */
    private final double burst;

    /**
     * Defines the current state of the token bucket.
     */
    private final AtomicReference<State> state;

    /**
     * Creates a token bucket that starts full, i.e. with {@code burst} tokens.
     *
     * @param clock monotonically increasing counter used to measure elapsed clock cycles
     * @param rate  number of tokens added per clock cycle; must be greater than zero
     * @param burst capacity of the bucket; must be greater than zero
     * @throws IllegalArgumentException if {@code rate} or {@code burst} is NaN or not greater than zero
     */
    public TokenBucket(LongSupplier clock, double rate, double burst) {
        this(clock, rate, burst, burst);
    }

    /**
     * Creates a token bucket with an explicit initial fill level.
     *
     * @param clock         monotonically increasing counter used to measure elapsed clock cycles
     * @param rate          number of tokens added per clock cycle; must be greater than zero
     * @param burst         capacity of the bucket; must be greater than zero
     * @param initialTokens tokens initially in the bucket; values above {@code burst} are capped at {@code burst}
     * @throws IllegalArgumentException if {@code rate} or {@code burst} is NaN or not greater than zero,
     *                                  or if {@code initialTokens} is NaN
     * @throws NullPointerException     if {@code clock} is null
     */
    public TokenBucket(LongSupplier clock, double rate, double burst, double initialTokens) {
        // NaN compares false against everything, so it has to be rejected explicitly. A NaN that
        // reaches the token count would make "tokens < n" permanently false and grant every request.
        if (Double.isNaN(rate) || rate <= 0.0) {
            throw new IllegalArgumentException("rate must be greater than zero");
        }

        if (Double.isNaN(burst) || burst <= 0.0) {
            throw new IllegalArgumentException("burst must be greater than zero");
        }

        if (Double.isNaN(initialTokens)) {
            throw new IllegalArgumentException("initial tokens must be a number");
        }

        this.clock = Objects.requireNonNull(clock, "clock");
        this.rate = rate;
        this.burst = burst;
        this.state = new AtomicReference<>(new State(Math.min(initialTokens, burst), clock.getAsLong()));
    }

    /**
     * If there are enough tokens in the bucket, it requests/deducts 'n' tokens and returns true.
     * Otherwise, returns false and deducts nothing (the bucket is still refilled for the elapsed clock cycles).
     *
     * @param n number of tokens to request; must be greater than zero
     * @return true if the tokens were deducted, false if the bucket did not hold enough tokens
     * @throws IllegalArgumentException if {@code n} is NaN or not greater than zero
     */
    public boolean request(double n) {
        // Reject NaN as well: deducting NaN would corrupt the stored token count for good.
        if (Double.isNaN(n) || n <= 0) {
            throw new IllegalArgumentException("requested tokens must be greater than zero");
        }

        // Refill tokens
        State currentState, updatedState;
        do {
            currentState = state.get();
            long now = clock.getAsLong();
            double incr = (now - currentState.lastRefilledAt) * rate;
            updatedState = new State(Math.min(currentState.tokens + incr, burst), now);
        } while (state.compareAndSet(currentState, updatedState) == false);

        // Deduct tokens
        do {
            currentState = state.get();
            if (currentState.tokens < n) {
                return false;
            }
            updatedState = new State(currentState.tokens - n, currentState.lastRefilledAt);
        } while (state.compareAndSet(currentState, updatedState) == false);

        return true;
    }

    /**
     * Requests a single token.
     *
     * @return true if one token was deducted, false if the bucket did not hold enough tokens
     */
    public boolean request() {
        return request(1.0);
    }

    /**
     * Represents an immutable token bucket state.
     */
    private static class State {
        final double tokens;
        final long lastRefilledAt;

        public State(double tokens, long lastRefilledAt) {
            this.tokens = tokens;
            this.lastRefilledAt = lastRefilledAt;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            State state = (State) o;
            return Double.compare(state.tokens, tokens) == 0 && lastRefilledAt == state.lastRefilledAt;
        }

        @Override
        public int hashCode() {
            return Objects.hash(tokens, lastRefilledAt);
        }
    }
}

0 commit comments

Comments
 (0)