Skip to content

Commit 88b108e

Browse files
authored
Narrow the files checked by compaction commit (apache#5153)
Previously the conditional mutation to commit a compaction would require all files in the tablet be the same as read earlier and on a busy tablet this could fail and retry often. The check has now been narrowed to only verify that the files involved with the compaction still exist. A new method was added to Ample called requireFiles(Set<StoredTabletFile> files) which creates a condition for each file column to verify each one exists. This closes apache#5117
1 parent a907748 commit 88b108e

File tree

5 files changed

+76
-5
lines changed

5 files changed

+76
-5
lines changed

core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java

+6
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Collection;
2222
import java.util.Iterator;
2323
import java.util.Map;
24+
import java.util.Set;
2425
import java.util.function.Consumer;
2526
import java.util.function.Predicate;
2627

@@ -514,6 +515,11 @@ ConditionalTabletMutator requireSame(TabletMetadata tabletMetadata, ColumnType t
514515

515516
ConditionalTabletMutator requireAbsentLogs();
516517

518+
/**
519+
* Require that a tablet contain all the files in the set
520+
*/
521+
ConditionalTabletMutator requireFiles(Set<StoredTabletFile> files);
522+
517523
/**
518524
* <p>
519525
* Ample provides the following features on top of the conditional writer to help automate

server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java

+14
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@
4545
import org.apache.accumulo.core.dataImpl.KeyExtent;
4646
import org.apache.accumulo.core.iterators.SortedFilesIterator;
4747
import org.apache.accumulo.core.lock.ServiceLock;
48+
import org.apache.accumulo.core.metadata.StoredTabletFile;
4849
import org.apache.accumulo.core.metadata.schema.Ample;
50+
import org.apache.accumulo.core.metadata.schema.Ample.ConditionalTabletMutator;
4951
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
5052
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
5153
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CompactedColumnFamily;
@@ -330,6 +332,18 @@ public Ample.ConditionalTabletMutator requireAbsentLogs() {
330332
return this;
331333
}
332334

335+
@Override
336+
public ConditionalTabletMutator requireFiles(Set<StoredTabletFile> files) {
337+
Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
338+
IteratorSetting is = new IteratorSetting(INITIAL_ITERATOR_PRIO, PresentIterator.class);
339+
for (StoredTabletFile file : files) {
340+
Condition c = new Condition(DataFileColumnFamily.STR_NAME, file.getMetadata())
341+
.setValue(PresentIterator.VALUE).setIterators(is);
342+
mutation.addCondition(c);
343+
}
344+
return this;
345+
}
346+
333347
@Override
334348
public void submit(Ample.RejectionHandler rejectionCheck) {
335349
Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");

server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -110,15 +110,16 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId
110110
while (canCommitCompaction(ecid, tablet)) {
111111
CompactionMetadata ecm = tablet.getExternalCompactions().get(ecid);
112112

113-
// the compacted files should not exists in the tablet already
113+
// the compacted files should not exist in the tablet already
114114
var tablet2 = tablet;
115115
newDatafile.ifPresent(
116116
newFile -> Preconditions.checkState(!tablet2.getFiles().contains(newFile.insert()),
117117
"File already exists in tablet %s %s", newFile, tablet2.getFiles()));
118118

119119
try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
120120
var tabletMutator = tabletsMutator.mutateTablet(getExtent()).requireAbsentOperation()
121-
.requireCompaction(ecid).requireSame(tablet, FILES, LOCATION);
121+
.requireCompaction(ecid).requireSame(tablet, LOCATION)
122+
.requireFiles(commitData.getJobFiles());
122123

123124
if (ecm.getKind() == CompactionKind.USER) {
124125
tabletMutator.requireSame(tablet, SELECTED, COMPACTED);

server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CompactionCommitData.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.accumulo.manager.compaction.coordinator.commit;
2020

2121
import java.io.Serializable;
22-
import java.util.Collection;
2322
import java.util.Set;
2423
import java.util.stream.Collectors;
2524

@@ -56,7 +55,7 @@ public TableId getTableId() {
5655
return KeyExtent.fromThrift(textent).tableId();
5756
}
5857

59-
public Collection<StoredTabletFile> getJobFiles() {
60-
return inputPaths.stream().map(StoredTabletFile::of).collect(Collectors.toList());
58+
public Set<StoredTabletFile> getJobFiles() {
59+
return inputPaths.stream().map(StoredTabletFile::of).collect(Collectors.toSet());
6160
}
6261
}

test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java

+51
Original file line numberDiff line numberDiff line change
@@ -1746,4 +1746,55 @@ public void testErrors() {
17461746
}
17471747
}
17481748
}
1749+
1750+
@Test
1751+
public void testRequiresFiles() {
1752+
var context = cluster.getServerContext();
1753+
1754+
var stf1 = StoredTabletFile
1755+
.of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf"));
1756+
var stf2 = StoredTabletFile
1757+
.of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000071.rf"));
1758+
var stf3 = StoredTabletFile
1759+
.of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf"));
1760+
var stf4 = StoredTabletFile
1761+
.of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/C0000073.rf"));
1762+
var dfv = new DataFileValue(100, 100);
1763+
1764+
// Add 3 of the files, skip the 4th file
1765+
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
1766+
ctmi.mutateTablet(e1).requireAbsentOperation().putFile(stf1, dfv).putFile(stf2, dfv)
1767+
.putFile(stf3, dfv).submit(tm -> false);
1768+
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
1769+
}
1770+
assertEquals(Set.of(stf1, stf2, stf3), context.getAmple().readTablet(e1).getFiles());
1771+
1772+
// Test mutation is accepted when given all files
1773+
var time1 = MetadataTime.parse("L50");
1774+
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
1775+
ctmi.mutateTablet(e1).requireAbsentOperation().requireFiles(Set.of(stf1, stf2, stf3))
1776+
.putTime(time1).submit(tm -> false);
1777+
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
1778+
}
1779+
assertEquals(time1, context.getAmple().readTablet(e1).getTime());
1780+
1781+
// Test mutation is accepted when a subset of files is given
1782+
var time2 = MetadataTime.parse("L60");
1783+
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
1784+
ctmi.mutateTablet(e1).requireAbsentOperation().requireFiles(Set.of(stf1, stf3)).putTime(time2)
1785+
.submit(tm -> false);
1786+
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
1787+
}
1788+
assertEquals(time2, context.getAmple().readTablet(e1).getTime());
1789+
1790+
// Test mutation is rejected when a file is given that the tablet does not have
1791+
var time3 = MetadataTime.parse("L60");
1792+
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
1793+
ctmi.mutateTablet(e1).requireAbsentOperation().requireFiles(Set.of(stf1, stf4)).putTime(time3)
1794+
.submit(tm -> false);
1795+
assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
1796+
}
1797+
// Should be previous time still as the mutation was rejected
1798+
assertEquals(time2, context.getAmple().readTablet(e1).getTime());
1799+
}
17491800
}

0 commit comments

Comments
 (0)