Skip to content

Commit 3c6019d

Browse files
authored
Add wait in replica recovery for allocation id to propagate on source node (opensearch-project#15558)
* Add wait for target allocation id to appear
  Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
* making waitForAssignment same
  Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
* Add more test
  Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
---------
Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
1 parent debd040 commit 3c6019d

File tree

5 files changed

+423
-19
lines changed

5 files changed

+423
-19
lines changed

server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java

+1-18
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,12 @@
1212
import org.opensearch.action.StepListener;
1313
import org.opensearch.action.support.ThreadedActionListener;
1414
import org.opensearch.action.support.replication.ReplicationResponse;
15-
import org.opensearch.cluster.routing.IndexShardRoutingTable;
16-
import org.opensearch.cluster.routing.ShardRouting;
1715
import org.opensearch.common.SetOnce;
1816
import org.opensearch.common.concurrent.GatedCloseable;
1917
import org.opensearch.common.lease.Releasable;
2018
import org.opensearch.common.unit.TimeValue;
2119
import org.opensearch.core.action.ActionListener;
2220
import org.opensearch.index.engine.RecoveryEngineException;
23-
import org.opensearch.index.seqno.ReplicationTracker;
2421
import org.opensearch.index.seqno.RetentionLease;
2522
import org.opensearch.index.seqno.RetentionLeaseNotFoundException;
2623
import org.opensearch.index.seqno.RetentionLeases;
@@ -58,21 +55,7 @@ public LocalStorePeerRecoverySourceHandler(
5855
@Override
5956
protected void innerRecoveryToTarget(ActionListener<RecoveryResponse> listener, Consumer<Exception> onFailure) throws IOException {
6057
final SetOnce<RetentionLease> retentionLeaseRef = new SetOnce<>();
61-
62-
RunUnderPrimaryPermit.run(() -> {
63-
final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
64-
ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId());
65-
if (targetShardRouting == null) {
66-
logger.debug(
67-
"delaying recovery of {} as it is not listed as assigned to target node {}",
68-
request.shardId(),
69-
request.targetNode()
70-
);
71-
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
72-
}
73-
assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
74-
retentionLeaseRef.set(shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)));
75-
}, shardId + " validating recovery target [" + request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger);
58+
waitForAssignmentPropagate(retentionLeaseRef);
7659
final Closeable retentionLock = shard.acquireHistoryRetentionLock();
7760
resources.add(retentionLock);
7861
final long startingSeqNo;

server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java

+51
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,14 @@
4141
import org.apache.lucene.util.ArrayUtil;
4242
import org.opensearch.action.ActionRunnable;
4343
import org.opensearch.action.StepListener;
44+
import org.opensearch.action.bulk.BackoffPolicy;
4445
import org.opensearch.action.support.PlainActionFuture;
4546
import org.opensearch.action.support.ThreadedActionListener;
4647
import org.opensearch.action.support.replication.ReplicationResponse;
48+
import org.opensearch.cluster.routing.IndexShardRoutingTable;
49+
import org.opensearch.cluster.routing.ShardRouting;
4750
import org.opensearch.common.CheckedRunnable;
51+
import org.opensearch.common.SetOnce;
4852
import org.opensearch.common.StopWatch;
4953
import org.opensearch.common.concurrent.GatedCloseable;
5054
import org.opensearch.common.lease.Releasable;
@@ -59,6 +63,7 @@
5963
import org.opensearch.core.action.ActionListener;
6064
import org.opensearch.core.common.unit.ByteSizeValue;
6165
import org.opensearch.index.engine.RecoveryEngineException;
66+
import org.opensearch.index.seqno.ReplicationTracker;
6267
import org.opensearch.index.seqno.RetentionLease;
6368
import org.opensearch.index.seqno.RetentionLeaseNotFoundException;
6469
import org.opensearch.index.seqno.RetentionLeases;
@@ -79,12 +84,14 @@
7984
import java.util.ArrayList;
8085
import java.util.Collections;
8186
import java.util.Comparator;
87+
import java.util.Iterator;
8288
import java.util.List;
8389
import java.util.Locale;
8490
import java.util.concurrent.CopyOnWriteArrayList;
8591
import java.util.concurrent.atomic.AtomicBoolean;
8692
import java.util.concurrent.atomic.AtomicInteger;
8793
import java.util.concurrent.atomic.AtomicLong;
94+
import java.util.concurrent.atomic.AtomicReference;
8895
import java.util.function.Consumer;
8996
import java.util.function.IntSupplier;
9097
import java.util.stream.StreamSupport;
@@ -191,6 +198,50 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
191198
protected abstract void innerRecoveryToTarget(ActionListener<RecoveryResponse> listener, Consumer<Exception> onFailure)
192199
throws IOException;
193200

