Skip to content

Commit 3298d6d

Browse files
authored
Add Compaction Job Min & Max Wait properties (apache#4223)
* Add Compaction Min Wait and Max Wait properties
* Adds min and max wait properties to configure the min and max wait intervals in the compactor.
* Changes the logic in compaction-coordinator to use these new properties when calculating the wait period for sending warning messages
Also use the MAX_JOB_WAIT_TIME prop for the thrift retry interval when the compactor is unable to communicate with the compaction-coordinator.
1 parent f28b55d commit 3298d6d

File tree

5 files changed

+75
-10
lines changed

5 files changed

+75
-10
lines changed

core/src/main/java/org/apache/accumulo/core/conf/Property.java

+10
Original file line numberDiff line numberDiff line change
@@ -1433,6 +1433,16 @@ public enum Property {
14331433
COMPACTOR_PREFIX("compactor.", null, PropertyType.PREFIX,
14341434
"Properties in this category affect the behavior of the accumulo compactor server.", "2.1.0"),
14351435
@Experimental
1436+
COMPACTOR_MIN_JOB_WAIT_TIME("compactor.wait.time.job.min", "1s", PropertyType.TIMEDURATION,
1437+
"The minimum amount of time to wait between checks for the next compaction job, backing off "
1438+
+ "exponentially until COMPACTOR_MAX_JOB_WAIT_TIME is reached.",
1439+
"2.1.3"),
1440+
@Experimental
1441+
COMPACTOR_MAX_JOB_WAIT_TIME("compactor.wait.time.job.max", "5m", PropertyType.TIMEDURATION,
1442+
"Compactors do exponential backoff when their requests for work repeatedly come back empty. "
1443+
+ "This is the maximum amount of time to wait between checks for the next compaction job.",
1444+
"2.1.3"),
1445+
@Experimental
14361446
COMPACTOR_PORTSEARCH("compactor.port.search", "false", PropertyType.BOOLEAN,
14371447
"If the compactor.port.client is in use, search higher ports until one is available.",
14381448
"2.1.0"),

server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,6 @@ public class CompactionCoordinator extends AbstractServer
101101

102102
private static final Logger LOG = LoggerFactory.getLogger(CompactionCoordinator.class);
103103
private static final long TIME_BETWEEN_GC_CHECKS = 5000;
104-
private static final long FIFTEEN_MINUTES = TimeUnit.MINUTES.toMillis(15);
105-
106104
protected static final QueueSummaries QUEUE_SUMMARIES = new QueueSummaries();
107105

108106
/*
@@ -383,7 +381,7 @@ protected void startDeadCompactionDetector() {
383381
}
384382

385383
protected long getMissingCompactorWarningTime() {
386-
return FIFTEEN_MINUTES;
384+
return getConfiguration().getTimeInMillis(Property.COMPACTOR_MAX_JOB_WAIT_TIME) * 3;
387385
}
388386

389387
protected long getTServerCheckInterval() {

server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java

+21
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,10 @@
3939

4040
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
4141
import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
42+
import org.apache.accumulo.core.conf.ConfigurationCopy;
4243
import org.apache.accumulo.core.conf.DefaultConfiguration;
44+
import org.apache.accumulo.core.conf.Property;
45+
import org.apache.accumulo.core.conf.SiteConfiguration;
4346
import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
4447
import org.apache.accumulo.core.metadata.TServerInstance;
4548
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
@@ -195,6 +198,24 @@ public void resetInternals() {
195198

196199
}
197200

201+
@Test
202+
public void testCoordinatorWarningTime() {
203+
PowerMock.resetAll();
204+
PowerMock.suppress(PowerMock.constructor(AbstractServer.class));
205+
ServerContext context = PowerMock.createNiceMock(ServerContext.class);
206+
207+
SiteConfiguration aconf = SiteConfiguration.empty()
208+
.withOverrides(Map.of(Property.COMPACTOR_MAX_JOB_WAIT_TIME.getKey(), "15s")).build();
209+
ConfigurationCopy config = new ConfigurationCopy(aconf);
210+
expect(context.getConfiguration()).andReturn(config).anyTimes();
211+
212+
PowerMock.replay(context);
213+
214+
var coordinator = new TestCoordinator(null, null, null, null, context, null);
215+
// Should be equal to 3 * 15_000 milliseconds
216+
assertEquals(45_000, coordinator.getMissingCompactorWarningTime());
217+
}
218+
198219
@Test
199220
public void testCoordinatorColdStartNoCompactions() throws Exception {
200221
PowerMock.resetAll();

server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java

+14-7
Original file line numberDiff line numberDiff line change
@@ -447,8 +447,13 @@ protected void updateCompactionCompleted(TExternalCompactionJob job, TCompaction
447447
* @throws RetriesExceededException thrown when retries have been exceeded
448448
*/
449449
protected TExternalCompactionJob getNextJob(Supplier<UUID> uuid) throws RetriesExceededException {
450+
final long startingWaitTime =
451+
getConfiguration().getTimeInMillis(Property.COMPACTOR_MIN_JOB_WAIT_TIME);
452+
final long maxWaitTime =
453+
getConfiguration().getTimeInMillis(Property.COMPACTOR_MAX_JOB_WAIT_TIME);
454+
450455
RetryableThriftCall<TExternalCompactionJob> nextJobThriftCall =
451-
new RetryableThriftCall<>(1000, RetryableThriftCall.MAX_WAIT_TIME, 0, () -> {
456+
new RetryableThriftCall<>(startingWaitTime, maxWaitTime, 0, () -> {
452457
Client coordinatorClient = getCoordinatorClient();
453458
try {
454459
ExternalCompactionId eci = ExternalCompactionId.generate(uuid.get());
@@ -587,12 +592,14 @@ protected Supplier<UUID> getNextId() {
587592
protected long getWaitTimeBetweenCompactionChecks() {
588593
// get the total number of compactors assigned to this queue
589594
int numCompactors = ExternalCompactionUtil.countCompactors(queueName, getContext());
590-
// Aim for around 3 compactors checking in every second
591-
long sleepTime = numCompactors * 1000L / 3;
592-
// Ensure a compactor sleeps at least around a second
593-
sleepTime = Math.max(1000, sleepTime);
594-
// Ensure a compactor sleep not too much more than 5 mins
595-
sleepTime = Math.min(300_000L, sleepTime);
595+
long minWait = getConfiguration().getTimeInMillis(Property.COMPACTOR_MIN_JOB_WAIT_TIME);
596+
// Aim for around 3 compactors checking in per min wait time.
597+
long sleepTime = numCompactors * minWait / 3;
598+
// Ensure a compactor waits at least the minimum time
599+
sleepTime = Math.max(minWait, sleepTime);
600+
// Ensure a sleeping compactor has a configurable max sleep time
601+
sleepTime = Math.min(getConfiguration().getTimeInMillis(Property.COMPACTOR_MAX_JOB_WAIT_TIME),
602+
sleepTime);
596603
// Add some random jitter to the sleep time, that averages out to sleep time. This will spread
597604
// compactors out evenly over time.
598605
sleepTime = (long) (.9 * sleepTime + sleepTime * .2 * random.nextDouble());

server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java

+29
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static org.junit.Assert.assertTrue;
2525

2626
import java.net.UnknownHostException;
27+
import java.util.List;
2728
import java.util.Timer;
2829
import java.util.TimerTask;
2930
import java.util.UUID;
@@ -42,6 +43,7 @@
4243
import org.apache.accumulo.core.data.TableId;
4344
import org.apache.accumulo.core.dataImpl.KeyExtent;
4445
import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
46+
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
4547
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
4648
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
4749
import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
@@ -449,4 +451,31 @@ public void testCompactionInterrupted() throws Exception {
449451
assertEquals(TCompactionState.CANCELLED, c.getLatestState());
450452
}
451453

454+
@Test
455+
public void testCompactionWaitProperty() {
456+
PowerMock.resetAll();
457+
PowerMock.suppress(PowerMock.methods(Halt.class, "halt"));
458+
PowerMock.suppress(PowerMock.constructor(AbstractServer.class));
459+
460+
var conf = new ConfigurationCopy(DefaultConfiguration.getInstance());
461+
conf.set(Property.COMPACTOR_MAX_JOB_WAIT_TIME, "800ms");
462+
463+
ServerContext context = PowerMock.createNiceMock(ServerContext.class);
464+
expect(context.getConfiguration()).andReturn(conf).anyTimes();
465+
expect(context.getZooKeeperRoot()).andReturn("test").anyTimes();
466+
ZooCache zkc = PowerMock.createNiceMock(ZooCache.class);
467+
expect(zkc.getChildren("test/compactors/testQ")).andReturn(List.of("compactor_1")).anyTimes();
468+
expect(context.getZooCache()).andReturn(zkc).anyTimes();
469+
470+
PowerMock.replayAll();
471+
472+
SuccessfulCompactor c = new SuccessfulCompactor(null, null, null, context, null);
473+
PowerMock.verifyAll();
474+
475+
Long maxWait = c.getWaitTimeBetweenCompactionChecks();
476+
// The 800ms max caps the wait; jitter then scales it by a factor between 0.9 and 1.1
// (roughly 720-880ms). The upper bound asserted below is looser than that range requires.
477+
assertTrue(maxWait >= 720L);
478+
assertTrue(maxWait <= 968L);
479+
}
480+
452481
}

0 commit comments

Comments
 (0)