Skip to content

Commit 18c5bb6

Browse files
authored
Fix unassigned batch allocation (opensearch-project#13748) (opensearch-project#13748)
Signed-off-by: Swetha Guptha <gupthasg@amazon.com>
1 parent afeddc2 commit 18c5bb6

File tree

6 files changed

+505
-181
lines changed

6 files changed

+505
-181
lines changed

server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java

+98
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
import org.apache.lucene.index.CorruptIndexException;
3636
import org.opensearch.Version;
37+
import org.opensearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse;
3738
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction;
3839
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest;
3940
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
@@ -55,7 +56,9 @@
5556
import org.opensearch.cluster.metadata.IndexMetadata;
5657
import org.opensearch.cluster.node.DiscoveryNode;
5758
import org.opensearch.cluster.routing.ShardRouting;
59+
import org.opensearch.cluster.routing.ShardRoutingState;
5860
import org.opensearch.cluster.routing.UnassignedInfo;
61+
import org.opensearch.cluster.routing.allocation.AllocationDecision;
5962
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
6063
import org.opensearch.cluster.service.ClusterService;
6164
import org.opensearch.common.settings.Settings;
@@ -98,15 +101,18 @@
98101
import java.util.Map;
99102
import java.util.Set;
100103
import java.util.concurrent.ExecutionException;
104+
import java.util.stream.Collectors;
101105
import java.util.stream.IntStream;
102106

103107
import static java.util.Collections.emptyMap;
104108
import static java.util.Collections.emptySet;
105109
import static org.opensearch.cluster.coordination.ClusterBootstrapService.INITIAL_CLUSTER_MANAGER_NODES_SETTING;
106110
import static org.opensearch.cluster.health.ClusterHealthStatus.GREEN;
107111
import static org.opensearch.cluster.health.ClusterHealthStatus.RED;
112+
import static org.opensearch.cluster.health.ClusterHealthStatus.YELLOW;
108113
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
109114
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
115+
import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
110116
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
111117
import static org.opensearch.gateway.GatewayRecoveryTestUtils.corruptShard;
112118
import static org.opensearch.gateway.GatewayRecoveryTestUtils.getDiscoveryNodes;
@@ -753,6 +759,7 @@ public void testMessyElectionsStillMakeClusterGoGreen() throws Exception {
753759
Settings.builder()
754760
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
755761
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
762+
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
756763
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
757764
.build()
758765
);
@@ -843,6 +850,87 @@ public void testBatchModeDisabled() throws Exception {
843850
ensureGreen("test");
844851
}
845852

853+
public void testMultipleReplicaShardAssignmentWithDelayedAllocationAndDifferentNodeStartTimeInBatchMode() throws Exception {
854+
internalCluster().startClusterManagerOnlyNodes(
855+
1,
856+
Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true).build()
857+
);
858+
internalCluster().startDataOnlyNodes(6);
859+
createIndex(
860+
"test",
861+
Settings.builder()
862+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
863+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 3)
864+
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "60m")
865+
.build()
866+
);
867+
ensureGreen("test");
868+
869+
List<String> nodesWithReplicaShards = findNodesWithShard(false);
870+
Settings replicaNode0DataPathSettings = internalCluster().dataPathSettings(nodesWithReplicaShards.get(0));
871+
Settings replicaNode1DataPathSettings = internalCluster().dataPathSettings(nodesWithReplicaShards.get(1));
872+
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodesWithReplicaShards.get(0)));
873+
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodesWithReplicaShards.get(1)));
874+
875+
ensureStableCluster(5);
876+
877+
logger.info("--> explicitly triggering reroute");
878+
ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
879+
assertTrue(clusterRerouteResponse.isAcknowledged());
880+
881+
ClusterHealthResponse health = client().admin().cluster().health(Requests.clusterHealthRequest().timeout("5m")).actionGet();
882+
assertFalse(health.isTimedOut());
883+
assertEquals(YELLOW, health.getStatus());
884+
assertEquals(2, health.getUnassignedShards());
885+
// shard should be unassigned because of Allocation_Delayed
886+
ClusterAllocationExplainResponse allocationExplainResponse = client().admin()
887+
.cluster()
888+
.prepareAllocationExplain()
889+
.setIndex("test")
890+
.setShard(0)
891+
.setPrimary(false)
892+
.get();
893+
assertEquals(
894+
AllocationDecision.ALLOCATION_DELAYED,
895+
allocationExplainResponse.getExplanation().getShardAllocationDecision().getAllocateDecision().getAllocationDecision()
896+
);
897+
898+
logger.info("--> restarting the node 1");
899+
internalCluster().startDataOnlyNode(
900+
Settings.builder().put("node.name", nodesWithReplicaShards.get(0)).put(replicaNode0DataPathSettings).build()
901+
);
902+
clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
903+
assertTrue(clusterRerouteResponse.isAcknowledged());
904+
ensureStableCluster(6);
905+
waitUntil(
906+
() -> client().admin().cluster().health(Requests.clusterHealthRequest().timeout("5m")).actionGet().getInitializingShards() == 0
907+
);
908+
909+
health = client().admin().cluster().health(Requests.clusterHealthRequest().timeout("5m")).actionGet();
910+
assertFalse(health.isTimedOut());
911+
assertEquals(YELLOW, health.getStatus());
912+
assertEquals(1, health.getUnassignedShards());
913+
assertEquals(1, health.getDelayedUnassignedShards());
914+
allocationExplainResponse = client().admin()
915+
.cluster()
916+
.prepareAllocationExplain()
917+
.setIndex("test")
918+
.setShard(0)
919+
.setPrimary(false)
920+
.get();
921+
assertEquals(
922+
AllocationDecision.ALLOCATION_DELAYED,
923+
allocationExplainResponse.getExplanation().getShardAllocationDecision().getAllocateDecision().getAllocationDecision()
924+
);
925+
926+
logger.info("--> restarting the node 0");
927+
internalCluster().startDataOnlyNode(
928+
Settings.builder().put("node.name", nodesWithReplicaShards.get(1)).put(replicaNode1DataPathSettings).build()
929+
);
930+
ensureStableCluster(7);
931+
ensureGreen("test");
932+
}
933+
846934
public void testNBatchesCreationAndAssignment() throws Exception {
847935
// we will reduce batch size to 5 to make sure we have enough batches to test assignment
848936
// Total number of primary shards = 50 (50 indices*1)
@@ -1293,4 +1381,14 @@ private void prepareIndex(String indexName, int numberOfPrimaryShards) {
12931381
index(indexName, "type", "1", Collections.emptyMap());
12941382
flush(indexName);
12951383
}
1384+
1385+
private List<String> findNodesWithShard(final boolean primary) {
1386+
ClusterState state = client().admin().cluster().prepareState().get().getState();
1387+
List<ShardRouting> startedShards = state.routingTable().shardsWithState(ShardRoutingState.STARTED);
1388+
List<ShardRouting> requiredStartedShards = startedShards.stream()
1389+
.filter(startedShard -> startedShard.primary() == primary)
1390+
.collect(Collectors.toList());
1391+
Collections.shuffle(requiredStartedShards, random());
1392+
return requiredStartedShards.stream().map(shard -> state.nodes().get(shard.currentNodeId()).getName()).collect(Collectors.toList());
1393+
}
12961394
}

