21
21
import java .io .IOException ;
22
22
import java .io .Serializable ;
23
23
import java .util .EnumSet ;
24
+ import java .util .HashMap ;
25
+ import java .util .Iterator ;
24
26
import java .util .List ;
27
+ import java .util .Map ;
25
28
import java .util .Map .Entry ;
26
29
import java .util .Objects ;
27
30
import java .util .Optional ;
28
31
import java .util .SortedMap ;
29
32
import java .util .UUID ;
30
33
import java .util .concurrent .CompletableFuture ;
34
+ import java .util .concurrent .atomic .AtomicBoolean ;
31
35
import java .util .function .Function ;
32
36
import java .util .function .Predicate ;
33
37
import java .util .function .Supplier ;
34
38
import java .util .stream .Collectors ;
35
39
import java .util .stream .Stream ;
36
40
41
+ import org .apache .accumulo .core .client .AccumuloException ;
42
+ import org .apache .accumulo .core .client .AccumuloSecurityException ;
43
+ import org .apache .accumulo .core .client .ConditionalWriter ;
37
44
import org .apache .accumulo .core .client .Scanner ;
38
45
import org .apache .accumulo .core .client .TableNotFoundException ;
39
46
import org .apache .accumulo .core .clientImpl .ClientContext ;
47
54
import org .apache .accumulo .core .fate .FateId ;
48
55
import org .apache .accumulo .core .fate .FateInstanceType ;
49
56
import org .apache .accumulo .core .fate .FateKey ;
57
+ import org .apache .accumulo .core .fate .FateKey .FateKeyType ;
50
58
import org .apache .accumulo .core .fate .ReadOnlyRepo ;
51
59
import org .apache .accumulo .core .fate .Repo ;
52
60
import org .apache .accumulo .core .fate .StackOverflowException ;
61
+ import org .apache .accumulo .core .fate .user .FateMutator .Status ;
53
62
import org .apache .accumulo .core .fate .user .schema .FateSchema .RepoColumnFamily ;
54
63
import org .apache .accumulo .core .fate .user .schema .FateSchema .TxColumnFamily ;
55
64
import org .apache .accumulo .core .fate .user .schema .FateSchema .TxInfoColumnFamily ;
56
65
import org .apache .accumulo .core .fate .zookeeper .ZooUtil ;
57
66
import org .apache .accumulo .core .iterators .user .WholeRowIterator ;
58
67
import org .apache .accumulo .core .security .Authorizations ;
59
68
import org .apache .accumulo .core .util .ColumnFQ ;
69
+ import org .apache .accumulo .core .util .Pair ;
60
70
import org .apache .accumulo .core .util .UtilWaitThread ;
61
71
import org .apache .hadoop .io .Text ;
62
72
import org .slf4j .Logger ;
@@ -136,36 +146,14 @@ public FateId getFateId() {
136
146
137
147
@ Override
138
148
public Seeder <T > beginSeeding () {
139
- // TODO: For now can handle seeding 1 transaction at a time so just process
140
- // everything in attemptToSeedTransaction
141
- // Part 2 of the changes in #5160 will allow multiple seeding attempts to be combined
142
- // into one conditional mutation and we will need to track the pending operations
143
- // and futures in a map
144
- return new Seeder <T >() {
145
- @ Override
146
- public CompletableFuture <Optional <FateId >> attemptToSeedTransaction (FateOperation fateOp ,
147
- FateKey fateKey , Repo <T > repo , boolean autoCleanUp ) {
148
- return CompletableFuture
149
- .completedFuture (seedTransaction (fateOp , fateKey , repo , autoCleanUp ));
150
- }
151
-
152
- @ Override
153
- public void close () {
154
- // TODO: This will be used in Part 2 of #5160
155
- }
156
- };
149
+ return new BatchSeeder ();
157
150
}
158
151
159
- private Optional <FateId > seedTransaction (Fate .FateOperation fateOp , FateKey fateKey , Repo <T > repo ,
160
- boolean autoCleanUp ) {
161
- final var fateId = fateIdGenerator .fromTypeAndKey (type (), fateKey );
152
+ private FateMutator <T > seedTransaction (Fate .FateOperation fateOp , FateKey fateKey , FateId fateId ,
153
+ Repo <T > repo , boolean autoCleanUp ) {
162
154
Supplier <FateMutator <T >> mutatorFactory = () -> newMutator (fateId ).requireAbsent ()
163
155
.putKey (fateKey ).putCreateTime (System .currentTimeMillis ());
164
- if (seedTransaction (mutatorFactory , fateKey + " " + fateId , fateOp , repo , autoCleanUp )) {
165
- return Optional .of (fateId );
166
- } else {
167
- return Optional .empty ();
168
- }
156
+ return buildMutator (mutatorFactory , fateOp , repo , autoCleanUp );
169
157
}
170
158
171
159
@ Override
@@ -176,16 +164,22 @@ public boolean seedTransaction(Fate.FateOperation fateOp, FateId fateId, Repo<T>
176
164
return seedTransaction (mutatorFactory , fateId .canonical (), fateOp , repo , autoCleanUp );
177
165
}
178
166
167
+ private FateMutator <T > buildMutator (Supplier <FateMutator <T >> mutatorFactory ,
168
+ Fate .FateOperation fateOp , Repo <T > repo , boolean autoCleanUp ) {
169
+ var mutator = mutatorFactory .get ();
170
+ mutator =
171
+ mutator .putFateOp (serializeTxInfo (fateOp )).putRepo (1 , repo ).putStatus (TStatus .SUBMITTED );
172
+ if (autoCleanUp ) {
173
+ mutator = mutator .putAutoClean (serializeTxInfo (autoCleanUp ));
174
+ }
175
+ return mutator ;
176
+ }
177
+
179
178
private boolean seedTransaction (Supplier <FateMutator <T >> mutatorFactory , String logId ,
180
179
Fate .FateOperation fateOp , Repo <T > repo , boolean autoCleanUp ) {
180
+ var mutator = buildMutator (mutatorFactory , fateOp , repo , autoCleanUp );
181
181
int maxAttempts = 5 ;
182
182
for (int attempt = 0 ; attempt < maxAttempts ; attempt ++) {
183
- var mutator = mutatorFactory .get ();
184
- mutator =
185
- mutator .putFateOp (serializeTxInfo (fateOp )).putRepo (1 , repo ).putStatus (TStatus .SUBMITTED );
186
- if (autoCleanUp ) {
187
- mutator = mutator .putAutoClean (serializeTxInfo (autoCleanUp ));
188
- }
189
183
var status = mutator .tryMutate ();
190
184
if (status == FateMutator .Status .ACCEPTED ) {
191
185
// signal to the super class that a new fate transaction was seeded and is ready to run
@@ -393,6 +387,113 @@ public FateInstanceType type() {
393
387
return fateInstanceType ;
394
388
}
395
389
390
+ private class BatchSeeder implements Seeder <T > {
391
+ private final AtomicBoolean closed = new AtomicBoolean (false );
392
+
393
+ private final Map <FateId ,Pair <FateMutator <T >,CompletableFuture <Optional <FateId >>>> pending =
394
+ new HashMap <>();
395
+
396
+ @ Override
397
+ public CompletableFuture <Optional <FateId >> attemptToSeedTransaction (FateOperation fateOp ,
398
+ FateKey fateKey , Repo <T > repo , boolean autoCleanUp ) {
399
+ Preconditions .checkState (!closed .get (), "Can't attempt to seed with a closed seeder." );
400
+
401
+ final var fateId = fateIdGenerator .fromTypeAndKey (type (), fateKey );
402
+ // If not already submitted, add to the pending list and return the future
403
+ // or the existing future if duplicate. The pending map will store the mutator
404
+ // to be processed on close in a one batch.
405
+ return pending .computeIfAbsent (fateId , id -> {
406
+ FateMutator <T > mutator = seedTransaction (fateOp , fateKey , fateId , repo , autoCleanUp );
407
+ CompletableFuture <Optional <FateId >> future = new CompletableFuture <>();
408
+ return new Pair <>(mutator , future );
409
+ }).getSecond ();
410
+ }
411
+
412
+ @ Override
413
+ public void close () {
414
+ closed .set (true );
415
+
416
+ int maxAttempts = 5 ;
417
+
418
+ // This loop will submit all the pending mutations as one batch
419
+ // to a conditional writer and any known results will be removed
420
+ // from the pending map. Unknown results will be re-attempted up
421
+ // to the maxAttempts count
422
+ for (int attempt = 0 ; attempt < maxAttempts ; attempt ++) {
423
+ var currentResults = tryMutateBatch ();
424
+ for (Entry <FateId ,ConditionalWriter .Status > result : currentResults .entrySet ()) {
425
+ var fateId = result .getKey ();
426
+ var status = result .getValue ();
427
+ var future = pending .get (fateId ).getSecond ();
428
+ switch (result .getValue ()) {
429
+ case ACCEPTED :
430
+ seededTx ();
431
+ log .trace ("Attempt to seed {} returned {}" , fateId .canonical (), status );
432
+ // Complete the future with the fatId and remove from pending
433
+ future .complete (Optional .of (fateId ));
434
+ pending .remove (fateId );
435
+ break ;
436
+ case REJECTED :
437
+ log .debug ("Attempt to seed {} returned {}" , fateId .canonical (), status );
438
+ // Rejected so complete with an empty optional and remove from pending
439
+ future .complete (Optional .empty ());
440
+ pending .remove (fateId );
441
+ break ;
442
+ case UNKNOWN :
443
+ log .debug ("Attempt to seed {} returned {} status, retrying" , fateId .canonical (),
444
+ status );
445
+ // unknown, so don't remove from map so that we try again if still under
446
+ // max attempts
447
+ break ;
448
+ default :
449
+ // do not expect other statuses
450
+ throw new IllegalStateException ("Unhandled status for mutation " + status );
451
+ }
452
+ }
453
+
454
+ if (!pending .isEmpty ()) {
455
+ // At this point can not reliably determine if the unknown pending mutations were
456
+ // successful or not because no reservation was acquired. For example since no
457
+ // reservation was acquired it is possible that seeding was a success and something
458
+ // immediately picked it up and started operating on it and changing it.
459
+ // If scanning after that point can not conclude success or failure. Another situation
460
+ // is that maybe the fateId already existed in a seeded form prior to getting this
461
+ // unknown.
462
+ UtilWaitThread .sleep (250 );
463
+ }
464
+ }
465
+
466
+ // Any remaining will be UNKNOWN status, so complete the futures with an optional empty
467
+ pending .forEach ((fateId , pair ) -> {
468
+ pair .getSecond ().complete (Optional .empty ());
469
+ log .warn ("Repeatedly received unknown status when attempting to seed {}" ,
470
+ fateId .canonical ());
471
+ });
472
+ }
473
+
474
+ // Submit all the pending mutations to a single conditional writer
475
+ // as one batch and return the results for each mutation
476
+ private Map <FateId ,ConditionalWriter .Status > tryMutateBatch () {
477
+ if (pending .isEmpty ()) {
478
+ return Map .of ();
479
+ }
480
+
481
+ final Map <FateId ,ConditionalWriter .Status > resultsMap = new HashMap <>();
482
+ try (ConditionalWriter writer = context .createConditionalWriter (tableName )) {
483
+ Iterator <ConditionalWriter .Result > results = writer
484
+ .write (pending .values ().stream ().map (pair -> pair .getFirst ().getMutation ()).iterator ());
485
+ while (results .hasNext ()) {
486
+ var result = results .next ();
487
+ var row = new Text (result .getMutation ().getRow ());
488
+ resultsMap .put (FateId .from (FateInstanceType .USER , row .toString ()), result .getStatus ());
489
+ }
490
+ } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e ) {
491
+ throw new IllegalStateException (e );
492
+ }
493
+ return resultsMap ;
494
+ }
495
+ }
496
+
396
497
private class FateTxStoreImpl extends AbstractFateTxStoreImpl {
397
498
398
499
private FateTxStoreImpl (FateId fateId ) {
0 commit comments