Skip to content

Commit 6407c31

Browse files
Makes seeding a fate transaction more efficient (apache#5122)
Modified fate to seed fate transaction in single conditional mutation instead of multiple. fixes apache#5097 Co-authored-by: Kevin Rathbun <kevinrr888@gmail.com>
1 parent 6dc52bc commit 6407c31

File tree

16 files changed

+529
-235
lines changed

16 files changed

+529
-235
lines changed

core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java

+11
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ public FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey) {
6969
UUID txUUID = UUID.nameUUIDFromBytes(fateKey.getSerialized());
7070
return FateId.from(instanceType, txUUID);
7171
}
72+
73+
@Override
74+
public FateId newRandomId(FateInstanceType instanceType) {
75+
return FateId.from(instanceType, UUID.randomUUID());
76+
}
7277
};
7378

7479
// The ZooKeeper lock for the process that's running this store instance
@@ -402,6 +407,12 @@ public FateId getID() {
402407

403408
public interface FateIdGenerator {
404409
FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey);
410+
411+
FateId newRandomId(FateInstanceType instanceType);
412+
}
413+
414+
protected void seededTx() {
415+
unreservedRunnableCount.increment();
405416
}
406417

407418
protected byte[] serializeTxInfo(Serializable so) {

core/src/main/java/org/apache/accumulo/core/fate/Fate.java

+4-47
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,6 @@
6363
import org.slf4j.Logger;
6464
import org.slf4j.LoggerFactory;
6565

66-
import com.google.common.base.Preconditions;
67-
6866
/**
6967
* Fault tolerant executor
7068
*/
@@ -439,57 +437,16 @@ public FateId startTransaction() {
439437
return store.create();
440438
}
441439

442-
public Optional<FateId> seedTransaction(String txName, FateKey fateKey, Repo<T> repo,
443-
boolean autoCleanUp, String goalMessage) {
444-
445-
Optional<FateTxStore<T>> optTxStore = store.createAndReserve(fateKey);
446-
447-
return optTxStore.map(txStore -> {
448-
var fateId = txStore.getID();
449-
try {
450-
Preconditions.checkState(txStore.getStatus() == NEW);
451-
seedTransaction(txName, fateId, repo, autoCleanUp, goalMessage, txStore);
452-
} finally {
453-
txStore.unreserve(Duration.ZERO);
454-
}
455-
return fateId;
456-
});
457-
}
458-
459-
private void seedTransaction(String txName, FateId fateId, Repo<T> repo, boolean autoCleanUp,
460-
String goalMessage, FateTxStore<T> txStore) {
461-
if (txStore.top() == null) {
462-
try {
463-
log.info("Seeding {} {}", fateId, goalMessage);
464-
txStore.push(repo);
465-
} catch (StackOverflowException e) {
466-
// this should not happen
467-
throw new IllegalStateException(e);
468-
}
469-
}
470-
471-
if (autoCleanUp) {
472-
txStore.setTransactionInfo(TxInfo.AUTO_CLEAN, autoCleanUp);
473-
}
474-
475-
txStore.setTransactionInfo(TxInfo.TX_NAME, txName);
476-
477-
txStore.setStatus(SUBMITTED);
440+
public void seedTransaction(String txName, FateKey fateKey, Repo<T> repo, boolean autoCleanUp) {
441+
store.seedTransaction(txName, fateKey, repo, autoCleanUp);
478442
}
479443

480444
// start work in the transaction.. it is safe to call this
481445
// multiple times for a transaction... but it will only seed once
482446
public void seedTransaction(String txName, FateId fateId, Repo<T> repo, boolean autoCleanUp,
483447
String goalMessage) {
484-
FateTxStore<T> txStore = store.reserve(fateId);
485-
try {
486-
if (txStore.getStatus() == NEW) {
487-
seedTransaction(txName, fateId, repo, autoCleanUp, goalMessage, txStore);
488-
}
489-
} finally {
490-
txStore.unreserve(Duration.ZERO);
491-
}
492-
448+
log.info("Seeding {} {}", fateId, goalMessage);
449+
store.seedTransaction(txName, fateId, repo, autoCleanUp);
493450
}
494451

495452
// check on the transaction

core/src/main/java/org/apache/accumulo/core/fate/FateStore.java

+32-10
Original file line numberDiff line numberDiff line change
@@ -50,19 +50,41 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> {
5050
FateId create();
5151

5252
/**
53-
* Creates and reserves a transaction using the given key. If something is already running for the
54-
* given key, then Optional.empty() will be returned. When this returns a non-empty id, it will be
55-
* in the new state.
53+
* Seeds a transaction with the given repo if it does not exists. A fateId will be derived from
54+
* the fateKey. If seeded, sets the following data for the fateId in the store.
5655
*
57-
* <p>
58-
* In the case where a process dies in the middle of a call to this. If later, another call is
59-
* made with the same key and its in the new state then the FateId for that key will be returned.
60-
* </p>
56+
* <ul>
57+
* <li>Set the tx name</li>
58+
* <li>Set the status to SUBMITTED</li>
59+
* <li>Set the fate key</li>
60+
* <li>Sets autocleanup only if true</li>
61+
* <li>Sets the creation time</li>
62+
* </ul>
6163
*
62-
* @throws IllegalStateException when there is an unexpected collision. This can occur if two key
63-
* hash to the same FateId or if a random FateId already exists.
64+
* @return The return type is only intended for testing it may not be correct in the face of
65+
* failures. When there are no failures returns optional w/ the fate id set if seeded and
66+
* empty optional otherwise. If there was a failure this could return an empty optional
67+
* when it actually succeeded.
6468
*/
65-
Optional<FateTxStore<T>> createAndReserve(FateKey fateKey);
69+
Optional<FateId> seedTransaction(String txName, FateKey fateKey, Repo<T> repo,
70+
boolean autoCleanUp);
71+
72+
/**
73+
* Seeds a transaction with the given repo if its current status is NEW and it is currently
74+
* unreserved. If seeded, sets the following data for the fateId in the store.
75+
*
76+
* <ul>
77+
* <li>Set the tx name</li>
78+
* <li>Set the status to SUBMITTED</li>
79+
* <li>Sets autocleanup only if true</li>
80+
* <li>Sets the creation time</li>
81+
* </ul>
82+
*
83+
* @return The return type is only intended for testing it may not be correct in the face of
84+
* failures. When there are no failures returns true if seeded and false otherwise. If
85+
* there was a failure this could return false when it actually succeeded.
86+
*/
87+
boolean seedTransaction(String txName, FateId fateId, Repo<T> repo, boolean autoCleanUp);
6688

6789
/**
6890
* An interface that allows read/write access to the data related to a single fate operation.

core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java

+23
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,29 @@ public interface FateMutator<T> {
3333

3434
FateMutator<T> putCreateTime(long ctime);
3535

36+
/**
37+
* Requires that nothing exists for this fate mutation.
38+
*/
39+
FateMutator<T> requireAbsent();
40+
41+
/**
42+
* Require that the transaction status is one of the given statuses. If no statuses are provided,
43+
* require that the status column is absent.
44+
*
45+
* @param statuses The statuses to check against.
46+
*/
47+
FateMutator<T> requireStatus(TStatus... statuses);
48+
49+
/**
50+
* Require the transaction has no reservation.
51+
*/
52+
FateMutator<T> requireUnreserved();
53+
54+
/**
55+
* Require the transaction has no fate key set.
56+
*/
57+
FateMutator<T> requireAbsentKey();
58+
3659
/**
3760
* Add a conditional mutation to {@link FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will
3861
* put the reservation if there is not already a reservation present

core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java

+31-7
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.accumulo.core.client.AccumuloSecurityException;
3030
import org.apache.accumulo.core.client.BatchWriter;
3131
import org.apache.accumulo.core.client.ConditionalWriter;
32+
import org.apache.accumulo.core.client.IteratorSetting;
3233
import org.apache.accumulo.core.client.MutationsRejectedException;
3334
import org.apache.accumulo.core.client.Scanner;
3435
import org.apache.accumulo.core.client.TableNotFoundException;
@@ -48,12 +49,16 @@
4849
import org.apache.accumulo.core.security.Authorizations;
4950
import org.apache.hadoop.io.Text;
5051

52+
import com.google.common.base.Preconditions;
53+
5154
public class FateMutatorImpl<T> implements FateMutator<T> {
5255

5356
private final ClientContext context;
5457
private final String tableName;
5558
private final FateId fateId;
5659
private final ConditionalMutation mutation;
60+
private boolean requiredUnreserved = false;
61+
public static final int INITIAL_ITERATOR_PRIO = 1000000;
5762

5863
public FateMutatorImpl(ClientContext context, String tableName, FateId fateId) {
5964
this.context = Objects.requireNonNull(context);
@@ -81,10 +86,34 @@ public FateMutator<T> putCreateTime(long ctime) {
8186
}
8287

8388
@Override
84-
public FateMutator<T> putReservedTx(FateStore.FateReservation reservation) {
89+
public FateMutator<T> requireAbsent() {
90+
IteratorSetting is = new IteratorSetting(INITIAL_ITERATOR_PRIO, RowExistsIterator.class);
91+
Condition c = new Condition("", "").setIterators(is);
92+
mutation.addCondition(c);
93+
return this;
94+
}
95+
96+
@Override
97+
public FateMutator<T> requireUnreserved() {
98+
Preconditions.checkState(!requiredUnreserved);
8599
Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
86100
TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
87101
mutation.addCondition(condition);
102+
requiredUnreserved = true;
103+
return this;
104+
}
105+
106+
@Override
107+
public FateMutator<T> requireAbsentKey() {
108+
Condition condition = new Condition(TxColumnFamily.TX_KEY_COLUMN.getColumnFamily(),
109+
TxColumnFamily.TX_KEY_COLUMN.getColumnQualifier());
110+
mutation.addCondition(condition);
111+
return this;
112+
}
113+
114+
@Override
115+
public FateMutator<T> putReservedTx(FateStore.FateReservation reservation) {
116+
requireUnreserved();
88117
TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(reservation.getSerialized()));
89118
return this;
90119
}
@@ -179,12 +208,7 @@ public FateMutator<T> delete() {
179208
return this;
180209
}
181210

182-
/**
183-
* Require that the transaction status is one of the given statuses. If no statuses are provided,
184-
* require that the status column is absent.
185-
*
186-
* @param statuses The statuses to check against.
187-
*/
211+
@Override
188212
public FateMutator<T> requireStatus(TStatus... statuses) {
189213
Condition condition = StatusMappingIterator.createCondition(statuses);
190214
mutation.addCondition(condition);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* https://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.accumulo.core.fate.user;
20+
21+
import java.io.IOException;
22+
import java.util.Collection;
23+
import java.util.Set;
24+
25+
import org.apache.accumulo.core.data.ByteSequence;
26+
import org.apache.accumulo.core.data.Range;
27+
import org.apache.accumulo.core.iterators.WrappingIterator;
28+
29+
import com.google.common.base.Preconditions;
30+
31+
/**
32+
* Iterator is used by conditional mutations to check if row exists.
33+
*/
34+
public class RowExistsIterator extends WrappingIterator {
35+
36+
@Override
37+
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
38+
throws IOException {
39+
Preconditions.checkState(range.getStartKey() != null && range.getEndKey() != null);
40+
var startRow = range.getStartKey().getRow();
41+
var endRow = range.getEndKey().getRow();
42+
Preconditions.checkState(startRow.equals(endRow));
43+
Range r = new Range(startRow);
44+
super.seek(r, Set.of(), false);
45+
}
46+
}

0 commit comments

Comments
 (0)