48
48
import java .util .Map ;
49
49
import java .util .Objects ;
50
50
import java .util .concurrent .CountDownLatch ;
51
+ import java .util .concurrent .TimeUnit ;
51
52
import java .util .concurrent .atomic .AtomicLong ;
52
53
53
54
import static org .opensearch .cluster .metadata .IndexMetadata .SETTING_REPLICATION_TYPE ;
@@ -470,6 +471,25 @@ public void testRefreshPersistentFailure() throws Exception {
470
471
assertFalse ("remote store should not in sync" , tuple .v1 ().isRemoteSegmentStoreInSync ());
471
472
}
472
473
474
+ public void testRefreshPersistentFailureAndIndexShardClosed () throws Exception {
475
+ int succeedOnAttempt = 3 ;
476
+ int closeShardOnAttempt = 1 ;
477
+ CountDownLatch refreshCountLatch = new CountDownLatch (1 );
478
+ CountDownLatch successLatch = new CountDownLatch (10 );
479
+ Tuple <RemoteStoreRefreshListener , RemoteStoreStatsTrackerFactory > tuple = mockIndexShardWithRetryAndScheduleRefresh (
480
+ succeedOnAttempt ,
481
+ refreshCountLatch ,
482
+ successLatch ,
483
+ true ,
484
+ closeShardOnAttempt
485
+ );
486
+ // Giving 10ms for some iterations of remote refresh upload
487
+ Thread .sleep (TimeUnit .SECONDS .toMillis (2 ));
488
+ RemoteStoreRefreshListener listener = tuple .v1 ();
489
+ assertFalse ("remote store should not in sync" , listener .isRemoteSegmentStoreInSync ());
490
+ assertFalse (listener .getRetryScheduledStatus ());
491
+ }
492
+
473
493
private void assertNoLagAndTotalUploadsFailed (RemoteSegmentTransferTracker segmentTracker , long totalUploadsFailed ) throws Exception {
474
494
assertBusy (() -> {
475
495
assertEquals (0 , segmentTracker .getBytesLag ());
@@ -548,6 +568,49 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
548
568
return mockIndexShardWithRetryAndScheduleRefresh (succeedOnAttempt , refreshCountLatch , successLatch , 1 , noOpLatch );
549
569
}
550
570
571
+ private Tuple <RemoteStoreRefreshListener , RemoteStoreStatsTrackerFactory > mockIndexShardWithRetryAndScheduleRefresh (
572
+ int totalAttempt ,
573
+ CountDownLatch refreshCountLatch ,
574
+ CountDownLatch successLatch ,
575
+ int checkpointPublishSucceedOnAttempt ,
576
+ CountDownLatch reachedCheckpointPublishLatch ,
577
+ boolean mockPrimaryTerm ,
578
+ boolean testUploadTimeout
579
+ ) throws IOException {
580
+ return mockIndexShardWithRetryAndScheduleRefresh (
581
+ totalAttempt ,
582
+ refreshCountLatch ,
583
+ successLatch ,
584
+ checkpointPublishSucceedOnAttempt ,
585
+ reachedCheckpointPublishLatch ,
586
+ mockPrimaryTerm ,
587
+ testUploadTimeout ,
588
+ false ,
589
+ 0
590
+ );
591
+ }
592
+
593
+ private Tuple <RemoteStoreRefreshListener , RemoteStoreStatsTrackerFactory > mockIndexShardWithRetryAndScheduleRefresh (
594
+ int succeedOnAttempt ,
595
+ CountDownLatch refreshCountLatch ,
596
+ CountDownLatch successLatch ,
597
+ boolean closedShard ,
598
+ int closeShardAfterAttempt
599
+ ) throws IOException {
600
+ CountDownLatch noOpLatch = new CountDownLatch (0 );
601
+ return mockIndexShardWithRetryAndScheduleRefresh (
602
+ succeedOnAttempt ,
603
+ refreshCountLatch ,
604
+ successLatch ,
605
+ 1 ,
606
+ noOpLatch ,
607
+ true ,
608
+ false ,
609
+ closedShard ,
610
+ closeShardAfterAttempt
611
+ );
612
+ }
613
+
551
614
private Tuple <RemoteStoreRefreshListener , RemoteStoreStatsTrackerFactory > mockIndexShardWithRetryAndScheduleRefresh (
552
615
int succeedOnAttempt ,
553
616
CountDownLatch refreshCountLatch ,
@@ -562,7 +625,9 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
562
625
succeedCheckpointPublishOnAttempt ,
563
626
reachedCheckpointPublishLatch ,
564
627
true ,
565
- false
628
+ false ,
629
+ false ,
630
+ 0
566
631
);
567
632
}
568
633
@@ -573,7 +638,9 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
573
638
int succeedCheckpointPublishOnAttempt ,
574
639
CountDownLatch reachedCheckpointPublishLatch ,
575
640
boolean mockPrimaryTerm ,
576
- boolean testUploadTimeout
641
+ boolean testUploadTimeout ,
642
+ boolean closeShard ,
643
+ int closeShardAfterAttempt
577
644
) throws IOException {
578
645
// Create index shard that we will be using to mock different methods in IndexShard for the unit test
579
646
indexShard = newStartedShard (
@@ -601,7 +668,6 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
601
668
IndexShard shard = mock (IndexShard .class );
602
669
Store store = mock (Store .class );
603
670
when (shard .store ()).thenReturn (store );
604
- when (shard .state ()).thenReturn (IndexShardState .STARTED );
605
671
when (store .directory ()).thenReturn (indexShard .store ().directory ());
606
672
607
673
// Mock (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory())
@@ -663,6 +729,14 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
663
729
return indexShard .getLatestReplicationCheckpoint ();
664
730
})).when (shard ).computeReplicationCheckpoint (any ());
665
731
732
+ doAnswer ((invocationOnMock -> {
733
+ if (closeShard && counter .get () == closeShardAfterAttempt ) {
734
+ logger .info ("Closing shard..." );
735
+ return IndexShardState .CLOSED ;
736
+ }
737
+ return IndexShardState .STARTED ;
738
+ })).when (shard ).state ();
739
+
666
740
doAnswer (invocation -> {
667
741
if (Objects .nonNull (successLatch )) {
668
742
successLatch .countDown ();
0 commit comments