Skip to content

Commit 5fb05ef

Browse files
committed
Merge branch '2.1'
2 parents 1db892b + 8bf0970 commit 5fb05ef

File tree

3 files changed

+93
-4
lines changed

3 files changed

+93
-4
lines changed

server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -265,13 +265,13 @@ private synchronized void checkForConfigChanges(boolean force) {
265265
}
266266
});
267267

268-
var deletedServices =
269-
Sets.difference(currentCfg.getPlanners().keySet(), tmpCfg.getPlanners().keySet());
268+
var deletedServices = Sets.difference(services.keySet(), tmpServices.keySet());
270269

271-
for (String serviceName : deletedServices) {
272-
services.get(CompactionServiceId.of(serviceName)).stop();
270+
for (var dcsid : deletedServices) {
271+
services.get(dcsid).stop();
273272
}
274273

274+
this.currentCfg = tmpCfg;
275275
this.services = Map.copyOf(tmpServices);
276276

277277
HashSet<CompactionExecutorId> activeExternalExecs = new HashSet<>();

server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java

+1
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,7 @@ public void configurationChanged(String plannerClassName, String plannerPrefix,
445445

446446
public void stop() {
447447
executors.values().forEach(CompactionExecutor::stop);
448+
log.debug("Stopped compaction service {}", myId);
448449
}
449450

450451
int getCompactionsRunning(CType ctype) {

test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java

+88
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@
9090
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
9191
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
9292
import org.apache.accumulo.core.security.Authorizations;
93+
import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
94+
import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
9395
import org.apache.accumulo.harness.AccumuloClusterHarness;
9496
import org.apache.accumulo.minicluster.ServerType;
9597
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
@@ -897,6 +899,92 @@ private Map<String,Long> getFileSizeMap(AccumuloClient client, String tableName)
897899
}
898900
}
899901

902+
@Test
903+
public void testDeleteCompactionService() throws Exception {
904+
try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
905+
var uniqueNames = getUniqueNames(2);
906+
String table1 = uniqueNames[0];
907+
String table2 = uniqueNames[1];
908+
909+
// create a compaction service named deleteme
910+
c.instanceOperations().setProperty(
911+
Property.COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner",
912+
DefaultCompactionPlanner.class.getName());
913+
c.instanceOperations().setProperty(
914+
Property.COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner.opts.executors",
915+
"[{'name':'all','type':'internal','numThreads':1}]".replaceAll("'", "\""));
916+
917+
// create a compaction service named keepme
918+
c.instanceOperations().setProperty(
919+
Property.COMPACTION_SERVICE_PREFIX.getKey() + "keepme.planner",
920+
DefaultCompactionPlanner.class.getName());
921+
c.instanceOperations().setProperty(
922+
Property.COMPACTION_SERVICE_PREFIX.getKey() + "keepme.planner.opts.executors",
923+
"[{'name':'all','type':'internal','numThreads':1}]".replaceAll("'", "\""));
924+
925+
// create a table that uses the compaction service deleteme
926+
Map<String,String> props = new HashMap<>();
927+
props.put(Property.TABLE_COMPACTION_DISPATCHER.getKey(),
928+
SimpleCompactionDispatcher.class.getName());
929+
props.put(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "deleteme");
930+
c.tableOperations().create(table1, new NewTableConfiguration().setProperties(props));
931+
932+
// create a table that uses the compaction service keepme
933+
props.clear();
934+
props.put(Property.TABLE_COMPACTION_DISPATCHER.getKey(),
935+
SimpleCompactionDispatcher.class.getName());
936+
props.put(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "keepme");
937+
c.tableOperations().create(table2, new NewTableConfiguration().setProperties(props));
938+
939+
try (var writer1 = c.createBatchWriter(table1); var writer2 = c.createBatchWriter(table2)) {
940+
for (int i = 0; i < 10; i++) {
941+
Mutation m = new Mutation("" + i);
942+
m.put("f", "q", "" + i);
943+
writer1.addMutation(m);
944+
writer2.addMutation(m);
945+
}
946+
}
947+
948+
c.tableOperations().compact(table1, new CompactionConfig().setWait(true));
949+
c.tableOperations().compact(table2, new CompactionConfig().setWait(true));
950+
951+
// delete the compaction service deleteme
952+
c.instanceOperations()
953+
.removeProperty(Property.COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner");
954+
c.instanceOperations().removeProperty(
955+
Property.COMPACTION_SERVICE_PREFIX.getKey() + "deleteme.planner.opts.executors");
956+
957+
// add a new compaction service named newcs
958+
c.instanceOperations().setProperty(
959+
Property.COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner",
960+
DefaultCompactionPlanner.class.getName());
961+
c.instanceOperations().setProperty(
962+
Property.COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner.opts.executors",
963+
"[{'name':'all','type':'internal','numThreads':1}]".replaceAll("'", "\""));
964+
965+
// set table 1 to a compaction service newcs
966+
c.tableOperations().setProperty(table1,
967+
Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "newcs");
968+
969+
// ensure tables can still compact and are not impacted by the deleted compaction service
970+
for (int i = 0; i < 10; i++) {
971+
c.tableOperations().compact(table1, new CompactionConfig().setWait(true));
972+
c.tableOperations().compact(table2, new CompactionConfig().setWait(true));
973+
974+
try (var scanner = c.createScanner(table1)) {
975+
assertEquals(9 * 10 / 2, scanner.stream().map(Entry::getValue)
976+
.mapToInt(v -> Integer.parseInt(v.toString())).sum());
977+
}
978+
try (var scanner = c.createScanner(table2)) {
979+
assertEquals(9 * 10 / 2, scanner.stream().map(Entry::getValue)
980+
.mapToInt(v -> Integer.parseInt(v.toString())).sum());
981+
}
982+
983+
Thread.sleep(100);
984+
}
985+
}
986+
}
987+
900988
private int countFiles(AccumuloClient c) throws Exception {
901989
try (Scanner s = c.createScanner(AccumuloTable.METADATA.tableName(), Authorizations.EMPTY)) {
902990
s.fetchColumnFamily(new Text(TabletColumnFamily.NAME));

0 commit comments

Comments
 (0)