
Commit d08c425

[Batch Fetch] Fix for hasInitiatedFetching to fix allocation explain and manual reroute APIs (opensearch-project#14972)
* Fix for hasInitiatedFetching() in batch mode

Signed-off-by: Rahul Karajgikar <karajgik@amazon.com>
1 parent 59302a3 commit d08c425
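
For context, the regression this commit fixes is visible through the allocation explain API. Below is a minimal repro sketch in the style of the internal-cluster tests added in this commit; the index name, shard id, and assertions mirror the new test code and are illustrative, not part of the commit itself.

```java
// Illustrative repro (mirrors the new test below; assumes an OpenSearchIntegTestCase
// internal-cluster context with batch mode enabled and an unassignable extra replica).
// Before this fix, hasInitiatedFetching() returned true whenever a batch id existed,
// so explain could report AWAITING_INFO for a replica that no decider would ever place.
AllocateUnassignedDecision aud = client().admin()
    .cluster()
    .prepareAllocationExplain()
    .setIndex("test")
    .setShard(0)
    .setPrimary(false)
    .get()
    .getExplanation()
    .getShardAllocationDecision()
    .getAllocateDecision();

// Expected after the fix: a definitive NO rather than AWAITING_INFO.
assertEquals(AllocationDecision.NO, aud.getAllocationDecision());
```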

File tree

5 files changed: +191 −11 lines changed


CHANGELOG.md (+1 −0)
```diff
@@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 
 ## [Unreleased 2.x]
 ### Added
+- Fix for hasInitiatedFetching to fix allocation explain and manual reroute APIs ([#14972](https://github.com/opensearch-project/OpenSearch/pull/14972))
 
 ### Dependencies
 - Bump `org.apache.commons:commons-lang3` from 3.14.0 to 3.15.0 ([#14861](https://github.com/opensearch-project/OpenSearch/pull/14861))
```

server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java (+151 −9)
```diff
@@ -57,6 +57,7 @@
 import org.opensearch.cluster.routing.ShardRouting;
 import org.opensearch.cluster.routing.ShardRoutingState;
 import org.opensearch.cluster.routing.UnassignedInfo;
+import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
 import org.opensearch.cluster.routing.allocation.AllocationDecision;
 import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
 import org.opensearch.cluster.service.ClusterService;
```
```diff
@@ -797,11 +798,26 @@ public void testBatchModeEnabledWithoutTimeout() throws Exception {
         );
         assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings()));
         assertEquals(1, gatewayAllocator.getNumberOfStartedShardBatches());
-        assertEquals(1, gatewayAllocator.getNumberOfStoreShardBatches());
+        // The replica shard would be marked ineligible since there are no data nodes.
+        // It would then be removed from its batch, and the empty batch would be deleted, so we would have 0 replica batches.
+        assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
 
-        // Now start both data nodes and ensure batch mode is working
-        logger.info("--> restarting the stopped nodes");
+        // Now start one data node
+        logger.info("--> restarting the first stopped node");
         internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(0)).put(node0DataPathSettings).build());
+        ensureStableCluster(2);
+        ensureYellow("test");
+        assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches());
+        assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
+        assertEquals(0, gatewayAllocator.getNumberOfInFlightFetches());
+
+        // Call reroute and assert on the reroute response
+        logger.info("--> calling reroute while cluster is yellow");
+        clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
+        assertTrue(clusterRerouteResponse.isAcknowledged());
+
+        // Now start the last data node, ensure batch mode is working, and the cluster goes green
+        logger.info("--> restarting the second stopped node");
         internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(1)).put(node1DataPathSettings).build());
         ensureStableCluster(3);
         ensureGreen("test");
```
```diff
@@ -842,11 +858,26 @@ public void testBatchModeEnabledWithSufficientTimeoutAndClusterGreen() throws Ex
         );
         assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings()));
         assertEquals(1, gatewayAllocator.getNumberOfStartedShardBatches());
-        assertEquals(1, gatewayAllocator.getNumberOfStoreShardBatches());
+        // The replica shard would be marked ineligible since there are no data nodes.
+        // It would then be removed from its batch, and the empty batch would be deleted, so we would have 0 replica batches.
+        assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
 
-        // Now start both data nodes and ensure batch mode is working
-        logger.info("--> restarting the stopped nodes");
+        // Now start one data node and ensure batch mode is working
+        logger.info("--> restarting the first stopped node");
         internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(0)).put(node0DataPathSettings).build());
+        ensureStableCluster(2);
+        ensureYellow("test");
+        assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches());
+        assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
+        assertEquals(0, gatewayAllocator.getNumberOfInFlightFetches());
+
+        // Call reroute and assert on the reroute response
+        logger.info("--> calling reroute while cluster is yellow");
+        clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
+        assertTrue(clusterRerouteResponse.isAcknowledged());
+
+        // Now start the last data node, ensure batch mode is working, and the cluster goes green
+        logger.info("--> restarting the second stopped node");
         internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(1)).put(node1DataPathSettings).build());
         ensureStableCluster(3);
         ensureGreen("test");
```
```diff
@@ -907,7 +938,9 @@ public void testBatchModeEnabledWithInSufficientTimeoutButClusterGreen() throws
 
         assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings()));
         assertEquals(10, gatewayAllocator.getNumberOfStartedShardBatches());
-        assertEquals(10, gatewayAllocator.getNumberOfStoreShardBatches());
+        // All replica shards would be marked ineligible since there are no data nodes.
+        // They would then be removed from their batches, and the empty batches would be deleted, so we would have 0 replica batches.
+        assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
         health = client(internalCluster().getClusterManagerName()).admin().cluster().health(Requests.clusterHealthRequest()).actionGet();
         assertFalse(health.isTimedOut());
         assertEquals(RED, health.getStatus());
```
```diff
@@ -1051,6 +1084,18 @@ public void testMultipleReplicaShardAssignmentWithDelayedAllocationAndDifferentN
         ensureGreen("test");
     }
 
+    public void testAllocationExplainReturnsNoWhenExtraReplicaShardInNonBatchMode() throws Exception {
+        // Non batch mode - This test validates that we don't return AWAITING_INFO in the allocation explain API
+        // when the deciders are returning NO
+        this.allocationExplainReturnsNoWhenExtraReplicaShard(false);
+    }
+
+    public void testAllocationExplainReturnsNoWhenExtraReplicaShardInBatchMode() throws Exception {
+        // Batch mode - This test validates that we don't return AWAITING_INFO in the allocation explain API
+        // when the deciders are returning NO
+        this.allocationExplainReturnsNoWhenExtraReplicaShard(true);
+    }
+
     public void testNBatchesCreationAndAssignment() throws Exception {
         // we will reduce batch size to 5 to make sure we have enough batches to test assignment
         // Total number of primary shards = 50 (50 indices*1)
```
```diff
@@ -1104,7 +1149,9 @@ public void testNBatchesCreationAndAssignment() throws Exception {
         );
         assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings()));
         assertEquals(10, gatewayAllocator.getNumberOfStartedShardBatches());
-        assertEquals(10, gatewayAllocator.getNumberOfStoreShardBatches());
+        // All replica shards would be marked ineligible since there are no data nodes.
+        // They would then be removed from their batches, and the empty batches would be deleted, so we would have 0 replica batches.
+        assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
         health = client(internalCluster().getClusterManagerName()).admin().cluster().health(Requests.clusterHealthRequest()).actionGet();
         assertFalse(health.isTimedOut());
         assertEquals(RED, health.getStatus());
```
```diff
@@ -1193,7 +1240,9 @@ public void testCulpritShardInBatch() throws Exception {
         );
         assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings()));
         assertEquals(1, gatewayAllocator.getNumberOfStartedShardBatches());
-        assertEquals(1, gatewayAllocator.getNumberOfStoreShardBatches());
+        // The replica shard would be marked ineligible since there are no data nodes.
+        // It would then be removed from its batch, and the empty batch would be deleted, so we would have 0 replica batches.
+        assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
         assertTrue(clusterRerouteResponse.isAcknowledged());
         health = client(internalCluster().getClusterManagerName()).admin().cluster().health(Requests.clusterHealthRequest()).actionGet();
         assertFalse(health.isTimedOut());
```
```diff
@@ -1511,4 +1560,97 @@ private List<String> findNodesWithShard(final boolean primary) {
         Collections.shuffle(requiredStartedShards, random());
         return requiredStartedShards.stream().map(shard -> state.nodes().get(shard.currentNodeId()).getName()).collect(Collectors.toList());
     }
+
+    private void allocationExplainReturnsNoWhenExtraReplicaShard(boolean batchModeEnabled) throws Exception {
+        internalCluster().startClusterManagerOnlyNodes(
+            1,
+            Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), batchModeEnabled).build()
+        );
+        internalCluster().startDataOnlyNodes(5);
+        createIndex(
+            "test",
+            Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 4).build()
+        );
+        ensureGreen("test");
+        ensureStableCluster(6);
+
+        // Stop one of the nodes to make the cluster yellow.
+        // We cannot directly create an index with replica count equal to the data node count, because then
+        // the whole flow would get skipped due to INDEX_CREATED.
+        List<String> nodesWithReplicaShards = findNodesWithShard(false);
+        Settings replicaNodeDataPathSettings = internalCluster().dataPathSettings(nodesWithReplicaShards.get(0));
+        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodesWithReplicaShards.get(0)));
+
+        ensureStableCluster(5);
+        ensureYellow("test");
+
+        logger.info("--> calling allocation explain API");
+        // The shard should have decision NO because there is no valid node for the extra replica to go to
+        AllocateUnassignedDecision aud = client().admin()
+            .cluster()
+            .prepareAllocationExplain()
+            .setIndex("test")
+            .setShard(0)
+            .setPrimary(false)
+            .get()
+            .getExplanation()
+            .getShardAllocationDecision()
+            .getAllocateDecision();
+
+        assertEquals(AllocationDecision.NO, aud.getAllocationDecision());
+        assertEquals("cannot allocate because allocation is not permitted to any of the nodes", aud.getExplanation());
+
+        // Now create a new index with too many replicas and try again
+        createIndex(
+            "test2",
+            Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 5).build()
+        );
+
+        ensureYellowAndNoInitializingShards("test2");
+
+        logger.info("--> calling allocation explain API again");
+        // The shard should have decision NO because there are 6 shard copies (1 primary + 5 replicas) and only 4 data nodes
+        aud = client().admin()
+            .cluster()
+            .prepareAllocationExplain()
+            .setIndex("test2")
+            .setShard(0)
+            .setPrimary(false)
+            .get()
+            .getExplanation()
+            .getShardAllocationDecision()
+            .getAllocateDecision();
+
+        assertEquals(AllocationDecision.NO, aud.getAllocationDecision());
+        assertEquals("cannot allocate because allocation is not permitted to any of the nodes", aud.getExplanation());
+
+        logger.info("--> restarting the stopped node");
+        internalCluster().startDataOnlyNode(
+            Settings.builder().put("node.name", nodesWithReplicaShards.get(0)).put(replicaNodeDataPathSettings).build()
+        );
+
+        ensureStableCluster(6);
+        ensureGreen("test");
+
+        logger.info("--> calling allocation explain API a third time");
+        // The shard should still have decision NO because there are 6 shard copies and only 5 data nodes
+        aud = client().admin()
+            .cluster()
+            .prepareAllocationExplain()
+            .setIndex("test2")
+            .setShard(0)
+            .setPrimary(false)
+            .get()
+            .getExplanation()
+            .getShardAllocationDecision()
+            .getAllocateDecision();
+
+        assertEquals(AllocationDecision.NO, aud.getAllocationDecision());
+        assertEquals("cannot allocate because allocation is not permitted to any of the nodes", aud.getExplanation());
+
+        internalCluster().startDataOnlyNodes(1);
+
+        ensureStableCluster(7);
+        ensureGreen("test2");
+    }
 }
```
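
Manual reroute is the second API named in the commit title, and the new assertions above exercise it while the cluster is yellow. As a standalone sketch of that call (same internal-cluster test context assumed; `setRetryFailed(true)` retries shards whose allocation previously failed):

```java
// Sketch of the manual reroute path exercised above (assumes the same
// internal-cluster test context). Per the commit title, reroute runs unassigned
// replicas through the same hasInitiatedFetching() check as allocation explain.
ClusterRerouteResponse clusterRerouteResponse = client().admin()
    .cluster()
    .prepareReroute()
    .setRetryFailed(true) // retry shards whose allocation previously failed
    .get();
assertTrue(clusterRerouteResponse.isAcknowledged());
```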

server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java (+8 −0)
```diff
@@ -80,6 +80,14 @@ public synchronized void clearShard(ShardId shardId) {
         this.cache.deleteShard(shardId);
     }
 
+    public boolean hasEmptyCache() {
+        return this.cache.getCache().isEmpty();
+    }
+
+    public AsyncShardFetchCache<T> getCache() {
+        return this.cache;
+    }
+
     /**
      * Cache implementation of transport actions returning batch of shards related data in the response.
      * Store node level responses of transport actions like {@link TransportNodesListGatewayStartedShardsBatch} or
```
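
These two accessors exist so the gateway allocator can inspect fetch state without reaching into the cache internals. A minimal sketch of the intended consumption (`fetcher` is a hypothetical reference to the AsyncShardBatchFetch owned by a ShardsBatch, matching how ShardsBatchGatewayAllocator uses it below):

```java
// Sketch only: how hasEmptyCache() and getCache() are meant to be read together.
// `fetcher` is assumed to be the AsyncShardBatchFetch owned by a ShardsBatch.
boolean fetchEverStarted = fetcher.hasEmptyCache() == false;                  // fetchData() populated the per-node cache at least once
boolean nothingLeftToFetch = fetcher.getCache().findNodesToFetch().isEmpty(); // every node has data or an in-flight fetch
boolean safeToAppendStoreInfo = fetchEverStarted && nothingLeftToFetch;
```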

server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java (+1 −1)
```diff
@@ -183,7 +183,7 @@ private AllocateUnassignedDecision getUnassignedShardAllocationDecision(
         if (allocationDecision.type() != Decision.Type.YES && (!explain || !hasInitiatedFetching(shardRouting))) {
             // only return early if we are not in explain mode, or we are in explain mode but we have not
             // yet attempted to fetch any shard data
-            logger.trace("{}: ignoring allocation, can't be allocated on any node", shardRouting);
+            logger.trace("{}: ignoring allocation, can't be allocated on any node. Decision: {}", shardRouting, allocationDecision.type());
             return AllocateUnassignedDecision.no(
                 UnassignedInfo.AllocationStatus.fromDecision(allocationDecision.type()),
                 result.v2() != null ? new ArrayList<>(result.v2().values()) : null
```
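
Spelled out, the guard in this hunk behaves as the illustrative decision table below (not OpenSearch source; `explain` and `hasInitiatedFetching` are the names used above):

```java
// Illustrative restatement of the early-return guard above:
//   decider YES                                    -> fall through, allocate normally
//   decider NO/THROTTLE, explain off               -> return NO immediately
//   decider NO/THROTTLE, explain on, never fetched -> return NO immediately (the case this commit fixes)
//   decider NO/THROTTLE, explain on, fetch done    -> continue, attach store info to the NO decision
boolean returnEarly = allocationDecision.type() != Decision.Type.YES
    && (explain == false || hasInitiatedFetching(shardRouting) == false);
```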

server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java (+30 −1)
```diff
@@ -576,8 +576,37 @@ protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadataBatch.
 
     @Override
     protected boolean hasInitiatedFetching(ShardRouting shard) {
+        /**
+         * This method checks whether an asyncFetch has happened before for this shard batch, or is ongoing.
+         * It returns false if there has never been a fetch for this batch.
+         * It is currently only used for replica shards when all deciders returned NO/THROTTLE and explain mode is ON.
+         * The allocation explain and manual reroute APIs try to append shard store information (matching bytes) to the allocation decision.
+         * However, these APIs should not trigger a new asyncFetch for such ineligible shards unless the data from the nodes is already there.
+         * This method checks whether a fetch has happened, to decide if shard store info can be appended without a new async fetch.
+         * If the shard has a batch but no fetch has ever happened, it is because the batch is new.
+         * If the shard has a batch, a fetch has happened before, and no fetch is ongoing, it is because the fetch has already completed for all nodes.
+         *
+         * To check whether a fetch has ever happened, we check the following:
+         * 1. If the shard batch cache is empty, no fetch has ever happened, so we return false.
+         * 2. If the list of nodes to fetch from is empty, all nodes either have data or have an ongoing fetch, so we return true.
+         * 3. Otherwise, we return false.
+         *
+         * see {@link AsyncShardFetchCache#findNodesToFetch()}
+         */
         String batchId = getBatchId(shard, shard.primary());
-        return batchId != null;
+        if (batchId == null) {
+            return false;
+        }
+        logger.trace("Checking if fetching done for batch id {}", batchId);
+        ShardsBatch shardsBatch = shard.primary() ? batchIdToStartedShardBatch.get(batchId) : batchIdToStoreShardBatch.get(batchId);
+        // If fetchData has never been called, the per-node cache will be empty and have no nodes.
+        // This is because cache.fillShardCacheWithDataNodes(nodes) initialises this map and is called in AsyncShardFetch.fetchData.
+        if (shardsBatch == null || shardsBatch.getAsyncFetcher().hasEmptyCache()) {
+            logger.trace("Batch cache is empty for batch {}", batchId);
+            return false;
+        }
+        // This check makes sure we already have all the data, so that we would not trigger a new async fetchData call.
+        return shardsBatch.getAsyncFetcher().getCache().findNodesToFetch().isEmpty();
     }
 }
```
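
Condensed, the fixed contract reduces to a three-step check. The sketch below restates it outside the allocator as an illustration only; `batch` stands for the ShardsBatch resolved from the batch id above:

```java
// Illustrative restatement of the fixed hasInitiatedFetching() contract:
//   no batch id / no batch     -> false (shard is not tracked in any batch)
//   batch cache empty          -> false (fetchData() has never run for this batch)
//   nodes still left to fetch  -> false (don't trigger a brand-new fetch from explain/reroute)
//   no nodes left to fetch     -> true  (every node has data or a fetch in flight)
static boolean fetchInitiated(String batchId, ShardsBatch batch) {
    if (batchId == null || batch == null || batch.getAsyncFetcher().hasEmptyCache()) {
        return false;
    }
    return batch.getAsyncFetcher().getCache().findNodesToFetch().isEmpty();
}
```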
