Skip to content

Commit 7c54495

Browse files
committed
Use futures, new property, and update gcstatus variable
1 parent 4ce3fe1 commit 7c54495

File tree

5 files changed

+58
-14
lines changed

5 files changed

+58
-14
lines changed

core/src/main/java/org/apache/accumulo/core/conf/Property.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -940,8 +940,10 @@ public enum Property {
940940
"1.3.5"),
941941
GC_PORT("gc.port.client", "9998", PropertyType.PORT,
942942
"The listening port for the garbage collector's monitor service.", "1.3.5"),
943+
GC_DELETE_WAL_THREADS("gc.threads.delete.wal", "4", PropertyType.COUNT,
944+
"The number of threads used to delete write-ahead logs and recovery files.", "2.1.4"),
943945
GC_DELETE_THREADS("gc.threads.delete", "16", PropertyType.COUNT,
944-
"The number of threads used to delete RFiles and write-ahead logs.", "1.3.5"),
946+
"The number of threads used to delete RFiles.", "1.3.5"),
945947
@Experimental
946948
GC_REMOVE_IN_USE_CANDIDATES("gc.remove.in.use.candidates", "false", PropertyType.BOOLEAN,
947949
"GC will remove deletion candidates that are in-use from the metadata location. "

core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public enum ThreadPoolNames {
3636
COORDINATOR_FINALIZER_BACKGROUND_POOL("accumulo.pool.compaction.finalizer.background.pool"),
3737
COORDINATOR_FINALIZER_NOTIFIER_POOL("accumulo.pool.compaction.coordinator.compaction.finalizer"),
3838
GC_DELETE_POOL("accumulo.pool.gc.threads.delete"),
39+
GC_WAL_DELETE_POOL("accumulo.pool.gc.threads.delete.wal"),
3940
GENERAL_SERVER_POOL("accumulo.pool.general.server"),
4041
GENERAL_SERVER_SIMPLETIMER_POOL("accumulo.pool.general.server.simpletimer"),
4142
IMPORT_TABLE_RENAME_POOL("accumulo.pool.import.table.rename"),

core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java

+3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static java.util.concurrent.TimeUnit.SECONDS;
2424
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX;
2525
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GC_DELETE_POOL;
26+
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GC_WAL_DELETE_POOL;
2627
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_POOL;
2728
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_SIMPLETIMER_POOL;
2829
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_BULK_IMPORT_POOL;
@@ -375,6 +376,8 @@ public ThreadPoolExecutor createExecutorService(final AccumuloConfiguration conf
375376
builder.enableThreadPoolMetrics();
376377
}
377378
return builder.build();
379+
case GC_DELETE_WAL_THREADS:
380+
return getPoolBuilder(GC_WAL_DELETE_POOL).numCoreThreads(conf.getCount(p)).build();
378381
case GC_DELETE_THREADS:
379382
return getPoolBuilder(GC_DELETE_POOL).numCoreThreads(conf.getCount(p)).build();
380383
case REPLICATION_WORKER_THREADS:

server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java

+46-8
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,15 @@
2323
import java.util.Collection;
2424
import java.util.HashMap;
2525
import java.util.HashSet;
26+
import java.util.Iterator;
2627
import java.util.List;
2728
import java.util.Map;
2829
import java.util.Map.Entry;
2930
import java.util.Set;
3031
import java.util.UUID;
32+
import java.util.concurrent.ExecutionException;
3133
import java.util.concurrent.ExecutorService;
34+
import java.util.concurrent.Future;
3235
import java.util.concurrent.TimeUnit;
3336
import java.util.concurrent.atomic.AtomicBoolean;
3437
import java.util.concurrent.atomic.AtomicLong;
@@ -271,9 +274,9 @@ private long removeTabletServerMarkers(Map<UUID,TServerInstance> uidMap,
271274
return result;
272275
}
273276

274-
private void removeFile(ExecutorService deleteThreadPool, Path path, AtomicLong counter,
277+
private Future<?> removeFile(ExecutorService deleteThreadPool, Path path, AtomicLong counter,
275278
String msg) {
276-
deleteThreadPool.execute(() -> {
279+
return deleteThreadPool.submit(() -> {
277280
try {
278281
log.debug(msg);
279282
if (!useTrash || !fs.moveToTrash(path)) {
@@ -283,42 +286,77 @@ private void removeFile(ExecutorService deleteThreadPool, Path path, AtomicLong
283286
} catch (FileNotFoundException ex) {
284287
// ignored
285288
} catch (IOException ex) {
286-
log.error("Unable to delete wal {}", path, ex);
289+
log.error("Unable to delete {}", path, ex);
287290
}
288291
});
289292
}
290293

291294
private long removeFiles(Collection<Pair<WalState,Path>> collection, final GCStatus status) {
292295

293296
final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools()
294-
.createExecutorService(context.getConfiguration(), Property.GC_DELETE_THREADS);
297+
.createExecutorService(context.getConfiguration(), Property.GC_DELETE_WAL_THREADS);
298+
299+
final Map<Path,Future<?>> futures = new HashMap<>(collection.size());
295300
final AtomicLong counter = new AtomicLong();
296301

297302
for (Pair<WalState,Path> stateFile : collection) {
298303
Path path = stateFile.getSecond();
299-
removeFile(deleteThreadPool, path, counter,
300-
"Removing " + stateFile.getFirst() + " WAL " + path);
304+
futures.put(path, removeFile(deleteThreadPool, path, counter,
305+
"Removing " + stateFile.getFirst() + " WAL " + path));
301306
}
302307

308+
while (!futures.isEmpty()) {
309+
Iterator<Entry<Path,Future<?>>> iter = futures.entrySet().iterator();
310+
while (iter.hasNext()) {
311+
Entry<Path,Future<?>> f = iter.next();
312+
if (f.getValue().isDone()) {
313+
try {
314+
iter.remove();
315+
f.getValue().get();
316+
} catch (InterruptedException | ExecutionException e) {
317+
throw new RuntimeException("Uncaught exception deleting wal file" + f.getKey(), e);
318+
}
319+
}
320+
}
321+
}
303322
deleteThreadPool.shutdown();
304323
try {
305324
while (!deleteThreadPool.awaitTermination(1000, TimeUnit.MILLISECONDS)) { // empty
306325
}
307326
} catch (InterruptedException e1) {
308327
log.error("{}", e1.getMessage(), e1);
309328
}
329+
status.currentLog.deleted += counter.get();
310330
return counter.get();
311331
}
312332

313333
private long removeFiles(Collection<Path> values) {
334+
314335
final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools()
315-
.createExecutorService(context.getConfiguration(), Property.GC_DELETE_THREADS);
336+
.createExecutorService(context.getConfiguration(), Property.GC_DELETE_WAL_THREADS);
337+
final Map<Path,Future<?>> futures = new HashMap<>(values.size());
316338
final AtomicLong counter = new AtomicLong();
317339

318340
for (Path path : values) {
319-
removeFile(deleteThreadPool, path, counter, "Removing recovery log " + path);
341+
futures.put(path,
342+
removeFile(deleteThreadPool, path, counter, "Removing recovery log " + path));
320343
}
321344

345+
while (!futures.isEmpty()) {
346+
Iterator<Entry<Path,Future<?>>> iter = futures.entrySet().iterator();
347+
while (iter.hasNext()) {
348+
Entry<Path,Future<?>> f = iter.next();
349+
if (f.getValue().isDone()) {
350+
try {
351+
iter.remove();
352+
f.getValue().get();
353+
} catch (InterruptedException | ExecutionException e) {
354+
throw new RuntimeException("Uncaught exception deleting recovery log file" + f.getKey(),
355+
e);
356+
}
357+
}
358+
}
359+
}
322360
deleteThreadPool.shutdown();
323361
try {
324362
while (!deleteThreadPool.awaitTermination(1000, TimeUnit.MILLISECONDS)) { // empty

server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public void testRemoveUnusedLog() throws Exception {
103103
GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
104104

105105
EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
106-
EasyMock.expect(conf.getCount(Property.GC_DELETE_THREADS)).andReturn(8).anyTimes();
106+
EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes();
107107
tserverSet.scanServers();
108108
EasyMock.expectLastCall();
109109
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
@@ -142,7 +142,7 @@ public void testKeepClosedLog() throws Exception {
142142
GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
143143

144144
EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
145-
EasyMock.expect(conf.getCount(Property.GC_DELETE_THREADS)).andReturn(8).anyTimes();
145+
EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes();
146146
tserverSet.scanServers();
147147
EasyMock.expectLastCall();
148148
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
@@ -180,7 +180,7 @@ public void deleteUnreferencedLogOnDeadServer() throws Exception {
180180
GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
181181

182182
EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
183-
EasyMock.expect(conf.getCount(Property.GC_DELETE_THREADS)).andReturn(8).anyTimes();
183+
EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes();
184184
tserverSet.scanServers();
185185
EasyMock.expectLastCall();
186186
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
@@ -231,7 +231,7 @@ public void ignoreReferenceLogOnDeadServer() throws Exception {
231231
GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
232232

233233
EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
234-
EasyMock.expect(conf.getCount(Property.GC_DELETE_THREADS)).andReturn(8).anyTimes();
234+
EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes();
235235
tserverSet.scanServers();
236236
EasyMock.expectLastCall();
237237
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
@@ -282,7 +282,7 @@ public void replicationDelaysFileCollection() throws Exception {
282282
GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
283283

284284
EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
285-
EasyMock.expect(conf.getCount(Property.GC_DELETE_THREADS)).andReturn(8).anyTimes();
285+
EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes();
286286
tserverSet.scanServers();
287287
EasyMock.expectLastCall();
288288
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));

0 commit comments

Comments
 (0)