|
28 | 28 | import static org.junit.jupiter.api.Assertions.assertFalse;
|
29 | 29 | import static org.junit.jupiter.api.Assertions.assertTrue;
|
30 | 30 |
|
| 31 | +import java.util.ArrayList; |
31 | 32 | import java.util.HashSet;
|
| 33 | +import java.util.List; |
32 | 34 | import java.util.Set;
|
33 | 35 | import java.util.concurrent.CountDownLatch;
|
34 | 36 | import java.util.concurrent.TimeUnit;
|
@@ -58,6 +60,11 @@ public abstract class FateIT extends SharedMiniClusterBase implements FateTestRu
|
58 | 60 |
|
59 | 61 | private static CountDownLatch callStarted;
|
60 | 62 | private static CountDownLatch finishCall;
|
| 63 | + private static CountDownLatch undoLatch; |
| 64 | + |
| 65 | + private enum ExceptionLocation { |
| 66 | + CALL, IS_READY |
| 67 | + } |
61 | 68 |
|
62 | 69 | public static class TestRepo implements Repo<TestEnv> {
|
63 | 70 | private static final long serialVersionUID = 1L;
|
@@ -100,6 +107,67 @@ public String getReturn() {
|
100 | 107 | }
|
101 | 108 | }
|
102 | 109 |
|
| 110 | + public static class TestOperationFails implements Repo<TestEnv> { |
| 111 | + private static final long serialVersionUID = 1L; |
| 112 | + private static final Logger LOG = LoggerFactory.getLogger(TestOperationFails.class); |
| 113 | + private static List<String> undoOrder = new ArrayList<>(); |
| 114 | + private static final int TOTAL_NUM_OPS = 3; |
| 115 | + private int opNum; |
| 116 | + private final String opName; |
| 117 | + private final ExceptionLocation location; |
| 118 | + |
| 119 | + public TestOperationFails(int opNum, ExceptionLocation location) { |
| 120 | + this.opNum = opNum; |
| 121 | + this.opName = "OP" + opNum; |
| 122 | + this.location = location; |
| 123 | + } |
| 124 | + |
| 125 | + @Override |
| 126 | + public long isReady(FateId fateId, TestEnv environment) throws Exception { |
| 127 | + LOG.debug("{} {} Entered isReady()", opName, fateId); |
| 128 | + if (location == ExceptionLocation.IS_READY) { |
| 129 | + if (opNum < TOTAL_NUM_OPS) { |
| 130 | + return 0; |
| 131 | + } else { |
| 132 | + throw new Exception(opName + " " + fateId + " isReady() failed - this is expected"); |
| 133 | + } |
| 134 | + } else { |
| 135 | + return 0; |
| 136 | + } |
| 137 | + } |
| 138 | + |
| 139 | + @Override |
| 140 | + public String getName() { |
| 141 | + return getClass().getName(); |
| 142 | + } |
| 143 | + |
| 144 | + @Override |
| 145 | + public void undo(FateId fateId, TestEnv environment) throws Exception { |
| 146 | + LOG.debug("{} {} Entered undo()", opName, fateId); |
| 147 | + undoOrder.add(opName); |
| 148 | + undoLatch.countDown(); |
| 149 | + } |
| 150 | + |
| 151 | + @Override |
| 152 | + public Repo<TestEnv> call(FateId fateId, TestEnv environment) throws Exception { |
| 153 | + LOG.debug("{} {} Entered call()", opName, fateId); |
| 154 | + if (location == ExceptionLocation.CALL) { |
| 155 | + if (opNum < TOTAL_NUM_OPS) { |
| 156 | + return new TestOperationFails(++opNum, location); |
| 157 | + } else { |
| 158 | + throw new Exception(opName + " " + fateId + " call() failed - this is expected"); |
| 159 | + } |
| 160 | + } else { |
| 161 | + return new TestOperationFails(++opNum, location); |
| 162 | + } |
| 163 | + } |
| 164 | + |
| 165 | + @Override |
| 166 | + public String getReturn() { |
| 167 | + return "none"; |
| 168 | + } |
| 169 | + } |
| 170 | + |
103 | 171 | /**
|
104 | 172 | * Test Repo that allows configuring a delay time to be returned in isReady().
|
105 | 173 | */
|
@@ -354,6 +422,63 @@ protected void testDeferredOverflow(FateStore<TestEnv> store, ServerContext sctx
|
354 | 422 | }
|
355 | 423 | }
|
356 | 424 |
|
| 425 | + @Test |
| 426 | + @Timeout(30) |
| 427 | + public void testRepoFails() throws Exception { |
| 428 | + // Set a maximum deferred map size of 10 transactions so that when the 11th |
| 429 | + // is seen the Fate store should clear the deferred map and mark |
| 430 | + // the flag as overflow so that all the deferred transactions will be run |
| 431 | + executeTest(this::testRepoFails, 10, AbstractFateStore.DEFAULT_FATE_ID_GENERATOR); |
| 432 | + } |
| 433 | + |
| 434 | + protected void testRepoFails(FateStore<TestEnv> store, ServerContext sctx) throws Exception { |
| 435 | + /* |
| 436 | + * This test ensures that when an exception occurs in a Repo's call() or isReady() methods, that |
| 437 | + * undo() will be called back up the chain of Repo's and in the correct order. The test works as |
| 438 | + * follows: 1) Repo1 is called and returns Repo2, 2) Repo2 is called and returns Repo3, 3) Repo3 |
| 439 | + * is called and throws an exception (in call() or isReady()). It is then expected that: 1) |
| 440 | + * undo() is called on Repo3, 2) undo() is called on Repo2, 3) undo() is called on Repo1 |
| 441 | + */ |
| 442 | + Fate<TestEnv> fate = initializeFate(store); |
| 443 | + try { |
| 444 | + |
| 445 | + // Wait for the transaction runner to be scheduled. |
| 446 | + Thread.sleep(3000); |
| 447 | + |
| 448 | + List<String> expectedUndoOrder = List.of("OP3", "OP2", "OP1"); |
| 449 | + /* |
| 450 | + * Test exception in call() |
| 451 | + */ |
| 452 | + TestOperationFails.undoOrder = new ArrayList<>(); |
| 453 | + undoLatch = new CountDownLatch(TestOperationFails.TOTAL_NUM_OPS); |
| 454 | + FateId fateId = fate.startTransaction(); |
| 455 | + assertEquals(NEW, getTxStatus(sctx, fateId)); |
| 456 | + fate.seedTransaction("TestOperationFails", fateId, |
| 457 | + new TestOperationFails(1, ExceptionLocation.CALL), false, "Test Op Fails"); |
| 458 | + // Wait for all the undo() calls to complete |
| 459 | + undoLatch.await(); |
| 460 | + assertEquals(expectedUndoOrder, TestOperationFails.undoOrder); |
| 461 | + assertEquals(FAILED, fate.waitForCompletion(fateId)); |
| 462 | + assertTrue(fate.getException(fateId).getMessage().contains("call() failed")); |
| 463 | + /* |
| 464 | + * Test exception in isReady() |
| 465 | + */ |
| 466 | + TestOperationFails.undoOrder = new ArrayList<>(); |
| 467 | + undoLatch = new CountDownLatch(TestOperationFails.TOTAL_NUM_OPS); |
| 468 | + fateId = fate.startTransaction(); |
| 469 | + assertEquals(NEW, getTxStatus(sctx, fateId)); |
| 470 | + fate.seedTransaction("TestOperationFails", fateId, |
| 471 | + new TestOperationFails(1, ExceptionLocation.IS_READY), false, "Test Op Fails"); |
| 472 | + // Wait for all the undo() calls to complete |
| 473 | + undoLatch.await(); |
| 474 | + assertEquals(expectedUndoOrder, TestOperationFails.undoOrder); |
| 475 | + assertEquals(FAILED, fate.waitForCompletion(fateId)); |
| 476 | + assertTrue(fate.getException(fateId).getMessage().contains("isReady() failed")); |
| 477 | + } finally { |
| 478 | + fate.shutdown(10, TimeUnit.MINUTES); |
| 479 | + } |
| 480 | + } |
| 481 | + |
357 | 482 | private void submitDeferred(Fate<TestEnv> fate, ServerContext sctx, Set<FateId> transactions) {
|
358 | 483 | FateId fateId = fate.startTransaction();
|
359 | 484 | transactions.add(fateId);
|
|
0 commit comments