Skip to content

Commit 4ce3fe1

Browse files
committed
Make garbage collection of WAL and recovery files faster
Deletes WAL and recovery files concurrently, using the same property (Property.GC_DELETE_THREADS) and thread-pool technique already used for deleting RFiles. Closes apache#5397
1 parent 9037d61 commit 4ce3fe1

File tree

2 files changed

+76
-29
lines changed

2 files changed

+76
-29
lines changed

server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java

+49-19
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,15 @@
2828
import java.util.Map.Entry;
2929
import java.util.Set;
3030
import java.util.UUID;
31+
import java.util.concurrent.ExecutorService;
32+
import java.util.concurrent.TimeUnit;
3133
import java.util.concurrent.atomic.AtomicBoolean;
34+
import java.util.concurrent.atomic.AtomicLong;
3235
import java.util.stream.Stream;
3336

3437
import org.apache.accumulo.core.client.Scanner;
3538
import org.apache.accumulo.core.client.TableNotFoundException;
39+
import org.apache.accumulo.core.conf.Property;
3640
import org.apache.accumulo.core.data.Key;
3741
import org.apache.accumulo.core.data.Value;
3842
import org.apache.accumulo.core.gc.thrift.GCStatus;
@@ -46,6 +50,7 @@
4650
import org.apache.accumulo.core.security.Authorizations;
4751
import org.apache.accumulo.core.trace.TraceUtil;
4852
import org.apache.accumulo.core.util.Pair;
53+
import org.apache.accumulo.core.util.threads.ThreadPools;
4954
import org.apache.accumulo.server.ServerContext;
5055
import org.apache.accumulo.server.fs.VolumeManager;
5156
import org.apache.accumulo.server.log.WalStateManager;
@@ -266,37 +271,62 @@ private long removeTabletServerMarkers(Map<UUID,TServerInstance> uidMap,
266271
return result;
267272
}
268273

