diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java index 8545cd38982..739fc3ee706 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.core.client.admin; +import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.Map; @@ -357,4 +358,14 @@ boolean testClassLoad(final String className, final String asTypeName) * @since 2.1.0 */ InstanceId getInstanceId(); + + /** + * Return the current manager time. This duration represents the amount of time an Accumulo + * manager process has been running. The duration is persisted and should only increase over the + * lifetime of an Accumulo instance. + * + * @return the current manager time + * @since 4.0.0 + */ + Duration getManagerTime() throws AccumuloException, AccumuloSecurityException; } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java index 7c22b7d8a87..fb2c5878f85 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java @@ -470,6 +470,12 @@ public InstanceId getInstanceId() { return context.getInstanceID(); } + @Override + public Duration getManagerTime() throws AccumuloException, AccumuloSecurityException { + return Duration.ofNanos(ThriftClientTypes.MANAGER.execute(context, + client -> client.getManagerTimeNanos(TraceUtil.traceInfo(), context.rpcCreds()))); + } + @Override public ServerId getServer(ServerId.Type type, String resourceGroup, String host, int port) { Objects.requireNonNull(type, "type parameter cannot be null"); diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 15c3e728710..dac27692130 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -401,6 +401,11 @@ public enum Property { + " are performed (e.g. Bulk Import). This property specifies the maximum number of threads in a" + " ThreadPool in the Manager that will be used to request these refresh operations.", "4.0.0"), + MANAGER_TABLET_MERGEABILITY_INTERVAL("manager.tablet.mergeability.interval", "24h", + PropertyType.TIMEDURATION, + "Time to wait between scanning tables to identify ranges of tablets that can be " + "auto-merged. 
Valid ranges will have merge fate ops submitted.", "4.0.0"), MANAGER_BULK_TIMEOUT("manager.bulk.timeout", "5m", PropertyType.TIMEDURATION, "The time to wait for a tablet server to process a bulk import request.", "1.4.3"), MANAGER_RENAME_THREADS("manager.rename.threadpool.size", "20", PropertyType.COUNT, @@ -904,6 +909,8 @@ public enum Property { TABLE_ONDEMAND_UNLOADER("tserver.ondemand.tablet.unloader", "org.apache.accumulo.core.spi.ondemand.DefaultOnDemandTabletUnloader", PropertyType.CLASSNAME, "The class that will be used to determine which on-demand Tablets to unload.", "4.0.0"), + TABLE_MAX_MERGEABILITY_THRESHOLD("table.mergeability.threshold", ".25", PropertyType.FRACTION, + "A tablet is mergeable until it reaches this percentage of the split threshold.", "4.0.0"), // Crypto-related properties @Experimental diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index dd521e3a0c7..28a92d4204b 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -106,6 +106,7 @@ public enum FateOperation { NAMESPACE_RENAME(TFateOperation.NAMESPACE_RENAME), SHUTDOWN_TSERVER(null), SYSTEM_SPLIT(null), + SYSTEM_MERGE(null), TABLE_BULK_IMPORT2(TFateOperation.TABLE_BULK_IMPORT2), TABLE_CANCEL_COMPACT(TFateOperation.TABLE_CANCEL_COMPACT), TABLE_CLONE(TFateOperation.TABLE_CLONE), @@ -124,7 +125,7 @@ public enum FateOperation { private final TFateOperation top; private static final EnumSet<FateOperation> nonThriftOps = - EnumSet.of(COMMIT_COMPACTION, SHUTDOWN_TSERVER, SYSTEM_SPLIT); + EnumSet.of(COMMIT_COMPACTION, SHUTDOWN_TSERVER, SYSTEM_SPLIT, SYSTEM_MERGE); FateOperation(TFateOperation top) { this.top = top; diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java b/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java index 8942149a6ff..017ac7955c9 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java @@ -119,8 +119,12 @@ public static FateKey forCompactionCommit(ExternalCompactionId compactionId) { return new FateKey(FateKeyType.COMPACTION_COMMIT, compactionId); } + public static FateKey forMerge(KeyExtent extent) { + return new FateKey(FateKeyType.MERGE, extent); + } + public enum FateKeyType { - SPLIT, COMPACTION_COMMIT + SPLIT, COMPACTION_COMMIT, MERGE } private static byte[] serialize(FateKeyType type, KeyExtent ke) { @@ -151,6 +155,7 @@ private static Optional<KeyExtent> deserializeKeyExtent(FateKeyType type, DataIn throws IOException { switch (type) { case SPLIT: + case MERGE: return Optional.of(KeyExtent.readFrom(buffer)); case COMPACTION_COMMIT: return Optional.empty(); @@ -163,6 +168,7 @@ private static Optional<ExternalCompactionId> deserializeCompactionId(FateKeyTyp DataInputBuffer buffer) throws IOException { switch (type) { case SPLIT: + case MERGE: return Optional.empty(); case COMPACTION_COMMIT: return Optional.of(ExternalCompactionId.of(buffer.readUTF())); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java index 4efb6025204..0dac7412cf2 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java @@ -258,7 +258,8 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, TFat
manager.fate(type).seedTransaction(op, fateId, new TraceRepo<>(new CreateTable(c.getPrincipal(), tableName, timeType, options, splitsPath, splitCount, splitsDirsPath, initialTableState, - initialTabletAvailability, namespaceId, TabletMergeability.never())), + // Set the default tablet to be auto-mergeable with other tablets if it is split + initialTabletAvailability, namespaceId, TabletMergeability.always())), autoCleanup, goalMessage); break; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index b38fa024e76..1edb63e23b6 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -122,6 +122,7 @@ import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.core.zookeeper.ZcStat; import org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator; +import org.apache.accumulo.manager.merge.FindMergeableRangeTask; import org.apache.accumulo.manager.metrics.BalancerMetrics; import org.apache.accumulo.manager.metrics.ManagerMetrics; import org.apache.accumulo.manager.recovery.RecoveryManager; @@ -1309,6 +1310,13 @@ boolean canSuspendTablets() { ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor() .scheduleWithFixedDelay(() -> ScanServerMetadataEntries.clean(context), 10, 10, MINUTES)); + // TODO - create new threadpool? + var tabletMergeabilityInterval = + getConfiguration().getDuration(Property.MANAGER_TABLET_MERGEABILITY_INTERVAL); + ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay( + new FindMergeableRangeTask(this), tabletMergeabilityInterval.toMillis(), + tabletMergeabilityInterval.toMillis(), MILLISECONDS)); + // Make sure that we have a secret key (either a new one or an old one from ZK) before we start // the manager client service. Thread authenticationTokenKeyManagerThread = null; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/merge/FindMergeableRangeTask.java b/server/manager/src/main/java/org/apache/accumulo/manager/merge/FindMergeableRangeTask.java new file mode 100644 index 00000000000..852f300a44a --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/merge/FindMergeableRangeTask.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.manager.merge; + +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGEABILITY; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; +import static org.apache.accumulo.manager.merge.FindMergeableRangeTask.UnmergeableReason.MAX_FILE_COUNT; +import static org.apache.accumulo.manager.merge.FindMergeableRangeTask.UnmergeableReason.MAX_TOTAL_SIZE; +import static org.apache.accumulo.manager.merge.FindMergeableRangeTask.UnmergeableReason.NOT_CONTIGUOUS; +import static org.apache.accumulo.manager.merge.FindMergeableRangeTask.UnmergeableReason.TABLET_MERGEABILITY; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Predicate; + +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.NamespaceId; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.Fate.FateOperation; +import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.fate.FateKey; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; +import org.apache.accumulo.core.metadata.schema.filters.TabletMetadataFilter; +import org.apache.accumulo.core.util.time.SteadyTime; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.TraceRepo; +import org.apache.accumulo.manager.tableOps.merge.MergeInfo.Operation; +import org.apache.accumulo.manager.tableOps.merge.TableRangeOp; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; + +public class FindMergeableRangeTask implements Runnable { + + private static final Logger log = LoggerFactory.getLogger(FindMergeableRangeTask.class); + + private static final TabletMergeabilityFilter FILTER = new TabletMergeabilityFilter(); + + private final Manager manager; + + public FindMergeableRangeTask(Manager manager) { + this.manager = Objects.requireNonNull(manager); + log.debug("Creating FindMergeableRangeTask"); + } + + @Override + public void run() { + var context = manager.getContext(); + Map tables = context.getTableIdToNameMap(); + + log.debug("Starting FindMergeableRangeTask"); + + for (Entry table : tables.entrySet()) { + TableId tableId = table.getKey(); + String tableName = table.getValue(); + + long threshold = + context.getTableConfiguration(tableId).getAsBytes(Property.TABLE_SPLIT_THRESHOLD); + double mergeabilityThreshold = context.getTableConfiguration(tableId) + .getFraction(Property.TABLE_MAX_MERGEABILITY_THRESHOLD); + if (mergeabilityThreshold <= 0) { + log.trace( + "Skipping FindMergeableRangeTask for table {}, TABLE_MAX_MERGEABILITY_THRESHOLD is set to {}", + tableName, mergeabilityThreshold); + continue; + } + + long maxFileCount = + context.getTableConfiguration(tableId).getCount(Property.TABLE_MERGE_FILE_MAX); + long maxTotalSize = (long) (threshold * mergeabilityThreshold); + + log.debug("Checking {} for tablets that can be merged", tableName); + log.debug("maxFileCount: {}, maxTotalSize:{}, splitThreshold:{}, mergeabilityThreshold:{}", + maxFileCount, 
maxTotalSize, threshold, mergeabilityThreshold); + try { + NamespaceId namespaceId = context.getNamespaceId(tableId); + var type = FateInstanceType.fromTableId(tableId); + + try (var tablets = context.getAmple().readTablets().forTable(tableId) + .fetch(PREV_ROW, FILES, MERGEABILITY).filter(FILTER).build()) { + + final MergeableRange current = + new MergeableRange(tableId, manager.getSteadyTime(), maxFileCount, maxTotalSize); + + for (var tm : tablets) { + log.trace("Checking tablet {}, {}", tm.getExtent(), tm.getTabletMergeability()); + // If there was an error adding the next tablet to the range then + // the existing range is complete as we can't add more tablets so + // submit a merge fate op and reset to find more merge ranges + current.add(tm).ifPresent(e -> { + submit(current, type, table, namespaceId); + current.resetAndAdd(tm); + }); + } + + // Try to submit any outstanding range of mergeable tablets + submit(current, type, table, namespaceId); + } + + } catch (Exception e) { + log.error("Failed to generate system merges for {}", tableName, e); + } + } + + } + + void submit(MergeableRange range, FateInstanceType type, Entry<TableId,String> table, + NamespaceId namespaceId) { + if (range.tabletCount < 2) { + return; + } + + TableId tableId = table.getKey(); + String tableName = table.getValue(); + + log.debug("Table {} found {} tablets that can be merged", tableName, + range.tabletCount); + + final Text startRow = range.startRow != null ? range.startRow : new Text(""); + final Text endRow = range.endRow != null ? range.endRow : new Text(""); + + final String startRowStr = StringUtils.defaultIfBlank(startRow.toString(), "-inf"); + final String endRowStr = StringUtils.defaultIfBlank(endRow.toString(), "+inf"); + log.debug("FindMergeableRangeTask: Creating merge op: {} from startRow: {} to endRow: {}", + tableId, startRowStr, endRowStr); + var fateKey = FateKey.forMerge(new KeyExtent(tableId, range.endRow, range.startRow)); + + manager.fate(type).seedTransaction(FateOperation.SYSTEM_MERGE, fateKey, + new TraceRepo<>( + new TableRangeOp(Operation.SYSTEM_MERGE, namespaceId, tableId, startRow, endRow)), + true); + } + + public enum UnmergeableReason { + NOT_CONTIGUOUS, MAX_FILE_COUNT, MAX_TOTAL_SIZE, TABLET_MERGEABILITY; + + // Cache the Optional reason objects as we will re-use these over and over + private static final Optional<UnmergeableReason> NOT_CONTIGUOUS_OPT = + Optional.of(NOT_CONTIGUOUS); + private static final Optional<UnmergeableReason> MAX_FILE_COUNT_OPT = + Optional.of(MAX_FILE_COUNT); + private static final Optional<UnmergeableReason> MAX_TOTAL_SIZE_OPT = + Optional.of(MAX_TOTAL_SIZE); + private static final Optional<UnmergeableReason> TABLET_MERGEABILITY_OPT = + Optional.of(TABLET_MERGEABILITY); + + public Optional<UnmergeableReason> optional() { + switch (this) { + case NOT_CONTIGUOUS: + return NOT_CONTIGUOUS_OPT; + case MAX_FILE_COUNT: + return MAX_FILE_COUNT_OPT; + case MAX_TOTAL_SIZE: + return MAX_TOTAL_SIZE_OPT; + case TABLET_MERGEABILITY: + return TABLET_MERGEABILITY_OPT; + default: + throw new IllegalArgumentException("Unexpected enum type"); + } + } + } + + public static class MergeableRange { + final SteadyTime currentTime; + final TableId tableId; + final long maxFileCount; + final long maxTotalSize; + + Text startRow; + Text endRow; + int tabletCount; + long totalFileCount; + long totalFileSize; + + public MergeableRange(TableId tableId, SteadyTime currentTime, long maxFileCount, + long maxTotalSize) { + this.tableId = tableId; + this.currentTime = currentTime; + this.maxFileCount = maxFileCount; + this.maxTotalSize = maxTotalSize; + } + + public Optional<UnmergeableReason>
add(TabletMetadata tm) { + var failure = validate(tm); + if (failure.isEmpty()) { + tabletCount++; + log.trace("Adding tablet {} to MergeableRange", tm.getExtent()); + if (tabletCount == 1) { + startRow = tm.getPrevEndRow(); + } + endRow = tm.getEndRow(); + totalFileCount += tm.getFiles().size(); + totalFileSize += tm.getFileSize(); + } + return failure; + } + + private Optional<UnmergeableReason> validate(TabletMetadata tm) { + Preconditions.checkArgument(tableId.equals(tm.getTableId()), "Unexpected tableId seen %s", + tm.getTableId()); + + if (tabletCount > 0) { + // This is at least the second tablet seen so there should not be a null prevEndRow + Preconditions.checkState(tm.getPrevEndRow() != null, + "Unexpected null prevEndRow found for %s", tm.getExtent()); + // If this is not the first tablet, then verify its prevEndRow matches + // the last endRow tracked; the server filter will have already skipped tablets marked as never + if (!tm.getPrevEndRow().equals(endRow)) { + return NOT_CONTIGUOUS.optional(); + } + } + + if (!tm.getTabletMergeability().isMergeable(currentTime)) { + return TABLET_MERGEABILITY.optional(); + } + + if (totalFileCount + tm.getFiles().size() > maxFileCount) { + return MAX_FILE_COUNT.optional(); + } + + if (totalFileSize + tm.getFileSize() > maxTotalSize) { + return MAX_TOTAL_SIZE.optional(); + } + + return Optional.empty(); + } + + void resetAndAdd(TabletMetadata tm) { + reset(); + add(tm); + } + + void reset() { + startRow = null; + endRow = null; + tabletCount = 0; + totalFileCount = 0; + totalFileSize = 0; + } + } + + // Filter out tablets marked as never-merge to cut down on what we need to check. + // We need steady time to check the other tablets, which is not available in the filter + public static class TabletMergeabilityFilter extends TabletMetadataFilter { + + public static final Set<ColumnType> COLUMNS = Sets.immutableEnumSet(MERGEABILITY); + + private final static Predicate<TabletMetadata> IS_NOT_NEVER = + tabletMetadata -> !tabletMetadata.getTabletMergeability().isNever(); + + @Override + public Set<ColumnType> getColumns() { + return COLUMNS; + } + + @Override + protected Predicate<TabletMetadata> acceptTablet() { + return IS_NOT_NEVER; + } + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/CountFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/CountFiles.java index 4083eecc864..f108419a513 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/CountFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/CountFiles.java @@ -19,6 +19,7 @@ package org.apache.accumulo.manager.tableOps.merge; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; +import static org.apache.accumulo.manager.tableOps.merge.MergeInfo.Operation.SYSTEM_MERGE; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.fate.FateId; @@ -28,6 +29,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + public class CountFiles extends ManagerRepo { private static final Logger log = LoggerFactory.getLogger(CountFiles.class); private static final long serialVersionUID = 1L; @@ -39,6 +42,9 @@ public CountFiles(MergeInfo mergeInfo) { @Override public Repo<Manager> call(FateId fateId, Manager env) throws Exception { + // SYSTEM_MERGE should be executing VerifyMergeability repo, which already + // will count files + Preconditions.checkState(data.op != SYSTEM_MERGE, "Unexpected op %s", SYSTEM_MERGE); var range = data.getReserveExtent(); @@ -75,10 +81,13 @@
public Repo call(FateId fateId, Manager env) throws Exception { if (totalFiles >= maxFiles) { return new UnreserveAndError(data, totalFiles, maxFiles); } else { - if (data.op == MergeInfo.Operation.MERGE) { - return new MergeTablets(data); - } else { - return new DeleteRows(data); + switch (data.op) { + case MERGE: + return new MergeTablets(data); + case DELETE: + return new DeleteRows(data); + default: + throw new IllegalStateException("Unknown op " + data.op); } } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeInfo.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeInfo.java index 0da5159b657..6b84e16e7f0 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeInfo.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeInfo.java @@ -19,6 +19,7 @@ package org.apache.accumulo.manager.tableOps.merge; import java.io.Serializable; +import java.util.Objects; import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; import org.apache.accumulo.core.clientImpl.thrift.TableOperation; @@ -37,7 +38,11 @@ public class MergeInfo implements Serializable { private static final long serialVersionUID = 1L; public enum Operation { - MERGE, DELETE, + MERGE, SYSTEM_MERGE, DELETE; + + public boolean isMergeOp() { + return this == MERGE || this == SYSTEM_MERGE; + } } final TableId tableId; @@ -60,7 +65,7 @@ private MergeInfo(TableId tableId, NamespaceId namespaceId, byte[] startRow, byt this.namespaceId = namespaceId; this.startRow = startRow; this.endRow = endRow; - this.op = op; + this.op = Objects.requireNonNull(op); this.mergeRangeSet = mergeRange != null; if (mergeRange != null) { mergeStartRow = @@ -102,6 +107,7 @@ public KeyExtent getMergeExtent() { public KeyExtent getReserveExtent() { switch (op) { case MERGE: + case SYSTEM_MERGE: return getOriginalExtent(); case DELETE: { if (endRow == null) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java index ffaa6adcd25..9a8a4c3dc09 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java @@ -128,6 +128,14 @@ public long isReady(FateId fateId, Manager env) throws Exception { @Override public Repo call(FateId fateId, Manager environment) throws Exception { - return new CountFiles(data); + switch (data.op) { + case SYSTEM_MERGE: + return new VerifyMergeability(data); + case MERGE: + case DELETE: + return new CountFiles(data); + default: + throw new IllegalStateException("Unknown op " + data.op); + } } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOp.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOp.java index ddbbce5b7e4..ea63940db6a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOp.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOp.java @@ -57,8 +57,7 @@ public TableRangeOp(MergeInfo.Operation op, NamespaceId namespaceId, TableId tab @Override public Repo call(FateId fateId, Manager env) throws Exception { - if (AccumuloTable.ROOT.tableId().equals(data.tableId) - && MergeInfo.Operation.MERGE.equals(data.op)) { + if 
(AccumuloTable.ROOT.tableId().equals(data.tableId) && data.op.isMergeOp()) { log.warn("Attempt to merge tablets for {} does nothing. It is not splittable.", AccumuloTable.ROOT.tableName()); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/UnreserveSystemMerge.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/UnreserveSystemMerge.java new file mode 100644 index 00000000000..a7201a06ab9 --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/UnreserveSystemMerge.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.tableOps.merge; + +import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; +import org.apache.accumulo.core.clientImpl.thrift.TableOperation; +import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.merge.FindMergeableRangeTask.UnmergeableReason; +import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class UnreserveSystemMerge extends ManagerRepo { + + private static final long serialVersionUID = 1L; + private static final Logger log = LoggerFactory.getLogger(UnreserveSystemMerge.class); + private final MergeInfo mergeInfo; + private final long maxFileCount; + private final long maxTotalSize; + private final UnmergeableReason reason; + + public UnreserveSystemMerge(MergeInfo mergeInfo, UnmergeableReason reason, long maxFileCount, + long maxTotalSize) { + this.mergeInfo = mergeInfo; + this.reason = reason; + this.maxFileCount = maxFileCount; + this.maxTotalSize = maxTotalSize; + } + + @Override + public Repo call(FateId fateId, Manager environment) throws Exception { + FinishTableRangeOp.removeOperationIds(log, mergeInfo, fateId, environment); + throw new AcceptableThriftTableOperationException(mergeInfo.tableId.toString(), null, + mergeInfo.op.isMergeOp() ? 
TableOperation.MERGE : TableOperation.DELETE_RANGE, + TableOperationExceptionType.OTHER, formatReason()); + } + + public UnmergeableReason getReason() { + return reason; + } + + private String formatReason() { + switch (reason) { + case MAX_FILE_COUNT: + return "Aborted merge because it would produce a tablet with more files than the configured limit of " + + maxFileCount; + case MAX_TOTAL_SIZE: + return "Aborted merge because it would produce a tablet with a file size larger than the configured limit of " + + maxTotalSize; + // This state should not happen as the VerifyMergeability repo checks consistency, but adding it + // just in case + case TABLET_MERGEABILITY: + return "Aborted merge because one or more tablets in the merge range are unmergeable."; + case NOT_CONTIGUOUS: + return "Aborted merge because the tablets in a range do not form a linked list."; + default: + throw new IllegalArgumentException("Unknown Reason"); + } + + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/VerifyMergeability.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/VerifyMergeability.java new file mode 100644 index 00000000000..43b6c1459eb --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/VerifyMergeability.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.accumulo.manager.tableOps.merge; + +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGEABILITY; + +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.merge.FindMergeableRangeTask.MergeableRange; +import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.apache.accumulo.manager.tableOps.merge.MergeInfo.Operation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public class VerifyMergeability extends ManagerRepo { + private static final Logger log = LoggerFactory.getLogger(VerifyMergeability.class); + private static final long serialVersionUID = 1L; + private final MergeInfo data; + + public VerifyMergeability(MergeInfo mergeInfo) { + this.data = mergeInfo; + Preconditions.checkArgument(data.op == Operation.SYSTEM_MERGE, "Must be a System Merge"); + } + + @Override + public Repo call(FateId fateId, Manager env) throws Exception { + + var range = data.getReserveExtent(); + + var currentTime = env.getSteadyTime(); + var context = env.getContext(); + var tableConf = context.getTableConfiguration(data.tableId); + var splitThreshold = tableConf.getAsBytes(Property.TABLE_SPLIT_THRESHOLD); + var maxMergeabilityThreshold = tableConf.getFraction(Property.TABLE_MAX_MERGEABILITY_THRESHOLD); + + // max percentage of split threshold + long maxTotalSize = (long) (splitThreshold * maxMergeabilityThreshold); + long maxFileCount = env.getContext().getTableConfiguration(data.getOriginalExtent().tableId()) + .getCount(Property.TABLE_MERGE_FILE_MAX); + + log.debug("Validating system merge for {} with range {}", fateId, range); + + final var mr = new MergeableRange(data.tableId, currentTime, maxFileCount, maxTotalSize); + try (var tablets = env.getContext().getAmple().readTablets().forTable(data.tableId) + .overlapping(range.prevEndRow(), range.endRow()).fetch(FILES, MERGEABILITY) + .checkConsistency().build()) { + + for (var tabletMetadata : tablets) { + var error = mr.add(tabletMetadata); + if (error.isPresent()) { + return new UnreserveSystemMerge(data, error.orElseThrow(), maxFileCount, maxTotalSize); + } + } + } + + return new MergeTablets(data); + } + +} diff --git a/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java b/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java index 2f5ec912fb7..8b4283f31db 100644 --- a/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java +++ b/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java @@ -20,6 +20,9 @@ import static org.apache.accumulo.test.ample.metadata.TestAmple.testAmpleServerContext; +import java.time.Duration; + +import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.test.ample.metadata.TestAmple.TestServerAmpleImpl; @@ -35,4 +38,14 @@ public static Manager mockWithAmple(ServerContext context, TestServerAmpleImpl a return manager; } + public static Manager mockWithAmple(ServerContext context, TestServerAmpleImpl ample, + Duration currentTime) { + Manager manager = EasyMock.mock(Manager.class); + EasyMock.expect(manager.getContext()).andReturn(testAmpleServerContext(context, ample)) + 
.atLeastOnce(); + EasyMock.expect(manager.getSteadyTime()).andReturn(SteadyTime.from(currentTime)).anyTimes(); + EasyMock.replay(manager); + return manager; + } + } diff --git a/test/src/main/java/org/apache/accumulo/test/ample/metadata/TestAmple.java b/test/src/main/java/org/apache/accumulo/test/ample/metadata/TestAmple.java index 4d8f8c44fd4..4fb1a27d52a 100644 --- a/test/src/main/java/org/apache/accumulo/test/ample/metadata/TestAmple.java +++ b/test/src/main/java/org/apache/accumulo/test/ample/metadata/TestAmple.java @@ -187,12 +187,13 @@ public void createMetadataFromExisting(AccumuloClient client, TableId tableId, WholeRowIterator.decodeRow(entry.getKey(), entry.getValue()); Text row = decodedRow.firstKey().getRow(); Mutation m = new Mutation(row); - decodedRow.entrySet().stream().filter(e -> includeColumn.test(e.getKey(), e.getValue())) .forEach(e -> m.put(e.getKey().getColumnFamily(), e.getKey().getColumnQualifier(), e.getKey().getColumnVisibilityParsed(), e.getKey().getTimestamp(), e.getValue())); - bw.addMutation(m); + if (!m.getUpdates().isEmpty()) { + bw.addMutation(m); + } } } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java index 27de329921e..644a2909a31 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java @@ -31,6 +31,7 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -52,6 +53,7 @@ import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateKey; +import org.apache.accumulo.core.fate.FateKey.FateKeyType; import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.FateStore.FateTxStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus; @@ -619,7 +621,9 @@ protected void testConcurrent(FateStore store, ServerContext sctx) thro assertEquals(1, idsSeen); assertEquals(1, store.list(FateKey.FateKeyType.SPLIT).count()); - assertEquals(0, store.list(FateKey.FateKeyType.COMPACTION_COMMIT).count()); + // All other types should be a count of 0 + Arrays.stream(FateKeyType.values()).filter(t -> !t.equals(FateKey.FateKeyType.SPLIT)) + .forEach(t -> assertEquals(0, store.list(t).count())); for (var future : futures) { if (future.get().isPresent()) { @@ -632,8 +636,9 @@ protected void testConcurrent(FateStore store, ServerContext sctx) thro } } - assertEquals(0, store.list(FateKey.FateKeyType.SPLIT).count()); - assertEquals(0, store.list(FateKey.FateKeyType.COMPACTION_COMMIT).count()); + // All types should be a count of 0 + assertTrue( + Arrays.stream(FateKeyType.values()).allMatch(t -> store.list(t).findAny().isEmpty())); } finally { executor.shutdown(); @@ -676,6 +681,7 @@ protected void testListFateKeys(FateStore store, ServerContext sctx) th TableId tid1 = TableId.of("test"); var extent1 = new KeyExtent(tid1, new Text("m"), null); var extent2 = new KeyExtent(tid1, null, new Text("m")); + var extent3 = new KeyExtent(tid1, new Text("z"), new Text("m")); var fateKey1 = FateKey.forSplit(extent1); var fateKey2 = FateKey.forSplit(extent2); @@ -687,8 +693,12 @@ protected void testListFateKeys(FateStore store, ServerContext sctx) th var fateKey3 = FateKey.forCompactionCommit(cid1); var fateKey4 = FateKey.forCompactionCommit(cid2); + // use one overlapping extent 
and one different + var fateKey5 = FateKey.forMerge(extent1); + var fateKey6 = FateKey.forMerge(extent3); + Map fateKeyIds = new HashMap<>(); - for (FateKey fateKey : List.of(fateKey1, fateKey2, fateKey3, fateKey4)) { + for (FateKey fateKey : List.of(fateKey1, fateKey2, fateKey3, fateKey4, fateKey5, fateKey6)) { var fateId = seedTransaction(store, TEST_FATE_OP, fateKey, new TestRepo(), true).orElseThrow(); fateKeyIds.put(fateKey, fateId); @@ -698,10 +708,10 @@ protected void testListFateKeys(FateStore store, ServerContext sctx) th allIds.addAll(fateKeyIds.values()); allIds.add(id1); assertEquals(allIds, store.list().map(FateIdStatus::getFateId).collect(Collectors.toSet())); - assertEquals(5, allIds.size()); + assertEquals(7, allIds.size()); - assertEquals(4, fateKeyIds.size()); - assertEquals(4, fateKeyIds.values().stream().distinct().count()); + assertEquals(6, fateKeyIds.size()); + assertEquals(6, fateKeyIds.values().stream().distinct().count()); HashSet seenExtents = new HashSet<>(); store.list(FateKey.FateKeyType.SPLIT).forEach(fateKey -> { @@ -709,9 +719,18 @@ protected void testListFateKeys(FateStore store, ServerContext sctx) th assertNotNull(fateKeyIds.remove(fateKey)); assertTrue(seenExtents.add(fateKey.getKeyExtent().orElseThrow())); }); + assertEquals(4, fateKeyIds.size()); + assertEquals(Set.of(extent1, extent2), seenExtents); + // clear set as one overlaps + seenExtents.clear(); + store.list(FateKeyType.MERGE).forEach(fateKey -> { + assertEquals(FateKey.FateKeyType.MERGE, fateKey.getType()); + assertNotNull(fateKeyIds.remove(fateKey)); + assertTrue(seenExtents.add(fateKey.getKeyExtent().orElseThrow())); + }); assertEquals(2, fateKeyIds.size()); - assertEquals(Set.of(extent1, extent2), seenExtents); + assertEquals(Set.of(extent1, extent3), seenExtents); HashSet seenCids = new HashSet<>(); store.list(FateKey.FateKeyType.COMPACTION_COMMIT).forEach(fateKey -> { @@ -722,6 +741,7 @@ protected void testListFateKeys(FateStore store, ServerContext sctx) th assertEquals(0, fateKeyIds.size()); assertEquals(Set.of(cid1, cid2), seenCids); + // Cleanup so we don't interfere with other tests store.list() .forEach(fateIdStatus -> store.tryReserve(fateIdStatus.getFateId()).orElseThrow().delete()); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java index 617eba0ba16..59548cc7384 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java @@ -25,18 +25,22 @@ import static org.apache.accumulo.test.ample.metadata.TestAmple.not; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.assertNoCompactionMetadata; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.nio.file.Path; +import java.time.Duration; import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -45,6 +49,7 @@ import org.apache.accumulo.core.client.Accumulo; import 
org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.TabletMergeability; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.TabletMergeabilityUtil; import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException; @@ -59,6 +64,7 @@ import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.Ample.TabletsMutator; +import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SplitColumnFamily; import org.apache.accumulo.core.metadata.schema.SelectedFiles; import org.apache.accumulo.core.metadata.schema.TabletMetadata; @@ -70,6 +76,7 @@ import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.merge.FindMergeableRangeTask.UnmergeableReason; import org.apache.accumulo.manager.tableOps.ManagerRepo; import org.apache.accumulo.manager.tableOps.compact.CompactionDriver; import org.apache.accumulo.manager.tableOps.merge.DeleteRows; @@ -77,10 +84,13 @@ import org.apache.accumulo.manager.tableOps.merge.MergeInfo.Operation; import org.apache.accumulo.manager.tableOps.merge.MergeTablets; import org.apache.accumulo.manager.tableOps.merge.ReserveTablets; +import org.apache.accumulo.manager.tableOps.merge.UnreserveSystemMerge; +import org.apache.accumulo.manager.tableOps.merge.VerifyMergeability; import org.apache.accumulo.manager.tableOps.split.AllocateDirsAndEnsureOnline; import org.apache.accumulo.manager.tableOps.split.FindSplits; import org.apache.accumulo.manager.tableOps.split.PreSplit; import org.apache.accumulo.manager.tableOps.split.SplitInfo; +import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.test.LargeSplitRowIT; import org.apache.accumulo.test.ample.metadata.TestAmple; import org.apache.accumulo.test.ample.metadata.TestAmple.TestServerAmpleImpl; @@ -107,7 +117,7 @@ public static void teardown() { } @ParameterizedTest - @EnumSource(MergeInfo.Operation.class) + @EnumSource(value = MergeInfo.Operation.class, names = {"MERGE", "DELETE"}) public void testNoWalsMergeRepos(MergeInfo.Operation operation) throws Exception { String[] tableNames = getUniqueNames(2); String metadataTable = tableNames[0] + operation; @@ -163,6 +173,108 @@ public void testNoWalsMergeRepos(MergeInfo.Operation operation) throws Exception } } + @Test + public void testVerifyMergeability() throws Exception { + String[] tableNames = getUniqueNames(2); + String metadataTable = tableNames[0]; + String userTable = tableNames[1]; + + try (ClientContext client = + (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { + + SortedMap splits = new TreeMap<>(); + splits.put(new Text("a"), TabletMergeability.always()); + splits.put(new Text("b"), TabletMergeability.always()); + splits.put(new Text("c"), TabletMergeability.never()); + splits.put(new Text("d"), TabletMergeability.after(Duration.ofDays(2))); + splits.put(new Text("e"), TabletMergeability.always()); + + client.tableOperations().create(userTable, + new NewTableConfiguration().setProperties(Map.of(Property.TABLE_SPLIT_THRESHOLD.getKey(), + "10K", Property.TABLE_MAJC_RATIO.getKey(), "9999", + Property.TABLE_MERGE_FILE_MAX.getKey(), 
"10")).withSplits(splits)); + TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(userTable)); + + // Set up Test ample and manager + TestAmple.createMetadataTable(client, metadataTable); + TestServerAmpleImpl testAmple = (TestServerAmpleImpl) TestAmple + .create(getCluster().getServerContext(), Map.of(DataLevel.USER, metadataTable)); + testAmple.createMetadataFromExisting(client, tableId); + Manager manager = + mockWithAmple(getCluster().getServerContext(), testAmple, Duration.ofDays(1)); + + // Create a test fate id + var fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID()); + + // Tablet c is set to never merge + MergeInfo mergeInfo = new MergeInfo(tableId, manager.getContext().getNamespaceId(tableId), + null, new Text("c").getBytes(), Operation.SYSTEM_MERGE); + var repo = new VerifyMergeability(mergeInfo).call(fateId, manager); + assertInstanceOf(UnreserveSystemMerge.class, repo); + assertEquals(UnmergeableReason.TABLET_MERGEABILITY, + ((UnreserveSystemMerge) repo).getReason()); + + // Tablets a and b are always merge + mergeInfo = new MergeInfo(tableId, manager.getContext().getNamespaceId(tableId), null, + new Text("b").getBytes(), Operation.SYSTEM_MERGE); + assertInstanceOf(MergeTablets.class, new VerifyMergeability(mergeInfo).call(fateId, manager)); + + var context = manager.getContext(); + + // split threshold is 10k so default max merge size is 2500 bytes. + // this adds 6 files of 450 each which puts the tablets over teh 2500 threshold + addFileMetadata(context, tableId, null, new Text("c"), 3, 450); + + // Data written to the first two tablets totals 2700 bytes and is too large + repo = new VerifyMergeability(mergeInfo).call(fateId, manager); + assertInstanceOf(UnreserveSystemMerge.class, repo); + assertEquals(UnmergeableReason.MAX_TOTAL_SIZE, ((UnreserveSystemMerge) repo).getReason()); + + // Not enough time has passed for Tablet, should be able to merge d and e + mergeInfo = new MergeInfo(tableId, manager.getContext().getNamespaceId(tableId), + new Text("c").getBytes(), new Text("e").getBytes(), Operation.SYSTEM_MERGE); + repo = new VerifyMergeability(mergeInfo).call(fateId, manager); + assertInstanceOf(UnreserveSystemMerge.class, repo); + assertEquals(UnmergeableReason.TABLET_MERGEABILITY, + ((UnreserveSystemMerge) repo).getReason()); + + // update time to 3 days so enough time has passed + manager = mockWithAmple(getCluster().getServerContext(), testAmple, Duration.ofDays(3)); + assertInstanceOf(MergeTablets.class, new VerifyMergeability(mergeInfo).call(fateId, manager)); + + // last 3 tablets should total 9 files which is < max of 10 + mergeInfo = new MergeInfo(tableId, manager.getContext().getNamespaceId(tableId), + new Text("c").getBytes(), null, Operation.SYSTEM_MERGE); + addFileMetadata(context, tableId, new Text("c"), null, 3, 10); + assertInstanceOf(MergeTablets.class, new VerifyMergeability(mergeInfo).call(fateId, manager)); + + // last 3 tablets should total 12 files which is > max of 10 + addFileMetadata(context, tableId, new Text("c"), null, 4, 10); + repo = new VerifyMergeability(mergeInfo).call(fateId, manager); + assertInstanceOf(UnreserveSystemMerge.class, repo); + assertEquals(UnmergeableReason.MAX_FILE_COUNT, ((UnreserveSystemMerge) repo).getReason()); + } + } + + private void addFileMetadata(ServerContext context, TableId tableId, Text start, Text end, + int numFiles, int fileSize) { + try ( + var tablets = + context.getAmple().readTablets().forTable(tableId).overlapping(start, end).build(); + var tabletsMutator = 
context.getAmple().mutateTablets()) { + for (var tabletMeta : tablets) { + var tabletMutator = tabletsMutator.mutateTablet(tabletMeta.getExtent()); + for (int i = 0; i < numFiles; i++) { + StoredTabletFile f = StoredTabletFile.of(new org.apache.hadoop.fs.Path( + "file:///accumulo/tables/1/" + tabletMeta.getDirName() + "/F" + i + ".rf")); + DataFileValue dfv = new DataFileValue(fileSize, 100); + tabletMutator.putFile(f, dfv); + } + tabletMutator.mutate(); + } + } + } + @Test public void testSplitOffline() throws Exception { String[] tableNames = getUniqueNames(2); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java index e97c239e952..5a72053d93c 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java @@ -53,7 +53,6 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.metadata.schema.TabletMergeabilityMetadata; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.harness.SharedMiniClusterBase; @@ -127,12 +126,7 @@ public void addSplitTest() throws Exception { verifyData(c, tableName, 2L); TableId id = TableId.of(c.tableOperations().tableIdMap().get(tableName)); - try (TabletsMetadata tm = - getCluster().getServerContext().getAmple().readTablets().forTable(id).build()) { - // Default for user created tablets should be mergeability set to NEVER - tm.stream().forEach(tablet -> assertEquals(TabletMergeabilityMetadata.never(), - tablet.getTabletMergeability())); - } + verifySplits(id, TabletMergeabilityUtil.userDefaultSplits(splits)); } } @@ -360,9 +354,9 @@ private void verifySplits(TableId id, SortedMap splits) getCluster().getServerContext().getAmple().readTablets().forTable(id).build()) { tm.stream().forEach(t -> { var split = t.getEndRow(); - // default tablet should be set to never + // default tablet should be set to always if (split == null) { - assertEquals(TabletMergeability.never(), + assertEquals(TabletMergeability.always(), t.getTabletMergeability().getTabletMergeability()); } else { assertTrue(addedSplits.remove(split)); @@ -381,9 +375,9 @@ private void verifySplitsWithApi(AccumuloClient c, String tableName, c.tableOperations().getTabletInformation(tableName, new Range()).forEach(ti -> { var tmInfo = ti.getTabletMergeabilityInfo(); var split = ti.getTabletId().getEndRow(); - // default tablet should always be set to never + // default tablet should always be set to always if (split == null) { - assertEquals(TabletMergeability.never(), + assertEquals(TabletMergeability.always(), ti.getTabletMergeabilityInfo().getTabletMergeability()); } else { assertTrue(addedSplits.remove(split)); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CreateInitialSplitsIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CreateInitialSplitsIT.java index bc3176258ab..40cf9b0f512 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CreateInitialSplitsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CreateInitialSplitsIT.java @@ -139,10 +139,10 @@ public void testCreateInitialSplitsWithMergeability() throws TableExistsExceptio var tableId = getCluster().getServerContext().getTableId(tableName); try (var tablets = 
getCluster().getServerContext().getAmple().readTablets().forTable(tableId).build()) { - // default tablet (null end row) should have a default TabletMergeability of never for user + // default tablet (null end row) should have a default TabletMergeability of always for user // created tablets assertTrue(tablets.stream() - .anyMatch(tm -> tm.getEndRow() == null && tm.getTabletMergeability().isNever())); + .anyMatch(tm -> tm.getEndRow() == null && tm.getTabletMergeability().isAlways())); // other splits should be created with a duration of 10 seconds assertEquals(10, tablets.stream().filter(tm -> tm.getTabletMergeability().getDelay() .map(delay -> delay.equals(splitDuration)).orElse(false)).count()); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java index a985d3969a6..661afa74b22 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java @@ -262,11 +262,9 @@ public void tabletShouldSplit() throws Exception { } if (TabletColumnFamily.MERGEABILITY_COLUMN.getColumnQualifier() .equals(entry.getKey().getColumnQualifier())) { - // Default tablet should be set to NEVER, all newly generated system splits should be + // Default tablet should be set to ALWAYS, all newly generated system splits should be // set to ALWAYS - var mergeability = - extent.endRow() == null ? TabletMergeability.never() : TabletMergeability.always(); - assertEquals(mergeability, + assertEquals(TabletMergeability.always(), TabletMergeabilityMetadata.fromValue(entry.getValue()).getTabletMergeability()); } count++; diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletMergeabilityIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletMergeabilityIT.java new file mode 100644 index 00000000000..fa73da21072 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletMergeabilityIT.java @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.test.functional; + +import static org.apache.accumulo.test.TestIngest.generateRow; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.countTablets; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.TabletAvailability; +import org.apache.accumulo.core.client.admin.TabletMergeability; +import org.apache.accumulo.core.clientImpl.TabletMergeabilityUtil; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.harness.MiniClusterConfigurationCallback; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.test.TestIngest; +import org.apache.accumulo.test.VerifyIngest; +import org.apache.accumulo.test.VerifyIngest.VerifyParams; +import org.apache.accumulo.test.util.Wait; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class TabletMergeabilityIT extends SharedMiniClusterBase { + + @Override + protected Duration defaultTimeout() { + return Duration.ofMinutes(5); + } + + @BeforeAll + public static void setup() throws Exception { + SharedMiniClusterBase.startMiniClusterWithConfig(new Callback()); + } + + @AfterAll + public static void teardown() { + SharedMiniClusterBase.stopMiniCluster(); + } + + private static class Callback implements MiniClusterConfigurationCallback { + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + // Configure a short period of time to run the auto merge thread for testing + cfg.setProperty(Property.MANAGER_TABLET_MERGEABILITY_INTERVAL, "3s"); + } + } + + @Test + public void testMergeabilityAll() throws Exception { + String tableName = getUniqueNames(1)[0]; + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + c.tableOperations().create(tableName); + var tableId = TableId.of(c.tableOperations().tableIdMap().get(tableName)); + + TreeSet<Text> splits = new TreeSet<>(); + splits.add(new Text(String.format("%09d", 333))); + splits.add(new Text(String.format("%09d", 666))); + splits.add(new Text(String.format("%09d", 999))); + + // create splits with mergeability 
disabled so the task does not merge them away + // The default tablet is always mergeable, but it is currently the only one that is mergeable, + // so nothing will merge + c.tableOperations().putSplits(tableName, TabletMergeabilityUtil.userDefaultSplits(splits)); + Wait.waitFor(() -> countTablets(getCluster().getServerContext(), tableName, tm -> true) == 4, + 5000, 200); + + // update to always mergeable so the task can now merge tablets + c.tableOperations().putSplits(tableName, TabletMergeabilityUtil.systemDefaultSplits(splits)); + + // Wait for merge, we should have one tablet + Wait.waitFor(() -> hasExactTablets(getCluster().getServerContext(), tableId, + Set.of(new KeyExtent(tableId, null, null))), 10000, 200); + + } + } + + @Test + public void testMergeabilityMultipleRanges() throws Exception { + String tableName = getUniqueNames(1)[0]; + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + c.tableOperations().create(tableName); + var tableId = TableId.of(c.tableOperations().tableIdMap().get(tableName)); + + SortedMap splits = new TreeMap<>(); + splits.put(new Text(String.format("%09d", 333)), TabletMergeability.never()); + splits.put(new Text(String.format("%09d", 555)), TabletMergeability.never()); + splits.put(new Text(String.format("%09d", 666)), TabletMergeability.never()); + splits.put(new Text(String.format("%09d", 999)), TabletMergeability.never()); + + c.tableOperations().putSplits(tableName, splits); + Wait.waitFor(() -> countTablets(getCluster().getServerContext(), tableName, tm -> true) == 5, + 5000, 500); + + splits.put(new Text(String.format("%09d", 333)), TabletMergeability.always()); + splits.put(new Text(String.format("%09d", 555)), TabletMergeability.always()); + // Keep tablet 666 as never, this should cause two fate jobs for merging + splits.put(new Text(String.format("%09d", 999)), TabletMergeability.always()); + c.tableOperations().putSplits(tableName, splits); + + // Wait for merge, we should have 3 tablets + // 333 and 555 should be merged into 555 + // 666 + // 999 and default merged into default + Wait.waitFor(() -> hasExactTablets(getCluster().getServerContext(), tableId, + Set.of(new KeyExtent(tableId, new Text(String.format("%09d", 555)), null), + new KeyExtent(tableId, new Text(String.format("%09d", 666)), + new Text(String.format("%09d", 555))), + new KeyExtent(tableId, null, new Text(String.format("%09d", 666))))), + 10000, 200); + + } + } + + @Test + public void testMergeabilityThresholdMultipleRanges() throws Exception { + String tableName = getUniqueNames(1)[0]; + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + Map props = new HashMap<>(); + props.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "32K"); + props.put(Property.TABLE_MAX_MERGEABILITY_THRESHOLD.getKey(), ".5"); + c.tableOperations().create(tableName, new NewTableConfiguration() + .withInitialTabletAvailability(TabletAvailability.HOSTED).setProperties(props)); + var tableId = TableId.of(c.tableOperations().tableIdMap().get(tableName)); + + SortedMap splits = new TreeMap<>(); + // Create new tablets that won't merge automatically + for (int i = 10000; i <= 90000; i += 10000) { + splits.put(row(i), TabletMergeability.never()); + } + + c.tableOperations().putSplits(tableName, splits); + // Verify we now have 10 tablets + // [row_0000010000, row_0000020000, row_0000030000, row_0000040000, row_0000050000, + // row_0000060000, row_0000070000, row_0000080000, row_0000090000, default] + Wait.waitFor(() -> 
+      // Verify we now have 10 tablets
+      // [row_0000010000, row_0000020000, row_0000030000, row_0000040000, row_0000050000,
+      // row_0000060000, row_0000070000, row_0000080000, row_0000090000, default]
+      Wait.waitFor(
+          () -> countTablets(getCluster().getServerContext(), tableName, tm -> true) == 10,
+          5000, 500);
+
+      // Insert a different number of rows into each tablet:
+      // tablets with end rows row_0000020000 - row_0000040000, row_0000060000 - row_0000080000,
+      // and the default tablet will have 1000 rows;
+      // tablets with end rows row_0000010000, row_0000050000, row_0000090000 will have 5000 rows
+      try (BatchWriter bw = c.createBatchWriter(tableName)) {
+        final var value = StringUtils.repeat("a", 1024);
+        for (int i = 0; i < 100000; i += 10000) {
+          var rows = 1000;
+          if (i % 40000 == 0) {
+            rows = 5000;
+          }
+          for (int j = 0; j < rows; j++) {
+            Mutation m = new Mutation(row(i + j));
+            m.put(new Text("cf1"), new Text("cq1"), new Value(value));
+            bw.addMutation(m);
+          }
+        }
+      }
+      c.tableOperations().flush(tableName, null, null, true);
+
+      // Set all 10 tablets to be auto-mergeable
+      for (int i = 10000; i <= 90000; i += 10000) {
+        splits.put(row(i), TabletMergeability.always());
+      }
+      c.tableOperations().putSplits(tableName, splits);
+
+      // With the mergeability threshold set to 50% of 32K we should be able to merge together
+      // the tablets with 1000 rows, but not those with 5000 rows. This should produce the
+      // following 6 tablets after the merge.
+      Wait.waitFor(() -> hasExactTablets(getCluster().getServerContext(), tableId,
+          Set.of(new KeyExtent(tableId, row(10000), null),
+              new KeyExtent(tableId, row(40000), row(10000)),
+              new KeyExtent(tableId, row(50000), row(40000)),
+              new KeyExtent(tableId, row(80000), row(50000)),
+              new KeyExtent(tableId, row(90000), row(80000)),
+              new KeyExtent(tableId, null, row(90000)))),
+          10000, 200);
+    }
+  }
+
+  @Test
+  public void testSplitAndMergeAll() throws Exception {
+    String tableName = getUniqueNames(1)[0];
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+      Map<String,String> props = new HashMap<>();
+      props.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "16K");
+      props.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "1K");
+      c.tableOperations().create(tableName, new NewTableConfiguration().setProperties(props)
+          .withInitialTabletAvailability(TabletAvailability.HOSTED));
+      var tableId = TableId.of(c.tableOperations().tableIdMap().get(tableName));
+
+      // Ingest data so the tablet will split
+      VerifyParams params = new VerifyParams(getClientProps(), tableName, 5_000);
+      TestIngest.ingest(c, params);
+      c.tableOperations().flush(tableName);
+      VerifyIngest.verifyIngest(c, params);
+
+      // Wait for the table to split, should be more than 10 tablets
+      Wait.waitFor(() -> c.tableOperations().listSplits(tableName).size() > 10, 30000, 200);
+
+      // Delete all the data - we can't use deleteRows() as that would merge empty tablets
+      // itself. Instead, we want the mergeability thread to do the merge, so use a batch
+      // deleter and compact away the deleted data.
+      try (var bd = c.createBatchDeleter(tableName, Authorizations.EMPTY, 1)) {
+        bd.setRanges(List.of(new Range()));
+        bd.delete();
+      }
+      c.tableOperations().compact(tableName, new CompactionConfig().setFlush(true));
+
+      // Wait for merge back to the default tablet
+      Wait.waitFor(() -> hasExactTablets(getCluster().getServerContext(), tableId,
+          Set.of(new KeyExtent(tableId, null, null))), 30000, 200);
+    }
+  }
+
+  @Test
+  public void testMergeabilityThreshold() throws Exception {
+    String tableName = getUniqueNames(1)[0];
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+      Map<String,String> props = new HashMap<>();
+      props.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "16K");
+      props.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "1K");
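+      // Threshold arithmetic for this test: at .01 a tablet is only considered mergeable while
+      // under 1% of the split threshold, i.e. ~160 bytes of the initial 16K, and ~52KB once the
+      // split threshold is raised to 5m below.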
+      // Set a low threshold of 1% of the split threshold
+      props.put(Property.TABLE_MAX_MERGEABILITY_THRESHOLD.getKey(), ".01");
+      c.tableOperations().create(tableName, new NewTableConfiguration().setProperties(props)
+          .withInitialTabletAvailability(TabletAvailability.HOSTED));
+      var tableId = TableId.of(c.tableOperations().tableIdMap().get(tableName));
+
+      // Ingest data so the tablet will split
+      VerifyParams params = new VerifyParams(getClientProps(), tableName, 5_000);
+      TestIngest.ingest(c, params);
+      c.tableOperations().flush(tableName);
+      VerifyIngest.verifyIngest(c, params);
+
+      // Wait for the table to split, should be more than 10 tablets
+      Wait.waitFor(() -> c.tableOperations().listSplits(tableName).size() > 10, 10000, 200);
+
+      // Set the split threshold back to the default of 5 MB. There is not a lot of data, so
+      // normally we could merge back to 1 tablet, but the threshold is too low at 1%, so it
+      // should not merge yet.
+      c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "5m");
+
+      // Should not merge, so make sure the wait throws IllegalStateException
+      assertThrows(IllegalStateException.class,
+          () -> Wait.waitFor(() -> hasExactTablets(getCluster().getServerContext(), tableId,
+              Set.of(new KeyExtent(tableId, null, null))), 5000, 500));
+      // Make sure we failed because of exact tablets and not a different IllegalStateException
+      assertFalse(hasExactTablets(getCluster().getServerContext(), tableId,
+          Set.of(new KeyExtent(tableId, null, null))));
+
+      // With a 10% threshold we should be able to merge
+      c.tableOperations().setProperty(tableName,
+          Property.TABLE_MAX_MERGEABILITY_THRESHOLD.getKey(), ".1");
+
+      // Wait for merge back to the default tablet
+      Wait.waitFor(() -> hasExactTablets(getCluster().getServerContext(), tableId,
+          Set.of(new KeyExtent(tableId, null, null))), 10000, 200);
+    }
+  }
+
+  @Test
+  public void testMergeAfter() throws Exception {
+    String tableName = getUniqueNames(1)[0];
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+      c.tableOperations().create(tableName);
+      var tableId = TableId.of(c.tableOperations().tableIdMap().get(tableName));
+
+      TreeSet<Text> splits = new TreeSet<>();
+      splits.add(new Text(String.format("%09d", 333)));
+      splits.add(new Text(String.format("%09d", 666)));
+      splits.add(new Text(String.format("%09d", 999)));
+
+      var delay = Duration.ofSeconds(5);
+      var startTime = c.instanceOperations().getManagerTime();
+      c.tableOperations().putSplits(tableName,
+          TabletMergeabilityUtil.splitsWithDefault(splits, TabletMergeability.after(delay)));
+
+      Wait.waitFor(() -> countTablets(getCluster().getServerContext(), tableName, tm -> true) == 4,
+          5000, 200);
+
+      // Wait for merge back to the default tablet
+      Wait.waitFor(() -> hasExactTablets(getCluster().getServerContext(), tableId,
+          Set.of(new KeyExtent(tableId, null, null))), 10000, 200);
+
+      // The merge must not have started before the delay elapsed on the manager's clock
+      var elapsed = c.instanceOperations().getManagerTime().minus(startTime);
+      assertTrue(elapsed.compareTo(delay) > 0);
+    }
+  }
+
+  @Test
+  public void testMergeabilityMaxFiles() throws Exception {
+    String tableName = getUniqueNames(1)[0];
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+      Map<String,String> props = new HashMap<>();
+      // Disable compactions and set a low merge file max
+      props.put(Property.TABLE_MAJC_RATIO.getKey(), "9999");
+      props.put(Property.TABLE_MERGE_FILE_MAX.getKey(), "3");
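+      // With the compaction ratio effectively disabling system compactions, the frequent
+      // flushes during ingest below should leave well over 3 files per tablet, which is what
+      // lets the table.merge.file.max cap block the merge.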
+      c.tableOperations().create(tableName, new NewTableConfiguration().setProperties(props)
+          .withInitialTabletAvailability(TabletAvailability.HOSTED));
+      var tableId = TableId.of(c.tableOperations().tableIdMap().get(tableName));
+
+      // Create new tablets that won't merge automatically
+      SortedMap<Text,TabletMergeability> splits = new TreeMap<>();
+      for (int i = 500; i < 5000; i += 500) {
+        splits.put(row(i), TabletMergeability.never());
+      }
+      c.tableOperations().putSplits(tableName, splits);
+
+      // Verify we now have 10 tablets
+      Wait.waitFor(
+          () -> countTablets(getCluster().getServerContext(), tableName, tm -> true) == 10,
+          5000, 500);
+
+      // Ingest data; each tablet will have several files because of the flush setting
+      VerifyParams params = new VerifyParams(getClientProps(), tableName, 5_000);
+      params.startRow = 0;
+      params.flushAfterRows = 100;
+      TestIngest.ingest(c, params);
+      VerifyIngest.verifyIngest(c, params);
+
+      // Mark all tablets as mergeable
+      for (int i = 500; i < 5000; i += 500) {
+        splits.put(row(i), TabletMergeability.always());
+      }
+      c.tableOperations().putSplits(tableName, splits);
+
+      // Should not merge as we set the max file count to only 3 and there are more files than
+      // that per tablet, so make sure the wait throws IllegalStateException
+      assertThrows(IllegalStateException.class,
+          () -> Wait.waitFor(() -> hasExactTablets(getCluster().getServerContext(), tableId,
+              Set.of(new KeyExtent(tableId, null, null))), 5000, 500));
+      // Make sure the tablet count is still 10, i.e. nothing merged
+      assertEquals(10, countTablets(getCluster().getServerContext(), tableName, tm -> true));
+
+      // Set the max merge file count back to the default of 10k
+      c.tableOperations().setProperty(tableName, Property.TABLE_MERGE_FILE_MAX.getKey(), "10000");
+
+      // Should merge back to 1 tablet
+      Wait.waitFor(() -> hasExactTablets(getCluster().getServerContext(), tableId,
+          Set.of(new KeyExtent(tableId, null, null))), 10000, 200);
+    }
+  }
+
+  private static boolean hasExactTablets(ServerContext ctx, TableId tableId,
+      Set<KeyExtent> expected) {
+    try (var tabletsMetadata = ctx.getAmple().readTablets().forTable(tableId).build()) {
+      // Check for an exact match: every tablet seen must be in the expected set, and every
+      // expected tablet must be seen exactly once.
+      final var expectedTablets = new HashSet<>(expected);
+      for (TabletMetadata tm : tabletsMetadata) {
+        // Fail on any tablet that was not expected
+        if (!expectedTablets.remove(tm.getExtent())) {
+          return false;
+        }
+      }
+      // Verify all expected tablets were seen
+      return expectedTablets.isEmpty();
+    }
+  }
+
+  private static Text row(int row) {
+    return generateRow(row, 0);
+  }
+}