Skip to content

Commit 08f77aa

Browse files
authored
avoid checking all tablet files in bulk import conditional mutation (apache#5159)
After the changes in apache#5104, bulk import would read a tablet's files, examine them, and then require the set of files to be unchanged in order to update the tablet. However, the code only cared that the count of files did not exceed a specified limit. Requiring the set of files not to change caused the conditional mutation to fail on a busy tablet. This change modifies the condition to check only the count of files, which avoids contention on a busy tablet where the set of files is constantly changing.
1 parent f4ef6ff commit 08f77aa

File tree

6 files changed

+243
-6
lines changed

6 files changed

+243
-6
lines changed

core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java

+5
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,11 @@ ConditionalTabletMutator requireSame(TabletMetadata tabletMetadata, ColumnType t
520520
*/
521521
ConditionalTabletMutator requireFiles(Set<StoredTabletFile> files);
522522

523+
/**
524+
* Require that a tablet has a number of files less than or equal to the specified limit.
525+
*/
526+
ConditionalTabletMutator requireLessOrEqualsFiles(long limit);
527+
523528
/**
524529
* <p>
525530
* Ample provides the following features on top of the conditional writer to help automate

server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java

+9
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import org.apache.accumulo.core.tabletserver.log.LogEntry;
6565
import org.apache.accumulo.core.util.Pair;
6666
import org.apache.accumulo.server.ServerContext;
67+
import org.apache.accumulo.server.metadata.iterators.ColumnFamilySizeLimitIterator;
6768
import org.apache.accumulo.server.metadata.iterators.PresentIterator;
6869
import org.apache.accumulo.server.metadata.iterators.SetEncodingIterator;
6970
import org.apache.accumulo.server.metadata.iterators.TabletExistsIterator;
@@ -344,6 +345,14 @@ public ConditionalTabletMutator requireFiles(Set<StoredTabletFile> files) {
344345
return this;
345346
}
346347

348+
@Override
349+
public ConditionalTabletMutator requireLessOrEqualsFiles(long limit) {
350+
Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
351+
Condition c = ColumnFamilySizeLimitIterator.createCondition(DataFileColumnFamily.NAME, limit);
352+
mutation.addCondition(c);
353+
return this;
354+
}
355+
347356
@Override
348357
public void submit(Ample.RejectionHandler rejectionCheck) {
349358
Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* https://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.accumulo.server.metadata.iterators;
20+
21+
import static org.apache.accumulo.server.metadata.iterators.SetEncodingIterator.getTabletRow;
22+
23+
import java.io.IOException;
24+
import java.util.Collection;
25+
import java.util.Map;
26+
import java.util.NoSuchElementException;
27+
import java.util.Set;
28+
29+
import org.apache.accumulo.core.client.IteratorSetting;
30+
import org.apache.accumulo.core.data.ByteSequence;
31+
import org.apache.accumulo.core.data.Condition;
32+
import org.apache.accumulo.core.data.Key;
33+
import org.apache.accumulo.core.data.PartialKey;
34+
import org.apache.accumulo.core.data.Range;
35+
import org.apache.accumulo.core.data.Value;
36+
import org.apache.accumulo.core.iterators.IteratorEnvironment;
37+
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
38+
import org.apache.accumulo.core.iterators.WrappingIterator;
39+
import org.apache.accumulo.server.metadata.ConditionalTabletMutatorImpl;
40+
import org.apache.hadoop.io.Text;
41+
42+
import com.google.common.base.Preconditions;
43+
44+
/**
45+
* Iterator that checks if the number of entries in a column family is less than or equal to a limit as part of a
46+
* conditional mutation.
47+
*/
48+
public class ColumnFamilySizeLimitIterator extends WrappingIterator {
49+
50+
private static final String LIMIT_OPT = "limit";
51+
private static final Text EMPTY = new Text();
52+
53+
private Long limit;
54+
55+
private Key startKey = null;
56+
private Value topValue = null;
57+
58+
@Override
59+
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
60+
IteratorEnvironment env) throws IOException {
61+
super.init(source, options, env);
62+
limit = Long.parseLong(options.get(LIMIT_OPT));
63+
Preconditions.checkState(limit >= 0);
64+
}
65+
66+
@Override
67+
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
68+
throws IOException {
69+
Text tabletRow = getTabletRow(range);
70+
Text family = range.getStartKey().getColumnFamily();
71+
72+
Preconditions.checkArgument(
73+
family.getLength() > 0 && range.getStartKey().getColumnQualifier().getLength() == 0);
74+
75+
startKey = new Key(tabletRow, family);
76+
Key endKey = startKey.followingKey(PartialKey.ROW_COLFAM);
77+
78+
Range r = new Range(startKey, true, endKey, false);
79+
80+
var source = getSource();
81+
source.seek(r, Set.of(startKey.getColumnFamilyData()), true);
82+
83+
long count = 0;
84+
while (source.hasTop()) {
85+
source.next();
86+
count++;
87+
}
88+
89+
if (count <= limit) {
90+
topValue = new Value("1");
91+
} else {
92+
topValue = null;
93+
}
94+
}
95+
96+
@Override
97+
public boolean hasTop() {
98+
if (startKey == null) {
99+
throw new IllegalStateException("never been seeked");
100+
}
101+
return topValue != null;
102+
}
103+
104+
@Override
105+
public void next() throws IOException {
106+
if (startKey == null) {
107+
throw new IllegalStateException("never been seeked");
108+
}
109+
topValue = null;
110+
}
111+
112+
@Override
113+
public Key getTopKey() {
114+
if (startKey == null) {
115+
throw new IllegalStateException("never been seeked");
116+
}
117+
if (topValue == null) {
118+
throw new NoSuchElementException();
119+
}
120+
121+
return startKey;
122+
}
123+
124+
@Override
125+
public Value getTopValue() {
126+
if (startKey == null) {
127+
throw new IllegalStateException("never been seeked");
128+
}
129+
if (topValue == null) {
130+
throw new NoSuchElementException();
131+
}
132+
return topValue;
133+
}
134+
135+
/**
136+
* Create a condition that checks if the specified column family's size is less than or equal to
137+
* the given limit.
138+
*/
139+
public static Condition createCondition(Text family, long limit) {
140+
IteratorSetting is = new IteratorSetting(ConditionalTabletMutatorImpl.INITIAL_ITERATOR_PRIO,
141+
ColumnFamilySizeLimitIterator.class);
142+
is.addOption(LIMIT_OPT, limit + "");
143+
return new Condition(family, EMPTY).setValue("1").setIterators(is);
144+
}
145+
}

server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEncodingIterator.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,11 @@ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean i
9393
family.getLength() > 0 && range.getStartKey().getColumnQualifier().getLength() == 0);
9494

