Commit efa06fe

Prefer remote replicas for primary promotion during migration (opensearch-project#13136)
Signed-off-by: Lakshya Taragi <lakshya.taragi@gmail.com>
1 parent c9f1565 · commit efa06fe

File tree: 4 files changed, +316 −5 lines

server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java (+135)

@@ -439,6 +439,141 @@ public void testFailoverRemotePrimaryToDocrepReplica() throws Exception {
         );
     }
 
+    /*
+    Scenario:
+    - Starts 1 docrep backed data node
+    - Creates an index with 0 replica
+    - Starts 1 remote backed data node
+    - Moves primary copy from docrep to remote through _cluster/reroute
+    - Starts 1 more remote backed data node
+    - Expands index to 2 replicas, one each on new remote node and docrep node
+    - Stops remote enabled node hosting the primary
+    - Ensures remote replica gets promoted to primary
+    - Ensures doc count is same after failover
+    - Indexes some more docs to ensure working of failed-over primary
+    */
+    public void testFailoverRemotePrimaryToRemoteReplica() throws Exception {
+        internalCluster().startClusterManagerOnlyNode();
+
+        logger.info("---> Starting 1 docrep data node");
+        String docrepNodeName = internalCluster().startDataOnlyNode();
+        internalCluster().validateClusterFormed();
+        assertEquals(internalCluster().client().admin().cluster().prepareGetRepositories().get().repositories().size(), 0);
+
+        logger.info("---> Creating index with 0 replica");
+        createIndex(FAILOVER_REMOTE_TO_REMOTE, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
+        ensureGreen(FAILOVER_REMOTE_TO_REMOTE);
+        initDocRepToRemoteMigration();
+
+        logger.info("---> Starting 1 remote enabled data node");
+        addRemote = true;
+        String remoteNodeName1 = internalCluster().startDataOnlyNode();
+        internalCluster().validateClusterFormed();
+        assertEquals(
+            internalCluster().client()
+                .admin()
+                .cluster()
+                .prepareGetRepositories(REPOSITORY_NAME, REPOSITORY_2_NAME)
+                .get()
+                .repositories()
+                .size(),
+            2
+        );
+
+        logger.info("---> Starting doc ingestion in parallel thread");
+        AsyncIndexingService asyncIndexingService = new AsyncIndexingService(FAILOVER_REMOTE_TO_REMOTE);
+        asyncIndexingService.startIndexing();
+
+        logger.info("---> Moving primary copy from docrep node {} to remote enabled node {}", docrepNodeName, remoteNodeName1);
+        assertAcked(
+            internalCluster().client()
+                .admin()
+                .cluster()
+                .prepareReroute()
+                .add(new MoveAllocationCommand(FAILOVER_REMOTE_TO_REMOTE, 0, docrepNodeName, remoteNodeName1))
+                .get()
+        );
+        ensureGreen(FAILOVER_REMOTE_TO_REMOTE);
+        assertEquals(primaryNodeName(FAILOVER_REMOTE_TO_REMOTE), remoteNodeName1);
+
+        logger.info("---> Starting 1 more remote enabled data node");
+        String remoteNodeName2 = internalCluster().startDataOnlyNode();
+        internalCluster().validateClusterFormed();
+
+        logger.info("---> Expanding index to 2 replica copies, on docrepNode and remoteNode2");
+        assertAcked(
+            internalCluster().client()
+                .admin()
+                .indices()
+                .prepareUpdateSettings()
+                .setIndices(FAILOVER_REMOTE_TO_REMOTE)
+                .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2).build())
+                .get()
+        );
+        ensureGreen(FAILOVER_REMOTE_TO_REMOTE);
+
+        logger.info("---> Stopping indexing thread");
+        asyncIndexingService.stopIndexing();
+
+        refreshAndWaitForReplication(FAILOVER_REMOTE_TO_REMOTE);
+        Map<ShardRouting, ShardStats> shardStatsMap = internalCluster().client()
+            .admin()
+            .indices()
+            .prepareStats(FAILOVER_REMOTE_TO_REMOTE)
+            .setDocs(true)
+            .get()
+            .asMap();
+        DiscoveryNodes nodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();
+        long initialPrimaryDocCount = 0;
+        for (ShardRouting shardRouting : shardStatsMap.keySet()) {
+            if (shardRouting.primary()) {
+                assertTrue(nodes.get(shardRouting.currentNodeId()).isRemoteStoreNode());
+                initialPrimaryDocCount = shardStatsMap.get(shardRouting).getStats().getDocs().getCount();
+            }
+        }
+        int firstBatch = (int) asyncIndexingService.getIndexedDocs();
+        assertReplicaAndPrimaryConsistency(FAILOVER_REMOTE_TO_REMOTE, firstBatch, 0);
+
+        logger.info("---> Stop remote store enabled node hosting the primary");
+        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(remoteNodeName1));
+        ensureStableCluster(3);
+        ensureYellow(FAILOVER_REMOTE_TO_REMOTE);
+        DiscoveryNodes finalNodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();
+
+        waitUntil(() -> {
+            ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
+            String nodeId = clusterState.getRoutingTable().index(FAILOVER_REMOTE_TO_REMOTE).shard(0).primaryShard().currentNodeId();
+            if (nodeId == null) {
+                return false;
+            } else {
+                assertEquals(finalNodes.get(nodeId).getName(), remoteNodeName2);
+                return finalNodes.get(nodeId).isRemoteStoreNode();
+            }
+        });
+
+        shardStatsMap = internalCluster().client().admin().indices().prepareStats(FAILOVER_REMOTE_TO_REMOTE).setDocs(true).get().asMap();
+        long primaryDocCountAfterFailover = 0;
+        for (ShardRouting shardRouting : shardStatsMap.keySet()) {
+            if (shardRouting.primary()) {
+                assertTrue(finalNodes.get(shardRouting.currentNodeId()).isRemoteStoreNode());
+                primaryDocCountAfterFailover = shardStatsMap.get(shardRouting).getStats().getDocs().getCount();
+            }
+        }
+        assertEquals(initialPrimaryDocCount, primaryDocCountAfterFailover);
+
+        logger.info("---> Index some more docs to ensure that the failed over primary is ingesting new docs");
+        int secondBatch = randomIntBetween(1, 10);
+        logger.info("---> Indexing {} more docs", secondBatch);
+        indexBulk(FAILOVER_REMOTE_TO_REMOTE, secondBatch);
+        refreshAndWaitForReplication(FAILOVER_REMOTE_TO_REMOTE);
+
+        shardStatsMap = internalCluster().client().admin().indices().prepareStats(FAILOVER_REMOTE_TO_REMOTE).setDocs(true).get().asMap();
+        assertEquals(2, shardStatsMap.size());
+        shardStatsMap.forEach(
+            (shardRouting, shardStats) -> { assertEquals(firstBatch + secondBatch, shardStats.getStats().getDocs().getCount()); }
+        );
+    }
+
     /*
     Scenario:
     - Starts 1 docrep backed data node

server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java (+27 −5)

@@ -67,6 +67,8 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.opensearch.node.remotestore.RemoteStoreNodeService.isMigratingToRemoteStore;
+
 /**
  * {@link RoutingNodes} represents a copy the routing information contained in the {@link ClusterState cluster state}.
  * It can be either initialized as mutable or immutable (see {@link #RoutingNodes(ClusterState, boolean)}), allowing
@@ -418,6 +420,20 @@ public ShardRouting activeReplicaWithOldestVersion(ShardId shardId) {
             .orElse(null);
     }
 
+    /**
+     * Returns one active replica shard on a remote node for the given shard id or <code>null</code> if
+     * no such replica is found.
+     * <p>
+     * Since we aim to continue moving forward during remote store migration, replicas already migrated to remote nodes
+     * are preferred for primary promotion
+     */
+    public ShardRouting activeReplicaOnRemoteNode(ShardId shardId) {
+        return assignedShards(shardId).stream().filter(shr -> !shr.primary() && shr.active()).filter((shr) -> {
+            RoutingNode nd = node(shr.currentNodeId());
+            return (nd != null && nd.node().isRemoteStoreNode());
+        }).findFirst().orElse(null);
+    }
+
     /**
      * Returns <code>true</code> iff all replicas are active for the given shard routing. Otherwise <code>false</code>
      */
@@ -735,11 +751,17 @@ private void unassignPrimaryAndPromoteActiveReplicaIfExists(
         RoutingChangesObserver routingChangesObserver
     ) {
         assert failedShard.primary();
-        ShardRouting activeReplica;
-        if (metadata.isSegmentReplicationEnabled(failedShard.getIndexName())) {
-            activeReplica = activeReplicaWithOldestVersion(failedShard.shardId());
-        } else {
-            activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
+        ShardRouting activeReplica = null;
+        if (isMigratingToRemoteStore(metadata)) {
+            // we might not find any replica on remote node
+            activeReplica = activeReplicaOnRemoteNode(failedShard.shardId());
+        }
+        if (activeReplica == null) {
+            if (metadata.isSegmentReplicationEnabled(failedShard.getIndexName())) {
+                activeReplica = activeReplicaWithOldestVersion(failedShard.shardId());
+            } else {
+                activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
+            }
         }
         if (activeReplica == null) {
             moveToUnassigned(failedShard, unassignedInfo);
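Taken together, the hunk above turns primary promotion into a two-step choice: during a docrep-to-remote migration, any active replica already on a remote-store node wins; only when none exists does the pre-existing version-based selection run. A minimal, self-contained sketch of that ordering, using a hypothetical Replica record in place of the real ShardRouting/RoutingNodes types:

import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a replica copy; not an actual OpenSearch type.
record Replica(String nodeId, boolean onRemoteNode, int nodeVersion, boolean active) {}

public class PromotionOrder {

    // Mirrors the fallback in unassignPrimaryAndPromoteActiveReplicaIfExists:
    // 1. during a remote store migration, prefer any active replica on a remote node;
    // 2. otherwise fall back to the version-based pick (oldest node version for
    //    segment replication, highest node version for document replication).
    static Optional<Replica> pickReplicaToPromote(List<Replica> replicas, boolean migratingToRemote, boolean segrep) {
        if (migratingToRemote) {
            Optional<Replica> onRemote = replicas.stream().filter(r -> r.active() && r.onRemoteNode()).findFirst();
            if (onRemote.isPresent()) {
                return onRemote;
            }
        }
        Comparator<Replica> byNodeVersion = Comparator.comparingInt(Replica::nodeVersion);
        return replicas.stream().filter(Replica::active).min(segrep ? byNodeVersion : byNodeVersion.reversed());
    }

    public static void main(String[] args) {
        List<Replica> replicas = List.of(
            new Replica("docrep-node", false, 3, true),
            new Replica("remote-node", true, 2, true)
        );
        // Prints "remote-node": during migration the remote replica is promoted even
        // though the docrep replica sits on a newer node version.
        System.out.println(pickReplicaToPromote(replicas, true, true).map(Replica::nodeId).orElse("none"));
    }
}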

server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java (+14)

@@ -11,6 +11,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.opensearch.cluster.metadata.Metadata;
 import org.opensearch.cluster.metadata.RepositoriesMetadata;
 import org.opensearch.cluster.metadata.RepositoryMetadata;
 import org.opensearch.cluster.node.DiscoveryNode;
@@ -238,4 +239,17 @@ public static boolean isMigratingToRemoteStore(ClusterSettings clusterSettings)
 
         return (isMixedMode && isRemoteStoreMigrationDirection);
     }
+
+    /**
+     * To check if the cluster is undergoing remote store migration using clusterState metadata
+     * @return
+     * <code>true</code> For <code>REMOTE_STORE</code> migration direction and <code>MIXED</code> compatibility mode,
+     * <code>false</code> otherwise
+     */
+    public static boolean isMigratingToRemoteStore(Metadata metadata) {
+        boolean isMixedMode = REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(metadata.settings()).equals(CompatibilityMode.MIXED);
+        boolean isRemoteStoreMigrationDirection = MIGRATION_DIRECTION_SETTING.get(metadata.settings()).equals(Direction.REMOTE_STORE);
+
+        return (isMixedMode && isRemoteStoreMigrationDirection);
+    }
 }
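The new Metadata-based overload exists so that cluster-state consumers such as RoutingNodes can answer the migration question without holding a ClusterSettings reference. A sketch of the call, assuming the OpenSearch server classes are on the classpath (the wrapper class here is illustrative, not part of the commit):

import org.opensearch.cluster.ClusterState;
import org.opensearch.node.remotestore.RemoteStoreNodeService;

final class MigrationCheck {
    // True only when compatibility mode is MIXED and the migration direction is
    // REMOTE_STORE, matching the existing ClusterSettings-based overload.
    static boolean migratingToRemote(ClusterState state) {
        return RemoteStoreNodeService.isMigratingToRemoteStore(state.metadata());
    }
}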

server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java (+140)

@@ -40,27 +40,37 @@
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.cluster.metadata.Metadata;
 import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.cluster.node.DiscoveryNodeRole;
 import org.opensearch.cluster.node.DiscoveryNodes;
 import org.opensearch.cluster.routing.RoutingNodes;
 import org.opensearch.cluster.routing.RoutingTable;
 import org.opensearch.cluster.routing.ShardRouting;
 import org.opensearch.cluster.routing.allocation.command.AllocationCommands;
 import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
 import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
+import org.opensearch.common.UUIDs;
 import org.opensearch.common.settings.Settings;
+import org.opensearch.common.util.FeatureFlags;
 import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.indices.replication.common.ReplicationType;
+import org.opensearch.node.remotestore.RemoteStoreNodeService;
 import org.opensearch.test.VersionUtils;
 
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import static org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING;
 import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING;
 import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING;
 import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
 import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED;
+import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL;
+import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
+import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
+import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.lessThan;
@@ -812,4 +822,134 @@ private void testReplicaIsPromoted(boolean isSegmentReplicationEnabled) {
             }
         }
     }
+
+    public void testPreferReplicaOnRemoteNodeForPrimaryPromotion() {
+        FeatureFlags.initializeFeatureFlags(Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build());
+        AllocationService allocation = createAllocationService(Settings.builder().build());
+
+        // segment replication enabled
+        Settings.Builder settingsBuilder = settings(Version.CURRENT).put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT);
+
+        // remote store migration metadata settings
+        Metadata metadata = Metadata.builder()
+            .put(IndexMetadata.builder("test").settings(settingsBuilder).numberOfShards(1).numberOfReplicas(4))
+            .persistentSettings(
+                Settings.builder()
+                    .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), RemoteStoreNodeService.CompatibilityMode.MIXED.mode)
+                    .put(MIGRATION_DIRECTION_SETTING.getKey(), RemoteStoreNodeService.Direction.REMOTE_STORE.direction)
+                    .build()
+            )
+            .build();
+
+        RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build();
+
+        ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+            .metadata(metadata)
+            .routingTable(initialRoutingTable)
+            .build();
+
+        ShardId shardId = new ShardId(metadata.index("test").getIndex(), 0);
+
+        // add a remote node and start primary shard
+        Map<String, String> remoteStoreNodeAttributes = Map.of(
+            REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY,
+            "REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_VALUE"
+        );
+        DiscoveryNode remoteNode1 = new DiscoveryNode(
+            UUIDs.base64UUID(),
+            buildNewFakeTransportAddress(),
+            remoteStoreNodeAttributes,
+            DiscoveryNodeRole.BUILT_IN_ROLES,
+            Version.CURRENT
+        );
+        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(remoteNode1)).build();
+        clusterState = ClusterState.builder(clusterState).routingTable(allocation.reroute(clusterState, "reroute").routingTable()).build();
+        assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
+        assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(4));
+
+        clusterState = startInitializingShardsAndReroute(allocation, clusterState);
+        assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1));
+        assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(4));
+
+        // add remote and non-remote nodes and start replica shards
+        DiscoveryNode remoteNode2 = new DiscoveryNode(
+            UUIDs.base64UUID(),
+            buildNewFakeTransportAddress(),
+            remoteStoreNodeAttributes,
+            DiscoveryNodeRole.BUILT_IN_ROLES,
+            Version.CURRENT
+        );
+        DiscoveryNode remoteNode3 = new DiscoveryNode(
+            UUIDs.base64UUID(),
+            buildNewFakeTransportAddress(),
+            remoteStoreNodeAttributes,
+            DiscoveryNodeRole.BUILT_IN_ROLES,
+            Version.CURRENT
+        );
+        DiscoveryNode nonRemoteNode1 = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT);
+        DiscoveryNode nonRemoteNode2 = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT);
+        List<DiscoveryNode> replicaShardNodes = List.of(remoteNode2, remoteNode3, nonRemoteNode1, nonRemoteNode2);
+
+        for (int i = 0; i < 4; i++) {
+            clusterState = ClusterState.builder(clusterState)
+                .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(replicaShardNodes.get(i)))
+                .build();
+
+            clusterState = allocation.reroute(clusterState, "reroute");
+            assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1 + i));
+            assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
+            assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(4 - (i + 1)));
+
+            clusterState = startInitializingShardsAndReroute(allocation, clusterState);
+            assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1 + (i + 1)));
+            assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(4 - (i + 1)));
+        }
+
+        // fail primary shard
+        ShardRouting primaryShard0 = clusterState.routingTable().index("test").shard(0).primaryShard();
+        ClusterState newState = allocation.applyFailedShard(clusterState, primaryShard0, randomBoolean());
+        assertNotEquals(clusterState, newState);
+        clusterState = newState;
+
+        // verify that promoted replica exists on a remote node
+        assertEquals(4, clusterState.getRoutingNodes().shardsWithState(STARTED).size());
+        ShardRouting primaryShardRouting1 = clusterState.routingTable().index("test").shard(0).primaryShard();
+        assertNotEquals(primaryShard0, primaryShardRouting1);
+        assertTrue(
+            primaryShardRouting1.currentNodeId().equals(remoteNode2.getId())
+                || primaryShardRouting1.currentNodeId().equals(remoteNode3.getId())
+        );
+
+        // fail primary shard again
+        newState = allocation.applyFailedShard(clusterState, primaryShardRouting1, randomBoolean());
+        assertNotEquals(clusterState, newState);
+        clusterState = newState;
+
+        // verify that promoted replica again exists on a remote node
+        assertEquals(3, clusterState.getRoutingNodes().shardsWithState(STARTED).size());
+        ShardRouting primaryShardRouting2 = clusterState.routingTable().index("test").shard(0).primaryShard();
+        assertNotEquals(primaryShardRouting1, primaryShardRouting2);
+        assertTrue(
+            primaryShardRouting2.currentNodeId().equals(remoteNode2.getId())
+                || primaryShardRouting2.currentNodeId().equals(remoteNode3.getId())
+        );
+        assertNotEquals(primaryShardRouting1.currentNodeId(), primaryShardRouting2.currentNodeId());
+
+        ShardRouting expectedCandidateForSegRep = clusterState.getRoutingNodes().activeReplicaWithOldestVersion(shardId);
+
+        // fail primary shard again
+        newState = allocation.applyFailedShard(clusterState, primaryShardRouting2, randomBoolean());
+        assertNotEquals(clusterState, newState);
+        clusterState = newState;
+
+        // verify that promoted replica exists on a non-remote node
+        assertEquals(2, clusterState.getRoutingNodes().shardsWithState(STARTED).size());
+        ShardRouting primaryShardRouting3 = clusterState.routingTable().index("test").shard(0).primaryShard();
+        assertNotEquals(primaryShardRouting2, primaryShardRouting3);
+        assertTrue(
+            primaryShardRouting3.currentNodeId().equals(nonRemoteNode1.getId())
+                || primaryShardRouting3.currentNodeId().equals(nonRemoteNode2.getId())
+        );
+        assertEquals(expectedCandidateForSegRep.allocationId(), primaryShardRouting3.allocationId());
+    }
 }
