Skip to content

Commit afd3969

Browse files
authored
PrimaryShardAllocator refactor to abstract out shard state and method calls (opensearch-project#9760)
* PrimaryShardAllocator refactor to abstract out shard state and method calls Signed-off-by: Shivansh Arora <shivansh.arora@protonmail.com> Signed-off-by: Shivansh Arora <hishiv@amazon.com>
1 parent 1e9c05c commit afd3969

File tree

1 file changed

+83
-36
lines changed

1 file changed

+83
-36
lines changed

server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java

+83-36
Original file line numberDiff line numberDiff line change
@@ -81,27 +81,28 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
8181
/**
8282
* Is the allocator responsible for allocating the given {@link ShardRouting}?
8383
*/
84-
private static boolean isResponsibleFor(final ShardRouting shard) {
84+
protected static boolean isResponsibleFor(final ShardRouting shard) {
8585
return shard.primary() // must be primary
8686
&& shard.unassigned() // must be unassigned
8787
// only handle either an existing store or a snapshot recovery
8888
&& (shard.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE
8989
|| shard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT);
9090
}
9191

92-
@Override
93-
public AllocateUnassignedDecision makeAllocationDecision(
94-
final ShardRouting unassignedShard,
95-
final RoutingAllocation allocation,
96-
final Logger logger
97-
) {
92+
/**
93+
* Skips the fetchData call for a shard if its recovery mode is snapshot. Also takes no decision if the allocator is
94+
* not responsible for this particular shard.
95+
*
96+
* @param unassignedShard unassigned shard routing
97+
* @param allocation routing allocation object
98+
* @return allocation decision taken for this shard, or null if no early decision applies and allocation should proceed
99+
*/
100+
protected AllocateUnassignedDecision getInEligibleShardDecision(ShardRouting unassignedShard, RoutingAllocation allocation) {
98101
if (isResponsibleFor(unassignedShard) == false) {
99102
// this allocator is not responsible for allocating this shard
100103
return AllocateUnassignedDecision.NOT_TAKEN;
101104
}
102-
103105
final boolean explain = allocation.debugDecision();
104-
105106
if (unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT
106107
&& allocation.snapshotShardSizeInfo().getShardSize(unassignedShard) == null) {
107108
List<NodeAllocationResult> nodeDecisions = null;
@@ -110,17 +111,52 @@ public AllocateUnassignedDecision makeAllocationDecision(
110111
}
111112
return AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions);
112113
}
114+
return null;
115+
}
113116

117+
@Override
118+
public AllocateUnassignedDecision makeAllocationDecision(
119+
final ShardRouting unassignedShard,
120+
final RoutingAllocation allocation,
121+
final Logger logger
122+
) {
123+
AllocateUnassignedDecision decision = getInEligibleShardDecision(unassignedShard, allocation);
124+
if (decision != null) {
125+
return decision;
126+
}
114127
final FetchResult<NodeGatewayStartedShards> shardState = fetchData(unassignedShard, allocation);
115-
if (shardState.hasData() == false) {
128+
List<NodeGatewayStartedShards> nodeShardStates = adaptToNodeStartedShardList(shardState);
129+
return getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger);
130+
}
131+
132+
/**
133+
* Transforms {@link FetchResult} of {@link NodeGatewayStartedShards} to {@link List} of {@link NodeGatewayStartedShards}.
134+
* Returns null if {@link FetchResult} does not have any data.
135+
*/
136+
private static List<NodeGatewayStartedShards> adaptToNodeStartedShardList(FetchResult<NodeGatewayStartedShards> shardsState) {
137+
if (!shardsState.hasData()) {
138+
return null;
139+
}
140+
List<NodeGatewayStartedShards> nodeShardStates = new ArrayList<>();
141+
shardsState.getData().forEach((node, nodeGatewayStartedShard) -> { nodeShardStates.add(nodeGatewayStartedShard); });
142+
return nodeShardStates;
143+
}
144+
145+
protected AllocateUnassignedDecision getAllocationDecision(
146+
ShardRouting unassignedShard,
147+
RoutingAllocation allocation,
148+
List<NodeGatewayStartedShards> shardState,
149+
Logger logger
150+
) {
151+
final boolean explain = allocation.debugDecision();
152+
if (shardState == null) {
116153
allocation.setHasPendingAsyncFetch();
117154
List<NodeAllocationResult> nodeDecisions = null;
118155
if (explain) {
119156
nodeDecisions = buildDecisionsForAllNodes(unassignedShard, allocation);
120157
}
121158
return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions);
122159
}
123-
124160
// don't create a new IndexSetting object for every shard as this could cause a lot of garbage
125161
// on cluster restart if we allocate a boat load of shards
126162
final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(unassignedShard.index());
@@ -260,11 +296,11 @@ public AllocateUnassignedDecision makeAllocationDecision(
260296
*/
261297
private static List<NodeAllocationResult> buildNodeDecisions(
262298
NodesToAllocate nodesToAllocate,
263-
FetchResult<NodeGatewayStartedShards> fetchedShardData,
299+
List<NodeGatewayStartedShards> fetchedShardData,
264300
Set<String> inSyncAllocationIds
265301
) {
266302
List<NodeAllocationResult> nodeResults = new ArrayList<>();
267-
Collection<NodeGatewayStartedShards> ineligibleShards;
303+
Collection<NodeGatewayStartedShards> ineligibleShards = new ArrayList<>();
268304
if (nodesToAllocate != null) {
269305
final Set<DiscoveryNode> discoNodes = new HashSet<>();
270306
nodeResults.addAll(
@@ -280,15 +316,13 @@ private static List<NodeAllocationResult> buildNodeDecisions(
280316
})
281317
.collect(Collectors.toList())
282318
);
283-
ineligibleShards = fetchedShardData.getData()
284-
.values()
285-
.stream()
319+
ineligibleShards = fetchedShardData.stream()
286320
.filter(shardData -> discoNodes.contains(shardData.getNode()) == false)
287321
.collect(Collectors.toList());
288322
} else {
289323
// there were no shard copies that were eligible for being assigned the allocation,
290324
// so all fetched shard data are ineligible shards
291-
ineligibleShards = fetchedShardData.getData().values();
325+
ineligibleShards = fetchedShardData;
292326
}
293327

294328
nodeResults.addAll(
@@ -328,12 +362,12 @@ protected static NodeShardsResult buildNodeShardsResult(
328362
boolean matchAnyShard,
329363
Set<String> ignoreNodes,
330364
Set<String> inSyncAllocationIds,
331-
FetchResult<NodeGatewayStartedShards> shardState,
365+
List<NodeGatewayStartedShards> shardState,
332366
Logger logger
333367
) {
334368
List<NodeGatewayStartedShards> nodeShardStates = new ArrayList<>();
335369
int numberOfAllocationsFound = 0;
336-
for (NodeGatewayStartedShards nodeShardState : shardState.getData().values()) {
370+
for (NodeGatewayStartedShards nodeShardState : shardState) {
337371
DiscoveryNode node = nodeShardState.getNode();
338372
String allocationId = nodeShardState.allocationId();
339373

@@ -386,11 +420,27 @@ protected static NodeShardsResult buildNodeShardsResult(
386420
}
387421
}
388422

389-
/*
390-
Orders the active shards copies based on below comparators
391-
1. No store exception i.e. shard copy is readable
392-
2. Prefer previous primary shard
393-
3. Prefer shard copy with the highest replication checkpoint. It is NO-OP for doc rep enabled indices.
423+
nodeShardStates.sort(createActiveShardComparator(matchAnyShard, inSyncAllocationIds));
424+
425+
if (logger.isTraceEnabled()) {
426+
logger.trace(
427+
"{} candidates for allocation: {}",
428+
shard,
429+
nodeShardStates.stream().map(s -> s.getNode().getName()).collect(Collectors.joining(", "))
430+
);
431+
}
432+
return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound);
433+
}
434+
435+
private static Comparator<NodeGatewayStartedShards> createActiveShardComparator(
436+
boolean matchAnyShard,
437+
Set<String> inSyncAllocationIds
438+
) {
439+
/*
440+
* Orders the active shard copies based on the comparators below:
441+
* 1. No store exception i.e. shard copy is readable
442+
* 2. Prefer previous primary shard
443+
* 3. Prefer shard copy with the highest replication checkpoint. It is NO-OP for doc rep enabled indices.
394444
*/
395445
final Comparator<NodeGatewayStartedShards> comparator; // allocation preference
396446
if (matchAnyShard) {
@@ -406,16 +456,7 @@ protected static NodeShardsResult buildNodeShardsResult(
406456
.thenComparing(HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR);
407457
}
408458

409-
nodeShardStates.sort(comparator);
410-
411-
if (logger.isTraceEnabled()) {
412-
logger.trace(
413-
"{} candidates for allocation: {}",
414-
shard,
415-
nodeShardStates.stream().map(s -> s.getNode().getName()).collect(Collectors.joining(", "))
416-
);
417-
}
418-
return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound);
459+
return comparator;
419460
}
420461

421462
/**
@@ -457,7 +498,10 @@ private static NodesToAllocate buildNodesToAllocate(
457498

458499
protected abstract FetchResult<NodeGatewayStartedShards> fetchData(ShardRouting shard, RoutingAllocation allocation);
459500

460-
private static class NodeShardsResult {
501+
/**
502+
* This class encapsulates the result of a call to {@link #buildNodeShardsResult}
503+
*/
504+
static class NodeShardsResult {
461505
final List<NodeGatewayStartedShards> orderedAllocationCandidates;
462506
final int allocationsFound;
463507

@@ -467,7 +511,10 @@ private static class NodeShardsResult {
467511
}
468512
}
469513

470-
static class NodesToAllocate {
514+
/**
515+
* This class encapsulates the result of a call to {@link #buildNodesToAllocate}
516+
*/
517+
protected static class NodesToAllocate {
471518
final List<DecidedNode> yesNodeShards;
472519
final List<DecidedNode> throttleNodeShards;
473520
final List<DecidedNode> noNodeShards;

0 commit comments

Comments
 (0)