Skip to content

Commit 9f430a2

Browse files
committed
Merge branch '2.1'
2 parents 978f8b9 + 3298d6d commit 9f430a2

File tree

3 files changed

+26
-10
lines changed

3 files changed

+26
-10
lines changed

core/src/main/java/org/apache/accumulo/core/conf/Property.java

+10
Original file line numberDiff line numberDiff line change
@@ -1127,6 +1127,16 @@ public enum Property {
11271127
COMPACTOR_PREFIX("compactor.", null, PropertyType.PREFIX,
11281128
"Properties in this category affect the behavior of the accumulo compactor server.", "2.1.0"),
11291129
@Experimental
1130+
COMPACTOR_MIN_JOB_WAIT_TIME("compactor.wait.time.job.min", "1s", PropertyType.TIMEDURATION,
1131+
"The minimum amount of time to wait between checks for the next compaction job, backing off"
1132+
+ "exponentially until COMPACTOR_MAX_JOB_WAIT_TIME is reached.",
1133+
"2.1.3"),
1134+
@Experimental
1135+
COMPACTOR_MAX_JOB_WAIT_TIME("compactor.wait.time.job.max", "5m", PropertyType.TIMEDURATION,
1136+
"Compactors do exponential backoff when their request for work repeatedly come back empty. "
1137+
+ "This is the maximum amount of time to wait between checks for the next compaction job.",
1138+
"2.1.3"),
1139+
@Experimental
11301140
COMPACTOR_PORTSEARCH("compactor.port.search", "false", PropertyType.BOOLEAN,
11311141
"If the compactor.port.client is in use, search higher ports until one is available.",
11321142
"2.1.0"),

server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,7 @@ public class CompactionCoordinator extends AbstractServer
104104
implements CompactionCoordinatorService.Iface, LiveTServerSet.Listener {
105105

106106
private static final Logger LOG = LoggerFactory.getLogger(CompactionCoordinator.class);
107-
private static final long FIFTEEN_MINUTES = TimeUnit.MINUTES.toMillis(15);
108-
107+
private static final long TIME_BETWEEN_GC_CHECKS = 5000;
109108
protected static final QueueSummaries QUEUE_SUMMARIES = new QueueSummaries();
110109

111110
/*
@@ -382,7 +381,7 @@ protected void startDeadCompactionDetector() {
382381
}
383382

384383
protected long getMissingCompactorWarningTime() {
385-
return FIFTEEN_MINUTES;
384+
return getConfiguration().getTimeInMillis(Property.COMPACTOR_MAX_JOB_WAIT_TIME) * 3;
386385
}
387386

388387
protected long getTServerCheckInterval() {

server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java

+14-7
Original file line numberDiff line numberDiff line change
@@ -437,8 +437,13 @@ protected void updateCompactionCompleted(TExternalCompactionJob job, TCompaction
437437
* @throws RetriesExceededException thrown when retries have been exceeded
438438
*/
439439
protected TExternalCompactionJob getNextJob(Supplier<UUID> uuid) throws RetriesExceededException {
440+
final long startingWaitTime =
441+
getConfiguration().getTimeInMillis(Property.COMPACTOR_MIN_JOB_WAIT_TIME);
442+
final long maxWaitTime =
443+
getConfiguration().getTimeInMillis(Property.COMPACTOR_MAX_JOB_WAIT_TIME);
444+
440445
RetryableThriftCall<TExternalCompactionJob> nextJobThriftCall =
441-
new RetryableThriftCall<>(1000, RetryableThriftCall.MAX_WAIT_TIME, 0, () -> {
446+
new RetryableThriftCall<>(startingWaitTime, maxWaitTime, 0, () -> {
442447
Client coordinatorClient = getCoordinatorClient();
443448
try {
444449
ExternalCompactionId eci = ExternalCompactionId.generate(uuid.get());
@@ -579,12 +584,14 @@ protected Supplier<UUID> getNextId() {
579584
protected long getWaitTimeBetweenCompactionChecks() {
580585
// get the total number of compactors assigned to this queue
581586
int numCompactors = ExternalCompactionUtil.countCompactors(queueName, getContext());
582-
// Aim for around 3 compactors checking in every second
583-
long sleepTime = numCompactors * 1000L / 3;
584-
// Ensure a compactor sleeps at least around a second
585-
sleepTime = Math.max(1000, sleepTime);
586-
// Ensure a compactor sleep not too much more than 5 mins
587-
sleepTime = Math.min(300_000L, sleepTime);
587+
long minWait = getConfiguration().getTimeInMillis(Property.COMPACTOR_MIN_JOB_WAIT_TIME);
588+
// Aim for around 3 compactors checking in per min wait time.
589+
long sleepTime = numCompactors * minWait / 3;
590+
// Ensure a compactor waits at least the minimum time
591+
sleepTime = Math.max(minWait, sleepTime);
592+
// Ensure a sleeping compactor has a configurable max sleep time
593+
sleepTime = Math.min(getConfiguration().getTimeInMillis(Property.COMPACTOR_MAX_JOB_WAIT_TIME),
594+
sleepTime);
588595
// Add some random jitter to the sleep time, that averages out to sleep time. This will spread
589596
// compactors out evenly over time.
590597
sleepTime = (long) (.9 * sleepTime + sleepTime * .2 * RANDOM.get().nextDouble());

0 commit comments

Comments
 (0)