Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implements interruption of compactions and adds an IT to verify compaction cancels on tablet migration #5395

Open
wants to merge 4 commits into
base: 2.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public long getEntriesWritten() {
}

public Thread getThread() {
return compactor.thread;
return compactor.getThread();
}

public String getOutputFile() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.Collections2;

import io.opentelemetry.api.trace.Span;
Expand Down Expand Up @@ -141,13 +142,48 @@ SystemIteratorEnvironment createIteratorEnv(ServerContext context,

// a unique id to identify a compactor
private final long compactorID = nextCompactorID.getAndIncrement();
protected volatile Thread thread;
private volatile Thread thread;
private final ServerContext context;

private final AtomicBoolean interruptFlag = new AtomicBoolean(false);

public void interrupt() {
public synchronized void interrupt() {
interruptFlag.set(true);

if (thread != null) {
// Never want to interrupt the thread after clearThread was called as the thread could have
// moved on to something completely different than the compaction. This method and clearThread
// being synchronized and clearThread setting thread to null prevent this.
thread.interrupt();
}
}

/**
 * Handle returned by {@code setThread()} so that the recorded compaction thread can be cleared
 * with try-with-resources. Closing it delegates to {@code clearThread()}.
 */
private class ThreadClearer implements AutoCloseable {
/**
 * Clears the recorded compaction thread.
 *
 * @throws InterruptedException if {@code clearThread()} finds the current thread's interrupt
 *         status set
 */
@Override
public void close() throws InterruptedException {
clearThread();
}
}

/**
 * Records the calling thread as the thread running this compaction. Synchronized on the same
 * monitor as {@code interrupt()} and {@code clearThread()}.
 *
 * @return a handle whose {@code close()} clears the recorded thread
 */
private synchronized ThreadClearer setThread() {
thread = Thread.currentThread();
return new ThreadClearer();
}

/**
 * Clears the recorded compaction thread so that a later {@code interrupt()} call no longer
 * interrupts it, then checks and clears the calling thread's interrupt status.
 *
 * @throws IllegalStateException if the calling thread is not the recorded thread
 * @throws InterruptedException if the calling thread's interrupt status was set
 */
private synchronized void clearThread() throws InterruptedException {
// Only the thread that was recorded is allowed to clear itself.
Preconditions.checkState(thread == Thread.currentThread());
thread = null;
// If the thread was interrupted during compaction do not want to allow the thread to continue
// w/ the interrupt status set as this could impact code unrelated to the compaction. For
// internal compactions the thread will execute metadata update code after the compaction and
// would not want the interrupt status set for that.
// Thread.interrupted() both reads and resets the interrupt status.
if (Thread.interrupted()) {
throw new InterruptedException();
}
}

/**
 * Returns the thread currently recorded as running this compaction.
 *
 * @return the recorded thread, or {@code null} when no thread is recorded (for example after
 *         {@code clearThread()} has run)
 */
Thread getThread() {
return thread;
}

public long getCompactorID() {
Expand Down Expand Up @@ -272,7 +308,8 @@ protected Map<String,Set<ByteSequence>> getLocalityGroups(AccumuloConfiguration
}

@Override
public CompactionStats call() throws IOException, CompactionCanceledException {
public CompactionStats call()
throws IOException, CompactionCanceledException, InterruptedException {

FileSKVWriter mfw = null;

Expand All @@ -290,8 +327,9 @@ public CompactionStats call() throws IOException, CompactionCanceledException {
String newThreadName =
"MajC compacting " + extent + " started " + threadStartDate + " file: " + outputFile;
Thread.currentThread().setName(newThreadName);
thread = Thread.currentThread();
try {
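// Use try w/ resources for clearing the thread instead of finally because clearing may throw an
// exception. An exception thrown from a finally block replaces any exception thrown by the try
// block, whereas try w/ resources keeps the original and attaches the close exception as suppressed.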
try (var ignored = setThread()) {
FileOperations fileFactory = FileOperations.getInstance();
FileSystem ns = this.fs.getFileSystemByPath(outputFile.getPath());

Expand Down Expand Up @@ -374,7 +412,6 @@ public CompactionStats call() throws IOException, CompactionCanceledException {
} finally {
Thread.currentThread().setName(oldThreadName);
if (remove) {
thread = null;
runningCompactions.remove(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ private static AccumuloConfiguration getCompactionConfig(TableConfiguration tabl
static CompactionStats compact(Tablet tablet, CompactionJob job,
CompactableImpl.CompactionInfo cInfo, CompactionEnv cenv,
Map<StoredTabletFile,DataFileValue> compactFiles, TabletFile tmpFileName)
throws IOException, CompactionCanceledException {
throws IOException, CompactionCanceledException, InterruptedException {
TableConfiguration tableConf = tablet.getTableConfiguration();

AccumuloConfiguration compactionConfig = getCompactionConfig(tableConf,
Expand All @@ -576,7 +576,7 @@ static CompactionStats compact(Tablet tablet, CompactionJob job,
}
};
final ScheduledFuture<?> future = tablet.getContext().getScheduledExecutor()
.scheduleWithFixedDelay(compactionCancellerTask, 10, 10, TimeUnit.SECONDS);
.scheduleWithFixedDelay(compactionCancellerTask, 3, 3, TimeUnit.SECONDS);
try {
return compactor.call();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public CompactionStats call() {
log.warn("MinC failed ({}) to create {} retrying ...", e.getMessage(), outputFileName, e);
reportedProblem = true;
retryCounter++;
} catch (CompactionCanceledException e) {
} catch (CompactionCanceledException | InterruptedException e) {
throw new IllegalStateException(e);
}

Expand All @@ -161,7 +161,6 @@ public CompactionStats call() {

} while (true);
} finally {
thread = null;
runningCompactions.remove(this);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,15 @@
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
Expand Down Expand Up @@ -86,11 +90,14 @@
import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.VerifyIngest;
import org.apache.accumulo.test.VerifyIngest.VerifyParams;
import org.apache.accumulo.test.compaction.CompactionExecutorIT;
import org.apache.accumulo.test.compaction.ExternalCompaction_1_IT.FSelector;
import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -797,6 +804,108 @@ public void testGetActiveCompactions() throws Exception {
}
}

@Test
public void testMigrationCancelCompaction() throws Exception {

// This test creates 40 tablets w/ slow iterator, causes 40 compactions to start, and then
// starts a new tablet server. Some of the tablets should migrate to the new tserver and cancel
// their compaction. Because the test uses a slow iterator, if close blocks on compaction then
// the test should time out. Two tables are used to have different iterator settings in order to
// test the two different ways compactions can be canceled. Compactions can be canceled by thread
// interrupt or by a check that is done after a compaction iterator returns a key value.

final String[] tables = this.getUniqueNames(2);
try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
client.instanceOperations().setProperty(
Property.TSERV_COMPACTION_SERVICE_DEFAULT_EXECUTORS.getKey(),
"[{'name':'any','numThreads':20}]".replaceAll("'", "\""));

SortedSet<Text> splits = IntStream.range(1, 20).mapToObj(i -> String.format("%06d", i * 1000))
.map(Text::new).collect(Collectors.toCollection(TreeSet::new));

// This iterator is intended to cover the case of a compaction being canceled by thread
// interrupt.
IteratorSetting setting1 = new IteratorSetting(50, "sleepy", SlowIterator.class);
setting1.addOption("sleepTime", "300000");
setting1.addOption("seekSleepTime", "3000");
SlowIterator.sleepUninterruptibly(setting1, false);

client.tableOperations().create(tables[0], new NewTableConfiguration().withSplits(splits)
.attachIterator(setting1, EnumSet.of(IteratorScope.majc)));

// This iterator is intended to cover the case of compaction being canceled by the check after
// a key value is returned. The iterator is configured to ignore interrupts.
IteratorSetting setting2 = new IteratorSetting(50, "sleepy", SlowIterator.class);
setting2.addOption("sleepTime", "2000");
setting2.addOption("seekSleepTime", "2000");
SlowIterator.sleepUninterruptibly(setting2, true);

client.tableOperations().create(tables[1], new NewTableConfiguration().withSplits(splits)
.attachIterator(setting2, EnumSet.of(IteratorScope.majc)));

// write files to each tablet, should cause compactions to start
for (var table : tables) {
for (int round = 0; round < 5; round++) {
try (var writer = client.createBatchWriter(table)) {
for (int i = 0; i < 20_000; i++) {
Mutation m = new Mutation(String.format("%06d", i));
m.put("f", "q", "v");
writer.addMutation(m);
}
}
client.tableOperations().flush(table, null, null, true);
}
}

assertEquals(2, client.instanceOperations().getTabletServers().size());

var ctx = (ClientContext) client;
var tableId1 = ctx.getTableId(tables[0]);
var tableId2 = ctx.getTableId(tables[1]);

Wait.waitFor(() -> {
var runningCompactions = client.instanceOperations().getActiveCompactions().stream()
.map(ac -> ac.getTablet().getTable())
.filter(tid -> tid.equals(tableId1) || tid.equals(tableId2)).count();
log.debug("Running compactions {}", runningCompactions);
return runningCompactions == 40;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test usually passes for me, every once in a while it will get stuck here w/ less than the expected 40 compactions. Probably some issue w/ the propagation of compaction config changes.

});

((MiniAccumuloClusterImpl) getCluster()).getConfig().setNumTservers(3);
getCluster().getClusterControl().start(ServerType.TABLET_SERVER, "localhost");

Wait.waitFor(() -> {
var servers = client.instanceOperations().getTabletServers().size();
log.debug("Server count {}", servers);
return 3 == servers;
});

Wait.waitFor(() -> {
try (var tablets = ctx.getAmple().readTablets().forLevel(Ample.DataLevel.USER)
.fetch(ColumnType.LOCATION, ColumnType.PREV_ROW).build()) {
Map<String,Long> counts = new HashMap<>();
for (var tablet : tablets) {
if (!tablet.getTableId().equals(tableId1) && !tablet.getTableId().equals(tableId2)) {
continue;
}

if (tablet.getLocation() != null
&& tablet.getLocation().getType() == TabletMetadata.LocationType.CURRENT) {
counts.merge(tablet.getLocation().getHostPort(), 1L, Long::sum);
}
}

var total = counts.values().stream().mapToLong(l -> l).sum();
var min = counts.values().stream().mapToLong(l -> l).min().orElse(0);
var max = counts.values().stream().mapToLong(l -> l).max().orElse(100);
var serversSeen = counts.keySet();
log.debug("total:{} min:{} max:{} serversSeen:{}", total, min, max, serversSeen);
return total == 40 && min == 12 && max == 14 && serversSeen.size() == 3;
}
});
}
}

/**
* Counts the number of tablets and files in a table.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.accumulo.test.functional;

import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
Expand All @@ -33,14 +31,17 @@
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.WrappingIterator;
import org.apache.accumulo.core.util.UtilWaitThread;

public class SlowIterator extends WrappingIterator {

private static final String SLEEP_TIME = "sleepTime";
private static final String SEEK_SLEEP_TIME = "seekSleepTime";
private static final String SLEEP_UNINTERRUPTIBLY = "sleepUninterruptibly";

private long sleepTime = 0;
private long seekSleepTime = 0;
private boolean sleepUninterruptibly = true;

public static void setSleepTime(IteratorSetting is, long millis) {
is.addOption(SLEEP_TIME, Long.toString(millis));
Expand All @@ -50,21 +51,37 @@ public static void setSeekSleepTime(IteratorSetting is, long t) {
is.addOption(SEEK_SLEEP_TIME, Long.toString(t));
}

/**
 * Sets the {@code sleepUninterruptibly} option on an iterator setting. When the option is absent
 * the iterator defaults to sleeping uninterruptibly.
 *
 * @param is the iterator setting to add the option to
 * @param b {@code true} to sleep uninterruptibly, {@code false} to use an interruptible sleep
 */
public static void sleepUninterruptibly(IteratorSetting is, boolean b) {
is.addOption(SLEEP_UNINTERRUPTIBLY, Boolean.toString(b));
}

private void sleep(long time) throws IOException {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked these changes by jstacking a tablet server running the new test and verifying that compaction threads were sleeping both ways.

// Sleeps for the requested number of milliseconds, either ignoring interrupts or surfacing an
// interrupt to the caller as an IOException, depending on the sleepUninterruptibly option.
if (sleepUninterruptibly) {
UtilWaitThread.sleepUninterruptibly(time, TimeUnit.MILLISECONDS);
} else {
try {
// Use the time argument rather than the sleepTime field: seek() passes seekSleepTime, and
// sleeping for sleepTime here would make seeks wait for the next() delay instead.
Thread.sleep(time);
} catch (InterruptedException e) {
throw new IOException(e);
}
}
}

/**
 * Not supported by this iterator.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
throw new UnsupportedOperationException();
}

@Override
public void next() throws IOException {
sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
sleep(sleepTime);
super.next();
}

@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
throws IOException {
sleepUninterruptibly(seekSleepTime, TimeUnit.MILLISECONDS);
sleep(seekSleepTime);
super.seek(range, columnFamilies, inclusive);
}

Expand All @@ -79,6 +96,10 @@ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> op
if (options.containsKey(SEEK_SLEEP_TIME)) {
seekSleepTime = Long.parseLong(options.get(SEEK_SLEEP_TIME));
}

if (options.containsKey(SLEEP_UNINTERRUPTIBLY)) {
sleepUninterruptibly = Boolean.parseBoolean(options.get(SLEEP_UNINTERRUPTIBLY));
}
}

}