Skip to content

Commit 1fcf88d

Browse files
authored
Modify FateStore to introduce a Seeder API (apache#5374)
This adds a new Seeder API that can be used to seed transactions with a FateKey. The purpose of this new API is to allow for eventual batching of multiple seeding attempts together into one conditional mutation to improve performance. This change just updates the api and all calls to the seeder to attempt seeding will be performed individually. A future update will support the new functionality of batching and commiting all the changes together when the Seeder is closed. This change is for Part 1 of apache#5160
1 parent fd6a3d7 commit 1fcf88d

File tree

9 files changed

+184
-60
lines changed

9 files changed

+184
-60
lines changed

core/src/main/java/org/apache/accumulo/core/fate/Fate.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,9 @@ public FateId startTransaction() {
540540

541541
public void seedTransaction(FateOperation fateOp, FateKey fateKey, Repo<T> repo,
542542
boolean autoCleanUp) {
543-
store.seedTransaction(fateOp, fateKey, repo, autoCleanUp);
543+
try (var seeder = store.beginSeeding()) {
544+
var unused = seeder.attemptToSeedTransaction(fateOp, fateKey, repo, autoCleanUp);
545+
}
544546
}
545547

546548
// start work in the transaction.. it is safe to call this

core/src/main/java/org/apache/accumulo/core/fate/FateStore.java

+38-19
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
import java.util.Arrays;
2828
import java.util.Objects;
2929
import java.util.Optional;
30+
import java.util.Set;
3031
import java.util.UUID;
32+
import java.util.concurrent.CompletableFuture;
3133

3234
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
3335
import org.apache.hadoop.io.DataInputBuffer;
@@ -49,25 +51,42 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> {
4951
*/
5052
FateId create();
5153

52-
/**
53-
* Seeds a transaction with the given repo if it does not exist. A fateId will be derived from the
54-
* fateKey. If seeded, sets the following data for the fateId in the store.
55-
*
56-
* <ul>
57-
* <li>Set the fate op</li>
58-
* <li>Set the status to SUBMITTED</li>
59-
* <li>Set the fate key</li>
60-
* <li>Sets autocleanup only if true</li>
61-
* <li>Sets the creation time</li>
62-
* </ul>
63-
*
64-
* @return The return type is only intended for testing it may not be correct in the face of
65-
* failures. When there are no failures returns optional w/ the fate id set if seeded and
66-
* empty optional otherwise. If there was a failure this could return an empty optional
67-
* when it actually succeeded.
68-
*/
69-
Optional<FateId> seedTransaction(Fate.FateOperation fateOp, FateKey fateKey, Repo<T> repo,
70-
boolean autoCleanUp);
54+
interface Seeder<T> extends AutoCloseable {
55+
56+
/**
57+
* Attempts to seed a transaction with the given repo if it does not exist. A fateId will be
58+
* derived from the fateKey. If seeded, sets the following data for the fateId in the store.
59+
*
60+
* TODO: Support completing futures later in close method The current version will always return
61+
* with a CompleteableFuture that is already completed. Future version will process will
62+
* complete in the close() method for the User store.
63+
*
64+
* <ul>
65+
* <li>Set the fate op</li>
66+
* <li>Set the status to SUBMITTED</li>
67+
* <li>Set the fate key</li>
68+
* <li>Sets autocleanup only if true</li>
69+
* <li>Sets the creation time</li>
70+
* </ul>
71+
*
72+
* @return The return type is only intended for testing it may not be correct in the face of
73+
* failures. When there are no failures returns optional w/ the fate id set if seeded
74+
* and empty optional otherwise. If there was a failure this could return an empty
75+
* optional when it actually succeeded.
76+
*/
77+
CompletableFuture<Optional<FateId>> attemptToSeedTransaction(Fate.FateOperation fateOp,
78+
FateKey fateKey, Repo<T> repo, boolean autoCleanUp);
79+
80+
// TODO: Right now all implementations do nothing
81+
// Eventually this would check the status of all added conditional mutations,
82+
// retry unknown, and then close the conditional writer.
83+
@Override
84+
void close();
85+
}
86+
87+
// Creates a conditional writer for the user fate store. For Zookeeper all this code will probably
88+
// do the same thing its currently doing as zookeeper does not support multi-node operations.
89+
Seeder<T> beginSeeding();
7190

7291
/**
7392
* Seeds a transaction with the given repo if its current status is NEW and it is currently

core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java

+24-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.Optional;
2828
import java.util.SortedMap;
2929
import java.util.UUID;
30+
import java.util.concurrent.CompletableFuture;
3031
import java.util.function.Function;
3132
import java.util.function.Predicate;
3233
import java.util.function.Supplier;
@@ -41,6 +42,7 @@
4142
import org.apache.accumulo.core.data.Value;
4243
import org.apache.accumulo.core.fate.AbstractFateStore;
4344
import org.apache.accumulo.core.fate.Fate;
45+
import org.apache.accumulo.core.fate.Fate.FateOperation;
4446
import org.apache.accumulo.core.fate.Fate.TxInfo;
4547
import org.apache.accumulo.core.fate.FateId;
4648
import org.apache.accumulo.core.fate.FateInstanceType;
@@ -133,7 +135,28 @@ public FateId getFateId() {
133135
}
134136

135137
@Override
136-
public Optional<FateId> seedTransaction(Fate.FateOperation fateOp, FateKey fateKey, Repo<T> repo,
138+
public Seeder<T> beginSeeding() {
139+
// TODO: For now can handle seeding 1 transaction at a time so just process
140+
// everything in attemptToSeedTransaction
141+
// Part 2 of the changes in #5160 will allow multiple seeding attempts to be combined
142+
// into one conditional mutation and we will need to track the pending operations
143+
// and futures in a map
144+
return new Seeder<T>() {
145+
@Override
146+
public CompletableFuture<Optional<FateId>> attemptToSeedTransaction(FateOperation fateOp,
147+
FateKey fateKey, Repo<T> repo, boolean autoCleanUp) {
148+
return CompletableFuture
149+
.completedFuture(seedTransaction(fateOp, fateKey, repo, autoCleanUp));
150+
}
151+
152+
@Override
153+
public void close() {
154+
// TODO: This will be used in Part 2 of #5160
155+
}
156+
};
157+
}
158+
159+
private Optional<FateId> seedTransaction(Fate.FateOperation fateOp, FateKey fateKey, Repo<T> repo,
137160
boolean autoCleanUp) {
138161
final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
139162
Supplier<FateMutator<T>> mutatorFactory = () -> newMutator(fateId).requireAbsent()

core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java

+21-9
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.Optional;
4141
import java.util.Set;
4242
import java.util.UUID;
43+
import java.util.concurrent.CompletableFuture;
4344
import java.util.function.Predicate;
4445
import java.util.function.Supplier;
4546
import java.util.function.UnaryOperator;
@@ -48,6 +49,7 @@
4849
import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
4950
import org.apache.accumulo.core.fate.AbstractFateStore;
5051
import org.apache.accumulo.core.fate.Fate;
52+
import org.apache.accumulo.core.fate.Fate.FateOperation;
5153
import org.apache.accumulo.core.fate.Fate.TxInfo;
5254
import org.apache.accumulo.core.fate.FateId;
5355
import org.apache.accumulo.core.fate.FateInstanceType;
@@ -181,16 +183,26 @@ private Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
181183
}
182184

183185
@Override
184-
public Optional<FateId> seedTransaction(Fate.FateOperation fateOp, FateKey fateKey, Repo<T> repo,
185-
boolean autoCleanUp) {
186-
return createAndReserve(fateKey).map(txStore -> {
187-
try {
188-
seedTransaction(fateOp, repo, autoCleanUp, txStore);
189-
return txStore.getID();
190-
} finally {
191-
txStore.unreserve(Duration.ZERO);
186+
public Seeder<T> beginSeeding() {
187+
return new Seeder<T>() {
188+
@Override
189+
public CompletableFuture<Optional<FateId>> attemptToSeedTransaction(FateOperation fateOp,
190+
FateKey fateKey, Repo<T> repo, boolean autoCleanUp) {
191+
return CompletableFuture.completedFuture(createAndReserve(fateKey).map(txStore -> {
192+
try {
193+
seedTransaction(fateOp, repo, autoCleanUp, txStore);
194+
return txStore.getID();
195+
} finally {
196+
txStore.unreserve(Duration.ZERO);
197+
}
198+
}));
199+
}
200+
201+
@Override
202+
public void close() {
203+
// Nothing to do for Meta fate store
192204
}
193-
});
205+
};
194206
}
195207

196208
@Override

core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java

+43-13
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,22 @@
2121
import java.io.Serializable;
2222
import java.util.EnumSet;
2323
import java.util.Map;
24+
import java.util.Objects;
2425
import java.util.Optional;
26+
import java.util.concurrent.CompletableFuture;
2527
import java.util.concurrent.atomic.AtomicBoolean;
2628
import java.util.function.Consumer;
2729
import java.util.function.Function;
2830
import java.util.stream.Stream;
2931

3032
import org.apache.accumulo.core.fate.Fate;
33+
import org.apache.accumulo.core.fate.Fate.FateOperation;
3134
import org.apache.accumulo.core.fate.FateId;
3235
import org.apache.accumulo.core.fate.FateInstanceType;
3336
import org.apache.accumulo.core.fate.FateKey;
3437
import org.apache.accumulo.core.fate.FateStore;
3538
import org.apache.accumulo.core.fate.FateStore.FateTxStore;
39+
import org.apache.accumulo.core.fate.FateStore.Seeder;
3640
import org.apache.accumulo.core.fate.ReadOnlyFateStore;
3741
import org.apache.accumulo.core.fate.Repo;
3842
import org.apache.accumulo.core.fate.StackOverflowException;
@@ -150,19 +154,8 @@ public FateId create() {
150154
}
151155

152156
@Override
153-
public Optional<FateId> seedTransaction(Fate.FateOperation fateOp, FateKey fateKey,
154-
Repo<T> repo, boolean autoCleanUp) {
155-
var optional = store.seedTransaction(fateOp, fateKey, repo, autoCleanUp);
156-
if (storeLog.isTraceEnabled()) {
157-
optional.ifPresentOrElse(fateId -> {
158-
storeLog.trace("{} seeded {} {} {}", fateId, fateKey, toLogString.apply(repo),
159-
autoCleanUp);
160-
}, () -> {
161-
storeLog.trace("Possibly unable to seed {} {} {}", fateKey, toLogString.apply(repo),
162-
autoCleanUp);
163-
});
164-
}
165-
return optional;
157+
public Seeder<T> beginSeeding() {
158+
return new SeederLogger<>(store, toLogString);
166159
}
167160

168161
@Override
@@ -202,4 +195,41 @@ public void deleteDeadReservations() {
202195
}
203196
};
204197
}
198+
199+
public static class SeederLogger<T> implements Seeder<T> {
200+
private final FateStore<T> store;
201+
private final Seeder<T> seeder;
202+
private final Function<Repo<T>,String> toLogString;
203+
204+
public SeederLogger(FateStore<T> store, Function<Repo<T>,String> toLogString) {
205+
this.store = Objects.requireNonNull(store);
206+
this.seeder = store.beginSeeding();
207+
this.toLogString = Objects.requireNonNull(toLogString);
208+
}
209+
210+
@Override
211+
public CompletableFuture<Optional<FateId>> attemptToSeedTransaction(FateOperation fateOp,
212+
FateKey fateKey, Repo<T> repo, boolean autoCleanUp) {
213+
var future = this.seeder.attemptToSeedTransaction(fateOp, fateKey, repo, autoCleanUp);
214+
return future.whenComplete((optional, throwable) -> {
215+
if (storeLog.isTraceEnabled()) {
216+
optional.ifPresentOrElse(fateId -> {
217+
storeLog.trace("{} seeded {} {} {}", fateId, fateKey, toLogString.apply(repo),
218+
autoCleanUp);
219+
}, () -> {
220+
storeLog.trace("Possibly unable to seed {} {} {}", fateKey, toLogString.apply(repo),
221+
autoCleanUp);
222+
});
223+
}
224+
});
225+
}
226+
227+
@Override
228+
public void close() {
229+
seeder.close();
230+
if (storeLog.isTraceEnabled()) {
231+
storeLog.trace("attempted to close seeder for {}", store.type());
232+
}
233+
}
234+
}
205235
}

core/src/test/java/org/apache/accumulo/core/fate/TestStore.java

+13-3
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,12 @@
2929
import java.util.Optional;
3030
import java.util.Set;
3131
import java.util.UUID;
32+
import java.util.concurrent.CompletableFuture;
3233
import java.util.concurrent.atomic.AtomicBoolean;
3334
import java.util.function.Consumer;
3435
import java.util.stream.Stream;
3536

37+
import org.apache.accumulo.core.fate.Fate.FateOperation;
3638
import org.apache.accumulo.core.util.Pair;
3739

3840
/**
@@ -53,9 +55,17 @@ public FateId create() {
5355
}
5456

5557
@Override
56-
public Optional<FateId> seedTransaction(Fate.FateOperation fateOp, FateKey fateKey,
57-
Repo<String> repo, boolean autoCleanUp) {
58-
return Optional.empty();
58+
public Seeder<String> beginSeeding() {
59+
return new Seeder<>() {
60+
@Override
61+
public CompletableFuture<Optional<FateId>> attemptToSeedTransaction(FateOperation fateOp,
62+
FateKey fateKey, Repo<String> repo, boolean autoCleanUp) {
63+
return CompletableFuture.completedFuture(Optional.empty());
64+
}
65+
66+
@Override
67+
public void close() {}
68+
};
5969
}
6070

6171
@Override

test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.row;
3333
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.verify;
3434
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData;
35+
import static org.apache.accumulo.test.fate.FateStoreUtil.seedTransaction;
3536
import static org.apache.accumulo.test.util.FileMetadataUtil.countFencedFiles;
3637
import static org.apache.accumulo.test.util.FileMetadataUtil.splitFilesIntoRanges;
3738
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -381,7 +382,7 @@ private FateId createCompactionCommitAndDeadMetadata(AccumuloClient c,
381382
// should never run. Its purpose is to prevent the dead compaction detector
382383
// from deleting the id.
383384
Repo<Manager> repo = new FakeRepo();
384-
var fateId = fateStore.seedTransaction(Fate.FateOperation.COMMIT_COMPACTION,
385+
var fateId = seedTransaction(fateStore, Fate.FateOperation.COMMIT_COMPACTION,
385386
FateKey.forCompactionCommit(allCids.get(tableId).get(0)), repo, true).orElseThrow();
386387

387388
// Read the tablet metadata

0 commit comments

Comments
 (0)