9595
startKey = new Key(tabletRow, family);
96-
Key endKey = new Key(tabletRow, family).followingKey(PartialKey.ROW_COLFAM);
96+
Key endKey = startKey.followingKey(PartialKey.ROW_COLFAM);
9797

9898
Range r = new Range(startKey, true, endKey, false);
9999

100-
source.seek(r, Set.of(), false);
100+
source.seek(r, Set.of(startKey.getColumnFamilyData()), true);
101101

102102
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
103103
DataOutputStream dos = new DataOutputStream(baos)) {

server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -178,9 +178,6 @@ void load(List<TabletMetadata> tablets, Files files) {
178178
if (setTime) {
179179
rsc.add(TIME);
180180
}
181-
if (pauseLimit > 0) {
182-
rsc.add(FILES);
183-
}
184181

185182
ColumnType[] requireSameCols = rsc.toArray(new ColumnType[0]);
186183

@@ -237,6 +234,10 @@ void load(List<TabletMetadata> tablets, Files files) {
237234
var tabletMutator = conditionalMutator.mutateTablet(tablet.getExtent())
238235
.requireAbsentOperation().requireSame(tablet, LOADED, requireSameCols);
239236

237+
if (pauseLimit > 0) {
238+
tabletMutator.requireLessOrEqualsFiles(pauseLimit);
239+
}
240+
240241
filesToLoad.forEach((f, v) -> {
241242
tabletMutator.putBulkFile(f, fateId);
242243
tabletMutator.putFile(f, v);

test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java

+78-1
Original file line numberDiff line numberDiff line change
@@ -1788,7 +1788,7 @@ public void testRequiresFiles() {
17881788
assertEquals(time2, context.getAmple().readTablet(e1).getTime());
17891789

17901790
// Test mutation is rejected when a file is given that the tablet does not have
1791-
var time3 = MetadataTime.parse("L60");
1791+
var time3 = MetadataTime.parse("L70");
17921792
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
17931793
ctmi.mutateTablet(e1).requireAbsentOperation().requireFiles(Set.of(stf1, stf4)).putTime(time3)
17941794
.submit(tm -> false);
@@ -1797,4 +1797,81 @@ public void testRequiresFiles() {
17971797
// Should be previous time still as the mutation was rejected
17981798
assertEquals(time2, context.getAmple().readTablet(e1).getTime());
17991799
}
1800+
1801+
@Test
1802+
public void testFilesLimit() {
1803+
var context = cluster.getServerContext();
1804+
1805+
var stf1 = StoredTabletFile
1806+
.of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf"));
1807+
var stf2 = StoredTabletFile
1808+
.of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000071.rf"));
1809+
var stf3 = StoredTabletFile
1810+
.of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf"));
1811+
var stf4 = StoredTabletFile
1812+
.of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/C0000073.rf"));
1813+
var dfv = new DataFileValue(100, 100);
1814+
1815+
// Add 3 of the files, skip the 4th file
1816+
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
1817+
ctmi.mutateTablet(e1).requireAbsentOperation().putFile(stf1, dfv).putFile(stf2, dfv)
1818+
.putFile(stf3, dfv).submit(tm -> false);
1819+
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
1820+
}
1821+
assertEquals(Set.of(stf1, stf2, stf3), context.getAmple().readTablet(e1).getFiles());
1822+
1823+
// Test mutation is accepted when # files in tablet equals limit
1824+
var time1 = MetadataTime.parse("L50");
1825+
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
1826+
ctmi.mutateTablet(e1).requireAbsentOperation().requireLessOrEqualsFiles(3).putTime(time1)
1827+
.submit(tm -> false);
1828+
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
1829+
}
1830+
assertEquals(time1, context.getAmple().readTablet(e1).getTime());
1831+
1832+
// Test mutation is accepted when # files in tablet is less than limit
1833+
var time2 = MetadataTime.parse("L60");
1834+
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
1835+
ctmi.mutateTablet(e1).requireAbsentOperation().requireLessOrEqualsFiles(4).putTime(time2)
1836+
.submit(tm -> false);
1837+
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
1838+
}
1839+
assertEquals(time2, context.getAmple().readTablet(e1).getTime());
1840+
1841+
// Test mutation is rejected when # files in tablet is greater than limit
1842+
var time3 = MetadataTime.parse("L70");
1843+
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
1844+
ctmi.mutateTablet(e1).requireAbsentOperation().requireLessOrEqualsFiles(2).putTime(time3)
1845+
.submit(tm -> false);
1846+
assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
1847+
}
1848+
// Should be previous time still as the mutation was rejected
1849+
assertEquals(time2, context.getAmple().readTablet(e1).getTime());
1850+
1851+
// add fourth file
1852+
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
1853+
ctmi.mutateTablet(e1).requireAbsentOperation().putFile(stf4, dfv).submit(tm -> false);
1854+
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
1855+
}
1856+
assertEquals(Set.of(stf1, stf2, stf3, stf4), context.getAmple().readTablet(e1).getFiles());
1857+
1858+
// Test mutation is rejected when # files in tablet is greater than limit
1859+
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
1860+
ctmi.mutateTablet(e1).requireAbsentOperation().requireLessOrEqualsFiles(3).putTime(time3)
1861+
.submit(tm -> false);
1862+
assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus());
1863+
}
1864+
// Should be previous time still as the mutation was rejected
1865+
assertEquals(time2, context.getAmple().readTablet(e1).getTime());
1866+
1867+
// Test mutation is accepted when # files in tablet equals limit
1868+
var time4 = MetadataTime.parse("L80");
1869+
try (var ctmi = new ConditionalTabletsMutatorImpl(context)) {
1870+
ctmi.mutateTablet(e1).requireAbsentOperation().requireLessOrEqualsFiles(4).putTime(time4)
1871+
.submit(tm -> false);
1872+
assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus());
1873+
}
1874+
assertEquals(time4, context.getAmple().readTablet(e1).getTime());
1875+
1876+
}
18001877
}

0 commit comments

Comments
 (0)