Skip to content

Commit 0ad96b1

Browse files
author
Ed Coleman
committed
Merge remote-tracking branch 'upstream/2.1'
2 parents 8b0262d + ec8ae12 commit 0ad96b1

File tree

38 files changed

+433
-247
lines changed

38 files changed

+433
-247
lines changed

core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -260,17 +260,18 @@ public Ample getAmple() {
260260
submitScannerReadAheadTask(Callable<List<KeyValue>> c) {
261261
ensureOpen();
262262
if (scannerReadaheadPool == null) {
263-
scannerReadaheadPool = clientThreadPools.createThreadPool(0, Integer.MAX_VALUE, 3L, SECONDS,
264-
"Accumulo scanner read ahead thread", new SynchronousQueue<>(), true);
263+
scannerReadaheadPool = clientThreadPools.getPoolBuilder("Accumulo scanner read ahead thread")
264+
.numCoreThreads(0).numMaxThreads(Integer.MAX_VALUE).withTimeOut(3L, SECONDS)
265+
.withQueue(new SynchronousQueue<>()).enableThreadPoolMetrics().build();
265266
}
266267
return scannerReadaheadPool.submit(c);
267268
}
268269

269270
public synchronized void executeCleanupTask(Runnable r) {
270271
ensureOpen();
271272
if (cleanupThreadPool == null) {
272-
cleanupThreadPool = clientThreadPools.createFixedThreadPool(1, 3, SECONDS,
273-
"Conditional Writer Cleanup Thread", true);
273+
cleanupThreadPool = clientThreadPools.getPoolBuilder("Conditional Writer Cleanup Thread")
274+
.numCoreThreads(1).withTimeOut(3L, SECONDS).enableThreadPoolMetrics().build();
274275
}
275276
this.cleanupThreadPool.execute(r);
276277
}

core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ private TabletServerMutations<QCMutation> dequeue(String location) {
372372
this.auths = config.getAuthorizations();
373373
this.ve = new VisibilityEvaluator(config.getAuthorizations());
374374
this.threadPool = context.threadPools().createScheduledExecutorService(
375-
config.getMaxWriteThreads(), this.getClass().getSimpleName(), false);
375+
config.getMaxWriteThreads(), this.getClass().getSimpleName());
376376
this.locator = new SyncingTabletLocator(context, tableId);
377377
this.serverQueues = new HashMap<>();
378378
this.tableId = tableId;

core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -307,8 +307,8 @@ public List<ActiveCompaction> getActiveCompactions()
307307
List<String> tservers = getTabletServers();
308308

309309
int numThreads = Math.max(4, Math.min((tservers.size() + compactors.size()) / 10, 256));
310-
var executorService =
311-
context.threadPools().createFixedThreadPool(numThreads, "getactivecompactions", false);
310+
var executorService = context.threadPools().getPoolBuilder("getactivecompactions")
311+
.numCoreThreads(numThreads).build();
312312
try {
313313
List<Future<List<ActiveCompaction>>> futures = new ArrayList<>();
314314

core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -493,7 +493,8 @@ public void addSplits(String tableName, SortedSet<Text> partitionKeys)
493493
CountDownLatch latch = new CountDownLatch(splits.size());
494494
AtomicReference<Exception> exception = new AtomicReference<>(null);
495495

496-
ExecutorService executor = context.threadPools().createFixedThreadPool(16, "addSplits", false);
496+
ExecutorService executor =
497+
context.threadPools().getPoolBuilder("addSplits").numCoreThreads(16).build();
497498
try {
498499
executor.execute(
499500
new SplitTask(new SplitEnv(tableName, tableId, executor, latch, exception), splits));

core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,9 @@ protected TabletServerBatchReader(ClientContext context, Class<?> scopeClass, Ta
7171
this.tableName = tableName;
7272
this.numThreads = numQueryThreads;
7373

74-
queryThreadPool = context.threadPools().createFixedThreadPool(numQueryThreads,
75-
"batch scanner " + batchReaderInstance + "-", false);
74+
queryThreadPool =
75+
context.threadPools().getPoolBuilder("batch scanner " + batchReaderInstance + "-")
76+
.numCoreThreads(numQueryThreads).build();
7677
// Call shutdown on this thread pool in case the caller does not call close().
7778
cleanable = CleanerUtil.shutdownThreadPoolExecutor(queryThreadPool, closed, log);
7879
}

core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -671,11 +671,11 @@ private class MutationWriter {
671671
public MutationWriter(int numSendThreads) {
672672
serversMutations = new HashMap<>();
673673
queued = new HashSet<>();
674-
sendThreadPool = context.threadPools().createFixedThreadPool(numSendThreads,
675-
this.getClass().getName(), false);
674+
sendThreadPool = context.threadPools().getPoolBuilder(this.getClass().getName())
675+
.numCoreThreads(numSendThreads).build();
676676
locators = new HashMap<>();
677-
binningThreadPool = context.threadPools().createFixedThreadPool(1, "BinMutations",
678-
new SynchronousQueue<>(), false);
677+
binningThreadPool = context.threadPools().getPoolBuilder("BinMutations").numCoreThreads(1)
678+
.withQueue(new SynchronousQueue<>()).build();
679679
binningThreadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
680680
}
681681

core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -464,12 +464,12 @@ private SortedMap<KeyExtent,Bulk.Files> computeMappingFromFiles(FileSystem fs, T
464464
if (this.executor != null) {
465465
executor = this.executor;
466466
} else if (numThreads > 0) {
467-
executor = service =
468-
context.threadPools().createFixedThreadPool(numThreads, "BulkImportThread", false);
467+
executor = service = context.threadPools().getPoolBuilder("BulkImportThread")
468+
.numCoreThreads(numThreads).build();
469469
} else {
470470
String threads = context.getConfiguration().get(ClientProperty.BULK_LOAD_THREADS.getKey());
471-
executor = service = context.threadPools().createFixedThreadPool(
472-
ConfigurationTypeHelper.getNumThreads(threads), "BulkImportThread", false);
471+
executor = service = context.threadPools().getPoolBuilder("BulkImportThread")
472+
.numCoreThreads(ConfigurationTypeHelper.getNumThreads(threads)).build();
473473
}
474474

475475
try {

core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,9 @@ private static synchronized ExecutorService getLoadThreadPool(int maxLoadThreads
8080
}
8181

8282
if (maxLoadThreads > 0) {
83-
loadThreadPool = ThreadPools.getServerThreadPools().createThreadPool(0, maxLoadThreads, 60,
84-
SECONDS, "bloom-loader", false);
83+
loadThreadPool = ThreadPools.getServerThreadPools().getPoolBuilder("bloom-loader")
84+
.numCoreThreads(0).numMaxThreads(maxLoadThreads).withTimeOut(60L, SECONDS).build();
8585
}
86-
8786
return loadThreadPool;
8887
}
8988

core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,8 @@ public class LruBlockCache extends SynchronousLoadingBlockCache implements Block
102102
private final EvictionThread evictionThread;
103103

104104
/** Statistics thread schedule pool (for heavy debugging, could remove) */
105-
private final ScheduledExecutorService scheduleThreadPool = ThreadPools.getServerThreadPools()
106-
.createScheduledExecutorService(1, "LRUBlockCacheStats", false);
105+
private final ScheduledExecutorService scheduleThreadPool =
106+
ThreadPools.getServerThreadPools().createScheduledExecutorService(1, "LRUBlockCacheStats");
107107

108108
/** Current size of cache */
109109
private final AtomicLong size;

core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public final class TinyLfuBlockCache implements BlockCache {
6262
private final Policy.Eviction<String,Block> policy;
6363
private final int maxSize;
6464
private final ScheduledExecutorService statsExecutor = ThreadPools.getServerThreadPools()
65-
.createScheduledExecutorService(1, "TinyLfuBlockCacheStatsExecutor", false);
65+
.createScheduledExecutorService(1, "TinyLfuBlockCacheStatsExecutor");
6666

6767
public TinyLfuBlockCache(Configuration conf, CacheType type) {
6868
cache = Caffeine.newBuilder()

core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java

+4-6
Original file line numberDiff line numberDiff line change
@@ -218,9 +218,8 @@ private static ExternalCompactionId getRunningCompactionId(HostAndPort compactor
218218
*/
219219
public static List<RunningCompaction> getCompactionsRunningOnCompactors(ClientContext context) {
220220
final List<RunningCompactionFuture> rcFutures = new ArrayList<>();
221-
final ExecutorService executor = ThreadPools.getServerThreadPools().createFixedThreadPool(16,
222-
"CompactorRunningCompactions", false);
223-
221+
final ExecutorService executor = ThreadPools.getServerThreadPools()
222+
.getPoolBuilder("CompactorRunningCompactions").numCoreThreads(16).build();
224223
getCompactorAddrs(context).forEach((q, hp) -> {
225224
hp.forEach(hostAndPort -> {
226225
rcFutures.add(new RunningCompactionFuture(q, hostAndPort,
@@ -246,9 +245,8 @@ public static List<RunningCompaction> getCompactionsRunningOnCompactors(ClientCo
246245

247246
public static Collection<ExternalCompactionId>
248247
getCompactionIdsRunningOnCompactors(ClientContext context) {
249-
final ExecutorService executor = ThreadPools.getServerThreadPools().createFixedThreadPool(16,
250-
"CompactorRunningCompactions", false);
251-
248+
final ExecutorService executor = ThreadPools.getServerThreadPools()
249+
.getPoolBuilder("CompactorRunningCompactions").numCoreThreads(16).build();
252250
List<Future<ExternalCompactionId>> futures = new ArrayList<>();
253251

254252
getCompactorAddrs(context).forEach((q, hp) -> {

0 commit comments

Comments
 (0)