Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable auto merging of tablets #5353

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.core.client.admin;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -357,4 +358,12 @@ boolean testClassLoad(final String className, final String asTypeName)
* @since 2.1.0
*/
InstanceId getInstanceId();

/**
* Return the current manager time
*
* @return current time
* @since 4.0.0
*/
Duration getManagerTime() throws AccumuloException, AccumuloSecurityException;
}
Original file line number Diff line number Diff line change
@@ -470,6 +470,12 @@ public InstanceId getInstanceId() {
return context.getInstanceID();
}

@Override
public Duration getManagerTime() throws AccumuloException, AccumuloSecurityException {
return Duration.ofNanos(ThriftClientTypes.MANAGER.execute(context,
client -> client.getManagerTimeNanos(TraceUtil.traceInfo(), context.rpcCreds())));
}

@Override
public ServerId getServer(ServerId.Type type, String resourceGroup, String host, int port) {
Objects.requireNonNull(type, "type parameter cannot be null");
Original file line number Diff line number Diff line change
@@ -401,6 +401,11 @@ public enum Property {
+ " are performed (e.g. Bulk Import). This property specifies the maximum number of threads in a"
+ " ThreadPool in the Manager that will be used to request these refresh operations.",
"4.0.0"),
MANAGER_TABLET_MERGEABILITY_INTERVAL("manager.tablet.mergeability.interval", "24h",
PropertyType.TIMEDURATION,
"Time to wait between scanning tables to identify ranges of tablets that can be "
+ " auto-merged. Valid ranges will be have merge fate ops submitted.",
"4.0.0"),
MANAGER_BULK_TIMEOUT("manager.bulk.timeout", "5m", PropertyType.TIMEDURATION,
"The time to wait for a tablet server to process a bulk import request.", "1.4.3"),
MANAGER_RENAME_THREADS("manager.rename.threadpool.size", "20", PropertyType.COUNT,
@@ -904,6 +909,8 @@ public enum Property {
TABLE_ONDEMAND_UNLOADER("tserver.ondemand.tablet.unloader",
"org.apache.accumulo.core.spi.ondemand.DefaultOnDemandTabletUnloader", PropertyType.CLASSNAME,
"The class that will be used to determine which on-demand Tablets to unload.", "4.0.0"),
TABLE_MAX_MERGEABILITY_THRESHOLD("table.mergeability.threshold", ".25", PropertyType.FRACTION,
"A tablet is mergeable until it reaches this percentage of the split threshold.", "4.0.0"),

// Crypto-related properties
@Experimental
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/accumulo/core/fate/Fate.java
Original file line number Diff line number Diff line change
@@ -106,6 +106,7 @@ public enum FateOperation {
NAMESPACE_RENAME(TFateOperation.NAMESPACE_RENAME),
SHUTDOWN_TSERVER(null),
SYSTEM_SPLIT(null),
SYSTEM_MERGE(null),
TABLE_BULK_IMPORT2(TFateOperation.TABLE_BULK_IMPORT2),
TABLE_CANCEL_COMPACT(TFateOperation.TABLE_CANCEL_COMPACT),
TABLE_CLONE(TFateOperation.TABLE_CLONE),
@@ -124,7 +125,7 @@ public enum FateOperation {

private final TFateOperation top;
private static final EnumSet<FateOperation> nonThriftOps =
EnumSet.of(COMMIT_COMPACTION, SHUTDOWN_TSERVER, SYSTEM_SPLIT);
EnumSet.of(COMMIT_COMPACTION, SHUTDOWN_TSERVER, SYSTEM_SPLIT, SYSTEM_MERGE);

FateOperation(TFateOperation top) {
this.top = top;
Original file line number Diff line number Diff line change
@@ -258,7 +258,8 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, TFat
manager.fate(type).seedTransaction(op, fateId,
new TraceRepo<>(new CreateTable(c.getPrincipal(), tableName, timeType, options,
splitsPath, splitCount, splitsDirsPath, initialTableState,
initialTabletAvailability, namespaceId, TabletMergeability.never())),
// Set the default tablet to be auto-mergeable with other tablets if it is split
initialTabletAvailability, namespaceId, TabletMergeability.always())),
autoCleanup, goalMessage);

break;
Original file line number Diff line number Diff line change
@@ -122,6 +122,7 @@
import org.apache.accumulo.core.util.time.SteadyTime;
import org.apache.accumulo.core.zookeeper.ZcStat;
import org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator;
import org.apache.accumulo.manager.merge.FindMergeableRangeTask;
import org.apache.accumulo.manager.metrics.BalancerMetrics;
import org.apache.accumulo.manager.metrics.ManagerMetrics;
import org.apache.accumulo.manager.recovery.RecoveryManager;
@@ -1292,6 +1293,13 @@ boolean canSuspendTablets() {
ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
.scheduleWithFixedDelay(() -> ScanServerMetadataEntries.clean(context), 10, 10, MINUTES));

// TODO - create new threadpool?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume that this cannot go into the TabletGroupWatcher because it needs to look at more than one Tablet? I think using the general scheduled executor threadpool should be fine. However, we may want to consider bumping the default value of GENERAL_THREADPOOL_SIZE from 1 to something larger. Probably good for a separate PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, TGW doesn't work in this case because because the task needs to look at multiple tablets together and not just one tablet at a time. Also, the task likely doesn't need to run very often to look for mergeable ranges where as TGW runs frequently

var tabletMergeabilityInterval =
getConfiguration().getDuration(Property.MANAGER_TABLET_MERGEABILITY_INTERVAL);
ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay(
new FindMergeableRangeTask(this), tabletMergeabilityInterval.toMillis(),
tabletMergeabilityInterval.toMillis(), MILLISECONDS));

// Make sure that we have a secret key (either a new one or an old one from ZK) before we start
// the manager client service.
Thread authenticationTokenKeyManagerThread = null;
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.manager.merge;

import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGEABILITY;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;

import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;

import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.Fate.FateOperation;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.accumulo.core.metadata.schema.filters.TabletMetadataFilter;
import org.apache.accumulo.core.util.time.SteadyTime;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.TraceRepo;
import org.apache.accumulo.manager.tableOps.merge.MergeInfo.Operation;
import org.apache.accumulo.manager.tableOps.merge.TableRangeOp;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;

public class FindMergeableRangeTask implements Runnable {

private static final Logger log = LoggerFactory.getLogger(FindMergeableRangeTask.class);

private static final TabletMergeabilityFilter FILTER = new TabletMergeabilityFilter();

private final Manager manager;

public FindMergeableRangeTask(Manager manager) {
this.manager = Objects.requireNonNull(manager);
log.debug("Creating FindMergeableRangeTask");
}

@Override
public void run() {
var context = manager.getContext();
Map<TableId,String> tables = context.getTableIdToNameMap();

log.debug("Starting FindMergeableRangeTask");

for (Entry<TableId,String> table : tables.entrySet()) {
TableId tableId = table.getKey();
String tableName = table.getValue();

long maxFileCount =
context.getTableConfiguration(tableId).getCount(Property.TABLE_MERGE_FILE_MAX);
long threshold =
context.getTableConfiguration(tableId).getAsBytes(Property.TABLE_SPLIT_THRESHOLD);
double mergeabilityThreshold = context.getTableConfiguration(tableId)
.getFraction(Property.TABLE_MAX_MERGEABILITY_THRESHOLD);
long maxTotalSize = (long) (threshold * mergeabilityThreshold);

log.debug("Checking {} for tablets that can be merged", tableName);
log.debug("maxFileCount: {}, maxTotalSize:{}, splitThreshold:{}, mergeabilityThreshold:{}",
maxFileCount, maxTotalSize, threshold, mergeabilityThreshold);
try {
NamespaceId namespaceId = context.getNamespaceId(tableId);
var type = FateInstanceType.fromTableId(tableId);

try (var tablets = context.getAmple().readTablets().forTable(tableId)
.fetch(PREV_ROW, FILES, MERGEABILITY).filter(FILTER).build()) {

final MergeableRange current =
new MergeableRange(manager.getSteadyTime(), maxFileCount, maxTotalSize);

for (var tm : tablets) {
log.trace("Checking tablet {}, {}", tm.getExtent(), tm.getTabletMergeability());
if (!current.add(tm)) {
submit(current, type, table, namespaceId);
current.resetAndAdd(tm);
}
}

submit(current, type, table, namespaceId);
}

} catch (Exception e) {
log.error("Failed to generate system merges for {}", tableName, e);
}
}

}

void submit(MergeableRange range, FateInstanceType type, Entry<TableId,String> table,
NamespaceId namespaceId) {
if (range.tabletCount < 2) {
return;
}

log.debug("Table {} found {} tablets that can be merged for table", table.getValue(),
range.tabletCount);

TableId tableId = table.getKey();
String tableName = table.getValue();

final Text startRow = range.startRow != null ? range.startRow : new Text("");
final Text endRow = range.endRow != null ? range.endRow : new Text("");

String startRowStr = StringUtils.defaultIfBlank(startRow.toString(), "-inf");
String endRowStr = StringUtils.defaultIfBlank(endRow.toString(), "+inf");
log.debug("FindMergeableRangeTask: Creating merge op: {} from startRow: {} to endRow: {}",
tableId, startRowStr, endRowStr);
var fateId = manager.fate(type).startTransaction();
String goalMessage = TableOperation.MERGE + " Merge table " + tableName + "(" + tableId
+ ") splits from " + startRowStr + " to " + endRowStr;

manager.fate(type).seedTransaction(FateOperation.SYSTEM_MERGE, fateId,
new TraceRepo<>(
new TableRangeOp(Operation.SYSTEM_MERGE, namespaceId, tableId, startRow, endRow)),
true, goalMessage);
}

static class MergeableRange {
final SteadyTime currentTime;
final long maxFileCount;
final long maxTotalSize;

Text startRow;
Text endRow;
int tabletCount;
long totalFileCount;
long totalFileSize;

MergeableRange(SteadyTime currentTime, long maxFileCount, long maxTotalSize) {
this.currentTime = currentTime;
this.maxFileCount = maxFileCount;
this.maxTotalSize = maxTotalSize;
}

boolean add(TabletMetadata tm) {
if (validate(tm)) {
tabletCount++;
log.trace("Adding tablet {} to MergeableRange", tm.getExtent());
if (tabletCount == 1) {
startRow = tm.getPrevEndRow();
}
endRow = tm.getEndRow();
totalFileCount += tm.getFiles().size();
totalFileSize += tm.getFileSize();
return true;
}
return false;
}

private boolean validate(TabletMetadata tm) {
if (tabletCount > 0) {
// This is at least the second tablet seen so there should not be a null prevEndRow
Preconditions.checkState(tm.getPrevEndRow() != null,
"Unexpected null prevEndRow found for %s", tm.getExtent());
// If this is not the first tablet, then verify its prevEndRow matches
// the last endRow tracked, the server filter will skip tablets marked as never
if (!tm.getPrevEndRow().equals(endRow)) {
return false;
}
}

if (!tm.getTabletMergeability().isMergeable(currentTime)) {
return false;
}

if (totalFileCount + tm.getFiles().size() > maxFileCount) {
return false;
}

return totalFileSize + tm.getFileSize() <= maxTotalSize;
}

void resetAndAdd(TabletMetadata tm) {
reset();
add(tm);
}

void reset() {
startRow = null;
endRow = null;
tabletCount = 0;
totalFileCount = 0;
totalFileSize = 0;
}
}

// Filter out never merge tablets to cut down on what we need to check
// We need steady time to check other tablets which is not available in the filter
public static class TabletMergeabilityFilter extends TabletMetadataFilter {

public static final Set<ColumnType> COLUMNS = Sets.immutableEnumSet(MERGEABILITY);

private final static Predicate<TabletMetadata> IS_NOT_NEVER =
tabletMetadata -> !tabletMetadata.getTabletMergeability().isNever();

@Override
public Set<TabletMetadata.ColumnType> getColumns() {
return COLUMNS;
}

@Override
protected Predicate<TabletMetadata> acceptTablet() {
return IS_NOT_NEVER;
}
}
}
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@
package org.apache.accumulo.manager.tableOps.merge;

import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
import static org.apache.accumulo.manager.tableOps.merge.MergeInfo.Operation.SYSTEM_MERGE;

import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.fate.FateId;
@@ -28,6 +29,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

public class CountFiles extends ManagerRepo {
private static final Logger log = LoggerFactory.getLogger(CountFiles.class);
private static final long serialVersionUID = 1L;
@@ -39,6 +42,9 @@ public CountFiles(MergeInfo mergeInfo) {

@Override
public Repo<Manager> call(FateId fateId, Manager env) throws Exception {
// SYSTEM_MERGE should be executing VerifyMergeability repo, which already
// will count files
Preconditions.checkState(data.op != SYSTEM_MERGE, "Unexpected op %s", SYSTEM_MERGE);

var range = data.getReserveExtent();

@@ -75,10 +81,13 @@ public Repo<Manager> call(FateId fateId, Manager env) throws Exception {
if (totalFiles >= maxFiles) {
return new UnreserveAndError(data, totalFiles, maxFiles);
} else {
if (data.op == MergeInfo.Operation.MERGE) {
return new MergeTablets(data);
} else {
return new DeleteRows(data);
switch (data.op) {
case MERGE:
return new MergeTablets(data);
case DELETE:
return new DeleteRows(data);
default:
throw new IllegalStateException("Unknown op " + data.op);
}
}
}
Loading