From 77919bcb3c0938b3f31475e7f24d5229806b3e51 Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon" <cshannon@apache.org> Date: Sun, 9 Mar 2025 14:22:46 -0400 Subject: [PATCH 1/2] Process seeding of split fate operations in batches Updates the Seeder in the Manager that handles seeding split fate ops to use a single thread and to submit multiple outstanding operations to be seeded together instead of individually in order to improve performance. The user fate store will now track outstanding fate operations and return a future for each pending operation that will be completed when the batch is submitted. This closes #5160 --- .../apache/accumulo/core/conf/Property.java | 1 + .../org/apache/accumulo/core/fate/Fate.java | 5 + .../apache/accumulo/core/fate/FateStore.java | 11 +- .../accumulo/core/fate/user/FateMutator.java | 3 + .../core/fate/user/FateMutatorImpl.java | 5 + .../core/fate/user/UserFateStore.java | 165 ++++++++++++++---- .../org/apache/accumulo/manager/Manager.java | 2 +- .../accumulo/manager/TabletGroupWatcher.java | 3 +- .../accumulo/manager/split/SeedSplitTask.java | 55 ------ .../accumulo/manager/split/Splitter.java | 102 ++++++++--- .../accumulo/test/fate/FateStoreUtil.java | 3 - 11 files changed, 226 insertions(+), 129 deletions(-) delete mode 100644 server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 4f3b8fb984e..28a2932e012 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -461,6 +461,7 @@ public enum Property { + "indefinitely. Default is 0 to block indefinitely. 
Only valid when tserver available " + "threshold is set greater than 0.", "1.10.0"), + @Deprecated(since = "4.0.0") MANAGER_SPLIT_WORKER_THREADS("manager.split.seed.threadpool.size", "8", PropertyType.COUNT, "The number of threads used to seed fate split task, the actual split work is done by fate" + " threads.", diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index dd521e3a0c7..7b0b0314deb 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -55,6 +55,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.fate.FateStore.FateTxStore; +import org.apache.accumulo.core.fate.FateStore.Seeder; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.logging.FateLogger; import org.apache.accumulo.core.manager.thrift.TFateOperation; @@ -538,6 +539,10 @@ public FateId startTransaction() { return store.create(); } + public Seeder<T> beginSeeding() { + return store.beginSeeding(); + } + public void seedTransaction(FateOperation fateOp, FateKey fateKey, Repo<T> repo, boolean autoCleanUp) { try (var seeder = store.beginSeeding()) { diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java index 6854aec7136..f193c4b5139 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java @@ -57,10 +57,6 @@ interface Seeder<T> extends AutoCloseable { * Attempts to seed a transaction with the given repo if it does not exist. A fateId will be * derived from the fateKey. If seeded, sets the following data for the fateId in the store. 
* - * TODO: Support completing futures later in close method The current version will always return - * with a CompleteableFuture that is already completed. Future version will process will - * complete in the close() method for the User store. - * * <ul> * <li>Set the fate op</li> * <li>Set the status to SUBMITTED</li> @@ -77,15 +73,12 @@ interface Seeder<T> extends AutoCloseable { CompletableFuture<Optional<FateId>> attemptToSeedTransaction(Fate.FateOperation fateOp, FateKey fateKey, Repo<T> repo, boolean autoCleanUp); - // TODO: Right now all implementations do nothing - // Eventually this would check the status of all added conditional mutations, - // retry unknown, and then close the conditional writer. @Override void close(); } - // Creates a conditional writer for the user fate store. For Zookeeper all this code will probably - // do the same thing its currently doing as zookeeper does not support multi-node operations. + // Creates a conditional writer for the user fate store. For Zookeeper this will be a no-op + // because currently zookeeper does not support multi-node operations. 
Seeder<T> beginSeeding(); /** diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java index 25fc8fdd476..0280dbf7498 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.core.fate.user; +import org.apache.accumulo.core.data.ConditionalMutation; import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateKey; import org.apache.accumulo.core.fate.FateStore; @@ -101,4 +102,6 @@ enum Status { Status tryMutate(); + ConditionalMutation getMutation(); + } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java index b742361ccfb..bb33f6ea818 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java @@ -260,4 +260,9 @@ public Status tryMutate() { throw new RuntimeException(e); } } + + @Override + public ConditionalMutation getMutation() { + return mutation; + } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java index 466c771d1e7..9268a1cdb60 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java @@ -21,19 +21,26 @@ import java.io.IOException; import java.io.Serializable; import java.util.EnumSet; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.SortedMap; import java.util.UUID; import 
java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.ConditionalWriter; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.clientImpl.ClientContext; @@ -47,9 +54,11 @@ import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateKey; +import org.apache.accumulo.core.fate.FateKey.FateKeyType; import org.apache.accumulo.core.fate.ReadOnlyRepo; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.StackOverflowException; +import org.apache.accumulo.core.fate.user.FateMutator.Status; import org.apache.accumulo.core.fate.user.schema.FateSchema.RepoColumnFamily; import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily; import org.apache.accumulo.core.fate.user.schema.FateSchema.TxInfoColumnFamily; @@ -57,6 +66,7 @@ import org.apache.accumulo.core.iterators.user.WholeRowIterator; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.ColumnFQ; +import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.hadoop.io.Text; import org.slf4j.Logger; @@ -136,36 +146,14 @@ public FateId getFateId() { @Override public Seeder<T> beginSeeding() { - // TODO: For now can handle seeding 1 transaction at a time so just process - // everything in attemptToSeedTransaction - // Part 2 of the changes in #5160 will allow multiple seeding attempts to be combined - // into one conditional mutation and we will need to 
track the pending operations - // and futures in a map - return new Seeder<T>() { - @Override - public CompletableFuture<Optional<FateId>> attemptToSeedTransaction(FateOperation fateOp, - FateKey fateKey, Repo<T> repo, boolean autoCleanUp) { - return CompletableFuture - .completedFuture(seedTransaction(fateOp, fateKey, repo, autoCleanUp)); - } - - @Override - public void close() { - // TODO: This will be used in Part 2 of #5160 - } - }; + return new BatchSeeder(); } - private Optional<FateId> seedTransaction(Fate.FateOperation fateOp, FateKey fateKey, Repo<T> repo, - boolean autoCleanUp) { - final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey); + private FateMutator<T> seedTransaction(Fate.FateOperation fateOp, FateKey fateKey, FateId fateId, + Repo<T> repo, boolean autoCleanUp) { Supplier<FateMutator<T>> mutatorFactory = () -> newMutator(fateId).requireAbsent() .putKey(fateKey).putCreateTime(System.currentTimeMillis()); - if (seedTransaction(mutatorFactory, fateKey + " " + fateId, fateOp, repo, autoCleanUp)) { - return Optional.of(fateId); - } else { - return Optional.empty(); - } + return buildMutator(mutatorFactory, fateOp, repo, autoCleanUp); } @Override @@ -176,16 +164,22 @@ public boolean seedTransaction(Fate.FateOperation fateOp, FateId fateId, Repo<T> return seedTransaction(mutatorFactory, fateId.canonical(), fateOp, repo, autoCleanUp); } + private FateMutator<T> buildMutator(Supplier<FateMutator<T>> mutatorFactory, + Fate.FateOperation fateOp, Repo<T> repo, boolean autoCleanUp) { + var mutator = mutatorFactory.get(); + mutator = + mutator.putFateOp(serializeTxInfo(fateOp)).putRepo(1, repo).putStatus(TStatus.SUBMITTED); + if (autoCleanUp) { + mutator = mutator.putAutoClean(serializeTxInfo(autoCleanUp)); + } + return mutator; + } + private boolean seedTransaction(Supplier<FateMutator<T>> mutatorFactory, String logId, Fate.FateOperation fateOp, Repo<T> repo, boolean autoCleanUp) { + var mutator = buildMutator(mutatorFactory, fateOp, repo, 
autoCleanUp); int maxAttempts = 5; for (int attempt = 0; attempt < maxAttempts; attempt++) { - var mutator = mutatorFactory.get(); - mutator = - mutator.putFateOp(serializeTxInfo(fateOp)).putRepo(1, repo).putStatus(TStatus.SUBMITTED); - if (autoCleanUp) { - mutator = mutator.putAutoClean(serializeTxInfo(autoCleanUp)); - } var status = mutator.tryMutate(); if (status == FateMutator.Status.ACCEPTED) { // signal to the super class that a new fate transaction was seeded and is ready to run @@ -393,6 +387,113 @@ public FateInstanceType type() { return fateInstanceType; } + private class BatchSeeder implements Seeder<T> { + private final AtomicBoolean closed = new AtomicBoolean(false); + + private final Map<FateId,Pair<FateMutator<T>,CompletableFuture<Optional<FateId>>>> pending = + new HashMap<>(); + + @Override + public CompletableFuture<Optional<FateId>> attemptToSeedTransaction(FateOperation fateOp, + FateKey fateKey, Repo<T> repo, boolean autoCleanUp) { + Preconditions.checkState(!closed.get(), "Can't attempt to seed with a closed seeder."); + + final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey); + // If not already submitted, add to the pending list and return the future + // or the existing future if duplicate. The pending map will store the mutator + // to be processed on close in one batch. + return pending.computeIfAbsent(fateId, id -> { + FateMutator<T> mutator = seedTransaction(fateOp, fateKey, fateId, repo, autoCleanUp); + CompletableFuture<Optional<FateId>> future = new CompletableFuture<>(); + return new Pair<>(mutator, future); + }).getSecond(); + } + + @Override + public void close() { + closed.set(true); + + int maxAttempts = 5; + + // This loop will submit all the pending mutations as one batch + // to a conditional writer and any known results will be removed + // from the pending map.
Unknown results will be re-attempted up + // to the maxAttempts count + for (int attempt = 0; attempt < maxAttempts; attempt++) { + var currentResults = tryMutateBatch(); + for (Entry<FateId,ConditionalWriter.Status> result : currentResults.entrySet()) { + var fateId = result.getKey(); + var status = result.getValue(); + var future = pending.get(fateId).getSecond(); + switch (result.getValue()) { + case ACCEPTED: + seededTx(); + log.trace("Attempt to seed {} returned {}", fateId.canonical(), status); + // Complete the future with the fateId and remove from pending + future.complete(Optional.of(fateId)); + pending.remove(fateId); + break; + case REJECTED: + log.debug("Attempt to seed {} returned {}", fateId.canonical(), status); + // Rejected so complete with an empty optional and remove from pending + future.complete(Optional.empty()); + pending.remove(fateId); + break; + case UNKNOWN: + log.debug("Attempt to seed {} returned {} status, retrying", fateId.canonical(), + status); + // unknown, so don't remove from map so that we try again if still under + // max attempts + break; + default: + // do not expect other statuses + throw new IllegalStateException("Unhandled status for mutation " + status); + } + } + + if (!pending.isEmpty()) { + // At this point can not reliably determine if the unknown pending mutations were + // successful or not because no reservation was acquired. For example since no + // reservation was acquired it is possible that seeding was a success and something + // immediately picked it up and started operating on it and changing it. + // If scanning after that point can not conclude success or failure. Another situation + // is that maybe the fateId already existed in a seeded form prior to getting this + // unknown.
+ UtilWaitThread.sleep(250); + } + } + + // Any remaining will be UNKNOWN status, so complete the futures with an optional empty + pending.forEach((fateId, pair) -> { + pair.getSecond().complete(Optional.empty()); + log.warn("Repeatedly received unknown status when attempting to seed {}", + fateId.canonical()); + }); + } + + // Submit all the pending mutations to a single conditional writer + // as one batch and return the results for each mutation + private Map<FateId,ConditionalWriter.Status> tryMutateBatch() { + if (pending.isEmpty()) { + return Map.of(); + } + + final Map<FateId,ConditionalWriter.Status> resultsMap = new HashMap<>(); + try (ConditionalWriter writer = context.createConditionalWriter(tableName)) { + Iterator<ConditionalWriter.Result> results = writer + .write(pending.values().stream().map(pair -> pair.getFirst().getMutation()).iterator()); + while (results.hasNext()) { + var result = results.next(); + var row = new Text(result.getMutation().getRow()); + resultsMap.put(FateId.from(FateInstanceType.USER, row.toString()), result.getStatus()); + } + } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { + throw new IllegalStateException(e); + } + return resultsMap; + } + } + private class FateTxStoreImpl extends AbstractFateTxStoreImpl { private FateTxStoreImpl(FateId fateId) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index a3eb271b37b..6f358bfaae2 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -1347,7 +1347,7 @@ boolean canSuspendTablets() { // Don't call start the CompactionCoordinator until we have tservers and upgrade is complete. 
compactionCoordinator.start(); - this.splitter = new Splitter(context); + this.splitter = new Splitter(this); this.splitter.start(); try { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index e29413e82aa..fe39e603818 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -75,7 +75,6 @@ import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.core.util.threads.Threads.AccumuloDaemonThread; import org.apache.accumulo.manager.metrics.ManagerMetrics; -import org.apache.accumulo.manager.split.SeedSplitTask; import org.apache.accumulo.manager.state.TableCounts; import org.apache.accumulo.manager.state.TableStats; import org.apache.accumulo.manager.upgrade.UpgradeCoordinator; @@ -607,7 +606,7 @@ private TableMgmtStats manageTablets(Iterator<TabletManagement> iter, final boolean needsSplit = actions.contains(ManagementAction.NEEDS_SPLITTING); if (needsSplit) { LOG.debug("{} may need splitting.", tm.getExtent()); - manager.getSplitter().initiateSplit(new SeedSplitTask(manager, tm.getExtent())); + manager.getSplitter().initiateSplit(tm.getExtent()); } if (actions.contains(ManagementAction.NEEDS_COMPACTING) && compactionGenerator != null) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java b/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java deleted file mode 100644 index 8270bc423f3..00000000000 --- a/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.manager.split; - -import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.Fate; -import org.apache.accumulo.core.fate.FateInstanceType; -import org.apache.accumulo.core.fate.FateKey; -import org.apache.accumulo.manager.Manager; -import org.apache.accumulo.manager.tableOps.split.FindSplits; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SeedSplitTask implements Runnable { - - private static final Logger log = LoggerFactory.getLogger(SeedSplitTask.class); - private final Manager manager; - private final KeyExtent extent; - - public SeedSplitTask(Manager manager, KeyExtent extent) { - this.manager = manager; - this.extent = extent; - } - - @Override - public void run() { - try { - var fateInstanceType = FateInstanceType.fromTableId((extent.tableId())); - manager.fate(fateInstanceType).seedTransaction(Fate.FateOperation.SYSTEM_SPLIT, - FateKey.forSplit(extent), new FindSplits(extent), true); - } catch (Exception e) { - log.error("Failed to split {}", extent, e); - } - } - - public KeyExtent getExtent() { - return extent; - } -} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java index 85b841d1cf4..d88e52ed66b 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java @@ -18,22 +18,28 @@ */ package org.apache.accumulo.manager.split; +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; + import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.fate.FateKey; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVIterator; import org.apache.accumulo.core.metadata.TabletFile; import org.apache.accumulo.core.util.cache.Caches.CacheName; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.split.FindSplits; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.hadoop.fs.FileSystem; @@ -49,9 +55,64 @@ public class Splitter { private static final Logger LOG = LoggerFactory.getLogger(Splitter.class); + private final Manager manager; private final ThreadPoolExecutor splitExecutor; // tracks which tablets are queued in splitExecutor - private final Set<Text> queuedTablets = ConcurrentHashMap.newKeySet(); + private final Map<Text,KeyExtent> queuedTablets = new ConcurrentHashMap<>(); + + class SplitWorker implements Runnable { + + @Override + public void 
run() { + try { + while (manager.stillManager()) { + if (queuedTablets.isEmpty()) { + sleepUninterruptibly(10, TimeUnit.MILLISECONDS); + continue; + } + + final Map<Text,KeyExtent> userSplits = new HashMap<>(); + final Map<Text,KeyExtent> metaSplits = new HashMap<>(); + + // Go through all the queued up splits and partition + // into the different store types to be submitted. + queuedTablets.forEach((metaRow, extent) -> { + switch (FateInstanceType.fromTableId((extent.tableId()))) { + case USER: + userSplits.put(metaRow, extent); + break; + case META: + metaSplits.put(metaRow, extent); + break; + default: + throw new IllegalStateException("Unexpected FateInstanceType"); + } + }); + + // seed the user and then meta splits + // The meta splits (zk) will be processed one at a time but there will not be + // many of those splits. The user splits are processed as a batch. + seedSplits(FateInstanceType.USER, userSplits); + seedSplits(FateInstanceType.META, metaSplits); + } + } catch (Exception e) { + LOG.error("Failed to split", e); + } + } + } + + private void seedSplits(FateInstanceType instanceType, Map<Text,KeyExtent> splits) { + if (!splits.isEmpty()) { + try (var seeder = manager.fate(instanceType).beginSeeding()) { + for (KeyExtent extent : splits.values()) { + var unused = seeder.attemptToSeedTransaction(Fate.FateOperation.SYSTEM_SPLIT, + FateKey.forSplit(extent), new FindSplits(extent), true); + } + } finally { + queuedTablets.keySet().removeAll(splits.keySet()); + } + } + } public static class FileInfo { final Text firstRow; @@ -151,12 +212,12 @@ public int hashCode() { final LoadingCache<CacheKey,FileInfo> splitFileCache; - public Splitter(ServerContext context) { - int numThreads = context.getConfiguration().getCount(Property.MANAGER_SPLIT_WORKER_THREADS); + public Splitter(Manager manager) { + this.manager = manager; + ServerContext context = manager.getContext(); - this.splitExecutor = context.threadPools().getPoolBuilder("split_seeder") -
.numCoreThreads(numThreads).numMaxThreads(numThreads).withTimeOut(0L, TimeUnit.MILLISECONDS) - .enableThreadPoolMetrics().build(); + this.splitExecutor = context.threadPools().getPoolBuilder("split_seeder").numCoreThreads(1) + .numMaxThreads(1).withTimeOut(0L, TimeUnit.MILLISECONDS).enableThreadPoolMetrics().build(); Weigher<CacheKey, FileInfo> weigher = (key, info) -> key.tableId.canonical().length() @@ -175,7 +236,9 @@ public Splitter(ServerContext context) { } - public synchronized void start() {} + public synchronized void start() { + splitExecutor.execute(new SplitWorker()); + } public synchronized void stop() { splitExecutor.shutdownNow(); @@ -185,29 +248,14 @@ public FileInfo getCachedFileInfo(TableId tableId, TabletFile tabletFile) { return splitFileCache.get(new CacheKey(tableId, tabletFile)); } - public void initiateSplit(SeedSplitTask seedSplitTask) { + public void initiateSplit(KeyExtent extent) { // Want to avoid queuing the same tablet multiple times, it would not cause bugs but would waste // work. Use the metadata row to identify a tablet because the KeyExtent also includes the prev // end row which may change when splits happen. The metaRow is conceptually tableId+endRow and // that does not change for a split. 
- Text metaRow = seedSplitTask.getExtent().toMetaRow(); + Text metaRow = extent.toMetaRow(); int qsize = queuedTablets.size(); - if (qsize < 10_000 && queuedTablets.add(metaRow)) { - Runnable taskWrapper = () -> { - try { - seedSplitTask.run(); - } finally { - queuedTablets.remove(metaRow); - } - }; - - try { - splitExecutor.execute(taskWrapper); - } catch (RejectedExecutionException rje) { - queuedTablets.remove(metaRow); - throw rje; - } - } else { + if (qsize >= 10_000 || queuedTablets.putIfAbsent(metaRow, extent) != null) { LOG.trace("Did not add {} to split queue {}", metaRow, qsize); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java index 0e4c1800bfc..0689f0c7723 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java @@ -78,9 +78,6 @@ public static void createFateTable(ClientContext client, String table) throws Ex assertEquals(fateTableProps, testFateTableProps); } - // For now just process one at a time as the current impl completes - // each seed transaction individually. In future versions we can test - // batching multiple seeding atempts together. public static <T> Optional<FateId> seedTransaction(FateStore<T> store, Fate.FateOperation fateOp, FateKey fateKey, Repo<T> repo, boolean autoCleanUp) { CompletableFuture<Optional<FateId>> fateIdFuture; From e7f48aa83e0afdc05d37e1b83911d7232395d59d Mon Sep 17 00:00:00 2001 From: "Christopher L. 
Shannon" <cshannon@apache.org> Date: Fri, 21 Mar 2025 08:46:27 -0400 Subject: [PATCH 2/2] Address PR comments --- .../main/java/org/apache/accumulo/core/conf/Property.java | 5 ----- .../org/apache/accumulo/core/fate/user/UserFateStore.java | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 71d9e7727c7..62ae03cf69f 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -466,11 +466,6 @@ public enum Property { + "indefinitely. Default is 0 to block indefinitely. Only valid when tserver available " + "threshold is set greater than 0.", "1.10.0"), - @Deprecated(since = "4.0.0") - MANAGER_SPLIT_WORKER_THREADS("manager.split.seed.threadpool.size", "8", PropertyType.COUNT, - "The number of threads used to seed fate split task, the actual split work is done by fate" - + " threads.", - "4.0.0"), MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE("manager.compaction.major.service.queue.size", "1M", PropertyType.MEMORY, "The data size of each resource groups compaction job priority queue. The memory size of " diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java index 9268a1cdb60..2a6efbed0dc 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java @@ -419,7 +419,7 @@ public void close() { // to a conditional writer and any known results will be removed // from the pending map. 
Unknown results will be re-attempted up // to the maxAttempts count - for (int attempt = 0; attempt < maxAttempts; attempt++) { + for (int attempt = 0; attempt < maxAttempts && !pending.isEmpty(); attempt++) { var currentResults = tryMutateBatch(); for (Entry<FateId,ConditionalWriter.Status> result : currentResults.entrySet()) { var fateId = result.getKey();