44
44
import org .opensearch .common .settings .Setting .Property ;
45
45
import org .opensearch .common .settings .Settings ;
46
46
47
- import java .util .Locale ;
48
47
import java .util .function .BiFunction ;
49
48
50
49
import static org .opensearch .cluster .routing .allocation .decider .Decision .THROTTLE ;
@@ -211,20 +210,9 @@ private Decision allocateInitialShardCopies(ShardRouting shardRouting, RoutingNo
211
210
allocation ,
212
211
currentInRecoveries ,
213
212
replicasInitialRecoveries ,
214
- ( x , y ) -> getInitialPrimaryNodeOutgoingRecoveries ( x , y ) ,
213
+ this :: getInitialPrimaryNodeOutgoingRecoveries ,
215
214
replicasInitialRecoveries ,
216
- String .format (
217
- Locale .ROOT ,
218
- "[%s=%d]" ,
219
- CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING .getKey (),
220
- replicasInitialRecoveries
221
- ),
222
- String .format (
223
- Locale .ROOT ,
224
- "[%s=%d]" ,
225
- CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING .getKey (),
226
- replicasInitialRecoveries
227
- )
215
+ true
228
216
);
229
217
}
230
218
@@ -238,22 +226,9 @@ private Decision allocateNonInitialShardCopies(ShardRouting shardRouting, Routin
238
226
allocation ,
239
227
currentInRecoveries ,
240
228
concurrentIncomingRecoveries ,
241
- ( x , y ) -> getPrimaryNodeOutgoingRecoveries ( x , y ) ,
229
+ this :: getPrimaryNodeOutgoingRecoveries ,
242
230
concurrentOutgoingRecoveries ,
243
- String .format (
244
- Locale .ROOT ,
245
- "[%s=%d] (can also be set via [%s])" ,
246
- CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING .getKey (),
247
- concurrentIncomingRecoveries ,
248
- CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING .getKey ()
249
- ),
250
- String .format (
251
- Locale .ROOT ,
252
- "[%s=%d] (can also be set via [%s])" ,
253
- CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING .getKey (),
254
- concurrentOutgoingRecoveries ,
255
- CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING .getKey ()
256
- )
231
+ false
257
232
);
258
233
}
259
234
@@ -274,18 +249,30 @@ private Decision allocateShardCopies(
274
249
int inRecoveriesLimit ,
275
250
BiFunction <ShardRouting , RoutingAllocation , Integer > primaryNodeOutRecoveriesFunc ,
276
251
int outRecoveriesLimit ,
277
- String incomingRecoveriesSettingMsg ,
278
- String outGoingRecoveriesSettingMsg
252
+ boolean isInitialShardCopies
279
253
) {
280
254
// Allocating a shard to this node will increase the incoming recoveries
281
255
if (currentInRecoveries >= inRecoveriesLimit ) {
282
- return allocation .decision (
283
- THROTTLE ,
284
- NAME ,
285
- "reached the limit of incoming shard recoveries [%d], cluster setting %s" ,
286
- currentInRecoveries ,
287
- incomingRecoveriesSettingMsg
288
- );
256
+ if (isInitialShardCopies ) {
257
+ return allocation .decision (
258
+ THROTTLE ,
259
+ NAME ,
260
+ "reached the limit of incoming shard recoveries [%d], cluster setting [%s=%d]" ,
261
+ currentInRecoveries ,
262
+ CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING .getKey (),
263
+ inRecoveriesLimit
264
+ );
265
+ } else {
266
+ return allocation .decision (
267
+ THROTTLE ,
268
+ NAME ,
269
+ "reached the limit of incoming shard recoveries [%d], cluster setting [%s=%d] (can also be set via [%s])" ,
270
+ currentInRecoveries ,
271
+ CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING .getKey (),
272
+ inRecoveriesLimit ,
273
+ CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING .getKey ()
274
+ );
275
+ }
289
276
} else {
290
277
// search for corresponding recovery source (= primary shard) and check number of outgoing recoveries on that node
291
278
ShardRouting primaryShard = allocation .routingNodes ().activePrimary (shardRouting .shardId ());
@@ -294,14 +281,30 @@ private Decision allocateShardCopies(
294
281
}
295
282
int primaryNodeOutRecoveries = primaryNodeOutRecoveriesFunc .apply (shardRouting , allocation );
296
283
if (primaryNodeOutRecoveries >= outRecoveriesLimit ) {
297
- return allocation .decision (
298
- THROTTLE ,
299
- NAME ,
300
- "reached the limit of outgoing shard recoveries [%d] on the node [%s] which holds the primary, " + "cluster setting %s" ,
301
- primaryNodeOutRecoveries ,
302
- primaryShard .currentNodeId (),
303
- outGoingRecoveriesSettingMsg
304
- );
284
+ if (isInitialShardCopies ) {
285
+ return allocation .decision (
286
+ THROTTLE ,
287
+ NAME ,
288
+ "reached the limit of outgoing shard recoveries [%d] on the node [%s] which holds the primary, "
289
+ + "cluster setting [%s=%d]" ,
290
+ primaryNodeOutRecoveries ,
291
+ primaryShard .currentNodeId (),
292
+ CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING .getKey (),
293
+ inRecoveriesLimit
294
+ );
295
+ } else {
296
+ return allocation .decision (
297
+ THROTTLE ,
298
+ NAME ,
299
+ "reached the limit of outgoing shard recoveries [%d] on the node [%s] which holds the primary, "
300
+ + "cluster setting [%s=%d] (can also be set via [%s])" ,
301
+ primaryNodeOutRecoveries ,
302
+ primaryShard .currentNodeId (),
303
+ CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING .getKey (),
304
+ outRecoveriesLimit ,
305
+ CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING .getKey ()
306
+ );
307
+ }
305
308
} else {
306
309
return allocation .decision (
307
310
YES ,
0 commit comments