86
86
import java .util .Set ;
87
87
import java .util .concurrent .ArrayBlockingQueue ;
88
88
import java .util .concurrent .BlockingQueue ;
89
- import java .util .concurrent .BrokenBarrierException ;
90
89
import java .util .concurrent .CopyOnWriteArrayList ;
91
90
import java .util .concurrent .CountDownLatch ;
92
91
import java .util .concurrent .CyclicBarrier ;
@@ -715,6 +714,7 @@ public void testSimpleOperationsUpload() throws Exception {
715
714
translog .setMinSeqNoToKeep (0 );
716
715
// This should not trim anything from local
717
716
translog .trimUnreferencedReaders ();
717
+ assertBusy (() -> assertTrue (translog .isRemoteGenerationDeletionPermitsAvailable ()));
718
718
assertEquals (2 , translog .readers .size ());
719
719
assertBusy (() -> {
720
720
assertEquals (4 , translog .allUploaded ().size ());
@@ -728,6 +728,7 @@ public void testSimpleOperationsUpload() throws Exception {
728
728
// This should not trim tlog-2.* files from remote as we not uploading any more translog to remote
729
729
translog .setMinSeqNoToKeep (1 );
730
730
translog .trimUnreferencedReaders ();
731
+ assertBusy (() -> assertTrue (translog .isRemoteGenerationDeletionPermitsAvailable ()));
731
732
assertEquals (1 , translog .readers .size ());
732
733
assertBusy (() -> {
733
734
assertEquals (4 , translog .allUploaded ().size ());
@@ -766,6 +767,7 @@ public void testMetadataFileDeletion() throws Exception {
766
767
addToTranslogAndListAndUpload (translog , ops , new Translog .Index (String .valueOf (i ), i , primaryTerm .get (), new byte [] { 1 }));
767
768
translog .setMinSeqNoToKeep (i );
768
769
translog .trimUnreferencedReaders ();
770
+ assertBusy (() -> assertTrue (translog .isRemoteGenerationDeletionPermitsAvailable ()));
769
771
assertEquals (1 , translog .readers .size ());
770
772
}
771
773
assertBusy (() -> assertEquals (4 , translog .allUploaded ().size ()));
@@ -776,13 +778,15 @@ public void testMetadataFileDeletion() throws Exception {
776
778
addToTranslogAndListAndUpload (translog , ops , new Translog .Index (String .valueOf (i ), i , primaryTerm .get (), new byte [] { 1 }));
777
779
}
778
780
translog .trimUnreferencedReaders ();
781
+ assertBusy (() -> assertTrue (translog .isRemoteGenerationDeletionPermitsAvailable ()));
779
782
assertEquals (1 + moreDocs , translog .readers .size ());
780
783
assertBusy (() -> assertEquals (2 + 2L * moreDocs , translog .allUploaded ().size ()));
781
784
assertBusy (() -> assertEquals (1 , blobStoreTransferService .listAll (getTranslogDirectory ().add (METADATA_DIR )).size ()));
782
785
783
786
int totalDocs = numDocs + moreDocs ;
784
787
translog .setMinSeqNoToKeep (totalDocs - 1 );
785
788
translog .trimUnreferencedReaders ();
789
+ assertBusy (() -> assertTrue (translog .isRemoteGenerationDeletionPermitsAvailable ()));
786
790
787
791
addToTranslogAndListAndUpload (
788
792
translog ,
@@ -791,6 +795,7 @@ public void testMetadataFileDeletion() throws Exception {
791
795
);
792
796
translog .setMinSeqNoToKeep (totalDocs );
793
797
translog .trimUnreferencedReaders ();
798
+ assertBusy (() -> assertTrue (translog .isRemoteGenerationDeletionPermitsAvailable ()));
794
799
assertBusy (() -> assertEquals (1 , blobStoreTransferService .listAll (getTranslogDirectory ().add (METADATA_DIR )).size ()));
795
800
796
801
// Change primary term and test the deletion of older primaries
@@ -841,6 +846,7 @@ public void testDrainSync() throws Exception {
841
846
842
847
translog .setMinSeqNoToKeep (0 );
843
848
translog .trimUnreferencedReaders ();
849
+ assertBusy (() -> assertTrue (translog .isRemoteGenerationDeletionPermitsAvailable ()));
844
850
assertEquals (1 , translog .readers .size ());
845
851
846
852
// Case 1 - During ongoing uploads, the available permits are 0.
@@ -869,6 +875,7 @@ public void testDrainSync() throws Exception {
869
875
// Case 3 - After drainSync, if trimUnreferencedReaders is attempted, we do not delete from remote store.
870
876
translog .setMinSeqNoToKeep (1 );
871
877
translog .trimUnreferencedReaders ();
878
+ assertBusy (() -> assertTrue (translog .isRemoteGenerationDeletionPermitsAvailable ()));
872
879
assertEquals (1 , translog .readers .size ());
873
880
assertEquals (6 , translog .allUploaded ().size ());
874
881
assertEquals (mdFiles , blobStoreTransferService .listAll (getTranslogDirectory ().add (METADATA_DIR )));
@@ -892,6 +899,7 @@ public void testDrainSync() throws Exception {
892
899
893
900
translog .setMinSeqNoToKeep (3 );
894
901
translog .trimUnreferencedReaders ();
902
+ assertBusy (() -> assertTrue (translog .isRemoteGenerationDeletionPermitsAvailable ()));
895
903
assertEquals (1 , translog .readers .size ());
896
904
assertBusy (() -> assertEquals (4 , translog .allUploaded ().size ()));
897
905
assertBusy (() -> assertEquals (1 , blobStoreTransferService .listAll (getTranslogDirectory ().add (METADATA_DIR )).size ()));
@@ -1048,7 +1056,7 @@ public void testConcurrentWriteViewsAndSnapshot() throws Throwable {
1048
1056
final int threadId = i ;
1049
1057
writers [i ] = new Thread (new AbstractRunnable () {
1050
1058
@ Override
1051
- public void doRun () throws BrokenBarrierException , InterruptedException , IOException {
1059
+ public void doRun () throws Exception {
1052
1060
barrier .await ();
1053
1061
int counter = 0 ;
1054
1062
while (run .get () && idGenerator .get () < maxOps ) {
@@ -1090,6 +1098,7 @@ public void doRun() throws BrokenBarrierException, InterruptedException, IOExcep
1090
1098
// deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
1091
1099
translog .setMinSeqNoToKeep (localCheckpoint + 1 );
1092
1100
translog .trimUnreferencedReaders ();
1101
+ assertBusy (() -> assertTrue (translog .isRemoteGenerationDeletionPermitsAvailable ()));
1093
1102
}
1094
1103
}
1095
1104
if (id % 7 == 0 ) {
0 commit comments