-
Notifications
You must be signed in to change notification settings - Fork 456
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Process seeding of split fate operations in batches #5404
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -21,19 +21,26 @@ | |||||
import java.io.IOException; | ||||||
import java.io.Serializable; | ||||||
import java.util.EnumSet; | ||||||
import java.util.HashMap; | ||||||
import java.util.Iterator; | ||||||
import java.util.List; | ||||||
import java.util.Map; | ||||||
import java.util.Map.Entry; | ||||||
import java.util.Objects; | ||||||
import java.util.Optional; | ||||||
import java.util.SortedMap; | ||||||
import java.util.UUID; | ||||||
import java.util.concurrent.CompletableFuture; | ||||||
import java.util.concurrent.atomic.AtomicBoolean; | ||||||
import java.util.function.Function; | ||||||
import java.util.function.Predicate; | ||||||
import java.util.function.Supplier; | ||||||
import java.util.stream.Collectors; | ||||||
import java.util.stream.Stream; | ||||||
|
||||||
import org.apache.accumulo.core.client.AccumuloException; | ||||||
import org.apache.accumulo.core.client.AccumuloSecurityException; | ||||||
import org.apache.accumulo.core.client.ConditionalWriter; | ||||||
import org.apache.accumulo.core.client.Scanner; | ||||||
import org.apache.accumulo.core.client.TableNotFoundException; | ||||||
import org.apache.accumulo.core.clientImpl.ClientContext; | ||||||
|
@@ -47,16 +54,19 @@ | |||||
import org.apache.accumulo.core.fate.FateId; | ||||||
import org.apache.accumulo.core.fate.FateInstanceType; | ||||||
import org.apache.accumulo.core.fate.FateKey; | ||||||
import org.apache.accumulo.core.fate.FateKey.FateKeyType; | ||||||
import org.apache.accumulo.core.fate.ReadOnlyRepo; | ||||||
import org.apache.accumulo.core.fate.Repo; | ||||||
import org.apache.accumulo.core.fate.StackOverflowException; | ||||||
import org.apache.accumulo.core.fate.user.FateMutator.Status; | ||||||
import org.apache.accumulo.core.fate.user.schema.FateSchema.RepoColumnFamily; | ||||||
import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily; | ||||||
import org.apache.accumulo.core.fate.user.schema.FateSchema.TxInfoColumnFamily; | ||||||
import org.apache.accumulo.core.fate.zookeeper.ZooUtil; | ||||||
import org.apache.accumulo.core.iterators.user.WholeRowIterator; | ||||||
import org.apache.accumulo.core.security.Authorizations; | ||||||
import org.apache.accumulo.core.util.ColumnFQ; | ||||||
import org.apache.accumulo.core.util.Pair; | ||||||
import org.apache.accumulo.core.util.UtilWaitThread; | ||||||
import org.apache.hadoop.io.Text; | ||||||
import org.slf4j.Logger; | ||||||
|
@@ -136,36 +146,14 @@ public FateId getFateId() { | |||||
|
||||||
@Override | ||||||
public Seeder<T> beginSeeding() { | ||||||
// TODO: For now can handle seeding 1 transaction at a time so just process | ||||||
// everything in attemptToSeedTransaction | ||||||
// Part 2 of the changes in #5160 will allow multiple seeding attempts to be combined | ||||||
// into one conditional mutation and we will need to track the pending operations | ||||||
// and futures in a map | ||||||
return new Seeder<T>() { | ||||||
@Override | ||||||
public CompletableFuture<Optional<FateId>> attemptToSeedTransaction(FateOperation fateOp, | ||||||
FateKey fateKey, Repo<T> repo, boolean autoCleanUp) { | ||||||
return CompletableFuture | ||||||
.completedFuture(seedTransaction(fateOp, fateKey, repo, autoCleanUp)); | ||||||
} | ||||||
|
||||||
@Override | ||||||
public void close() { | ||||||
// TODO: This will be used in Part 2 of #5160 | ||||||
} | ||||||
}; | ||||||
return new BatchSeeder(); | ||||||
} | ||||||
|
||||||
private Optional<FateId> seedTransaction(Fate.FateOperation fateOp, FateKey fateKey, Repo<T> repo, | ||||||
boolean autoCleanUp) { | ||||||
final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey); | ||||||
private FateMutator<T> seedTransaction(Fate.FateOperation fateOp, FateKey fateKey, FateId fateId, | ||||||
Repo<T> repo, boolean autoCleanUp) { | ||||||
Supplier<FateMutator<T>> mutatorFactory = () -> newMutator(fateId).requireAbsent() | ||||||
.putKey(fateKey).putCreateTime(System.currentTimeMillis()); | ||||||
if (seedTransaction(mutatorFactory, fateKey + " " + fateId, fateOp, repo, autoCleanUp)) { | ||||||
return Optional.of(fateId); | ||||||
} else { | ||||||
return Optional.empty(); | ||||||
} | ||||||
return buildMutator(mutatorFactory, fateOp, repo, autoCleanUp); | ||||||
} | ||||||
|
||||||
@Override | ||||||
|
@@ -176,16 +164,22 @@ public boolean seedTransaction(Fate.FateOperation fateOp, FateId fateId, Repo<T> | |||||
return seedTransaction(mutatorFactory, fateId.canonical(), fateOp, repo, autoCleanUp); | ||||||
} | ||||||
|
||||||
private FateMutator<T> buildMutator(Supplier<FateMutator<T>> mutatorFactory, | ||||||
Fate.FateOperation fateOp, Repo<T> repo, boolean autoCleanUp) { | ||||||
var mutator = mutatorFactory.get(); | ||||||
mutator = | ||||||
mutator.putFateOp(serializeTxInfo(fateOp)).putRepo(1, repo).putStatus(TStatus.SUBMITTED); | ||||||
if (autoCleanUp) { | ||||||
mutator = mutator.putAutoClean(serializeTxInfo(autoCleanUp)); | ||||||
} | ||||||
return mutator; | ||||||
} | ||||||
|
||||||
private boolean seedTransaction(Supplier<FateMutator<T>> mutatorFactory, String logId, | ||||||
Fate.FateOperation fateOp, Repo<T> repo, boolean autoCleanUp) { | ||||||
var mutator = buildMutator(mutatorFactory, fateOp, repo, autoCleanUp); | ||||||
int maxAttempts = 5; | ||||||
for (int attempt = 0; attempt < maxAttempts; attempt++) { | ||||||
var mutator = mutatorFactory.get(); | ||||||
mutator = | ||||||
mutator.putFateOp(serializeTxInfo(fateOp)).putRepo(1, repo).putStatus(TStatus.SUBMITTED); | ||||||
if (autoCleanUp) { | ||||||
mutator = mutator.putAutoClean(serializeTxInfo(autoCleanUp)); | ||||||
} | ||||||
var status = mutator.tryMutate(); | ||||||
if (status == FateMutator.Status.ACCEPTED) { | ||||||
// signal to the super class that a new fate transaction was seeded and is ready to run | ||||||
|
@@ -393,6 +387,113 @@ public FateInstanceType type() { | |||||
return fateInstanceType; | ||||||
} | ||||||
|
||||||
private class BatchSeeder implements Seeder<T> { | ||||||
private final AtomicBoolean closed = new AtomicBoolean(false); | ||||||
|
||||||
private final Map<FateId,Pair<FateMutator<T>,CompletableFuture<Optional<FateId>>>> pending = | ||||||
new HashMap<>(); | ||||||
|
||||||
@Override | ||||||
public CompletableFuture<Optional<FateId>> attemptToSeedTransaction(FateOperation fateOp, | ||||||
FateKey fateKey, Repo<T> repo, boolean autoCleanUp) { | ||||||
Preconditions.checkState(!closed.get(), "Can't attempt to seed with a closed seeder."); | ||||||
|
||||||
final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey); | ||||||
// If not already submitted, add to the pending list and return the future | ||||||
// or the existing future if duplicate. The pending map will store the mutator | ||||||
// to be processed on close in a one batch. | ||||||
return pending.computeIfAbsent(fateId, id -> { | ||||||
FateMutator<T> mutator = seedTransaction(fateOp, fateKey, fateId, repo, autoCleanUp); | ||||||
CompletableFuture<Optional<FateId>> future = new CompletableFuture<>(); | ||||||
return new Pair<>(mutator, future); | ||||||
}).getSecond(); | ||||||
} | ||||||
|
||||||
@Override | ||||||
public void close() { | ||||||
closed.set(true); | ||||||
|
||||||
int maxAttempts = 5; | ||||||
|
||||||
// This loop will submit all the pending mutations as one batch | ||||||
// to a conditional writer and any known results will be removed | ||||||
// from the pending map. Unknown results will be re-attempted up | ||||||
// to the maxAttempts count | ||||||
for (int attempt = 0; attempt < maxAttempts; attempt++) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems like this will continue to loop even after pending is empty, could add a check for that.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch, i'll fix that |
||||||
var currentResults = tryMutateBatch(); | ||||||
for (Entry<FateId,ConditionalWriter.Status> result : currentResults.entrySet()) { | ||||||
var fateId = result.getKey(); | ||||||
var status = result.getValue(); | ||||||
var future = pending.get(fateId).getSecond(); | ||||||
switch (result.getValue()) { | ||||||
case ACCEPTED: | ||||||
seededTx(); | ||||||
log.trace("Attempt to seed {} returned {}", fateId.canonical(), status); | ||||||
// Complete the future with the fatId and remove from pending | ||||||
future.complete(Optional.of(fateId)); | ||||||
pending.remove(fateId); | ||||||
break; | ||||||
case REJECTED: | ||||||
log.debug("Attempt to seed {} returned {}", fateId.canonical(), status); | ||||||
// Rejected so complete with an empty optional and remove from pending | ||||||
future.complete(Optional.empty()); | ||||||
pending.remove(fateId); | ||||||
break; | ||||||
case UNKNOWN: | ||||||
log.debug("Attempt to seed {} returned {} status, retrying", fateId.canonical(), | ||||||
status); | ||||||
// unknown, so don't remove from map so that we try again if still under | ||||||
// max attempts | ||||||
break; | ||||||
default: | ||||||
// do not expect other statuses | ||||||
throw new IllegalStateException("Unhandled status for mutation " + status); | ||||||
} | ||||||
} | ||||||
|
||||||
if (!pending.isEmpty()) { | ||||||
// At this point can not reliably determine if the unknown pending mutations were | ||||||
// successful or not because no reservation was acquired. For example since no | ||||||
// reservation was acquired it is possible that seeding was a success and something | ||||||
// immediately picked it up and started operating on it and changing it. | ||||||
// If scanning after that point can not conclude success or failure. Another situation | ||||||
// is that maybe the fateId already existed in a seeded form prior to getting this | ||||||
// unknown. | ||||||
UtilWaitThread.sleep(250); | ||||||
} | ||||||
} | ||||||
|
||||||
// Any remaining will be UNKNOWN status, so complete the futures with an optional empty | ||||||
pending.forEach((fateId, pair) -> { | ||||||
pair.getSecond().complete(Optional.empty()); | ||||||
log.warn("Repeatedly received unknown status when attempting to seed {}", | ||||||
fateId.canonical()); | ||||||
}); | ||||||
} | ||||||
|
||||||
// Submit all the pending mutations to a single conditional writer | ||||||
// as one batch and return the results for each mutation | ||||||
private Map<FateId,ConditionalWriter.Status> tryMutateBatch() { | ||||||
if (pending.isEmpty()) { | ||||||
return Map.of(); | ||||||
} | ||||||
|
||||||
final Map<FateId,ConditionalWriter.Status> resultsMap = new HashMap<>(); | ||||||
try (ConditionalWriter writer = context.createConditionalWriter(tableName)) { | ||||||
Iterator<ConditionalWriter.Result> results = writer | ||||||
.write(pending.values().stream().map(pair -> pair.getFirst().getMutation()).iterator()); | ||||||
while (results.hasNext()) { | ||||||
var result = results.next(); | ||||||
var row = new Text(result.getMutation().getRow()); | ||||||
resultsMap.put(FateId.from(FateInstanceType.USER, row.toString()), result.getStatus()); | ||||||
} | ||||||
} catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { | ||||||
throw new IllegalStateException(e); | ||||||
} | ||||||
return resultsMap; | ||||||
} | ||||||
} | ||||||
|
||||||
private class FateTxStoreImpl extends AbstractFateTxStoreImpl { | ||||||
|
||||||
private FateTxStoreImpl(FateId fateId) { | ||||||
|
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This property can be dropped instead of deprecated as it was introduced in main.