23
23
import static org .apache .accumulo .core .metadata .schema .TabletMetadata .ColumnType .LOCATION ;
24
24
import static org .apache .accumulo .core .metadata .schema .TabletMetadata .ColumnType .PREV_ROW ;
25
25
26
+ import java .time .Duration ;
26
27
import java .util .ArrayList ;
27
28
import java .util .Comparator ;
28
29
import java .util .HashMap ;
63
64
import org .apache .accumulo .core .util .MapCounter ;
64
65
import org .apache .accumulo .core .util .PeekingIterator ;
65
66
import org .apache .accumulo .core .util .TextUtil ;
67
+ import org .apache .accumulo .core .util .Timer ;
66
68
import org .apache .accumulo .manager .Manager ;
67
69
import org .apache .accumulo .manager .tableOps .ManagerRepo ;
68
70
import org .apache .accumulo .server .fs .VolumeManager ;
@@ -92,6 +94,7 @@ public LoadFiles(BulkInfo bulkInfo) {
92
94
93
95
@ Override
94
96
public long isReady (long tid , Manager manager ) throws Exception {
97
+ log .info ("Starting bulk import for {} (tid = {})" , bulkInfo .sourceDir , FateTxId .formatTid (tid ));
95
98
if (manager .onlineTabletServers ().isEmpty ()) {
96
99
log .warn ("There are no tablet server to process bulkDir import, waiting (tid = "
97
100
+ FateTxId .formatTid (tid ) + ")" );
@@ -312,6 +315,16 @@ long finish() throws Exception {
312
315
}
313
316
}
314
317
318
+ /**
319
+ * Stats for the loadFiles method. Helps track wasted time and iterations.
320
+ */
321
+ private static class ImportTimingStats {
322
+ Duration totalWastedTime = Duration .ZERO ;
323
+ long wastedIterations = 0 ;
324
+ long tabletCount = 0 ;
325
+ long callCount = 0 ;
326
+ }
327
+
315
328
/**
316
329
* Make asynchronous load calls to each overlapping Tablet in the bulk mapping. Return a sleep
317
330
* time to isReady based on a factor of the TabletServer with the most Tablets. This method will
@@ -341,20 +354,33 @@ private long loadFiles(TableId tableId, Path bulkDir, LoadMappingIterator loadMa
341
354
342
355
loader .start (bulkDir , manager , tid , bulkInfo .setTime );
343
356
344
- long t1 = System .currentTimeMillis ();
357
+ ImportTimingStats importTimingStats = new ImportTimingStats ();
358
+
359
+ Timer timer = Timer .startNew ();
345
360
while (lmi .hasNext ()) {
346
361
loadMapEntry = lmi .next ();
347
362
List <TabletMetadata > tablets =
348
- findOverlappingTablets (fmtTid , loadMapEntry .getKey (), tabletIter );
363
+ findOverlappingTablets (fmtTid , loadMapEntry .getKey (), tabletIter , importTimingStats );
349
364
loader .load (tablets , loadMapEntry .getValue ());
350
365
}
366
+ Duration totalProcessingTime = timer .elapsed ();
351
367
352
368
log .trace ("{}: Completed Finding Overlapping Tablets" , fmtTid );
353
369
370
+ if (importTimingStats .callCount > 0 ) {
371
+ log .debug (
372
+ "Bulk import stats for {} (tid = {}): processed {} tablets in {} calls which took {}ms ({} nanos). Skipped {} iterations which took {}ms ({} nanos) or {}% of the processing time." ,
373
+ bulkInfo .sourceDir , FateTxId .formatTid (tid ), importTimingStats .tabletCount ,
374
+ importTimingStats .callCount , totalProcessingTime .toMillis (),
375
+ totalProcessingTime .toNanos (), importTimingStats .wastedIterations ,
376
+ importTimingStats .totalWastedTime .toMillis (), importTimingStats .totalWastedTime .toNanos (),
377
+ (importTimingStats .totalWastedTime .toNanos () * 100 ) / totalProcessingTime .toNanos ());
378
+ }
379
+
354
380
long sleepTime = loader .finish ();
355
381
if (sleepTime > 0 ) {
356
382
log .trace ("{}: Tablet Max Sleep is {}" , fmtTid , sleepTime );
357
- long scanTime = Math .min (System . currentTimeMillis () - t1 , 30_000 );
383
+ long scanTime = Math .min (totalProcessingTime . toMillis () , 30_000 );
358
384
log .trace ("{}: Scan time is {}" , fmtTid , scanTime );
359
385
sleepTime = Math .max (sleepTime , scanTime * 2 );
360
386
}
@@ -369,7 +395,7 @@ private long loadFiles(TableId tableId, Path bulkDir, LoadMappingIterator loadMa
369
395
* Find all the tablets within the provided bulk load mapping range.
370
396
*/
371
397
private List <TabletMetadata > findOverlappingTablets (String fmtTid , KeyExtent loadRange ,
372
- Iterator <TabletMetadata > tabletIter ) {
398
+ Iterator <TabletMetadata > tabletIter , ImportTimingStats importTimingStats ) {
373
399
374
400
TabletMetadata currTablet = null ;
375
401
@@ -381,12 +407,18 @@ private List<TabletMetadata> findOverlappingTablets(String fmtTid, KeyExtent loa
381
407
382
408
int cmp ;
383
409
410
+ long wastedIterations = 0 ;
411
+ Timer timer = Timer .startNew ();
412
+
384
413
// skip tablets until we find the prevEndRow of loadRange
385
414
while ((cmp = PREV_COMP .compare (currTablet .getPrevEndRow (), loadRange .prevEndRow ())) < 0 ) {
415
+ wastedIterations ++;
386
416
log .trace ("{}: Skipping tablet: {}" , fmtTid , currTablet .getExtent ());
387
417
currTablet = tabletIter .next ();
388
418
}
389
419
420
+ Duration wastedTime = timer .elapsed ();
421
+
390
422
if (cmp != 0 ) {
391
423
throw new IllegalStateException (
392
424
"Unexpected prev end row " + currTablet .getExtent () + " " + loadRange );
@@ -407,6 +439,11 @@ private List<TabletMetadata> findOverlappingTablets(String fmtTid, KeyExtent loa
407
439
throw new IllegalStateException ("Unexpected end row " + currTablet + " " + loadRange );
408
440
}
409
441
442
+ importTimingStats .wastedIterations += wastedIterations ;
443
+ importTimingStats .totalWastedTime = importTimingStats .totalWastedTime .plus (wastedTime );
444
+ importTimingStats .tabletCount += tablets .size ();
445
+ importTimingStats .callCount ++;
446
+
410
447
return tablets ;
411
448
} catch (NoSuchElementException e ) {
412
449
NoSuchElementException ne2 = new NoSuchElementException (
0 commit comments