22
22
import java .util .Set ;
23
23
import java .util .concurrent .ConcurrentHashMap ;
24
24
import java .util .concurrent .ConcurrentMap ;
25
+ import java .util .concurrent .atomic .AtomicBoolean ;
25
26
import java .util .function .Supplier ;
26
27
27
28
/**
@@ -51,6 +52,11 @@ public class ClusterManagerTaskThrottler implements TaskBatcherListener {
51
52
private final ConcurrentMap <String , Long > tasksThreshold ;
52
53
private final Supplier <Version > minNodeVersionSupplier ;
53
54
55
+ // Throttling starts only once every node in the cluster is on version 2.5.0 or later.
56
+ // During an upgrade as well, it waits for all older-version nodes to leave the cluster before throttling starts.
57
+ // This is needed specifically for the case where throttling is enabled through static settings.
58
+ private AtomicBoolean startThrottling = new AtomicBoolean ();
59
+
54
60
public ClusterManagerTaskThrottler (
55
61
final Settings settings ,
56
62
final ClusterSettings clusterSettings ,
@@ -168,7 +174,7 @@ public void onBeginSubmit(List<? extends TaskBatcher.BatchedTask> tasks) {
168
174
int size = tasks .size ();
169
175
if (clusterManagerThrottlingKey .isThrottlingEnabled ()) {
170
176
Long threshold = tasksThreshold .get (clusterManagerThrottlingKey .getTaskThrottlingKey ());
171
- if (threshold != null && ( count + size > threshold )) {
177
+ if (threshold != null && shouldThrottle ( threshold , count , size )) {
172
178
clusterManagerTaskThrottlerListener .onThrottle (clusterManagerThrottlingKey .getTaskThrottlingKey (), size );
173
179
logger .warn (
174
180
"Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]" ,
@@ -185,6 +191,28 @@ public void onBeginSubmit(List<? extends TaskBatcher.BatchedTask> tasks) {
185
191
});
186
192
}
187
193
194
+ /**
195
+ * If throttling thresholds are set via static setting, it will update the threshold map.
196
+ * It may start throwing throttling exception to older nodes in cluster.
197
+ * Older version nodes will not be equipped to handle the throttling exception and
198
+ * this may result in unexpected behavior where internal tasks would start failing without any retries.
199
+ *
200
+ * For every task submission request, it will validate if nodes version is greater or equal to 2.5.0 and set the startThrottling flag.
201
+ * Once the startThrottling flag is set, it will not perform check for next set of tasks.
202
+ */
203
+ private boolean shouldThrottle (Long threshold , Long count , int size ) {
204
+ if (!startThrottling .get ()) {
205
+ if (minNodeVersionSupplier .get ().compareTo (Version .V_2_5_0 ) >= 0 ) {
206
+ startThrottling .compareAndSet (false , true );
207
+ logger .info ("Starting cluster manager throttling as all nodes are higher than or equal to 2.5.0" );
208
+ } else {
209
+ logger .info ("Skipping cluster manager throttling as at least one node < 2.5.0 is present in cluster" );
210
+ return false ;
211
+ }
212
+ }
213
+ return count + size > threshold ;
214
+ }
215
+
188
216
@ Override
189
217
public void onSubmitFailure (List <? extends TaskBatcher .BatchedTask > tasks ) {
190
218
reduceTaskCount (tasks );
0 commit comments