Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit fef8e75

Browse files
committedMar 11, 2024·
Replace long + TimeUnit with Duration in ReadOnlyTStore.unreserve()
1 parent f3d5fb0 commit fef8e75

File tree

8 files changed

+44
-47
lines changed

8 files changed

+44
-47
lines changed
 

‎core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import static java.nio.charset.StandardCharsets.UTF_8;
2222

23+
import java.time.Duration;
2324
import java.time.ZoneOffset;
2425
import java.time.format.DateTimeFormatter;
2526
import java.util.ArrayList;
@@ -32,7 +33,6 @@
3233
import java.util.Map;
3334
import java.util.Map.Entry;
3435
import java.util.Set;
35-
import java.util.concurrent.TimeUnit;
3636

3737
import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus;
3838
import org.apache.accumulo.core.fate.zookeeper.FateLock;
@@ -368,7 +368,7 @@ private FateStatus getTransactionStatus(ReadOnlyTStore<T> zs, Set<Long> filterTx
368368

369369
long timeCreated = zs.timeCreated(tid);
370370

371-
zs.unreserve(tid, 0, TimeUnit.MILLISECONDS);
371+
zs.unreserve(tid, Duration.ZERO);
372372

373373
if (includeByStatus(status, filterStatus) && includeByTxid(tid, filterTxid)) {
374374
statuses.add(new TransactionStatus(tid, status, txName, hlocks, wlocks, top, timeCreated));
@@ -451,7 +451,7 @@ public boolean prepDelete(TStore<T> zs, ZooReaderWriter zk, ServiceLockPath path
451451
break;
452452
}
453453

454-
zs.unreserve(txid, 0, TimeUnit.MILLISECONDS);
454+
zs.unreserve(txid, Duration.ZERO);
455455
return state;
456456
}
457457

@@ -495,7 +495,7 @@ public boolean prepFail(TStore<T> zs, ZooReaderWriter zk, ServiceLockPath zLockM
495495
break;
496496
}
497497

498-
zs.unreserve(txid, 0, TimeUnit.MILLISECONDS);
498+
zs.unreserve(txid, Duration.ZERO);
499499
return state;
500500
}
501501

‎core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@
1919
package org.apache.accumulo.core.fate;
2020

2121
import java.io.Serializable;
22+
import java.time.Duration;
2223
import java.util.EnumSet;
2324
import java.util.HashMap;
2425
import java.util.HashSet;
2526
import java.util.List;
2627
import java.util.Map;
2728
import java.util.Map.Entry;
28-
import java.util.concurrent.TimeUnit;
2929

3030
import org.slf4j.Logger;
3131
import org.slf4j.LoggerFactory;
@@ -108,7 +108,7 @@ public void ageOff() {
108108
}
109109

110110
} finally {
111-
store.unreserve(txid, 0, TimeUnit.MILLISECONDS);
111+
store.unreserve(txid, Duration.ZERO);
112112
}
113113
} catch (Exception e) {
114114
log.warn("Failed to age off FATE tx " + FateTxId.formatTid(txid), e);
@@ -138,7 +138,7 @@ public AgeOffStore(ZooStore<T> store, long ageOffTime, TimeSource timeSource) {
138138
break;
139139
}
140140
} finally {
141-
store.unreserve(txid, 0, TimeUnit.MILLISECONDS);
141+
store.unreserve(txid, Duration.ZERO);
142142
}
143143
}
144144
}
@@ -166,8 +166,8 @@ public boolean tryReserve(long tid) {
166166
}
167167

