Skip to content

Commit 3c84e85

Browse files
committed
Refactor to reuse Mergeablerange inside VerifyMergeability
1 parent 62254bc commit 3c84e85

File tree

5 files changed

+166
-98
lines changed

5 files changed

+166
-98
lines changed

server/manager/src/main/java/org/apache/accumulo/manager/merge/FindMergeableRangeTask.java

+59-20
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,15 @@
2121
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
2222
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGEABILITY;
2323
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
24+
import static org.apache.accumulo.manager.merge.FindMergeableRangeTask.UnmergeableReason.MAX_FILE_COUNT;
25+
import static org.apache.accumulo.manager.merge.FindMergeableRangeTask.UnmergeableReason.MAX_TOTAL_SIZE;
26+
import static org.apache.accumulo.manager.merge.FindMergeableRangeTask.UnmergeableReason.NOT_CONTIGUOUS;
27+
import static org.apache.accumulo.manager.merge.FindMergeableRangeTask.UnmergeableReason.TABLET_MERGEABILITY;
2428

2529
import java.util.Map;
2630
import java.util.Map.Entry;
2731
import java.util.Objects;
32+
import java.util.Optional;
2833
import java.util.Set;
2934
import java.util.function.Predicate;
3035

@@ -94,14 +99,14 @@ public void run() {
9499
.fetch(PREV_ROW, FILES, MERGEABILITY).filter(FILTER).build()) {
95100

96101
final MergeableRange current =
97-
new MergeableRange(manager.getSteadyTime(), tableId, maxFileCount, maxTotalSize);
102+
new MergeableRange(tableId, manager.getSteadyTime(), maxFileCount, maxTotalSize);
98103

99104
for (var tm : tablets) {
100105
log.trace("Checking tablet {}, {}", tm.getExtent(), tm.getTabletMergeability());
101-
if (!current.add(tm)) {
106+
current.add(tm).ifPresent(e -> {
102107
submit(current, type, table, namespaceId);
103108
current.resetAndAdd(tm);
104-
}
109+
});
105110
}
106111

107112
submit(current, type, table, namespaceId);
@@ -120,17 +125,17 @@ void submit(MergeableRange range, FateInstanceType type, Entry<TableId,String> t
120125
return;
121126
}
122127

123-
log.debug("Table {} found {} tablets that can be merged for table", table.getValue(),
124-
range.tabletCount);
125-
126128
TableId tableId = table.getKey();
127129
String tableName = table.getValue();
128130

131+
log.debug("Table {} found {} tablets that can be merged for table", tableName,
132+
range.tabletCount);
133+
129134
final Text startRow = range.startRow != null ? range.startRow : new Text("");
130135
final Text endRow = range.endRow != null ? range.endRow : new Text("");
131136

132-
String startRowStr = StringUtils.defaultIfBlank(startRow.toString(), "-inf");
133-
String endRowStr = StringUtils.defaultIfBlank(endRow.toString(), "+inf");
137+
final String startRowStr = StringUtils.defaultIfBlank(startRow.toString(), "-inf");
138+
final String endRowStr = StringUtils.defaultIfBlank(endRow.toString(), "+inf");
134139
log.debug("FindMergeableRangeTask: Creating merge op: {} from startRow: {} to endRow: {}",
135140
tableId, startRowStr, endRowStr);
136141
var fateKey = FateKey.forMerge(new KeyExtent(tableId, range.endRow, range.startRow));
@@ -141,7 +146,36 @@ void submit(MergeableRange range, FateInstanceType type, Entry<TableId,String> t
141146
true);
142147
}
143148

