Skip to content

Commit e2ca2f1

Browse files
Add PrimaryShardBatchAllocator to take allocation decisions for a batch of shards (opensearch-project#8916) (opensearch-project#12813) (opensearch-project#12823)
* Add PrimaryShardBatchAllocator to take allocation decisions for a batch of shards

(cherry picked from commit a499d1e)
(cherry picked from commit b2d22d4)

Signed-off-by: Aman Khare <amkhar@amazon.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 5b71400 commit e2ca2f1

File tree

10 files changed

+754
-268
lines changed

10 files changed

+754
-268
lines changed

server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java

+16 -20
Original file line number | Diff line number | Diff line change
@@ -56,6 +56,7 @@
5656
import org.opensearch.core.index.Index;
5757
import org.opensearch.core.index.shard.ShardId;
5858
import org.opensearch.env.NodeEnvironment;
59+
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard;
5960
import org.opensearch.index.IndexService;
6061
import org.opensearch.index.IndexSettings;
6162
import org.opensearch.index.MergePolicyProvider;
@@ -720,11 +721,11 @@ public Settings onNodeStopped(String nodeName) throws Exception {
720721
);
721722

722723
assertThat(response.getNodes(), hasSize(1));
723-
assertThat(response.getNodes().get(0).allocationId(), notNullValue());
724+
assertThat(response.getNodes().get(0).getGatewayShardStarted().allocationId(), notNullValue());
724725
if (corrupt) {
725-
assertThat(response.getNodes().get(0).storeException(), notNullValue());
726+
assertThat(response.getNodes().get(0).getGatewayShardStarted().storeException(), notNullValue());
726727
} else {
727-
assertThat(response.getNodes().get(0).storeException(), nullValue());
728+
assertThat(response.getNodes().get(0).getGatewayShardStarted().storeException(), nullValue());
728729
}
729730

730731
// start another node so cluster consistency checks won't time out due to the lack of state
@@ -764,11 +765,11 @@ public void testSingleShardFetchUsingBatchAction() {
764765
);
765766
final Index index = resolveIndex(indexName);
766767
final ShardId shardId = new ShardId(index, 0);
767-
TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard nodeGatewayStartedShards = response.getNodesMap()
768+
GatewayStartedShard gatewayStartedShard = response.getNodesMap()
768769
.get(searchShardsResponse.getNodes()[0].getId())
769770
.getNodeGatewayStartedShardsBatch()
770771
.get(shardId);
771-
assertNodeGatewayStartedShardsHappyCase(nodeGatewayStartedShards);
772+
assertNodeGatewayStartedShardsHappyCase(gatewayStartedShard);
772773
}
773774

774775
public void testShardFetchMultiNodeMultiIndexesUsingBatchAction() {
@@ -792,11 +793,8 @@ public void testShardFetchMultiNodeMultiIndexesUsingBatchAction() {
792793
ShardId shardId = clusterSearchShardsGroup.getShardId();
793794
assertEquals(1, clusterSearchShardsGroup.getShards().length);
794795
String nodeId = clusterSearchShardsGroup.getShards()[0].currentNodeId();
795-
TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard nodeGatewayStartedShards = response.getNodesMap()
796-
.get(nodeId)
797-
.getNodeGatewayStartedShardsBatch()
798-
.get(shardId);
799-
assertNodeGatewayStartedShardsHappyCase(nodeGatewayStartedShards);
796+
GatewayStartedShard gatewayStartedShard = response.getNodesMap().get(nodeId).getNodeGatewayStartedShardsBatch().get(shardId);
797+
assertNodeGatewayStartedShardsHappyCase(gatewayStartedShard);
800798
}
801799
}
802800

@@ -816,13 +814,13 @@ public void testShardFetchCorruptedShardsUsingBatchAction() throws Exception {
816814
new TransportNodesListGatewayStartedShardsBatch.Request(getDiscoveryNodes(), shardIdShardAttributesMap)
817815
);
818816
DiscoveryNode[] discoveryNodes = getDiscoveryNodes();
819-
TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard nodeGatewayStartedShards = response.getNodesMap()
817+
GatewayStartedShard gatewayStartedShard = response.getNodesMap()
820818
.get(discoveryNodes[0].getId())
821819
.getNodeGatewayStartedShardsBatch()
822820
.get(shardId);
823-
assertNotNull(nodeGatewayStartedShards.storeException());
824-
assertNotNull(nodeGatewayStartedShards.allocationId());
825-
assertTrue(nodeGatewayStartedShards.primary());
821+
assertNotNull(gatewayStartedShard.storeException());
822+
assertNotNull(gatewayStartedShard.allocationId());
823+
assertTrue(gatewayStartedShard.primary());
826824
}
827825