server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java

+1-52
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.apache.logging.log4j.Logger;
3737
import org.opensearch.cluster.routing.RecoverySource;
3838
import org.opensearch.cluster.routing.RoutingNode;
39-
import org.opensearch.cluster.routing.RoutingNodes;
4039
import org.opensearch.cluster.routing.ShardRouting;
4140
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
4241
import org.opensearch.cluster.routing.allocation.AllocationDecision;
@@ -46,9 +45,7 @@
4645
import org.opensearch.cluster.routing.allocation.decider.Decision;
4746

4847
import java.util.ArrayList;
49-
import java.util.HashMap;
5048
import java.util.List;
51-
import java.util.stream.Collectors;
5249

5350
/**
5451
* An abstract class that implements basic functionality for allocating
@@ -81,38 +78,7 @@ public void allocateUnassigned(
8178
executeDecision(shardRouting, allocateUnassignedDecision, allocation, unassignedAllocationHandler);
8279
}
8380

84-
/**
85-
* Allocate Batch of unassigned shard to nodes where valid copies of the shard already exists
86-
* @param shardRoutings the shards to allocate
87-
* @param allocation the allocation state container object
88-
*/
89-
public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
90-
// make Allocation Decisions for all shards
91-
HashMap<ShardRouting, AllocateUnassignedDecision> decisionMap = makeAllocationDecision(shardRoutings, allocation, logger);
92-
assert shardRoutings.size() == decisionMap.size() : "make allocation decision didn't return allocation decision for "
93-
+ "some shards";
94-
// get all unassigned shards iterator
95-
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
96-
97-
while (iterator.hasNext()) {
98-
ShardRouting shard = iterator.next();
99-
try {
100-
if (decisionMap.isEmpty() == false) {
101-
if (decisionMap.containsKey(shard)) {
102-
executeDecision(shard, decisionMap.remove(shard), allocation, iterator);
103-
}
104-
} else {
105-
// no need to keep iterating the unassigned shards, if we don't have anything in decision map
106-
break;
107-
}
108-
} catch (Exception e) {
109-
logger.error("Failed to execute decision for shard {} while initializing {}", shard, e);
110-
throw e;
111-
}
112-
}
113-
}
114-
115-
private void executeDecision(
81+
protected void executeDecision(
11682
ShardRouting shardRouting,
11783
AllocateUnassignedDecision allocateUnassignedDecision,
11884
RoutingAllocation allocation,
@@ -135,8 +101,6 @@ private void executeDecision(
135101
}
136102
}
137103

138-
public void allocateUnassignedBatch(String batchId, RoutingAllocation allocation) {}
139-
140104
protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation allocation) {
141105
if (shardRouting.primary()) {
142106
if (shardRouting.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) {
@@ -165,21 +129,6 @@ public abstract AllocateUnassignedDecision makeAllocationDecision(
165129
Logger logger
166130
);
167131

168-
public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
169-
List<ShardRouting> unassignedShardBatch,
170-
RoutingAllocation allocation,
171-
Logger logger
172-
) {
173-
174-
return (HashMap<ShardRouting, AllocateUnassignedDecision>) unassignedShardBatch.stream()
175-
.collect(
176-
Collectors.toMap(
177-
unassignedShard -> unassignedShard,
178-
unassignedShard -> makeAllocationDecision(unassignedShard, allocation, logger)
179-
)
180-
);
181-
}
182-
183132
/**
184133
* Builds decisions for all nodes in the cluster, so that the explain API can provide information on
185134
* allocation decisions for each node, while still waiting to allocate the shard (e.g. due to fetching shard data).

server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java

+35-25
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.opensearch.cluster.routing.ShardRouting;
1515
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
1616
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
17+
import org.opensearch.core.index.shard.ShardId;
1718
import org.opensearch.gateway.AsyncShardFetch.FetchResult;
1819
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard;
1920
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShard;
@@ -61,50 +62,59 @@ protected FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedS
6162

6263
@Override
6364
public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassignedShard, RoutingAllocation allocation, Logger logger) {
64-
return makeAllocationDecision(Collections.singletonList(unassignedShard), allocation, logger).get(unassignedShard);
65+
AllocateUnassignedDecision decision = getInEligibleShardDecision(unassignedShard, allocation);
66+
if (decision != null) {
67+
return decision;
68+
}
69+
final FetchResult<NodeGatewayStartedShardsBatch> shardsState = fetchData(
70+
List.of(unassignedShard),
71+
Collections.emptyList(),
72+
allocation
73+
);
74+
List<NodeGatewayStartedShard> nodeGatewayStartedShards = adaptToNodeShardStates(unassignedShard, shardsState);
75+
return getAllocationDecision(unassignedShard, allocation, nodeGatewayStartedShards, logger);
6576
}
6677

6778
/**
68-
* Build allocation decisions for all the shards present in the batch identified by batchId.
79+
* Allocate Batch of unassigned shard to nodes where valid copies of the shard already exists
6980
*
70-
* @param shards set of shards given for allocation
71-
* @param allocation current allocation of all the shards
72-
* @param logger logger used for logging
73-
* @return shard to allocation decision map
81+
* @param shardRoutings the shards to allocate
82+
* @param allocation the allocation state container object
7483
*/
75-
@Override
76-
public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
77-
List<ShardRouting> shards,
78-
RoutingAllocation allocation,
79-
Logger logger
80-
) {
81-
HashMap<ShardRouting, AllocateUnassignedDecision> shardAllocationDecisions = new HashMap<>();
84+
public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
85+
HashMap<ShardId, AllocateUnassignedDecision> ineligibleShardAllocationDecisions = new HashMap<>();
8286
List<ShardRouting> eligibleShards = new ArrayList<>();
8387
List<ShardRouting> inEligibleShards = new ArrayList<>();
8488
// identify ineligible shards
85-
for (ShardRouting shard : shards) {
89+
for (ShardRouting shard : shardRoutings) {
8690
AllocateUnassignedDecision decision = getInEligibleShardDecision(shard, allocation);
8791
if (decision != null) {
92+
ineligibleShardAllocationDecisions.put(shard.shardId(), decision);
8893
inEligibleShards.add(shard);
89-
shardAllocationDecisions.put(shard, decision);
9094
} else {
9195
eligibleShards.add(shard);
9296
}
9397
}
94-
// Do not call fetchData if there are no eligible shards
95-
if (eligibleShards.isEmpty()) {
96-
return shardAllocationDecisions;
97-
}
98+
9899
// only fetch data for eligible shards
99100
final FetchResult<NodeGatewayStartedShardsBatch> shardsState = fetchData(eligibleShards, inEligibleShards, allocation);
100101

101-
// process the received data
102-
for (ShardRouting unassignedShard : eligibleShards) {
103-
List<NodeGatewayStartedShard> nodeShardStates = adaptToNodeShardStates(unassignedShard, shardsState);
104-
// get allocation decision for this shard
105-
shardAllocationDecisions.put(unassignedShard, getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger));
102+
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
103+
while (iterator.hasNext()) {
104+
ShardRouting unassignedShard = iterator.next();
105+
AllocateUnassignedDecision allocationDecision;
106+
107+
if (shardRoutings.contains(unassignedShard)) {
108+
assert unassignedShard.primary();
109+
if (ineligibleShardAllocationDecisions.containsKey(unassignedShard.shardId())) {
110+
allocationDecision = ineligibleShardAllocationDecisions.get(unassignedShard.shardId());
111+
} else {
112+
List<NodeGatewayStartedShard> nodeShardStates = adaptToNodeShardStates(unassignedShard, shardsState);
113+
allocationDecision = getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger);
114+
}
115+
executeDecision(unassignedShard, allocationDecision, allocation, iterator);
116+
}
106117
}
107-
return shardAllocationDecisions;
108118
}
109119

110120
/**

0 commit comments

Comments
 (0)