36
36
import static org .junit .jupiter .api .Assertions .fail ;
37
37
38
38
import java .io .File ;
39
+ import java .util .ArrayList ;
40
+ import java .util .List ;
39
41
import java .util .UUID ;
40
42
import java .util .concurrent .CountDownLatch ;
41
43
@@ -114,19 +116,77 @@ public Repo<Manager> call(long tid, Manager manager) throws Exception {
114
116
115
117
}
116
118
119
+ public static class TestOperationFails extends ManagerRepo {
120
+ private static final long serialVersionUID = 1L ;
121
+ private static final Logger LOG = LoggerFactory .getLogger (TestOperationFails .class );
122
+ private static List <String > undoOrder = new ArrayList <>();
123
+ private static final int TOTAL_NUM_OPS = 3 ;
124
+ private int opNum ;
125
+ private final String opName ;
126
+ private final ExceptionLocation location ;
127
+
128
+ public TestOperationFails (int opNum , ExceptionLocation location ) {
129
+ this .opNum = opNum ;
130
+ this .opName = "OP" + opNum ;
131
+ this .location = location ;
132
+ }
133
+
134
+ @ Override
135
+ public long isReady (long tid , Manager environment ) throws Exception {
136
+ LOG .debug ("{} {} Entered isReady()" , opName , FateTxId .formatTid (tid ));
137
+ if (location == ExceptionLocation .IS_READY ) {
138
+ if (opNum < TOTAL_NUM_OPS ) {
139
+ return 0 ;
140
+ } else {
141
+ throw new Exception (
142
+ opName + " " + FateTxId .formatTid (tid ) + " isReady() failed - this is expected" );
143
+ }
144
+ } else {
145
+ return 0 ;
146
+ }
147
+ }
148
+
149
+ @ Override
150
+ public void undo (long tid , Manager environment ) throws Exception {
151
+ LOG .debug ("{} {} Entered undo()" , opName , FateTxId .formatTid (tid ));
152
+ undoOrder .add (opName );
153
+ undoLatch .countDown ();
154
+ }
155
+
156
+ @ Override
157
+ public Repo <Manager > call (long tid , Manager environment ) throws Exception {
158
+ LOG .debug ("{} {} Entered call()" , opName , FateTxId .formatTid (tid ));
159
+ if (location == ExceptionLocation .CALL ) {
160
+ if (opNum < TOTAL_NUM_OPS ) {
161
+ return new TestOperationFails (++opNum , location );
162
+ } else {
163
+ throw new Exception (
164
+ opName + " " + FateTxId .formatTid (tid ) + " call() failed - this is expected" );
165
+ }
166
+ } else {
167
+ return new TestOperationFails (++opNum , location );
168
+ }
169
+ }
170
+ }
171
+
117
172
private static final Logger LOG = LoggerFactory .getLogger (FateIT .class );
118
173
119
174
@ TempDir
120
175
private static File tempDir ;
121
176
122
177
private static ZooKeeperTestingServer szk = null ;
123
178
private static ZooReaderWriter zk = null ;
124
- private static final String ZK_ROOT = "/accumulo/" + UUID .randomUUID (). toString () ;
179
+ private static final String ZK_ROOT = "/accumulo/" + UUID .randomUUID ();
125
180
private static final NamespaceId NS = NamespaceId .of ("testNameSpace" );
126
181
private static final TableId TID = TableId .of ("testTable" );
127
182
128
183
private static CountDownLatch callStarted ;
129
184
private static CountDownLatch finishCall ;
185
+ private static CountDownLatch undoLatch ;
186
+
187
+ private enum ExceptionLocation {
188
+ CALL , IS_READY
189
+ }
130
190
131
191
@ BeforeAll
132
192
public static void setup () throws Exception {
@@ -148,9 +208,8 @@ public static void teardown() throws Exception {
148
208
@ Timeout (30 )
149
209
public void testTransactionStatus () throws Exception {
150
210
151
- final ZooStore <Manager > zooStore = new ZooStore <Manager >(ZK_ROOT + Constants .ZFATE , zk );
152
- final AgeOffStore <Manager > store =
153
- new AgeOffStore <Manager >(zooStore , 3000 , System ::currentTimeMillis );
211
+ final ZooStore <Manager > zooStore = new ZooStore <>(ZK_ROOT + Constants .ZFATE , zk );
212
+ final AgeOffStore <Manager > store = new AgeOffStore <>(zooStore , 3000 , System ::currentTimeMillis );
154
213
155
214
Manager manager = createMock (Manager .class );
156
215
ServerContext sctx = createMock (ServerContext .class );
@@ -162,19 +221,20 @@ public void testTransactionStatus() throws Exception {
162
221
ConfigurationCopy config = new ConfigurationCopy ();
163
222
config .set (Property .GENERAL_THREADPOOL_SIZE , "2" );
164
223
config .set (Property .MANAGER_FATE_THREADPOOL_SIZE , "1" );
165
- Fate <Manager > fate = new Fate <Manager >(manager , store , TraceRepo ::toLogString , config );
224
+ Fate <Manager > fate = new Fate <>(manager , store , TraceRepo ::toLogString , config );
166
225
try {
167
226
168
- // Wait for the transaction runner to be scheduled.
169
- Thread .sleep (3000 );
170
-
171
227
callStarted = new CountDownLatch (1 );
172
228
finishCall = new CountDownLatch (1 );
173
229
174
230
long txid = fate .startTransaction ();
175
231
assertEquals (TStatus .NEW , getTxStatus (zk , txid ));
176
232
fate .seedTransaction ("TestOperation" , txid , new TestOperation (NS , TID ), true , "Test Op" );
177
233
assertEquals (TStatus .SUBMITTED , getTxStatus (zk , txid ));
234
+
235
+ // Wait for the transaction runner to be scheduled.
236
+ Thread .sleep (3000 );
237
+
178
238
// wait for call() to be called
179
239
callStarted .await ();
180
240
assertEquals (IN_PROGRESS , getTxStatus (zk , txid ));
@@ -208,9 +268,8 @@ public void testTransactionStatus() throws Exception {
208
268
209
269
@ Test
210
270
public void testCancelWhileNew () throws Exception {
211
- final ZooStore <Manager > zooStore = new ZooStore <Manager >(ZK_ROOT + Constants .ZFATE , zk );
212
- final AgeOffStore <Manager > store =
213
- new AgeOffStore <Manager >(zooStore , 3000 , System ::currentTimeMillis );
271
+ final ZooStore <Manager > zooStore = new ZooStore <>(ZK_ROOT + Constants .ZFATE , zk );
272
+ final AgeOffStore <Manager > store = new AgeOffStore <>(zooStore , 3000 , System ::currentTimeMillis );
214
273
215
274
Manager manager = createMock (Manager .class );
216
275
ServerContext sctx = createMock (ServerContext .class );
@@ -222,9 +281,8 @@ public void testCancelWhileNew() throws Exception {
222
281
ConfigurationCopy config = new ConfigurationCopy ();
223
282
config .set (Property .GENERAL_THREADPOOL_SIZE , "2" );
224
283
config .set (Property .MANAGER_FATE_THREADPOOL_SIZE , "1" );
225
- Fate <Manager > fate = new Fate <Manager >(manager , store , TraceRepo ::toLogString , config );
284
+ Fate <Manager > fate = new Fate <>(manager , store , TraceRepo ::toLogString , config );
226
285
try {
227
-
228
286
// Wait for the transaction runner to be scheduled.
229
287
Thread .sleep (3000 );
230
288
@@ -250,9 +308,8 @@ public void testCancelWhileNew() throws Exception {
250
308
251
309
@ Test
252
310
public void testCancelWhileSubmittedAndRunning () throws Exception {
253
- final ZooStore <Manager > zooStore = new ZooStore <Manager >(ZK_ROOT + Constants .ZFATE , zk );
254
- final AgeOffStore <Manager > store =
255
- new AgeOffStore <Manager >(zooStore , 3000 , System ::currentTimeMillis );
311
+ final ZooStore <Manager > zooStore = new ZooStore <>(ZK_ROOT + Constants .ZFATE , zk );
312
+ final AgeOffStore <Manager > store = new AgeOffStore <>(zooStore , 3000 , System ::currentTimeMillis );
256
313
257
314
Manager manager = createMock (Manager .class );
258
315
ServerContext sctx = createMock (ServerContext .class );
@@ -264,7 +321,7 @@ public void testCancelWhileSubmittedAndRunning() throws Exception {
264
321
ConfigurationCopy config = new ConfigurationCopy ();
265
322
config .set (Property .GENERAL_THREADPOOL_SIZE , "2" );
266
323
config .set (Property .MANAGER_FATE_THREADPOOL_SIZE , "1" );
267
- Fate <Manager > fate = new Fate <Manager >(manager , store , TraceRepo ::toLogString , config );
324
+ Fate <Manager > fate = new Fate <>(manager , store , TraceRepo ::toLogString , config );
268
325
try {
269
326
270
327
// Wait for the transaction runner to be scheduled.
@@ -293,9 +350,8 @@ public void testCancelWhileSubmittedAndRunning() throws Exception {
293
350
294
351
@ Test
295
352
public void testCancelWhileInCall () throws Exception {
296
- final ZooStore <Manager > zooStore = new ZooStore <Manager >(ZK_ROOT + Constants .ZFATE , zk );
297
- final AgeOffStore <Manager > store =
298
- new AgeOffStore <Manager >(zooStore , 3000 , System ::currentTimeMillis );
353
+ final ZooStore <Manager > zooStore = new ZooStore <>(ZK_ROOT + Constants .ZFATE , zk );
354
+ final AgeOffStore <Manager > store = new AgeOffStore <>(zooStore , 3000 , System ::currentTimeMillis );
299
355
300
356
Manager manager = createMock (Manager .class );
301
357
ServerContext sctx = createMock (ServerContext .class );
@@ -307,7 +363,7 @@ public void testCancelWhileInCall() throws Exception {
307
363
ConfigurationCopy config = new ConfigurationCopy ();
308
364
config .set (Property .GENERAL_THREADPOOL_SIZE , "2" );
309
365
config .set (Property .MANAGER_FATE_THREADPOOL_SIZE , "1" );
310
- Fate <Manager > fate = new Fate <Manager >(manager , store , TraceRepo ::toLogString , config );
366
+ Fate <Manager > fate = new Fate <>(manager , store , TraceRepo ::toLogString , config );
311
367
try {
312
368
313
369
// Wait for the transaction runner to be scheduled.
@@ -321,6 +377,7 @@ public void testCancelWhileInCall() throws Exception {
321
377
assertEquals (NEW , getTxStatus (zk , txid ));
322
378
fate .seedTransaction ("TestOperation" , txid , new TestOperation (NS , TID ), true , "Test Op" );
323
379
assertEquals (SUBMITTED , getTxStatus (zk , txid ));
380
+
324
381
// wait for call() to be called
325
382
callStarted .await ();
326
383
// cancel the transaction
@@ -331,6 +388,67 @@ public void testCancelWhileInCall() throws Exception {
331
388
332
389
}
333
390
391
+ @ Test
392
+ public void testRepoFails () throws Exception {
393
+ /*
394
+ * This test ensures that when an exception occurs in a Repo's call() or isReady() methods, that
395
+ * undo() will be called back up the chain of Repo's and in the correct order. The test works as
396
+ * follows: 1) Repo1 is called and returns Repo2, 2) Repo2 is called and returns Repo3, 3) Repo3
397
+ * is called and throws an exception (in call() or isReady()). It is then expected that: 1)
398
+ * undo() is called on Repo3, 2) undo() is called on Repo2, 3) undo() is called on Repo1
399
+ */
400
+ final ZooStore <Manager > zooStore = new ZooStore <>(ZK_ROOT + Constants .ZFATE , zk );
401
+ final AgeOffStore <Manager > store = new AgeOffStore <>(zooStore , 3000 , System ::currentTimeMillis );
402
+
403
+ Manager manager = createMock (Manager .class );
404
+ ServerContext sctx = createMock (ServerContext .class );
405
+ expect (manager .getContext ()).andReturn (sctx ).anyTimes ();
406
+ expect (sctx .getZooKeeperRoot ()).andReturn (ZK_ROOT ).anyTimes ();
407
+ expect (sctx .getZooReaderWriter ()).andReturn (zk ).anyTimes ();
408
+ replay (manager , sctx );
409
+
410
+ ConfigurationCopy config = new ConfigurationCopy ();
411
+ config .set (Property .GENERAL_THREADPOOL_SIZE , "2" );
412
+ config .set (Property .MANAGER_FATE_THREADPOOL_SIZE , "1" );
413
+ Fate <Manager > fate = new Fate <>(manager , store , TraceRepo ::toLogString , config );
414
+ try {
415
+
416
+ // Wait for the transaction runner to be scheduled.
417
+ Thread .sleep (3000 );
418
+
419
+ List <String > expectedUndoOrder = List .of ("OP3" , "OP2" , "OP1" );
420
+ /*
421
+ * Test exception in call()
422
+ */
423
+ undoLatch = new CountDownLatch (TestOperationFails .TOTAL_NUM_OPS );
424
+ long txid = fate .startTransaction ();
425
+ assertEquals (NEW , getTxStatus (zk , txid ));
426
+ fate .seedTransaction ("TestOperationFails" , txid ,
427
+ new TestOperationFails (1 , ExceptionLocation .CALL ), false , "Test Op Fails" );
428
+ // Wait for all the undo() calls to complete
429
+ undoLatch .await ();
430
+ assertEquals (expectedUndoOrder , TestOperationFails .undoOrder );
431
+ assertEquals (FAILED , fate .waitForCompletion (txid ));
432
+ assertTrue (fate .getException (txid ).getMessage ().contains ("call() failed" ));
433
+ /*
434
+ * Test exception in isReady()
435
+ */
436
+ TestOperationFails .undoOrder = new ArrayList <>();
437
+ undoLatch = new CountDownLatch (TestOperationFails .TOTAL_NUM_OPS );
438
+ txid = fate .startTransaction ();
439
+ assertEquals (NEW , getTxStatus (zk , txid ));
440
+ fate .seedTransaction ("TestOperationFails" , txid ,
441
+ new TestOperationFails (1 , ExceptionLocation .IS_READY ), false , "Test Op Fails" );
442
+ // Wait for all the undo() calls to complete
443
+ undoLatch .await ();
444
+ assertEquals (expectedUndoOrder , TestOperationFails .undoOrder );
445
+ assertEquals (FAILED , fate .waitForCompletion (txid ));
446
+ assertTrue (fate .getException (txid ).getMessage ().contains ("isReady() failed" ));
447
+ } finally {
448
+ fate .shutdown ();
449
+ }
450
+ }
451
+
334
452
private static void inCall () throws InterruptedException {
335
453
// signal that call started
336
454
callStarted .countDown ();
0 commit comments