-
Notifications
You must be signed in to change notification settings - Fork 455
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
implements interruption of compactions and adds an IT to verify compaction cancels on tablet migration #5395
base: 2.1
Are you sure you want to change the base?
Changes from 3 commits
cd894c5
b6d2f42
4e24af0
7d39c8d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,11 +37,15 @@ | |
import java.util.Map.Entry; | ||
import java.util.NoSuchElementException; | ||
import java.util.Objects; | ||
import java.util.SortedSet; | ||
import java.util.TreeSet; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.IntStream; | ||
|
||
import org.apache.accumulo.core.client.Accumulo; | ||
import org.apache.accumulo.core.client.AccumuloClient; | ||
|
@@ -86,11 +90,14 @@ | |
import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher; | ||
import org.apache.accumulo.core.util.HostAndPort; | ||
import org.apache.accumulo.harness.AccumuloClusterHarness; | ||
import org.apache.accumulo.minicluster.ServerType; | ||
import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl; | ||
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; | ||
import org.apache.accumulo.test.VerifyIngest; | ||
import org.apache.accumulo.test.VerifyIngest.VerifyParams; | ||
import org.apache.accumulo.test.compaction.CompactionExecutorIT; | ||
import org.apache.accumulo.test.compaction.ExternalCompaction_1_IT.FSelector; | ||
import org.apache.accumulo.test.util.Wait; | ||
import org.apache.hadoop.conf.Configuration; | ||
import org.apache.hadoop.fs.FileSystem; | ||
import org.apache.hadoop.fs.Path; | ||
|
@@ -797,6 +804,108 @@ public void testGetActiveCompactions() throws Exception { | |
} | ||
} | ||
|
||
@Test | ||
public void testMigrationCancelCompaction() throws Exception { | ||
|
||
// This test creates 40 tablets w/ slow iterator, causes 40 compactions to start, and then | ||
// starts a new tablet server. Some of the tablets should migrate to the new tserver and cancel | ||
// their compaction. Because the test uses a slow iterator, if close blocks on compaction then | ||
// the test should timeout. Two tables are used to have different iterator settings inorder to | ||
// test the two different way compactions can be canceled. Compactions can be canceled by thread | ||
// interrupt or by a check that is done after a compaction iterator returns a key value. | ||
|
||
final String[] tables = this.getUniqueNames(2); | ||
try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { | ||
client.instanceOperations().setProperty( | ||
Property.TSERV_COMPACTION_SERVICE_DEFAULT_EXECUTORS.getKey(), | ||
"[{'name':'any','numThreads':20}]".replaceAll("'", "\"")); | ||
|
||
SortedSet<Text> splits = IntStream.range(1, 20).mapToObj(i -> String.format("%06d", i * 1000)) | ||
.map(Text::new).collect(Collectors.toCollection(TreeSet::new)); | ||
|
||
// This iterator is intended to cover the case of a compaction being canceled by thread | ||
// interrupt. | ||
IteratorSetting setting1 = new IteratorSetting(50, "sleepy", SlowIterator.class); | ||
setting1.addOption("sleepTime", "300000"); | ||
setting1.addOption("seekSleepTime", "3000"); | ||
SlowIterator.sleepUninterruptibly(setting1, false); | ||
|
||
client.tableOperations().create(tables[0], new NewTableConfiguration().withSplits(splits) | ||
.attachIterator(setting1, EnumSet.of(IteratorScope.majc))); | ||
|
||
// This iterator is intended to cover the case of compaction being canceled by the check after | ||
// a key value is returned. The iterator is configured to ignore interrupts. | ||
IteratorSetting setting2 = new IteratorSetting(50, "sleepy", SlowIterator.class); | ||
setting2.addOption("sleepTime", "2000"); | ||
setting2.addOption("seekSleepTime", "2000"); | ||
SlowIterator.sleepUninterruptibly(setting2, true); | ||
|
||
client.tableOperations().create(tables[1], new NewTableConfiguration().withSplits(splits) | ||
.attachIterator(setting2, EnumSet.of(IteratorScope.majc))); | ||
|
||
// write files to each tablet, should cause compactions to start | ||
for (var table : tables) { | ||
for (int round = 0; round < 5; round++) { | ||
try (var writer = client.createBatchWriter(table)) { | ||
for (int i = 0; i < 20_000; i++) { | ||
Mutation m = new Mutation(String.format("%06d", i)); | ||
m.put("f", "q", "v"); | ||
writer.addMutation(m); | ||
} | ||
} | ||
client.tableOperations().flush(table, null, null, true); | ||
} | ||
} | ||
|
||
assertEquals(2, client.instanceOperations().getTabletServers().size()); | ||
|
||
var ctx = (ClientContext) client; | ||
var tableId1 = ctx.getTableId(tables[0]); | ||
var tableId2 = ctx.getTableId(tables[1]); | ||
|
||
Wait.waitFor(() -> { | ||
var runningCompactions = client.instanceOperations().getActiveCompactions().stream() | ||
.map(ac -> ac.getTablet().getTable()) | ||
.filter(tid -> tid.equals(tableId1) || tid.equals(tableId2)).count(); | ||
log.debug("Running compactions {}", runningCompactions); | ||
return runningCompactions == 40; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test usually passes for me, every once in a while it will get stuck here w/ less than the expected 40 compactions. Probably some issue w/ the propagation of compaction config changes. |
||
}); | ||
|
||
((MiniAccumuloClusterImpl) getCluster()).getConfig().setNumTservers(3); | ||
getCluster().getClusterControl().start(ServerType.TABLET_SERVER, "localhost"); | ||
|
||
Wait.waitFor(() -> { | ||
var servers = client.instanceOperations().getTabletServers().size(); | ||
log.debug("Server count {}", servers); | ||
return 3 == servers; | ||
}); | ||
|
||
Wait.waitFor(() -> { | ||
try (var tablets = ctx.getAmple().readTablets().forLevel(Ample.DataLevel.USER) | ||
.fetch(ColumnType.LOCATION, ColumnType.PREV_ROW).build()) { | ||
Map<String,Long> counts = new HashMap<>(); | ||
for (var tablet : tablets) { | ||
if (!tablet.getTableId().equals(tableId1) && !tablet.getTableId().equals(tableId2)) { | ||
continue; | ||
} | ||
|
||
if (tablet.getLocation() != null | ||
&& tablet.getLocation().getType() == TabletMetadata.LocationType.CURRENT) { | ||
counts.merge(tablet.getLocation().getHostPort(), 1L, Long::sum); | ||
} | ||
} | ||
|
||
var total = counts.values().stream().mapToLong(l -> l).sum(); | ||
var min = counts.values().stream().mapToLong(l -> l).min().orElse(0); | ||
var max = counts.values().stream().mapToLong(l -> l).max().orElse(100); | ||
var serversSeen = counts.keySet(); | ||
log.debug("total:{} min:{} max:{} serversSeen:{}", total, min, max, serversSeen); | ||
return total == 40 && min == 12 && max == 14 && serversSeen.size() == 3; | ||
} | ||
}); | ||
} | ||
} | ||
|
||
/** | ||
* Counts the number of tablets and files in a table. | ||
*/ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,8 +18,6 @@ | |
*/ | ||
package org.apache.accumulo.test.functional; | ||
|
||
import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; | ||
|
||
import java.io.IOException; | ||
import java.util.Collection; | ||
import java.util.Map; | ||
|
@@ -33,14 +31,17 @@ | |
import org.apache.accumulo.core.iterators.IteratorEnvironment; | ||
import org.apache.accumulo.core.iterators.SortedKeyValueIterator; | ||
import org.apache.accumulo.core.iterators.WrappingIterator; | ||
import org.apache.accumulo.core.util.UtilWaitThread; | ||
|
||
public class SlowIterator extends WrappingIterator { | ||
|
||
private static final String SLEEP_TIME = "sleepTime"; | ||
private static final String SEEK_SLEEP_TIME = "seekSleepTime"; | ||
private static final String SLEEP_UNINTERRUPTIBLY = "sleepUninterruptibly"; | ||
|
||
private long sleepTime = 0; | ||
private long seekSleepTime = 0; | ||
private boolean sleepUninterruptibly = true; | ||
|
||
public static void setSleepTime(IteratorSetting is, long millis) { | ||
is.addOption(SLEEP_TIME, Long.toString(millis)); | ||
|
@@ -50,21 +51,37 @@ public static void setSeekSleepTime(IteratorSetting is, long t) { | |
is.addOption(SEEK_SLEEP_TIME, Long.toString(t)); | ||
} | ||
|
||
public static void sleepUninterruptibly(IteratorSetting is, boolean b) { | ||
is.addOption(SLEEP_UNINTERRUPTIBLY, Boolean.toString(b)); | ||
} | ||
|
||
private void sleep(long time) throws IOException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I checked these changes by jstacking a tablet server running the new test and verifying that compaction threads were sleeping both ways. |
||
if (sleepUninterruptibly) { | ||
UtilWaitThread.sleepUninterruptibly(time, TimeUnit.MILLISECONDS); | ||
} else { | ||
try { | ||
Thread.sleep(sleepTime); | ||
} catch (InterruptedException e) { | ||
throw new IOException(e); | ||
} | ||
} | ||
} | ||
|
||
@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
  // Deep copies are not supported by this iterator.
  throw new UnsupportedOperationException();
}
|
||
@Override
public void next() throws IOException {
  // Delay before advancing so scans/compactions using this iterator run slowly; the sleep
  // may throw IOException when configured to propagate interrupts.
  sleep(sleepTime);
  super.next();
}
|
||
@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
    throws IOException {
  // Delay before seeking the underlying source, using the separately configured seek sleep
  // time; may throw IOException when configured to propagate interrupts.
  sleep(seekSleepTime);
  super.seek(range, columnFamilies, inclusive);
}
|
||
|
@@ -79,6 +96,10 @@ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> op | |
if (options.containsKey(SEEK_SLEEP_TIME)) { | ||
seekSleepTime = Long.parseLong(options.get(SEEK_SLEEP_TIME)); | ||
} | ||
|
||
if (options.containsKey(SLEEP_UNINTERRUPTIBLY)) { | ||
sleepUninterruptibly = Boolean.parseBoolean(options.get(SLEEP_UNINTERRUPTIBLY)); | ||
} | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm trying to understand how this PR is different than what is happening inside CompactableUtils.compact at:
accumulo/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
Lines 573 to 579 in 2950699
When `CompactableImpl.close` is called and sets `closed=true`, then `cenv.isCompactionEnabled()` should return `false` and interrupt the FileCompactor. I'm thinking the difference here is checking the `Thread.interrupted` state and throwing an `InterruptedException` in `CompactionInterrupter.close`. I'm wondering if the code below is functionally equivalent:
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main difference is that compactor.interrupt() was not calling Thread.interrupt(). However, it could call that — it was already tracking the thread running the compaction. Modified it to do that in 7d39c8d, moving where Thread.interrupt() is called down a level (this will probably benefit external compactions also). The tests still pass with this change, but they were a bit slower because interrupting the thread could take up to 10 seconds after calling close(). So changed the scheduled timer to run every 3 seconds. The previous code would immediately interrupt the thread when close() was called.