168168
@Override
169-
public void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit) {
170-
store.unreserve(tid, deferTime, deferTimeUnit);
169+
public void unreserve(long tid, Duration deferTime) {
170+
store.unreserve(tid, deferTime);
171171
}
172172

173173
@Override

‎core/src/main/java/org/apache/accumulo/core/fate/Fate.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,12 @@
3030
import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.UNKNOWN;
3131
import static org.apache.accumulo.core.util.ShutdownUtil.isIOException;
3232

33+
import java.time.Duration;
3334
import java.util.EnumSet;
3435
import java.util.concurrent.ExecutorService;
3536
import java.util.concurrent.RejectedExecutionException;
3637
import java.util.concurrent.ScheduledThreadPoolExecutor;
3738
import java.util.concurrent.ThreadPoolExecutor;
38-
import java.util.concurrent.TimeUnit;
3939
import java.util.concurrent.atomic.AtomicBoolean;
4040
import java.util.function.Function;
4141

@@ -133,7 +133,7 @@ public void run() {
133133
runnerLog.error("Uncaught exception in FATE runner thread.", e);
134134
} finally {
135135
if (tid != null) {
136-
store.unreserve(tid, deferTime, TimeUnit.MILLISECONDS);
136+
store.unreserve(tid, Duration.ofMillis(deferTime));
137137
}
138138
}
139139
}
@@ -289,7 +289,7 @@ public void seedTransaction(String txName, long tid, Repo<T> repo, boolean autoC
289289
store.setStatus(tid, SUBMITTED);
290290
}
291291
} finally {
292-
store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
292+
store.unreserve(tid, Duration.ZERO);
293293
}
294294

295295
}
@@ -325,7 +325,7 @@ public boolean cancel(long tid) {
325325
return false;
326326
}
327327
} finally {
328-
store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
328+
store.unreserve(tid, Duration.ZERO);
329329
}
330330
} else {
331331
// reserved, lets retry.
@@ -356,7 +356,7 @@ public void delete(long tid) {
356356
break;
357357
}
358358
} finally {
359-
store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
359+
store.unreserve(tid, Duration.ZERO);
360360
}
361361
}
362362

@@ -369,7 +369,7 @@ public String getReturn(long tid) {
369369
}
370370
return (String) store.getTransactionInfo(tid, TxInfo.RETURN_VALUE);
371371
} finally {
372-
store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
372+
store.unreserve(tid, Duration.ZERO);
373373
}
374374
}
375375

@@ -383,7 +383,7 @@ public Exception getException(long tid) {
383383
}
384384
return (Exception) store.getTransactionInfo(tid, TxInfo.EXCEPTION);
385385
} finally {
386-
store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
386+
store.unreserve(tid, Duration.ZERO);
387387
}
388388
}
389389

‎core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyTStore.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
package org.apache.accumulo.core.fate;
2020

2121
import java.io.Serializable;
22+
import java.time.Duration;
2223
import java.util.EnumSet;
2324
import java.util.List;
24-
import java.util.concurrent.TimeUnit;
2525

2626
/**
2727
* Read only access to a Transaction Store.
@@ -79,9 +79,8 @@ enum TStatus {
7979
* @param tid transaction id, previously reserved.
8080
* @param deferTime time to keep this transaction out of the pool used in the {@link #reserve()
8181
* reserve} method. must be non-negative.
82-
* @param deferTimeUnit the time unit of deferTime
8382
*/
84-
void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit);
83+
void unreserve(long tid, Duration deferTime);
8584

