Skip to content

Commit ac0df36

Browse files
amkhar authored and shiv0408 committed
Add PrimaryShardBatchAllocator to take allocation decisions for a batch of shards (opensearch-project#8916)
* Add PrimaryShardBatchAllocator to take allocation decisions for a batch of shards

Signed-off-by: Aman Khare <amkhar@amazon.com>
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
1 parent 73fcf07 commit ac0df36

File tree

10 files changed: +754 additions and -268 deletions (lines changed)

10 files changed

+754
-268
lines changed

server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java

+16-20
Original file line number | Diff line number | Diff line change
@@ -62,6 +62,7 @@
6262
import org.opensearch.core.index.Index;
6363
import org.opensearch.core.index.shard.ShardId;
6464
import org.opensearch.env.NodeEnvironment;
65+
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard;
6566
import org.opensearch.index.IndexService;
6667
import org.opensearch.index.IndexSettings;
6768
import org.opensearch.index.MergePolicyProvider;
@@ -729,11 +730,11 @@ public Settings onNodeStopped(String nodeName) throws Exception {
729730
);
730731

731732
assertThat(response.getNodes(), hasSize(1));
732-
assertThat(response.getNodes().get(0).allocationId(), notNullValue());
733+
assertThat(response.getNodes().get(0).getGatewayShardStarted().allocationId(), notNullValue());
733734
if (corrupt) {
734-
assertThat(response.getNodes().get(0).storeException(), notNullValue());
735+
assertThat(response.getNodes().get(0).getGatewayShardStarted().storeException(), notNullValue());
735736
} else {
736-
assertThat(response.getNodes().get(0).storeException(), nullValue());
737+
assertThat(response.getNodes().get(0).getGatewayShardStarted().storeException(), nullValue());
737738
}
738739

739740
// start another node so cluster consistency checks won't time out due to the lack of state
@@ -773,11 +774,11 @@ public void testSingleShardFetchUsingBatchAction() {
773774
);
774775
final Index index = resolveIndex(indexName);
775776
final ShardId shardId = new ShardId(index, 0);
776-
TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard nodeGatewayStartedShards = response.getNodesMap()
777+
GatewayStartedShard gatewayStartedShard = response.getNodesMap()
777778
.get(searchShardsResponse.getNodes()[0].getId())
778779
.getNodeGatewayStartedShardsBatch()
779780
.get(shardId);
780-
assertNodeGatewayStartedShardsHappyCase(nodeGatewayStartedShards);
781+
assertNodeGatewayStartedShardsHappyCase(gatewayStartedShard);
781782
}
782783

783784
public void testShardFetchMultiNodeMultiIndexesUsingBatchAction() {
@@ -801,11 +802,8 @@ public void testShardFetchMultiNodeMultiIndexesUsingBatchAction() {
801802
ShardId shardId = clusterSearchShardsGroup.getShardId();
802803
assertEquals(1, clusterSearchShardsGroup.getShards().length);
803804
String nodeId = clusterSearchShardsGroup.getShards()[0].currentNodeId();
804-
TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard nodeGatewayStartedShards = response.getNodesMap()
805-
.get(nodeId)
806-
.getNodeGatewayStartedShardsBatch()
807-
.get(shardId);
808-
assertNodeGatewayStartedShardsHappyCase(nodeGatewayStartedShards);
805+
GatewayStartedShard gatewayStartedShard = response.getNodesMap().get(nodeId).getNodeGatewayStartedShardsBatch().get(shardId);
806+
assertNodeGatewayStartedShardsHappyCase(gatewayStartedShard);
809807
}
810808
}
811809

@@ -825,13 +823,13 @@ public void testShardFetchCorruptedShardsUsingBatchAction() throws Exception {
825823
new TransportNodesListGatewayStartedShardsBatch.Request(getDiscoveryNodes(), shardIdShardAttributesMap)
826824
);
827825
DiscoveryNode[] discoveryNodes = getDiscoveryNodes();
828-
TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard nodeGatewayStartedShards = response.getNodesMap()
826+
GatewayStartedShard gatewayStartedShard = response.getNodesMap()
829827
.get(discoveryNodes[0].getId())
830828
.getNodeGatewayStartedShardsBatch()
831829
.get(shardId);
832-
assertNotNull(nodeGatewayStartedShards.storeException());
833-
assertNotNull(nodeGatewayStartedShards.allocationId());
834-
assertTrue(nodeGatewayStartedShards.primary());
830+
assertNotNull(gatewayStartedShard.storeException());
831+
assertNotNull(gatewayStartedShard.allocationId());
832+
assertTrue(gatewayStartedShard.primary());
835833
}
836834

