
Commit 3c8eafd

[Remote Store - Dual Replication] Create missing Retention Leases for docrep shard copies during failover (#13159)
Signed-off-by: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com>
1 parent 035d8b8 commit 3c8eafd
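
Editorial note for context: the dual-replication tests below drive a docrep-to-remote-store migration through cluster settings (see the initDocRepToRemoteMigration() call in the test diff). A minimal sketch of what that helper presumably does, assuming the standard remote-migration setting keys (remote_store.compatibility_mode, migration.direction); nothing here is introduced by this commit:

import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;

// Hypothetical helper: put the cluster into mixed compatibility mode and point the
// migration direction at the remote store, so remote enabled data nodes can join the
// cluster and start receiving shard copies from docrep nodes.
final class MigrationSetup {
    static void startDocRepToRemoteMigration(Client client) {
        Settings migrationSettings = Settings.builder()
            .put("remote_store.compatibility_mode", "mixed")   // assumed setting key
            .put("migration.direction", "remote_store")        // assumed setting key
            .build();
        client.admin().cluster().prepareUpdateSettings().setPersistentSettings(migrationSettings).get();
    }
}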

File tree

2 files changed: +86 −9 lines


server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java

+55 −5
@@ -41,6 +41,7 @@ public class RemoteDualReplicationIT extends MigrationBaseTestCase {
     private final String REMOTE_PRI_DOCREP_REP = "remote-primary-docrep-replica";
     private final String REMOTE_PRI_DOCREP_REMOTE_REP = "remote-primary-docrep-remote-replica";
     private final String FAILOVER_REMOTE_TO_DOCREP = "failover-remote-to-docrep";
+    private final String FAILOVER_REMOTE_TO_REMOTE = "failover-remote-to-remote";

     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
@@ -241,14 +242,63 @@ RLs on remote enabled copies are brought up to (GlobalCkp + 1) upon a flush requ
          */
         extraSettings = Settings.builder().put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.getKey(), "3s").build();
         testRemotePrimaryDocRepAndRemoteReplica();
-        DiscoveryNodes nodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();
-        assertBusy(() -> {
-            for (ShardStats shardStats : internalCluster().client()
+        pollAndCheckRetentionLeases(REMOTE_PRI_DOCREP_REMOTE_REP);
+    }
+
+    public void testMissingRetentionLeaseCreatedOnFailedOverRemoteReplica() throws Exception {
+        internalCluster().startClusterManagerOnlyNode();
+
+        logger.info("---> Starting docrep data node");
+        internalCluster().startDataOnlyNode();
+
+        Settings zeroReplicasAndOverridenSyncIntervals = Settings.builder()
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+            .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms")
+            .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms")
+            .build();
+        createIndex(FAILOVER_REMOTE_TO_REMOTE, zeroReplicasAndOverridenSyncIntervals);
+        ensureGreen(FAILOVER_REMOTE_TO_REMOTE);
+
+        indexBulk(FAILOVER_REMOTE_TO_REMOTE, 100);
+
+        logger.info("---> Starting first remote node");
+        initDocRepToRemoteMigration();
+        addRemote = true;
+        String firstRemoteNode = internalCluster().startDataOnlyNode();
+        String primaryShardHostingNode = primaryNodeName(FAILOVER_REMOTE_TO_REMOTE);
+        logger.info("---> Moving primary copy from {} to remote enabled node {}", primaryShardHostingNode, firstRemoteNode);
+        assertAcked(
+            internalCluster().client()
+                .admin()
+                .cluster()
+                .prepareReroute()
+                .add(new MoveAllocationCommand(FAILOVER_REMOTE_TO_REMOTE, 0, primaryShardHostingNode, firstRemoteNode))
+                .get()
+        );
+        ensureGreen(FAILOVER_REMOTE_TO_REMOTE);
+        assertReplicaAndPrimaryConsistency(FAILOVER_REMOTE_TO_REMOTE, 100, 0);
+
+        String secondRemoteNode = internalCluster().startDataOnlyNode();
+        Settings twoReplicas = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2).build();
+        assertAcked(
+            internalCluster().client()
                 .admin()
                 .indices()
-                .prepareStats(REMOTE_PRI_DOCREP_REMOTE_REP)
+                .prepareUpdateSettings()
+                .setIndices(FAILOVER_REMOTE_TO_REMOTE)
+                .setSettings(twoReplicas)
                 .get()
-                .getShards()) {
+        );
+        ensureGreen(FAILOVER_REMOTE_TO_REMOTE);
+
+        logger.info("---> Checking retention leases");
+        pollAndCheckRetentionLeases(FAILOVER_REMOTE_TO_REMOTE);
+    }
+
+    private void pollAndCheckRetentionLeases(String indexName) throws Exception {
+        DiscoveryNodes nodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();
+        assertBusy(() -> {
+            for (ShardStats shardStats : internalCluster().client().admin().indices().prepareStats(indexName).get().getShards()) {
                 ShardRouting shardRouting = shardStats.getShardRouting();
                 DiscoveryNode discoveryNode = nodes.get(shardRouting.currentNodeId());
                 RetentionLeases retentionLeases = shardStats.getRetentionLeaseStats().retentionLeases();
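
Editorial note: the hunk above ends at the diff context boundary, before the assertions inside assertBusy. The property this commit cares about is that the primary keeps a peer-recovery retention lease for every tracked copy, including copies still hosted on docrep nodes. A self-contained sketch of such an assertion, assuming the usual ReplicationTracker/RetentionLeases APIs (illustrative helper, not part of the commit):

import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.index.seqno.ReplicationTracker;
import org.opensearch.index.seqno.RetentionLeases;

import static org.junit.Assert.assertTrue;

// Illustrative helper: verify that a primary's shard stats report a peer-recovery
// retention lease for a given tracked shard copy. This is the lease that can go
// missing for docrep copies after a remote-to-remote primary failover.
final class RetentionLeaseChecks {
    static void assertLeaseExistsFor(ShardStats primaryShardStats, ShardRouting trackedCopy) {
        RetentionLeases leases = primaryShardStats.getRetentionLeaseStats().retentionLeases();
        String leaseId = ReplicationTracker.getPeerRecoveryRetentionLeaseId(trackedCopy);
        assertTrue("expected a peer-recovery retention lease for " + trackedCopy, leases.contains(leaseId));
    }
}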

server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java

+31 −4
@@ -255,6 +255,12 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L

     private final Function<String, Boolean> isShardOnRemoteEnabledNode;

+    /**
+     * Flag to indicate whether {@link ReplicationTracker#createMissingPeerRecoveryRetentionLeases(ActionListener)}
+     * has been run successfully
+     */
+    private boolean createdMissingRetentionLeases;
+
     /**
      * Get all retention leases tracked on this shard.
      *
@@ -955,7 +961,13 @@ private boolean invariant() {
             assert checkpoints.get(aId) != null : "aId [" + aId + "] is pending in sync but isn't tracked";
         }

-        if (primaryMode && indexSettings.isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases) {
+        if (primaryMode && indexSettings.isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases
+            // Skip assertion if createMissingPeerRecoveryRetentionLeases has not yet run after activating primary context
+            // This is required since during an ongoing remote store migration,
+            // remote enabled primary taking over primary context from another remote enabled shard
+            // might not have retention leases for docrep shard copies
+            // (since all RetentionLease sync actions are blocked on remote shard copies)
+            && createdMissingRetentionLeases) {
             // all tracked shard copies have a corresponding peer-recovery retention lease
             for (final ShardRouting shardRouting : routingTable.assignedShards()) {
                 final CheckpointState cps = checkpoints.get(shardRouting.allocationId().getId());
@@ -1843,19 +1855,34 @@ private synchronized void setHasAllPeerRecoveryRetentionLeases() {
         assert invariant();
     }

+    private synchronized void setCreatedMissingRetentionLeases() {
+        createdMissingRetentionLeases = true;
+        assert invariant();
+    }
+
     public synchronized boolean hasAllPeerRecoveryRetentionLeases() {
         return hasAllPeerRecoveryRetentionLeases;
     }

     /**
-     * Create any required peer-recovery retention leases that do not currently exist because we just did a rolling upgrade from a version
-     * prior to {@code LegacyESVersion#V_7_4_0} that does not create peer-recovery retention leases.
+     * Create any required peer-recovery retention leases that do not currently exist. This can happen if either:
+     * - We just did a rolling upgrade from a version prior to {@code LegacyESVersion#V_7_4_0} that does not create peer-recovery retention leases.
+     * - In a mixed mode cluster (during remote store migration), a remote enabled primary shard copy fails over to another remote enabled shard copy,
+     *   but the replication group still has other shards in docrep nodes
      */
     public synchronized void createMissingPeerRecoveryRetentionLeases(ActionListener<Void> listener) {
-        if (hasAllPeerRecoveryRetentionLeases == false) {
+        // Create missing RetentionLeases if the primary is on a remote enabled node
+        // and the replication group has at least one shard copy on a docrep enabled node
+        // No-op if retention leases for the tracked shard copies already exist
+        boolean createMissingRetentionLeasesDuringMigration = indexSettings.isAssignedOnRemoteNode()
+            && replicationGroup.getReplicationTargets()
+                .stream()
+                .anyMatch(shardRouting -> isShardOnRemoteEnabledNode.apply(shardRouting.currentNodeId()) == false);
+        if (hasAllPeerRecoveryRetentionLeases == false || createMissingRetentionLeasesDuringMigration) {
             final List<ShardRouting> shardRoutings = routingTable.assignedShards();
             final GroupedActionListener<ReplicationResponse> groupedActionListener = new GroupedActionListener<>(ActionListener.wrap(vs -> {
                 setHasAllPeerRecoveryRetentionLeases();
+                setCreatedMissingRetentionLeases();
                 listener.onResponse(null);
             }, listener::onFailure), shardRoutings.size());
             for (ShardRouting shardRouting : shardRoutings) {
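
The predicate added above is the core of the fix: lease creation now also runs when a remote enabled primary still tracks at least one copy on a docrep node, not only after a legacy rolling upgrade. A minimal, self-contained restatement of that trigger (hypothetical helper, with the ReplicationTracker internals reduced to plain parameters):

import java.util.List;
import java.util.function.Predicate;

// Hypothetical distillation of the guard around createMissingPeerRecoveryRetentionLeases:
// run lease creation if the primary never had all peer-recovery leases (rolling-upgrade case),
// or if, during a docrep-to-remote migration, a remote enabled primary still tracks at least
// one shard copy hosted on a docrep (non remote enabled) node.
final class MissingLeaseTrigger {
    static boolean shouldCreateMissingLeases(
        boolean hasAllPeerRecoveryRetentionLeases,
        boolean primaryAssignedOnRemoteNode,
        List<String> trackedCopyNodeIds,
        Predicate<String> isRemoteEnabledNode
    ) {
        boolean migrationCase = primaryAssignedOnRemoteNode
            && trackedCopyNodeIds.stream().anyMatch(nodeId -> isRemoteEnabledNode.test(nodeId) == false);
        return hasAllPeerRecoveryRetentionLeases == false || migrationCase;
    }
}

The new createdMissingRetentionLeases flag then records that this path has completed, which is what allows the invariant check in the earlier hunk to be enforced again after a failover.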
