Implements interruption of compactions and adds an IT to verify compaction cancels on tablet migration #5395

Open · wants to merge 4 commits into base: 2.1

Changes from 2 commits
@@ -1313,6 +1313,55 @@ private void completeCompaction(CompactionJob job, Set<StoredTabletFile> jobFile
selectFiles();
}

private final Set<CompactionInterrupter> compactionInterrupters = ConcurrentHashMap.newKeySet();

private class CompactionInterrupter implements AutoCloseable {
private Thread thread;

CompactionInterrupter() throws InterruptedException {

this.thread = Thread.currentThread();

if (Thread.currentThread().isInterrupted() || closed) {
throw new InterruptedException();
}

compactionInterrupters.add(this);
log.trace("Registered compaction interrupter for {} {}", thread.getId(), getExtent());
}

public synchronized void interrupt() {
if (thread != null) {
log.debug("Interrupting compaction thread {} for {}", thread.getId(), getExtent());
thread.interrupt();
}
}

public void close() throws InterruptedException {
synchronized (this) {
Preconditions.checkState(thread == Thread.currentThread());

// The goal of this class is to not interrupt the compaction thread after this close()
// method returns. Two things help achieve this goal: first, we set the thread instance var to
// null here; second, this code block and the interrupt() method are synchronized.
thread = null;
}

// Call this outside the synchronized block to avoid deadlock with the locks inside the
// concurrent hash set.
compactionInterrupters.remove(this);

log.trace("Unregistered compaction interrupter for {} {}", Thread.currentThread().getId(),
getExtent());

// It's possible the thread's interrupt status was set but nothing ever checked it. For example,
// interrupt() could have been called immediately before this method was called.
if (Thread.interrupted()) {
throw new InterruptedException();
}
}
}
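The interrupter pattern above can be sketched standalone: each worker registers its own thread in a concurrent set, an external closer interrupts every registered thread, and close() both unregisters the worker and converts an unobserved interrupt into an InterruptedException. The class name and demo harness below are invented for illustration; this is not Accumulo code, and it omits the closed flag the real class also checks.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

class InterrupterSketch {

  static final Set<InterrupterSketch> active = ConcurrentHashMap.newKeySet();

  private Thread thread = Thread.currentThread();

  InterrupterSketch() throws InterruptedException {
    if (Thread.currentThread().isInterrupted()) {
      throw new InterruptedException();
    }
    active.add(this);
  }

  public synchronized void interrupt() {
    if (thread != null) {
      thread.interrupt();
    }
  }

  public void close() throws InterruptedException {
    synchronized (this) {
      // After this point interrupt() is a no-op; the synchronization with
      // interrupt() ensures no interrupt lands once close() returns.
      thread = null;
    }
    active.remove(this);
    // Consume an interrupt that was delivered but never observed by the work.
    if (Thread.interrupted()) {
      throw new InterruptedException();
    }
  }

  // Returns true if the worker observed the interrupt as an InterruptedException.
  static boolean demo() throws Exception {
    CountDownLatch registered = new CountDownLatch(1);
    final boolean[] sawInterrupt = {false};
    Thread worker = new Thread(() -> {
      try {
        InterrupterSketch i = new InterrupterSketch();
        registered.countDown();
        try {
          Thread.sleep(60_000); // stand-in for the long-running compaction work
        } finally {
          i.close();
        }
      } catch (InterruptedException e) {
        sawInterrupt[0] = true;
      }
    });
    worker.start();
    registered.await();
    // What the tablet's close() does: interrupt every registered worker.
    active.forEach(InterrupterSketch::interrupt);
    worker.join();
    return sawInterrupt[0];
  }
}
```

The key design point mirrored from the real class: setting `thread = null` under the same lock that `interrupt()` takes is what bounds interrupts to the lifetime of the try-with-resources block.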

@Override
public void compact(CompactionServiceId service, CompactionJob job, BooleanSupplier keepRunning,
RateLimiter readLimiter, RateLimiter writeLimiter, long queuedTime) {
@@ -1332,6 +1381,7 @@ public void compact(CompactionServiceId service, CompactionJob job, BooleanSuppl
boolean successful = false;
try {
TabletLogger.compacting(getExtent(), job, cInfo.localCompactionCfg);

tablet.incrementStatusMajor();
var check = new CompactionCheck(service, kind, keepRunning, cInfo.checkCompactionId);
TabletFile tmpFileName = tablet.getNextMapFilenameForMajc(cInfo.propagateDeletes);
@@ -1340,8 +1390,11 @@ public void compact(CompactionServiceId service, CompactionJob job, BooleanSuppl
SortedMap<StoredTabletFile,DataFileValue> allFiles = tablet.getDatafiles();
HashMap<StoredTabletFile,DataFileValue> compactFiles = new HashMap<>();
cInfo.jobFiles.forEach(file -> compactFiles.put(file, allFiles.get(file)));

stats = CompactableUtils.compact(tablet, job, cInfo, compactEnv, compactFiles, tmpFileName);
// Limit interrupting compactions to the part that reads and writes files and avoid
// interrupting the metadata table updates.
try (var ignored = new CompactionInterrupter()) {
stats = CompactableUtils.compact(tablet, job, cInfo, compactEnv, compactFiles, tmpFileName);
Contributor:

I'm trying to understand how this PR is different from what is happening inside CompactableUtils.compact at:

final Runnable compactionCancellerTask = () -> {
if (!cenv.isCompactionEnabled()) {
compactor.interrupt();
}
};
final ScheduledFuture<?> future = tablet.getContext().getScheduledExecutor()
.scheduleWithFixedDelay(compactionCancellerTask, 10, 10, TimeUnit.SECONDS);

When CompactableImpl.close is called and sets closed=true, then cenv.isCompactionEnabled() should return false and interrupt the FileCompactor. I'm thinking the difference here is checking the Thread.interrupted state and throwing an InterruptedException in CompactionInterrupter.close. I'm wondering if the code below is functionally equivalent:

  try {
    stats = CompactableUtils.compact(tablet, job, cInfo, compactEnv, compactFiles, tmpFileName);
  } finally {
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
  }

Contributor Author (@keith-turner), Mar 12, 2025:

The main difference is that compactor.interrupt() was not calling Thread.interrupt(). However, it could call that; it was already tracking the thread running the compaction. Modified it to do that in 7d39c8d, moving where Thread.interrupt() is called down a level (this will probably benefit external compactions also). The tests still pass with this change; however, they were a bit slower because interrupting the thread could take up to 10 seconds after calling close(). So I changed the scheduled timer to run every 3 seconds. The previous code would immediately interrupt the thread when close() was called.
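The scheduled-canceller approach being compared here reduces to a small sketch: a periodic task polls an "enabled" flag and interrupts the tracked worker thread when the flag goes false. The class name, the AtomicBoolean flag, and the 50 ms timing below are illustrative assumptions, not Accumulo's actual classes or intervals; the polling delay bounds how long cancellation can lag behind close(), which is the trade-off behind shortening it from 10 seconds to 3.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

class CancellerSketch {
  // Returns true if the polled canceller interrupted the in-progress "work".
  static boolean demo() throws Exception {
    AtomicBoolean enabled = new AtomicBoolean(true);
    AtomicReference<Thread> compactionThread =
        new AtomicReference<>(Thread.currentThread());
    ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();

    // Periodic canceller: if compactions are no longer enabled, interrupt the
    // tracked thread. The delay bounds how stale a cancellation can be.
    ScheduledFuture<?> future = pool.scheduleWithFixedDelay(() -> {
      Thread t = compactionThread.get();
      if (!enabled.get() && t != null) {
        t.interrupt();
      }
    }, 50, 50, TimeUnit.MILLISECONDS);

    enabled.set(false); // simulate close() disabling compactions
    boolean interrupted;
    try {
      Thread.sleep(5_000); // stand-in for reading/writing compaction files
      interrupted = false;
    } catch (InterruptedException e) {
      interrupted = true;
    } finally {
      compactionThread.set(null);
      future.cancel(true);
      pool.shutdownNow();
    }
    Thread.interrupted(); // clear any stray late interrupt from the canceller
    return interrupted;
  }
}
```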

}

newFile = CompactableUtils.bringOnline(tablet.getDatafileManager(), cInfo, stats,
compactFiles, allFiles, kind, tmpFileName);
@@ -1599,6 +1652,8 @@ public void close() {

closed = true;

compactionInterrupters.forEach(CompactionInterrupter::interrupt);

// Wait while internal jobs are running or external compactions are committing. When
// chopStatus is MARKING or selectStatus is SELECTING, there may be metadata table writes so
// wait on those. Do not wait on external compactions that are running.
@@ -37,11 +37,15 @@
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
@@ -86,11 +90,14 @@
import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.VerifyIngest;
import org.apache.accumulo.test.VerifyIngest.VerifyParams;
import org.apache.accumulo.test.compaction.CompactionExecutorIT;
import org.apache.accumulo.test.compaction.ExternalCompaction_1_IT.FSelector;
import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -797,6 +804,108 @@ public void testGetActiveCompactions() throws Exception {
}
}

@Test
public void testMigrationCancelCompaction() throws Exception {

// This test creates 40 tablets w/ a slow iterator, causes 40 compactions to start, and then
// starts a new tablet server. Some of the tablets should migrate to the new tserver and cancel
// their compaction. Because the test uses a slow iterator, if close blocks on compaction then
// the test should time out. Two tables are used with different iterator settings in order to
// test the two different ways compactions can be canceled. Compactions can be canceled by thread
// interrupt or by a check that is done after a compaction iterator returns a key value.

final String[] tables = this.getUniqueNames(2);
try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
client.instanceOperations().setProperty(
Property.TSERV_COMPACTION_SERVICE_DEFAULT_EXECUTORS.getKey(),
"[{'name':'any','numThreads':20}]".replaceAll("'", "\""));

SortedSet<Text> splits = IntStream.range(1, 20).mapToObj(i -> String.format("%06d", i * 1000))
.map(Text::new).collect(Collectors.toCollection(TreeSet::new));

// This iterator is intended to cover the case of a compaction being canceled by thread
// interrupt.
IteratorSetting setting1 = new IteratorSetting(50, "sleepy", SlowIterator.class);
setting1.addOption("sleepTime", "300000");
setting1.addOption("seekSleepTime", "3000");
SlowIterator.sleepUninterruptibly(setting1, false);

client.tableOperations().create(tables[0], new NewTableConfiguration().withSplits(splits)
.attachIterator(setting1, EnumSet.of(IteratorScope.majc)));

// This iterator is intended to cover the case of compaction being canceled by the check after
// a key value is returned. The iterator is configured to ignore interrupts.
IteratorSetting setting2 = new IteratorSetting(50, "sleepy", SlowIterator.class);
setting2.addOption("sleepTime", "2000");
setting2.addOption("seekSleepTime", "2000");
SlowIterator.sleepUninterruptibly(setting2, true);

client.tableOperations().create(tables[1], new NewTableConfiguration().withSplits(splits)
.attachIterator(setting2, EnumSet.of(IteratorScope.majc)));

// write files to each tablet, should cause compactions to start
for (var table : tables) {
for (int round = 0; round < 5; round++) {
try (var writer = client.createBatchWriter(table)) {
for (int i = 0; i < 20_000; i++) {
Mutation m = new Mutation(String.format("%06d", i));
m.put("f", "q", "v");
writer.addMutation(m);
}
}
client.tableOperations().flush(table, null, null, true);
}
}

assertEquals(2, client.instanceOperations().getTabletServers().size());

var ctx = (ClientContext) client;
var tableId1 = ctx.getTableId(tables[0]);
var tableId2 = ctx.getTableId(tables[1]);

Wait.waitFor(() -> {
var runningCompactions = client.instanceOperations().getActiveCompactions().stream()
.map(ac -> ac.getTablet().getTable())
.filter(tid -> tid.equals(tableId1) || tid.equals(tableId2)).count();
log.debug("Running compactions {}", runningCompactions);
return runningCompactions == 40;
Contributor Author:
This test usually passes for me; every once in a while it will get stuck here with fewer than the expected 40 compactions. Probably some issue with the propagation of compaction config changes.

});

((MiniAccumuloClusterImpl) getCluster()).getConfig().setNumTservers(3);
getCluster().getClusterControl().start(ServerType.TABLET_SERVER, "localhost");

Wait.waitFor(() -> {
var servers = client.instanceOperations().getTabletServers().size();
log.debug("Server count {}", servers);
return 3 == servers;
});

Wait.waitFor(() -> {
try (var tablets = ctx.getAmple().readTablets().forLevel(Ample.DataLevel.USER)
.fetch(ColumnType.LOCATION, ColumnType.PREV_ROW).build()) {
Map<String,Long> counts = new HashMap<>();
for (var tablet : tablets) {
if (!tablet.getTableId().equals(tableId1) && !tablet.getTableId().equals(tableId2)) {
continue;
}

if (tablet.getLocation() != null
&& tablet.getLocation().getType() == TabletMetadata.LocationType.CURRENT) {
counts.merge(tablet.getLocation().getHostPort(), 1L, Long::sum);
}
}

var total = counts.values().stream().mapToLong(l -> l).sum();
var min = counts.values().stream().mapToLong(l -> l).min().orElse(0);
var max = counts.values().stream().mapToLong(l -> l).max().orElse(100);
var serversSeen = counts.keySet();
log.debug("total:{} min:{} max:{} serversSeen:{}", total, min, max, serversSeen);
return total == 40 && min == 12 && max == 14 && serversSeen.size() == 3;
}
});
}
}
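The Wait.waitFor calls in the test above poll a condition until it becomes true or a deadline passes. A minimal sketch of such a helper is shown below; the signature, BooleanSupplier parameter, and timeout handling are invented for illustration and differ from Accumulo's actual test utility (which, for example, accepts conditions that may throw).

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

final class WaitSketch {
  // Poll `condition` every `pollMs` until it holds, failing after `timeoutMs`.
  static void waitFor(BooleanSupplier condition, long timeoutMs, long pollMs)
      throws InterruptedException {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    while (!condition.getAsBoolean()) {
      if (System.nanoTime() >= deadline) {
        throw new IllegalStateException("Timed out waiting for condition");
      }
      Thread.sleep(pollMs);
    }
  }
}
```

This polling style is why the test logs the observed counts inside each condition: when the condition never becomes true, the last logged values show how far the cluster got before the timeout.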

/**
* Counts the number of tablets and files in a table.
*/
@@ -18,8 +18,6 @@
*/
package org.apache.accumulo.test.functional;

import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
@@ -33,14 +31,17 @@
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.WrappingIterator;
import org.apache.accumulo.core.util.UtilWaitThread;

public class SlowIterator extends WrappingIterator {

private static final String SLEEP_TIME = "sleepTime";
private static final String SEEK_SLEEP_TIME = "seekSleepTime";
private static final String SLEEP_UNINTERRUPTIBLY = "sleepUninterruptibly";

private long sleepTime = 0;
private long seekSleepTime = 0;
private boolean sleepUninterruptibly = true;

public static void setSleepTime(IteratorSetting is, long millis) {
is.addOption(SLEEP_TIME, Long.toString(millis));
@@ -50,21 +51,37 @@ public static void setSeekSleepTime(IteratorSetting is, long t) {
is.addOption(SEEK_SLEEP_TIME, Long.toString(t));
}

public static void sleepUninterruptibly(IteratorSetting is, boolean b) {
is.addOption(SLEEP_UNINTERRUPTIBLY, Boolean.toString(b));
}

private void sleep(long time) throws IOException {
Contributor Author:
I checked these changes by jstacking a tablet server running the new test and verifying that compaction threads were sleeping both ways.

if (sleepUninterruptibly) {
UtilWaitThread.sleepUninterruptibly(time, TimeUnit.MILLISECONDS);
} else {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
throw new IOException(e);
}
}
}

@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
throw new UnsupportedOperationException();
}

@Override
public void next() throws IOException {
sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
sleep(sleepTime);
super.next();
}

@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
throws IOException {
sleepUninterruptibly(seekSleepTime, TimeUnit.MILLISECONDS);
sleep(seekSleepTime);
super.seek(range, columnFamilies, inclusive);
}

Expand All @@ -79,6 +96,10 @@ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> op
if (options.containsKey(SEEK_SLEEP_TIME)) {
seekSleepTime = Long.parseLong(options.get(SEEK_SLEEP_TIME));
}

if (options.containsKey(SLEEP_UNINTERRUPTIBLY)) {
sleepUninterruptibly = Boolean.parseBoolean(options.get(SLEEP_UNINTERRUPTIBLY));
}
}

}
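The uninterruptible branch of sleep() above delegates to UtilWaitThread.sleepUninterruptibly. The common shape of such a method (the pattern popularized by Guava's Uninterruptibles) is sketched below as an assumption about what it does, not as Accumulo's exact implementation: keep sleeping for the remaining time across interrupts, then restore the interrupt flag so the caller can still observe that an interrupt happened.

```java
import java.util.concurrent.TimeUnit;

final class SleepSketch {
  static void sleepUninterruptibly(long duration, TimeUnit unit) {
    boolean interrupted = false;
    try {
      long remainingNanos = unit.toNanos(duration);
      long end = System.nanoTime() + remainingNanos;
      while (true) {
        try {
          TimeUnit.NANOSECONDS.sleep(remainingNanos);
          return;
        } catch (InterruptedException e) {
          // Remember the interrupt, then keep sleeping for the remainder.
          interrupted = true;
          remainingNanos = end - System.nanoTime();
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt(); // restore the flag for the caller
      }
    }
  }
}
```

This is exactly why the test's second table (configured with sleepUninterruptibly=true) cannot be canceled by Thread.interrupt() alone and instead relies on the check performed after each key/value is returned.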