8685
/**
8786
* Get the current operation for the given transaction id.

‎core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java

+12-14
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.io.ObjectOutputStream;
3131
import java.io.Serializable;
3232
import java.io.UncheckedIOException;
33+
import java.time.Duration;
3334
import java.util.ArrayList;
3435
import java.util.Collections;
3536
import java.util.EnumSet;
@@ -38,7 +39,6 @@
3839
import java.util.List;
3940
import java.util.Map;
4041
import java.util.Set;
41-
import java.util.concurrent.TimeUnit;
4242

4343
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
4444
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
@@ -63,7 +63,7 @@ public class ZooStore<T> implements TStore<T> {
6363
private ZooReaderWriter zk;
6464
private String lastReserved = "";
6565
private Set<Long> reserved;
66-
private Map<Long,Long> deferred;
66+
private Map<Long,Duration> deferred;
6767
private long statusChangeEvents = 0;
6868
private int reservationsWaiting = 0;
6969

@@ -163,7 +163,7 @@ public long reserve() {
163163
}
164164

165165
if (deferred.containsKey(tid)) {
166-
if ((deferred.get(tid) - System.nanoTime()) < 0) {
166+
if (deferred.get(tid).minusNanos(System.nanoTime()).isNegative()) {
167167
deferred.remove(tid);
168168
} else {
169169
continue;
@@ -202,12 +202,11 @@ public long reserve() {
202202
if (deferred.isEmpty()) {
203203
this.wait(5000);
204204
} else {
205-
long currTime = System.nanoTime();
206-
long minWait =
207-
deferred.values().stream().mapToLong(l -> l - currTime).min().getAsLong();
208-
long waitTime = TimeUnit.MILLISECONDS.convert(minWait, TimeUnit.NANOSECONDS);
209-
if (waitTime > 0) {
210-
this.wait(Math.min(waitTime, 5000));
205+
final long now = System.nanoTime();
206+
Duration minWait = deferred.values().stream().map(l -> l.minusNanos(now))
207+
.min(Duration::compareTo).orElseThrow();
208+
if (minWait.compareTo(Duration.ZERO) > 0) {
209+
this.wait(Math.min(minWait.toMillis(), 5000));
211210
}
212211
}
213212
}
@@ -272,10 +271,9 @@ private void unreserve(long tid) {
272271
}
273272

274273
@Override
275-
public void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit) {
276-
deferTime = TimeUnit.NANOSECONDS.convert(deferTime, deferTimeUnit);
274+
public void unreserve(long tid, Duration deferTime) {
277275

278-
if (deferTime < 0) {
276+
if (deferTime.isNegative()) {
279277
throw new IllegalArgumentException("deferTime < 0 : " + deferTime);
280278
}
281279

@@ -285,8 +283,8 @@ public void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit) {
285283
"Tried to unreserve id that was not reserved " + FateTxId.formatTid(tid));
286284
}
287285

288-
if (deferTime > 0) {
289-
deferred.put(tid, System.nanoTime() + deferTime);
286+
if (deferTime.compareTo(Duration.ZERO) > 0) {
287+
deferred.put(tid, deferTime.plusNanos(System.nanoTime()));
290288
}
291289

292290
this.notifyAll();

‎core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
import static org.apache.accumulo.core.fate.FateTxId.formatTid;
2222

2323
import java.io.Serializable;
24+
import java.time.Duration;
2425
import java.util.EnumSet;
2526
import java.util.List;
26-
import java.util.concurrent.TimeUnit;
2727
import java.util.function.Function;
2828

2929
import org.apache.accumulo.core.fate.Fate;
@@ -62,8 +62,8 @@ public boolean tryReserve(long tid) {
6262
}
6363

6464
@Override
65-
public void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit) {
66-
store.unreserve(tid, deferTime, deferTimeUnit);
65+
public void unreserve(long tid, Duration deferTime) {
66+
store.unreserve(tid, deferTime);
6767
}
6868

6969
@Override

‎core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java

+9-9
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020

2121
import static org.junit.jupiter.api.Assertions.assertEquals;
2222

23+
import java.time.Duration;
2324
import java.util.HashSet;
2425
import java.util.Set;
25-
import java.util.concurrent.TimeUnit;
2626

2727
import org.apache.accumulo.core.fate.AgeOffStore.TimeSource;
2828
import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus;
@@ -53,23 +53,23 @@ public void testBasic() throws InterruptedException, KeeperException {
5353
long txid1 = aoStore.create();
5454
aoStore.reserve(txid1);
5555
aoStore.setStatus(txid1, TStatus.IN_PROGRESS);
56-
aoStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS);
56+
aoStore.unreserve(txid1, Duration.ZERO);
5757

5858
aoStore.ageOff();
5959

6060
long txid2 = aoStore.create();
6161
aoStore.reserve(txid2);
6262
aoStore.setStatus(txid2, TStatus.IN_PROGRESS);
6363
aoStore.setStatus(txid2, TStatus.FAILED);
64-
aoStore.unreserve(txid2, 0, TimeUnit.MILLISECONDS);
64+
aoStore.unreserve(txid2, Duration.ZERO);
6565

6666
tts.time = 6;
6767

6868
long txid3 = aoStore.create();
6969
aoStore.reserve(txid3);
7070
aoStore.setStatus(txid3, TStatus.IN_PROGRESS);
7171
aoStore.setStatus(txid3, TStatus.SUCCESSFUL);
72-
aoStore.unreserve(txid3, 0, TimeUnit.MILLISECONDS);
72+
aoStore.unreserve(txid3, Duration.ZERO);
7373

7474
Long txid4 = aoStore.create();
7575

@@ -102,19 +102,19 @@ public void testNonEmpty() throws InterruptedException, KeeperException {
102102
long txid1 = testStore.create();
103103
testStore.reserve(txid1);
104104
testStore.setStatus(txid1, TStatus.IN_PROGRESS);
105-
testStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS);
105+
testStore.unreserve(txid1, Duration.ZERO);
106106

107107
long txid2 = testStore.create();
108108
testStore.reserve(txid2);
109109
testStore.setStatus(txid2, TStatus.IN_PROGRESS);
110110
testStore.setStatus(txid2, TStatus.FAILED);
111-
testStore.unreserve(txid2, 0, TimeUnit.MILLISECONDS);
111+
testStore.unreserve(txid2, Duration.ZERO);
112112

113113
long txid3 = testStore.create();
114114
testStore.reserve(txid3);
115115
testStore.setStatus(txid3, TStatus.IN_PROGRESS);
116116
testStore.setStatus(txid3, TStatus.SUCCESSFUL);
117-
testStore.unreserve(txid3, 0, TimeUnit.MILLISECONDS);
117+
testStore.unreserve(txid3, Duration.ZERO);
118118

119119
Long txid4 = testStore.create();
120120

@@ -137,7 +137,7 @@ public void testNonEmpty() throws InterruptedException, KeeperException {
137137

138138
aoStore.reserve(txid1);
139139
aoStore.setStatus(txid1, TStatus.FAILED_IN_PROGRESS);
140-
aoStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS);
140+
aoStore.unreserve(txid1, Duration.ZERO);
141141

142142
tts.time = 30;
143143

@@ -148,7 +148,7 @@ public void testNonEmpty() throws InterruptedException, KeeperException {
148148

149149
aoStore.reserve(txid1);
150150
aoStore.setStatus(txid1, TStatus.FAILED);
151-
aoStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS);
151+
aoStore.unreserve(txid1, Duration.ZERO);
152152

153153
aoStore.ageOff();
154154

‎core/src/test/java/org/apache/accumulo/core/fate/TestStore.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@
1818
*/
1919
package org.apache.accumulo.core.fate;
2020

21+
import java.time.Duration;
2122
import java.util.ArrayList;
2223
import java.util.HashMap;
2324
import java.util.HashSet;
2425
import java.util.List;
2526
import java.util.Map;
2627
import java.util.Set;
27-
import java.util.concurrent.TimeUnit;
2828

2929
/**
3030
* Transient in memory store for transactions.
@@ -62,7 +62,7 @@ public boolean tryReserve(long tid) {
6262
}
6363

6464
@Override
65-
public void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit) {
65+
public void unreserve(long tid, Duration deferTime) {
6666
if (!reserved.remove(tid)) {
6767
throw new IllegalStateException();
6868
}

0 commit comments

Comments
 (0)
Please sign in to comment.