@@ -81,27 +81,28 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
81
81
/**
82
82
* Is the allocator responsible for allocating the given {@link ShardRouting}?
83
83
*/
84
- private static boolean isResponsibleFor (final ShardRouting shard ) {
84
+ protected static boolean isResponsibleFor (final ShardRouting shard ) {
85
85
return shard .primary () // must be primary
86
86
&& shard .unassigned () // must be unassigned
87
87
// only handle either an existing store or a snapshot recovery
88
88
&& (shard .recoverySource ().getType () == RecoverySource .Type .EXISTING_STORE
89
89
|| shard .recoverySource ().getType () == RecoverySource .Type .SNAPSHOT );
90
90
}
91
91
92
- @ Override
93
- public AllocateUnassignedDecision makeAllocationDecision (
94
- final ShardRouting unassignedShard ,
95
- final RoutingAllocation allocation ,
96
- final Logger logger
97
- ) {
92
+ /**
93
+ * Skip doing fetchData call for a shard if recovery mode is snapshot. Also do not take decision if allocator is
94
+ * not responsible for this particular shard.
95
+ *
96
+ * @param unassignedShard unassigned shard routing
97
+ * @param allocation routing allocation object
98
+ * @return allocation decision taken for this shard
99
+ */
100
+ protected AllocateUnassignedDecision getInEligibleShardDecision (ShardRouting unassignedShard , RoutingAllocation allocation ) {
98
101
if (isResponsibleFor (unassignedShard ) == false ) {
99
102
// this allocator is not responsible for allocating this shard
100
103
return AllocateUnassignedDecision .NOT_TAKEN ;
101
104
}
102
-
103
105
final boolean explain = allocation .debugDecision ();
104
-
105
106
if (unassignedShard .recoverySource ().getType () == RecoverySource .Type .SNAPSHOT
106
107
&& allocation .snapshotShardSizeInfo ().getShardSize (unassignedShard ) == null ) {
107
108
List <NodeAllocationResult > nodeDecisions = null ;
@@ -110,17 +111,52 @@ public AllocateUnassignedDecision makeAllocationDecision(
110
111
}
111
112
return AllocateUnassignedDecision .no (UnassignedInfo .AllocationStatus .FETCHING_SHARD_DATA , nodeDecisions );
112
113
}
114
+ return null ;
115
+ }
113
116
117
+ @ Override
118
+ public AllocateUnassignedDecision makeAllocationDecision (
119
+ final ShardRouting unassignedShard ,
120
+ final RoutingAllocation allocation ,
121
+ final Logger logger
122
+ ) {
123
+ AllocateUnassignedDecision decision = getInEligibleShardDecision (unassignedShard , allocation );
124
+ if (decision != null ) {
125
+ return decision ;
126
+ }
114
127
final FetchResult <NodeGatewayStartedShards > shardState = fetchData (unassignedShard , allocation );
115
- if (shardState .hasData () == false ) {
128
+ List <NodeGatewayStartedShards > nodeShardStates = adaptToNodeStartedShardList (shardState );
129
+ return getAllocationDecision (unassignedShard , allocation , nodeShardStates , logger );
130
+ }
131
+
132
+ /**
133
+ * Transforms {@link FetchResult} of {@link NodeGatewayStartedShards} to {@link List} of {@link NodeGatewayStartedShards}
134
+ * Returns null if {@link FetchResult} does not have any data.
135
+ */
136
+ private static List <NodeGatewayStartedShards > adaptToNodeStartedShardList (FetchResult <NodeGatewayStartedShards > shardsState ) {
137
+ if (!shardsState .hasData ()) {
138
+ return null ;
139
+ }
140
+ List <NodeGatewayStartedShards > nodeShardStates = new ArrayList <>();
141
+ shardsState .getData ().forEach ((node , nodeGatewayStartedShard ) -> { nodeShardStates .add (nodeGatewayStartedShard ); });
142
+ return nodeShardStates ;
143
+ }
144
+
145
+ protected AllocateUnassignedDecision getAllocationDecision (
146
+ ShardRouting unassignedShard ,
147
+ RoutingAllocation allocation ,
148
+ List <NodeGatewayStartedShards > shardState ,
149
+ Logger logger
150
+ ) {
151
+ final boolean explain = allocation .debugDecision ();
152
+ if (shardState == null ) {
116
153
allocation .setHasPendingAsyncFetch ();
117
154
List <NodeAllocationResult > nodeDecisions = null ;
118
155
if (explain ) {
119
156
nodeDecisions = buildDecisionsForAllNodes (unassignedShard , allocation );
120
157
}
121
158
return AllocateUnassignedDecision .no (AllocationStatus .FETCHING_SHARD_DATA , nodeDecisions );
122
159
}
123
-
124
160
// don't create a new IndexSetting object for every shard as this could cause a lot of garbage
125
161
// on cluster restart if we allocate a boat load of shards
126
162
final IndexMetadata indexMetadata = allocation .metadata ().getIndexSafe (unassignedShard .index ());
@@ -260,11 +296,11 @@ public AllocateUnassignedDecision makeAllocationDecision(
260
296
*/
261
297
private static List <NodeAllocationResult > buildNodeDecisions (
262
298
NodesToAllocate nodesToAllocate ,
263
- FetchResult <NodeGatewayStartedShards > fetchedShardData ,
299
+ List <NodeGatewayStartedShards > fetchedShardData ,
264
300
Set <String > inSyncAllocationIds
265
301
) {
266
302
List <NodeAllocationResult > nodeResults = new ArrayList <>();
267
- Collection <NodeGatewayStartedShards > ineligibleShards ;
303
+ Collection <NodeGatewayStartedShards > ineligibleShards = new ArrayList <>() ;
268
304
if (nodesToAllocate != null ) {
269
305
final Set <DiscoveryNode > discoNodes = new HashSet <>();
270
306
nodeResults .addAll (
@@ -280,15 +316,13 @@ private static List<NodeAllocationResult> buildNodeDecisions(
280
316
})
281
317
.collect (Collectors .toList ())
282
318
);
283
- ineligibleShards = fetchedShardData .getData ()
284
- .values ()
285
- .stream ()
319
+ ineligibleShards = fetchedShardData .stream ()
286
320
.filter (shardData -> discoNodes .contains (shardData .getNode ()) == false )
287
321
.collect (Collectors .toList ());
288
322
} else {
289
323
// there were no shard copies that were eligible for being assigned the allocation,
290
324
// so all fetched shard data are ineligible shards
291
- ineligibleShards = fetchedShardData . getData (). values () ;
325
+ ineligibleShards = fetchedShardData ;
292
326
}
293
327
294
328
nodeResults .addAll (
@@ -328,12 +362,12 @@ protected static NodeShardsResult buildNodeShardsResult(
328
362
boolean matchAnyShard ,
329
363
Set <String > ignoreNodes ,
330
364
Set <String > inSyncAllocationIds ,
331
- FetchResult <NodeGatewayStartedShards > shardState ,
365
+ List <NodeGatewayStartedShards > shardState ,
332
366
Logger logger
333
367
) {
334
368
List <NodeGatewayStartedShards > nodeShardStates = new ArrayList <>();
335
369
int numberOfAllocationsFound = 0 ;
336
- for (NodeGatewayStartedShards nodeShardState : shardState . getData (). values () ) {
370
+ for (NodeGatewayStartedShards nodeShardState : shardState ) {
337
371
DiscoveryNode node = nodeShardState .getNode ();
338
372
String allocationId = nodeShardState .allocationId ();
339
373
@@ -386,11 +420,27 @@ protected static NodeShardsResult buildNodeShardsResult(
386
420
}
387
421
}
388
422
389
- /*
390
- Orders the active shards copies based on below comparators
391
- 1. No store exception i.e. shard copy is readable
392
- 2. Prefer previous primary shard
393
- 3. Prefer shard copy with the highest replication checkpoint. It is NO-OP for doc rep enabled indices.
423
+ nodeShardStates .sort (createActiveShardComparator (matchAnyShard , inSyncAllocationIds ));
424
+
425
+ if (logger .isTraceEnabled ()) {
426
+ logger .trace (
427
+ "{} candidates for allocation: {}" ,
428
+ shard ,
429
+ nodeShardStates .stream ().map (s -> s .getNode ().getName ()).collect (Collectors .joining (", " ))
430
+ );
431
+ }
432
+ return new NodeShardsResult (nodeShardStates , numberOfAllocationsFound );
433
+ }
434
+
435
+ private static Comparator <NodeGatewayStartedShards > createActiveShardComparator (
436
+ boolean matchAnyShard ,
437
+ Set <String > inSyncAllocationIds
438
+ ) {
439
+ /**
440
+ * Orders the active shards copies based on below comparators
441
+ * 1. No store exception i.e. shard copy is readable
442
+ * 2. Prefer previous primary shard
443
+ * 3. Prefer shard copy with the highest replication checkpoint. It is NO-OP for doc rep enabled indices.
394
444
*/
395
445
final Comparator <NodeGatewayStartedShards > comparator ; // allocation preference
396
446
if (matchAnyShard ) {
@@ -406,16 +456,7 @@ protected static NodeShardsResult buildNodeShardsResult(
406
456
.thenComparing (HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR );
407
457
}
408
458
409
- nodeShardStates .sort (comparator );
410
-
411
- if (logger .isTraceEnabled ()) {
412
- logger .trace (
413
- "{} candidates for allocation: {}" ,
414
- shard ,
415
- nodeShardStates .stream ().map (s -> s .getNode ().getName ()).collect (Collectors .joining (", " ))
416
- );
417
- }
418
- return new NodeShardsResult (nodeShardStates , numberOfAllocationsFound );
459
+ return comparator ;
419
460
}
420
461
421
462
/**
@@ -457,7 +498,10 @@ private static NodesToAllocate buildNodesToAllocate(
457
498
458
499
protected abstract FetchResult <NodeGatewayStartedShards > fetchData (ShardRouting shard , RoutingAllocation allocation );
459
500
460
- private static class NodeShardsResult {
501
+ /**
502
+ * This class encapsulates the result of a call to {@link #buildNodeShardsResult}
503
+ */
504
+ static class NodeShardsResult {
461
505
final List <NodeGatewayStartedShards > orderedAllocationCandidates ;
462
506
final int allocationsFound ;
463
507
@@ -467,7 +511,10 @@ private static class NodeShardsResult {
467
511
}
468
512
}
469
513
470
- static class NodesToAllocate {
514
+ /**
515
+ * This class encapsulates the result of a call to {@link #buildNodesToAllocate}
516
+ */
517
+ protected static class NodesToAllocate {
471
518
final List <DecidedNode > yesNodeShards ;
472
519
final List <DecidedNode > throttleNodeShards ;
473
520
final List <DecidedNode > noNodeShards ;
0 commit comments