35
35
import java .util .SortedMap ;
36
36
import java .util .TreeMap ;
37
37
import java .util .concurrent .TimeUnit ;
38
+ import java .util .concurrent .atomic .AtomicInteger ;
39
+ import java .util .concurrent .atomic .AtomicReference ;
38
40
import java .util .stream .Collectors ;
39
41
40
42
import org .apache .accumulo .core .client .Accumulo ;
@@ -243,9 +245,6 @@ protected void testInterleaving(FateStore<FilTestEnv> store, ServerContext sctx)
243
245
waitFor (store , fateId );
244
246
}
245
247
246
- Scanner scanner = client .createScanner (FATE_TRACKING_TABLE );
247
- var iter = scanner .stream ().map (FateInterleavingIT ::toIdStep ).iterator ();
248
-
249
248
// we should see the following execution order for all fate ids:
250
249
// FirstOp::isReady1, FirstOp::isReady2, FirstOp::call,
251
250
// SecondOp::isReady1, SecondOp::isReady2, SecondOp::call,
@@ -254,35 +253,37 @@ protected void testInterleaving(FateStore<FilTestEnv> store, ServerContext sctx)
254
253
// thread to interleave and work on another fate id, but may not always interleave.
255
254
// It is unlikely that the FATE will not interleave at least once in a run, so we will check
256
255
// for at least one occurrence.
257
- int interleaves = 0 ;
258
- int i = 0 ;
259
- Map .Entry <FateId ,String > prevOp = null ;
256
+ final AtomicInteger interleaves = new AtomicInteger () ;
257
+ final AtomicInteger i = new AtomicInteger () ;
258
+ final AtomicReference < Map .Entry <FateId ,String >> prevOp = new AtomicReference <>( null ) ;
260
259
var expRunOrder = List .of ("FirstOp::isReady1" , "FirstOp::isReady2" , "FirstOp::call" ,
261
260
"SecondOp::isReady1" , "SecondOp::isReady2" , "SecondOp::call" , "LastOp::isReady1" ,
262
261
"LastOp::isReady2" , "LastOp::call" );
263
262
var fateIdsToExpRunOrder = Map .of (fateIds [0 ], new ArrayList <>(expRunOrder ), fateIds [1 ],
264
263
new ArrayList <>(expRunOrder ), fateIds [2 ], new ArrayList <>(expRunOrder ));
265
264
266
- while (iter .hasNext ()) {
267
- var currOp = iter .next ();
268
- FateId fateId = currOp .getKey ();
269
- String currStep = currOp .getValue ();
270
- var expRunOrderFateId = fateIdsToExpRunOrder .get (fateId );
271
-
272
- // An interleave occurred if we do not see <FateIdX, OpN::isReady2> immediately after
273
- // <FateIdX, OpN::isReady1>
274
- if (prevOp != null && prevOp .getValue ().contains ("isReady1" )
275
- && !currOp .equals (new AbstractMap .SimpleImmutableEntry <>(prevOp .getKey (),
276
- prevOp .getValue ().replace ('1' , '2' )))) {
277
- interleaves ++;
278
- }
279
- assertEquals (currStep , expRunOrderFateId .remove (0 ));
280
- prevOp = currOp ;
281
- i ++;
265
+ try (Scanner scanner = client .createScanner (FATE_TRACKING_TABLE )) {
266
+ scanner .stream ().map (FateInterleavingIT ::toIdStep ).forEach (currOp -> {
267
+ FateId fateId = currOp .getKey ();
268
+ String currStep = currOp .getValue ();
269
+ var expRunOrderFateId = fateIdsToExpRunOrder .get (fateId );
270
+
271
+ // An interleave occurred if we do not see <FateIdX, OpN::isReady2> immediately after
272
+ // <FateIdX, OpN::isReady1>
273
+ Entry <FateId ,String > fateIdStringEntry = prevOp .get ();
274
+ if (fateIdStringEntry != null && fateIdStringEntry .getValue ().contains ("isReady1" )
275
+ && !currOp .equals (new AbstractMap .SimpleImmutableEntry <>(fateIdStringEntry .getKey (),
276
+ fateIdStringEntry .getValue ().replace ('1' , '2' )))) {
277
+ interleaves .incrementAndGet ();
278
+ }
279
+ assertEquals (currStep , expRunOrderFateId .remove (0 ));
280
+ prevOp .set (currOp );
281
+ i .incrementAndGet ();
282
+ });
282
283
}
283
284
284
- assertTrue (interleaves > 0 );
285
- assertEquals (i , expRunOrder .size () * numFateIds );
285
+ assertTrue (interleaves . get () > 0 );
286
+ assertEquals (expRunOrder .size () * numFateIds , i . get () );
286
287
assertEquals (numFateIds , fateIdsToExpRunOrder .size ());
287
288
for (var expRunOrderFateId : fateIdsToExpRunOrder .values ()) {
288
289
assertTrue (expRunOrderFateId .isEmpty ());
@@ -377,21 +378,22 @@ protected void testNonInterleaving(FateStore<FilTestEnv> store, ServerContext sc
377
378
waitFor (store , fateId );
378
379
}
379
380
380
- Scanner scanner = client .createScanner (FATE_TRACKING_TABLE );
381
- Iterator <Entry <Key ,Value >> iter = scanner .iterator ();
381
+ try ( Scanner scanner = client .createScanner (FATE_TRACKING_TABLE )) {
382
+ Iterator <Entry <Key ,Value >> iter = scanner .iterator ();
382
383
383
- SortedMap <Key ,Value > subset = new TreeMap <>();
384
+ SortedMap <Key ,Value > subset = new TreeMap <>();
384
385
385
- // should see one fate op execute all of it steps
386
- var seenId1 = verifySameIds (iter , subset );
387
- // should see another fate op execute all of it steps
388
- var seenId2 = verifySameIds (iter , subset );
389
- // should see another fate op execute all of it steps
390
- var seenId3 = verifySameIds (iter , subset );
386
+ // should see one fate op execute all of it steps
387
+ var seenId1 = verifySameIds (iter , subset );
388
+ // should see another fate op execute all of it steps
389
+ var seenId2 = verifySameIds (iter , subset );
390
+ // should see another fate op execute all of it steps
391
+ var seenId3 = verifySameIds (iter , subset );
391
392
392
- assertEquals (Set .of (fateIds [0 ], fateIds [1 ], fateIds [2 ]), Set .of (seenId1 , seenId2 , seenId3 ));
393
+ assertEquals (Set .of (fateIds [0 ], fateIds [1 ], fateIds [2 ]), Set .of (seenId1 , seenId2 , seenId3 ));
393
394
394
- assertFalse (iter .hasNext ());
395
+ assertFalse (iter .hasNext ());
396
+ }
395
397
396
398
} finally {
397
399
if (fate != null ) {
@@ -407,10 +409,10 @@ private FateId verifySameIds(Iterator<Entry<Key,Value>> iter, SortedMap<Key,Valu
407
409
Text fateId = subset .keySet ().iterator ().next ().getColumnFamily ();
408
410
assertTrue (subset .keySet ().stream ().allMatch (k -> k .getColumnFamily ().equals (fateId )));
409
411
410
- var expectedVals = List .of ("FirstNonInterleavingOp::isReady" , "FirstNonInterleavingOp::call" ,
412
+ var expectedVals = Set .of ("FirstNonInterleavingOp::isReady" , "FirstNonInterleavingOp::call" ,
411
413
"SecondNonInterleavingOp::isReady" , "SecondNonInterleavingOp::call" ,
412
414
"LastNonInterleavingOp::isReady" , "LastNonInterleavingOp::call" );
413
- var actualVals = subset .values ().stream ().map (Value ::toString ).collect (Collectors .toList ());
415
+ var actualVals = subset .values ().stream ().map (Value ::toString ).collect (Collectors .toSet ());
414
416
assertEquals (expectedVals , actualVals );
415
417
416
418
return FateId .from (fateId .toString ());
0 commit comments