Skip to content

Commit 12115d1

Browse files
Authored Mar 14, 2024
Add batch async shard fetch transport action for replica #8218 (#8356)
* Add batch async shard fetch transport action for replica

Signed-off-by: sudarshan baliga <baliga108@gmail.com>
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
Signed-off-by: Aman Khare <amkhar@amazon.com>
1 parent b265215 commit 12115d1

File tree

8 files changed

+728
-207
lines changed

8 files changed

+728
-207
lines changed
 

‎server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java

+133
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,12 @@
3232

3333
package org.opensearch.gateway;
3434

35+
import org.opensearch.Version;
3536
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction;
3637
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest;
3738
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
3839
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest;
40+
import org.opensearch.action.admin.cluster.reroute.ClusterRerouteResponse;
3941
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
4042
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
4143
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
@@ -46,6 +48,7 @@
4648
import org.opensearch.cluster.coordination.ElectionSchedulerFactory;
4749
import org.opensearch.cluster.metadata.IndexMetadata;
4850
import org.opensearch.cluster.node.DiscoveryNode;
51+
import org.opensearch.cluster.routing.ShardRouting;
4952
import org.opensearch.cluster.routing.UnassignedInfo;
5053
import org.opensearch.cluster.service.ClusterService;
5154
import org.opensearch.common.settings.Settings;
@@ -63,6 +66,8 @@
6366
import org.opensearch.indices.recovery.RecoveryState;
6467
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
6568
import org.opensearch.indices.store.ShardAttributes;
69+
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch;
70+
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper;
6671
import org.opensearch.plugins.Plugin;
6772
import org.opensearch.test.InternalSettingsPlugin;
6873
import org.opensearch.test.InternalTestCluster.RestartCallback;
@@ -82,8 +87,11 @@
8287
import java.util.List;
8388
import java.util.Map;
8489
import java.util.Set;
90+
import java.util.concurrent.ExecutionException;
8591
import java.util.stream.IntStream;
8692

93+
import static java.util.Collections.emptyMap;
94+
import static java.util.Collections.emptySet;
8795
import static org.opensearch.cluster.coordination.ClusterBootstrapService.INITIAL_CLUSTER_MANAGER_NODES_SETTING;
8896
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
8997
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
@@ -817,6 +825,131 @@ public void testShardFetchCorruptedShardsUsingBatchAction() throws Exception {
817825
assertTrue(nodeGatewayStartedShards.primary());
818826
}
819827