269-
private long removeFile(Path path) {
270-
try {
271-
if (!useTrash || !fs.moveToTrash(path)) {
272-
fs.deleteRecursively(path);
274+
private void removeFile(ExecutorService deleteThreadPool, Path path, AtomicLong counter,
275+
String msg) {
276+
deleteThreadPool.execute(() -> {
277+
try {
278+
log.debug(msg);
279+
if (!useTrash || !fs.moveToTrash(path)) {
280+
fs.deleteRecursively(path);
281+
}
282+
counter.incrementAndGet();
283+
} catch (FileNotFoundException ex) {
284+
// ignored
285+
} catch (IOException ex) {
286+
log.error("Unable to delete wal {}", path, ex);
273287
}
274-
return 1;
275-
} catch (FileNotFoundException ex) {
276-
// ignored
277-
} catch (IOException ex) {
278-
log.error("Unable to delete wal {}", path, ex);
279-
}
280-
281-
return 0;
288+
});
282289
}
283290

284291
private long removeFiles(Collection<Pair<WalState,Path>> collection, final GCStatus status) {
292+
293+
final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools()
294+
.createExecutorService(context.getConfiguration(), Property.GC_DELETE_THREADS);
295+
final AtomicLong counter = new AtomicLong();
296+
285297
for (Pair<WalState,Path> stateFile : collection) {
286298
Path path = stateFile.getSecond();
287-
log.debug("Removing {} WAL {}", stateFile.getFirst(), path);
288-
status.currentLog.deleted += removeFile(path);
299+
removeFile(deleteThreadPool, path, counter,
300+
"Removing " + stateFile.getFirst() + " WAL " + path);
289301
}
290-
return status.currentLog.deleted;
302+
303+
deleteThreadPool.shutdown();
304+
try {
305+
while (!deleteThreadPool.awaitTermination(1000, TimeUnit.MILLISECONDS)) { // empty
306+
}
307+
} catch (InterruptedException e1) {
308+
log.error("{}", e1.getMessage(), e1);
309+
}
310+
return counter.get();
291311
}
292312

293313
private long removeFiles(Collection<Path> values) {
294-
long count = 0;
314+
final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools()
315+
.createExecutorService(context.getConfiguration(), Property.GC_DELETE_THREADS);
316+
final AtomicLong counter = new AtomicLong();
317+
295318
for (Path path : values) {
296-
log.debug("Removing recovery log {}", path);
297-
count += removeFile(path);
319+
removeFile(deleteThreadPool, path, counter, "Removing recovery log " + path);
320+
}
321+
322+
deleteThreadPool.shutdown();
323+
try {
324+
while (!deleteThreadPool.awaitTermination(1000, TimeUnit.MILLISECONDS)) { // empty
325+
}
326+
} catch (InterruptedException e1) {
327+
log.error("{}", e1.getMessage(), e1);
298328
}
299-
return count;
329+
return counter.get();
300330
}
301331

302332
private UUID path2uuid(Path path) {

server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java

+27-10
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import java.util.stream.Stream;
2929

3030
import org.apache.accumulo.core.client.Scanner;
31+
import org.apache.accumulo.core.conf.AccumuloConfiguration;
32+
import org.apache.accumulo.core.conf.Property;
3133
import org.apache.accumulo.core.data.Key;
3234
import org.apache.accumulo.core.data.Value;
3335
import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -92,13 +94,16 @@ public class GarbageCollectWriteAheadLogsTest {
9294

9395
@Test
9496
public void testRemoveUnusedLog() throws Exception {
97+
AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class);
9598
ServerContext context = EasyMock.createMock(ServerContext.class);
9699
VolumeManager fs = EasyMock.createMock(VolumeManager.class);
97100
WalStateManager marker = EasyMock.createMock(WalStateManager.class);
98101
LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
99102

100103
GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
101104

105+
EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
106+
EasyMock.expect(conf.getCount(Property.GC_DELETE_THREADS)).andReturn(8).anyTimes();
102107
tserverSet.scanServers();
103108
EasyMock.expectLastCall();
104109
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
@@ -108,7 +113,7 @@ public void testRemoveUnusedLog() throws Exception {
108113
EasyMock.expect(fs.deleteRecursively(path)).andReturn(true).once();
109114
marker.removeWalMarker(server1, id);
110115
EasyMock.expectLastCall().once();
111-
EasyMock.replay(context, fs, marker, tserverSet);
116+
EasyMock.replay(conf, context, fs, marker, tserverSet);
112117
var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, marker,
113118
tabletOnServer1List) {
114119
@Override
@@ -123,25 +128,28 @@ protected Map<UUID,Path> getSortedWALogs() {
123128
}
124129
};
125130
gc.collect(status);
126-
EasyMock.verify(context, fs, marker, tserverSet);
131+
EasyMock.verify(conf, context, fs, marker, tserverSet);
127132
}
128133

129134
@Test
130135
public void testKeepClosedLog() throws Exception {
136+
AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class);
131137
ServerContext context = EasyMock.createMock(ServerContext.class);
132138
VolumeManager fs = EasyMock.createMock(VolumeManager.class);
133139
WalStateManager marker = EasyMock.createMock(WalStateManager.class);
134140
LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
135141

136142
GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
137143

144+
EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
145+
EasyMock.expect(conf.getCount(Property.GC_DELETE_THREADS)).andReturn(8).anyTimes();
138146
tserverSet.scanServers();
139147
EasyMock.expectLastCall();
140148
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
141149

142150
EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once();
143151
EasyMock.expect(marker.state(server1, id)).andReturn(new Pair<>(WalState.CLOSED, path));
144-
EasyMock.replay(context, marker, tserverSet, fs);
152+
EasyMock.replay(conf, context, marker, tserverSet, fs);
145153
var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, marker,
146154
tabletOnServer1List) {
147155
@Override
@@ -156,11 +164,12 @@ protected Map<UUID,Path> getSortedWALogs() {
156164
}
157165
};
158166
gc.collect(status);
159-
EasyMock.verify(context, marker, tserverSet, fs);
167+
EasyMock.verify(conf, context, marker, tserverSet, fs);
160168
}
161169

162170
@Test
163171
public void deleteUnreferencedLogOnDeadServer() throws Exception {
172+
AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class);
164173
ServerContext context = EasyMock.createMock(ServerContext.class);
165174
VolumeManager fs = EasyMock.createMock(VolumeManager.class);
166175
WalStateManager marker = EasyMock.createMock(WalStateManager.class);
@@ -170,6 +179,8 @@ public void deleteUnreferencedLogOnDeadServer() throws Exception {
170179

171180
GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
172181

182+
EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
183+
EasyMock.expect(conf.getCount(Property.GC_DELETE_THREADS)).andReturn(8).anyTimes();
173184
tserverSet.scanServers();
174185
EasyMock.expectLastCall();
175186
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
@@ -195,7 +206,7 @@ public void deleteUnreferencedLogOnDeadServer() throws Exception {
195206
EasyMock.expectLastCall().once();
196207
marker.forget(server2);
197208
EasyMock.expectLastCall().once();
198-
EasyMock.replay(context, fs, marker, tserverSet, rscanner, mscanner);
209+
EasyMock.replay(conf, context, fs, marker, tserverSet, rscanner, mscanner);
199210
var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, marker,
200211
tabletOnServer1List) {
201212
@Override
@@ -204,11 +215,12 @@ protected Map<UUID,Path> getSortedWALogs() {
204215
}
205216
};
206217
gc.collect(status);
207-
EasyMock.verify(context, fs, marker, tserverSet, rscanner, mscanner);
218+
EasyMock.verify(conf, context, fs, marker, tserverSet, rscanner, mscanner);
208219
}
209220

210221
@Test
211222
public void ignoreReferenceLogOnDeadServer() throws Exception {
223+
AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class);
212224
ServerContext context = EasyMock.createMock(ServerContext.class);
213225
VolumeManager fs = EasyMock.createMock(VolumeManager.class);
214226
WalStateManager marker = EasyMock.createMock(WalStateManager.class);
@@ -218,6 +230,8 @@ public void ignoreReferenceLogOnDeadServer() throws Exception {
218230

219231
GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
220232

233+
EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
234+
EasyMock.expect(conf.getCount(Property.GC_DELETE_THREADS)).andReturn(8).anyTimes();
221235
tserverSet.scanServers();
222236
EasyMock.expectLastCall();
223237
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
@@ -238,7 +252,7 @@ public void ignoreReferenceLogOnDeadServer() throws Exception {
238252
mscanner.setRange(ReplicationSection.getRange());
239253
EasyMock.expectLastCall().once();
240254
EasyMock.expect(mscanner.iterator()).andReturn(emptyKV);
241-
EasyMock.replay(context, fs, marker, tserverSet, rscanner, mscanner);
255+
EasyMock.replay(conf, context, fs, marker, tserverSet, rscanner, mscanner);
242256
var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, marker,
243257
tabletOnServer2List) {
244258
@Override
@@ -247,11 +261,12 @@ protected Map<UUID,Path> getSortedWALogs() {
247261
}
248262
};
249263
gc.collect(status);
250-
EasyMock.verify(context, fs, marker, tserverSet, rscanner, mscanner);
264+
EasyMock.verify(conf, context, fs, marker, tserverSet, rscanner, mscanner);
251265
}
252266

253267
@Test
254268
public void replicationDelaysFileCollection() throws Exception {
269+
AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class);
255270
ServerContext context = EasyMock.createMock(ServerContext.class);
256271
VolumeManager fs = EasyMock.createMock(VolumeManager.class);
257272
WalStateManager marker = EasyMock.createMock(WalStateManager.class);
@@ -266,6 +281,8 @@ public void replicationDelaysFileCollection() throws Exception {
266281

267282
GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
268283

284+
EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
285+
EasyMock.expect(conf.getCount(Property.GC_DELETE_THREADS)).andReturn(8).anyTimes();
269286
tserverSet.scanServers();
270287
EasyMock.expectLastCall();
271288
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
@@ -286,7 +303,7 @@ public void replicationDelaysFileCollection() throws Exception {
286303
mscanner.setRange(ReplicationSection.getRange());
287304
EasyMock.expectLastCall().once();
288305
EasyMock.expect(mscanner.iterator()).andReturn(replicationWork.entrySet().iterator());
289-
EasyMock.replay(context, fs, marker, tserverSet, rscanner, mscanner);
306+
EasyMock.replay(conf, context, fs, marker, tserverSet, rscanner, mscanner);
290307
var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, false, marker,
291308
tabletOnServer1List) {
292309
@Override
@@ -295,6 +312,6 @@ protected Map<UUID,Path> getSortedWALogs() {
295312
}
296313
};
297314
gc.collect(status);
298-
EasyMock.verify(context, fs, marker, tserverSet, rscanner, mscanner);
315+
EasyMock.verify(conf, context, fs, marker, tserverSet, rscanner, mscanner);
299316
}
300317
}

0 commit comments

Comments
 (0)