diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 9687d2b7f60..62ae03cf69f 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -466,10 +466,6 @@ public enum Property { + "indefinitely. Default is 0 to block indefinitely. Only valid when tserver available " + "threshold is set greater than 0.", "1.10.0"), - MANAGER_SPLIT_WORKER_THREADS("manager.split.seed.threadpool.size", "8", PropertyType.COUNT, - "The number of threads used to seed fate split task, the actual split work is done by fate" - + " threads.", - "4.0.0"), MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE("manager.compaction.major.service.queue.size", "1M", PropertyType.MEMORY, "The data size of each resource groups compaction job priority queue. The memory size of " diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 93f42181198..60d3f427c06 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -55,6 +55,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.fate.FateStore.FateTxStore; +import org.apache.accumulo.core.fate.FateStore.Seeder; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.logging.FateLogger; import org.apache.accumulo.core.manager.thrift.TFateOperation; @@ -539,6 +540,10 @@ public FateId startTransaction() { return store.create(); } + public Seeder beginSeeding() { + return store.beginSeeding(); + } + public void seedTransaction(FateOperation fateOp, FateKey fateKey, Repo repo, boolean autoCleanUp) { try (var seeder = store.beginSeeding()) { diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java index c7ec3b4e4cd..3f5a8ec0402 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java @@ -56,10 +56,6 @@ interface Seeder extends AutoCloseable { * Attempts to seed a transaction with the given repo if it does not exist. A fateId will be * derived from the fateKey. If seeded, sets the following data for the fateId in the store. * - * TODO: Support completing futures later in close method The current version will always return - * with a CompleteableFuture that is already completed. Future version will process will - * complete in the close() method for the User store. - * *
    *
  • Set the fate op
  • *
  • Set the status to SUBMITTED
  • @@ -76,15 +72,12 @@ interface Seeder extends AutoCloseable { CompletableFuture> attemptToSeedTransaction(Fate.FateOperation fateOp, FateKey fateKey, Repo repo, boolean autoCleanUp); - // TODO: Right now all implementations do nothing - // Eventually this would check the status of all added conditional mutations, - // retry unknown, and then close the conditional writer. @Override void close(); } - // Creates a conditional writer for the user fate store. For Zookeeper all this code will probably - // do the same thing its currently doing as zookeeper does not support multi-node operations. + // Creates a conditional writer for the user fate store. For Zookeeper this will be a no-op + // because currently zookeeper does not support multi-node operations. Seeder beginSeeding(); /** diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java index 25fc8fdd476..0280dbf7498 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.core.fate.user; +import org.apache.accumulo.core.data.ConditionalMutation; import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateKey; import org.apache.accumulo.core.fate.FateStore; @@ -101,4 +102,6 @@ enum Status { Status tryMutate(); + ConditionalMutation getMutation(); + } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java index b742361ccfb..bb33f6ea818 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java @@ -260,4 +260,9 @@ public Status tryMutate() { throw new RuntimeException(e); } } + + @Override + public ConditionalMutation getMutation() { + return mutation; + } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java index 466c771d1e7..2a6efbed0dc 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java @@ -21,19 +21,26 @@ import java.io.IOException; import java.io.Serializable; import java.util.EnumSet; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.SortedMap; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.ConditionalWriter; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.clientImpl.ClientContext; @@ -47,9 +54,11 @@ import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateKey; +import org.apache.accumulo.core.fate.FateKey.FateKeyType; import org.apache.accumulo.core.fate.ReadOnlyRepo; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.StackOverflowException; +import org.apache.accumulo.core.fate.user.FateMutator.Status; import org.apache.accumulo.core.fate.user.schema.FateSchema.RepoColumnFamily; import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily; import org.apache.accumulo.core.fate.user.schema.FateSchema.TxInfoColumnFamily; @@ -57,6 +66,7 @@ import org.apache.accumulo.core.iterators.user.WholeRowIterator; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.ColumnFQ; +import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.hadoop.io.Text; import org.slf4j.Logger; @@ -136,36 +146,14 @@ public FateId getFateId() { @Override public Seeder beginSeeding() { - // TODO: For now can handle seeding 1 transaction at a time so just process - // everything in attemptToSeedTransaction - // Part 2 of the changes in #5160 will allow multiple seeding attempts to be combined - // into one conditional mutation and we will need to track the pending operations - // and futures in a map - return new Seeder() { - @Override - public CompletableFuture> attemptToSeedTransaction(FateOperation fateOp, - FateKey fateKey, Repo repo, boolean autoCleanUp) { - return CompletableFuture - .completedFuture(seedTransaction(fateOp, fateKey, repo, autoCleanUp)); - } - - @Override - public void close() { - // TODO: This will be used in Part 2 of #5160 - } - }; + return new BatchSeeder(); } - private Optional seedTransaction(Fate.FateOperation fateOp, FateKey fateKey, Repo repo, - boolean autoCleanUp) { - final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey); + private FateMutator seedTransaction(Fate.FateOperation fateOp, FateKey fateKey, FateId fateId, + Repo repo, boolean autoCleanUp) { Supplier> mutatorFactory = () -> newMutator(fateId).requireAbsent() .putKey(fateKey).putCreateTime(System.currentTimeMillis()); - if (seedTransaction(mutatorFactory, fateKey + " " + fateId, fateOp, repo, autoCleanUp)) { - return Optional.of(fateId); - } else { - return Optional.empty(); - } + return buildMutator(mutatorFactory, fateOp, repo, autoCleanUp); } @Override @@ -176,16 +164,22 @@ public boolean seedTransaction(Fate.FateOperation fateOp, FateId fateId, Repo return seedTransaction(mutatorFactory, fateId.canonical(), fateOp, repo, autoCleanUp); } + private FateMutator buildMutator(Supplier> mutatorFactory, + Fate.FateOperation fateOp, Repo repo, boolean autoCleanUp) { + var mutator = mutatorFactory.get(); + mutator = + mutator.putFateOp(serializeTxInfo(fateOp)).putRepo(1, repo).putStatus(TStatus.SUBMITTED); + if (autoCleanUp) { + mutator = mutator.putAutoClean(serializeTxInfo(autoCleanUp)); + } + return mutator; + } + private boolean seedTransaction(Supplier> mutatorFactory, String logId, Fate.FateOperation fateOp, Repo repo, boolean autoCleanUp) { + var mutator = buildMutator(mutatorFactory, fateOp, repo, autoCleanUp); int maxAttempts = 5; for (int attempt = 0; attempt < maxAttempts; attempt++) { - var mutator = mutatorFactory.get(); - mutator = - mutator.putFateOp(serializeTxInfo(fateOp)).putRepo(1, repo).putStatus(TStatus.SUBMITTED); - if (autoCleanUp) { - mutator = mutator.putAutoClean(serializeTxInfo(autoCleanUp)); - } var status = mutator.tryMutate(); if (status == FateMutator.Status.ACCEPTED) { // signal to the super class that a new fate transaction was seeded and is ready to run @@ -393,6 +387,113 @@ public FateInstanceType type() { return fateInstanceType; } + private class BatchSeeder implements Seeder { + private final AtomicBoolean closed = new AtomicBoolean(false); + + private final Map,CompletableFuture>>> pending = + new HashMap<>(); + + @Override + public CompletableFuture> attemptToSeedTransaction(FateOperation fateOp, + FateKey fateKey, Repo repo, boolean autoCleanUp) { + Preconditions.checkState(!closed.get(), "Can't attempt to seed with a closed seeder."); + + final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey); + // If not already submitted, add to the pending list and return the future + // or the existing future if duplicate. The pending map will store the mutator + // to be processed on close in a one batch. + return pending.computeIfAbsent(fateId, id -> { + FateMutator mutator = seedTransaction(fateOp, fateKey, fateId, repo, autoCleanUp); + CompletableFuture> future = new CompletableFuture<>(); + return new Pair<>(mutator, future); + }).getSecond(); + } + + @Override + public void close() { + closed.set(true); + + int maxAttempts = 5; + + // This loop will submit all the pending mutations as one batch + // to a conditional writer and any known results will be removed + // from the pending map. Unknown results will be re-attempted up + // to the maxAttempts count + for (int attempt = 0; attempt < maxAttempts && !pending.isEmpty(); attempt++) { + var currentResults = tryMutateBatch(); + for (Entry result : currentResults.entrySet()) { + var fateId = result.getKey(); + var status = result.getValue(); + var future = pending.get(fateId).getSecond(); + switch (result.getValue()) { + case ACCEPTED: + seededTx(); + log.trace("Attempt to seed {} returned {}", fateId.canonical(), status); + // Complete the future with the fatId and remove from pending + future.complete(Optional.of(fateId)); + pending.remove(fateId); + break; + case REJECTED: + log.debug("Attempt to seed {} returned {}", fateId.canonical(), status); + // Rejected so complete with an empty optional and remove from pending + future.complete(Optional.empty()); + pending.remove(fateId); + break; + case UNKNOWN: + log.debug("Attempt to seed {} returned {} status, retrying", fateId.canonical(), + status); + // unknown, so don't remove from map so that we try again if still under + // max attempts + break; + default: + // do not expect other statuses + throw new IllegalStateException("Unhandled status for mutation " + status); + } + } + + if (!pending.isEmpty()) { + // At this point can not reliably determine if the unknown pending mutations were + // successful or not because no reservation was acquired. For example since no + // reservation was acquired it is possible that seeding was a success and something + // immediately picked it up and started operating on it and changing it. + // If scanning after that point can not conclude success or failure. Another situation + // is that maybe the fateId already existed in a seeded form prior to getting this + // unknown. + UtilWaitThread.sleep(250); + } + } + + // Any remaining will be UNKNOWN status, so complete the futures with an optional empty + pending.forEach((fateId, pair) -> { + pair.getSecond().complete(Optional.empty()); + log.warn("Repeatedly received unknown status when attempting to seed {}", + fateId.canonical()); + }); + } + + // Submit all the pending mutations to a single conditional writer + // as one batch and return the results for each mutation + private Map tryMutateBatch() { + if (pending.isEmpty()) { + return Map.of(); + } + + final Map resultsMap = new HashMap<>(); + try (ConditionalWriter writer = context.createConditionalWriter(tableName)) { + Iterator results = writer + .write(pending.values().stream().map(pair -> pair.getFirst().getMutation()).iterator()); + while (results.hasNext()) { + var result = results.next(); + var row = new Text(result.getMutation().getRow()); + resultsMap.put(FateId.from(FateInstanceType.USER, row.toString()), result.getStatus()); + } + } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { + throw new IllegalStateException(e); + } + return resultsMap; + } + } + private class FateTxStoreImpl extends AbstractFateTxStoreImpl { private FateTxStoreImpl(FateId fateId) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index edeef321a6b..6b8c6931e3a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -1346,7 +1346,7 @@ boolean canSuspendTablets() { // Don't call start the CompactionCoordinator until we have tservers and upgrade is complete. compactionCoordinator.start(); - this.splitter = new Splitter(context); + this.splitter = new Splitter(this); this.splitter.start(); try { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index e29413e82aa..fe39e603818 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -75,7 +75,6 @@ import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.core.util.threads.Threads.AccumuloDaemonThread; import org.apache.accumulo.manager.metrics.ManagerMetrics; -import org.apache.accumulo.manager.split.SeedSplitTask; import org.apache.accumulo.manager.state.TableCounts; import org.apache.accumulo.manager.state.TableStats; import org.apache.accumulo.manager.upgrade.UpgradeCoordinator; @@ -607,7 +606,7 @@ private TableMgmtStats manageTablets(Iterator iter, final boolean needsSplit = actions.contains(ManagementAction.NEEDS_SPLITTING); if (needsSplit) { LOG.debug("{} may need splitting.", tm.getExtent()); - manager.getSplitter().initiateSplit(new SeedSplitTask(manager, tm.getExtent())); + manager.getSplitter().initiateSplit(tm.getExtent()); } if (actions.contains(ManagementAction.NEEDS_COMPACTING) && compactionGenerator != null) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java b/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java deleted file mode 100644 index 8270bc423f3..00000000000 --- a/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.manager.split; - -import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.Fate; -import org.apache.accumulo.core.fate.FateInstanceType; -import org.apache.accumulo.core.fate.FateKey; -import org.apache.accumulo.manager.Manager; -import org.apache.accumulo.manager.tableOps.split.FindSplits; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SeedSplitTask implements Runnable { - - private static final Logger log = LoggerFactory.getLogger(SeedSplitTask.class); - private final Manager manager; - private final KeyExtent extent; - - public SeedSplitTask(Manager manager, KeyExtent extent) { - this.manager = manager; - this.extent = extent; - } - - @Override - public void run() { - try { - var fateInstanceType = FateInstanceType.fromTableId((extent.tableId())); - manager.fate(fateInstanceType).seedTransaction(Fate.FateOperation.SYSTEM_SPLIT, - FateKey.forSplit(extent), new FindSplits(extent), true); - } catch (Exception e) { - log.error("Failed to split {}", extent, e); - } - } - - public KeyExtent getExtent() { - return extent; - } -} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java index 85b841d1cf4..d88e52ed66b 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java @@ -18,22 +18,28 @@ */ package org.apache.accumulo.manager.split; +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; + import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.fate.FateKey; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVIterator; import org.apache.accumulo.core.metadata.TabletFile; import org.apache.accumulo.core.util.cache.Caches.CacheName; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.split.FindSplits; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.hadoop.fs.FileSystem; @@ -49,9 +55,64 @@ public class Splitter { private static final Logger LOG = LoggerFactory.getLogger(Splitter.class); + private final Manager manager; private final ThreadPoolExecutor splitExecutor; // tracks which tablets are queued in splitExecutor - private final Set queuedTablets = ConcurrentHashMap.newKeySet(); + private final Map queuedTablets = new ConcurrentHashMap<>(); + + class SplitWorker implements Runnable { + + @Override + public void run() { + try { + while (manager.stillManager()) { + if (queuedTablets.isEmpty()) { + sleepUninterruptibly(10, TimeUnit.MILLISECONDS); + continue; + } + + final Map userSplits = new HashMap<>(); + final Map metaSplits = new HashMap<>(); + + // Go through all the queued up splits and partition + // into the different store types to be submitted. + queuedTablets.forEach((metaRow, extent) -> { + switch (FateInstanceType.fromTableId((extent.tableId()))) { + case USER: + userSplits.put(metaRow, extent); + break; + case META: + metaSplits.put(metaRow, extent); + break; + default: + throw new IllegalStateException("Unexpected FateInstanceType"); + } + }); + + // see the user and then meta splits + // The meta plits (zk) will be processed one at a time but there will not be + // many of those splits. The user splits are processed as a batch. + seedSplits(FateInstanceType.USER, userSplits); + seedSplits(FateInstanceType.META, metaSplits); + } + } catch (Exception e) { + LOG.error("Failed to split", e); + } + } + } + + private void seedSplits(FateInstanceType instanceType, Map splits) { + if (!splits.isEmpty()) { + try (var seeder = manager.fate(instanceType).beginSeeding()) { + for (KeyExtent extent : splits.values()) { + var unused = seeder.attemptToSeedTransaction(Fate.FateOperation.SYSTEM_SPLIT, + FateKey.forSplit(extent), new FindSplits(extent), true); + } + } finally { + queuedTablets.keySet().removeAll(splits.keySet()); + } + } + } public static class FileInfo { final Text firstRow; @@ -151,12 +212,12 @@ public int hashCode() { final LoadingCache splitFileCache; - public Splitter(ServerContext context) { - int numThreads = context.getConfiguration().getCount(Property.MANAGER_SPLIT_WORKER_THREADS); + public Splitter(Manager manager) { + this.manager = manager; + ServerContext context = manager.getContext(); - this.splitExecutor = context.threadPools().getPoolBuilder("split_seeder") - .numCoreThreads(numThreads).numMaxThreads(numThreads).withTimeOut(0L, TimeUnit.MILLISECONDS) - .enableThreadPoolMetrics().build(); + this.splitExecutor = context.threadPools().getPoolBuilder("split_seeder").numCoreThreads(1) + .numMaxThreads(1).withTimeOut(0L, TimeUnit.MILLISECONDS).enableThreadPoolMetrics().build(); Weigher weigher = (key, info) -> key.tableId.canonical().length() @@ -175,7 +236,9 @@ public Splitter(ServerContext context) { } - public synchronized void start() {} + public synchronized void start() { + splitExecutor.execute(new SplitWorker()); + } public synchronized void stop() { splitExecutor.shutdownNow(); @@ -185,29 +248,14 @@ public FileInfo getCachedFileInfo(TableId tableId, TabletFile tabletFile) { return splitFileCache.get(new CacheKey(tableId, tabletFile)); } - public void initiateSplit(SeedSplitTask seedSplitTask) { + public void initiateSplit(KeyExtent extent) { // Want to avoid queuing the same tablet multiple times, it would not cause bugs but would waste // work. Use the metadata row to identify a tablet because the KeyExtent also includes the prev // end row which may change when splits happen. The metaRow is conceptually tableId+endRow and // that does not change for a split. - Text metaRow = seedSplitTask.getExtent().toMetaRow(); + Text metaRow = extent.toMetaRow(); int qsize = queuedTablets.size(); - if (qsize < 10_000 && queuedTablets.add(metaRow)) { - Runnable taskWrapper = () -> { - try { - seedSplitTask.run(); - } finally { - queuedTablets.remove(metaRow); - } - }; - - try { - splitExecutor.execute(taskWrapper); - } catch (RejectedExecutionException rje) { - queuedTablets.remove(metaRow); - throw rje; - } - } else { + if (qsize >= 10_000 || queuedTablets.putIfAbsent(metaRow, extent) != null) { LOG.trace("Did not add {} to split queue {}", metaRow, qsize); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java index 61ea073a6f2..5b12b0f3cdd 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java @@ -77,9 +77,6 @@ public static void createFateTable(ClientContext client, String table) throws Ex assertEquals(fateTableProps, testFateTableProps); } - // For now just process one at a time as the current impl completes - // each seed transaction individually. In future versions we can test - // batching multiple seeding atempts together. public static Optional seedTransaction(FateStore store, Fate.FateOperation fateOp, FateKey fateKey, Repo repo, boolean autoCleanUp) { CompletableFuture> fateIdFuture;