Skip to content

Commit baa07b7

Browse files
author
Swetha Guptha
committed
Use set of shard routing for shard in unassigned shard batch check.
Signed-off-by: Swetha Guptha <gupthasg@amazon.com>
1 parent 605543b commit baa07b7

File tree

3 files changed

+49
-6
lines changed

3 files changed

+49
-6
lines changed

server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
import java.util.ArrayList;
2424
import java.util.Collections;
2525
import java.util.HashMap;
26+
import java.util.HashSet;
2627
import java.util.List;
2728
import java.util.Map;
29+
import java.util.Set;
2830

2931
/**
3032
* PrimaryShardBatchAllocator is similar to {@link org.opensearch.gateway.PrimaryShardAllocator} only difference is
@@ -82,6 +84,7 @@ public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassigned
8284
* @param allocation the allocation state container object
8385
*/
8486
public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
87+
logger.trace("Starting shard allocation execution for unassigned primary shards: {}", shardRoutings.size());
8588
HashMap<ShardId, AllocateUnassignedDecision> ineligibleShardAllocationDecisions = new HashMap<>();
8689
List<ShardRouting> eligibleShards = new ArrayList<>();
8790
List<ShardRouting> inEligibleShards = new ArrayList<>();
@@ -99,13 +102,13 @@ public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAll
99102
// only fetch data for eligible shards
100103
final FetchResult<NodeGatewayStartedShardsBatch> shardsState = fetchData(eligibleShards, inEligibleShards, allocation);
101104

105+
Set<ShardRouting> batchShardRoutingSet = new HashSet<>(shardRoutings);
102106
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
103107
while (iterator.hasNext()) {
104108
ShardRouting unassignedShard = iterator.next();
105109
AllocateUnassignedDecision allocationDecision;
106110

107-
if (shardRoutings.contains(unassignedShard)) {
108-
assert unassignedShard.primary();
111+
if (unassignedShard.primary() && batchShardRoutingSet.contains(unassignedShard)) {
109112
if (ineligibleShardAllocationDecisions.containsKey(unassignedShard.shardId())) {
110113
allocationDecision = ineligibleShardAllocationDecisions.get(unassignedShard.shardId());
111114
} else {
@@ -115,6 +118,7 @@ public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAll
115118
executeDecision(unassignedShard, allocationDecision, allocation, iterator);
116119
}
117120
}
121+
logger.trace("Finished shard allocation execution for unassigned primary shards: {}", shardRoutings.size());
118122
}
119123

120124
/**

server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,11 @@
2828
import java.util.ArrayList;
2929
import java.util.Collections;
3030
import java.util.HashMap;
31+
import java.util.HashSet;
3132
import java.util.List;
3233
import java.util.Map;
34+
import java.util.Set;
3335
import java.util.function.Supplier;
34-
import java.util.stream.Collectors;
3536

3637
/**
3738
* Allocates replica shards in a batch mode
@@ -117,6 +118,7 @@ public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassigned
117118
* @param allocation the allocation state container object
118119
*/
119120
public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
121+
logger.trace("Starting shard allocation execution for unassigned replica shards: {}", shardRoutings.size());
120122
List<ShardRouting> eligibleShards = new ArrayList<>();
121123
List<ShardRouting> ineligibleShards = new ArrayList<>();
122124
Map<ShardRouting, AllocateUnassignedDecision> ineligibleShardAllocationDecisions = new HashMap<>();
@@ -135,7 +137,11 @@ public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAll
135137
// only fetch data for eligible shards
136138
final FetchResult<NodeStoreFilesMetadataBatch> shardsState = fetchData(eligibleShards, ineligibleShards, allocation);
137139

138-
List<ShardId> shardIdsFromBatch = shardRoutings.stream().map(shardRouting -> shardRouting.shardId()).collect(Collectors.toList());
140+
Set<ShardId> shardIdsFromBatch = new HashSet<>();
141+
for (ShardRouting shardRouting : shardRoutings) {
142+
ShardId shardId = shardRouting.shardId();
143+
shardIdsFromBatch.add(shardId);
144+
}
139145
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
140146
while (iterator.hasNext()) {
141147
ShardRouting unassignedShard = iterator.next();
@@ -159,6 +165,7 @@ public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAll
159165
executeDecision(unassignedShard, allocateUnassignedDecision, allocation, iterator);
160166
}
161167
}
168+
logger.trace("Finished shard allocation execution for unassigned replica shards: {}", shardRoutings.size());
162169
}
163170

164171
private AllocateUnassignedDecision getUnassignedShardAllocationDecision(

server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java

+34-2
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,10 @@ private void allocateAllUnassignedBatch(final RoutingAllocation allocation) {
8585
final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
8686
List<ShardRouting> shardsToBatch = new ArrayList<>();
8787
while (iterator.hasNext()) {
88-
shardsToBatch.add(iterator.next());
88+
ShardRouting unassignedShardRouting = iterator.next();
89+
if (unassignedShardRouting.primary()) {
90+
shardsToBatch.add(unassignedShardRouting);
91+
}
8992
}
9093
batchAllocator.allocateUnassignedBatch(shardsToBatch, allocation);
9194
}
@@ -180,6 +183,35 @@ public void testInitializePrimaryShards() {
180183
assertEquals(2, routingAllocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId()));
181184
}
182185

186+
public void testInitializeOnlyPrimaryUnassignedShardsIgnoreReplicaShards() {
187+
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
188+
AllocationDeciders allocationDeciders = randomAllocationDeciders(Settings.builder().build(), clusterSettings, random());
189+
setUpShards(1);
190+
final RoutingAllocation routingAllocation = routingAllocationWithOnePrimary(allocationDeciders, CLUSTER_RECOVERED, "allocId-0");
191+
192+
for (ShardId shardId : shardsInBatch) {
193+
batchAllocator.addShardData(
194+
node1,
195+
"allocId-0",
196+
shardId,
197+
true,
198+
new ReplicationCheckpoint(shardId, 20, 101, 1, Codec.getDefault().getName()),
199+
null
200+
);
201+
}
202+
203+
allocateAllUnassignedBatch(routingAllocation);
204+
205+
List<ShardRouting> initializingShards = routingAllocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING);
206+
assertEquals(1, initializingShards.size());
207+
assertTrue(shardsInBatch.contains(initializingShards.get(0).shardId()));
208+
assertTrue(initializingShards.get(0).primary());
209+
assertEquals(1, routingAllocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId()));
210+
List<ShardRouting> unassignedShards = routingAllocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED);
211+
assertEquals(1, unassignedShards.size());
212+
assertTrue(!unassignedShards.get(0).primary());
213+
}
214+
183215
public void testAllocateUnassignedBatchThrottlingAllocationDeciderIsHonoured() {
184216
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
185217
AllocationDeciders allocationDeciders = randomAllocationDeciders(
@@ -258,7 +290,7 @@ private RoutingAllocation routingAllocationWithOnePrimary(
258290
.routingTable(routingTableBuilder.build())
259291
.nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3))
260292
.build();
261-
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, null, null, System.nanoTime());
293+
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, ClusterInfo.EMPTY, null, System.nanoTime());
262294
}
263295

264296
private RoutingAllocation routingAllocationWithMultiplePrimaries(

0 commit comments

Comments
 (0)