Skip to content

Commit c1d1aa9

Browse files
author
Swetha Guptha
committed
Use set of shard routing for shard in unassigned shard batch check.
Signed-off-by: Swetha Guptha <gupthasg@amazon.com>
1 parent 9729a92 commit c1d1aa9

File tree

2 files changed

+11
-2
lines changed

2 files changed

+11
-2
lines changed

server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
import java.util.ArrayList;
2424
import java.util.Collections;
2525
import java.util.HashMap;
26+
import java.util.HashSet;
2627
import java.util.List;
2728
import java.util.Map;
29+
import java.util.Set;
2830

2931
/**
3032
* PrimaryShardBatchAllocator is similar to {@link org.opensearch.gateway.PrimaryShardAllocator} only difference is
@@ -82,6 +84,7 @@ public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassigned
8284
* @param allocation the allocation state container object
8385
*/
8486
public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
87+
logger.trace("Starting shard allocation execution for unassigned primary shards: {}", shardRoutings.size());
8588
HashMap<ShardId, AllocateUnassignedDecision> ineligibleShardAllocationDecisions = new HashMap<>();
8689
List<ShardRouting> eligibleShards = new ArrayList<>();
8790
List<ShardRouting> inEligibleShards = new ArrayList<>();
@@ -99,12 +102,13 @@ public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAll
99102
// only fetch data for eligible shards
100103
final FetchResult<NodeGatewayStartedShardsBatch> shardsState = fetchData(eligibleShards, inEligibleShards, allocation);
101104

105+
Set<ShardRouting> batchShardRoutingSet = new HashSet<>(shardRoutings);
102106
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
103107
while (iterator.hasNext()) {
104108
ShardRouting unassignedShard = iterator.next();
105109
AllocateUnassignedDecision allocationDecision;
106110

107-
if (shardRoutings.contains(unassignedShard)) {
111+
if (unassignedShard.primary() && batchShardRoutingSet.contains(unassignedShard)) {
108112
assert unassignedShard.primary();
109113
if (ineligibleShardAllocationDecisions.containsKey(unassignedShard.shardId())) {
110114
allocationDecision = ineligibleShardAllocationDecisions.get(unassignedShard.shardId());
@@ -115,6 +119,7 @@ public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAll
115119
executeDecision(unassignedShard, allocationDecision, allocation, iterator);
116120
}
117121
}
122+
logger.trace("Finished shard allocation execution for unassigned primary shards: {}", shardRoutings.size());
118123
}
119124

120125
/**

server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@
2828
import java.util.ArrayList;
2929
import java.util.Collections;
3030
import java.util.HashMap;
31+
import java.util.HashSet;
3132
import java.util.List;
3233
import java.util.Map;
34+
import java.util.Set;
3335
import java.util.function.Supplier;
3436
import java.util.stream.Collectors;
3537

@@ -117,6 +119,7 @@ public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassigned
117119
* @param allocation the allocation state container object
118120
*/
119121
public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
122+
logger.trace("Starting shard allocation execution for unassigned replica shards: {}", shardRoutings.size());
120123
List<ShardRouting> eligibleShards = new ArrayList<>();
121124
List<ShardRouting> ineligibleShards = new ArrayList<>();
122125
Map<ShardRouting, AllocateUnassignedDecision> ineligibleShardAllocationDecisions = new HashMap<>();
@@ -135,7 +138,7 @@ public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAll
135138
// only fetch data for eligible shards
136139
final FetchResult<NodeStoreFilesMetadataBatch> shardsState = fetchData(eligibleShards, ineligibleShards, allocation);
137140

138-
List<ShardId> shardIdsFromBatch = shardRoutings.stream().map(shardRouting -> shardRouting.shardId()).collect(Collectors.toList());
141+
Set<ShardId> shardIdsFromBatch = shardRoutings.stream().map(ShardRouting::shardId).collect(Collectors.toCollection(HashSet::new));
139142
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
140143
while (iterator.hasNext()) {
141144
ShardRouting unassignedShard = iterator.next();
@@ -159,6 +162,7 @@ public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAll
159162
executeDecision(unassignedShard, allocateUnassignedDecision, allocation, iterator);
160163
}
161164
}
165+
logger.trace("Finished shard allocation execution for unassigned replica shards: {}", shardRoutings.size());
162166
}
163167

164168
private AllocateUnassignedDecision getUnassignedShardAllocationDecision(

0 commit comments

Comments
 (0)