201+
/*
202+
Waits for cluster state propagation of assignment of replica on the target node
203+
*/
204+
void waitForAssignmentPropagate(SetOnce<RetentionLease> retentionLeaseRef) {
205+
BackoffPolicy EXPONENTIAL_BACKOFF_POLICY = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 5);
206+
AtomicReference<ShardRouting> targetShardRouting = new AtomicReference<>();
207+
Iterator<TimeValue> backoffDelayIterator = EXPONENTIAL_BACKOFF_POLICY.iterator();
208+
while (backoffDelayIterator.hasNext()) {
209+
RunUnderPrimaryPermit.run(() -> {
210+
final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
211+
targetShardRouting.set(routingTable.getByAllocationId(request.targetAllocationId()));
212+
if (targetShardRouting.get() == null) {
213+
logger.info(
214+
"delaying recovery of {} as it is not listed as assigned to target node {}",
215+
request.shardId(),
216+
request.targetNode()
217+
);
218+
Thread.sleep(backoffDelayIterator.next().millis());
219+
}
220+
if (targetShardRouting.get() != null) {
221+
assert targetShardRouting.get().initializing() : "expected recovery target to be initializing but was "
222+
+ targetShardRouting;
223+
retentionLeaseRef.set(
224+
shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting.get()))
225+
);
226+
}
227+
228+
},
229+
shardId + " validating recovery target [" + request.targetAllocationId() + "] registered ",
230+
shard,
231+
cancellableThreads,
232+
logger
233+
);
234+
235+
if (targetShardRouting.get() != null) {
236+
return;
237+
}
238+
}
239+
if (targetShardRouting.get() != null) {
240+
return;
241+
}
242+
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
243+
}
244+
194245
protected void finalizeStepAndCompleteFuture(
195246
long startingSeqNo,
196247
StepListener<SendSnapshotResult> sendSnapshotStep,

server/src/main/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandler.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@
1010

1111
import org.apache.lucene.index.IndexCommit;
1212
import org.opensearch.action.StepListener;
13+
import org.opensearch.common.SetOnce;
1314
import org.opensearch.common.concurrent.GatedCloseable;
1415
import org.opensearch.common.lease.Releasable;
1516
import org.opensearch.common.unit.TimeValue;
1617
import org.opensearch.core.action.ActionListener;
1718
import org.opensearch.index.engine.RecoveryEngineException;
19+
import org.opensearch.index.seqno.RetentionLease;
1820
import org.opensearch.index.seqno.SequenceNumbers;
1921
import org.opensearch.index.shard.IndexShard;
2022
import org.opensearch.indices.RunUnderPrimaryPermit;
@@ -48,7 +50,8 @@ protected void innerRecoveryToTarget(ActionListener<RecoveryResponse> listener,
4850
// A replica of an index with remote translog does not require the translogs locally and keeps receiving the
4951
// updated segments file on refresh, flushes, and merges. In recovery, here, only file-based recovery is performed
5052
// and there is no translog replay done.
51-
53+
final SetOnce<RetentionLease> retentionLeaseRef = new SetOnce<>();
54+
waitForAssignmentPropagate(retentionLeaseRef);
5255
final StepListener<SendFileResult> sendFileStep = new StepListener<>();
5356
final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
5457
final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
@@ -102,4 +105,5 @@ protected void innerRecoveryToTarget(ActionListener<RecoveryResponse> listener,
102105

103106
finalizeStepAndCompleteFuture(startingSeqNo, sendSnapshotStep, sendFileStep, prepareEngineStep, onFailure);
104107
}
108+
105109
}

server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java

+204
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
import org.opensearch.action.support.PlainActionFuture;
5353
import org.opensearch.cluster.metadata.IndexMetadata;
5454
import org.opensearch.cluster.node.DiscoveryNode;
55+
import org.opensearch.cluster.routing.IndexShardRoutingTable;
56+
import org.opensearch.cluster.routing.ShardRouting;
5557
import org.opensearch.common.Numbers;
5658
import org.opensearch.common.Randomness;
5759
import org.opensearch.common.SetOnce;
@@ -141,6 +143,8 @@
141143
import static org.mockito.Mockito.anyString;
142144
import static org.mockito.Mockito.doAnswer;
143145
import static org.mockito.Mockito.mock;
146+
import static org.mockito.Mockito.times;
147+
import static org.mockito.Mockito.verify;
144148
import static org.mockito.Mockito.when;
145149

146150
/**
@@ -746,6 +750,206 @@ void phase2(
746750
assertFalse(phase2Called.get());
747751
}
748752

753+
/*
754+
If the replica allocation id is not reflected in source nodes routing table even after retries,
755+
recoveries should fail
756+
*/
757+
public void testThrowExceptionOnNoTargetInRouting() throws IOException {
758+
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
759+
final StartRecoveryRequest request = getStartRecoveryRequest();
760+
final IndexShard shard = mock(IndexShard.class);
761+
when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class));
762+
when(shard.segmentStats(anyBoolean(), anyBoolean())).thenReturn(mock(SegmentsStats.class));
763+
when(shard.isRelocatedPrimary()).thenReturn(false);
764+
final org.opensearch.index.shard.ReplicationGroup replicationGroup = mock(org.opensearch.index.shard.ReplicationGroup.class);
765+
final IndexShardRoutingTable routingTable = mock(IndexShardRoutingTable.class);
766+
when(routingTable.getByAllocationId(anyString())).thenReturn(null);
767+
when(shard.getReplicationGroup()).thenReturn(replicationGroup);
768+
when(replicationGroup.getRoutingTable()).thenReturn(routingTable);
769+
when(shard.acquireSafeIndexCommit()).thenReturn(mock(GatedCloseable.class));
770+
doAnswer(invocation -> {
771+
((ActionListener<Releasable>) invocation.getArguments()[0]).onResponse(() -> {});
772+
return null;
773+
}).when(shard).acquirePrimaryOperationPermit(any(), anyString(), any());
774+
775+
final IndexMetadata.Builder indexMetadata = IndexMetadata.builder("test")
776+
.settings(
777+
Settings.builder()
778+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 5))
779+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5))
780+
.put(IndexMetadata.SETTING_VERSION_CREATED, VersionUtils.randomVersion(random()))
781+
.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()))
782+
);
783+
if (randomBoolean()) {
784+
indexMetadata.state(IndexMetadata.State.CLOSE);
785+
}
786+
when(shard.indexSettings()).thenReturn(new IndexSettings(indexMetadata.build(), Settings.EMPTY));
787+
788+
final AtomicBoolean phase1Called = new AtomicBoolean();
789+
final AtomicBoolean prepareTargetForTranslogCalled = new AtomicBoolean();
790+
final AtomicBoolean phase2Called = new AtomicBoolean();
791+
final RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler(
792+
shard,
793+
mock(RecoveryTargetHandler.class),
794+
threadPool,
795+
request,
796+
Math.toIntExact(recoverySettings.getChunkSize().getBytes()),
797+
between(1, 8),
798+
between(1, 8)
799+
) {
800+
801+
@Override
802+
void phase1(
803+
IndexCommit snapshot,
804+
long startingSeqNo,
805+
IntSupplier translogOps,
806+
ActionListener<SendFileResult> listener,
807+
boolean skipCreateRetentionLeaseStep
808+
) {
809+
phase1Called.set(true);
810+
super.phase1(snapshot, startingSeqNo, translogOps, listener, skipCreateRetentionLeaseStep);
811+
}
812+
813+
@Override
814+
void prepareTargetForTranslog(int totalTranslogOps, ActionListener<TimeValue> listener) {
815+
prepareTargetForTranslogCalled.set(true);
816+
super.prepareTargetForTranslog(totalTranslogOps, listener);
817+
}
818+
819+
@Override
820+
void phase2(
821+
long startingSeqNo,
822+
long endingSeqNo,
823+
Translog.Snapshot snapshot,
824+
long maxSeenAutoIdTimestamp,
825+
long maxSeqNoOfUpdatesOrDeletes,
826+
RetentionLeases retentionLeases,
827+
long mappingVersion,
828+
ActionListener<SendSnapshotResult> listener
829+
) throws IOException {
830+
phase2Called.set(true);
831+
super.phase2(
832+
startingSeqNo,
833+
endingSeqNo,
834+
snapshot,
835+
maxSeenAutoIdTimestamp,
836+
maxSeqNoOfUpdatesOrDeletes,
837+
retentionLeases,
838+
mappingVersion,
839+
listener
840+
);
841+
}
842+
843+
};
844+
PlainActionFuture<RecoveryResponse> future = new PlainActionFuture<>();
845+
expectThrows(DelayRecoveryException.class, () -> {
846+
handler.recoverToTarget(future);
847+
future.actionGet();
848+
});
849+
verify(routingTable, times(5)).getByAllocationId(null);
850+
assertFalse(phase1Called.get());
851+
assertFalse(prepareTargetForTranslogCalled.get());
852+
assertFalse(phase2Called.get());
853+
}
854+
855+
/*
856+
Tests when the replica allocation id is reflected in source nodes routing table even after 1 retry
857+
*/
858+
public void testTargetInRoutingInSecondAttempt() throws IOException {
859+
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
860+
final StartRecoveryRequest request = getStartRecoveryRequest();
861+
final IndexShard shard = mock(IndexShard.class);
862+
when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class));
863+
when(shard.segmentStats(anyBoolean(), anyBoolean())).thenReturn(mock(SegmentsStats.class));
864+
when(shard.isRelocatedPrimary()).thenReturn(false);
865+
when(shard.getRetentionLeases()).thenReturn(mock(RetentionLeases.class));
866+
final org.opensearch.index.shard.ReplicationGroup replicationGroup = mock(org.opensearch.index.shard.ReplicationGroup.class);
867+
final IndexShardRoutingTable routingTable = mock(IndexShardRoutingTable.class);
868+
final ShardRouting shardRouting = mock(ShardRouting.class);
869+
when(shardRouting.initializing()).thenReturn(true);
870+
when(shardRouting.currentNodeId()).thenReturn("node");
871+
when(routingTable.getByAllocationId(any())).thenReturn(null, shardRouting);
872+
when(shard.getReplicationGroup()).thenReturn(replicationGroup);
873+
when(replicationGroup.getRoutingTable()).thenReturn(routingTable);
874+
when(shard.acquireSafeIndexCommit()).thenReturn(mock(GatedCloseable.class));
875+
doAnswer(invocation -> {
876+
((ActionListener<Releasable>) invocation.getArguments()[0]).onResponse(() -> {});
877+
return null;
878+
}).when(shard).acquirePrimaryOperationPermit(any(), anyString(), any());
879+
880+
final IndexMetadata.Builder indexMetadata = IndexMetadata.builder("test")
881+
.settings(
882+
Settings.builder()
883+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 5))
884+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5))
885+
.put(IndexMetadata.SETTING_VERSION_CREATED, VersionUtils.randomVersion(random()))
886+
.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()))
887+
);
888+
if (randomBoolean()) {
889+
indexMetadata.state(IndexMetadata.State.CLOSE);
890+
}
891+
when(shard.indexSettings()).thenReturn(new IndexSettings(indexMetadata.build(), Settings.EMPTY));
892+
893+
final AtomicBoolean phase1Called = new AtomicBoolean();
894+
final AtomicBoolean prepareTargetForTranslogCalled = new AtomicBoolean();
895+
final AtomicBoolean phase2Called = new AtomicBoolean();
896+
final RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler(
897+
shard,
898+
mock(RecoveryTargetHandler.class),
899+
threadPool,
900+
request,
901+
Math.toIntExact(recoverySettings.getChunkSize().getBytes()),
902+
between(1, 8),
903+
between(1, 8)
904+
) {
905+
906+
@Override
907+
void phase1(
908+
IndexCommit snapshot,
909+
long startingSeqNo,
910+
IntSupplier translogOps,
911+
ActionListener<SendFileResult> listener,
912+
boolean skipCreateRetentionLeaseStep
913+
) {
914+
phase1Called.set(true);
915+
super.phase1(snapshot, startingSeqNo, translogOps, listener, skipCreateRetentionLeaseStep);
916+
}
917+
918+
@Override
919+
void prepareTargetForTranslog(int totalTranslogOps, ActionListener<TimeValue> listener) {
920+
prepareTargetForTranslogCalled.set(true);
921+
super.prepareTargetForTranslog(totalTranslogOps, listener);
922+
}
923+
924+
@Override
925+
void phase2(
926+
long startingSeqNo,
927+
long endingSeqNo,
928+
Translog.Snapshot snapshot,
929+
long maxSeenAutoIdTimestamp,
930+
long maxSeqNoOfUpdatesOrDeletes,
931+
RetentionLeases retentionLeases,
932+
long mappingVersion,
933+
ActionListener<SendSnapshotResult> listener
934+
) throws IOException {
935+
phase2Called.set(true);
936+
super.phase2(
937+
startingSeqNo,
938+
endingSeqNo,
939+
snapshot,
940+
maxSeenAutoIdTimestamp,
941+
maxSeqNoOfUpdatesOrDeletes,
942+
retentionLeases,
943+
mappingVersion,
944+
listener
945+
);
946+
}
947+
948+
};
949+
handler.waitForAssignmentPropagate(new SetOnce<>());
950+
verify(routingTable, times(2)).getByAllocationId(null);
951+
}
952+
749953
public void testCancellationsDoesNotLeakPrimaryPermits() throws Exception {
750954
final CancellableThreads cancellableThreads = new CancellableThreads();
751955
final IndexShard shard = mock(IndexShard.class);

0 commit comments

Comments
 (0)