Skip to content

Commit ec8ae12

Browse files
authored
Use fluent-style builder for pool creation, replacing overloaded methods (apache#4384)
* Use fluent-style builder for pool creation, replacing overloaded methods * Replaces overloaded createThreadPool methods with a fluent-style builder (ThreadPoolExecutorBuilderTest). * Adds ThreadPoolExecutorBuilderTest test, * Adds `createExecutorService` method that does not have an option to enable metrics and replaces all occurrences in the code that was calling the alternate method with emitThreadPoolMetrics=false to use it. The `createScheduledExecutorService` will be refactored in a future PR when service initialization is reworked. See PR apache#4342 for an example. This change isolates where metrics can be enabled and makes finding them easier. It also will make reviewing future changes easier because those changes will be isolated to places that currently enable thread pool metrics.
1 parent f3e75f0 commit ec8ae12

File tree

42 files changed

+465
-267
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+465
-267
lines changed

core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -260,17 +260,18 @@ public Ample getAmple() {
260260
submitScannerReadAheadTask(Callable<List<KeyValue>> c) {
261261
ensureOpen();
262262
if (scannerReadaheadPool == null) {
263-
scannerReadaheadPool = clientThreadPools.createThreadPool(0, Integer.MAX_VALUE, 3L, SECONDS,
264-
"Accumulo scanner read ahead thread", new SynchronousQueue<>(), true);
263+
scannerReadaheadPool = clientThreadPools.getPoolBuilder("Accumulo scanner read ahead thread")
264+
.numCoreThreads(0).numMaxThreads(Integer.MAX_VALUE).withTimeOut(3L, SECONDS)
265+
.withQueue(new SynchronousQueue<>()).enableThreadPoolMetrics().build();
265266
}
266267
return scannerReadaheadPool.submit(c);
267268
}
268269

269270
public synchronized void executeCleanupTask(Runnable r) {
270271
ensureOpen();
271272
if (cleanupThreadPool == null) {
272-
cleanupThreadPool = clientThreadPools.createFixedThreadPool(1, 3, SECONDS,
273-
"Conditional Writer Cleanup Thread", true);
273+
cleanupThreadPool = clientThreadPools.getPoolBuilder("Conditional Writer Cleanup Thread")
274+
.numCoreThreads(1).withTimeOut(3L, SECONDS).enableThreadPoolMetrics().build();
274275
}
275276
this.cleanupThreadPool.execute(r);
276277
}

core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ private TabletServerMutations<QCMutation> dequeue(String location) {
371371
this.auths = config.getAuthorizations();
372372
this.ve = new VisibilityEvaluator(config.getAuthorizations());
373373
this.threadPool = context.threadPools().createScheduledExecutorService(
374-
config.getMaxWriteThreads(), this.getClass().getSimpleName(), false);
374+
config.getMaxWriteThreads(), this.getClass().getSimpleName());
375375
this.locator = new SyncingTabletLocator(context, tableId);
376376
this.serverQueues = new HashMap<>();
377377
this.tableId = tableId;

core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -301,8 +301,8 @@ public List<ActiveCompaction> getActiveCompactions()
301301
List<String> tservers = getTabletServers();
302302

303303
int numThreads = Math.max(4, Math.min((tservers.size() + compactors.size()) / 10, 256));
304-
var executorService =
305-
context.threadPools().createFixedThreadPool(numThreads, "getactivecompactions", false);
304+
var executorService = context.threadPools().getPoolBuilder("getactivecompactions")
305+
.numCoreThreads(numThreads).build();
306306
try {
307307
List<Future<List<ActiveCompaction>>> futures = new ArrayList<>();
308308

core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,8 @@ public void addSplits(String tableName, SortedSet<Text> partitionKeys)
495495
CountDownLatch latch = new CountDownLatch(splits.size());
496496
AtomicReference<Exception> exception = new AtomicReference<>(null);
497497

498-
ExecutorService executor = context.threadPools().createFixedThreadPool(16, "addSplits", false);
498+
ExecutorService executor =
499+
context.threadPools().getPoolBuilder("addSplits").numCoreThreads(16).build();
499500
try {
500501
executor.execute(
501502
new SplitTask(new SplitEnv(tableName, tableId, executor, latch, exception), splits));

core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,9 @@ protected TabletServerBatchReader(ClientContext context, Class<?> scopeClass, Ta
7171
this.tableName = tableName;
7272
this.numThreads = numQueryThreads;
7373

74-
queryThreadPool = context.threadPools().createFixedThreadPool(numQueryThreads,
75-
"batch scanner " + batchReaderInstance + "-", false);
74+
queryThreadPool =
75+
context.threadPools().getPoolBuilder("batch scanner " + batchReaderInstance + "-")
76+
.numCoreThreads(numQueryThreads).build();
7677
// Call shutdown on this thread pool in case the caller does not call close().
7778
cleanable = CleanerUtil.shutdownThreadPoolExecutor(queryThreadPool, closed, log);
7879
}

core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -672,11 +672,11 @@ private class MutationWriter {
672672
public MutationWriter(int numSendThreads) {
673673
serversMutations = new HashMap<>();
674674
queued = new HashSet<>();
675-
sendThreadPool = context.threadPools().createFixedThreadPool(numSendThreads,
676-
this.getClass().getName(), false);
675+
sendThreadPool = context.threadPools().getPoolBuilder(this.getClass().getName())
676+
.numCoreThreads(numSendThreads).build();
677677
locators = new HashMap<>();
678-
binningThreadPool = context.threadPools().createFixedThreadPool(1, "BinMutations",
679-
new SynchronousQueue<>(), false);
678+
binningThreadPool = context.threadPools().getPoolBuilder("BinMutations").numCoreThreads(1)
679+
.withQueue(new SynchronousQueue<>()).build();
680680
binningThreadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
681681
}
682682

core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -482,12 +482,12 @@ private SortedMap<KeyExtent,Bulk.Files> computeMappingFromFiles(FileSystem fs, T
482482
if (this.executor != null) {
483483
executor = this.executor;
484484
} else if (numThreads > 0) {
485-
executor = service =
486-
context.threadPools().createFixedThreadPool(numThreads, "BulkImportThread", false);
485+
executor = service = context.threadPools().getPoolBuilder("BulkImportThread")
486+
.numCoreThreads(numThreads).build();
487487
} else {
488488
String threads = context.getConfiguration().get(ClientProperty.BULK_LOAD_THREADS.getKey());
489-
executor = service = context.threadPools().createFixedThreadPool(
490-
ConfigurationTypeHelper.getNumThreads(threads), "BulkImportThread", false);
489+
executor = service = context.threadPools().getPoolBuilder("BulkImportThread")
490+
.numCoreThreads(ConfigurationTypeHelper.getNumThreads(threads)).build();
491491
}
492492

493493
try {

core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,9 @@ private static synchronized ExecutorService getLoadThreadPool(int maxLoadThreads
7979
}
8080

8181
if (maxLoadThreads > 0) {
82-
loadThreadPool = ThreadPools.getServerThreadPools().createThreadPool(0, maxLoadThreads, 60,
83-
SECONDS, "bloom-loader", false);
82+
loadThreadPool = ThreadPools.getServerThreadPools().getPoolBuilder("bloom-loader")
83+
.numCoreThreads(0).numMaxThreads(maxLoadThreads).withTimeOut(60L, SECONDS).build();
8484
}
85-
8685
return loadThreadPool;
8786
}
8887

core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,8 @@ public class LruBlockCache extends SynchronousLoadingBlockCache implements Block
102102
private final EvictionThread evictionThread;
103103

104104
/** Statistics thread schedule pool (for heavy debugging, could remove) */
105-
private final ScheduledExecutorService scheduleThreadPool = ThreadPools.getServerThreadPools()
106-
.createScheduledExecutorService(1, "LRUBlockCacheStats", false);
105+
private final ScheduledExecutorService scheduleThreadPool =
106+
ThreadPools.getServerThreadPools().createScheduledExecutorService(1, "LRUBlockCacheStats");
107107

108108
/** Current size of cache */
109109
private final AtomicLong size;

core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public final class TinyLfuBlockCache implements BlockCache {
6262
private final Policy.Eviction<String,Block> policy;
6363
private final int maxSize;
6464
private final ScheduledExecutorService statsExecutor = ThreadPools.getServerThreadPools()
65-
.createScheduledExecutorService(1, "TinyLfuBlockCacheStatsExecutor", false);
65+
.createScheduledExecutorService(1, "TinyLfuBlockCacheStatsExecutor");
6666

6767
public TinyLfuBlockCache(Configuration conf, CacheType type) {
6868
cache = Caffeine.newBuilder()

core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java

+4-6
Original file line numberDiff line numberDiff line change
@@ -222,9 +222,8 @@ private static ExternalCompactionId getRunningCompactionId(HostAndPort compactor
222222
*/
223223
public static List<RunningCompaction> getCompactionsRunningOnCompactors(ClientContext context) {
224224
final List<RunningCompactionFuture> rcFutures = new ArrayList<>();
225-
final ExecutorService executor = ThreadPools.getServerThreadPools().createFixedThreadPool(16,
226-
"CompactorRunningCompactions", false);
227-
225+
final ExecutorService executor = ThreadPools.getServerThreadPools()
226+
.getPoolBuilder("CompactorRunningCompactions").numCoreThreads(16).build();
228227
getCompactorAddrs(context).forEach((q, hp) -> {
229228
hp.forEach(hostAndPort -> {
230229
rcFutures.add(new RunningCompactionFuture(q, hostAndPort,
@@ -250,9 +249,8 @@ public static List<RunningCompaction> getCompactionsRunningOnCompactors(ClientCo
250249

251250
public static Collection<ExternalCompactionId>
252251
getCompactionIdsRunningOnCompactors(ClientContext context) {
253-
final ExecutorService executor = ThreadPools.getServerThreadPools().createFixedThreadPool(16,
254-
"CompactorRunningCompactions", false);
255-
252+
final ExecutorService executor = ThreadPools.getServerThreadPools()
253+
.getPoolBuilder("CompactorRunningCompactions").numCoreThreads(16).build();
256254
List<Future<ExternalCompactionId>> futures = new ArrayList<>();
257255

258256
getCompactorAddrs(context).forEach((q, hp) -> {

0 commit comments

Comments
 (0)