Skip to content

Commit 632dbc2

Browse files
authored
Limits memory and CPU used by compaction reservation requests (apache#5185)
Added thread pools to execute compaction reservation requests in order to limit the memory and CPU used by executing reservations. Requests queued up for a pool could still potentially use a lot of memory, so two things were done to control the memory used by items in the queue. First, a compactor process is only allowed to have one reservation request processing at a time. Second, the data related to a reservation request is held through a soft reference, which should allow it to be garbage collected if memory gets low while it is sitting in the queue. Once the request starts executing, it obtains a strong reference to the data so it can no longer be garbage collected. Fixes apache#5177
1 parent 47b75d3 commit 632dbc2

File tree

4 files changed

+190
-60
lines changed

4 files changed

+190
-60
lines changed

core/src/main/java/org/apache/accumulo/core/conf/Property.java

+12
Original file line numberDiff line numberDiff line change
@@ -1189,6 +1189,18 @@ public enum Property {
11891189
COMPACTION_COORDINATOR_PREFIX("compaction.coordinator.", null, PropertyType.PREFIX,
11901190
"Properties in this category affect the behavior of the accumulo compaction coordinator server.",
11911191
"2.1.0"),
1192+
COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT("compaction.coordinator.reservation.threads.root",
1193+
"1", PropertyType.COUNT,
1194+
"The number of threads used to reserve files for compaction in a tablet for the root tablet.",
1195+
"4.0.0"),
1196+
COMPACTION_COORDINATOR_RESERVATION_THREADS_META("compaction.coordinator.reservation.threads.meta",
1197+
"1", PropertyType.COUNT,
1198+
"The number of threads used to reserve files for compaction in a tablet for accumulo.metadata tablets.",
1199+
"4.0.0"),
1200+
COMPACTION_COORDINATOR_RESERVATION_THREADS_USER("compaction.coordinator.reservation.threads.user",
1201+
"64", PropertyType.COUNT,
1202+
"The number of threads used to reserve files for compaction in a tablet for user tables.",
1203+
"4.0.0"),
11921204
@Experimental
11931205
COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL(
11941206
"compaction.coordinator.compactor.dead.check.interval", "5m", PropertyType.TIMEDURATION,

core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java

+3
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ public enum ThreadPoolNames {
3434
CONDITIONAL_WRITER_CLEANUP_POOL("accumulo.pool.client.context.conditional.writer.cleanup"),
3535
COORDINATOR_FINALIZER_BACKGROUND_POOL("accumulo.pool.compaction.finalizer.background.pool"),
3636
COORDINATOR_FINALIZER_NOTIFIER_POOL("accumulo.pool.compaction.coordinator.compaction.finalizer"),
37+
COORDINATOR_RESERVATION_ROOT_POOL("accumulo.pool.compaction.coordinator.reservation.root"),
38+
COORDINATOR_RESERVATION_META_POOL("accumulo.pool.compaction.coordinator.reservation.meta"),
39+
COORDINATOR_RESERVATION_USER_POOL("accumulo.pool.compaction.coordinator.reservation.user"),
3740
GC_DELETE_POOL("accumulo.pool.gc.threads.delete"),
3841
GENERAL_SERVER_POOL("accumulo.pool.general.server"),
3942
SERVICE_LOCK_POOL("accumulo.pool.service.lock"),

core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java

+24
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
import static java.util.concurrent.TimeUnit.MINUTES;
2323
import static java.util.concurrent.TimeUnit.SECONDS;
2424
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX;
25+
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_RESERVATION_META_POOL;
26+
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_RESERVATION_ROOT_POOL;
27+
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_RESERVATION_USER_POOL;
2528
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GC_DELETE_POOL;
2629
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_POOL;
2730
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_FATE_POOL;
@@ -369,6 +372,27 @@ public ThreadPoolExecutor createExecutorService(final AccumuloConfiguration conf
369372
return builder.build();
370373
case GC_DELETE_THREADS:
371374
return getPoolBuilder(GC_DELETE_POOL).numCoreThreads(conf.getCount(p)).build();
375+
case COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT:
376+
builder = getPoolBuilder(COORDINATOR_RESERVATION_ROOT_POOL).numCoreThreads(conf.getCount(p))
377+
.withTimeOut(60L, MILLISECONDS);
378+
if (emitThreadPoolMetrics) {
379+
builder.enableThreadPoolMetrics();
380+
}
381+
return builder.build();
382+
case COMPACTION_COORDINATOR_RESERVATION_THREADS_META:
383+
builder = getPoolBuilder(COORDINATOR_RESERVATION_META_POOL).numCoreThreads(conf.getCount(p))
384+
.withTimeOut(60L, MILLISECONDS);
385+
if (emitThreadPoolMetrics) {
386+
builder.enableThreadPoolMetrics();
387+
}
388+
return builder.build();
389+
case COMPACTION_COORDINATOR_RESERVATION_THREADS_USER:
390+
builder = getPoolBuilder(COORDINATOR_RESERVATION_USER_POOL).numCoreThreads(conf.getCount(p))
391+
.withTimeOut(60L, MILLISECONDS);
392+
if (emitThreadPoolMetrics) {
393+
builder.enableThreadPoolMetrics();
394+
}
395+
return builder.build();
372396
default:
373397
throw new IllegalArgumentException("Unhandled thread pool property: " + p);
374398
}

server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java

+151-60
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.io.FileNotFoundException;
3232
import java.io.IOException;
3333
import java.io.UncheckedIOException;
34+
import java.lang.ref.SoftReference;
3435
import java.time.Duration;
3536
import java.util.ArrayList;
3637
import java.util.Collection;
@@ -43,13 +44,17 @@
4344
import java.util.Objects;
4445
import java.util.Optional;
4546
import java.util.Set;
47+
import java.util.concurrent.CompletableFuture;
4648
import java.util.concurrent.ConcurrentHashMap;
4749
import java.util.concurrent.CountDownLatch;
50+
import java.util.concurrent.ExecutorService;
4851
import java.util.concurrent.ScheduledFuture;
4952
import java.util.concurrent.ScheduledThreadPoolExecutor;
53+
import java.util.concurrent.ThreadPoolExecutor;
5054
import java.util.concurrent.TimeUnit;
5155
import java.util.concurrent.atomic.AtomicReference;
5256
import java.util.function.Consumer;
57+
import java.util.function.Supplier;
5358
import java.util.stream.Collectors;
5459

5560
import org.apache.accumulo.core.Constants;
@@ -193,6 +198,9 @@ public class CompactionCoordinator
193198

194199
private volatile long coordinatorStartTime;
195200

201+
private final Map<DataLevel,ThreadPoolExecutor> reservationPools;
202+
private final Set<String> activeCompactorReservationRequest = ConcurrentHashMap.newKeySet();
203+
196204
public CompactionCoordinator(ServerContext ctx, SecurityOperation security,
197205
AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances, Manager manager) {
198206
this.ctx = ctx;
@@ -232,6 +240,18 @@ public CompactionCoordinator(ServerContext ctx, SecurityOperation security,
232240
deadCompactionDetector =
233241
new DeadCompactionDetector(this.ctx, this, schedExecutor, fateInstances);
234242

243+
var rootReservationPool = ThreadPools.getServerThreadPools().createExecutorService(
244+
ctx.getConfiguration(), Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT, true);
245+
246+
var metaReservationPool = ThreadPools.getServerThreadPools().createExecutorService(
247+
ctx.getConfiguration(), Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_META, true);
248+
249+
var userReservationPool = ThreadPools.getServerThreadPools().createExecutorService(
250+
ctx.getConfiguration(), Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_USER, true);
251+
252+
reservationPools = Map.of(Ample.DataLevel.ROOT, rootReservationPool, Ample.DataLevel.METADATA,
253+
metaReservationPool, Ample.DataLevel.USER, userReservationPool);
254+
235255
compactorCounts = ctx.getCaches().createNewBuilder(CacheName.COMPACTOR_COUNTS, false)
236256
.expireAfterWrite(30, TimeUnit.SECONDS).build(this::countCompactors);
237257
// At this point the manager does not have its lock so no actions should be taken yet
@@ -250,6 +270,9 @@ public void start() {
250270

251271
public void shutdown() {
252272
shutdown.countDown();
273+
274+
reservationPools.values().forEach(ExecutorService::shutdownNow);
275+
253276
var localThread = serviceThread;
254277
if (localThread != null) {
255278
try {
@@ -528,82 +551,142 @@ protected CompactionMetadata createExternalCompactionMetadata(CompactionJob job,
528551

529552
}
530553

531-
protected CompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob metaJob,
532-
String compactorAddress, ExternalCompactionId externalCompactionId) {
554+
private class ReserveCompactionTask implements Supplier<CompactionMetadata> {
555+
556+
// Use a soft reference for this in case free memory gets low while this is sitting in the queue
557+
// waiting to process. This object can contain the tablets list of files and if there are lots
558+
// of tablet with lots of files then that could start to cause memory problems. This hack could
559+
// be removed if #5188 were implemented.
560+
private final SoftReference<CompactionJobQueues.MetaJob> metaJobRef;
561+
private final String compactorAddress;
562+
private final ExternalCompactionId externalCompactionId;
563+
564+
private ReserveCompactionTask(CompactionJobQueues.MetaJob metaJob, String compactorAddress,
565+
ExternalCompactionId externalCompactionId) {
566+
Preconditions.checkArgument(metaJob.getJob().getKind() == CompactionKind.SYSTEM
567+
|| metaJob.getJob().getKind() == CompactionKind.USER);
568+
this.metaJobRef = new SoftReference<>(Objects.requireNonNull(metaJob));
569+
this.compactorAddress = Objects.requireNonNull(compactorAddress);
570+
this.externalCompactionId = Objects.requireNonNull(externalCompactionId);
571+
Preconditions.checkState(activeCompactorReservationRequest.add(compactorAddress),
572+
"compactor %s already on has a reservation in flight, cannot process %s",
573+
compactorAddress, externalCompactionId);
574+
}
533575

534-
Preconditions.checkArgument(metaJob.getJob().getKind() == CompactionKind.SYSTEM
535-
|| metaJob.getJob().getKind() == CompactionKind.USER);
576+
@Override
577+
public CompactionMetadata get() {
578+
try {
579+
var metaJob = metaJobRef.get();
580+
if (metaJob == null) {
581+
LOG.warn("Compaction reservation request for {} {} was garbage collected.",
582+
compactorAddress, externalCompactionId);
583+
return null;
584+
}
536585

537-
var tabletMetadata = metaJob.getTabletMetadata();
586+
var tabletMetadata = metaJob.getTabletMetadata();
538587

539-
var jobFiles = metaJob.getJob().getFiles().stream().map(CompactableFileImpl::toStoredTabletFile)
540-
.collect(Collectors.toSet());
588+
var jobFiles = metaJob.getJob().getFiles().stream()
589+
.map(CompactableFileImpl::toStoredTabletFile).collect(Collectors.toSet());
541590

542-
Retry retry = Retry.builder().maxRetries(5).retryAfter(Duration.ofMillis(100))
543-
.incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5)
544-
.logInterval(Duration.ofMinutes(3)).createRetry();
591+
Retry retry = Retry.builder().maxRetries(5).retryAfter(Duration.ofMillis(100))
592+
.incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5)
593+
.logInterval(Duration.ofMinutes(3)).createRetry();
545594

546-
while (retry.canRetry()) {
547-
try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
548-
var extent = metaJob.getTabletMetadata().getExtent();
595+
while (retry.canRetry()) {
596+
try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
597+
var extent = metaJob.getTabletMetadata().getExtent();
549598

550-
if (!canReserveCompaction(tabletMetadata, metaJob.getJob().getKind(), jobFiles, ctx,
551-
manager.getSteadyTime())) {
552-
return null;
553-
}
599+
if (!canReserveCompaction(tabletMetadata, metaJob.getJob().getKind(), jobFiles, ctx,
600+
manager.getSteadyTime())) {
601+
return null;
602+
}
554603

555-
var ecm = createExternalCompactionMetadata(metaJob.getJob(), jobFiles, tabletMetadata,
556-
compactorAddress, externalCompactionId);
557-
558-
// any data that is read from the tablet to make a decision about if it can compact or not
559-
// must be checked for changes in the conditional mutation.
560-
var tabletMutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation()
561-
.requireFiles(jobFiles).requireNotCompacting(jobFiles);
562-
if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
563-
// For system compactions the user compaction requested column is examined when deciding
564-
// if a compaction can start so need to check for changes to this column.
565-
tabletMutator.requireSame(tabletMetadata, SELECTED, USER_COMPACTION_REQUESTED);
566-
} else {
567-
tabletMutator.requireSame(tabletMetadata, SELECTED);
568-
}
604+
var ecm = createExternalCompactionMetadata(metaJob.getJob(), jobFiles, tabletMetadata,
605+
compactorAddress, externalCompactionId);
606+
607+
// any data that is read from the tablet to make a decision about if it can compact or
608+
// not
609+
// must be checked for changes in the conditional mutation.
610+
var tabletMutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation()
611+
.requireFiles(jobFiles).requireNotCompacting(jobFiles);
612+
if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
613+
// For system compactions the user compaction requested column is examined when
614+
// deciding
615+
// if a compaction can start so need to check for changes to this column.
616+
tabletMutator.requireSame(tabletMetadata, SELECTED, USER_COMPACTION_REQUESTED);
617+
} else {
618+
tabletMutator.requireSame(tabletMetadata, SELECTED);
619+
}
569620

570-
if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
571-
var selectedFiles = tabletMetadata.getSelectedFiles();
572-
var reserved = getFilesReservedBySelection(tabletMetadata, manager.getSteadyTime(), ctx);
573-
574-
// If there is a selectedFiles column, and the reserved set is empty this means that
575-
// either no user jobs were completed yet or the selection expiration time has passed
576-
// so the column is eligible to be deleted so a system job can run instead
577-
if (selectedFiles != null && reserved.isEmpty()
578-
&& !Collections.disjoint(jobFiles, selectedFiles.getFiles())) {
579-
LOG.debug("Deleting user compaction selected files for {} {}", extent,
580-
externalCompactionId);
581-
tabletMutator.deleteSelectedFiles();
582-
}
583-
}
621+
if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
622+
var selectedFiles = tabletMetadata.getSelectedFiles();
623+
var reserved =
624+
getFilesReservedBySelection(tabletMetadata, manager.getSteadyTime(), ctx);
625+
626+
// If there is a selectedFiles column, and the reserved set is empty this means that
627+
// either no user jobs were completed yet or the selection expiration time has passed
628+
// so the column is eligible to be deleted so a system job can run instead
629+
if (selectedFiles != null && reserved.isEmpty()
630+
&& !Collections.disjoint(jobFiles, selectedFiles.getFiles())) {
631+
LOG.debug("Deleting user compaction selected files for {} {}", extent,
632+
externalCompactionId);
633+
tabletMutator.deleteSelectedFiles();
634+
}
635+
}
584636

585-
tabletMutator.putExternalCompaction(externalCompactionId, ecm);
586-
tabletMutator.submit(tm -> tm.getExternalCompactions().containsKey(externalCompactionId));
637+
tabletMutator.putExternalCompaction(externalCompactionId, ecm);
638+
tabletMutator
639+
.submit(tm -> tm.getExternalCompactions().containsKey(externalCompactionId));
587640

588-
var result = tabletsMutator.process().get(extent);
641+
var result = tabletsMutator.process().get(extent);
589642

590-
if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
591-
return ecm;
592-
} else {
593-
tabletMetadata = result.readMetadata();
643+
if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
644+
return ecm;
645+
} else {
646+
tabletMetadata = result.readMetadata();
647+
}
648+
}
649+
650+
retry.useRetry();
651+
try {
652+
retry.waitForNextAttempt(LOG,
653+
"Reserved compaction for " + metaJob.getTabletMetadata().getExtent());
654+
} catch (InterruptedException e) {
655+
throw new RuntimeException(e);
656+
}
594657
}
595-
}
596658

597-
retry.useRetry();
598-
try {
599-
retry.waitForNextAttempt(LOG,
600-
"Reserved compaction for " + metaJob.getTabletMetadata().getExtent());
601-
} catch (InterruptedException e) {
602-
throw new RuntimeException(e);
659+
return null;
660+
} finally {
661+
Preconditions.checkState(activeCompactorReservationRequest.remove(compactorAddress),
662+
"compactorAddress:%s", compactorAddress);
603663
}
604664
}
665+
}
666+
667+
protected CompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob metaJob,
668+
String compactorAddress, ExternalCompactionId externalCompactionId) {
669+
670+
if (activeCompactorReservationRequest.contains(compactorAddress)) {
671+
// In this case the compactor has a previously submitted reservation request that is still
672+
// processing. Do not want to let it queue up another reservation request. One possible cause
673+
// of this is that compactor timed out waiting for its last request to process and is now
674+
// making another request. The previously submitted request can not be used because the
675+
// compactor generates a new uuid for each request it makes. So the best thing to do is to
676+
// return null and wait for this situation to resolve. This will likely happen when some part
677+
// of the distributed system is not working well, so at this point want to avoid making
678+
// problems worse instead of trying to reserve a job.
679+
LOG.warn(
680+
"Ignoring request from {} to reserve compaction job because it has a reservation request in progress.",
681+
compactorAddress);
682+
return null;
683+
}
605684

606-
return null;
685+
var dataLevel = DataLevel.of(metaJob.getTabletMetadata().getTableId());
686+
var future = CompletableFuture.supplyAsync(
687+
new ReserveCompactionTask(metaJob, compactorAddress, externalCompactionId),
688+
reservationPools.get(dataLevel));
689+
return future.join();
607690
}
608691

609692
protected TExternalCompactionJob createThriftJob(String externalCompactionId,
@@ -1123,6 +1206,14 @@ public void cleanUpInternalState() {
11231206
// 5. Log compactors with no groups
11241207
// 6. Log groups with compactors and queued jos that have not checked in
11251208

1209+
var config = ctx.getConfiguration();
1210+
ThreadPools.resizePool(reservationPools.get(DataLevel.ROOT), config,
1211+
Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT);
1212+
ThreadPools.resizePool(reservationPools.get(DataLevel.METADATA), config,
1213+
Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_META);
1214+
ThreadPools.resizePool(reservationPools.get(DataLevel.USER), config,
1215+
Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_USER);
1216+
11261217
// grab a snapshot of the ids in the set before reading the metadata table. This is done to
11271218
// avoid removing things that are added while reading the metadata.
11281219
final Set<ExternalCompactionId> idsSnapshot = Set.copyOf(RUNNING_CACHE.keySet());

0 commit comments

Comments
 (0)