Skip to content

Commit 0ff1b21

Browse files
authored
Adds FateOperation type (apache#5218)
Adds `FateOperation` enum. This consolidates all fate operations under one type (those used in thrift and those passed directly to a Fate object (outside of thrift)). This avoids the use of String here. - Adds a `FateOperation` enum which includes all current fate operations - Renamed existing thrift type `FateOperation` to `TFateOperation` - `FateOperation` includes all `TFateOperation`s and fate operations performed outside of thrift (one example is `COMMIT_COMPACTION`) - `FateOperation` is now the type passed around instead of a String - Removed OBSOLETE_TABLE_BULK_IMPORT from `TFateOperation` since it is no longer used (was bulk import v1). Keep enum integers stable to avoid confusion across different versions of Accumulo - New `FateOperationTest` to test `TFateOperation` and `FateOperation`
1 parent cc37af9 commit 0ff1b21

File tree

29 files changed

+297
-177
lines changed

29 files changed

+297
-177
lines changed

core/src/main/java/org/apache/accumulo/core/clientImpl/NamespaceOperationsImpl.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
import org.apache.accumulo.core.data.constraints.Constraint;
5757
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
5858
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
59-
import org.apache.accumulo.core.manager.thrift.FateOperation;
59+
import org.apache.accumulo.core.manager.thrift.TFateOperation;
6060
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
6161
import org.apache.accumulo.core.trace.TraceUtil;
6262
import org.apache.accumulo.core.util.LocalityGroupUtil;
@@ -126,7 +126,7 @@ public void create(String namespace)
126126
NEW_NAMESPACE_NAME.validate(namespace);
127127

128128
try {
129-
doNamespaceFateOperation(FateOperation.NAMESPACE_CREATE,
129+
doNamespaceFateOperation(TFateOperation.NAMESPACE_CREATE,
130130
Arrays.asList(ByteBuffer.wrap(namespace.getBytes(UTF_8))), Collections.emptyMap(),
131131
namespace);
132132
} catch (NamespaceNotFoundException e) {
@@ -156,7 +156,7 @@ public void delete(String namespace) throws AccumuloException, AccumuloSecurityE
156156
Map<String,String> opts = new HashMap<>();
157157

158158
try {
159-
doNamespaceFateOperation(FateOperation.NAMESPACE_DELETE, args, opts, namespace);
159+
doNamespaceFateOperation(TFateOperation.NAMESPACE_DELETE, args, opts, namespace);
160160
} catch (NamespaceExistsException e) {
161161
// should not happen
162162
throw new AssertionError(e);
@@ -174,7 +174,7 @@ public void rename(String oldNamespaceName, String newNamespaceName)
174174
List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(oldNamespaceName.getBytes(UTF_8)),
175175
ByteBuffer.wrap(newNamespaceName.getBytes(UTF_8)));
176176
Map<String,String> opts = new HashMap<>();
177-
doNamespaceFateOperation(FateOperation.NAMESPACE_RENAME, args, opts, oldNamespaceName);
177+
doNamespaceFateOperation(TFateOperation.NAMESPACE_RENAME, args, opts, oldNamespaceName);
178178
}
179179

180180
@Override
@@ -385,7 +385,7 @@ public int addConstraint(String namespace, String constraintClassName)
385385
return super.addConstraint(namespace, constraintClassName);
386386
}
387387

388-
private String doNamespaceFateOperation(FateOperation op, List<ByteBuffer> args,
388+
private String doNamespaceFateOperation(TFateOperation op, List<ByteBuffer> args,
389389
Map<String,String> opts, String namespace) throws AccumuloSecurityException,
390390
AccumuloException, NamespaceExistsException, NamespaceNotFoundException {
391391
// caller should validate the namespace name

core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java

+22-22
Original file line numberDiff line numberDiff line change
@@ -140,11 +140,11 @@
140140
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
141141
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
142142
import org.apache.accumulo.core.manager.state.tables.TableState;
143-
import org.apache.accumulo.core.manager.thrift.FateOperation;
144143
import org.apache.accumulo.core.manager.thrift.FateService;
145144
import org.apache.accumulo.core.manager.thrift.ManagerClientService;
146145
import org.apache.accumulo.core.manager.thrift.TFateId;
147146
import org.apache.accumulo.core.manager.thrift.TFateInstanceType;
147+
import org.apache.accumulo.core.manager.thrift.TFateOperation;
148148
import org.apache.accumulo.core.metadata.AccumuloTable;
149149
import org.apache.accumulo.core.metadata.TServerInstance;
150150
import org.apache.accumulo.core.metadata.TabletState;
@@ -274,7 +274,7 @@ public void create(String tableName, NewTableConfiguration ntc)
274274
Map<String,String> opts = ntc.getProperties();
275275

276276
try {
277-
doTableFateOperation(tableName, AccumuloException.class, FateOperation.TABLE_CREATE, args,
277+
doTableFateOperation(tableName, AccumuloException.class, TFateOperation.TABLE_CREATE, args,
278278
opts);
279279
} catch (TableNotFoundException e) {
280280
// should not happen
@@ -304,7 +304,7 @@ private TFateId beginFateOperation(TFateInstanceType type)
304304

305305
// This method is for retrying in the case of network failures;
306306
// anything else it passes to the caller to deal with
307-
private void executeFateOperation(TFateId opid, FateOperation op, List<ByteBuffer> args,
307+
private void executeFateOperation(TFateId opid, TFateOperation op, List<ByteBuffer> args,
308308
Map<String,String> opts, boolean autoCleanUp)
309309
throws ThriftSecurityException, TException, ThriftTableOperationException {
310310
while (true) {
@@ -372,7 +372,7 @@ public String doBulkFateOperation(List<ByteBuffer> args, String tableName)
372372
EXISTING_TABLE_NAME.validate(tableName);
373373

374374
try {
375-
return doFateOperation(FateOperation.TABLE_BULK_IMPORT2, args, Collections.emptyMap(),
375+
return doFateOperation(TFateOperation.TABLE_BULK_IMPORT2, args, Collections.emptyMap(),
376376
tableName);
377377
} catch (TableExistsException | NamespaceExistsException e) {
378378
// should not happen
@@ -427,14 +427,14 @@ private <T> T handleFateOperation(FateOperationExecutor<T> executor, String tabl
427427
}
428428
}
429429

430-
String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String,String> opts,
430+
String doFateOperation(TFateOperation op, List<ByteBuffer> args, Map<String,String> opts,
431431
String tableOrNamespaceName)
432432
throws AccumuloSecurityException, TableExistsException, TableNotFoundException,
433433
AccumuloException, NamespaceExistsException, NamespaceNotFoundException {
434434
return doFateOperation(op, args, opts, tableOrNamespaceName, true);
435435
}
436436

437-
String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String,String> opts,
437+
String doFateOperation(TFateOperation op, List<ByteBuffer> args, Map<String,String> opts,
438438
String tableOrNamespaceName, boolean wait)
439439
throws AccumuloSecurityException, TableExistsException, TableNotFoundException,
440440
AccumuloException, NamespaceExistsException, NamespaceNotFoundException {
@@ -521,7 +521,7 @@ public void addSplits(String tableName, SortedSet<Text> splits)
521521
return handleFateOperation(() -> {
522522
TFateInstanceType t = FateInstanceType.fromNamespaceOrTableName(tableName).toThrift();
523523
TFateId opid = beginFateOperation(t);
524-
executeFateOperation(opid, FateOperation.TABLE_SPLIT, args, Map.of(), false);
524+
executeFateOperation(opid, TFateOperation.TABLE_SPLIT, args, Map.of(), false);
525525
return new Pair<>(opid, splitsForTablet.getValue());
526526
}, tableName);
527527
} catch (TableExistsException | NamespaceExistsException | NamespaceNotFoundException
@@ -645,8 +645,8 @@ public void merge(String tableName, Text start, Text end)
645645
end == null ? EMPTY : TextUtil.getByteBuffer(end));
646646
Map<String,String> opts = new HashMap<>();
647647
try {
648-
doTableFateOperation(tableName, TableNotFoundException.class, FateOperation.TABLE_MERGE, args,
649-
opts);
648+
doTableFateOperation(tableName, TableNotFoundException.class, TFateOperation.TABLE_MERGE,
649+
args, opts);
650650
} catch (TableExistsException e) {
651651
// should not happen
652652
throw new AssertionError(e);
@@ -665,7 +665,7 @@ public void deleteRows(String tableName, Text start, Text end)
665665
Map<String,String> opts = new HashMap<>();
666666
try {
667667
doTableFateOperation(tableName, TableNotFoundException.class,
668-
FateOperation.TABLE_DELETE_RANGE, args, opts);
668+
TFateOperation.TABLE_DELETE_RANGE, args, opts);
669669
} catch (TableExistsException e) {
670670
// should not happen
671671
throw new AssertionError(e);
@@ -760,7 +760,7 @@ public void delete(String tableName)
760760
List<ByteBuffer> args = List.of(ByteBuffer.wrap(tableName.getBytes(UTF_8)));
761761
Map<String,String> opts = new HashMap<>();
762762
try {
763-
doTableFateOperation(tableName, TableNotFoundException.class, FateOperation.TABLE_DELETE,
763+
doTableFateOperation(tableName, TableNotFoundException.class, TFateOperation.TABLE_DELETE,
764764
args, opts);
765765
} catch (TableExistsException e) {
766766
// should not happen
@@ -800,7 +800,7 @@ public void clone(String srcTableName, String newTableName, CloneConfiguration c
800800

801801
prependPropertiesToExclude(opts, config.getPropertiesToExclude());
802802

803-
doTableFateOperation(newTableName, AccumuloException.class, FateOperation.TABLE_CLONE, args,
803+
doTableFateOperation(newTableName, AccumuloException.class, TFateOperation.TABLE_CLONE, args,
804804
opts);
805805
}
806806

@@ -813,7 +813,7 @@ public void rename(String oldTableName, String newTableName) throws AccumuloSecu
813813
List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(oldTableName.getBytes(UTF_8)),
814814
ByteBuffer.wrap(newTableName.getBytes(UTF_8)));
815815
Map<String,String> opts = new HashMap<>();
816-
doTableFateOperation(oldTableName, TableNotFoundException.class, FateOperation.TABLE_RENAME,
816+
doTableFateOperation(oldTableName, TableNotFoundException.class, TFateOperation.TABLE_RENAME,
817817
args, opts);
818818
}
819819

@@ -892,7 +892,7 @@ public void compact(String tableName, CompactionConfig config)
892892
Map<String,String> opts = new HashMap<>();
893893

894894
try {
895-
doFateOperation(FateOperation.TABLE_COMPACT, args, opts, tableName, config.getWait());
895+
doFateOperation(TFateOperation.TABLE_COMPACT, args, opts, tableName, config.getWait());
896896
} catch (TableExistsException | NamespaceExistsException e) {
897897
// should not happen
898898
throw new AssertionError(e);
@@ -912,7 +912,7 @@ public void cancelCompaction(String tableName)
912912

913913
try {
914914
doTableFateOperation(tableName, TableNotFoundException.class,
915-
FateOperation.TABLE_CANCEL_COMPACT, args, opts);
915+
TFateOperation.TABLE_CANCEL_COMPACT, args, opts);
916916
} catch (TableExistsException e) {
917917
// should not happen
918918
throw new AssertionError(e);
@@ -1455,17 +1455,17 @@ private void changeTableState(String tableName, boolean wait, TableState newStat
14551455

14561456
TableId tableId = context.getTableId(tableName);
14571457

1458-
FateOperation op = null;
1458+
TFateOperation op = null;
14591459
switch (newState) {
14601460
case OFFLINE:
1461-
op = FateOperation.TABLE_OFFLINE;
1461+
op = TFateOperation.TABLE_OFFLINE;
14621462
if (tableName.equals(AccumuloTable.METADATA.tableName())
14631463
|| tableName.equals(AccumuloTable.ROOT.tableName())) {
14641464
throw new AccumuloException("Cannot set table to offline state");
14651465
}
14661466
break;
14671467
case ONLINE:
1468-
op = FateOperation.TABLE_ONLINE;
1468+
op = TFateOperation.TABLE_ONLINE;
14691469
if (tableName.equals(AccumuloTable.METADATA.tableName())
14701470
|| tableName.equals(AccumuloTable.ROOT.tableName())) {
14711471
// Don't submit a Fate operation for this, these tables can only be online.
@@ -1694,7 +1694,7 @@ public void importTable(String tableName, Set<String> importDirs, ImportConfigur
16941694
checkedImportDirs.stream().map(s -> s.getBytes(UTF_8)).map(ByteBuffer::wrap).forEach(args::add);
16951695

16961696
try {
1697-
doTableFateOperation(tableName, AccumuloException.class, FateOperation.TABLE_IMPORT, args,
1697+
doTableFateOperation(tableName, AccumuloException.class, TFateOperation.TABLE_IMPORT, args,
16981698
Collections.emptyMap());
16991699
} catch (TableNotFoundException e) {
17001700
// should not happen
@@ -1727,7 +1727,7 @@ public void exportTable(String tableName, String exportDir)
17271727
Map<String,String> opts = Collections.emptyMap();
17281728

17291729
try {
1730-
doTableFateOperation(tableName, TableNotFoundException.class, FateOperation.TABLE_EXPORT,
1730+
doTableFateOperation(tableName, TableNotFoundException.class, TFateOperation.TABLE_EXPORT,
17311731
args, opts);
17321732
} catch (TableExistsException e) {
17331733
// should not happen
@@ -1782,7 +1782,7 @@ public int addConstraint(String tableName, String constraintClassName)
17821782
}
17831783

17841784
private void doTableFateOperation(String tableOrNamespaceName,
1785-
Class<? extends Exception> namespaceNotFoundExceptionClass, FateOperation op,
1785+
Class<? extends Exception> namespaceNotFoundExceptionClass, TFateOperation op,
17861786
List<ByteBuffer> args, Map<String,String> opts) throws AccumuloSecurityException,
17871787
AccumuloException, TableExistsException, TableNotFoundException {
17881788
try {
@@ -2212,7 +2212,7 @@ public void setTabletAvailability(String tableName, Range range, TabletAvailabil
22122212

22132213
try {
22142214
doTableFateOperation(tableName, AccumuloException.class,
2215-
FateOperation.TABLE_TABLET_AVAILABILITY, args, opts);
2215+
TFateOperation.TABLE_TABLET_AVAILABILITY, args, opts);
22162216
} catch (TableNotFoundException | TableExistsException e) {
22172217
// should not happen
22182218
throw new AssertionError(e);

core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,15 @@ public static class TransactionStatus {
7676
private final FateId fateId;
7777
private final FateInstanceType instanceType;
7878
private final TStatus status;
79-
private final String txName;
79+
private final Fate.FateOperation txName;
8080
private final List<String> hlocks;
8181
private final List<String> wlocks;
8282
private final String top;
8383
private final long timeCreated;
8484

8585
private TransactionStatus(FateId fateId, FateInstanceType instanceType, TStatus status,
86-
String txName, List<String> hlocks, List<String> wlocks, String top, Long timeCreated) {
86+
Fate.FateOperation txName, List<String> hlocks, List<String> wlocks, String top,
87+
Long timeCreated) {
8788

8889
this.fateId = fateId;
8990
this.instanceType = instanceType;
@@ -115,7 +116,7 @@ public TStatus getStatus() {
115116
/**
116117
* @return The name of the transaction running.
117118
*/
118-
public String getTxName() {
119+
public Fate.FateOperation getTxName() {
119120
return txName;
120121
}
121122

@@ -361,7 +362,9 @@ public static <T> FateStatus getTransactionStatus(
361362
fateIds.forEach(fateId -> {
362363

363364
ReadOnlyFateTxStore<T> txStore = store.read(fateId);
364-
String txName = (String) txStore.getTransactionInfo(Fate.TxInfo.TX_NAME);
365+
// tx name will not be set if the tx is not seeded with work (it is NEW)
366+
Fate.FateOperation txName = txStore.getTransactionInfo(Fate.TxInfo.TX_NAME) == null ? null
367+
: ((Fate.FateOperation) txStore.getTransactionInfo(Fate.TxInfo.TX_NAME));
365368

366369
List<String> hlocks = heldLocks.remove(fateId);
367370

core/src/main/java/org/apache/accumulo/core/fate/Fate.java

+52-3
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.apache.accumulo.core.fate.FateStore.FateTxStore;
5555
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
5656
import org.apache.accumulo.core.logging.FateLogger;
57+
import org.apache.accumulo.core.manager.thrift.TFateOperation;
5758
import org.apache.accumulo.core.util.ShutdownUtil;
5859
import org.apache.accumulo.core.util.Timer;
5960
import org.apache.accumulo.core.util.UtilWaitThread;
@@ -88,6 +89,53 @@ public enum TxInfo {
8889
TX_NAME, AUTO_CLEAN, EXCEPTION, TX_AGEOFF, RETURN_VALUE
8990
}
9091

92+
public enum FateOperation {
93+
COMMIT_COMPACTION(null),
94+
NAMESPACE_CREATE(TFateOperation.NAMESPACE_CREATE),
95+
NAMESPACE_DELETE(TFateOperation.NAMESPACE_DELETE),
96+
NAMESPACE_RENAME(TFateOperation.NAMESPACE_RENAME),
97+
SHUTDOWN_TSERVER(null),
98+
SYSTEM_SPLIT(null),
99+
TABLE_BULK_IMPORT2(TFateOperation.TABLE_BULK_IMPORT2),
100+
TABLE_CANCEL_COMPACT(TFateOperation.TABLE_CANCEL_COMPACT),
101+
TABLE_CLONE(TFateOperation.TABLE_CLONE),
102+
TABLE_COMPACT(TFateOperation.TABLE_COMPACT),
103+
TABLE_CREATE(TFateOperation.TABLE_CREATE),
104+
TABLE_DELETE(TFateOperation.TABLE_DELETE),
105+
TABLE_DELETE_RANGE(TFateOperation.TABLE_DELETE_RANGE),
106+
TABLE_EXPORT(TFateOperation.TABLE_EXPORT),
107+
TABLE_IMPORT(TFateOperation.TABLE_IMPORT),
108+
TABLE_MERGE(TFateOperation.TABLE_MERGE),
109+
TABLE_OFFLINE(TFateOperation.TABLE_OFFLINE),
110+
TABLE_ONLINE(TFateOperation.TABLE_ONLINE),
111+
TABLE_RENAME(TFateOperation.TABLE_RENAME),
112+
TABLE_SPLIT(TFateOperation.TABLE_SPLIT),
113+
TABLE_TABLET_AVAILABILITY(TFateOperation.TABLE_TABLET_AVAILABILITY);
114+
115+
private final TFateOperation top;
116+
private static final EnumSet<FateOperation> nonThriftOps =
117+
EnumSet.of(COMMIT_COMPACTION, SHUTDOWN_TSERVER, SYSTEM_SPLIT);
118+
119+
FateOperation(TFateOperation top) {
120+
this.top = top;
121+
}
122+
123+
public static FateOperation fromThrift(TFateOperation top) {
124+
return FateOperation.valueOf(top.name());
125+
}
126+
127+
public static EnumSet<FateOperation> getNonThriftOps() {
128+
return nonThriftOps;
129+
}
130+
131+
public TFateOperation toThrift() {
132+
if (top == null) {
133+
throw new IllegalStateException(this + " does not have an equivalent thrift form");
134+
}
135+
return top;
136+
}
137+
}
138+
91139
/**
92140
* A single thread that finds transactions to work on and queues them up. Do not want each worker
93141
* thread going to the store and looking for work as it would place more load on the store.
@@ -437,14 +485,15 @@ public FateId startTransaction() {
437485
return store.create();
438486
}
439487

440-
public void seedTransaction(String txName, FateKey fateKey, Repo<T> repo, boolean autoCleanUp) {
488+
public void seedTransaction(FateOperation txName, FateKey fateKey, Repo<T> repo,
489+
boolean autoCleanUp) {
441490
store.seedTransaction(txName, fateKey, repo, autoCleanUp);
442491
}
443492

444493
// start work in the transaction.. it is safe to call this
445494
// multiple times for a transaction... but it will only seed once
446-
public void seedTransaction(String txName, FateId fateId, Repo<T> repo, boolean autoCleanUp,
447-
String goalMessage) {
495+
public void seedTransaction(FateOperation txName, FateId fateId, Repo<T> repo,
496+
boolean autoCleanUp, String goalMessage) {
448497
log.info("Seeding {} {}", fateId, goalMessage);
449498
store.seedTransaction(txName, fateId, repo, autoCleanUp);
450499
}

core/src/main/java/org/apache/accumulo/core/fate/FateStore.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> {
5050
FateId create();
5151

5252
/**
53-
* Seeds a transaction with the given repo if it does not exists. A fateId will be derived from
54-
* the fateKey. If seeded, sets the following data for the fateId in the store.
53+
* Seeds a transaction with the given repo if it does not exist. A fateId will be derived from the
54+
* fateKey. If seeded, sets the following data for the fateId in the store.
5555
*
5656
* <ul>
5757
* <li>Set the tx name</li>
@@ -66,7 +66,7 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> {
6666
* empty optional otherwise. If there was a failure this could return an empty optional
6767
* when it actually succeeded.
6868
*/
69-
Optional<FateId> seedTransaction(String txName, FateKey fateKey, Repo<T> repo,
69+
Optional<FateId> seedTransaction(Fate.FateOperation txName, FateKey fateKey, Repo<T> repo,
7070
boolean autoCleanUp);
7171

7272
/**
@@ -84,7 +84,8 @@ Optional<FateId> seedTransaction(String txName, FateKey fateKey, Repo<T> repo,
8484
* failures. When there are no failures returns true if seeded and false otherwise. If
8585
* there was a failure this could return false when it actually succeeded.
8686
*/
87-
boolean seedTransaction(String txName, FateId fateId, Repo<T> repo, boolean autoCleanUp);
87+
boolean seedTransaction(Fate.FateOperation txName, FateId fateId, Repo<T> repo,
88+
boolean autoCleanUp);
8889

8990
/**
9091
* An interface that allows read/write access to the data related to a single fate operation.

0 commit comments

Comments
 (0)