Process seeding of split fate operations in batches #5404

Merged: 4 commits merged on Mar 21, 2025
Changes from 1 commit
@@ -461,6 +461,7 @@ public enum Property {
+ "indefinitely. Default is 0 to block indefinitely. Only valid when tserver available "
+ "threshold is set greater than 0.",
"1.10.0"),
@Deprecated(since = "4.0.0")
Contributor:
This property can be dropped instead of deprecated as it was introduced in main.

MANAGER_SPLIT_WORKER_THREADS("manager.split.seed.threadpool.size", "8", PropertyType.COUNT,
"The number of threads used to seed fate split task, the actual split work is done by fate"
+ " threads.",
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -55,6 +55,7 @@
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.fate.FateStore.FateTxStore;
import org.apache.accumulo.core.fate.FateStore.Seeder;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.logging.FateLogger;
import org.apache.accumulo.core.manager.thrift.TFateOperation;
@@ -538,6 +539,10 @@ public FateId startTransaction() {
return store.create();
}

public Seeder<T> beginSeeding() {
return store.beginSeeding();
}

public void seedTransaction(FateOperation fateOp, FateKey fateKey, Repo<T> repo,
boolean autoCleanUp) {
try (var seeder = store.beginSeeding()) {
11 changes: 2 additions & 9 deletions core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
@@ -57,10 +57,6 @@ interface Seeder<T> extends AutoCloseable {
* Attempts to seed a transaction with the given repo if it does not exist. A fateId will be
* derived from the fateKey. If seeded, sets the following data for the fateId in the store.
*
* TODO: Support completing futures later in close method The current version will always return
* with a CompleteableFuture that is already completed. Future version will process will
* complete in the close() method for the User store.
*
* <ul>
* <li>Set the fate op</li>
* <li>Set the status to SUBMITTED</li>
@@ -77,15 +73,12 @@ interface Seeder<T> extends AutoCloseable {
CompletableFuture<Optional<FateId>> attemptToSeedTransaction(Fate.FateOperation fateOp,
FateKey fateKey, Repo<T> repo, boolean autoCleanUp);

// TODO: Right now all implementations do nothing
// Eventually this would check the status of all added conditional mutations,
// retry unknown, and then close the conditional writer.
@Override
void close();
}

// Creates a conditional writer for the user fate store. For Zookeeper all this code will probably
// do the same thing its currently doing as zookeeper does not support multi-node operations.
// Creates a conditional writer for the user fate store. For Zookeeper this will be a no-op
// because currently zookeeper does not support multi-node operations.
Seeder<T> beginSeeding();

/**
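For orientation, here is a minimal usage sketch of the batched Seeder API above (not part of this diff). It follows the try-with-resources pattern that Fate.seedTransaction uses; the SYSTEM_SPLIT operation constant, the FateKey.forSplit helper, the createSplitRepo method, and the extentsNeedingSplit collection are illustrative assumptions rather than names taken from this PR.

// Illustrative sketch only: seed many split operations through one Seeder so
// they are flushed as a single conditional-write batch when close() runs.
List<CompletableFuture<Optional<FateId>>> futures = new ArrayList<>();
try (Seeder<Manager> seeder = fate.beginSeeding()) {
  for (KeyExtent extent : extentsNeedingSplit) {
    futures.add(seeder.attemptToSeedTransaction(Fate.FateOperation.SYSTEM_SPLIT,
        FateKey.forSplit(extent), createSplitRepo(extent), true));
  }
} // close() submits the whole batch and completes the futures
// After close(), each future holds the FateId when a transaction was newly
// seeded, or an empty Optional when it already existed or could not be seeded.
futures.forEach(f -> f.thenAccept(
    seeded -> seeded.ifPresent(fateId -> log.trace("seeded split {}", fateId))));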
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.core.fate.user;

import org.apache.accumulo.core.data.ConditionalMutation;
import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.FateKey;
import org.apache.accumulo.core.fate.FateStore;
@@ -101,4 +102,6 @@ enum Status {

Status tryMutate();

ConditionalMutation getMutation();

}
@@ -260,4 +260,9 @@ public Status tryMutate() {
throw new RuntimeException(e);
}
}

@Override
public ConditionalMutation getMutation() {
return mutation;
}
}
165 changes: 133 additions & 32 deletions core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
@@ -21,19 +21,26 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.SortedMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.clientImpl.ClientContext;
@@ -47,16 +54,19 @@
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.FateKey;
import org.apache.accumulo.core.fate.FateKey.FateKeyType;
import org.apache.accumulo.core.fate.ReadOnlyRepo;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.StackOverflowException;
import org.apache.accumulo.core.fate.user.FateMutator.Status;
import org.apache.accumulo.core.fate.user.schema.FateSchema.RepoColumnFamily;
import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily;
import org.apache.accumulo.core.fate.user.schema.FateSchema.TxInfoColumnFamily;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
@@ -136,36 +146,14 @@ public FateId getFateId() {

@Override
public Seeder<T> beginSeeding() {
// TODO: For now can handle seeding 1 transaction at a time so just process
// everything in attemptToSeedTransaction
// Part 2 of the changes in #5160 will allow multiple seeding attempts to be combined
// into one conditional mutation and we will need to track the pending operations
// and futures in a map
return new Seeder<T>() {
@Override
public CompletableFuture<Optional<FateId>> attemptToSeedTransaction(FateOperation fateOp,
FateKey fateKey, Repo<T> repo, boolean autoCleanUp) {
return CompletableFuture
.completedFuture(seedTransaction(fateOp, fateKey, repo, autoCleanUp));
}

@Override
public void close() {
// TODO: This will be used in Part 2 of #5160
}
};
return new BatchSeeder();
}

private Optional<FateId> seedTransaction(Fate.FateOperation fateOp, FateKey fateKey, Repo<T> repo,
boolean autoCleanUp) {
final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
private FateMutator<T> seedTransaction(Fate.FateOperation fateOp, FateKey fateKey, FateId fateId,
Repo<T> repo, boolean autoCleanUp) {
Supplier<FateMutator<T>> mutatorFactory = () -> newMutator(fateId).requireAbsent()
.putKey(fateKey).putCreateTime(System.currentTimeMillis());
if (seedTransaction(mutatorFactory, fateKey + " " + fateId, fateOp, repo, autoCleanUp)) {
return Optional.of(fateId);
} else {
return Optional.empty();
}
return buildMutator(mutatorFactory, fateOp, repo, autoCleanUp);
}

@Override
@@ -176,16 +164,22 @@ public boolean seedTransaction(Fate.FateOperation fateOp, FateId fateId, Repo<T>
return seedTransaction(mutatorFactory, fateId.canonical(), fateOp, repo, autoCleanUp);
}

private FateMutator<T> buildMutator(Supplier<FateMutator<T>> mutatorFactory,
Fate.FateOperation fateOp, Repo<T> repo, boolean autoCleanUp) {
var mutator = mutatorFactory.get();
mutator =
mutator.putFateOp(serializeTxInfo(fateOp)).putRepo(1, repo).putStatus(TStatus.SUBMITTED);
if (autoCleanUp) {
mutator = mutator.putAutoClean(serializeTxInfo(autoCleanUp));
}
return mutator;
}

private boolean seedTransaction(Supplier<FateMutator<T>> mutatorFactory, String logId,
Fate.FateOperation fateOp, Repo<T> repo, boolean autoCleanUp) {
var mutator = buildMutator(mutatorFactory, fateOp, repo, autoCleanUp);
int maxAttempts = 5;
for (int attempt = 0; attempt < maxAttempts; attempt++) {
var mutator = mutatorFactory.get();
mutator =
mutator.putFateOp(serializeTxInfo(fateOp)).putRepo(1, repo).putStatus(TStatus.SUBMITTED);
if (autoCleanUp) {
mutator = mutator.putAutoClean(serializeTxInfo(autoCleanUp));
}
var status = mutator.tryMutate();
if (status == FateMutator.Status.ACCEPTED) {
// signal to the super class that a new fate transaction was seeded and is ready to run
@@ -393,6 +387,113 @@ public FateInstanceType type() {
return fateInstanceType;
}

private class BatchSeeder implements Seeder<T> {
private final AtomicBoolean closed = new AtomicBoolean(false);

private final Map<FateId,Pair<FateMutator<T>,CompletableFuture<Optional<FateId>>>> pending =
new HashMap<>();

@Override
public CompletableFuture<Optional<FateId>> attemptToSeedTransaction(FateOperation fateOp,
FateKey fateKey, Repo<T> repo, boolean autoCleanUp) {
Preconditions.checkState(!closed.get(), "Can't attempt to seed with a closed seeder.");

final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
// If not already submitted, add to the pending list and return the future
// or the existing future if duplicate. The pending map will store the mutator
// to be processed on close in one batch.
return pending.computeIfAbsent(fateId, id -> {
FateMutator<T> mutator = seedTransaction(fateOp, fateKey, fateId, repo, autoCleanUp);
CompletableFuture<Optional<FateId>> future = new CompletableFuture<>();
return new Pair<>(mutator, future);
}).getSecond();
}

@Override
public void close() {
closed.set(true);

int maxAttempts = 5;

// This loop will submit all the pending mutations as one batch
// to a conditional writer and any known results will be removed
// from the pending map. Unknown results will be re-attempted up
// to the maxAttempts count
for (int attempt = 0; attempt < maxAttempts; attempt++) {
Contributor:
Seems like this will continue to loop even after pending is empty, could add a check for that.

Suggested change:
-      for (int attempt = 0; attempt < maxAttempts; attempt++) {
+      for (int attempt = 0; attempt < maxAttempts && !pending.isEmpty(); attempt++) {

Contributor (Author):
Good catch, I'll fix that.

var currentResults = tryMutateBatch();
for (Entry<FateId,ConditionalWriter.Status> result : currentResults.entrySet()) {
var fateId = result.getKey();
var status = result.getValue();
var future = pending.get(fateId).getSecond();
switch (result.getValue()) {
case ACCEPTED:
seededTx();
log.trace("Attempt to seed {} returned {}", fateId.canonical(), status);
// Complete the future with the fateId and remove from pending
future.complete(Optional.of(fateId));
pending.remove(fateId);
break;
case REJECTED:
log.debug("Attempt to seed {} returned {}", fateId.canonical(), status);
// Rejected so complete with an empty optional and remove from pending
future.complete(Optional.empty());
pending.remove(fateId);
break;
case UNKNOWN:
log.debug("Attempt to seed {} returned {} status, retrying", fateId.canonical(),
status);
// unknown, so don't remove from map so that we try again if still under
// max attempts
break;
default:
// do not expect other statuses
throw new IllegalStateException("Unhandled status for mutation " + status);
}
}

if (!pending.isEmpty()) {
// At this point can not reliably determine if the unknown pending mutations were
// successful or not because no reservation was acquired. For example since no
// reservation was acquired it is possible that seeding was a success and something
// immediately picked it up and started operating on it and changing it.
// If scanning after that point can not conclude success or failure. Another situation
// is that maybe the fateId already existed in a seeded form prior to getting this
// unknown.
UtilWaitThread.sleep(250);
}
}

// Any remaining will be UNKNOWN status, so complete the futures with an optional empty
pending.forEach((fateId, pair) -> {
pair.getSecond().complete(Optional.empty());
log.warn("Repeatedly received unknown status when attempting to seed {}",
fateId.canonical());
});
}

// Submit all the pending mutations to a single conditional writer
// as one batch and return the results for each mutation
private Map<FateId,ConditionalWriter.Status> tryMutateBatch() {
if (pending.isEmpty()) {
return Map.of();
}

final Map<FateId,ConditionalWriter.Status> resultsMap = new HashMap<>();
try (ConditionalWriter writer = context.createConditionalWriter(tableName)) {
Iterator<ConditionalWriter.Result> results = writer
.write(pending.values().stream().map(pair -> pair.getFirst().getMutation()).iterator());
while (results.hasNext()) {
var result = results.next();
var row = new Text(result.getMutation().getRow());
resultsMap.put(FateId.from(FateInstanceType.USER, row.toString()), result.getStatus());
}
} catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
throw new IllegalStateException(e);
}
return resultsMap;
}
}

private class FateTxStoreImpl extends AbstractFateTxStoreImpl {

private FateTxStoreImpl(FateId fateId) {
@@ -1347,7 +1347,7 @@ boolean canSuspendTablets() {
// Don't call start the CompactionCoordinator until we have tservers and upgrade is complete.
compactionCoordinator.start();

this.splitter = new Splitter(context);
this.splitter = new Splitter(this);
this.splitter.start();

try {
@@ -75,7 +75,6 @@
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.core.util.threads.Threads.AccumuloDaemonThread;
import org.apache.accumulo.manager.metrics.ManagerMetrics;
import org.apache.accumulo.manager.split.SeedSplitTask;
import org.apache.accumulo.manager.state.TableCounts;
import org.apache.accumulo.manager.state.TableStats;
import org.apache.accumulo.manager.upgrade.UpgradeCoordinator;
@@ -607,7 +606,7 @@ private TableMgmtStats manageTablets(Iterator<TabletManagement> iter,
final boolean needsSplit = actions.contains(ManagementAction.NEEDS_SPLITTING);
if (needsSplit) {
LOG.debug("{} may need splitting.", tm.getExtent());
manager.getSplitter().initiateSplit(new SeedSplitTask(manager, tm.getExtent()));
manager.getSplitter().initiateSplit(tm.getExtent());
}

if (actions.contains(ManagementAction.NEEDS_COMPACTING) && compactionGenerator != null) {

This file was deleted.
