@@ -83,7 +83,6 @@ public class SuspendedTabletsIT extends ConfigurableMacBase {
83
83
private static ExecutorService THREAD_POOL ;
84
84
85
85
public static final int TSERVERS = 3 ;
86
- public static final long SUSPEND_DURATION = 80 ;
87
86
public static final int TABLETS = 30 ;
88
87
89
88
private ProcessReference metadataTserverProcess ;
@@ -95,7 +94,6 @@ protected Duration defaultTimeout() {
95
94
96
95
@ Override
97
96
public void configure (MiniAccumuloConfigImpl cfg , Configuration fsConf ) {
98
- cfg .setProperty (Property .TABLE_SUSPEND_DURATION , SUSPEND_DURATION + "s" );
99
97
cfg .setClientProperty (ClientProperty .INSTANCE_ZOOKEEPERS_TIMEOUT , "5s" );
100
98
cfg .setProperty (Property .INSTANCE_ZK_TIMEOUT , "5s" );
101
99
// Start with 1 tserver, we'll increase that later
@@ -152,99 +150,54 @@ public void setUp() throws Exception {
152
150
getCluster ().start ();
153
151
}
154
152
153
+ enum AfterSuspendAction {
154
+ RESUME ("80s" ),
155
+ // Set a long suspend time for testing offline table, want the suspension to be cleared because
156
+ // the tablet went offline and not because the suspension timed out.
157
+ OFFLINE ("800s" );
158
+
159
+ public final String suspendTime ;
160
+
161
+ AfterSuspendAction (String suspendTime ) {
162
+ this .suspendTime = suspendTime ;
163
+ }
164
+ }
165
+
155
166
@ Test
156
167
public void crashAndResumeTserver () throws Exception {
157
168
// Run the test body. When we get to the point where we need a tserver to go away, get rid of it
158
169
// via crashing
159
- suspensionTestBody ((ctx , locs , count ) -> {
160
- // Exclude the tablet server hosting the metadata table from the list and only
161
- // kill tablet servers that are not hosting the metadata table.
162
- List <ProcessReference > procs = getCluster ().getProcesses ().get (ServerType .TABLET_SERVER )
163
- .stream ().filter (p -> !metadataTserverProcess .equals (p )).collect (Collectors .toList ());
164
- Collections .shuffle (procs , RANDOM .get ());
165
- assertEquals (TSERVERS - 1 , procs .size (), "Not enough tservers exist" );
166
- assertTrue (procs .size () >= count , "Attempting to kill more tservers (" + count
167
- + ") than exist in the cluster (" + procs .size () + ")" );
170
+ suspensionTestBody (new CrashTserverKiller (), AfterSuspendAction .RESUME );
171
+ }
168
172
169
- for ( int i = 0 ; i < count ; ++ i ) {
170
- ProcessReference pr = procs . get ( i );
171
- log . info ( "Crashing {}" , pr . getProcess ());
172
- getCluster (). killProcess ( ServerType . TABLET_SERVER , pr );
173
- }
174
- } );
173
+ @ Test
174
+ public void crashAndOffline () throws Exception {
175
+ // Test to ensure that taking a table offline causes the suspension markers to be cleared.
176
+ // Suspension markers can prevent balancing and possibly cause other problems, so it's good to
177
+ // clear them for offline tables.
178
+ suspensionTestBody ( new CrashTserverKiller (), AfterSuspendAction . OFFLINE );
175
179
}
176
180
177
181
@ Test
178
182
public void shutdownAndResumeTserver () throws Exception {
179
183
// Run the test body. When we get to the point where we need tservers to go away, stop them via
180
184
// a clean shutdown.
181
- suspensionTestBody ((ctx , locs , count ) -> {
182
- Set <TServerInstance > tserverSet = new HashSet <>();
183
- Set <TServerInstance > metadataServerSet = new HashSet <>();
184
-
185
- TabletLocator tl = TabletLocator .getLocator (ctx , AccumuloTable .METADATA .tableId ());
186
- for (TabletLocationState tls : locs .locationStates .values ()) {
187
- if (tls .current != null ) {
188
- // add to set of all servers
189
- tserverSet .add (tls .current .getServerInstance ());
190
-
191
- // get server that the current tablets metadata is on
192
- TabletLocator .TabletLocation tab =
193
- tl .locateTablet (ctx , tls .extent .toMetaRow (), false , false );
194
- // add it to the set of servers with metadata
195
- metadataServerSet .add (new TServerInstance (tab .getTserverLocation (),
196
- Long .valueOf (tab .getTserverSession (), 16 )));
197
- }
198
- }
199
-
200
- // remove servers with metadata on them from the list of servers to be shutdown
201
- assertEquals (1 , metadataServerSet .size (), "Expecting a single tServer in metadataServerSet" );
202
- tserverSet .removeAll (metadataServerSet );
203
-
204
- assertEquals (TSERVERS - 1 , tserverSet .size (),
205
- "Expecting " + (TSERVERS - 1 ) + " tServers in shutdown-list" );
206
-
207
- List <TServerInstance > tserversList = new ArrayList <>(tserverSet );
208
- Collections .shuffle (tserversList , RANDOM .get ());
209
-
210
- for (int i1 = 0 ; i1 < count ; ++i1 ) {
211
- final String tserverName = tserversList .get (i1 ).getHostPortSession ();
212
- ThriftClientTypes .MANAGER .executeVoid (ctx , client -> {
213
- log .info ("Sending shutdown command to {} via ManagerClientService" , tserverName );
214
- client .shutdownTabletServer (null , ctx .rpcCreds (), tserverName , false );
215
- });
216
- }
185
+ suspensionTestBody (new ShutdownTserverKiller (), AfterSuspendAction .RESUME );
186
+ }
217
187
218
- log .info ("Waiting for tserver process{} to die" , count == 1 ? "" : "es" );
219
- for (int i2 = 0 ; i2 < 10 ; ++i2 ) {
220
- List <ProcessReference > deadProcs = new ArrayList <>();
221
- for (ProcessReference pr1 : getCluster ().getProcesses ().get (ServerType .TABLET_SERVER )) {
222
- Process p = pr1 .getProcess ();
223
- if (!p .isAlive ()) {
224
- deadProcs .add (pr1 );
225
- }
226
- }
227
- for (ProcessReference pr2 : deadProcs ) {
228
- log .info ("Process {} is dead, informing cluster control about this" , pr2 .getProcess ());
229
- getCluster ().getClusterControl ().killProcess (ServerType .TABLET_SERVER , pr2 );
230
- --count ;
231
- }
232
- if (count == 0 ) {
233
- return ;
234
- } else {
235
- Thread .sleep (SECONDS .toMillis (2 ));
236
- }
237
- }
238
- throw new IllegalStateException ("Tablet servers didn't die!" );
239
- });
188
+ @ Test
189
+ public void shutdownAndOffline () throws Exception {
190
+ // Test to ensure that taking a table offline causes the suspension markers to be cleared.
191
+ suspensionTestBody (new ShutdownTserverKiller (), AfterSuspendAction .OFFLINE );
240
192
}
241
193
242
194
/**
243
195
* Main test body for suspension tests.
244
196
*
245
197
* @param serverStopper callback which shuts down some tablet servers.
246
198
*/
247
- private void suspensionTestBody (TServerKiller serverStopper ) throws Exception {
199
+ private void suspensionTestBody (TServerKiller serverStopper , AfterSuspendAction action )
200
+ throws Exception {
248
201
try (AccumuloClient client = Accumulo .newClient ().from (getClientProperties ()).build ()) {
249
202
ClientContext ctx = (ClientContext ) client ;
250
203
@@ -256,6 +209,7 @@ private void suspensionTestBody(TServerKiller serverStopper) throws Exception {
256
209
}
257
210
log .info ("Creating table " + tableName );
258
211
NewTableConfiguration ntc = new NewTableConfiguration ().withSplits (splitPoints );
212
+ ntc .setProperties (Map .of (Property .TABLE_SUSPEND_DURATION .getKey (), action .suspendTime ));
259
213
ctx .tableOperations ().create (tableName , ntc );
260
214
261
215
// Wait for all of the tablets to be hosted ...
@@ -303,27 +257,43 @@ private void suspensionTestBody(TServerKiller serverStopper) throws Exception {
303
257
assertEquals (beforeDeathState .hosted .get (server ), deadTabletsByServer .get (server ));
304
258
}
305
259
assertEquals (TABLETS , ds .hostedCount + ds .suspendedCount );
306
- // Restart the first tablet server, making sure it ends up on the same port
307
- HostAndPort restartedServer = deadTabletsByServer .keySet ().iterator ().next ();
308
- log .info ("Restarting " + restartedServer );
309
- getCluster ().getClusterControl ().start (ServerType .TABLET_SERVER ,
310
- Map .of (Property .TSERV_CLIENTPORT .getKey (), "" + restartedServer .getPort (),
311
- Property .TSERV_PORTSEARCH .getKey (), "false" ),
312
- 1 );
313
-
314
- // Eventually, the suspended tablets should be reassigned to the newly alive tserver.
315
- log .info ("Awaiting tablet unsuspension for tablets belonging to " + restartedServer );
316
- while (ds .suspended .containsKey (restartedServer ) || ds .assignedCount != 0 ) {
317
- Thread .sleep (1000 );
318
- ds = TabletLocations .retrieve (ctx , tableName );
319
- }
320
- assertEquals (deadTabletsByServer .get (restartedServer ), ds .hosted .get (restartedServer ));
321
260
322
- // Finally, after much longer, remaining suspended tablets should be reassigned.
323
- log .info ("Awaiting tablet reassignment for remaining tablets" );
324
- while (ds .hostedCount != TABLETS ) {
325
- Thread .sleep (1000 );
326
- ds = TabletLocations .retrieve (ctx , tableName );
261
+ assertTrue (ds .suspendedCount > 0 );
262
+
263
+ if (action == AfterSuspendAction .OFFLINE ) {
264
+ client .tableOperations ().offline (tableName , true );
265
+
266
+ while (ds .suspendedCount > 0 ) {
267
+ Thread .sleep (1000 );
268
+ ds = TabletLocations .retrieve (ctx , tableName );
269
+ log .info ("Waiting for suspended {}" , ds .suspended );
270
+ }
271
+ } else if (action == AfterSuspendAction .RESUME ) {
272
+ // Restart the first tablet server, making sure it ends up on the same port
273
+ HostAndPort restartedServer = deadTabletsByServer .keySet ().iterator ().next ();
274
+ log .info ("Restarting " + restartedServer );
275
+ getCluster ().getClusterControl ()
276
+ .start (
277
+ ServerType .TABLET_SERVER , Map .of (Property .TSERV_CLIENTPORT .getKey (),
278
+ "" + restartedServer .getPort (), Property .TSERV_PORTSEARCH .getKey (), "false" ),
279
+ 1 );
280
+
281
+ // Eventually, the suspended tablets should be reassigned to the newly alive tserver.
282
+ log .info ("Awaiting tablet unsuspension for tablets belonging to " + restartedServer );
283
+ while (ds .suspended .containsKey (restartedServer ) || ds .assignedCount != 0 ) {
284
+ Thread .sleep (1000 );
285
+ ds = TabletLocations .retrieve (ctx , tableName );
286
+ }
287
+ assertEquals (deadTabletsByServer .get (restartedServer ), ds .hosted .get (restartedServer ));
288
+
289
+ // Finally, after much longer, remaining suspended tablets should be reassigned.
290
+ log .info ("Awaiting tablet reassignment for remaining tablets" );
291
+ while (ds .hostedCount != TABLETS ) {
292
+ Thread .sleep (1000 );
293
+ ds = TabletLocations .retrieve (ctx , tableName );
294
+ }
295
+ } else {
296
+ throw new IllegalStateException ("Unknown action " + action );
327
297
}
328
298
}
329
299
}
@@ -333,6 +303,93 @@ void eliminateTabletServers(ClientContext ctx, TabletLocations locs, int count)
333
303
throws Exception ;
334
304
}
335
305
306
+ private class ShutdownTserverKiller implements TServerKiller {
307
+
308
+ @ Override
309
+ public void eliminateTabletServers (ClientContext ctx , TabletLocations locs , int count )
310
+ throws Exception {
311
+ Set <TServerInstance > tserverSet = new HashSet <>();
312
+ Set <TServerInstance > metadataServerSet = new HashSet <>();
313
+
314
+ TabletLocator tl = TabletLocator .getLocator (ctx , AccumuloTable .METADATA .tableId ());
315
+ for (TabletLocationState tls : locs .locationStates .values ()) {
316
+ if (tls .current != null ) {
317
+ // add to set of all servers
318
+ tserverSet .add (tls .current .getServerInstance ());
319
+
320
+ // get server that the current tablets metadata is on
321
+ TabletLocator .TabletLocation tab =
322
+ tl .locateTablet (ctx , tls .extent .toMetaRow (), false , false );
323
+ // add it to the set of servers with metadata
324
+ metadataServerSet .add (new TServerInstance (tab .getTserverLocation (),
325
+ Long .valueOf (tab .getTserverSession (), 16 )));
326
+ }
327
+ }
328
+
329
+ // remove servers with metadata on them from the list of servers to be shutdown
330
+ assertEquals (1 , metadataServerSet .size (), "Expecting a single tServer in metadataServerSet" );
331
+ tserverSet .removeAll (metadataServerSet );
332
+
333
+ assertEquals (TSERVERS - 1 , tserverSet .size (),
334
+ "Expecting " + (TSERVERS - 1 ) + " tServers in shutdown-list" );
335
+
336
+ List <TServerInstance > tserversList = new ArrayList <>(tserverSet );
337
+ Collections .shuffle (tserversList , RANDOM .get ());
338
+
339
+ for (int i1 = 0 ; i1 < count ; ++i1 ) {
340
+ final String tserverName = tserversList .get (i1 ).getHostPortSession ();
341
+ ThriftClientTypes .MANAGER .executeVoid (ctx , client -> {
342
+ log .info ("Sending shutdown command to {} via ManagerClientService" , tserverName );
343
+ client .shutdownTabletServer (null , ctx .rpcCreds (), tserverName , false );
344
+ });
345
+ }
346
+
347
+ log .info ("Waiting for tserver process{} to die" , count == 1 ? "" : "es" );
348
+ for (int i2 = 0 ; i2 < 10 ; ++i2 ) {
349
+ List <ProcessReference > deadProcs = new ArrayList <>();
350
+ for (ProcessReference pr1 : getCluster ().getProcesses ().get (ServerType .TABLET_SERVER )) {
351
+ Process p = pr1 .getProcess ();
352
+ if (!p .isAlive ()) {
353
+ deadProcs .add (pr1 );
354
+ }
355
+ }
356
+ for (ProcessReference pr2 : deadProcs ) {
357
+ log .info ("Process {} is dead, informing cluster control about this" , pr2 .getProcess ());
358
+ getCluster ().getClusterControl ().killProcess (ServerType .TABLET_SERVER , pr2 );
359
+ --count ;
360
+ }
361
+ if (count == 0 ) {
362
+ return ;
363
+ } else {
364
+ Thread .sleep (SECONDS .toMillis (2 ));
365
+ }
366
+ }
367
+ throw new IllegalStateException ("Tablet servers didn't die!" );
368
+ }
369
+ }
370
+
371
+ private class CrashTserverKiller implements TServerKiller {
372
+
373
+ @ Override
374
+ public void eliminateTabletServers (ClientContext ctx , TabletLocations locs , int count )
375
+ throws Exception {
376
+ // Exclude the tablet server hosting the metadata table from the list and only
377
+ // kill tablet servers that are not hosting the metadata table.
378
+ List <ProcessReference > procs = getCluster ().getProcesses ().get (ServerType .TABLET_SERVER )
379
+ .stream ().filter (p -> !metadataTserverProcess .equals (p )).collect (Collectors .toList ());
380
+ Collections .shuffle (procs , RANDOM .get ());
381
+ assertEquals (TSERVERS - 1 , procs .size (), "Not enough tservers exist" );
382
+ assertTrue (procs .size () >= count , "Attempting to kill more tservers (" + count
383
+ + ") than exist in the cluster (" + procs .size () + ")" );
384
+
385
+ for (int i = 0 ; i < count ; ++i ) {
386
+ ProcessReference pr = procs .get (i );
387
+ log .info ("Crashing {}" , pr .getProcess ());
388
+ getCluster ().killProcess (ServerType .TABLET_SERVER , pr );
389
+ }
390
+ }
391
+ }
392
+
336
393
private static final AtomicInteger threadCounter = new AtomicInteger (0 );
337
394
338
395
@ BeforeAll
0 commit comments