26
26
27
27
import java .time .Duration ;
28
28
import java .util .AbstractMap ;
29
+ import java .util .ArrayList ;
29
30
import java .util .Iterator ;
31
+ import java .util .List ;
32
+ import java .util .Map ;
30
33
import java .util .Map .Entry ;
31
34
import java .util .Set ;
32
35
import java .util .SortedMap ;
54
57
import org .apache .accumulo .core .fate .Repo ;
55
58
import org .apache .accumulo .harness .SharedMiniClusterBase ;
56
59
import org .apache .accumulo .server .ServerContext ;
57
- import org .apache .accumulo .test .fate .FateTestRunner .TestEnv ;
58
60
import org .apache .hadoop .io .Text ;
59
61
import org .junit .jupiter .api .AfterAll ;
60
62
import org .junit .jupiter .api .BeforeAll ;
@@ -101,12 +103,17 @@ protected static void insertTrackingData(FateId tid, FilTestEnv env, String step
101
103
102
104
@ Override
103
105
public long isReady (FateId tid , FilTestEnv env ) throws Exception {
106
+ // First call to isReady will return that it's not ready (defer time of 100ms), inserting
107
+ // the data 'isReady1' so we know isReady was called once. The second attempt (after the
108
+ // deferral time) will pass as ready (return 0) and insert the data 'isReady2' so we know
109
+ // the second call to isReady was made
104
110
Thread .sleep (50 );
105
111
var step = this .getName () + "::isReady" ;
106
- if (isTrackingDataSet (tid , env , step )) {
112
+ if (isTrackingDataSet (tid , env , step + "1" )) {
113
+ insertTrackingData (tid , env , step + "2" );
107
114
return 0 ;
108
115
} else {
109
- insertTrackingData (tid , env , step );
116
+ insertTrackingData (tid , env , step + "1" );
110
117
return 100 ;
111
118
}
112
119
}
@@ -192,9 +199,9 @@ protected Fate<FilTestEnv> initializeFate(AccumuloClient client, FateStore<FilTe
192
199
return new Fate <>(new FilTestEnv (client ), store , false , r -> r + "" , config );
193
200
}
194
201
195
- private static Entry <String ,String > toIdStep (Entry <Key ,Value > e ) {
196
- return new AbstractMap .SimpleImmutableEntry <>(e . getKey (). getColumnFamily (). toString (),
197
- e .getValue ().toString ());
202
+ private static Entry <FateId ,String > toIdStep (Entry <Key ,Value > e ) {
203
+ return new AbstractMap .SimpleImmutableEntry <>(
204
+ FateId . from ( e . getKey (). getColumnFamily (). toString ()), e .getValue ().toString ());
198
205
}
199
206
200
207
@ Test
@@ -208,9 +215,10 @@ protected void testInterleaving(FateStore<FilTestEnv> store, ServerContext sctx)
208
215
// This test verifies that fates will interleave in time when their isReady() returns >0 and
209
216
// then 0.
210
217
211
- FateId [] fateIds = new FateId [3 ];
218
+ final int numFateIds = 3 ;
219
+ FateId [] fateIds = new FateId [numFateIds ];
212
220
213
- for (int i = 0 ; i < 3 ; i ++) {
221
+ for (int i = 0 ; i < numFateIds ; i ++) {
214
222
fateIds [i ] = store .create ();
215
223
var txStore = store .reserve (fateIds [i ]);
216
224
try {
@@ -235,38 +243,50 @@ protected void testInterleaving(FateStore<FilTestEnv> store, ServerContext sctx)
235
243
waitFor (store , fateId );
236
244
}
237
245
238
- var expectedIds =
239
- Set .of (fateIds [0 ].canonical (), fateIds [1 ].canonical (), fateIds [2 ].canonical ());
240
-
241
246
Scanner scanner = client .createScanner (FATE_TRACKING_TABLE );
242
- Iterator <Entry <String ,String >> iter = scanner .stream ().map (FateInterleavingIT ::toIdStep )
243
- .filter (e -> e .getValue ().contains ("::call" )).iterator ();
244
-
245
- SortedMap <String ,String > subset = new TreeMap <>();
246
-
247
- Iterators .limit (iter , 3 ).forEachRemaining (e -> subset .put (e .getKey (), e .getValue ()));
248
-
249
- // Should see the call() for the first steps of all three fates come first in time
250
- assertTrue (subset .values ().stream ().allMatch (v -> v .startsWith ("FirstOp" )));
251
- assertEquals (expectedIds , subset .keySet ());
252
-
253
- subset .clear ();
254
-
255
- Iterators .limit (iter , 3 ).forEachRemaining (e -> subset .put (e .getKey (), e .getValue ()));
256
-
257
- // Should see the call() for the second steps of all three fates come second in time
258
- assertTrue (subset .values ().stream ().allMatch (v -> v .startsWith ("SecondOp" )));
259
- assertEquals (expectedIds , subset .keySet ());
260
-
261
- subset .clear ();
262
-
263
- Iterators .limit (iter , 3 ).forEachRemaining (e -> subset .put (e .getKey (), e .getValue ()));
264
-
265
- // Should see the call() for the last steps of all three fates come last in time
266
- assertTrue (subset .values ().stream ().allMatch (v -> v .startsWith ("LastOp" )));
267
- assertEquals (expectedIds , subset .keySet ());
247
+ var iter = scanner .stream ().map (FateInterleavingIT ::toIdStep ).iterator ();
248
+
249
+ // we should see the following execution order for all fate ids:
250
+ // FirstOp::isReady1, FirstOp::isReady2, FirstOp::call,
251
+ // SecondOp::isReady1, SecondOp::isReady2, SecondOp::call,
252
+ // LastOp::isReady1, LastOp::isReady2, LastOp::call
253
+ // the first isReady of each op will defer the op to be executed later, allowing for the FATE
254
+ // thread to interleave and work on another fate id, but may not always interleave.
255
+ // It is unlikely that the FATE will not interleave at least once in a run, so we will check
256
+ // for at least one occurrence.
257
+ int interleaves = 0 ;
258
+ int i = 0 ;
259
+ Map .Entry <FateId ,String > prevOp = null ;
260
+ var expRunOrder = List .of ("FirstOp::isReady1" , "FirstOp::isReady2" , "FirstOp::call" ,
261
+ "SecondOp::isReady1" , "SecondOp::isReady2" , "SecondOp::call" , "LastOp::isReady1" ,
262
+ "LastOp::isReady2" , "LastOp::call" );
263
+ var fateIdsToExpRunOrder = Map .of (fateIds [0 ], new ArrayList <>(expRunOrder ), fateIds [1 ],
264
+ new ArrayList <>(expRunOrder ), fateIds [2 ], new ArrayList <>(expRunOrder ));
265
+
266
+ while (iter .hasNext ()) {
267
+ var currOp = iter .next ();
268
+ FateId fateId = currOp .getKey ();
269
+ String currStep = currOp .getValue ();
270
+ var expRunOrderFateId = fateIdsToExpRunOrder .get (fateId );
271
+
272
+ // An interleave occurred if we do not see <FateIdX, OpN::isReady2> immediately after
273
+ // <FateIdX, OpN::isReady1>
274
+ if (prevOp != null && prevOp .getValue ().contains ("isReady1" )
275
+ && !currOp .equals (new AbstractMap .SimpleImmutableEntry <>(prevOp .getKey (),
276
+ prevOp .getValue ().replace ('1' , '2' )))) {
277
+ interleaves ++;
278
+ }
279
+ assertEquals (currStep , expRunOrderFateId .remove (0 ));
280
+ prevOp = currOp ;
281
+ i ++;
282
+ }
268
283
269
- assertFalse (iter .hasNext ());
284
+ assertTrue (interleaves > 0 );
285
+ assertEquals (i , expRunOrder .size () * numFateIds );
286
+ assertEquals (numFateIds , fateIdsToExpRunOrder .size ());
287
+ for (var expRunOrderFateId : fateIdsToExpRunOrder .values ()) {
288
+ assertTrue (expRunOrderFateId .isEmpty ());
289
+ }
270
290
271
291
} finally {
272
292
if (fate != null ) {
@@ -329,9 +349,10 @@ protected void testNonInterleaving(FateStore<FilTestEnv> store, ServerContext sc
329
349
// This test ensures that when isReady() always returns zero that all the fate steps will
330
350
// execute immediately
331
351
332
- FateId [] fateIds = new FateId [3 ];
352
+ final int numFateIds = 3 ;
353
+ FateId [] fateIds = new FateId [numFateIds ];
333
354
334
- for (int i = 0 ; i < 3 ; i ++) {
355
+ for (int i = 0 ; i < numFateIds ; i ++) {
335
356
fateIds [i ] = store .create ();
336
357
var txStore = store .reserve (fateIds [i ]);
337
358
try {
@@ -386,10 +407,10 @@ private FateId verifySameIds(Iterator<Entry<Key,Value>> iter, SortedMap<Key,Valu
386
407
Text fateId = subset .keySet ().iterator ().next ().getColumnFamily ();
387
408
assertTrue (subset .keySet ().stream ().allMatch (k -> k .getColumnFamily ().equals (fateId )));
388
409
389
- var expectedVals = Set .of ("FirstNonInterleavingOp::isReady" , "FirstNonInterleavingOp::call" ,
410
+ var expectedVals = List .of ("FirstNonInterleavingOp::isReady" , "FirstNonInterleavingOp::call" ,
390
411
"SecondNonInterleavingOp::isReady" , "SecondNonInterleavingOp::call" ,
391
412
"LastNonInterleavingOp::isReady" , "LastNonInterleavingOp::call" );
392
- var actualVals = subset .values ().stream ().map (Value ::toString ).collect (Collectors .toSet ());
413
+ var actualVals = subset .values ().stream ().map (Value ::toString ).collect (Collectors .toList ());
393
414
assertEquals (expectedVals , actualVals );
394
415
395
416
return FateId .from (fateId .toString ());
0 commit comments