36
36
import static org .junit .jupiter .api .Assertions .fail ;
37
37
38
38
import java .io .File ;
39
+ import java .util .ArrayList ;
40
+ import java .util .List ;
39
41
import java .util .UUID ;
40
42
import java .util .concurrent .CountDownLatch ;
41
43
@@ -115,6 +117,59 @@ public Repo<Manager> call(long tid, Manager manager) throws Exception {
115
117
116
118
}
117
119
120
+ public static class TestOperationFails extends ManagerRepo {
121
+ private static final long serialVersionUID = 1L ;
122
+ private static final Logger LOG = LoggerFactory .getLogger (TestOperationFails .class );
123
+ private static List <String > undoOrder = new ArrayList <>();
124
+ private static final int TOTAL_NUM_OPS = 3 ;
125
+ private int opNum ;
126
+ private final String opName ;
127
+ private final ExceptionLocation location ;
128
+
129
+ public TestOperationFails (int opNum , ExceptionLocation location ) {
130
+ this .opNum = opNum ;
131
+ this .opName = "OP" + opNum ;
132
+ this .location = location ;
133
+ }
134
+
135
+ @ Override
136
+ public long isReady (long tid , Manager environment ) throws Exception {
137
+ LOG .debug ("{} {} Entered isReady()" , opName , FateTxId .formatTid (tid ));
138
+ if (location == ExceptionLocation .IS_READY ) {
139
+ if (opNum < TOTAL_NUM_OPS ) {
140
+ return 0 ;
141
+ } else {
142
+ throw new Exception (
143
+ opName + " " + FateTxId .formatTid (tid ) + " isReady() failed - this is expected" );
144
+ }
145
+ } else {
146
+ return 0 ;
147
+ }
148
+ }
149
+
150
+ @ Override
151
+ public void undo (long tid , Manager environment ) throws Exception {
152
+ LOG .debug ("{} {} Entered undo()" , opName , FateTxId .formatTid (tid ));
153
+ undoOrder .add (opName );
154
+ undoLatch .countDown ();
155
+ }
156
+
157
+ @ Override
158
+ public Repo <Manager > call (long tid , Manager environment ) throws Exception {
159
+ LOG .debug ("{} {} Entered call()" , opName , FateTxId .formatTid (tid ));
160
+ if (location == ExceptionLocation .CALL ) {
161
+ if (opNum < TOTAL_NUM_OPS ) {
162
+ return new TestOperationFails (++opNum , location );
163
+ } else {
164
+ throw new Exception (
165
+ opName + " " + FateTxId .formatTid (tid ) + " call() failed - this is expected" );
166
+ }
167
+ } else {
168
+ return new TestOperationFails (++opNum , location );
169
+ }
170
+ }
171
+ }
172
+
118
173
private static final Logger LOG = LoggerFactory .getLogger (FateIT .class );
119
174
120
175
@ TempDir
@@ -128,6 +183,11 @@ public Repo<Manager> call(long tid, Manager manager) throws Exception {
128
183
129
184
private static CountDownLatch callStarted ;
130
185
private static CountDownLatch finishCall ;
186
+ private static CountDownLatch undoLatch ;
187
+
188
+ private enum ExceptionLocation {
189
+ CALL , IS_READY
190
+ };
131
191
132
192
@ BeforeAll
133
193
public static void setup () throws Exception {
@@ -165,10 +225,6 @@ public void testTransactionStatus() throws Exception {
165
225
ConfigurationCopy config = new ConfigurationCopy ();
166
226
config .set (Property .GENERAL_THREADPOOL_SIZE , "2" );
167
227
config .set (Property .MANAGER_FATE_THREADPOOL_SIZE , "1" );
168
- fate .startTransactionRunners (config );
169
-
170
- // Wait for the transaction runner to be scheduled.
171
- UtilWaitThread .sleep (3000 );
172
228
173
229
callStarted = new CountDownLatch (1 );
174
230
finishCall = new CountDownLatch (1 );
@@ -177,6 +233,11 @@ public void testTransactionStatus() throws Exception {
177
233
assertEquals (TStatus .NEW , getTxStatus (zk , txid ));
178
234
fate .seedTransaction ("TestOperation" , txid , new TestOperation (NS , TID ), true , "Test Op" );
179
235
assertEquals (TStatus .SUBMITTED , getTxStatus (zk , txid ));
236
+
237
+ fate .startTransactionRunners (config );
238
+ // Wait for the transaction runner to be scheduled.
239
+ UtilWaitThread .sleep (3000 );
240
+
180
241
// wait for call() to be called
181
242
callStarted .await ();
182
243
assertEquals (IN_PROGRESS , getTxStatus (zk , txid ));
@@ -346,10 +407,6 @@ public void testCancelWhileInCall() throws Exception {
346
407
ConfigurationCopy config = new ConfigurationCopy ();
347
408
config .set (Property .GENERAL_THREADPOOL_SIZE , "2" );
348
409
config .set (Property .MANAGER_FATE_THREADPOOL_SIZE , "1" );
349
- fate .startTransactionRunners (config );
350
-
351
- // Wait for the transaction runner to be scheduled.
352
- UtilWaitThread .sleep (3000 );
353
410
354
411
callStarted = new CountDownLatch (1 );
355
412
finishCall = new CountDownLatch (1 );
@@ -359,6 +416,11 @@ public void testCancelWhileInCall() throws Exception {
359
416
assertEquals (NEW , getTxStatus (zk , txid ));
360
417
fate .seedTransaction ("TestOperation" , txid , new TestOperation (NS , TID ), true , "Test Op" );
361
418
assertEquals (SUBMITTED , getTxStatus (zk , txid ));
419
+
420
+ fate .startTransactionRunners (config );
421
+ // Wait for the transaction runner to be scheduled.
422
+ UtilWaitThread .sleep (3000 );
423
+
362
424
// wait for call() to be called
363
425
callStarted .await ();
364
426
// cancel the transaction
@@ -369,6 +431,69 @@ public void testCancelWhileInCall() throws Exception {
369
431
370
432
}
371
433
434
+ @ Test
435
+ public void testRepoFails () throws Exception {
436
+ /*
437
+ * This test ensures that when an exception occurs in a Repo's call() or isReady() methods, that
438
+ * undo() will be called back up the chain of Repo's and in the correct order. The test works as
439
+ * follows: 1) Repo1 is called and returns Repo2, 2) Repo2 is called and returns Repo3, 3) Repo3
440
+ * is called and throws an exception (in call() or isReady()). It is then expected that: 1)
441
+ * undo() is called on Repo3, 2) undo() is called on Repo2, 3) undo() is called on Repo1
442
+ */
443
+ final ZooStore <Manager > zooStore = new ZooStore <Manager >(ZK_ROOT + Constants .ZFATE , zk );
444
+ final AgeOffStore <Manager > store =
445
+ new AgeOffStore <Manager >(zooStore , 3000 , System ::currentTimeMillis );
446
+
447
+ Manager manager = createMock (Manager .class );
448
+ ServerContext sctx = createMock (ServerContext .class );
449
+ expect (manager .getContext ()).andReturn (sctx ).anyTimes ();
450
+ expect (sctx .getZooKeeperRoot ()).andReturn (ZK_ROOT ).anyTimes ();
451
+ expect (sctx .getZooReaderWriter ()).andReturn (zk ).anyTimes ();
452
+ replay (manager , sctx );
453
+
454
+ Fate <Manager > fate = new Fate <Manager >(manager , store , TraceRepo ::toLogString );
455
+ try {
456
+ ConfigurationCopy config = new ConfigurationCopy ();
457
+ config .set (Property .GENERAL_THREADPOOL_SIZE , "2" );
458
+ config .set (Property .MANAGER_FATE_THREADPOOL_SIZE , "1" );
459
+ fate .startTransactionRunners (config );
460
+
461
+ // Wait for the transaction runner to be scheduled.
462
+ UtilWaitThread .sleep (3000 );
463
+
464
+ List <String > expectedUndoOrder = List .of ("OP3" , "OP2" , "OP1" );
465
+ /*
466
+ * Test exception in call()
467
+ */
468
+ undoLatch = new CountDownLatch (TestOperationFails .TOTAL_NUM_OPS );
469
+ long txid = fate .startTransaction ();
470
+ assertEquals (NEW , getTxStatus (zk , txid ));
471
+ fate .seedTransaction ("TestOperationFails" , txid ,
472
+ new TestOperationFails (1 , ExceptionLocation .CALL ), false , "Test Op Fails" );
473
+ // Wait for all the undo() calls to complete
474
+ undoLatch .await ();
475
+ assertEquals (expectedUndoOrder , TestOperationFails .undoOrder );
476
+ assertEquals (FAILED , fate .waitForCompletion (txid ));
477
+ assertTrue (fate .getException (txid ).getMessage ().contains ("call() failed" ));
478
+ /*
479
+ * Test exception in isReady()
480
+ */
481
+ TestOperationFails .undoOrder = new ArrayList <>();
482
+ undoLatch = new CountDownLatch (TestOperationFails .TOTAL_NUM_OPS );
483
+ txid = fate .startTransaction ();
484
+ assertEquals (NEW , getTxStatus (zk , txid ));
485
+ fate .seedTransaction ("TestOperationFails" , txid ,
486
+ new TestOperationFails (1 , ExceptionLocation .IS_READY ), false , "Test Op Fails" );
487
+ // Wait for all the undo() calls to complete
488
+ undoLatch .await ();
489
+ assertEquals (expectedUndoOrder , TestOperationFails .undoOrder );
490
+ assertEquals (FAILED , fate .waitForCompletion (txid ));
491
+ assertTrue (fate .getException (txid ).getMessage ().contains ("isReady() failed" ));
492
+ } finally {
493
+ fate .shutdown ();
494
+ }
495
+ }
496
+
372
497
private static void inCall () throws InterruptedException {
373
498
// signal that call started
374
499
callStarted .countDown ();
0 commit comments