828826
public void testSingleShardStoreFetchUsingBatchAction() throws ExecutionException, InterruptedException {
@@ -950,12 +948,10 @@ private void assertNodeStoreFilesMetadataSuccessCase(
950948
assertNotNull(storeFileMetadata.peerRecoveryRetentionLeases());
951949
}
952950

953-
private void assertNodeGatewayStartedShardsHappyCase(
954-
TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard nodeGatewayStartedShards
955-
) {
956-
assertNull(nodeGatewayStartedShards.storeException());
957-
assertNotNull(nodeGatewayStartedShards.allocationId());
958-
assertTrue(nodeGatewayStartedShards.primary());
951+
private void assertNodeGatewayStartedShardsHappyCase(GatewayStartedShard gatewayStartedShard) {
952+
assertNull(gatewayStartedShard.storeException());
953+
assertNotNull(gatewayStartedShard.allocationId());
954+
assertTrue(gatewayStartedShard.primary());
959955
}
960956

961957
private void prepareIndex(String indexName, int numberOfPrimaryShards) {

server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java

+4 -3
Original file line number | Diff line number | Diff line change
@@ -258,9 +258,9 @@ void finish() {
258258
storeStatuses.add(
259259
new IndicesShardStoresResponse.StoreStatus(
260260
response.getNode(),
261-
response.allocationId(),
261+
response.getGatewayShardStarted().allocationId(),
262262
allocationStatus,
263-
response.storeException()
263+
response.getGatewayShardStarted().storeException()
264264
)
265265
);
266266
}
@@ -308,7 +308,8 @@ private IndicesShardStoresResponse.StoreStatus.AllocationStatus getAllocationSta
308308
* A shard exists/existed in a node only if shard state file exists in the node
309309
*/
310310
private boolean shardExistsInNode(final NodeGatewayStartedShards response) {
311-
return response.storeException() != null || response.allocationId() != null;
311+
return response.getGatewayShardStarted().storeException() != null
312+
|| response.getGatewayShardStarted().allocationId() != null;
312313
}
313314

314315
@Override

server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java

+40 -32
Original file line number | Diff line number | Diff line change
@@ -50,6 +50,7 @@
5050
import org.opensearch.cluster.routing.allocation.decider.Decision.Type;
5151
import org.opensearch.env.ShardLockObtainFailedException;
5252
import org.opensearch.gateway.AsyncShardFetch.FetchResult;
53+
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShard;
5354
import org.opensearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards;
5455

5556
import java.util.ArrayList;
@@ -125,27 +126,37 @@ public AllocateUnassignedDecision makeAllocationDecision(
125126
return decision;
126127
}
127128
final FetchResult<NodeGatewayStartedShards> shardState = fetchData(unassignedShard, allocation);
128-
List<NodeGatewayStartedShards> nodeShardStates = adaptToNodeStartedShardList(shardState);
129+
List<NodeGatewayStartedShard> nodeShardStates = adaptToNodeStartedShardList(shardState);
129130
return getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger);
130131
}
131132

132133
/**
133-
* Transforms {@link FetchResult} of {@link NodeGatewayStartedShards} to {@link List} of {@link NodeGatewayStartedShards}
134+
* Transforms {@link FetchResult} of {@link NodeGatewayStartedShards} to {@link List} of {@link NodeGatewayStartedShard}
134135
* Returns null if {@link FetchResult} does not have any data.
135136
*/
136-
private static List<NodeGatewayStartedShards> adaptToNodeStartedShardList(FetchResult<NodeGatewayStartedShards> shardsState) {
137+
private static List<NodeGatewayStartedShard> adaptToNodeStartedShardList(FetchResult<NodeGatewayStartedShards> shardsState) {
137138
if (!shardsState.hasData()) {
138139
return null;
139140
}
140-
List<NodeGatewayStartedShards> nodeShardStates = new ArrayList<>();
141-
shardsState.getData().forEach((node, nodeGatewayStartedShard) -> { nodeShardStates.add(nodeGatewayStartedShard); });
141+
List<NodeGatewayStartedShard> nodeShardStates = new ArrayList<>();
142+
shardsState.getData().forEach((node, nodeGatewayStartedShard) -> {
143+
nodeShardStates.add(
144+
new NodeGatewayStartedShard(
145+
nodeGatewayStartedShard.getGatewayShardStarted().allocationId(),
146+
nodeGatewayStartedShard.getGatewayShardStarted().primary(),
147+
nodeGatewayStartedShard.getGatewayShardStarted().replicationCheckpoint(),
148+
nodeGatewayStartedShard.getGatewayShardStarted().storeException(),
149+
node
150+
)
151+
);
152+
});
142153
return nodeShardStates;
143154
}
144155

145156
protected AllocateUnassignedDecision getAllocationDecision(
146157
ShardRouting unassignedShard,
147158
RoutingAllocation allocation,
148-
List<NodeGatewayStartedShards> shardState,
159+
List<NodeGatewayStartedShard> shardState,
149160
Logger logger
150161
) {
151162
final boolean explain = allocation.debugDecision();
@@ -236,7 +247,7 @@ protected AllocateUnassignedDecision getAllocationDecision(
236247
nodesToAllocate = buildNodesToAllocate(allocation, nodeShardsResult.orderedAllocationCandidates, unassignedShard, true);
237248
if (nodesToAllocate.yesNodeShards.isEmpty() == false) {
238249
final DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0);
239-
final NodeGatewayStartedShards nodeShardState = decidedNode.nodeShardState;
250+
final NodeGatewayStartedShard nodeShardState = decidedNode.nodeShardState;
240251
logger.debug(
241252
"[{}][{}]: allocating [{}] to [{}] on forced primary allocation",
242253
unassignedShard.index(),
@@ -296,11 +307,11 @@ protected AllocateUnassignedDecision getAllocationDecision(
296307
*/
297308
private static List<NodeAllocationResult> buildNodeDecisions(
298309
NodesToAllocate nodesToAllocate,
299-
List<NodeGatewayStartedShards> fetchedShardData,
310+
List<NodeGatewayStartedShard> fetchedShardData,
300311
Set<String> inSyncAllocationIds
301312
) {
302313
List<NodeAllocationResult> nodeResults = new ArrayList<>();
303-
Collection<NodeGatewayStartedShards> ineligibleShards = new ArrayList<>();
314+
Collection<NodeGatewayStartedShard> ineligibleShards = new ArrayList<>();
304315
if (nodesToAllocate != null) {
305316
final Set<DiscoveryNode> discoNodes = new HashSet<>();
306317
nodeResults.addAll(
@@ -334,21 +345,21 @@ private static List<NodeAllocationResult> buildNodeDecisions(
334345
return nodeResults;
335346
}
336347

337-
private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShards nodeShardState, Set<String> inSyncAllocationIds) {
348+
private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShard nodeShardState, Set<String> inSyncAllocationIds) {
338349
final Exception storeErr = nodeShardState.storeException();
339350
final boolean inSync = nodeShardState.allocationId() != null && inSyncAllocationIds.contains(nodeShardState.allocationId());
340351
return new ShardStoreInfo(nodeShardState.allocationId(), inSync, storeErr);
341352
}
342353

343-
private static final Comparator<NodeGatewayStartedShards> NO_STORE_EXCEPTION_FIRST_COMPARATOR = Comparator.comparing(
344-
(NodeGatewayStartedShards state) -> state.storeException() == null
354+
private static final Comparator<NodeGatewayStartedShard> NO_STORE_EXCEPTION_FIRST_COMPARATOR = Comparator.comparing(
355+
(NodeGatewayStartedShard state) -> state.storeException() == null
345356
).reversed();
346-
private static final Comparator<NodeGatewayStartedShards> PRIMARY_FIRST_COMPARATOR = Comparator.comparing(
347-
NodeGatewayStartedShards::primary
357+
private static final Comparator<NodeGatewayStartedShard> PRIMARY_FIRST_COMPARATOR = Comparator.comparing(
358+
NodeGatewayStartedShard::primary
348359
).reversed();
349360

350-
private static final Comparator<NodeGatewayStartedShards> HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.comparing(
351-
NodeGatewayStartedShards::replicationCheckpoint,
361+
private static final Comparator<NodeGatewayStartedShard> HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.comparing(
362+
NodeGatewayStartedShard::replicationCheckpoint,
352363
Comparator.nullsLast(Comparator.naturalOrder())
353364
);
354365

@@ -362,12 +373,12 @@ protected static NodeShardsResult buildNodeShardsResult(
362373
boolean matchAnyShard,
363374
Set<String> ignoreNodes,
364375
Set<String> inSyncAllocationIds,
365-
List<NodeGatewayStartedShards> shardState,
376+
List<NodeGatewayStartedShard> shardState,
366377
Logger logger
367378
) {
368-
List<NodeGatewayStartedShards> nodeShardStates = new ArrayList<>();
379+
List<NodeGatewayStartedShard> nodeShardStates = new ArrayList<>();
369380
int numberOfAllocationsFound = 0;
370-
for (NodeGatewayStartedShards nodeShardState : shardState) {
381+
for (NodeGatewayStartedShard nodeShardState : shardState) {
371382
DiscoveryNode node = nodeShardState.getNode();
372383
String allocationId = nodeShardState.allocationId();
373384

@@ -432,21 +443,18 @@ protected static NodeShardsResult buildNodeShardsResult(
432443
return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound);
433444
}
434445

435-
private static Comparator<NodeGatewayStartedShards> createActiveShardComparator(
436-
boolean matchAnyShard,
437-
Set<String> inSyncAllocationIds
438-
) {
446+
private static Comparator<NodeGatewayStartedShard> createActiveShardComparator(boolean matchAnyShard, Set<String> inSyncAllocationIds) {
439447
/**
440448
* Orders the active shards copies based on below comparators
441449
* 1. No store exception i.e. shard copy is readable
442450
* 2. Prefer previous primary shard
443451
* 3. Prefer shard copy with the highest replication checkpoint. It is NO-OP for doc rep enabled indices.
444452
*/
445-
final Comparator<NodeGatewayStartedShards> comparator; // allocation preference
453+
final Comparator<NodeGatewayStartedShard> comparator; // allocation preference
446454
if (matchAnyShard) {
447455
// prefer shards with matching allocation ids
448-
Comparator<NodeGatewayStartedShards> matchingAllocationsFirst = Comparator.comparing(
449-
(NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId())
456+
Comparator<NodeGatewayStartedShard> matchingAllocationsFirst = Comparator.comparing(
457+
(NodeGatewayStartedShard state) -> inSyncAllocationIds.contains(state.allocationId())
450458
).reversed();
451459
comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR)
452460
.thenComparing(PRIMARY_FIRST_COMPARATOR)
@@ -464,14 +472,14 @@ private static Comparator<NodeGatewayStartedShards> createActiveShardComparator(
464472
*/
465473
private static NodesToAllocate buildNodesToAllocate(
466474
RoutingAllocation allocation,
467-
List<NodeGatewayStartedShards> nodeShardStates,
475+
List<NodeGatewayStartedShard> nodeShardStates,
468476
ShardRouting shardRouting,
469477
boolean forceAllocate
470478
) {
471479
List<DecidedNode> yesNodeShards = new ArrayList<>();
472480
List<DecidedNode> throttledNodeShards = new ArrayList<>();
473481
List<DecidedNode> noNodeShards = new ArrayList<>();
474-
for (NodeGatewayStartedShards nodeShardState : nodeShardStates) {
482+
for (NodeGatewayStartedShard nodeShardState : nodeShardStates) {
475483
RoutingNode node = allocation.routingNodes().node(nodeShardState.getNode().getId());
476484
if (node == null) {
477485
continue;
@@ -502,10 +510,10 @@ private static NodesToAllocate buildNodesToAllocate(
502510
* This class encapsulates the result of a call to {@link #buildNodeShardsResult}
503511
*/
504512
static class NodeShardsResult {
505-
final List<NodeGatewayStartedShards> orderedAllocationCandidates;
513+
final List<NodeGatewayStartedShard> orderedAllocationCandidates;
506514
final int allocationsFound;
507515

508-
NodeShardsResult(List<NodeGatewayStartedShards> orderedAllocationCandidates, int allocationsFound) {
516+
NodeShardsResult(List<NodeGatewayStartedShard> orderedAllocationCandidates, int allocationsFound) {
509517
this.orderedAllocationCandidates = orderedAllocationCandidates;
510518
this.allocationsFound = allocationsFound;
511519
}
@@ -531,10 +539,10 @@ protected static class NodesToAllocate {
531539
* by the allocator for allocating to the node that holds the shard copy.
532540
*/
533541
private static class DecidedNode {
534-
final NodeGatewayStartedShards nodeShardState;
542+
final NodeGatewayStartedShard nodeShardState;
535543
final Decision decision;
536544

537-
private DecidedNode(NodeGatewayStartedShards nodeShardState, Decision decision) {
545+
private DecidedNode(NodeGatewayStartedShard nodeShardState, Decision decision) {
538546
this.nodeShardState = nodeShardState;
539547
this.decision = decision;
540548
}

0 commit comments

Comments (0)