31
31
import static org .apache .accumulo .test .compaction .ExternalCompactionTestUtils .QUEUE8 ;
32
32
import static org .apache .accumulo .test .compaction .ExternalCompactionTestUtils .compact ;
33
33
import static org .apache .accumulo .test .compaction .ExternalCompactionTestUtils .createTable ;
34
+ import static org .apache .accumulo .test .compaction .ExternalCompactionTestUtils .getFinalStateForTableCount ;
34
35
import static org .apache .accumulo .test .compaction .ExternalCompactionTestUtils .getFinalStatesForTable ;
35
36
import static org .apache .accumulo .test .compaction .ExternalCompactionTestUtils .row ;
36
37
import static org .apache .accumulo .test .compaction .ExternalCompactionTestUtils .verify ;
56
57
import java .util .concurrent .TimeUnit ;
57
58
import java .util .concurrent .atomic .AtomicReference ;
58
59
import java .util .stream .Collectors ;
59
- import java .util .stream .Stream ;
60
60
61
61
import org .apache .accumulo .compactor .Compactor ;
62
62
import org .apache .accumulo .compactor .ExtCEnv .CompactorIterEnv ;
90
90
import org .apache .accumulo .core .iterators .SortedKeyValueIterator ;
91
91
import org .apache .accumulo .core .metadata .AccumuloTable ;
92
92
import org .apache .accumulo .core .metadata .schema .ExternalCompactionFinalState ;
93
- import org .apache .accumulo .core .metadata .schema .ExternalCompactionFinalState .FinalState ;
94
93
import org .apache .accumulo .core .metadata .schema .ExternalCompactionId ;
95
94
import org .apache .accumulo .core .metadata .schema .ExternalCompactionMetadata ;
96
95
import org .apache .accumulo .core .metadata .schema .TabletMetadata ;
97
96
import org .apache .accumulo .core .metadata .schema .TabletMetadata .ColumnType ;
98
97
import org .apache .accumulo .core .metadata .schema .TabletsMetadata ;
99
98
import org .apache .accumulo .core .spi .compaction .SimpleCompactionDispatcher ;
100
- import org .apache .accumulo .core .util .UtilWaitThread ;
101
99
import org .apache .accumulo .harness .MiniClusterConfigurationCallback ;
102
100
import org .apache .accumulo .harness .SharedMiniClusterBase ;
103
101
import org .apache .accumulo .minicluster .ServerType ;
104
102
import org .apache .accumulo .miniclusterImpl .MiniAccumuloClusterImpl .ProcessInfo ;
105
103
import org .apache .accumulo .miniclusterImpl .MiniAccumuloConfigImpl ;
106
104
import org .apache .accumulo .test .functional .SlowIterator ;
105
+ import org .apache .accumulo .test .util .Wait ;
107
106
import org .apache .hadoop .conf .Configuration ;
108
107
import org .apache .hadoop .io .Text ;
109
108
import org .junit .jupiter .api .AfterEach ;
@@ -245,12 +244,7 @@ public void testCompactionAndCompactorDies() throws Exception {
245
244
getCluster ().getClusterControl ().stop (ServerType .COMPACTOR );
246
245
247
246
// DeadCompactionDetector in the CompactionCoordinator should fail the compaction.
248
- long count = 0 ;
249
- while (count == 0 ) {
250
- count = getFinalStatesForTable (getCluster (), tid )
251
- .filter (state -> state .getFinalState ().equals (FinalState .FAILED )).count ();
252
- UtilWaitThread .sleep (250 );
253
- }
247
+ Wait .waitFor (() -> getFinalStateForTableCount (getCluster (), tid ) > 0 );
254
248
255
249
// We need to cancel the compaction or delete the table here because we initiate a user
256
250
// compaction above in the test. Even though the external compaction was cancelled
@@ -411,24 +405,22 @@ public void testExternalCompactionDeadTServer() throws Exception {
411
405
// metadata table entries to show up.
412
406
LOG .info ("Waiting for external compaction to complete." );
413
407
TableId tid = getCluster ().getServerContext ().getTableId (table3 );
414
- Stream <ExternalCompactionFinalState > fs = getFinalStatesForTable (getCluster (), tid );
415
- while (fs .findAny ().isEmpty ()) {
408
+ Wait .waitFor (() -> {
416
409
LOG .info ("Waiting for compaction completed marker to appear" );
417
- UtilWaitThread .sleep (250 );
418
- fs = getFinalStatesForTable (getCluster (), tid );
419
- }
410
+ return getFinalStateForTableCount (getCluster (), tid ) > 0 ;
411
+ }, 120_000 , 250 );
420
412
421
413
LOG .info ("Validating metadata table contents." );
422
414
try (TabletsMetadata tm = getCluster ().getServerContext ().getAmple ().readTablets ()
423
415
.forTable (tid ).fetch (ColumnType .ECOMP ).build ()) {
424
416
TabletMetadata m = tm .stream ().collect (onlyElement ());
425
417
Map <ExternalCompactionId ,ExternalCompactionMetadata > em = m .getExternalCompactions ();
426
418
assertEquals (1 , em .size ());
427
- List < ExternalCompactionFinalState > finished = new ArrayList <>();
428
- getFinalStatesForTable ( getCluster (), tid ). forEach ( f -> finished . add ( f ));
429
- assertEquals ( 1 , finished . size ());
430
- assertEquals (em .entrySet ().iterator ().next ().getKey (),
431
- finished . get ( 0 ). getExternalCompactionId ());
419
+ try ( var finalStates = getFinalStatesForTable ( getCluster (), tid )
420
+ . map ( ExternalCompactionFinalState :: getExternalCompactionId )) {
421
+ ExternalCompactionId actual = finalStates . collect ( onlyElement ());
422
+ assertEquals (em .entrySet ().iterator ().next ().getKey (), actual );
423
+ }
432
424
}
433
425
434
426
// Force a flush on the metadata table before killing our tserver
@@ -445,13 +437,10 @@ public void testExternalCompactionDeadTServer() throws Exception {
445
437
getCluster ().getClusterControl ().start (ServerType .TABLET_SERVER );
446
438
447
439
// Wait for the compaction to be committed.
448
- LOG .info ("Waiting for compaction completed marker to disappear" );
449
- Stream <ExternalCompactionFinalState > fs2 = getFinalStatesForTable (getCluster (), tid );
450
- while (fs2 .findAny ().isPresent ()) {
440
+ Wait .waitFor (() -> {
451
441
LOG .info ("Waiting for compaction completed marker to disappear" );
452
- UtilWaitThread .sleep (500 );
453
- fs2 = getFinalStatesForTable (getCluster (), tid );
454
- }
442
+ return getFinalStateForTableCount (getCluster (), tid ) == 0 ;
443
+ }, 120_000 , 500 );
455
444
verify (client , table3 , 2 );
456
445
457
446
// We need to cancel the compaction or delete the table here because we initiate a user
0 commit comments