Skip to content

Commit bdec3a7

Browse files
authored
fixes clearing suspension for offline tables (apache#4295)
There was code in the manager that seemed to have the intent of clearing suspension markers in tablets for an offline table. This code was not working or tested. This commit fixes this code and adds a tests that validates that suspension markers are removed when a table is taken offline. fixes apache#3314
1 parent 1a40aee commit bdec3a7

File tree

3 files changed

+159
-98
lines changed

3 files changed

+159
-98
lines changed

server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -150,9 +150,8 @@ public void unsuspend(Collection<TabletLocationState> tablets) throws Distribute
150150
try (var tabletsMutator = ample.mutateTablets()) {
151151
for (TabletLocationState tls : tablets) {
152152
if (tls.suspend != null) {
153-
continue;
153+
tabletsMutator.mutateTablet(tls.extent).deleteSuspension().mutate();
154154
}
155-
tabletsMutator.mutateTablet(tls.extent).deleteSuspension().mutate();
156155
}
157156
} catch (RuntimeException ex) {
158157
throw new DistributedStoreException(ex);

server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateChangeIterator.java

+3
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,9 @@ protected void consume() throws IOException {
209209
case ASSIGNED_TO_DEAD_SERVER:
210210
return;
211211
case SUSPENDED:
212+
// Always return data about suspended tablets. Need to clear the suspension stats when the
213+
// tablet is offline. May need to assign the tablet when the tablet is online.
214+
return;
212215
case UNASSIGNED:
213216
if (shouldBeOnline) {
214217
return;

test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java

+155-96
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,6 @@ public class SuspendedTabletsIT extends ConfigurableMacBase {
8383
private static ExecutorService THREAD_POOL;
8484

8585
public static final int TSERVERS = 3;
86-
public static final long SUSPEND_DURATION = 80;
8786
public static final int TABLETS = 30;
8887

8988
private ProcessReference metadataTserverProcess;
@@ -95,7 +94,6 @@ protected Duration defaultTimeout() {
9594

9695
@Override
9796
public void configure(MiniAccumuloConfigImpl cfg, Configuration fsConf) {
98-
cfg.setProperty(Property.TABLE_SUSPEND_DURATION, SUSPEND_DURATION + "s");
9997
cfg.setClientProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT, "5s");
10098
cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
10199
// Start with 1 tserver, we'll increase that later
@@ -149,99 +147,54 @@ public void setUp() throws Exception {
149147
getCluster().start();
150148
}
151149

150+
enum AfterSuspendAction {
151+
RESUME("80s"),
152+
// Set a long suspend time for testing offline table, want the suspension to be cleared because
153+
// the tablet went offline and not the because the suspension timed out.
154+
OFFLINE("800s");
155+
156+
public final String suspendTime;
157+
158+
AfterSuspendAction(String suspendTime) {
159+
this.suspendTime = suspendTime;
160+
}
161+
}
162+
152163
@Test
153164
public void crashAndResumeTserver() throws Exception {
154165
// Run the test body. When we get to the point where we need a tserver to go away, get rid of it
155166
// via crashing
156-
suspensionTestBody((ctx, locs, count) -> {
157-
// Exclude the tablet server hosting the metadata table from the list and only
158-
// kill tablet servers that are not hosting the metadata table.
159-
List<ProcessReference> procs = getCluster().getProcesses().get(ServerType.TABLET_SERVER)
160-
.stream().filter(p -> !metadataTserverProcess.equals(p)).collect(Collectors.toList());
161-
Collections.shuffle(procs, random);
162-
assertEquals(TSERVERS - 1, procs.size(), "Not enough tservers exist");
163-
assertTrue(procs.size() >= count, "Attempting to kill more tservers (" + count
164-
+ ") than exist in the cluster (" + procs.size() + ")");
167+
suspensionTestBody(new CrashTserverKiller(), AfterSuspendAction.RESUME);
168+
}
165169

166-
for (int i = 0; i < count; ++i) {
167-
ProcessReference pr = procs.get(i);
168-
log.info("Crashing {}", pr.getProcess());
169-
getCluster().killProcess(ServerType.TABLET_SERVER, pr);
170-
}
171-
});
170+
@Test
171+
public void crashAndOffline() throws Exception {
172+
// Test to ensure that taking a table offline causes the suspension markers to be cleared.
173+
// Suspension markers can prevent balancing and possibly cause other problems, so its good to
174+
// clear them for offline tables.
175+
suspensionTestBody(new CrashTserverKiller(), AfterSuspendAction.OFFLINE);
172176
}
173177

174178
@Test
175179
public void shutdownAndResumeTserver() throws Exception {
176180
// Run the test body. When we get to the point where we need tservers to go away, stop them via
177181
// a clean shutdown.
178-
suspensionTestBody((ctx, locs, count) -> {
179-
Set<TServerInstance> tserverSet = new HashSet<>();
180-
Set<TServerInstance> metadataServerSet = new HashSet<>();
181-
182-
TabletLocator tl = TabletLocator.getLocator(ctx, MetadataTable.ID);
183-
for (TabletLocationState tls : locs.locationStates.values()) {
184-
if (tls.current != null) {
185-
// add to set of all servers
186-
tserverSet.add(tls.current.getServerInstance());
187-
188-
// get server that the current tablets metadata is on
189-
TabletLocator.TabletLocation tab =
190-
tl.locateTablet(ctx, tls.extent.toMetaRow(), false, false);
191-
// add it to the set of servers with metadata
192-
metadataServerSet
193-
.add(new TServerInstance(tab.tablet_location, Long.valueOf(tab.tablet_session, 16)));
194-
}
195-
}
196-
197-
// remove servers with metadata on them from the list of servers to be shutdown
198-
assertEquals(1, metadataServerSet.size(), "Expecting a single tServer in metadataServerSet");
199-
tserverSet.removeAll(metadataServerSet);
200-
201-
assertEquals(TSERVERS - 1, tserverSet.size(),
202-
"Expecting " + (TSERVERS - 1) + " tServers in shutdown-list");
203-
204-
List<TServerInstance> tserversList = new ArrayList<>(tserverSet);
205-
Collections.shuffle(tserversList, random);
206-
207-
for (int i1 = 0; i1 < count; ++i1) {
208-
final String tserverName = tserversList.get(i1).getHostPortSession();
209-
ThriftClientTypes.MANAGER.executeVoid(ctx, client -> {
210-
log.info("Sending shutdown command to {} via ManagerClientService", tserverName);
211-
client.shutdownTabletServer(null, ctx.rpcCreds(), tserverName, false);
212-
});
213-
}
182+
suspensionTestBody(new ShutdownTserverKiller(), AfterSuspendAction.RESUME);
183+
}
214184

215-
log.info("Waiting for tserver process{} to die", count == 1 ? "" : "es");
216-
for (int i2 = 0; i2 < 10; ++i2) {
217-
List<ProcessReference> deadProcs = new ArrayList<>();
218-
for (ProcessReference pr1 : getCluster().getProcesses().get(ServerType.TABLET_SERVER)) {
219-
Process p = pr1.getProcess();
220-
if (!p.isAlive()) {
221-
deadProcs.add(pr1);
222-
}
223-
}
224-
for (ProcessReference pr2 : deadProcs) {
225-
log.info("Process {} is dead, informing cluster control about this", pr2.getProcess());
226-
getCluster().getClusterControl().killProcess(ServerType.TABLET_SERVER, pr2);
227-
--count;
228-
}
229-
if (count == 0) {
230-
return;
231-
} else {
232-
Thread.sleep(SECONDS.toMillis(2));
233-
}
234-
}
235-
throw new IllegalStateException("Tablet servers didn't die!");
236-
});
185+
@Test
186+
public void shutdownAndOffline() throws Exception {
187+
// Test to ensure that taking a table offline causes the suspension markers to be cleared.
188+
suspensionTestBody(new ShutdownTserverKiller(), AfterSuspendAction.OFFLINE);
237189
}
238190

239191
/**
240192
* Main test body for suspension tests.
241193
*
242194
* @param serverStopper callback which shuts down some tablet servers.
243195
*/
244-
private void suspensionTestBody(TServerKiller serverStopper) throws Exception {
196+
private void suspensionTestBody(TServerKiller serverStopper, AfterSuspendAction action)
197+
throws Exception {
245198
try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
246199
ClientContext ctx = (ClientContext) client;
247200

@@ -253,6 +206,7 @@ private void suspensionTestBody(TServerKiller serverStopper) throws Exception {
253206
}
254207
log.info("Creating table " + tableName);
255208
NewTableConfiguration ntc = new NewTableConfiguration().withSplits(splitPoints);
209+
ntc.setProperties(Map.of(Property.TABLE_SUSPEND_DURATION.getKey(), action.suspendTime));
256210
ctx.tableOperations().create(tableName, ntc);
257211

258212
// Wait for all of the tablets to hosted ...
@@ -300,27 +254,43 @@ private void suspensionTestBody(TServerKiller serverStopper) throws Exception {
300254
assertEquals(beforeDeathState.hosted.get(server), deadTabletsByServer.get(server));
301255
}
302256
assertEquals(TABLETS, ds.hostedCount + ds.suspendedCount);
303-
// Restart the first tablet server, making sure it ends up on the same port
304-
HostAndPort restartedServer = deadTabletsByServer.keySet().iterator().next();
305-
log.info("Restarting " + restartedServer);
306-
getCluster().getClusterControl().start(ServerType.TABLET_SERVER,
307-
Map.of(Property.TSERV_CLIENTPORT.getKey(), "" + restartedServer.getPort(),
308-
Property.TSERV_PORTSEARCH.getKey(), "false"),
309-
1);
310-
311-
// Eventually, the suspended tablets should be reassigned to the newly alive tserver.
312-
log.info("Awaiting tablet unsuspension for tablets belonging to " + restartedServer);
313-
while (ds.suspended.containsKey(restartedServer) || ds.assignedCount != 0) {
314-
Thread.sleep(1000);
315-
ds = TabletLocations.retrieve(ctx, tableName);
316-
}
317-
assertEquals(deadTabletsByServer.get(restartedServer), ds.hosted.get(restartedServer));
318257

319-
// Finally, after much longer, remaining suspended tablets should be reassigned.
320-
log.info("Awaiting tablet reassignment for remaining tablets");
321-
while (ds.hostedCount != TABLETS) {
322-
Thread.sleep(1000);
323-
ds = TabletLocations.retrieve(ctx, tableName);
258+
assertTrue(ds.suspendedCount > 0);
259+
260+
if (action == AfterSuspendAction.OFFLINE) {
261+
client.tableOperations().offline(tableName, true);
262+
263+
while (ds.suspendedCount > 0) {
264+
Thread.sleep(1000);
265+
ds = TabletLocations.retrieve(ctx, tableName);
266+
log.info("Waiting for suspended {}", ds.suspended);
267+
}
268+
} else if (action == AfterSuspendAction.RESUME) {
269+
// Restart the first tablet server, making sure it ends up on the same port
270+
HostAndPort restartedServer = deadTabletsByServer.keySet().iterator().next();
271+
log.info("Restarting " + restartedServer);
272+
getCluster().getClusterControl()
273+
.start(
274+
ServerType.TABLET_SERVER, Map.of(Property.TSERV_CLIENTPORT.getKey(),
275+
"" + restartedServer.getPort(), Property.TSERV_PORTSEARCH.getKey(), "false"),
276+
1);
277+
278+
// Eventually, the suspended tablets should be reassigned to the newly alive tserver.
279+
log.info("Awaiting tablet unsuspension for tablets belonging to " + restartedServer);
280+
while (ds.suspended.containsKey(restartedServer) || ds.assignedCount != 0) {
281+
Thread.sleep(1000);
282+
ds = TabletLocations.retrieve(ctx, tableName);
283+
}
284+
assertEquals(deadTabletsByServer.get(restartedServer), ds.hosted.get(restartedServer));
285+
286+
// Finally, after much longer, remaining suspended tablets should be reassigned.
287+
log.info("Awaiting tablet reassignment for remaining tablets");
288+
while (ds.hostedCount != TABLETS) {
289+
Thread.sleep(1000);
290+
ds = TabletLocations.retrieve(ctx, tableName);
291+
}
292+
} else {
293+
throw new IllegalStateException("Unknown action " + action);
324294
}
325295
}
326296
}
@@ -330,6 +300,95 @@ void eliminateTabletServers(ClientContext ctx, TabletLocations locs, int count)
330300
throws Exception;
331301
}
332302

303+
private class ShutdownTserverKiller implements TServerKiller {
304+
305+
@Override
306+
public void eliminateTabletServers(ClientContext ctx, TabletLocations locs, int count)
307+
throws Exception {
308+
309+
Set<TServerInstance> tserverSet = new HashSet<>();
310+
Set<TServerInstance> metadataServerSet = new HashSet<>();
311+
312+
TabletLocator tl = TabletLocator.getLocator(ctx, MetadataTable.ID);
313+
for (TabletLocationState tls : locs.locationStates.values()) {
314+
if (tls.current != null) {
315+
// add to set of all servers
316+
tserverSet.add(tls.current.getServerInstance());
317+
318+
// get server that the current tablets metadata is on
319+
TabletLocator.TabletLocation tab =
320+
tl.locateTablet(ctx, tls.extent.toMetaRow(), false, false);
321+
// add it to the set of servers with metadata
322+
metadataServerSet
323+
.add(new TServerInstance(tab.tablet_location, Long.valueOf(tab.tablet_session, 16)));
324+
}
325+
}
326+
327+
// remove servers with metadata on them from the list of servers to be shutdown
328+
assertEquals(1, metadataServerSet.size(), "Expecting a single tServer in metadataServerSet");
329+
tserverSet.removeAll(metadataServerSet);
330+
331+
assertEquals(TSERVERS - 1, tserverSet.size(),
332+
"Expecting " + (TSERVERS - 1) + " tServers in shutdown-list");
333+
334+
List<TServerInstance> tserversList = new ArrayList<>(tserverSet);
335+
Collections.shuffle(tserversList, random);
336+
337+
for (int i1 = 0; i1 < count; ++i1) {
338+
final String tserverName = tserversList.get(i1).getHostPortSession();
339+
ThriftClientTypes.MANAGER.executeVoid(ctx, client -> {
340+
log.info("Sending shutdown command to {} via ManagerClientService", tserverName);
341+
client.shutdownTabletServer(null, ctx.rpcCreds(), tserverName, false);
342+
});
343+
}
344+
345+
log.info("Waiting for tserver process{} to die", count == 1 ? "" : "es");
346+
for (int i2 = 0; i2 < 10; ++i2) {
347+
List<ProcessReference> deadProcs = new ArrayList<>();
348+
for (ProcessReference pr1 : getCluster().getProcesses().get(ServerType.TABLET_SERVER)) {
349+
Process p = pr1.getProcess();
350+
if (!p.isAlive()) {
351+
deadProcs.add(pr1);
352+
}
353+
}
354+
for (ProcessReference pr2 : deadProcs) {
355+
log.info("Process {} is dead, informing cluster control about this", pr2.getProcess());
356+
getCluster().getClusterControl().killProcess(ServerType.TABLET_SERVER, pr2);
357+
--count;
358+
}
359+
if (count == 0) {
360+
return;
361+
} else {
362+
Thread.sleep(SECONDS.toMillis(2));
363+
}
364+
}
365+
throw new IllegalStateException("Tablet servers didn't die!");
366+
367+
}
368+
}
369+
370+
private class CrashTserverKiller implements TServerKiller {
371+
372+
@Override
373+
public void eliminateTabletServers(ClientContext ctx, TabletLocations locs, int count)
374+
throws Exception {
375+
// Exclude the tablet server hosting the metadata table from the list and only
376+
// kill tablet servers that are not hosting the metadata table.
377+
List<ProcessReference> procs = getCluster().getProcesses().get(ServerType.TABLET_SERVER)
378+
.stream().filter(p -> !metadataTserverProcess.equals(p)).collect(Collectors.toList());
379+
Collections.shuffle(procs, random);
380+
assertEquals(TSERVERS - 1, procs.size(), "Not enough tservers exist");
381+
assertTrue(procs.size() >= count, "Attempting to kill more tservers (" + count
382+
+ ") than exist in the cluster (" + procs.size() + ")");
383+
384+
for (int i = 0; i < count; ++i) {
385+
ProcessReference pr = procs.get(i);
386+
log.info("Crashing {}", pr.getProcess());
387+
getCluster().killProcess(ServerType.TABLET_SERVER, pr);
388+
}
389+
}
390+
}
391+
333392
private static final AtomicInteger threadCounter = new AtomicInteger(0);
334393

335394
@BeforeAll

0 commit comments

Comments
 (0)