837835
public void testSingleShardStoreFetchUsingBatchAction() throws ExecutionException, InterruptedException {
@@ -959,12 +957,10 @@ private void assertNodeStoreFilesMetadataSuccessCase(
959957
assertNotNull(storeFileMetadata.peerRecoveryRetentionLeases());
960958
}
961959

962-
private void assertNodeGatewayStartedShardsHappyCase(
963-
TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard nodeGatewayStartedShards
964-
) {
965-
assertNull(nodeGatewayStartedShards.storeException());
966-
assertNotNull(nodeGatewayStartedShards.allocationId());
967-
assertTrue(nodeGatewayStartedShards.primary());
960+
private void assertNodeGatewayStartedShardsHappyCase(GatewayStartedShard gatewayStartedShard) {
961+
assertNull(gatewayStartedShard.storeException());
962+
assertNotNull(gatewayStartedShard.allocationId());
963+
assertTrue(gatewayStartedShard.primary());
968964
}
969965

970966
private void prepareIndex(String indexName, int numberOfPrimaryShards) {

server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java

+4-3
Original file line number | Diff line number | Diff line change
@@ -258,9 +258,9 @@ void finish() {
258258
storeStatuses.add(
259259
new IndicesShardStoresResponse.StoreStatus(
260260
response.getNode(),
261-
response.allocationId(),
261+
response.getGatewayShardStarted().allocationId(),
262262
allocationStatus,
263-
response.storeException()
263+
response.getGatewayShardStarted().storeException()
264264
)
265265
);
266266
}
@@ -308,7 +308,8 @@ private IndicesShardStoresResponse.StoreStatus.AllocationStatus getAllocationSta
308308
* A shard exists/existed in a node only if shard state file exists in the node
309309
*/
310310
private boolean shardExistsInNode(final NodeGatewayStartedShards response) {
311-
return response.storeException() != null || response.allocationId() != null;
311+
return response.getGatewayShardStarted().storeException() != null
312+
|| response.getGatewayShardStarted().allocationId() != null;
312313
}
313314

314315
@Override

server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java

+40-32
Original file line number | Diff line number | Diff line change
@@ -50,6 +50,7 @@
5050
import org.opensearch.cluster.routing.allocation.decider.Decision.Type;
5151
import org.opensearch.env.ShardLockObtainFailedException;
5252
import org.opensearch.gateway.AsyncShardFetch.FetchResult;
53+
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShard;
5354
import org.opensearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards;
5455

5556
import java.util.ArrayList;
@@ -125,27 +126,37 @@ public AllocateUnassignedDecision makeAllocationDecision(
125126
return decision;
126127
}
127128
final FetchResult<NodeGatewayStartedShards> shardState = fetchData(unassignedShard, allocation);
128-
List<NodeGatewayStartedShards> nodeShardStates = adaptToNodeStartedShardList(shardState);
129+
List<NodeGatewayStartedShard> nodeShardStates = adaptToNodeStartedShardList(shardState);
129130
return getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger);
130131
}
131132

132133
/**
133-
* Transforms {@link FetchResult} of {@link NodeGatewayStartedShards} to {@link List} of {@link NodeGatewayStartedShards}
134+
* Transforms {@link FetchResult} of {@link NodeGatewayStartedShards} to {@link List} of {@link NodeGatewayStartedShard}
134135
* Returns null if {@link FetchResult} does not have any data.
135136
*/
136-
private static List<NodeGatewayStartedShards> adaptToNodeStartedShardList(FetchResult<NodeGatewayStartedShards> shardsState) {
137+
private static List<NodeGatewayStartedShard> adaptToNodeStartedShardList(FetchResult<NodeGatewayStartedShards> shardsState) {
137138
if (!shardsState.hasData()) {
138139
return null;
139140
}
140-
List<NodeGatewayStartedShards> nodeShardStates = new ArrayList<>();
141-
shardsState.getData().forEach((node, nodeGatewayStartedShard) -> { nodeShardStates.add(nodeGatewayStartedShard); });
141+
List<NodeGatewayStartedShard> nodeShardStates = new ArrayList<>();
142+
shardsState.getData().forEach((node, nodeGatewayStartedShard) -> {
143+
nodeShardStates.add(
144+
new NodeGatewayStartedShard(
145+
nodeGatewayStartedShard.getGatewayShardStarted().allocationId(),
146+
nodeGatewayStartedShard.getGatewayShardStarted().primary(),
147+
nodeGatewayStartedShard.getGatewayShardStarted().replicationCheckpoint(),
148+
nodeGatewayStartedShard.getGatewayShardStarted().storeException(),
149+
node
150+
)
151+
);
152+
});
142153
return nodeShardStates;
143154
}
144155

145156
protected AllocateUnassignedDecision getAllocationDecision(
146157
ShardRouting unassignedShard,
147158
RoutingAllocation allocation,
148-
List<NodeGatewayStartedShards> shardState,
159+
List<NodeGatewayStartedShard> shardState,
149160
Logger logger
150161
) {
151162
final boolean explain = allocation.debugDecision();
@@ -236,7 +247,7 @@ protected AllocateUnassignedDecision getAllocationDecision(
236247
nodesToAllocate = buildNodesToAllocate(allocation, nodeShardsResult.orderedAllocationCandidates, unassignedShard, true);
237248
if (nodesToAllocate.yesNodeShards.isEmpty() == false) {
238249
final DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0);
239-
final NodeGatewayStartedShards nodeShardState = decidedNode.nodeShardState;
250+
final NodeGatewayStartedShard nodeShardState = decidedNode.nodeShardState;
240251
logger.debug(
241252
"[{}][{}]: allocating [{}] to [{}] on forced primary allocation",
242253
unassignedShard.index(),
@@ -296,11 +307,11 @@ protected AllocateUnassignedDecision getAllocationDecision(
296307
*/
297308
private static List<NodeAllocationResult> buildNodeDecisions(
298309
NodesToAllocate nodesToAllocate,
299-
List<NodeGatewayStartedShards> fetchedShardData,
310+
List<NodeGatewayStartedShard> fetchedShardData,
300311
Set<String> inSyncAllocationIds
301312
) {
302313
List<NodeAllocationResult> nodeResults = new ArrayList<>();
303-
Collection<NodeGatewayStartedShards> ineligibleShards = new ArrayList<>();
314+
Collection<NodeGatewayStartedShard> ineligibleShards = new ArrayList<>();
304315
if (nodesToAllocate != null) {
305316
final Set<DiscoveryNode> discoNodes = new HashSet<>();
306317
nodeResults.addAll(
@@ -334,21 +345,21 @@ private static List<NodeAllocationResult> buildNodeDecisions(
334345
return nodeResults;
335346
}
336347

337-
private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShards nodeShardState, Set<String> inSyncAllocationIds) {
348+
private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShard nodeShardState, Set<String> inSyncAllocationIds) {
338349
final Exception storeErr = nodeShardState.storeException();
339350
final boolean inSync = nodeShardState.allocationId() != null && inSyncAllocationIds.contains(nodeShardState.allocationId());
340351
return new ShardStoreInfo(nodeShardState.allocationId(), inSync, storeErr);
341352
}
342353

343-
private static final Comparator<NodeGatewayStartedShards> NO_STORE_EXCEPTION_FIRST_COMPARATOR = Comparator.comparing(
344-
(NodeGatewayStartedShards state) -> state.storeException() == null
354+
private static final Comparator<NodeGatewayStartedShard> NO_STORE_EXCEPTION_FIRST_COMPARATOR = Comparator.comparing(
355+
(NodeGatewayStartedShard state) -> state.storeException() == null
345356
).reversed();
346-
private static final Comparator<NodeGatewayStartedShards> PRIMARY_FIRST_COMPARATOR = Comparator.comparing(
347-
NodeGatewayStartedShards::primary
357+
private static final Comparator<NodeGatewayStartedShard> PRIMARY_FIRST_COMPARATOR = Comparator.comparing(
358+
NodeGatewayStartedShard::primary
348359
).reversed();
349360

350-
private static final Comparator<NodeGatewayStartedShards> HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.comparing(
351-
NodeGatewayStartedShards::replicationCheckpoint,
361+
private static final Comparator<NodeGatewayStartedShard> HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.comparing(
362+
NodeGatewayStartedShard::replicationCheckpoint,
352363
Comparator.nullsLast(Comparator.naturalOrder())
353364
);
354365

@@ -362,12 +373,12 @@ protected static NodeShardsResult buildNodeShardsResult(
362373
boolean matchAnyShard,
363374
Set<String> ignoreNodes,
364375
Set<String> inSyncAllocationIds,
365-
List<NodeGatewayStartedShards> shardState,
376+
List<NodeGatewayStartedShard> shardState,
366377
Logger logger
367378
) {
368-
List<NodeGatewayStartedShards> nodeShardStates = new ArrayList<>();
379+
List<NodeGatewayStartedShard> nodeShardStates = new ArrayList<>();
369380
int numberOfAllocationsFound = 0;
370-
for (NodeGatewayStartedShards nodeShardState : shardState) {
381+
for (NodeGatewayStartedShard nodeShardState : shardState) {
371382
DiscoveryNode node = nodeShardState.getNode();
372383
String allocationId = nodeShardState.allocationId();
373384

@@ -432,21 +443,18 @@ protected static NodeShardsResult buildNodeShardsResult(
432443
return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound);
433444
}
434445

435-
private static Comparator<NodeGatewayStartedShards> createActiveShardComparator(
436-
boolean matchAnyShard,
437-
Set<String> inSyncAllocationIds
438-
) {
446+
private static Comparator<NodeGatewayStartedShard> createActiveShardComparator(boolean matchAnyShard, Set<String> inSyncAllocationIds) {
439447
/**
440448
* Orders the active shards copies based on below comparators
441449
* 1. No store exception i.e. shard copy is readable
442450
* 2. Prefer previous primary shard
443451
* 3. Prefer shard copy with the highest replication checkpoint. It is NO-OP for doc rep enabled indices.
444452
*/
445-
final Comparator<NodeGatewayStartedShards> comparator; // allocation preference
453+
final Comparator<NodeGatewayStartedShard> comparator; // allocation preference
446454
if (matchAnyShard) {
447455
// prefer shards with matching allocation ids
448-
Comparator<NodeGatewayStartedShards> matchingAllocationsFirst = Comparator.comparing(
449-
(NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId())
456+
Comparator<NodeGatewayStartedShard> matchingAllocationsFirst = Comparator.comparing(
457+
(NodeGatewayStartedShard state) -> inSyncAllocationIds.contains(state.allocationId())
450458
).reversed();
451459
comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR)
452460
.thenComparing(PRIMARY_FIRST_COMPARATOR)
@@ -464,14 +472,14 @@ private static Comparator<NodeGatewayStartedShards> createActiveShardComparator(
464472
*/
465473
private static NodesToAllocate buildNodesToAllocate(
466474
RoutingAllocation allocation,
467-
List<NodeGatewayStartedShards> nodeShardStates,
475+
List<NodeGatewayStartedShard> nodeShardStates,
468476
ShardRouting shardRouting,
469477
boolean forceAllocate
470478
) {
471479
List<DecidedNode> yesNodeShards = new ArrayList<>();
472480
List<DecidedNode> throttledNodeShards = new ArrayList<>();
473481
List<DecidedNode> noNodeShards = new ArrayList<>();
474-
for (NodeGatewayStartedShards nodeShardState : nodeShardStates) {
482+
for (NodeGatewayStartedShard nodeShardState : nodeShardStates) {
475483
RoutingNode node = allocation.routingNodes().node(nodeShardState.getNode().getId());
476484
if (node == null) {
477485
continue;
@@ -502,10 +510,10 @@ private static NodesToAllocate buildNodesToAllocate(
502510
* This class encapsulates the result of a call to {@link #buildNodeShardsResult}
503511
*/
504512
static class NodeShardsResult {
505-
final List<NodeGatewayStartedShards> orderedAllocationCandidates;
513+
final List<NodeGatewayStartedShard> orderedAllocationCandidates;
506514
final int allocationsFound;
507515

508-
NodeShardsResult(List<NodeGatewayStartedShards> orderedAllocationCandidates, int allocationsFound) {
516+
NodeShardsResult(List<NodeGatewayStartedShard> orderedAllocationCandidates, int allocationsFound) {
509517
this.orderedAllocationCandidates = orderedAllocationCandidates;
510518
this.allocationsFound = allocationsFound;
511519
}
@@ -531,10 +539,10 @@ protected static class NodesToAllocate {
531539
* by the allocator for allocating to the node that holds the shard copy.
532540
*/
533541
private static class DecidedNode {
534-
final NodeGatewayStartedShards nodeShardState;
542+
final NodeGatewayStartedShard nodeShardState;
535543
final Decision decision;
536544

537-
private DecidedNode(NodeGatewayStartedShards nodeShardState, Decision decision) {
545+
private DecidedNode(NodeGatewayStartedShard nodeShardState, Decision decision) {
538546
this.nodeShardState = nodeShardState;
539547
this.decision = decision;
540548
}

0 commit comments

Comments
 (0)