Skip to content

Commit 802b4d6

Browse files
authored
narrows check of loaded files in conditional mutation (apache#5166)
Bulk load fate code was reading a tablets loaded flags, checking it was not in them, and then making a conditional mutation that required the set of bulk flags to to be the same. Requiring the set to be the same caused unnecessary collisions between bulk imports. Modified the conditional check to require the loaded flags for the fate operation to be absent.
1 parent 08f77aa commit 802b4d6

File tree

4 files changed

+79
-2
lines changed

4 files changed

+79
-2
lines changed

core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java

+5
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,11 @@ ConditionalTabletMutator requireSame(TabletMetadata tabletMetadata, ColumnType t
525525
*/
526526
ConditionalTabletMutator requireLessOrEqualsFiles(long limit);
527527

528+
/**
529+
* Requires that a tablet not have these loaded flags set.
530+
*/
531+
ConditionalTabletMutator requireAbsentLoaded(Set<ReferencedTabletFile> files);
532+
528533
/**
529534
* <p>
530535
* Ample provides the following features on top of the conditional writer to help automate

server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java

+11
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.accumulo.core.dataImpl.KeyExtent;
4646
import org.apache.accumulo.core.iterators.SortedFilesIterator;
4747
import org.apache.accumulo.core.lock.ServiceLock;
48+
import org.apache.accumulo.core.metadata.ReferencedTabletFile;
4849
import org.apache.accumulo.core.metadata.StoredTabletFile;
4950
import org.apache.accumulo.core.metadata.schema.Ample;
5051
import org.apache.accumulo.core.metadata.schema.Ample.ConditionalTabletMutator;
@@ -353,6 +354,16 @@ public ConditionalTabletMutator requireLessOrEqualsFiles(long limit) {
353354
return this;
354355
}
355356

357+
@Override
358+
public ConditionalTabletMutator requireAbsentLoaded(Set<ReferencedTabletFile> files) {
359+
Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
360+
for (ReferencedTabletFile file : files) {
361+
Condition c = new Condition(BulkFileColumnFamily.STR_NAME, file.insert().getMetadata());
362+
mutation.addCondition(c);
363+
}
364+
return this;
365+
}
366+
356367
@Override
357368
public void submit(Ample.RejectionHandler rejectionCheck) {
358369
Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");

server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,6 @@ void load(List<TabletMetadata> tablets, Files files) {
174174
}
175175

176176
List<ColumnType> rsc = new ArrayList<>();
177-
rsc.add(LOCATION);
178177
if (setTime) {
179178
rsc.add(TIME);
180179
}
@@ -232,7 +231,8 @@ void load(List<TabletMetadata> tablets, Files files) {
232231
}
233232

234233
var tabletMutator = conditionalMutator.mutateTablet(tablet.getExtent())
235-
.requireAbsentOperation().requireSame(tablet, LOADED, requireSameCols);
234+
.requireAbsentOperation().requireAbsentLoaded(filesToLoad.keySet())
235+
.requireSame(tablet, LOCATION, requireSameCols);
236236

237237
if (pauseLimit > 0) {
238238
tabletMutator.requireLessOrEqualsFiles(pauseLimit);

test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java

+61
Original file line numberDiff line numberDiff line change
@@ -1874,4 +1874,65 @@ public void testFilesLimit() {
18741874
assertEquals(time4, context.getAmple().readTablet(e1).getTime());
18751875

18761876
}
1877+
1878+
@Test
1879+
public void testRequireAbsentLoaded() {
1880+
var context = cluster.getServerContext();
1881+
1882+
var stf1 = StoredTabletFile
1883+
.of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf"));
1884+
var stf2 = StoredTabletFile
1885+
.of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000071.rf"));
1886+
var stf3 = StoredTabletFile
1887+
.of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf"));
1888+
var dfv = new DataFileValue(100, 100);
1889+
1890+
FateId fateId1 = FateId.from(FateInstanceType.USER, UUID.randomUUID());
1891+
1892+
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
1893+
ctmi.mutateTablet(e1).requireAbsentOperation()
1894+
.requireAbsentLoaded(Set.of(stf1.getTabletFile(), stf2.getTabletFile()))
1895+
.putBulkFile(stf1.getTabletFile(), fateId1).putBulkFile(stf2.getTabletFile(), fateId1)
1896+
.putFile(stf1, dfv).putFile(stf2, dfv).submit(tm -> false);
1897+
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
1898+
}
1899+
assertEquals(Set.of(stf1, stf2), context.getAmple().readTablet(e1).getFiles());
1900+
assertEquals(Map.of(stf1, fateId1, stf2, fateId1),
1901+
context.getAmple().readTablet(e1).getLoaded());
1902+
1903+
FateId fateId2 = FateId.from(FateInstanceType.USER, UUID.randomUUID());
1904+
1905+
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
1906+
ctmi.mutateTablet(e1).requireAbsentOperation()
1907+
.requireAbsentLoaded(Set.of(stf3.getTabletFile()))
1908+
.putBulkFile(stf3.getTabletFile(), fateId2).putFile(stf3, dfv).submit(tm -> false);
1909+
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
1910+
}
1911+
assertEquals(Set.of(stf1, stf2, stf3), context.getAmple().readTablet(e1).getFiles());
1912+
assertEquals(Map.of(stf1, fateId1, stf2, fateId1, stf3, fateId2),
1913+
context.getAmple().readTablet(e1).getLoaded());
1914+
1915+
// should fail because the loaded markers are present
1916+
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
1917+
ctmi.mutateTablet(e1).requireAbsentOperation()
1918+
.requireAbsentLoaded(Set.of(stf1.getTabletFile(), stf2.getTabletFile()))
1919+
.putBulkFile(stf1.getTabletFile(), fateId1).putBulkFile(stf2.getTabletFile(), fateId1)
1920+
.putFile(stf1, dfv).putFile(stf2, dfv).putFlushId(99).submit(tm -> false);
1921+
assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
1922+
}
1923+
1924+
// should fail because the loaded markers are present
1925+
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
1926+
ctmi.mutateTablet(e1).requireAbsentOperation()
1927+
.requireAbsentLoaded(Set.of(stf3.getTabletFile()))
1928+
.putBulkFile(stf3.getTabletFile(), fateId2).putFile(stf3, dfv).putFlushId(99)
1929+
.submit(tm -> false);
1930+
assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
1931+
}
1932+
1933+
assertEquals(Set.of(stf1, stf2, stf3), context.getAmple().readTablet(e1).getFiles());
1934+
assertEquals(Map.of(stf1, fateId1, stf2, fateId1, stf3, fateId2),
1935+
context.getAmple().readTablet(e1).getLoaded());
1936+
assertTrue(context.getAmple().readTablet(e1).getFlushId().isEmpty());
1937+
}
18771938
}

0 commit comments

Comments
 (0)