|
33 | 33 | * <p>
|
34 | 34 | * Set specific setting to for setting the threshold of throttling of particular task type.
|
35 | 35 | * e.g : Set "cluster_manager.throttling.thresholds.put_mapping" to set throttling limit of "put mapping" tasks,
|
36 |
| - * Set it to default value(-1) to disable the throttling for this task type. |
| 36 | + * Set it to default value(-1) to disable the throttling for this task type. |
37 | 37 | */
|
38 | 38 | public class ClusterManagerTaskThrottler implements TaskBatcherListener {
|
39 | 39 | private static final Logger logger = LogManager.getLogger(ClusterManagerTaskThrottler.class);
|
@@ -69,7 +69,7 @@ public class ClusterManagerTaskThrottler implements TaskBatcherListener {
|
69 | 69 | private final int MIN_THRESHOLD_VALUE = -1; // Disabled throttling
|
70 | 70 | private final ClusterManagerTaskThrottlerListener clusterManagerTaskThrottlerListener;
|
71 | 71 |
|
72 |
| - private final ConcurrentMap<String, Long> tasksCount; |
| 72 | + final ConcurrentMap<String, Long> tasksCount; |
73 | 73 | private final ConcurrentMap<String, Long> tasksThreshold;
|
74 | 74 | private final Supplier<Version> minNodeVersionSupplier;
|
75 | 75 |
|
@@ -209,30 +209,59 @@ Long getThrottlingLimit(final String taskKey) {
|
209 | 209 | return tasksThreshold.get(taskKey);
|
210 | 210 | }
|
211 | 211 |
|
| 212 | + private void failFastWhenThrottlingThresholdsAreAlreadyBreached( |
| 213 | + final boolean throttlingEnabledWithThreshold, |
| 214 | + final Long threshold, |
| 215 | + final long existingTaskCount, |
| 216 | + final int incomingTaskCount, |
| 217 | + final String taskThrottlingKey |
| 218 | + ) { |
| 219 | + if (throttlingEnabledWithThreshold && shouldThrottle(threshold, existingTaskCount, incomingTaskCount)) { |
| 220 | + throw new ClusterManagerThrottlingException("Throttling Exception : Limit exceeded for " + taskThrottlingKey); |
| 221 | + } |
| 222 | + } |
| 223 | + |
212 | 224 | @Override
|
213 | 225 | public void onBeginSubmit(List<? extends TaskBatcher.BatchedTask> tasks) {
|
214 |
| - ThrottlingKey clusterManagerThrottlingKey = ((ClusterStateTaskExecutor<Object>) tasks.get(0).batchingKey) |
| 226 | + final ThrottlingKey clusterManagerThrottlingKey = ((ClusterStateTaskExecutor<Object>) tasks.get(0).batchingKey) |
215 | 227 | .getClusterManagerThrottlingKey();
|
216 |
| - tasksCount.putIfAbsent(clusterManagerThrottlingKey.getTaskThrottlingKey(), 0L); |
217 |
| - tasksCount.computeIfPresent(clusterManagerThrottlingKey.getTaskThrottlingKey(), (key, count) -> { |
218 |
| - int size = tasks.size(); |
219 |
| - if (clusterManagerThrottlingKey.isThrottlingEnabled()) { |
220 |
| - Long threshold = tasksThreshold.get(clusterManagerThrottlingKey.getTaskThrottlingKey()); |
221 |
| - if (threshold != null && shouldThrottle(threshold, count, size)) { |
222 |
| - clusterManagerTaskThrottlerListener.onThrottle(clusterManagerThrottlingKey.getTaskThrottlingKey(), size); |
223 |
| - logger.warn( |
224 |
| - "Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]", |
225 |
| - clusterManagerThrottlingKey.getTaskThrottlingKey(), |
226 |
| - tasks.size(), |
227 |
| - threshold |
228 |
| - ); |
229 |
| - throw new ClusterManagerThrottlingException( |
230 |
| - "Throttling Exception : Limit exceeded for " + clusterManagerThrottlingKey.getTaskThrottlingKey() |
231 |
| - ); |
232 |
| - } |
233 |
| - } |
234 |
| - return count + size; |
235 |
| - }); |
| 228 | + final String taskThrottlingKey = clusterManagerThrottlingKey.getTaskThrottlingKey(); |
| 229 | + final Long threshold = getThrottlingLimit(taskThrottlingKey); |
| 230 | + final boolean isThrottlingEnabledWithThreshold = clusterManagerThrottlingKey.isThrottlingEnabled() && threshold != null; |
| 231 | + int incomingTaskCount = tasks.size(); |
| 232 | + |
| 233 | + try { |
| 234 | + tasksCount.putIfAbsent(taskThrottlingKey, 0L); |
| 235 | + // Perform shallow check before acquiring lock to avoid blocking of network threads |
| 236 | + // if throttling is ongoing for a specific task |
| 237 | + failFastWhenThrottlingThresholdsAreAlreadyBreached( |
| 238 | + isThrottlingEnabledWithThreshold, |
| 239 | + threshold, |
| 240 | + tasksCount.get(taskThrottlingKey), |
| 241 | + incomingTaskCount, |
| 242 | + taskThrottlingKey |
| 243 | + ); |
| 244 | + |
| 245 | + tasksCount.computeIfPresent(taskThrottlingKey, (key, existingTaskCount) -> { |
| 246 | + failFastWhenThrottlingThresholdsAreAlreadyBreached( |
| 247 | + isThrottlingEnabledWithThreshold, |
| 248 | + threshold, |
| 249 | + existingTaskCount, |
| 250 | + incomingTaskCount, |
| 251 | + taskThrottlingKey |
| 252 | + ); |
| 253 | + return existingTaskCount + incomingTaskCount; |
| 254 | + }); |
| 255 | + } catch (final ClusterManagerThrottlingException e) { |
| 256 | + clusterManagerTaskThrottlerListener.onThrottle(taskThrottlingKey, incomingTaskCount); |
| 257 | + logger.trace( |
| 258 | + "Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]", |
| 259 | + taskThrottlingKey, |
| 260 | + incomingTaskCount, |
| 261 | + threshold |
| 262 | + ); |
| 263 | + throw e; |
| 264 | + } |
236 | 265 | }
|
237 | 266 |
|
238 | 267 | /**
|
|
0 commit comments