19
19
package org .apache .accumulo .test .functional ;
20
20
21
21
import static java .util .concurrent .TimeUnit .SECONDS ;
22
- import static org .junit .jupiter .api .Assertions .assertEquals ;
23
22
import static org .junit .jupiter .api .Assertions .assertTrue ;
24
23
25
24
import java .util .ArrayList ;
43
42
import org .apache .accumulo .minicluster .ServerType ;
44
43
import org .apache .accumulo .miniclusterImpl .MiniAccumuloConfigImpl ;
45
44
import org .apache .accumulo .test .TestIngest ;
45
+ import org .apache .accumulo .test .util .Wait ;
46
46
import org .apache .hadoop .conf .Configuration ;
47
47
import org .apache .hadoop .io .Text ;
48
48
import org .junit .jupiter .api .Test ;
@@ -61,53 +61,69 @@ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite)
61
61
@ Test
62
62
public void simpleBalancerFairness () throws Exception {
63
63
try (AccumuloClient c = Accumulo .newClient ().from (getClientProperties ()).build ()) {
64
- c .tableOperations ().create ("test_ingest" );
65
- c .tableOperations ().setProperty ("test_ingest" , Property .TABLE_SPLIT_THRESHOLD .getKey (), "1K" );
66
- c .tableOperations ().create ("unused" );
67
- TreeSet <Text > splits = TestIngest .getSplitPoints (0 , 10000000 , NUM_SPLITS );
64
+ final String ingestTable = "test_ingest" ;
65
+ final String unusedTable = "unused" ;
66
+
67
+ c .tableOperations ().create (ingestTable );
68
+ c .tableOperations ().setProperty (ingestTable , Property .TABLE_SPLIT_THRESHOLD .getKey (), "1K" );
69
+ c .tableOperations ().create (unusedTable );
70
+ TreeSet <Text > splits = TestIngest .getSplitPoints (0 , 10_000_000 , NUM_SPLITS );
68
71
log .info ("Creating {} splits" , splits .size ());
69
- c .tableOperations ().addSplits ("unused" , splits );
72
+ c .tableOperations ().addSplits (unusedTable , splits );
70
73
Set <ServerId > tservers = c .instanceOperations ().getServers (ServerId .Type .TABLET_SERVER );
71
74
TestIngest .IngestParams params = new TestIngest .IngestParams (getClientProperties ());
72
75
params .rows = 5000 ;
73
76
TestIngest .ingest (c , params );
74
- c .tableOperations ().flush ("test_ingest" , null , null , false );
75
- Thread .sleep (SECONDS .toMillis (45 ));
77
+ c .tableOperations ().flush (ingestTable , null , null , false );
76
78
Credentials creds = new Credentials ("root" , new PasswordToken (ROOT_PASSWORD ));
77
79
78
- int unassignedTablets = 1 ;
79
- ManagerMonitorInfo stats = null ;
80
80
ClientContext context = (ClientContext ) c ;
81
- for (int i = 0 ; unassignedTablets > 0 && i < 20 ; i ++) {
82
- stats = ThriftClientTypes .MANAGER .execute (context ,
81
+
82
+ // wait for tablet assignment
83
+ Wait .waitFor (() -> {
84
+ ManagerMonitorInfo stats = ThriftClientTypes .MANAGER .execute (context ,
83
85
client -> client .getManagerStats (TraceUtil .traceInfo (),
84
86
creds .toThrift (c .instanceOperations ().getInstanceId ())));
85
- unassignedTablets = stats .getUnassignedTablets ();
87
+ int unassignedTablets = stats .getUnassignedTablets ();
86
88
if (unassignedTablets > 0 ) {
87
89
log .info ("Found {} unassigned tablets, sleeping 3 seconds for tablet assignment" ,
88
90
unassignedTablets );
89
- Thread .sleep (3000 );
91
+ return false ;
92
+ } else {
93
+ return true ;
90
94
}
91
- }
95
+ }, SECONDS . toMillis ( 45 ), SECONDS . toMillis ( 3 ));
92
96
93
- assertEquals (0 , unassignedTablets , "Unassigned tablets were not assigned within 60 seconds" );
97
+ // wait for tablets to be balanced
98
+ Wait .waitFor (() -> {
99
+ ManagerMonitorInfo stats = ThriftClientTypes .MANAGER .execute (context ,
100
+ client -> client .getManagerStats (TraceUtil .traceInfo (),
101
+ creds .toThrift (c .instanceOperations ().getInstanceId ())));
94
102
95
- // Compute online tablets per tserver
96
- List <Integer > counts = new ArrayList <>();
97
- for (TabletServerStatus server : stats .tServerInfo ) {
98
- int count = 0 ;
99
- for (TableInfo table : server .tableMap .values ()) {
100
- count += table .onlineTablets ;
103
+ List <Integer > counts = new ArrayList <>();
104
+ for (TabletServerStatus server : stats .tServerInfo ) {
105
+ int count = 0 ;
106
+ for (TableInfo table : server .tableMap .values ()) {
107
+ count += table .onlineTablets ;
108
+ }
109
+ counts .add (count );
101
110
}
102
- counts .add (count );
103
- }
104
- assertTrue (counts .size () > 1 , "Expected to have at least two TabletServers" );
105
- for (int i = 1 ; i < counts .size (); i ++) {
106
- int diff = Math .abs (counts .get (0 ) - counts .get (i ));
107
- assertTrue (diff <= tservers .size (),
108
- "Expected difference in tablets to be less than or equal to " + counts .size ()
109
- + " but was " + diff + ". Counts " + counts );
110
- }
111
+ assertTrue (counts .size () >= 2 ,
112
+ "Expected at least 2 tservers to have tablets, but found " + counts );
113
+
114
+ for (int i = 1 ; i < counts .size (); i ++) {
115
+ int diff = Math .abs (counts .get (0 ) - counts .get (i ));
116
+ log .info (" Counts: {}" , counts );
117
+ if (diff > tservers .size ()) {
118
+ log .info ("Difference in tablets between tservers is greater than expected. Counts: {}" ,
119
+ counts );
120
+ return false ;
121
+ }
122
+ }
123
+
124
+ // if diff is less than the number of tservers, then we are good
125
+ return true ;
126
+ }, SECONDS .toMillis (60 ), SECONDS .toMillis (3 ));
111
127
}
112
128
}
113
129
0 commit comments