Skip to content

Commit 0c79e44

Browse files
committed
Merge branch '2.1'
2 parents 15be02a + f675fdd commit 0c79e44

File tree

5 files changed

+124
-33
lines changed

5 files changed

+124
-33
lines changed

core/src/main/java/org/apache/accumulo/core/conf/Property.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -772,8 +772,10 @@ public enum Property {
772772
"1.3.5"),
773773
GC_PORT("gc.port.client", "9998", PropertyType.PORT,
774774
"The listening port for the garbage collector's monitor service.", "1.3.5"),
775+
GC_DELETE_WAL_THREADS("gc.threads.delete.wal", "4", PropertyType.COUNT,
776+
"The number of threads used to delete write-ahead logs and recovery files.", "2.1.4"),
775777
GC_DELETE_THREADS("gc.threads.delete", "16", PropertyType.COUNT,
776-
"The number of threads used to delete RFiles and write-ahead logs.", "1.3.5"),
778+
"The number of threads used to delete RFiles.", "1.3.5"),
777779
GC_SAFEMODE("gc.safemode", "false", PropertyType.BOOLEAN,
778780
"Provides listing of files to be deleted but does not delete any files.", "2.1.0"),
779781
GC_USE_FULL_COMPACTION("gc.post.metadata.action", "flush", PropertyType.GC_POST_ACTION,

core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public enum ThreadPoolNames {
3838
COORDINATOR_RESERVATION_META_POOL("accumulo.pool.compaction.coordinator.reservation.meta"),
3939
COORDINATOR_RESERVATION_USER_POOL("accumulo.pool.compaction.coordinator.reservation.user"),
4040
GC_DELETE_POOL("accumulo.pool.gc.threads.delete"),
41+
GC_WAL_DELETE_POOL("accumulo.pool.gc.threads.delete.wal"),
4142
GENERAL_SERVER_POOL("accumulo.pool.general.server"),
4243
SERVICE_LOCK_POOL("accumulo.pool.service.lock"),
4344
IMPORT_TABLE_RENAME_POOL("accumulo.pool.import.table.rename"),

core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java

+3
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_RESERVATION_ROOT_POOL;
2727
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_RESERVATION_USER_POOL;
2828
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GC_DELETE_POOL;
29+
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GC_WAL_DELETE_POOL;
2930
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_POOL;
3031
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_FATE_POOL;
3132
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_STATUS_POOL;
@@ -370,6 +371,8 @@ public ThreadPoolExecutor createExecutorService(final AccumuloConfiguration conf
370371
builder.enableThreadPoolMetrics();
371372
}
372373
return builder.build();
374+
case GC_DELETE_WAL_THREADS:
375+
return getPoolBuilder(GC_WAL_DELETE_POOL).numCoreThreads(conf.getCount(p)).build();
373376
case GC_DELETE_THREADS:
374377
return getPoolBuilder(GC_DELETE_POOL).numCoreThreads(conf.getCount(p)).build();
375378
case COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT:

server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java

+94-22
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,20 @@
2929
import java.util.Collection;
3030
import java.util.HashMap;
3131
import java.util.HashSet;
32+
import java.util.Iterator;
3233
import java.util.List;
3334
import java.util.Map;
3435
import java.util.Map.Entry;
3536
import java.util.Set;
3637
import java.util.UUID;
38+
import java.util.concurrent.ExecutionException;
39+
import java.util.concurrent.ExecutorService;
40+
import java.util.concurrent.Future;
3741
import java.util.concurrent.atomic.AtomicBoolean;
42+
import java.util.concurrent.atomic.AtomicLong;
3843
import java.util.stream.Stream;
3944

45+
import org.apache.accumulo.core.conf.Property;
4046
import org.apache.accumulo.core.gc.thrift.GCStatus;
4147
import org.apache.accumulo.core.gc.thrift.GcCycleStats;
4248
import org.apache.accumulo.core.metadata.TServerInstance;
@@ -48,6 +54,7 @@
4854
import org.apache.accumulo.core.tabletserver.log.LogEntry;
4955
import org.apache.accumulo.core.trace.TraceUtil;
5056
import org.apache.accumulo.core.util.Pair;
57+
import org.apache.accumulo.core.util.threads.ThreadPools;
5158
import org.apache.accumulo.server.ServerContext;
5259
import org.apache.accumulo.server.fs.VolumeManager;
5360
import org.apache.accumulo.server.log.WalStateManager;
@@ -236,37 +243,102 @@ private long removeTabletServerMarkers(Map<UUID,TServerInstance> uidMap,
236243
return result;
237244
}
238245

239-
private long removeFile(Path path) {
240-
try {
241-
if (!fs.moveToTrash(path)) {
242-
fs.deleteRecursively(path);
246+
private Future<?> removeFile(ExecutorService deleteThreadPool, Path path, AtomicLong counter,
247+
String msg) {
248+
return deleteThreadPool.submit(() -> {
249+
try {
250+
log.debug(msg);
251+
if (!fs.moveToTrash(path)) {
252+
fs.deleteRecursively(path);
253+
}
254+
counter.incrementAndGet();
255+
} catch (FileNotFoundException ex) {
256+
// ignored
257+
} catch (IOException ex) {
258+
log.error("Unable to delete {}", path, ex);
243259
}
244-
return 1;
245-
} catch (FileNotFoundException ex) {
246-
// ignored
247-
} catch (IOException ex) {
248-
log.error("Unable to delete wal {}", path, ex);
249-
}
250-
251-
return 0;
260+
});
252261
}
253262

254263
private long removeFiles(Collection<Pair<WalState,Path>> collection, final GCStatus status) {
255-
for (Pair<WalState,Path> stateFile : collection) {
256-
Path path = stateFile.getSecond();
257-
log.debug("Removing {} WAL {}", stateFile.getFirst(), path);
258-
status.currentLog.deleted += removeFile(path);
264+
265+
final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools()
266+
.createExecutorService(context.getConfiguration(), Property.GC_DELETE_WAL_THREADS);
267+
final Map<Path,Future<?>> futures = new HashMap<>(collection.size());
268+
final AtomicLong counter = new AtomicLong();
269+
270+
try {
271+
for (Pair<WalState,Path> stateFile : collection) {
272+
Path path = stateFile.getSecond();
273+
futures.put(path, removeFile(deleteThreadPool, path, counter,
274+
"Removing " + stateFile.getFirst() + " WAL " + path));
275+
}
276+
277+
while (!futures.isEmpty()) {
278+
Iterator<Entry<Path,Future<?>>> iter = futures.entrySet().iterator();
279+
while (iter.hasNext()) {
280+
Entry<Path,Future<?>> f = iter.next();
281+
if (f.getValue().isDone()) {
282+
try {
283+
iter.remove();
284+
f.getValue().get();
285+
} catch (InterruptedException | ExecutionException e) {
286+
throw new RuntimeException("Uncaught exception deleting wal file" + f.getKey(), e);
287+
}
288+
}
289+
}
290+
try {
291+
Thread.sleep(500);
292+
} catch (InterruptedException e) {
293+
Thread.currentThread().interrupt();
294+
throw new RuntimeException("Interrupted while sleeping", e);
295+
}
296+
}
297+
} finally {
298+
deleteThreadPool.shutdownNow();
259299
}
260-
return status.currentLog.deleted;
300+
status.currentLog.deleted += counter.get();
301+
return counter.get();
261302
}
262303

263304
private long removeFiles(Collection<Path> values) {
264-
long count = 0;
265-
for (Path path : values) {
266-
log.debug("Removing recovery log {}", path);
267-
count += removeFile(path);
305+
306+
final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools()
307+
.createExecutorService(context.getConfiguration(), Property.GC_DELETE_WAL_THREADS);
308+
final Map<Path,Future<?>> futures = new HashMap<>(values.size());
309+
final AtomicLong counter = new AtomicLong();
310+
311+
try {
312+
for (Path path : values) {
313+
futures.put(path,
314+
removeFile(deleteThreadPool, path, counter, "Removing recovery log " + path));
315+
}
316+
317+
while (!futures.isEmpty()) {
318+
Iterator<Entry<Path,Future<?>>> iter = futures.entrySet().iterator();
319+
while (iter.hasNext()) {
320+
Entry<Path,Future<?>> f = iter.next();
321+
if (f.getValue().isDone()) {
322+
try {
323+
iter.remove();
324+
f.getValue().get();
325+
} catch (InterruptedException | ExecutionException e) {
326+
throw new RuntimeException(
327+
"Uncaught exception deleting recovery log file" + f.getKey(), e);
328+
}
329+
}
330+
}
331+
try {
332+
Thread.sleep(500);
333+
} catch (InterruptedException e) {
334+
Thread.currentThread().interrupt();
335+
throw new RuntimeException("Interrupted while sleeping", e);
336+
}
337+
}
338+
} finally {
339+
deleteThreadPool.shutdownNow();
268340
}
269-
return count;
341+
return counter.get();
270342
}
271343

272344
private Map<UUID,TServerInstance> removeEntriesInUse(Map<TServerInstance,Set<UUID>> candidates,

server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java

+23-10
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import java.util.stream.Stream;
3232

3333
import org.apache.accumulo.core.client.admin.TabletAvailability;
34+
import org.apache.accumulo.core.conf.AccumuloConfiguration;
35+
import org.apache.accumulo.core.conf.Property;
3436
import org.apache.accumulo.core.dataImpl.KeyExtent;
3537
import org.apache.accumulo.core.gc.thrift.GCStatus;
3638
import org.apache.accumulo.core.gc.thrift.GcCycleStats;
@@ -80,6 +82,7 @@ public class GarbageCollectWriteAheadLogsTest {
8082

8183
@Test
8284
public void testRemoveUnusedLog() throws Exception {
85+
AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class);
8386
ServerContext context = EasyMock.createMock(ServerContext.class);
8487
VolumeManager fs = EasyMock.createMock(VolumeManager.class);
8588
EasyMock.expect(fs.moveToTrash(EasyMock.anyObject())).andReturn(false).anyTimes();
@@ -88,6 +91,8 @@ public void testRemoveUnusedLog() throws Exception {
8891

8992
GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
9093

94+
EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
95+
EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes();
9196
tserverSet.scanServers();
9297
EasyMock.expectLastCall();
9398
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
@@ -97,7 +102,7 @@ public void testRemoveUnusedLog() throws Exception {
97102
EasyMock.expect(fs.deleteRecursively(path)).andReturn(true).once();
98103
marker.removeWalMarker(server1, id);
99104
EasyMock.expectLastCall().once();
100-
EasyMock.replay(context, fs, marker, tserverSet);
105+
EasyMock.replay(conf, context, fs, marker, tserverSet);
101106
var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker) {
102107
@Override
103108
protected Map<UUID,Path> getSortedWALogs() {
@@ -113,26 +118,30 @@ Stream<TabletMetadata> createStore(Set<TServerInstance> liveTservers) {
113118
assertThrows(IllegalStateException.class, () -> gc.collect(status),
114119
"Should only be able to call collect once");
115120

116-
EasyMock.verify(context, fs, marker, tserverSet);
121+
EasyMock.verify(conf, context, fs, marker, tserverSet);
117122
}
118123

119124
@Test
120125
public void testKeepClosedLog() throws Exception {
126+
AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class);
121127
ServerContext context = EasyMock.createMock(ServerContext.class);
122128
VolumeManager fs = EasyMock.createMock(VolumeManager.class);
123129
WalStateManager marker = EasyMock.createMock(WalStateManager.class);
124130
LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
125131

126132
GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
127133

134+
EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
135+
EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes();
128136
tserverSet.scanServers();
129137
EasyMock.expectLastCall();
130138
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
131139

132140
EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once();
133141
EasyMock.expect(marker.state(server1, id)).andReturn(new Pair<>(WalState.CLOSED, path));
134-
EasyMock.replay(context, marker, tserverSet, fs);
142+
EasyMock.replay(conf, context, marker, tserverSet, fs);
135143
var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker) {
144+
136145
@Override
137146
protected Map<UUID,Path> getSortedWALogs() {
138147
return Collections.emptyMap();
@@ -144,12 +153,12 @@ Stream<TabletMetadata> createStore(Set<TServerInstance> liveTservers) {
144153
}
145154
};
146155
gc.collect(status);
147-
148-
EasyMock.verify(context, marker, tserverSet, fs);
156+
EasyMock.verify(conf, context, marker, tserverSet, fs);
149157
}
150158

151159
@Test
152160
public void deleteUnreferencedLogOnDeadServer() throws Exception {
161+
AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class);
153162
ServerContext context = EasyMock.createMock(ServerContext.class);
154163
VolumeManager fs = EasyMock.createMock(VolumeManager.class);
155164
EasyMock.expect(fs.moveToTrash(EasyMock.anyObject())).andReturn(false).anyTimes();
@@ -158,6 +167,8 @@ public void deleteUnreferencedLogOnDeadServer() throws Exception {
158167

159168
GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
160169

170+
EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
171+
EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes();
161172
tserverSet.scanServers();
162173
EasyMock.expectLastCall();
163174
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
@@ -170,7 +181,7 @@ public void deleteUnreferencedLogOnDeadServer() throws Exception {
170181
EasyMock.expectLastCall().once();
171182
marker.forget(server2);
172183
EasyMock.expectLastCall().once();
173-
EasyMock.replay(context, fs, marker, tserverSet);
184+
EasyMock.replay(conf, context, fs, marker, tserverSet);
174185
var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker) {
175186
@Override
176187
protected Map<UUID,Path> getSortedWALogs() {
@@ -183,27 +194,29 @@ Stream<TabletMetadata> createStore(Set<TServerInstance> liveTservers) {
183194
}
184195
};
185196
gc.collect(status);
186-
187-
EasyMock.verify(context, fs, marker, tserverSet);
197+
EasyMock.verify(conf, context, fs, marker, tserverSet);
188198
}
189199

190200
@Test
191201
public void ignoreReferenceLogOnDeadServer() throws Exception {
202+
AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class);
192203
ServerContext context = EasyMock.createMock(ServerContext.class);
193204
VolumeManager fs = EasyMock.createMock(VolumeManager.class);
194205
WalStateManager marker = EasyMock.createMock(WalStateManager.class);
195206
LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
196207

197208
GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
198209

210+
EasyMock.expect(context.getConfiguration()).andReturn(conf).times(2);
211+
EasyMock.expect(conf.getCount(Property.GC_DELETE_WAL_THREADS)).andReturn(8).anyTimes();
199212
tserverSet.scanServers();
200213
EasyMock.expectLastCall();
201214
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
202215

203216
EasyMock.expect(marker.getAllMarkers()).andReturn(markers2).once();
204217
EasyMock.expect(marker.state(server2, id)).andReturn(new Pair<>(WalState.OPEN, path));
205218

206-
EasyMock.replay(context, fs, marker, tserverSet);
219+
EasyMock.replay(conf, context, fs, marker, tserverSet);
207220
GarbageCollectWriteAheadLogs gc =
208221
new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker) {
209222
@Override
@@ -218,7 +231,7 @@ Stream<TabletMetadata> createStore(Set<TServerInstance> liveTservers) {
218231
};
219232
gc.collect(status);
220233

221-
EasyMock.verify(context, fs, marker, tserverSet);
234+
EasyMock.verify(conf, context, fs, marker, tserverSet);
222235
}
223236

224237
}

0 commit comments

Comments
 (0)