828+
public void testSingleShardStoreFetchUsingBatchAction() throws ExecutionException, InterruptedException {
829+
String indexName = "test";
830+
DiscoveryNode[] nodes = getDiscoveryNodes();
831+
TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch response = prepareAndSendRequest(
832+
new String[] { indexName },
833+
nodes
834+
);
835+
Index index = resolveIndex(indexName);
836+
ShardId shardId = new ShardId(index, 0);
837+
TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata nodeStoreFilesMetadata = response.getNodesMap()
838+
.get(nodes[0].getId())
839+
.getNodeStoreFilesMetadataBatch()
840+
.get(shardId);
841+
assertNodeStoreFilesMetadataSuccessCase(nodeStoreFilesMetadata, shardId);
842+
}
843+
844+
public void testShardStoreFetchMultiNodeMultiIndexesUsingBatchAction() throws Exception {
845+
internalCluster().startNodes(2);
846+
String indexName1 = "test1";
847+
String indexName2 = "test2";
848+
DiscoveryNode[] nodes = getDiscoveryNodes();
849+
TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch response = prepareAndSendRequest(
850+
new String[] { indexName1, indexName2 },
851+
nodes
852+
);
853+
ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(indexName1, indexName2).get();
854+
for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) {
855+
ShardId shardId = clusterSearchShardsGroup.getShardId();
856+
ShardRouting[] shardRoutings = clusterSearchShardsGroup.getShards();
857+
assertEquals(2, shardRoutings.length);
858+
for (ShardRouting shardRouting : shardRoutings) {
859+
TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata nodeStoreFilesMetadata = response.getNodesMap()
860+
.get(shardRouting.currentNodeId())
861+
.getNodeStoreFilesMetadataBatch()
862+
.get(shardId);
863+
assertNodeStoreFilesMetadataSuccessCase(nodeStoreFilesMetadata, shardId);
864+
}
865+
}
866+
}
867+
868+
public void testShardStoreFetchNodeNotConnectedUsingBatchAction() {
869+
DiscoveryNode nonExistingNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
870+
String indexName = "test";
871+
TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch response = prepareAndSendRequest(
872+
new String[] { indexName },
873+
new DiscoveryNode[] { nonExistingNode }
874+
);
875+
assertTrue(response.hasFailures());
876+
assertEquals(1, response.failures().size());
877+
assertEquals(nonExistingNode.getId(), response.failures().get(0).nodeId());
878+
}
879+
880+
public void testShardStoreFetchCorruptedIndexUsingBatchAction() throws Exception {
881+
internalCluster().startNodes(2);
882+
String index1Name = "test1";
883+
String index2Name = "test2";
884+
prepareIndices(new String[] { index1Name, index2Name }, 1, 1);
885+
Map<ShardId, ShardAttributes> shardAttributesMap = prepareRequestMap(new String[] { index1Name, index2Name }, 1);
886+
Index index1 = resolveIndex(index1Name);
887+
ShardId shardId1 = new ShardId(index1, 0);
888+
ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(index1Name).get();
889+
assertEquals(2, searchShardsResponse.getNodes().length);
890+
891+
// corrupt test1 index shards
892+
corruptShard(searchShardsResponse.getNodes()[0].getName(), shardId1);
893+
corruptShard(searchShardsResponse.getNodes()[1].getName(), shardId1);
894+
ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(false).get();
895+
DiscoveryNode[] discoveryNodes = getDiscoveryNodes();
896+
TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch response;
897+
response = ActionTestUtils.executeBlocking(
898+
internalCluster().getInstance(TransportNodesListShardStoreMetadataBatch.class),
899+
new TransportNodesListShardStoreMetadataBatch.Request(shardAttributesMap, discoveryNodes)
900+
);
901+
Map<ShardId, TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata> nodeStoreFilesMetadata = response.getNodesMap()
902+
.get(discoveryNodes[0].getId())
903+
.getNodeStoreFilesMetadataBatch();
904+
// We don't store exception in case of corrupt index, rather just return an empty response
905+
assertNull(nodeStoreFilesMetadata.get(shardId1).getStoreFileFetchException());
906+
assertEquals(shardId1, nodeStoreFilesMetadata.get(shardId1).storeFilesMetadata().shardId());
907+
assertTrue(nodeStoreFilesMetadata.get(shardId1).storeFilesMetadata().isEmpty());
908+
909+
Index index2 = resolveIndex(index2Name);
910+
ShardId shardId2 = new ShardId(index2, 0);
911+
assertNodeStoreFilesMetadataSuccessCase(nodeStoreFilesMetadata.get(shardId2), shardId2);
912+
}
913+
914+
private void prepareIndices(String[] indices, int numberOfPrimaryShards, int numberOfReplicaShards) {
915+
for (String index : indices) {
916+
createIndex(
917+
index,
918+
Settings.builder()
919+
.put(SETTING_NUMBER_OF_SHARDS, numberOfPrimaryShards)
920+
.put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicaShards)
921+
.build()
922+
);
923+
index(index, "type", "1", Collections.emptyMap());
924+
flush(index);
925+
}
926+
}
927+
928+
private TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch prepareAndSendRequest(
929+
String[] indices,
930+
DiscoveryNode[] nodes
931+
) {
932+
Map<ShardId, ShardAttributes> shardAttributesMap = null;
933+
prepareIndices(indices, 1, 1);
934+
shardAttributesMap = prepareRequestMap(indices, 1);
935+
TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch response;
936+
return ActionTestUtils.executeBlocking(
937+
internalCluster().getInstance(TransportNodesListShardStoreMetadataBatch.class),
938+
new TransportNodesListShardStoreMetadataBatch.Request(shardAttributesMap, nodes)
939+
);
940+
}
941+
942+
private void assertNodeStoreFilesMetadataSuccessCase(
943+
TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata nodeStoreFilesMetadata,
944+
ShardId shardId
945+
) {
946+
assertNull(nodeStoreFilesMetadata.getStoreFileFetchException());
947+
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFileMetadata = nodeStoreFilesMetadata.storeFilesMetadata();
948+
assertFalse(storeFileMetadata.isEmpty());
949+
assertEquals(shardId, storeFileMetadata.shardId());
950+
assertNotNull(storeFileMetadata.peerRecoveryRetentionLeases());
951+
}
952+
820953
private void assertNodeGatewayStartedShardsHappyCase(
821954
TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard nodeGatewayStartedShards
822955
) {

‎server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java

+11-20
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@
5151
import org.opensearch.common.unit.TimeValue;
5252
import org.opensearch.core.common.unit.ByteSizeValue;
5353
import org.opensearch.index.store.StoreFileMetadata;
54-
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
5554
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata;
55+
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata;
5656

5757
import java.util.ArrayList;
5858
import java.util.Collections;
@@ -106,7 +106,7 @@ public void processExistingRecoveries(RoutingAllocation allocation) {
106106
assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary";
107107
assert primaryShard.currentNodeId() != null;
108108
final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId());
109-
final TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores);
109+
final StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores);
110110
if (primaryStore == null) {
111111
// if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
112112
// just let the recovery find it out, no need to do anything about it for the initializing shard
@@ -223,7 +223,7 @@ public AllocateUnassignedDecision makeAllocationDecision(
223223
}
224224
assert primaryShard.currentNodeId() != null;
225225
final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId());
226-
final TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores);
226+
final StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores);
227227
if (primaryStore == null) {
228228
// if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
229229
// we want to let the replica be allocated in order to expose the actual problem with the primary that the replica
@@ -357,10 +357,7 @@ private static List<NodeAllocationResult> augmentExplanationsWithStoreInfo(
357357
/**
358358
* Finds the store for the assigned shard in the fetched data, returns null if none is found.
359359
*/
360-
private static TransportNodesListShardStoreMetadata.StoreFilesMetadata findStore(
361-
DiscoveryNode node,
362-
AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> data
363-
) {
360+
private static StoreFilesMetadata findStore(DiscoveryNode node, AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> data) {
364361
NodeStoreFilesMetadata nodeFilesStore = data.getData().get(node);
365362
if (nodeFilesStore == null) {
366363
return null;
@@ -373,7 +370,7 @@ private MatchingNodes findMatchingNodes(
373370
RoutingAllocation allocation,
374371
boolean noMatchFailedNodes,
375372
DiscoveryNode primaryNode,
376-
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
373+
StoreFilesMetadata primaryStore,
377374
AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> data,
378375
boolean explain
379376
) {
@@ -386,7 +383,7 @@ private MatchingNodes findMatchingNodes(
386383
&& shard.unassignedInfo().getFailedNodeIds().contains(discoNode.getId())) {
387384
continue;
388385
}
389-
TransportNodesListShardStoreMetadata.StoreFilesMetadata storeFilesMetadata = nodeStoreEntry.getValue().storeFilesMetadata();
386+
StoreFilesMetadata storeFilesMetadata = nodeStoreEntry.getValue().storeFilesMetadata();
390387
// we don't have any files at all, it is an empty index
391388
if (storeFilesMetadata.isEmpty()) {
392389
continue;
@@ -441,10 +438,7 @@ private MatchingNodes findMatchingNodes(
441438
return new MatchingNodes(matchingNodes, nodeDecisions);
442439
}
443440

444-
private static long computeMatchingBytes(
445-
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
446-
TransportNodesListShardStoreMetadata.StoreFilesMetadata storeFilesMetadata
447-
) {
441+
private static long computeMatchingBytes(StoreFilesMetadata primaryStore, StoreFilesMetadata storeFilesMetadata) {
448442
long sizeMatched = 0;
449443
for (StoreFileMetadata storeFileMetadata : storeFilesMetadata) {
450444
String metadataFileName = storeFileMetadata.name();
@@ -455,19 +449,16 @@ private static long computeMatchingBytes(
455449
return sizeMatched;
456450
}
457451

458-
private static boolean hasMatchingSyncId(
459-
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
460-
TransportNodesListShardStoreMetadata.StoreFilesMetadata replicaStore
461-
) {
452+
private static boolean hasMatchingSyncId(StoreFilesMetadata primaryStore, StoreFilesMetadata replicaStore) {
462453
String primarySyncId = primaryStore.syncId();
463454
return primarySyncId != null && primarySyncId.equals(replicaStore.syncId());
464455
}
465456

466457
private static MatchingNode computeMatchingNode(
467458
DiscoveryNode primaryNode,
468-
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
459+
StoreFilesMetadata primaryStore,
469460
DiscoveryNode replicaNode,
470-
TransportNodesListShardStoreMetadata.StoreFilesMetadata replicaStore
461+
StoreFilesMetadata replicaStore
471462
) {
472463
final long retainingSeqNoForPrimary = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(primaryNode);
473464
final long retainingSeqNoForReplica = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(replicaNode);
@@ -478,7 +469,7 @@ private static MatchingNode computeMatchingNode(
478469
}
479470

480471
private static boolean canPerformOperationBasedRecovery(
481-
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
472+
StoreFilesMetadata primaryStore,
482473
AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> shardStores,
483474
DiscoveryNode targetNode
484475
) {

‎server/src/main/java/org/opensearch/indices/IndicesModule.java

+2
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
8282
import org.opensearch.indices.store.IndicesStore;
8383
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
84+
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch;
8485
import org.opensearch.plugins.MapperPlugin;
8586

8687
import java.util.ArrayList;
@@ -281,6 +282,7 @@ protected void configure() {
281282
bind(IndicesStore.class).asEagerSingleton();
282283
bind(IndicesClusterStateService.class).asEagerSingleton();
283284
bind(TransportNodesListShardStoreMetadata.class).asEagerSingleton();
285+
bind(TransportNodesListShardStoreMetadataBatch.class).asEagerSingleton();
284286
bind(GlobalCheckpointSyncAction.class).asEagerSingleton();
285287
bind(TransportResyncReplicationAction.class).asEagerSingleton();
286288
bind(PrimaryReplicaSyncer.class).asEagerSingleton();

0 commit comments

Comments (0)
Please sign in to comment.