@@ -262,8 +262,12 @@ public void testAlreadyOnNewCheckpoint() {
262
262
}
263
263
264
264
@ TestLogging (reason = "Getting trace logs from replication package" , value = "org.opensearch.indices.replication:TRACE" )
265
- public void testShardAlreadyReplicating () {
265
+ public void testShardAlreadyReplicating () throws InterruptedException {
266
+ // in this case shard is already replicating and we receive an ahead checkpoint with same pterm.
267
+ // ongoing replication is not cancelled and new one does not start.
266
268
CountDownLatch blockGetCheckpointMetadata = new CountDownLatch (1 );
269
+ CountDownLatch continueGetCheckpointMetadata = new CountDownLatch (1 );
270
+ CountDownLatch replicationCompleteLatch = new CountDownLatch (1 );
267
271
SegmentReplicationSource source = new TestReplicationSource () {
268
272
@ Override
269
273
public void getCheckpointMetadata (
@@ -272,11 +276,13 @@ public void getCheckpointMetadata(
272
276
ActionListener <CheckpointInfoResponse > listener
273
277
) {
274
278
try {
275
- blockGetCheckpointMetadata .await ();
276
- final CopyState copyState = new CopyState (primaryShard );
277
- listener .onResponse (
278
- new CheckpointInfoResponse (copyState .getCheckpoint (), copyState .getMetadataMap (), copyState .getInfosBytes ())
279
- );
279
+ blockGetCheckpointMetadata .countDown ();
280
+ continueGetCheckpointMetadata .await ();
281
+ try (final CopyState copyState = new CopyState (primaryShard )) {
282
+ listener .onResponse (
283
+ new CheckpointInfoResponse (copyState .getCheckpoint (), copyState .getMetadataMap (), copyState .getInfosBytes ())
284
+ );
285
+ }
280
286
} catch (InterruptedException | IOException e ) {
281
287
throw new RuntimeException (e );
282
288
}
@@ -297,24 +303,73 @@ public void getSegmentFiles(
297
303
final SegmentReplicationTarget target = spy (
298
304
new SegmentReplicationTarget (
299
305
replicaShard ,
300
- primaryShard . getLatestReplicationCheckpoint () ,
306
+ initialCheckpoint ,
301
307
source ,
302
- mock (SegmentReplicationTargetService .SegmentReplicationListener .class )
308
+ new SegmentReplicationTargetService .SegmentReplicationListener () {
309
// Success callback for the first replication round: releases replicationCompleteLatch,
// which the test awaits as its final step.
+ @ Override
310
+ public void onReplicationDone (SegmentReplicationState state ) {
311
+ replicationCompleteLatch .countDown ();
312
+ }
313
+
314
// Failure callback for the first replication round: this round is expected to run to
// completion, so any failure notification fails the test.
+ @ Override
315
+ public void onReplicationFailure (
316
+ SegmentReplicationState state ,
317
+ ReplicationFailedException e ,
318
+ boolean sendShardFailure
319
+ ) {
320
+ Assert .fail ("Replication should not fail" );
321
+ }
322
+ }
303
323
)
304
324
);
305
325
306
326
final SegmentReplicationTargetService spy = spy (sut );
307
- doReturn (false ).when (spy ).processLatestReceivedCheckpoint (eq (replicaShard ), any ());
308
327
// Start first round of segment replication.
309
328
spy .startReplication (target );
329
+ // wait until we are at getCheckpointMetadata stage
330
+ blockGetCheckpointMetadata .await (5 , TimeUnit .MINUTES );
310
331
311
- // Start second round of segment replication, this should fail to start as first round is still in-progress
312
- spy .onNewCheckpoint (newPrimaryCheckpoint , replicaShard );
313
- verify (spy , times (1 )).processLatestReceivedCheckpoint (eq (replicaShard ), any ());
314
- blockGetCheckpointMetadata .countDown ();
332
+ // try and insert a new target directly - it should fail immediately and alert listener
333
+ spy .startReplication (
334
+ new SegmentReplicationTarget (
335
+ replicaShard ,
336
+ aheadCheckpoint ,
337
+ source ,
338
+ new SegmentReplicationTargetService .SegmentReplicationListener () {
339
// Success callback for the second target, which is started while the first round is
// still blocked in getCheckpointMetadata: it must never complete, so success fails the test.
+ @ Override
340
+ public void onReplicationDone (SegmentReplicationState state ) {
341
+ Assert .fail ("Should not succeed" );
342
+ }
343
+
344
// Failure callback for the second target: the rejection must not request a shard failure
// and must carry the "already replicating" message for this replica's shard id.
// NOTE(review): nothing in the visible part of this test checks that this callback was
// actually invoked - confirm the assertions below are reached.
+ @ Override
345
+ public void onReplicationFailure (
346
+ SegmentReplicationState state ,
347
+ ReplicationFailedException e ,
348
+ boolean sendShardFailure
349
+ ) {
350
+ assertFalse (sendShardFailure );
351
+ assertEquals ("Shard " + replicaShard .shardId () + " is already replicating" , e .getMessage ());
352
+ }
353
+ }
354
+ )
355
+ );
356
+
357
+ // Start second round of segment replication through onNewCheckpoint, this should fail to start as first round is still in-progress
358
+ // aheadCheckpoint is of same pterm but higher version
359
+ assertTrue (replicaShard .shouldProcessCheckpoint (aheadCheckpoint ));
360
+ spy .onNewCheckpoint (aheadCheckpoint , replicaShard );
361
+ verify (spy , times (0 )).processLatestReceivedCheckpoint (eq (replicaShard ), any ());
362
+ // start replication is not invoked with aheadCheckpoint
363
+ verify (spy , times (0 )).startReplication (
364
+ eq (replicaShard ),
365
+ eq (aheadCheckpoint ),
366
+ any (SegmentReplicationTargetService .SegmentReplicationListener .class )
367
+ );
368
+ continueGetCheckpointMetadata .countDown ();
369
+ replicationCompleteLatch .await (5 , TimeUnit .MINUTES );
315
370
}
316
371
317
- public void testOnNewCheckpointFromNewPrimaryCancelOngoingReplication () throws InterruptedException {
372
+ public void testShardAlreadyReplicating_HigherPrimaryTermReceived () throws InterruptedException {
318
373
// Create a spy of Target Service so that we can verify invocation of startReplication call with specific checkpoint on it.
319
374
SegmentReplicationTargetService serviceSpy = spy (sut );
320
375
doNothing ().when (serviceSpy ).updateVisibleCheckpoint (anyLong (), any ());
0 commit comments