Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process seeding of split fate operations in batches #5404

Merged
merged 4 commits into from
Mar 21, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -466,10 +466,6 @@ public enum Property {
+ "indefinitely. Default is 0 to block indefinitely. Only valid when tserver available "
+ "threshold is set greater than 0.",
"1.10.0"),
MANAGER_SPLIT_WORKER_THREADS("manager.split.seed.threadpool.size", "8", PropertyType.COUNT,
"The number of threads used to seed fate split task, the actual split work is done by fate"
+ " threads.",
"4.0.0"),
MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE("manager.compaction.major.service.queue.size",
"1M", PropertyType.MEMORY,
"The data size of each resource groups compaction job priority queue. The memory size of "
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/fate/Fate.java
Original file line number Diff line number Diff line change
@@ -55,6 +55,7 @@
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.fate.FateStore.FateTxStore;
import org.apache.accumulo.core.fate.FateStore.Seeder;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.logging.FateLogger;
import org.apache.accumulo.core.manager.thrift.TFateOperation;
@@ -539,6 +540,10 @@ public FateId startTransaction() {
return store.create();
}

/**
* Starts a new seeding session by delegating directly to the underlying store.
*
* @return the {@link Seeder} produced by the store's {@code beginSeeding()}; Seeder is
*         {@link AutoCloseable}, so the caller is expected to close it (NOTE(review): when
*         the seeding attempts are actually processed depends on the store implementation)
*/
public Seeder<T> beginSeeding() {
return store.beginSeeding();
}

public void seedTransaction(FateOperation fateOp, FateKey fateKey, Repo<T> repo,
boolean autoCleanUp) {
try (var seeder = store.beginSeeding()) {
11 changes: 2 additions & 9 deletions core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
Original file line number Diff line number Diff line change
@@ -56,10 +56,6 @@ interface Seeder<T> extends AutoCloseable {
* Attempts to seed a transaction with the given repo if it does not exist. A fateId will be
* derived from the fateKey. If seeded, sets the following data for the fateId in the store.
*
* TODO: Support completing futures later in the close method. The current version will always
* return a CompletableFuture that is already completed. A future version will complete the
* futures in the close() method for the User store.
*
* <ul>
* <li>Set the fate op</li>
* <li>Set the status to SUBMITTED</li>
@@ -76,15 +72,12 @@ interface Seeder<T> extends AutoCloseable {
CompletableFuture<Optional<FateId>> attemptToSeedTransaction(Fate.FateOperation fateOp,
FateKey fateKey, Repo<T> repo, boolean autoCleanUp);

// TODO: Right now all implementations do nothing
// Eventually this would check the status of all added conditional mutations,
// retry unknown, and then close the conditional writer.
@Override
void close();
}

// Creates a conditional writer for the user fate store. For Zookeeper all this code will probably
// do the same thing its currently doing as zookeeper does not support multi-node operations.
// Creates a conditional writer for the user fate store. For Zookeeper this will be a no-op
// because currently zookeeper does not support multi-node operations.
Seeder<T> beginSeeding();

/**
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.core.fate.user;

import org.apache.accumulo.core.data.ConditionalMutation;
import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.FateKey;
import org.apache.accumulo.core.fate.FateStore;
@@ -101,4 +102,6 @@ enum Status {

Status tryMutate();

ConditionalMutation getMutation();

}
Original file line number Diff line number Diff line change
@@ -260,4 +260,9 @@ public Status tryMutate() {
throw new RuntimeException(e);
}
}

/**
* Returns the {@code mutation} this mutator holds. Nothing is submitted by this call; it only
* hands back the reference.
*/
@Override
public ConditionalMutation getMutation() {
return mutation;
}
}
165 changes: 133 additions & 32 deletions core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
Original file line number Diff line number Diff line change
@@ -21,19 +21,26 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.SortedMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.clientImpl.ClientContext;
@@ -47,16 +54,19 @@
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.FateKey;
import org.apache.accumulo.core.fate.FateKey.FateKeyType;
import org.apache.accumulo.core.fate.ReadOnlyRepo;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.StackOverflowException;
import org.apache.accumulo.core.fate.user.FateMutator.Status;
import org.apache.accumulo.core.fate.user.schema.FateSchema.RepoColumnFamily;
import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily;
import org.apache.accumulo.core.fate.user.schema.FateSchema.TxInfoColumnFamily;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
@@ -136,36 +146,14 @@ public FateId getFateId() {

@Override
public Seeder<T> beginSeeding() {
// TODO: For now can handle seeding 1 transaction at a time so just process
// everything in attemptToSeedTransaction
// Part 2 of the changes in #5160 will allow multiple seeding attempts to be combined
// into one conditional mutation and we will need to track the pending operations
// and futures in a map
return new Seeder<T>() {
@Override
public CompletableFuture<Optional<FateId>> attemptToSeedTransaction(FateOperation fateOp,
FateKey fateKey, Repo<T> repo, boolean autoCleanUp) {
return CompletableFuture
.completedFuture(seedTransaction(fateOp, fateKey, repo, autoCleanUp));
}

@Override
public void close() {
// TODO: This will be used in Part 2 of #5160
}
};
return new BatchSeeder();
}

private Optional<FateId> seedTransaction(Fate.FateOperation fateOp, FateKey fateKey, Repo<T> repo,
boolean autoCleanUp) {
final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
private FateMutator<T> seedTransaction(Fate.FateOperation fateOp, FateKey fateKey, FateId fateId,
Repo<T> repo, boolean autoCleanUp) {
Supplier<FateMutator<T>> mutatorFactory = () -> newMutator(fateId).requireAbsent()
.putKey(fateKey).putCreateTime(System.currentTimeMillis());
if (seedTransaction(mutatorFactory, fateKey + " " + fateId, fateOp, repo, autoCleanUp)) {
return Optional.of(fateId);
} else {
return Optional.empty();
}
return buildMutator(mutatorFactory, fateOp, repo, autoCleanUp);
}

@Override
@@ -176,16 +164,22 @@ public boolean seedTransaction(Fate.FateOperation fateOp, FateId fateId, Repo<T>
return seedTransaction(mutatorFactory, fateId.canonical(), fateOp, repo, autoCleanUp);
}

/**
 * Creates a mutator from the given factory and populates it with the data needed to seed a
 * transaction: the serialized fate operation, the repo at position 1 and the SUBMITTED status.
 * The auto clean entry is only written when {@code autoCleanUp} is true. The mutator is
 * returned without being submitted.
 */
private FateMutator<T> buildMutator(Supplier<FateMutator<T>> mutatorFactory,
    Fate.FateOperation fateOp, Repo<T> repo, boolean autoCleanUp) {
  FateMutator<T> seeded = mutatorFactory.get().putFateOp(serializeTxInfo(fateOp))
      .putRepo(1, repo).putStatus(TStatus.SUBMITTED);
  return autoCleanUp ? seeded.putAutoClean(serializeTxInfo(autoCleanUp)) : seeded;
}

private boolean seedTransaction(Supplier<FateMutator<T>> mutatorFactory, String logId,
Fate.FateOperation fateOp, Repo<T> repo, boolean autoCleanUp) {
var mutator = buildMutator(mutatorFactory, fateOp, repo, autoCleanUp);
int maxAttempts = 5;
for (int attempt = 0; attempt < maxAttempts; attempt++) {
var mutator = mutatorFactory.get();
mutator =
mutator.putFateOp(serializeTxInfo(fateOp)).putRepo(1, repo).putStatus(TStatus.SUBMITTED);
if (autoCleanUp) {
mutator = mutator.putAutoClean(serializeTxInfo(autoCleanUp));
}
var status = mutator.tryMutate();
if (status == FateMutator.Status.ACCEPTED) {
// signal to the super class that a new fate transaction was seeded and is ready to run
@@ -393,6 +387,113 @@ public FateInstanceType type() {
return fateInstanceType;
}

/**
 * A {@link Seeder} that queues seeding attempts and submits all of them to a single conditional
 * writer when {@link #close()} is called. The future returned for each attempt is completed
 * during close: with the fate id when the mutation was accepted, with an empty optional when it
 * was rejected or its status stayed unknown after all retries, or exceptionally when submitting
 * the batch failed.
 *
 * <p>
 * The pending map is a plain {@link HashMap}, so an instance is expected to be used by a single
 * thread.
 */
private class BatchSeeder implements Seeder<T> {
  private final AtomicBoolean closed = new AtomicBoolean(false);

  // Maps each fate id to the mutator that seeds it and the future handed back to the caller
  private final Map<FateId,Pair<FateMutator<T>,CompletableFuture<Optional<FateId>>>> pending =
      new HashMap<>();

  /**
   * Queues an attempt to seed a transaction whose fate id is derived from the given fate key.
   * Nothing is written until {@link #close()} is called.
   *
   * @return a future that is completed in close(); when the same fate id was already queued on
   *         this seeder the previously created future is returned and no new mutation is built
   * @throws IllegalStateException if this seeder was already closed
   */
  @Override
  public CompletableFuture<Optional<FateId>> attemptToSeedTransaction(FateOperation fateOp,
      FateKey fateKey, Repo<T> repo, boolean autoCleanUp) {
    Preconditions.checkState(!closed.get(), "Can't attempt to seed with a closed seeder.");

    final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
    // If not already submitted, add to the pending map and return the future
    // or the existing future if duplicate. The pending map will store the mutator
    // to be processed on close in one batch.
    return pending.computeIfAbsent(fateId, id -> {
      FateMutator<T> mutator = seedTransaction(fateOp, fateKey, fateId, repo, autoCleanUp);
      CompletableFuture<Optional<FateId>> future = new CompletableFuture<>();
      return new Pair<>(mutator, future);
    }).getSecond();
  }

  /**
   * Submits every queued mutation and completes the associated futures. Calling close more than
   * once has no further effect. If submitting fails, every future that is still outstanding is
   * completed exceptionally before the failure is rethrown, so no caller is left waiting on a
   * future that can never complete.
   */
  @Override
  public void close() {
    if (closed.getAndSet(true)) {
      // Already closed, everything was processed (or failed) by the first call
      return;
    }

    try {
      submitPending();
    } catch (RuntimeException e) {
      pending.values().forEach(pair -> pair.getSecond().completeExceptionally(e));
      throw e;
    } finally {
      // All futures are settled at this point, drop the mutators so nothing is re-submitted
      pending.clear();
    }
  }

  // Writes the pending mutations in batches, retrying unknown results, and completes the futures
  private void submitPending() {
    final int maxAttempts = 5;

    // This loop will submit all the pending mutations as one batch
    // to a conditional writer and any known results will be removed
    // from the pending map. Unknown results will be re-attempted up
    // to the maxAttempts count
    for (int attempt = 0; attempt < maxAttempts && !pending.isEmpty(); attempt++) {
      var currentResults = tryMutateBatch();
      for (Entry<FateId,ConditionalWriter.Status> result : currentResults.entrySet()) {
        var fateId = result.getKey();
        var status = result.getValue();
        var future = pending.get(fateId).getSecond();
        switch (status) {
          case ACCEPTED:
            seededTx();
            log.trace("Attempt to seed {} returned {}", fateId.canonical(), status);
            // Complete the future with the fateId and remove from pending
            future.complete(Optional.of(fateId));
            pending.remove(fateId);
            break;
          case REJECTED:
            log.debug("Attempt to seed {} returned {}", fateId.canonical(), status);
            // Rejected so complete with an empty optional and remove from pending
            future.complete(Optional.empty());
            pending.remove(fateId);
            break;
          case UNKNOWN:
            log.debug("Attempt to seed {} returned {} status, retrying", fateId.canonical(),
                status);
            // unknown, so don't remove from map so that we try again if still under
            // max attempts
            break;
          default:
            // do not expect other statuses
            throw new IllegalStateException("Unhandled status for mutation " + status);
        }
      }

      if (!pending.isEmpty()) {
        // At this point can not reliably determine if the unknown pending mutations were
        // successful or not because no reservation was acquired. For example since no
        // reservation was acquired it is possible that seeding was a success and something
        // immediately picked it up and started operating on it and changing it.
        // If scanning after that point can not conclude success or failure. Another situation
        // is that maybe the fateId already existed in a seeded form prior to getting this
        // unknown.
        UtilWaitThread.sleep(250);
      }
    }

    // Any remaining will be UNKNOWN status, so complete the futures with an optional empty
    pending.forEach((fateId, pair) -> {
      pair.getSecond().complete(Optional.empty());
      log.warn("Repeatedly received unknown status when attempting to seed {}",
          fateId.canonical());
    });
  }

  // Submit all the pending mutations to a single conditional writer
  // as one batch and return the results for each mutation
  private Map<FateId,ConditionalWriter.Status> tryMutateBatch() {
    if (pending.isEmpty()) {
      return Map.of();
    }

    final Map<FateId,ConditionalWriter.Status> resultsMap = new HashMap<>();
    try (ConditionalWriter writer = context.createConditionalWriter(tableName)) {
      Iterator<ConditionalWriter.Result> results = writer
          .write(pending.values().stream().map(pair -> pair.getFirst().getMutation()).iterator());
      while (results.hasNext()) {
        var result = results.next();
        var row = new Text(result.getMutation().getRow());
        // Rebuild the id with the same instance type that was used to create the pending keys
        // so that the lookup in the pending map is guaranteed to match
        resultsMap.put(FateId.from(type(), row.toString()), result.getStatus());
      }
    } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
      throw new IllegalStateException(e);
    }
    return resultsMap;
  }
}

private class FateTxStoreImpl extends AbstractFateTxStoreImpl {

private FateTxStoreImpl(FateId fateId) {
Original file line number Diff line number Diff line change
@@ -1346,7 +1346,7 @@ boolean canSuspendTablets() {
// Don't start the CompactionCoordinator until we have tservers and upgrade is complete.
compactionCoordinator.start();

this.splitter = new Splitter(context);
this.splitter = new Splitter(this);
this.splitter.start();

try {
Original file line number Diff line number Diff line change
@@ -75,7 +75,6 @@
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.core.util.threads.Threads.AccumuloDaemonThread;
import org.apache.accumulo.manager.metrics.ManagerMetrics;
import org.apache.accumulo.manager.split.SeedSplitTask;
import org.apache.accumulo.manager.state.TableCounts;
import org.apache.accumulo.manager.state.TableStats;
import org.apache.accumulo.manager.upgrade.UpgradeCoordinator;
@@ -607,7 +606,7 @@ private TableMgmtStats manageTablets(Iterator<TabletManagement> iter,
final boolean needsSplit = actions.contains(ManagementAction.NEEDS_SPLITTING);
if (needsSplit) {
LOG.debug("{} may need splitting.", tm.getExtent());
manager.getSplitter().initiateSplit(new SeedSplitTask(manager, tm.getExtent()));
manager.getSplitter().initiateSplit(tm.getExtent());
}

if (actions.contains(ManagementAction.NEEDS_COMPACTING) && compactionGenerator != null) {

This file was deleted.

Loading