144-
static class MergeableRange {
149+
public enum UnmergeableReason {
150+
NOT_CONTIGUOUS, MAX_FILE_COUNT, MAX_TOTAL_SIZE, TABLET_MERGEABILITY;
151+
152+
// Cache the Optional() reason objects as we will re-use these over and over
153+
private static final Optional<UnmergeableReason> NOT_CONTIGUOUS_OPT =
154+
Optional.of(NOT_CONTIGUOUS);
155+
private static final Optional<UnmergeableReason> MAX_FILE_COUNT_OPT =
156+
Optional.of(MAX_FILE_COUNT);
157+
private static final Optional<UnmergeableReason> MAX_TOTAL_SIZE_OPT =
158+
Optional.of(MAX_TOTAL_SIZE);
159+
private static final Optional<UnmergeableReason> TABLET_MERGEABILITY_OPT =
160+
Optional.of(TABLET_MERGEABILITY);
161+
162+
public Optional<UnmergeableReason> optional() {
163+
switch (this) {
164+
case NOT_CONTIGUOUS:
165+
return NOT_CONTIGUOUS_OPT;
166+
case MAX_FILE_COUNT:
167+
return MAX_FILE_COUNT_OPT;
168+
case MAX_TOTAL_SIZE:
169+
return MAX_TOTAL_SIZE_OPT;
170+
case TABLET_MERGEABILITY:
171+
return TABLET_MERGEABILITY_OPT;
172+
default:
173+
throw new IllegalArgumentException("Unexpected enum type");
174+
}
175+
}
176+
}
177+
178+
public static class MergeableRange {
145179
final SteadyTime currentTime;
146180
final TableId tableId;
147181
final long maxFileCount;
@@ -153,15 +187,17 @@ static class MergeableRange {
153187
long totalFileCount;
154188
long totalFileSize;
155189

156-
MergeableRange(SteadyTime currentTime, TableId tableId, long maxFileCount, long maxTotalSize) {
157-
this.currentTime = currentTime;
190+
public MergeableRange(TableId tableId, SteadyTime currentTime, long maxFileCount,
191+
long maxTotalSize) {
158192
this.tableId = tableId;
193+
this.currentTime = currentTime;
159194
this.maxFileCount = maxFileCount;
160195
this.maxTotalSize = maxTotalSize;
161196
}
162197

163-
boolean add(TabletMetadata tm) {
164-
if (validate(tm)) {
198+
public Optional<UnmergeableReason> add(TabletMetadata tm) {
199+
var failure = validate(tm);
200+
if (failure.isEmpty()) {
165201
tabletCount++;
166202
log.trace("Adding tablet {} to MergeableRange", tm.getExtent());
167203
if (tabletCount == 1) {
@@ -170,12 +206,11 @@ boolean add(TabletMetadata tm) {
170206
endRow = tm.getEndRow();
171207
totalFileCount += tm.getFiles().size();
172208
totalFileSize += tm.getFileSize();
173-
return true;
174209
}
175-
return false;
210+
return failure;
176211
}
177212

178-
private boolean validate(TabletMetadata tm) {
213+
private Optional<UnmergeableReason> validate(TabletMetadata tm) {
179214
Preconditions.checkArgument(tableId.equals(tm.getTableId()), "Unexpected tableId seen %s",
180215
tm.getTableId());
181216

@@ -186,19 +221,23 @@ private boolean validate(TabletMetadata tm) {
186221
// If this is not the first tablet, then verify its prevEndRow matches
187222
// the last endRow tracked, the server filter will skip tablets marked as never
188223
if (!tm.getPrevEndRow().equals(endRow)) {
189-
return false;
224+
return NOT_CONTIGUOUS.optional();
190225
}
191226
}
192227

193228
if (!tm.getTabletMergeability().isMergeable(currentTime)) {
194-
return false;
229+
return TABLET_MERGEABILITY.optional();
195230
}
196231

197232
if (totalFileCount + tm.getFiles().size() > maxFileCount) {
198-
return false;
233+
return MAX_FILE_COUNT.optional();
234+
}
235+
236+
if (totalFileSize + tm.getFileSize() > maxTotalSize) {
237+
return MAX_TOTAL_SIZE.optional();
199238
}
200239

201-
return totalFileSize + tm.getFileSize() <= maxTotalSize;
240+
return Optional.empty();
202241
}
203242

204243
void resetAndAdd(TabletMetadata tm) {

server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/UnreserveAndError.java

+9-44
Original file line numberDiff line numberDiff line change
@@ -29,61 +29,26 @@
2929
import org.slf4j.LoggerFactory;
3030

3131
public class UnreserveAndError extends ManagerRepo {
32-
public enum Reason {
33-
MAX_FILES, MAX_SIZE, UNMERGEABLE
34-
}
35-
3632
private static final long serialVersionUID = 1L;
3733
private static final Logger log = LoggerFactory.getLogger(UnreserveAndError.class);
3834
private final MergeInfo mergeInfo;
39-
private final long total;
40-
private final long max;
41-
private final Reason reason;
35+
private final long totalFiles;
36+
private final long maxFiles;
4237

4338
public UnreserveAndError(MergeInfo mergeInfo, long totalFiles, long maxFiles) {
44-
this(mergeInfo, Reason.MAX_FILES, totalFiles, maxFiles);
45-
}
46-
47-
public UnreserveAndError(MergeInfo mergeInfo, long unmergeable) {
48-
this.mergeInfo = mergeInfo;
49-
this.reason = Reason.UNMERGEABLE;
50-
this.total = unmergeable;
51-
this.max = 0;
52-
}
53-
54-
public UnreserveAndError(MergeInfo mergeInfo, Reason reason, long total, long max) {
5539
this.mergeInfo = mergeInfo;
56-
this.reason = reason;
57-
this.total = total;
58-
this.max = max;
40+
this.totalFiles = totalFiles;
41+
this.maxFiles = maxFiles;
5942
}
6043

6144
@Override
6245
public Repo<Manager> call(FateId fateId, Manager environment) throws Exception {
6346
FinishTableRangeOp.removeOperationIds(log, mergeInfo, fateId, environment);
6447
throw new AcceptableThriftTableOperationException(mergeInfo.tableId.toString(), null,
65-
mergeInfo.op.isMergeOp() ? TableOperation.MERGE : TableOperation.DELETE_RANGE,
66-
TableOperationExceptionType.OTHER, formatReason());
67-
}
68-
69-
public Reason getReason() {
70-
return reason;
71-
}
72-
73-
private String formatReason() {
74-
switch (reason) {
75-
case MAX_FILES:
76-
return "Aborted merge because it would produce a tablets with more files than the configured limit of "
77-
+ max + ". Observed " + total + " files in the merge range.";
78-
case MAX_SIZE:
79-
return "Aborted merge because it would produce a tablets with a file size larger than the configured limit of "
80-
+ max + ". Observed " + total + " file size in the merge range.";
81-
case UNMERGEABLE:
82-
return "Aborted merge because one ore more tablets in the merge range are unmergeable. "
83-
+ "Observed " + total + " unmergeable tablets in the merge range.";
84-
default:
85-
throw new IllegalArgumentException("Unknown Reason");
86-
}
87-
48+
mergeInfo.op == MergeInfo.Operation.MERGE ? TableOperation.MERGE
49+
: TableOperation.DELETE_RANGE,
50+
TableOperationExceptionType.OTHER,
51+
"Aborted merge because it would produce a tablets with more files than the configured limit of "
52+
+ maxFiles + ". Observed " + totalFiles + " files in the merge range.");
8853
}
8954
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* https://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.accumulo.manager.tableOps.merge;
20+
21+
import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
22+
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
23+
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
24+
import org.apache.accumulo.core.fate.FateId;
25+
import org.apache.accumulo.core.fate.Repo;
26+
import org.apache.accumulo.manager.Manager;
27+
import org.apache.accumulo.manager.merge.FindMergeableRangeTask.UnmergeableReason;
28+
import org.apache.accumulo.manager.tableOps.ManagerRepo;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
public class UnreserveSystemMerge extends ManagerRepo {
33+
34+
private static final long serialVersionUID = 1L;
35+
private static final Logger log = LoggerFactory.getLogger(UnreserveSystemMerge.class);
36+
private final MergeInfo mergeInfo;
37+
private final long maxFileCount;
38+
private final long maxTotalSize;
39+
private final UnmergeableReason reason;
40+
41+
public UnreserveSystemMerge(MergeInfo mergeInfo, UnmergeableReason reason, long maxFileCount,
42+
long maxTotalSize) {
43+
this.mergeInfo = mergeInfo;
44+
this.reason = reason;
45+
this.maxFileCount = maxFileCount;
46+
this.maxTotalSize = maxTotalSize;
47+
}
48+
49+
@Override
50+
public Repo<Manager> call(FateId fateId, Manager environment) throws Exception {
51+
FinishTableRangeOp.removeOperationIds(log, mergeInfo, fateId, environment);
52+
throw new AcceptableThriftTableOperationException(mergeInfo.tableId.toString(), null,
53+
mergeInfo.op.isMergeOp() ? TableOperation.MERGE : TableOperation.DELETE_RANGE,
54+
TableOperationExceptionType.OTHER, formatReason());
55+
}
56+
57+
public UnmergeableReason getReason() {
58+
return reason;
59+
}
60+
61+
private String formatReason() {
62+
switch (reason) {
63+
case MAX_FILE_COUNT:
64+
return "Aborted merge because it would produce a tablet with more files than the configured limit of "
65+
+ maxFileCount;
66+
case MAX_TOTAL_SIZE:
67+
return "Aborted merge because it would produce a tablet with a file size larger than the configured limit of "
68+
+ maxTotalSize;
69+
// This state should not happen as VerifyMergeability repo checks consistency but adding it
70+
// just in case
71+
case TABLET_MERGEABILITY:
72+
return "Aborted merge because one ore more tablets in the merge range are unmergeable.";
73+
case NOT_CONTIGUOUS:
74+
return "Aborted merge because the tablets in a range do not form a linked list.";
75+
default:
76+
throw new IllegalArgumentException("Unknown Reason");
77+
}
78+
79+
}
80+
}

server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/VerifyMergeability.java

+6-24
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@
2525
import org.apache.accumulo.core.fate.FateId;
2626
import org.apache.accumulo.core.fate.Repo;
2727
import org.apache.accumulo.manager.Manager;
28+
import org.apache.accumulo.manager.merge.FindMergeableRangeTask.MergeableRange;
2829
import org.apache.accumulo.manager.tableOps.ManagerRepo;
2930
import org.apache.accumulo.manager.tableOps.merge.MergeInfo.Operation;
30-
import org.apache.accumulo.manager.tableOps.merge.UnreserveAndError.Reason;
3131
import org.slf4j.Logger;
3232
import org.slf4j.LoggerFactory;
3333

@@ -56,39 +56,21 @@ public Repo<Manager> call(FateId fateId, Manager env) throws Exception {
5656

5757
// max percentage of split threshold
5858
long maxTotalSize = (long) (splitThreshold * maxMergeabilityThreshold);
59-
60-
long maxFiles = env.getContext().getTableConfiguration(data.getOriginalExtent().tableId())
59+
long maxFileCount = env.getContext().getTableConfiguration(data.getOriginalExtent().tableId())
6160
.getCount(Property.TABLE_MERGE_FILE_MAX);
6261

6362
log.debug("Validating system merge for {} with range {}", fateId, range);
6463

64+
final var mr = new MergeableRange(data.tableId, currentTime, maxFileCount, maxTotalSize);
6565
try (var tablets = env.getContext().getAmple().readTablets().forTable(data.tableId)
6666
.overlapping(range.prevEndRow(), range.endRow()).fetch(FILES, MERGEABILITY)
6767
.checkConsistency().build()) {
6868

69-
long totalSize = 0;
70-
long totalFiles = 0;
71-
int totalUnMergeable = 0;
72-
7369
for (var tabletMetadata : tablets) {
74-
if (!tabletMetadata.getTabletMergeability().isMergeable(currentTime)) {
75-
totalUnMergeable++;
70+
var error = mr.add(tabletMetadata);
71+
if (error.isPresent()) {
72+
return new UnreserveSystemMerge(data, error.orElseThrow(), maxFileCount, maxTotalSize);
7673
}
77-
78-
totalFiles += tabletMetadata.getFiles().size();
79-
totalSize += tabletMetadata.getFileSize();
80-
}
81-
82-
if (totalFiles > maxFiles) {
83-
return new UnreserveAndError(data, Reason.MAX_FILES, totalFiles, maxFiles);
84-
}
85-
86-
if (totalSize > maxTotalSize) {
87-
return new UnreserveAndError(data, Reason.MAX_SIZE, totalSize, maxTotalSize);
88-
}
89-
90-
if (totalUnMergeable > 0) {
91-
return new UnreserveAndError(data, totalUnMergeable);
9274
}
9375
}
9476

0 commit comments

Comments
 (0)