Skip to content

Commit 8158304

Browse files
committed
Suggestions for graceful shutdown branch
1 parent 0d5f014 commit 8158304

File tree

3 files changed

+43
-54
lines changed

3 files changed

+43
-54
lines changed

minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java

+5-23
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import java.util.ArrayList;
2525
import java.util.Collections;
2626
import java.util.HashMap;
27-
import java.util.Iterator;
2827
import java.util.List;
2928
import java.util.Map;
3029
import java.util.Map.Entry;
@@ -532,12 +531,7 @@ public void refreshProcesses(ServerType type) {
532531
}
533532
break;
534533
case COMPACTOR:
535-
Iterator<Process> iterC = compactorProcesses.iterator();
536-
while (iterC.hasNext()) {
537-
if (!iterC.next().isAlive()) {
538-
iterC.remove();
539-
}
540-
}
534+
compactorProcesses.removeIf(process -> !process.isAlive());
541535
break;
542536
case GARBAGE_COLLECTOR:
543537
if (!gcProcess.isAlive()) {
@@ -556,20 +550,10 @@ public void refreshProcesses(ServerType type) {
556550
}
557551
break;
558552
case SCAN_SERVER:
559-
Iterator<Process> iterS = scanServerProcesses.iterator();
560-
while (iterS.hasNext()) {
561-
if (!iterS.next().isAlive()) {
562-
iterS.remove();
563-
}
564-
}
553+
scanServerProcesses.removeIf(process -> !process.isAlive());
565554
break;
566555
case TABLET_SERVER:
567-
Iterator<Process> iterT = tabletServerProcesses.iterator();
568-
while (iterT.hasNext()) {
569-
if (!iterT.next().isAlive()) {
570-
iterT.remove();
571-
}
572-
}
556+
tabletServerProcesses.removeIf(process -> !process.isAlive());
573557
break;
574558
case ZOOKEEPER:
575559
if (!zooKeeperProcess.isAlive()) {
@@ -586,8 +570,7 @@ public Set<Process> getProcesses(ServerType type) {
586570
case COMPACTION_COORDINATOR:
587571
return coordinatorProcess == null ? Set.of() : Set.of(coordinatorProcess);
588572
case COMPACTOR:
589-
return compactorProcesses == null ? Set.of()
590-
: Set.of(compactorProcesses.toArray(new Process[] {}));
573+
return Set.of(compactorProcesses.toArray(new Process[] {}));
591574
case GARBAGE_COLLECTOR:
592575
return gcProcess == null ? Set.of() : Set.of(gcProcess);
593576
case MANAGER:
@@ -596,8 +579,7 @@ public Set<Process> getProcesses(ServerType type) {
596579
case MONITOR:
597580
return monitor == null ? Set.of() : Set.of(monitor);
598581
case SCAN_SERVER:
599-
return scanServerProcesses == null ? Set.of()
600-
: Set.of(scanServerProcesses.toArray(new Process[] {}));
582+
return Set.of(scanServerProcesses.toArray(new Process[] {}));
601583
case TABLET_SERVER:
602584
return tabletServerProcesses == null ? Set.of()
603585
: Set.of(tabletServerProcesses.toArray(new Process[] {}));

server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java

+18-16
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import java.util.concurrent.ScheduledFuture;
6060
import java.util.concurrent.ThreadPoolExecutor;
6161
import java.util.concurrent.TimeUnit;
62+
import java.util.concurrent.atomic.AtomicBoolean;
6263
import java.util.concurrent.atomic.AtomicInteger;
6364
import java.util.concurrent.atomic.AtomicLong;
6465
import java.util.concurrent.locks.ReentrantLock;
@@ -963,40 +964,41 @@ public void run() {
963964
.numMaxThreads(16).build();
964965

965966
ManagerClientService.Client iface = managerConnection(getManagerAddress());
966-
boolean managerDown = false;
967+
final AtomicBoolean managerDown = new AtomicBoolean(false);
967968

968969
try {
969-
for (DataLevel level : new DataLevel[] {DataLevel.USER, DataLevel.METADATA, DataLevel.ROOT}) {
970+
for (DataLevel level : DataLevel.values()) {
970971
getOnlineTablets().keySet().forEach(ke -> {
971972
if (DataLevel.of(ke.tableId()) == level) {
972973
futures.add(
973974
tpe.submit(new UnloadTabletHandler(this, ke, TUnloadTabletGoal.UNASSIGNED, 5000)));
974975
}
975976
});
976977
while (!futures.isEmpty()) {
977-
Iterator<Future<?>> unloads = futures.iterator();
978-
while (unloads.hasNext()) {
979-
Future<?> f = unloads.next();
980-
if (f.isDone()) {
981-
if (!managerDown) {
982-
ManagerMessage mm = managerMessages.poll();
983-
try {
978+
futures.removeIf(f -> {
979+
if (!f.isDone()) {
980+
return false;
981+
}
982+
if (!managerDown.get()) {
983+
ManagerMessage mm = managerMessages.poll();
984+
try {
985+
if (mm != null) {
984986
mm.send(getContext().rpcCreds(), getClientAddressString(), iface);
985-
} catch (TException e) {
986-
managerDown = true;
987-
LOG.debug("Error sending message to Manager during tablet unloading, msg: {}",
988-
e.getMessage());
989987
}
988+
} catch (TException e) {
989+
managerDown.set(true);
990+
log.debug("Error sending message to Manager during tablet unloading, msg: {}",
991+
e.getMessage());
990992
}
991-
unloads.remove();
992993
}
993-
}
994+
return true;
995+
});
994996
log.debug("Waiting on {} {} tablets to close.", futures.size(), level);
995997
UtilWaitThread.sleep(1000);
996998
}
997999
}
9981000
} finally {
999-
if (!managerDown) {
1001+
if (!managerDown.get()) {
10001002
try {
10011003
ManagerMessage mm = managerMessages.poll();
10021004
do {

test/src/main/java/org/apache/accumulo/test/functional/GracefulShutdownIT.java

+20-15
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.accumulo.core.data.Value;
5050
import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
5151
import org.apache.accumulo.core.fate.zookeeper.ServiceLock.ServiceLockPath;
52+
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
5253
import org.apache.accumulo.core.security.Authorizations;
5354
import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
5455
import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
@@ -147,17 +148,17 @@ public void testGracefulShutdown() throws Exception {
147148
final TableId tid = ctx.getTableId(tableName);
148149

149150
// Insert 10 rows, flush after every row to create 10 files
150-
final BatchWriter writer = client.createBatchWriter(tableName);
151-
for (int i : IntStream.rangeClosed(1, 10).toArray()) {
152-
String val = i + "";
153-
Mutation m = new Mutation(val);
154-
m.put(val, val, val);
155-
writer.addMutation(m);
156-
writer.flush();
157-
client.tableOperations().flush(tableName, null, null, true);
151+
try (BatchWriter writer = client.createBatchWriter(tableName)) {
152+
for (int i : IntStream.rangeClosed(1, 10).toArray()) {
153+
String val = i + "";
154+
Mutation m = new Mutation(val);
155+
m.put(val, val, val);
156+
writer.addMutation(m);
157+
writer.flush();
158+
client.tableOperations().flush(tableName, null, null, true);
159+
}
158160
}
159-
final long numFiles = ctx.getAmple().readTablets().forTable(tid).build().stream()
160-
.mapToLong(tm -> tm.getFiles().size()).sum();
161+
long numFiles = getNumFilesForTable(ctx, tid);
161162
assertEquals(10, numFiles);
162163
client.instanceOperations().waitForBalance();
163164

@@ -220,9 +221,8 @@ public void testGracefulShutdown() throws Exception {
220221
cc.setIterators(List.of(is));
221222
cc.setWait(false);
222223

223-
final long numFiles2 = ctx.getAmple().readTablets().forTable(tid).build().stream()
224-
.mapToLong(tm -> tm.getFiles().size()).sum();
225-
assertTrue(numFiles2 == numFiles);
224+
final long numFiles2 = getNumFilesForTable(ctx, tid);
225+
assertEquals(numFiles2, numFiles);
226226
assertEquals(0, ExternalCompactionTestUtils.getRunningCompactions(ctx).getCompactionsSize());
227227
client.tableOperations().compact(tableName, cc);
228228
Wait.waitFor(
@@ -232,8 +232,7 @@ public void testGracefulShutdown() throws Exception {
232232
control.refreshProcesses(ServerType.COMPACTOR);
233233
return control.getProcesses(ServerType.COMPACTOR).isEmpty();
234234
});
235-
final long numFiles3 = ctx.getAmple().readTablets().forTable(tid).build().stream()
236-
.mapToLong(tm -> tm.getFiles().size()).sum();
235+
final long numFiles3 = getNumFilesForTable(ctx, tid);
237236
assertTrue(numFiles3 < numFiles2);
238237
assertEquals(1, numFiles3);
239238

@@ -266,4 +265,10 @@ public void testGracefulShutdown() throws Exception {
266265
}
267266

268267
}
268+
269+
long getNumFilesForTable(ServerContext ctx, TableId tid) {
270+
try (TabletsMetadata tablets = ctx.getAmple().readTablets().forTable(tid).build()) {
271+
return tablets.stream().mapToLong(tm -> tm.getFiles().size()).sum();
272+
}
273+
}
269274
}

0 commit comments

Comments
 (0)