25
25
import static org .apache .accumulo .core .metadata .schema .TabletMetadata .ColumnType .TIME ;
26
26
import static org .apache .accumulo .core .metadata .schema .TabletMetadata .LocationType .CURRENT ;
27
27
28
+ import java .time .Duration ;
28
29
import java .util .ArrayList ;
29
30
import java .util .Collections ;
30
31
import java .util .Comparator ;
63
64
import org .apache .accumulo .core .tabletserver .thrift .TabletServerClientService ;
64
65
import org .apache .accumulo .core .trace .TraceUtil ;
65
66
import org .apache .accumulo .core .util .PeekingIterator ;
67
+ import org .apache .accumulo .core .util .Timer ;
66
68
import org .apache .accumulo .manager .Manager ;
67
69
import org .apache .accumulo .manager .tableOps .ManagerRepo ;
68
70
import org .apache .accumulo .server .fs .VolumeManager ;
82
84
*/
83
85
class LoadFiles extends ManagerRepo {
84
86
87
+ // visible for testing
88
+ interface TabletsMetadataFactory {
89
+
90
+ TabletsMetadata newTabletsMetadata (Text startRow );
91
+
92
+ }
93
+
85
94
private static final long serialVersionUID = 1L ;
86
95
87
96
private static final Logger log = LoggerFactory .getLogger (LoadFiles .class );
@@ -94,6 +103,7 @@ public LoadFiles(BulkInfo bulkInfo) {
94
103
95
104
@ Override
96
105
public long isReady (FateId fateId , Manager manager ) throws Exception {
106
+ log .info ("Starting bulk import for {} (tid = {})" , bulkInfo .sourceDir , fateId );
97
107
if (manager .onlineTabletServers ().isEmpty ()) {
98
108
log .warn ("There are no tablet server to process bulkDir import, waiting (fateId = " + fateId
99
109
+ ")" );
@@ -104,7 +114,19 @@ public long isReady(FateId fateId, Manager manager) throws Exception {
104
114
manager .updateBulkImportStatus (bulkInfo .sourceDir , BulkImportState .LOADING );
105
115
try (LoadMappingIterator lmi =
106
116
BulkSerialize .getUpdatedLoadMapping (bulkDir .toString (), bulkInfo .tableId , fs ::open )) {
107
- return loadFiles (bulkInfo .tableId , bulkDir , lmi , manager , fateId );
117
+
118
+ Loader loader = new Loader ();
119
+
120
+ List <ColumnType > fetchCols = new ArrayList <>(List .of (PREV_ROW , LOCATION , LOADED , TIME ));
121
+ if (loader .pauseLimit > 0 ) {
122
+ fetchCols .add (FILES );
123
+ }
124
+
125
+ TabletsMetadataFactory tmf = (startRow ) -> TabletsMetadata .builder (manager .getContext ())
126
+ .forTable (bulkInfo .tableId ).overlapping (startRow , null ).checkConsistency ()
127
+ .fetch (fetchCols .toArray (new ColumnType [0 ])).build ();
128
+
129
+ return loadFiles (loader , bulkInfo , bulkDir , lmi , tmf , manager , fateId );
108
130
}
109
131
}
110
132
@@ -113,7 +135,8 @@ public Repo<Manager> call(final FateId fateId, final Manager manager) {
113
135
return new RefreshTablets (bulkInfo );
114
136
}
115
137
116
- private static class Loader {
138
+ // visible for testing
139
+ public static class Loader {
117
140
protected Path bulkDir ;
118
141
protected Manager manager ;
119
142
protected FateId fateId ;
@@ -344,14 +367,26 @@ long finish() {
344
367
}
345
368
}
346
369
370
+ /**
371
+ * Stats for the loadFiles method. Helps track wasted time and iterations.
372
+ */
373
+ static class ImportTimingStats {
374
+ Duration totalWastedTime = Duration .ZERO ;
375
+ long wastedIterations = 0 ;
376
+ long tabletCount = 0 ;
377
+ long callCount = 0 ;
378
+ }
379
+
347
380
/**
348
381
* Make asynchronous load calls to each overlapping Tablet in the bulk mapping. Return a sleep
349
382
* time to isReady based on a factor of the TabletServer with the most Tablets. This method will
350
383
* scan the metadata table getting Tablet range and location information. It will return 0 when
351
384
* all files have been loaded.
352
385
*/
353
- private long loadFiles (TableId tableId , Path bulkDir , LoadMappingIterator loadMapIter ,
354
- Manager manager , FateId fateId ) throws Exception {
386
+ // visible for testing
387
+ static long loadFiles (Loader loader , BulkInfo bulkInfo , Path bulkDir ,
388
+ LoadMappingIterator loadMapIter , TabletsMetadataFactory factory , Manager manager ,
389
+ FateId fateId ) throws Exception {
355
390
PeekingIterator <Map .Entry <KeyExtent ,Bulk .Files >> lmi = new PeekingIterator <>(loadMapIter );
356
391
Map .Entry <KeyExtent ,Bulk .Files > loadMapEntry = lmi .peek ();
357
392
@@ -360,39 +395,41 @@ private long loadFiles(TableId tableId, Path bulkDir, LoadMappingIterator loadMa
360
395
String fmtTid = fateId .getTxUUIDStr ();
361
396
log .trace ("{}: Starting bulk load at row: {}" , fmtTid , startRow );
362
397
363
- Loader loader = new Loader ();
364
- long t1 ;
365
- loader .start (bulkDir , manager , tableId , fateId , bulkInfo .setTime );
398
+ loader .start (bulkDir , manager , bulkInfo .tableId , fateId , bulkInfo .setTime );
366
399
367
- List <ColumnType > fetchCols = new ArrayList <>(List .of (PREV_ROW , LOCATION , LOADED , TIME ));
368
- if (loader .pauseLimit > 0 ) {
369
- fetchCols .add (FILES );
370
- }
371
-
372
- try (TabletsMetadata tabletsMetadata =
373
- TabletsMetadata .builder (manager .getContext ()).forTable (tableId ).overlapping (startRow , null )
374
- .checkConsistency ().fetch (fetchCols .toArray (new ColumnType [0 ])).build ()) {
400
+ ImportTimingStats importTimingStats = new ImportTimingStats ();
401
+ Timer timer = Timer .startNew ();
402
+ try (TabletsMetadata tabletsMetadata = factory .newTabletsMetadata (startRow )) {
375
403
376
404
// The tablet iterator and load mapping iterator are both iterating over data that is sorted
377
405
// in the same way. The two iterators are each independently advanced to find common points in
378
406
// the sorted data.
379
- var tabletIter = tabletsMetadata .iterator ();
380
-
381
- t1 = System .currentTimeMillis ();
407
+ Iterator <TabletMetadata > tabletIter = tabletsMetadata .iterator ();
382
408
while (lmi .hasNext ()) {
383
409
loadMapEntry = lmi .next ();
384
410
List <TabletMetadata > tablets =
385
- findOverlappingTablets (fmtTid , loadMapEntry .getKey (), tabletIter );
411
+ findOverlappingTablets (fmtTid , loadMapEntry .getKey (), tabletIter , importTimingStats );
386
412
loader .load (tablets , loadMapEntry .getValue ());
387
413
}
388
414
}
415
+ Duration totalProcessingTime = timer .elapsed ();
389
416
390
417
log .trace ("{}: Completed Finding Overlapping Tablets" , fmtTid );
391
418
419
+ if (importTimingStats .callCount > 0 ) {
420
+ log .debug (
421
+ "Bulk import stats for {} (tid = {}): processed {} tablets in {} calls which took {}ms ({} nanos). Skipped {} iterations which took {}ms ({} nanos) or {}% of the processing time." ,
422
+ bulkInfo .sourceDir , fateId , importTimingStats .tabletCount , importTimingStats .callCount ,
423
+ totalProcessingTime .toMillis (), totalProcessingTime .toNanos (),
424
+ importTimingStats .wastedIterations , importTimingStats .totalWastedTime .toMillis (),
425
+ importTimingStats .totalWastedTime .toNanos (),
426
+ (importTimingStats .totalWastedTime .toNanos () * 100 ) / totalProcessingTime .toNanos ());
427
+ }
428
+
392
429
long sleepTime = loader .finish ();
393
430
if (sleepTime > 0 ) {
394
431
log .trace ("{}: Tablet Max Sleep is {}" , fmtTid , sleepTime );
395
- long scanTime = Math .min (System . currentTimeMillis () - t1 , 30_000 );
432
+ long scanTime = Math .min (totalProcessingTime . toMillis () , 30_000 );
396
433
log .trace ("{}: Scan time is {}" , fmtTid , scanTime );
397
434
sleepTime = Math .max (sleepTime , scanTime * 2 );
398
435
}
@@ -406,8 +443,9 @@ private long loadFiles(TableId tableId, Path bulkDir, LoadMappingIterator loadMa
406
443
/**
407
444
* Find all the tablets within the provided bulk load mapping range.
408
445
*/
409
- private List <TabletMetadata > findOverlappingTablets (String fmtTid , KeyExtent loadRange ,
410
- Iterator <TabletMetadata > tabletIter ) {
446
+ // visible for testing
447
+ static List <TabletMetadata > findOverlappingTablets (String fmtTid , KeyExtent loadRange ,
448
+ Iterator <TabletMetadata > tabletIter , ImportTimingStats importTimingStats ) {
411
449
412
450
TabletMetadata currTablet = null ;
413
451
@@ -419,12 +457,18 @@ private List<TabletMetadata> findOverlappingTablets(String fmtTid, KeyExtent loa
419
457
420
458
int cmp ;
421
459
460
+ long wastedIterations = 0 ;
461
+ Timer timer = Timer .startNew ();
462
+
422
463
// skip tablets until we find the prevEndRow of loadRange
423
464
while ((cmp = PREV_COMP .compare (currTablet .getPrevEndRow (), loadRange .prevEndRow ())) < 0 ) {
465
+ wastedIterations ++;
424
466
log .trace ("{}: Skipping tablet: {}" , fmtTid , currTablet .getExtent ());
425
467
currTablet = tabletIter .next ();
426
468
}
427
469
470
+ Duration wastedTime = timer .elapsed ();
471
+
428
472
if (cmp != 0 ) {
429
473
throw new IllegalStateException (
430
474
"Unexpected prev end row " + currTablet .getExtent () + " " + loadRange );
@@ -445,6 +489,11 @@ private List<TabletMetadata> findOverlappingTablets(String fmtTid, KeyExtent loa
445
489
throw new IllegalStateException ("Unexpected end row " + currTablet + " " + loadRange );
446
490
}
447
491
492
+ importTimingStats .wastedIterations += wastedIterations ;
493
+ importTimingStats .totalWastedTime = importTimingStats .totalWastedTime .plus (wastedTime );
494
+ importTimingStats .tabletCount += tablets .size ();
495
+ importTimingStats .callCount ++;
496
+
448
497
return tablets ;
449
498
} catch (NoSuchElementException e ) {
450
499
NoSuchElementException ne2 = new NoSuchElementException (
0 commit comments