Skip to content

Commit 2abf5b1

Browse files
authored
Single node META FATE data (apache#5127)
- Moved all the fate data for a single `META` transaction into a single ZK node - Pushed all the data into `NodeValue` (renamed to `FateData`) - Previously, `FateData` stored `TStatus`, `FateReservation`, and `FateKey`. Now additionally stores the `REPO` stack and `TxInfo`. - Status enforcement added to `MetaFateStore` (previously only possible for `UserFateStore`). - Moved `testFateInitialConfigCorrectness()` from `UserFateStoreIT` to `UserFateIT` - Renamed `UserFateStoreIT` to `UserFateStatusEnforcementIT` (now extends a new class `FateStatusEnforcementIT`) - Now only tests status enforcement (previously status enforcement + `testFateInitialConfigCorrectness()`) - Created `MetaFateStatusEnforcementIT` (extends `FateStatusEnforcementIT`) - Tests that the new status enforcement in `MetaFateStore` works - Created `FateStoreUtil`, moving the `createFateTable()` util here, created `MetaFateZKSetup` inner class here (the counterpart to `createFateTable()` for `UserFateStore` but sets up ZooKeeper for use in `MetaFateStore`) - Deleted `UserFateStoreIT`s (now `UserFateStatusEnforcementIT`) method `injectStatus` replacing with the existing `setStatus` which does the same thing - Changed `StackOverflowException` to instead be a `RuntimeException` (previously `Exception`) - Deleted unnecessary preexisting catch and immediate re-throw of a `StackOverflowException` in `MetaFateStore.FateTxStoreImpl.push(repo)` - Cleaned up and refactored `MetaFateStore` methods which mutate existing FateData; now reuse same pattern across these methods: all call new method `MetaFateStore.mutate()`
1 parent e73a27d commit 2abf5b1

16 files changed

+811
-573
lines changed

core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.Map;
3535
import java.util.Objects;
3636
import java.util.Optional;
37+
import java.util.Set;
3738
import java.util.UUID;
3839
import java.util.concurrent.TimeUnit;
3940
import java.util.concurrent.atomic.AtomicBoolean;
@@ -75,12 +76,22 @@ public FateId newRandomId(FateInstanceType instanceType) {
7576
return FateId.from(instanceType, UUID.randomUUID());
7677
}
7778
};
79+
protected static final int MAX_REPOS = 100;
7880

7981
// The ZooKeeper lock for the process that's running this store instance
8082
protected final ZooUtil.LockID lockID;
8183
protected final Predicate<ZooUtil.LockID> isLockHeld;
8284
protected final Map<FateId,CountDownTimer> deferred;
8385
protected final FateIdGenerator fateIdGenerator;
86+
// the statuses required to perform operations
87+
public static final Set<TStatus> REQ_PUSH_STATUS = Set.of(TStatus.IN_PROGRESS, TStatus.NEW);
88+
public static final Set<TStatus> REQ_POP_STATUS =
89+
Set.of(TStatus.FAILED_IN_PROGRESS, TStatus.SUCCESSFUL);
90+
public static final Set<TStatus> REQ_DELETE_STATUS =
91+
Set.of(TStatus.NEW, TStatus.SUBMITTED, TStatus.SUCCESSFUL, TStatus.FAILED);
92+
// all but UNKNOWN
93+
public static final Set<TStatus> REQ_FORCE_DELETE_STATUS = Set.of(TStatus.NEW, TStatus.SUBMITTED,
94+
TStatus.SUCCESSFUL, TStatus.FAILED, TStatus.FAILED_IN_PROGRESS, TStatus.IN_PROGRESS);
8495
private final int maxDeferred;
8596
private final AtomicBoolean deferredOverflow = new AtomicBoolean();
8697

@@ -415,7 +426,7 @@ protected void seededTx() {
415426
unreservedRunnableCount.increment();
416427
}
417428

