Skip to content

Commit 5b71400

Browse files
opensearch-trigger-bot[bot]github-actions[bot]Aman Khare
authored
BaseGatewayShardAllocator changes for Assigning the batch of shards (opensearch-project#8776) (opensearch-project#12773) (opensearch-project#12810)
* BaseGatewayShardAllocator changes for Assigning the batch of shards (cherry picked from commit ef50fb4) (cherry picked from commit 41d11e1) Signed-off-by: Gaurav Chandani <chngau@amazon.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: Aman Khare <amkhar@amazon.com>
1 parent e19cf64 commit 5b71400

File tree

1 file changed

+63
-5
lines changed

1 file changed

+63
-5
lines changed

server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java

+63-5
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.logging.log4j.Logger;
3737
import org.opensearch.cluster.routing.RecoverySource;
3838
import org.opensearch.cluster.routing.RoutingNode;
39+
import org.opensearch.cluster.routing.RoutingNodes;
3940
import org.opensearch.cluster.routing.ShardRouting;
4041
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
4142
import org.opensearch.cluster.routing.allocation.AllocationDecision;
@@ -45,7 +46,9 @@
4546
import org.opensearch.cluster.routing.allocation.decider.Decision;
4647

4748
import java.util.ArrayList;
49+
import java.util.HashMap;
4850
import java.util.List;
51+
import java.util.stream.Collectors;
4952

5053
/**
5154
* An abstract class that implements basic functionality for allocating
@@ -64,8 +67,9 @@ public abstract class BaseGatewayShardAllocator {
6467
* Allocate an unassigned shard to nodes (if any) where valid copies of the shard already exist.
6568
* It is up to the individual implementations of {@link #makeAllocationDecision(ShardRouting, RoutingAllocation, Logger)}
6669
* to make decisions on assigning shards to nodes.
67-
* @param shardRouting the shard to allocate
68-
* @param allocation the allocation state container object
70+
*
71+
* @param shardRouting the shard to allocate
72+
* @param allocation the allocation state container object
6973
* @param unassignedAllocationHandler handles the allocation of the current shard
7074
*/
7175
public void allocateUnassigned(
@@ -74,7 +78,46 @@ public void allocateUnassigned(
7478
ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler
7579
) {
7680
final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision(shardRouting, allocation, logger);
81+
executeDecision(shardRouting, allocateUnassignedDecision, allocation, unassignedAllocationHandler);
82+
}
83+
84+
/**
85+
* Allocate Batch of unassigned shard to nodes where valid copies of the shard already exists
86+
* @param shardRoutings the shards to allocate
87+
* @param allocation the allocation state container object
88+
*/
89+
public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
90+
// make Allocation Decisions for all shards
91+
HashMap<ShardRouting, AllocateUnassignedDecision> decisionMap = makeAllocationDecision(shardRoutings, allocation, logger);
92+
assert shardRoutings.size() == decisionMap.size() : "make allocation decision didn't return allocation decision for "
93+
+ "some shards";
94+
// get all unassigned shards iterator
95+
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
96+
97+
while (iterator.hasNext()) {
98+
ShardRouting shard = iterator.next();
99+
try {
100+
if (decisionMap.isEmpty() == false) {
101+
if (decisionMap.containsKey(shard)) {
102+
executeDecision(shard, decisionMap.remove(shard), allocation, iterator);
103+
}
104+
} else {
105+
// no need to keep iterating the unassigned shards, if we don't have anything in decision map
106+
break;
107+
}
108+
} catch (Exception e) {
109+
logger.error("Failed to execute decision for shard {} while initializing {}", shard, e);
110+
throw e;
111+
}
112+
}
113+
}
77114

115+
private void executeDecision(
116+
ShardRouting shardRouting,
117+
AllocateUnassignedDecision allocateUnassignedDecision,
118+
RoutingAllocation allocation,
119+
ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler
120+
) {
78121
if (allocateUnassignedDecision.isDecisionTaken() == false) {
79122
// no decision was taken by this allocator
80123
return;
@@ -109,9 +152,9 @@ protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation
109152
* {@link #allocateUnassigned(ShardRouting, RoutingAllocation, ExistingShardsAllocator.UnassignedAllocationHandler)} to make decisions
110153
* about whether or not the shard can be allocated by this allocator and if so, to which node it will be allocated.
111154
*
112-
* @param unassignedShard the unassigned shard to allocate
113-
* @param allocation the current routing state
114-
* @param logger the logger
155+
* @param unassignedShard the unassigned shard to allocate
156+
* @param allocation the current routing state
157+
* @param logger the logger
115158
* @return an {@link AllocateUnassignedDecision} with the final decision of whether to allocate and details of the decision
116159
*/
117160
public abstract AllocateUnassignedDecision makeAllocationDecision(
@@ -120,6 +163,21 @@ public abstract AllocateUnassignedDecision makeAllocationDecision(
120163
Logger logger
121164
);
122165

166+
public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
167+
List<ShardRouting> unassignedShardBatch,
168+
RoutingAllocation allocation,
169+
Logger logger
170+
) {
171+
172+
return (HashMap<ShardRouting, AllocateUnassignedDecision>) unassignedShardBatch.stream()
173+
.collect(
174+
Collectors.toMap(
175+
unassignedShard -> unassignedShard,
176+
unassignedShard -> makeAllocationDecision(unassignedShard, allocation, logger)
177+
)
178+
);
179+
}
180+
123181
/**
124182
* Builds decisions for all nodes in the cluster, so that the explain API can provide information on
125183
* allocation decisions for each node, while still waiting to allocate the shard (e.g. due to fetching shard data).

0 commit comments

Comments
 (0)