Skip to content

Commit cd894c5

Browse files
committed
adds an IT to verify compaction cancels on tablet migration
1 parent 9037d61 commit cd894c5

File tree

1 file changed

+88
-0
lines changed

1 file changed

+88
-0
lines changed

test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java

+88
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,15 @@
3737
import java.util.Map.Entry;
3838
import java.util.NoSuchElementException;
3939
import java.util.Objects;
40+
import java.util.SortedSet;
41+
import java.util.TreeSet;
4042
import java.util.concurrent.CountDownLatch;
4143
import java.util.concurrent.ExecutorService;
4244
import java.util.concurrent.Executors;
4345
import java.util.concurrent.atomic.AtomicBoolean;
4446
import java.util.concurrent.atomic.AtomicReference;
47+
import java.util.stream.Collectors;
48+
import java.util.stream.IntStream;
4549

4650
import org.apache.accumulo.core.client.Accumulo;
4751
import org.apache.accumulo.core.client.AccumuloClient;
@@ -86,11 +90,14 @@
8690
import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
8791
import org.apache.accumulo.core.util.HostAndPort;
8892
import org.apache.accumulo.harness.AccumuloClusterHarness;
93+
import org.apache.accumulo.minicluster.ServerType;
94+
import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
8995
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
9096
import org.apache.accumulo.test.VerifyIngest;
9197
import org.apache.accumulo.test.VerifyIngest.VerifyParams;
9298
import org.apache.accumulo.test.compaction.CompactionExecutorIT;
9399
import org.apache.accumulo.test.compaction.ExternalCompaction_1_IT.FSelector;
100+
import org.apache.accumulo.test.util.Wait;
94101
import org.apache.hadoop.conf.Configuration;
95102
import org.apache.hadoop.fs.FileSystem;
96103
import org.apache.hadoop.fs.Path;
@@ -797,6 +804,87 @@ public void testGetActiveCompactions() throws Exception {
797804
}
798805
}
799806

807+
@Test
808+
public void testMigrationCancelCompaction() throws Exception {
809+
810+
// This test creates 20 tablets w/ slow iterator, causes 20 compactions to start, and then
811+
// starts a new tablet server. Some of the tablets should migrate to the new tserver and cancel
812+
// their compaction. Because the test uses a slow iterator, if close blocks on compaction then
813+
// the test should timeout.
814+
815+
final String table1 = this.getUniqueNames(1)[0];
816+
try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
817+
818+
// TODO use newer property
819+
client.instanceOperations().setProperty(Property.TSERV_MAJC_MAXCONCURRENT.getKey(), "20");
820+
821+
IteratorSetting setting = new IteratorSetting(50, "sleepy", SlowIterator.class);
822+
setting.addOption("sleepTime", "3000");
823+
setting.addOption("seekSleepTime", "3000");
824+
825+
SortedSet<Text> splits = IntStream.range(1, 20).mapToObj(i -> String.format("%06d", i * 1000))
826+
.map(Text::new).collect(Collectors.toCollection(TreeSet::new));
827+
client.tableOperations().create(table1, new NewTableConfiguration().withSplits(splits)
828+
.attachIterator(setting, EnumSet.of(IteratorScope.majc)));
829+
830+
// write files to each tablet, should cause compactions to start
831+
for (int round = 0; round < 5; round++) {
832+
try (var writer = client.createBatchWriter(table1)) {
833+
for (int i = 0; i < 20_000; i++) {
834+
Mutation m = new Mutation(String.format("%06d", i));
835+
m.put("f", "q", "v");
836+
writer.addMutation(m);
837+
}
838+
}
839+
client.tableOperations().flush(table1, null, null, true);
840+
}
841+
842+
assertEquals(2, client.instanceOperations().getTabletServers().size());
843+
844+
var ctx = (ClientContext) client;
845+
var tableId = ctx.getTableId(table1);
846+
847+
Wait.waitFor(() -> {
848+
var runningCompactions = client.instanceOperations().getActiveCompactions().stream()
849+
.filter(ac -> ac.getTablet().getTable().equals(tableId)).count();
850+
log.debug("Running compactions {}", runningCompactions);
851+
return runningCompactions == 20;
852+
});
853+
854+
// TODO is there a better w/ to do this w/o the cast?
855+
((MiniAccumuloClusterImpl) getCluster()).getConfig().setNumTservers(3);
856+
getCluster().getClusterControl().start(ServerType.TABLET_SERVER, "localhost");
857+
858+
Wait.waitFor(() -> {
859+
var servers = client.instanceOperations().getTabletServers().size();
860+
log.debug("Server count {}", servers);
861+
return 3 == servers;
862+
});
863+
864+
Wait.waitFor(() -> {
865+
try (var tablets = ctx.getAmple().readTablets().forTable(tableId)
866+
.fetch(ColumnType.LOCATION, ColumnType.PREV_ROW).build()) {
867+
Map<String,Long> counts = new HashMap<>();
868+
for (var tablet : tablets) {
869+
if (tablet.getLocation() != null) {
870+
counts.merge(tablet.getLocation().getHostPort().toString(), 1L, Long::sum);
871+
}
872+
}
873+
874+
var total = counts.values().stream().mapToLong(l -> l).sum();
875+
var min = counts.values().stream().mapToLong(l -> l).min().orElse(0);
876+
var max = counts.values().stream().mapToLong(l -> l).max().orElse(100);
877+
var serversSeen = counts.keySet();
878+
log.debug("total:{} min:{} max:{} serversSeen:{}", total, min, max, serversSeen);
879+
return total == 20 && min == 6 && max == 7 && serversSeen.size() == 3;
880+
}
881+
});
882+
883+
// TODO is there any way to check that a compaction was actually canceled? Looking in the
884+
// tserver logs can see it happened.
885+
}
886+
}
887+
800888
/**
801889
* Counts the number of tablets and files in a table.
802890
*/

0 commit comments

Comments
 (0)