418-
protected byte[] serializeTxInfo(Serializable so) {
429+
protected static byte[] serializeTxInfo(Serializable so) {
419430
if (so instanceof String) {
420431
return ("S " + so).getBytes(UTF_8);
421432
} else {
@@ -428,7 +439,7 @@ protected byte[] serializeTxInfo(Serializable so) {
428439
}
429440
}
430441

431-
protected Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) {
442+
protected static Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) {
432443
if (data[0] == 'O') {
433444
byte[] sera = new byte[data.length - 2];
434445
System.arraycopy(data, 2, sera, 0, sera.length);

core/src/main/java/org/apache/accumulo/core/fate/StackOverflowException.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
*/
1919
package org.apache.accumulo.core.fate;
2020

21-
public class StackOverflowException extends Exception {
21+
public class StackOverflowException extends RuntimeException {
2222

2323
public StackOverflowException(String msg) {
2424
super(msg);

core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java

+11-14
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,8 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
7171
private final String tableName;
7272

7373
private static final FateInstanceType fateInstanceType = FateInstanceType.USER;
74-
private static final int maxRepos = 100;
7574
private static final com.google.common.collect.Range<Integer> REPO_RANGE =
76-
com.google.common.collect.Range.closed(1, maxRepos);
75+
com.google.common.collect.Range.closed(1, MAX_REPOS);
7776

7877
public UserFateStore(ClientContext context, String tableName, ZooUtil.LockID lockID,
7978
Predicate<ZooUtil.LockID> isLockHeld) {
@@ -457,12 +456,12 @@ public void push(Repo<T> repo) throws StackOverflowException {
457456

458457
Optional<Integer> top = findTop();
459458

460-
if (top.filter(t -> t >= maxRepos).isPresent()) {
459+
if (top.filter(t -> t >= MAX_REPOS).isPresent()) {
461460
throw new StackOverflowException("Repo stack size too large");
462461
}
463462

464463
FateMutator<T> fateMutator =
465-
newMutator(fateId).requireStatus(TStatus.IN_PROGRESS, TStatus.NEW);
464+
newMutator(fateId).requireStatus(REQ_PUSH_STATUS.toArray(TStatus[]::new));
466465
fateMutator.putRepo(top.map(t -> t + 1).orElse(1), repo).mutate();
467466
}
468467

@@ -471,8 +470,8 @@ public void pop() {
471470
verifyReservedAndNotDeleted(true);
472471

473472
Optional<Integer> top = findTop();
474-
top.ifPresent(t -> newMutator(fateId)
475-
.requireStatus(TStatus.FAILED_IN_PROGRESS, TStatus.SUCCESSFUL).deleteRepo(t).mutate());
473+
top.ifPresent(t -> newMutator(fateId).requireStatus(REQ_POP_STATUS.toArray(TStatus[]::new))
474+
.deleteRepo(t).mutate());
476475
}
477476

478477
@Override
@@ -497,7 +496,7 @@ public void delete() {
497496
verifyReservedAndNotDeleted(true);
498497

499498
var mutator = newMutator(fateId);
500-
mutator.requireStatus(TStatus.NEW, TStatus.SUBMITTED, TStatus.SUCCESSFUL, TStatus.FAILED);
499+
mutator.requireStatus(REQ_DELETE_STATUS.toArray(TStatus[]::new));
501500
mutator.delete().mutate();
502501
this.deleted = true;
503502
}
@@ -507,9 +506,7 @@ public void forceDelete() {
507506
verifyReservedAndNotDeleted(true);
508507

509508
var mutator = newMutator(fateId);
510-
// allow deletion of all txns other than UNKNOWN
511-
mutator.requireStatus(TStatus.NEW, TStatus.SUBMITTED, TStatus.SUCCESSFUL, TStatus.FAILED,
512-
TStatus.FAILED_IN_PROGRESS, TStatus.IN_PROGRESS);
509+
mutator.requireStatus(REQ_FORCE_DELETE_STATUS.toArray(TStatus[]::new));
513510
mutator.delete().mutate();
514511
this.deleted = true;
515512
}
@@ -537,14 +534,14 @@ protected void unreserve() {
537534

538535
static Text invertRepo(int position) {
539536
Preconditions.checkArgument(REPO_RANGE.contains(position),
540-
"Position %s is not in the valid range of [0,%s]", position, maxRepos);
541-
return new Text(String.format("%02d", maxRepos - position));
537+
"Position %s is not in the valid range of [0,%s]", position, MAX_REPOS);
538+
return new Text(String.format("%02d", MAX_REPOS - position));
542539
}
543540

544541
static Integer restoreRepo(Text invertedPosition) {
545-
int position = maxRepos - Integer.parseInt(invertedPosition.toString());
542+
int position = MAX_REPOS - Integer.parseInt(invertedPosition.toString());
546543
Preconditions.checkArgument(REPO_RANGE.contains(position),
547-
"Position %s is not in the valid range of [0,%s]", position, maxRepos);
544+
"Position %s is not in the valid range of [0,%s]", position, MAX_REPOS);
548545
return position;
549546
}
550547
}

0 commit comments

Comments
 (0)