Skip to content

Commit f675fdd

Browse files
authoredMar 13, 2025
Make garbage collecting wal and recovery files faster (apache#5399)
Created a new property, gc.threads.delete.wal, which is used to configure the number of threads to use for deleting wal and recovery files. Closes apache#5397
1 parent 0751f01 commit f675fdd

File tree

5 files changed

+128
-33
lines changed

5 files changed

+128
-33
lines changed
 

‎core/src/main/java/org/apache/accumulo/core/conf/Property.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -940,8 +940,10 @@ public enum Property {
940940
"1.3.5"),
941941
GC_PORT("gc.port.client", "9998", PropertyType.PORT,
942942
"The listening port for the garbage collector's monitor service.", "1.3.5"),
943+
GC_DELETE_WAL_THREADS("gc.threads.delete.wal", "4", PropertyType.COUNT,
944+
"The number of threads used to delete write-ahead logs and recovery files.", "2.1.4"),
943945
GC_DELETE_THREADS("gc.threads.delete", "16", PropertyType.COUNT,
944-
"The number of threads used to delete RFiles and write-ahead logs.", "1.3.5"),
946+
"The number of threads used to delete RFiles.", "1.3.5"),
945947
@Experimental
946948
GC_REMOVE_IN_USE_CANDIDATES("gc.remove.in.use.candidates", "false", PropertyType.BOOLEAN,
947949
"GC will remove deletion candidates that are in-use from the metadata location. "

‎core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public enum ThreadPoolNames {
3636
COORDINATOR_FINALIZER_BACKGROUND_POOL("accumulo.pool.compaction.finalizer.background.pool"),
3737
COORDINATOR_FINALIZER_NOTIFIER_POOL("accumulo.pool.compaction.coordinator.compaction.finalizer"),
3838
GC_DELETE_POOL("accumulo.pool.gc.threads.delete"),
39+
GC_WAL_DELETE_POOL("accumulo.pool.gc.threads.delete.wal"),
3940
GENERAL_SERVER_POOL("accumulo.pool.general.server"),
4041
GENERAL_SERVER_SIMPLETIMER_POOL("accumulo.pool.general.server.simpletimer"),
4142
IMPORT_TABLE_RENAME_POOL("accumulo.pool.import.table.rename"),

‎core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java

+3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static java.util.concurrent.TimeUnit.SECONDS;
2424
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX;
2525
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GC_DELETE_POOL;
26+
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GC_WAL_DELETE_POOL;
2627
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_POOL;
2728
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_SIMPLETIMER_POOL;
2829
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_BULK_IMPORT_POOL;
@@ -375,6 +376,8 @@ public ThreadPoolExecutor createExecutorService(final AccumuloConfiguration conf
375376
builder.enableThreadPoolMetrics();
376377
}
377378
return builder.build();
379+
case GC_DELETE_WAL_THREADS:
380+
return getPoolBuilder(GC_WAL_DELETE_POOL).numCoreThreads(conf.getCount(p)).build();
378381
case GC_DELETE_THREADS:
379382
return getPoolBuilder(GC_DELETE_POOL).numCoreThreads(conf.getCount(p)).build();
380383
case REPLICATION_WORKER_THREADS:

‎server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java

+94-22
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,22 @@
2323
import java.util.Collection;
2424
import java.util.HashMap;
2525
import java.util.HashSet;
26+
import java.util.Iterator;
2627
import java.util.List;
2728
import java.util.Map;
2829
import java.util.Map.Entry;
2930
import java.util.Set;
3031
import java.util.UUID;
32+
import java.util.concurrent.ExecutionException;
33+
import java.util.concurrent.ExecutorService;
34+
import java.util.concurrent.Future;
3135
import java.util.concurrent.atomic.AtomicBoolean;
36+
import java.util.concurrent.atomic.AtomicLong;
3237
import java.util.stream.Stream;
3338

3439
import org.apache.accumulo.core.client.Scanner;
3540
import org.apache.accumulo.core.client.TableNotFoundException;
41+
import org.apache.accumulo.core.conf.Property;
3642
import org.apache.accumulo.core.data.Key;
3743
import org.apache.accumulo.core.data.Value;
3844
import org.apache.accumulo.core.gc.thrift.GCStatus;
@@ -46,6 +52,7 @@
4652
import org.apache.accumulo.core.security.Authorizations;
4753
import org.apache.accumulo.core.trace.TraceUtil;
4854
import org.apache.accumulo.core.util.Pair;
55+
import org.apache.accumulo.core.util.threads.ThreadPools;
4956
import org.apache.accumulo.server.ServerContext;
5057
import org.apache.accumulo.server.fs.VolumeManager;
5158
import org.apache.accumulo.server.log.WalStateManager;
@@ -266,37 +273,102 @@ private long removeTabletServerMarkers(Map<UUID,TServerInstance> uidMap,
266273
return result;
267274
}
268275

269-
private long removeFile(Path path) {
270-
try {
271-
if (!useTrash || !fs.moveToTrash(path)) {
272-
fs.deleteRecursively(path);
276+
private Future<?> removeFile(ExecutorService deleteThreadPool, Path path, AtomicLong counter,
277+
String msg) {
278+
return deleteThreadPool.submit(() -> {
279+
try {
280+
log.debug(msg);
281+
if (!useTrash || !fs.moveToTrash(path)) {
282+
fs.deleteRecursively(path);
283+
}
284+
counter.incrementAndGet();
285+
} catch (FileNotFoundException ex) {
286+
// ignored
287+
} catch (IOException ex) {
288+
log.error("Unable to delete {}", path, ex);
273289
}
274-
return 1;
275-
} catch (FileNotFoundException ex) {
276-
// ignored
277-
} catch (IOException ex) {
278-
log.error("Unable to delete wal {}", path, ex);
279-
}
280-
281-
return 0;
290+
});
282291
}
283292

284293
private long removeFiles(Collection<Pair<WalState,Path>> collection, final GCStatus status) {
285-
for (Pair<WalState,Path> stateFile : collection) {
286-
Path path = stateFile.getSecond();
287-
log.debug("Removing {} WAL {}", stateFile.getFirst(), path);
288-
status.currentLog.deleted += removeFile(path);
294+
295+
final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools()
296+
.createExecutorService(context.getConfiguration(), Property.GC_DELETE_WAL_THREADS);
297+
final Map<Path,Future<?>> futures = new HashMap<>(collection.size());
298+
final AtomicLong counter = new AtomicLong();
299+
300+
try {
301+
for (Pair<WalState,Path> stateFile : collection) {
302+
Path path = stateFile.getSecond();
303+
futures.put(path, removeFile(deleteThreadPool, path, counter,
304+
"Removing " + stateFile.getFirst() + " WAL " + path));
305+
}
306+
307+
while (!futures.isEmpty()) {
308+
Iterator<Entry<Path,Future<?>>> iter = futures.entrySet().iterator();
309+
while (iter.hasNext()) {
310+
Entry<Path,Future<?>> f = iter.next();
311+
if (f.getValue().isDone()) {
312+
try {
313+
iter.remove();
314+
f.getValue().get();
315+
} catch (InterruptedException | ExecutionException e) {
316+
throw new RuntimeException("Uncaught exception deleting wal file" + f.getKey(), e);
317+
}
318+
}
319+
}
320+
try {
321+
Thread.sleep(500);
322+
} catch (InterruptedException e) {
323+
Thread.currentThread().interrupt();
324+
throw new RuntimeException("Interrupted while sleeping", e);
325+
}
326+
}
327+
} finally {
328+
deleteThreadPool.shutdownNow();
289329
}
290-
return status.currentLog.deleted;
330+
status.currentLog.deleted += counter.get();
331+
return counter.get();
291332
}
292333

293334
private long removeFiles(Collection<Path> values) {
294-
long count = 0;
295-
for (Path path : values) {
296-
log.debug("Removing recovery log {}", path);
297-
count += removeFile(path);
335+
336+
final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools()
337+
.createExecutorService(context.getConfiguration(), Property.GC_DELETE_WAL_THREADS);
338+
final Map<Path,Future<?>> futures = new HashMap<>(values.size());
339+
final AtomicLong counter = new AtomicLong();
340+
341+
try {
342+
for (Path path : values) {
343+
futures.put(path,
344+
removeFile(deleteThreadPool, path, counter, "Removing recovery log " + path));
345+
}
346+
347+
while (!futures.isEmpty()) {
348+
Iterator<Entry<Path,Future<?>>> iter = futures.entrySet().iterator();
349+
while (iter.hasNext()) {
350+
Entry<Path,Future<?>> f = iter.next();
351+
if (f.getValue().isDone()) {
352+
try {
353+
iter.remove();
354+
f.getValue().get();
355+
} catch (InterruptedException | ExecutionException e) {
356+
throw new RuntimeException(
357+
"Uncaught exception deleting recovery log file" + f.getKey(), e);
358+
}
359+
}
360+
}
361+
try {
362+
Thread.sleep(500);
363+
} catch (InterruptedException e) {
364+
Thread.currentThread().interrupt();
365+
throw new RuntimeException("Interrupted while sleeping", e);
366+
}
367+
}
368+
} finally {
369+
deleteThreadPool.shutdownNow();
298370
}
299-
return count;
371+
return counter.get();
300372
}
301373

302374
private UUID path2uuid(Path path) {

‎server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java

+27-10
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import java.util.stream.Stream;
2929

3030
import org.apache.accumulo.core.client.Scanner;
31+
import org.apache.accumulo.core.conf.AccumuloConfiguration;
32+
import org.apache.accumulo.core.conf.Property;
3133
import org.apache.accumulo.core.data.Key;
3234
import org.apache.accumulo.core.data.Value;
3335
import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -92,13 +94,16 @@ public class GarbageCollectWriteAheadLogsTest {
9294

9395
@Test
9496
public void testRemoveUnusedLog() throws Exception {
97+
AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class);
9598
ServerContext context = EasyMock.createMock(ServerContext.class);
9699
VolumeManager fs = EasyMock.createMock(VolumeManager.class);
97100
WalStateManager marker = EasyMock.createMock(WalStateManager.class);
98101
LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
99102

100103
GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
101104

105+
EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
106+
EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes();
102107
tserverSet.scanServers();
103108
EasyMock.expectLastCall();
104109
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
@@ -108,7 +113,7 @@ public void testRemoveUnusedLog() throws Exception {
108113
EasyMock.expect(fs.deleteRecursively(path)).andReturn(true).once();
109114
marker.removeWalMarker(server1, id);
110115
EasyMock.expectLastCall().once();
111-
EasyMock.replay(context, fs, marker, tserverSet);
116+
EasyMock.replay(conf, context, fs, marker, tserverSet);
112117
var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, marker,
113118
tabletOnServer1List) {
114119
@Override
@@ -123,25 +128,28 @@ protected Map<UUID,Path> getSortedWALogs() {
123128
}
124129
};
125130
gc.collect(status);
126-
EasyMock.verify(context, fs, marker, tserverSet);
131+
EasyMock.verify(conf, context, fs, marker, tserverSet);
127132
}
128133

129134
@Test
130135
public void testKeepClosedLog() throws Exception {
136+
AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class);
131137
ServerContext context = EasyMock.createMock(ServerContext.class);
132138
VolumeManager fs = EasyMock.createMock(VolumeManager.class);
133139
WalStateManager marker = EasyMock.createMock(WalStateManager.class);
134140
LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
135141

136142
GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
137143

144+
EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
145+
EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes();
138146
tserverSet.scanServers();
139147
EasyMock.expectLastCall();
140148
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
141149

142150
EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once();
143151
EasyMock.expect(marker.state(server1, id)).andReturn(new Pair<>(WalState.CLOSED, path));
144-
EasyMock.replay(context, marker, tserverSet, fs);
152+
EasyMock.replay(conf, context, marker, tserverSet, fs);
145153
var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, marker,
146154
tabletOnServer1List) {
147155
@Override
@@ -156,11 +164,12 @@ protected Map<UUID,Path> getSortedWALogs() {
156164
}
157165
};
158166
gc.collect(status);
159-
EasyMock.verify(context, marker, tserverSet, fs);
167+
EasyMock.verify(conf, context, marker, tserverSet, fs);
160168
}
161169

162170
@Test
163171
public void deleteUnreferencedLogOnDeadServer() throws Exception {
172+
AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class);
164173
ServerContext context = EasyMock.createMock(ServerContext.class);
165174
VolumeManager fs = EasyMock.createMock(VolumeManager.class);
166175
WalStateManager marker = EasyMock.createMock(WalStateManager.class);
@@ -170,6 +179,8 @@ public void deleteUnreferencedLogOnDeadServer() throws Exception {
170179

171180
GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
172181

182+
EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
183+
EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes();
173184
tserverSet.scanServers();
174185
EasyMock.expectLastCall();
175186
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
@@ -195,7 +206,7 @@ public void deleteUnreferencedLogOnDeadServer() throws Exception {
195206
EasyMock.expectLastCall().once();
196207
marker.forget(server2);
197208
EasyMock.expectLastCall().once();
198-
EasyMock.replay(context, fs, marker, tserverSet, rscanner, mscanner);
209+
EasyMock.replay(conf, context, fs, marker, tserverSet, rscanner, mscanner);
199210
var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, marker,
200211
tabletOnServer1List) {
201212
@Override
@@ -204,11 +215,12 @@ protected Map<UUID,Path> getSortedWALogs() {
204215
}
205216
};
206217
gc.collect(status);
207-
EasyMock.verify(context, fs, marker, tserverSet, rscanner, mscanner);
218+
EasyMock.verify(conf, context, fs, marker, tserverSet, rscanner, mscanner);
208219
}
209220

210221
@Test
211222
public void ignoreReferenceLogOnDeadServer() throws Exception {
223+
AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class);
212224
ServerContext context = EasyMock.createMock(ServerContext.class);
213225
VolumeManager fs = EasyMock.createMock(VolumeManager.class);
214226
WalStateManager marker = EasyMock.createMock(WalStateManager.class);
@@ -218,6 +230,8 @@ public void ignoreReferenceLogOnDeadServer() throws Exception {
218230

219231
GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
220232

233+
EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
234+
EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes();
221235
tserverSet.scanServers();
222236
EasyMock.expectLastCall();
223237
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
@@ -238,7 +252,7 @@ public void ignoreReferenceLogOnDeadServer() throws Exception {
238252
mscanner.setRange(ReplicationSection.getRange());
239253
EasyMock.expectLastCall().once();
240254
EasyMock.expect(mscanner.iterator()).andReturn(emptyKV);
241-
EasyMock.replay(context, fs, marker, tserverSet, rscanner, mscanner);
255+
EasyMock.replay(conf, context, fs, marker, tserverSet, rscanner, mscanner);
242256
var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, marker,
243257
tabletOnServer2List) {
244258
@Override
@@ -247,11 +261,12 @@ protected Map<UUID,Path> getSortedWALogs() {
247261
}
248262
};
249263
gc.collect(status);
250-
EasyMock.verify(context, fs, marker, tserverSet, rscanner, mscanner);
264+
EasyMock.verify(conf, context, fs, marker, tserverSet, rscanner, mscanner);
251265
}
252266

253267
@Test
254268
public void replicationDelaysFileCollection() throws Exception {
269+
AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class);
255270
ServerContext context = EasyMock.createMock(ServerContext.class);
256271
VolumeManager fs = EasyMock.createMock(VolumeManager.class);
257272
WalStateManager marker = EasyMock.createMock(WalStateManager.class);
@@ -266,6 +281,8 @@ public void replicationDelaysFileCollection() throws Exception {
266281

267282
GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
268283

284+
EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
285+
EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes();
269286
tserverSet.scanServers();
270287
EasyMock.expectLastCall();
271288
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
@@ -286,7 +303,7 @@ public void replicationDelaysFileCollection() throws Exception {
286303
mscanner.setRange(ReplicationSection.getRange());
287304
EasyMock.expectLastCall().once();
288305
EasyMock.expect(mscanner.iterator()).andReturn(replicationWork.entrySet().iterator());
289-
EasyMock.replay(context, fs, marker, tserverSet, rscanner, mscanner);
306+
EasyMock.replay(conf, context, fs, marker, tserverSet, rscanner, mscanner);
290307
var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, marker,
291308
tabletOnServer1List) {
292309
@Override
@@ -295,6 +312,6 @@ protected Map<UUID,Path> getSortedWALogs() {
295312
}
296313
};
297314
gc.collect(status);
298-
EasyMock.verify(context, fs, marker, tserverSet, rscanner, mscanner);
315+
EasyMock.verify(conf, context, fs, marker, tserverSet, rscanner, mscanner);
299316
}
300317
}

0 commit comments

Comments
 (0)
Please sign in to comment.