
Commit 62254bc

Add new MERGE FateKey type
1 parent a8d193a commit 62254bc

4 files changed (+43 -16 lines)


core/src/main/java/org/apache/accumulo/core/fate/FateKey.java

+7 -1

@@ -119,8 +119,12 @@ public static FateKey forCompactionCommit(ExternalCompactionId compactionId) {
     return new FateKey(FateKeyType.COMPACTION_COMMIT, compactionId);
   }
 
+  public static FateKey forMerge(KeyExtent extent) {
+    return new FateKey(FateKeyType.MERGE, extent);
+  }
+
   public enum FateKeyType {
-    SPLIT, COMPACTION_COMMIT
+    SPLIT, COMPACTION_COMMIT, MERGE
   }
 
   private static byte[] serialize(FateKeyType type, KeyExtent ke) {
@@ -151,6 +155,7 @@ private static Optional<KeyExtent> deserializeKeyExtent(FateKeyType type, DataIn
       throws IOException {
     switch (type) {
       case SPLIT:
+      case MERGE:
         return Optional.of(KeyExtent.readFrom(buffer));
       case COMPACTION_COMMIT:
         return Optional.empty();
@@ -163,6 +168,7 @@ private static Optional<ExternalCompactionId> deserializeCompactionId(FateKeyTyp
       DataInputBuffer buffer) throws IOException {
     switch (type) {
       case SPLIT:
+      case MERGE:
         return Optional.empty();
       case COMPACTION_COMMIT:
         return Optional.of(ExternalCompactionId.of(buffer.readUTF()));
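
For reference, a short sketch of how the new factory behaves, based only on what the diff above shows: a MERGE key carries a KeyExtent just like a SPLIT key, and the shared deserialization path returns the extent while reporting no compaction id. The table id and row values below are made up for illustration.

import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateKey;
import org.apache.hadoop.io.Text;

// Hypothetical extent for illustration: table "t1", rows (m, z]
KeyExtent extent = new KeyExtent(TableId.of("t1"), new Text("z"), new Text("m"));

// New factory method added by this commit
FateKey mergeKey = FateKey.forMerge(extent);

// MERGE routes with SPLIT in deserializeKeyExtent, so the extent is recoverable
assert mergeKey.getType() == FateKey.FateKeyType.MERGE;
assert mergeKey.getKeyExtent().orElseThrow().equals(extent);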

server/manager/src/main/java/org/apache/accumulo/manager/merge/FindMergeableRangeTask.java

+5 -6

@@ -28,12 +28,13 @@
 import java.util.Set;
 import java.util.function.Predicate;
 
-import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.NamespaceId;
 import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.fate.Fate.FateOperation;
 import org.apache.accumulo.core.fate.FateInstanceType;
+import org.apache.accumulo.core.fate.FateKey;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 import org.apache.accumulo.core.metadata.schema.filters.TabletMetadataFilter;
@@ -132,14 +133,12 @@ void submit(MergeableRange range, FateInstanceType type, Entry<TableId,String> t
     String endRowStr = StringUtils.defaultIfBlank(endRow.toString(), "+inf");
     log.debug("FindMergeableRangeTask: Creating merge op: {} from startRow: {} to endRow: {}",
         tableId, startRowStr, endRowStr);
-    var fateId = manager.fate(type).startTransaction();
-    String goalMessage = TableOperation.MERGE + " Merge table " + tableName + "(" + tableId
-        + ") splits from " + startRowStr + " to " + endRowStr;
+    var fateKey = FateKey.forMerge(new KeyExtent(tableId, range.endRow, range.startRow));
 
-    manager.fate(type).seedTransaction(FateOperation.SYSTEM_MERGE, fateId,
+    manager.fate(type).seedTransaction(FateOperation.SYSTEM_MERGE, fateKey,
         new TraceRepo<>(
             new TableRangeOp(Operation.SYSTEM_MERGE, namespaceId, tableId, startRow, endRow)),
-        true, goalMessage);
+        true);
   }
 
   static class MergeableRange {
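
The functional change here is that the task no longer pre-allocates a FateId via startTransaction() (and no longer passes a goal message); it seeds the SYSTEM_MERGE repo under a FateKey built from the merge extent. Judging by the key-based seedTransaction call exercised in FateStoreIT below, that overload returns an Optional FateId; a plausible reading is that the Optional is empty when a transaction for the same key is already seeded, but that behavior, and whether the manager-level call works the same way, is an assumption here. A rough store-level sketch, with illustrative variable names:

// Sketch only: assumes "store" is a FateStore, "extent" is the mergeable range's
// KeyExtent, and "mergeRepo" is the Repo to run; mirrors the FateStoreIT usage below.
FateKey fateKey = FateKey.forMerge(extent);
Optional<FateId> seeded =
    store.seedTransaction(FateOperation.SYSTEM_MERGE, fateKey, mergeRepo, true);
// Assumption: an empty Optional means a transaction for this key already exists,
// so repeated submissions of the same range would not pile up duplicate merge ops.
seeded.ifPresent(fateId -> log.debug("Seeded merge fate transaction {}", fateId));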

test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java

+28 -8

@@ -30,6 +30,7 @@
 
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -51,6 +52,7 @@
 import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.FateInstanceType;
 import org.apache.accumulo.core.fate.FateKey;
+import org.apache.accumulo.core.fate.FateKey.FateKeyType;
 import org.apache.accumulo.core.fate.FateStore;
 import org.apache.accumulo.core.fate.FateStore.FateTxStore;
 import org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus;
@@ -615,7 +617,9 @@ protected void testConcurrent(FateStore<TestEnv> store, ServerContext sctx) thro
 
      assertEquals(1, idsSeen);
      assertEquals(1, store.list(FateKey.FateKeyType.SPLIT).count());
-     assertEquals(0, store.list(FateKey.FateKeyType.COMPACTION_COMMIT).count());
+     // All other types should be a count of 0
+     Arrays.stream(FateKeyType.values()).filter(t -> !t.equals(FateKey.FateKeyType.SPLIT))
+         .forEach(t -> assertEquals(0, store.list(t).count()));
 
      for (var future : futures) {
        if (future.get().isPresent()) {
@@ -628,8 +632,9 @@ protected void testConcurrent(FateStore<TestEnv> store, ServerContext sctx) thro
        }
      }
 
-     assertEquals(0, store.list(FateKey.FateKeyType.SPLIT).count());
-     assertEquals(0, store.list(FateKey.FateKeyType.COMPACTION_COMMIT).count());
+     // All types should be a count of 0
+     assertTrue(
+         Arrays.stream(FateKeyType.values()).allMatch(t -> store.list(t).findAny().isEmpty()));
 
    } finally {
      executor.shutdown();
@@ -672,6 +677,7 @@ protected void testListFateKeys(FateStore<TestEnv> store, ServerContext sctx) th
     TableId tid1 = TableId.of("test");
     var extent1 = new KeyExtent(tid1, new Text("m"), null);
     var extent2 = new KeyExtent(tid1, null, new Text("m"));
+    var extent3 = new KeyExtent(tid1, new Text("z"), new Text("m"));
     var fateKey1 = FateKey.forSplit(extent1);
     var fateKey2 = FateKey.forSplit(extent2);
 
@@ -683,8 +689,12 @@ protected void testListFateKeys(FateStore<TestEnv> store, ServerContext sctx) th
     var fateKey3 = FateKey.forCompactionCommit(cid1);
     var fateKey4 = FateKey.forCompactionCommit(cid2);
 
+    // use one overlapping extent and one different
+    var fateKey5 = FateKey.forMerge(extent1);
+    var fateKey6 = FateKey.forMerge(extent3);
+
     Map<FateKey,FateId> fateKeyIds = new HashMap<>();
-    for (FateKey fateKey : List.of(fateKey1, fateKey2, fateKey3, fateKey4)) {
+    for (FateKey fateKey : List.of(fateKey1, fateKey2, fateKey3, fateKey4, fateKey5, fateKey6)) {
       var fateId = store.seedTransaction(TEST_FATE_OP, fateKey, new TestRepo(), true).orElseThrow();
       fateKeyIds.put(fateKey, fateId);
     }
@@ -693,20 +703,29 @@ protected void testListFateKeys(FateStore<TestEnv> store, ServerContext sctx) th
     allIds.addAll(fateKeyIds.values());
     allIds.add(id1);
     assertEquals(allIds, store.list().map(FateIdStatus::getFateId).collect(Collectors.toSet()));
-    assertEquals(5, allIds.size());
+    assertEquals(7, allIds.size());
 
-    assertEquals(4, fateKeyIds.size());
-    assertEquals(4, fateKeyIds.values().stream().distinct().count());
+    assertEquals(6, fateKeyIds.size());
+    assertEquals(6, fateKeyIds.values().stream().distinct().count());
 
     HashSet<KeyExtent> seenExtents = new HashSet<>();
     store.list(FateKey.FateKeyType.SPLIT).forEach(fateKey -> {
       assertEquals(FateKey.FateKeyType.SPLIT, fateKey.getType());
       assertNotNull(fateKeyIds.remove(fateKey));
       assertTrue(seenExtents.add(fateKey.getKeyExtent().orElseThrow()));
     });
+    assertEquals(4, fateKeyIds.size());
+    assertEquals(Set.of(extent1, extent2), seenExtents);
 
+    // clear set as one overlaps
+    seenExtents.clear();
+    store.list(FateKeyType.MERGE).forEach(fateKey -> {
+      assertEquals(FateKey.FateKeyType.MERGE, fateKey.getType());
+      assertNotNull(fateKeyIds.remove(fateKey));
+      assertTrue(seenExtents.add(fateKey.getKeyExtent().orElseThrow()));
+    });
     assertEquals(2, fateKeyIds.size());
-    assertEquals(Set.of(extent1, extent2), seenExtents);
+    assertEquals(Set.of(extent1, extent3), seenExtents);
 
     HashSet<ExternalCompactionId> seenCids = new HashSet<>();
     store.list(FateKey.FateKeyType.COMPACTION_COMMIT).forEach(fateKey -> {
@@ -717,6 +736,7 @@ protected void testListFateKeys(FateStore<TestEnv> store, ServerContext sctx) th
 
     assertEquals(0, fateKeyIds.size());
     assertEquals(Set.of(cid1, cid2), seenCids);
+
     // Cleanup so we don't interfere with other tests
     store.list()
         .forEach(fateIdStatus -> store.tryReserve(fateIdStatus.getFateId()).orElseThrow().delete());
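
One detail the updated test relies on: the key type is part of FateKey identity, so the same extent seeded as a SPLIT and as a MERGE produces two distinct keys and two distinct FateIds (extent1 backs both fateKey1 and fateKey5 above, yet six distinct ids are asserted). A tiny sketch of that property, reusing the test's extent1 variable:

// extent1 is used for both a SPLIT and a MERGE key in the test above.
FateKey splitKey = FateKey.forSplit(extent1);
FateKey mergeKey = FateKey.forMerge(extent1);

// Same extent, different type: the keys are not equal, so each seeds its own
// transaction and maps to its own FateId in the fateKeyIds map.
assert !splitKey.equals(mergeKey);
assert splitKey.getKeyExtent().equals(mergeKey.getKeyExtent());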

test/src/main/java/org/apache/accumulo/test/functional/TabletMergeabilityIT.java

+3 -1

@@ -35,6 +35,7 @@
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.TabletAvailability;
 import org.apache.accumulo.core.client.admin.TabletMergeability;
 import org.apache.accumulo.core.clientImpl.TabletMergeabilityUtil;
 import org.apache.accumulo.core.conf.Property;
@@ -155,7 +156,8 @@ public void testSplitAndMergeAll() throws Exception {
     Map<String,String> props = new HashMap<>();
     props.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "16K");
     props.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "1K");
-    c.tableOperations().create(tableName, new NewTableConfiguration().setProperties(props));
+    c.tableOperations().create(tableName, new NewTableConfiguration().setProperties(props)
+        .withInitialTabletAvailability(TabletAvailability.HOSTED));
     var tableId = TableId.of(c.tableOperations().tableIdMap().get(tableName));
 
     // Ingest data so tablet will split
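
The only change to this IT is that the table for testSplitAndMergeAll is now created with an initial tablet availability of HOSTED. A minimal sketch of the builder usage shown in the diff; the client variable, table name, and property map are placeholders:

// Sketch: create a table whose tablets start out HOSTED, as the updated test does.
// "client" is assumed to be an AccumuloClient already connected to a cluster.
Map<String,String> props = Map.of(Property.TABLE_SPLIT_THRESHOLD.getKey(), "16K");
NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props)
    .withInitialTabletAvailability(TabletAvailability.HOSTED);
client.tableOperations().create("exampleTable", ntc);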
