23
23
import static org .apache .accumulo .core .metadata .schema .TabletMetadata .ColumnType .LOCATION ;
24
24
import static org .apache .accumulo .core .metadata .schema .TabletMetadata .ColumnType .PREV_ROW ;
25
25
26
+ import java .time .Duration ;
26
27
import java .util .ArrayList ;
27
28
import java .util .Comparator ;
28
29
import java .util .HashMap ;
41
42
import org .apache .accumulo .core .clientImpl .bulk .LoadMappingIterator ;
42
43
import org .apache .accumulo .core .conf .Property ;
43
44
import org .apache .accumulo .core .data .Mutation ;
44
- import org .apache .accumulo .core .data .TableId ;
45
45
import org .apache .accumulo .core .dataImpl .KeyExtent ;
46
46
import org .apache .accumulo .core .dataImpl .thrift .TKeyExtent ;
47
47
import org .apache .accumulo .core .fate .FateTxId ;
64
64
import org .apache .accumulo .core .util .MapCounter ;
65
65
import org .apache .accumulo .core .util .PeekingIterator ;
66
66
import org .apache .accumulo .core .util .TextUtil ;
67
+ import org .apache .accumulo .core .util .Timer ;
67
68
import org .apache .accumulo .manager .Manager ;
68
69
import org .apache .accumulo .manager .tableOps .ManagerRepo ;
69
70
import org .apache .accumulo .server .fs .VolumeManager ;
82
83
*/
83
84
class LoadFiles extends ManagerRepo {
84
85
86
+ // visible for testing
87
+ interface TabletsMetadataFactory {
88
+
89
+ TabletsMetadata newTabletsMetadata (Text startRow );
90
+
91
+ }
92
+
85
93
private static final long serialVersionUID = 1L ;
86
94
87
95
private static final Logger log = LoggerFactory .getLogger (LoadFiles .class );
@@ -94,6 +102,7 @@ public LoadFiles(BulkInfo bulkInfo) {
94
102
95
103
@ Override
96
104
public long isReady (long tid , Manager manager ) throws Exception {
105
+ log .info ("Starting bulk import for {} (tid = {})" , bulkInfo .sourceDir , FateTxId .formatTid (tid ));
97
106
if (manager .onlineTabletServers ().isEmpty ()) {
98
107
log .warn ("There are no tablet server to process bulkDir import, waiting (tid = "
99
108
+ FateTxId .formatTid (tid ) + ")" );
@@ -104,7 +113,19 @@ public long isReady(long tid, Manager manager) throws Exception {
104
113
manager .updateBulkImportStatus (bulkInfo .sourceDir , BulkImportState .LOADING );
105
114
try (LoadMappingIterator lmi =
106
115
BulkSerialize .getUpdatedLoadMapping (bulkDir .toString (), bulkInfo .tableId , fs ::open )) {
107
- return loadFiles (bulkInfo .tableId , bulkDir , lmi , manager , tid );
116
+
117
+ Loader loader ;
118
+ if (bulkInfo .tableState == TableState .ONLINE ) {
119
+ loader = new OnlineLoader ();
120
+ } else {
121
+ loader = new OfflineLoader ();
122
+ }
123
+
124
+ TabletsMetadataFactory tmf = (startRow ) -> TabletsMetadata .builder (manager .getContext ())
125
+ .forTable (bulkInfo .tableId ).overlapping (startRow , null ).checkConsistency ()
126
+ .fetch (PREV_ROW , LOCATION , LOADED ).build ();
127
+
128
+ return loadFiles (loader , bulkInfo , bulkDir , lmi , tmf , manager , tid );
108
129
}
109
130
}
110
131
@@ -117,7 +138,8 @@ public Repo<Manager> call(final long tid, final Manager manager) {
117
138
}
118
139
}
119
140
120
- private abstract static class Loader {
141
+ // visible for testing
142
+ public abstract static class Loader {
121
143
protected Path bulkDir ;
122
144
protected Manager manager ;
123
145
protected long tid ;
@@ -315,14 +337,26 @@ long finish() throws Exception {
315
337
}
316
338
}
317
339
340
+ /**
341
+ * Stats for the loadFiles method. Helps track wasted time and iterations.
342
+ */
343
+ static class ImportTimingStats {
344
+ Duration totalWastedTime = Duration .ZERO ;
345
+ long wastedIterations = 0 ;
346
+ long tabletCount = 0 ;
347
+ long callCount = 0 ;
348
+ }
349
+
318
350
/**
319
351
* Make asynchronous load calls to each overlapping Tablet in the bulk mapping. Return a sleep
320
352
* time to isReady based on a factor of the TabletServer with the most Tablets. This method will
321
353
* scan the metadata table getting Tablet range and location information. It will return 0 when
322
354
* all files have been loaded.
323
355
*/
324
- private long loadFiles (TableId tableId , Path bulkDir , LoadMappingIterator loadMapIter ,
325
- Manager manager , long tid ) throws Exception {
356
+ // visible for testing
357
+ static long loadFiles (Loader loader , BulkInfo bulkInfo , Path bulkDir ,
358
+ LoadMappingIterator loadMapIter , TabletsMetadataFactory factory , Manager manager , long tid )
359
+ throws Exception {
326
360
PeekingIterator <Map .Entry <KeyExtent ,Bulk .Files >> lmi = new PeekingIterator <>(loadMapIter );
327
361
Map .Entry <KeyExtent ,Bulk .Files > loadMapEntry = lmi .peek ();
328
362
@@ -331,33 +365,38 @@ private long loadFiles(TableId tableId, Path bulkDir, LoadMappingIterator loadMa
331
365
String fmtTid = FateTxId .formatTid (tid );
332
366
log .trace ("{}: Starting bulk load at row: {}" , fmtTid , startRow );
333
367
334
- Loader loader ;
335
- if (bulkInfo .tableState == TableState .ONLINE ) {
336
- loader = new OnlineLoader ();
337
- } else {
338
- loader = new OfflineLoader ();
339
- }
340
- long t1 ;
341
368
loader .start (bulkDir , manager , tid , bulkInfo .setTime );
342
- try (TabletsMetadata tabletsMetadata =
343
- TabletsMetadata .builder (manager .getContext ()).forTable (tableId ).overlapping (startRow , null )
344
- .checkConsistency ().fetch (PREV_ROW , LOCATION , LOADED ).build ()) {
345
369
346
- t1 = System .currentTimeMillis ();
370
+ ImportTimingStats importTimingStats = new ImportTimingStats ();
371
+ Timer timer = Timer .startNew ();
372
+ try (TabletsMetadata tabletsMetadata = factory .newTabletsMetadata (startRow )) {
373
+
374
+ Iterator <TabletMetadata > tabletIter = tabletsMetadata .iterator ();
347
375
while (lmi .hasNext ()) {
348
376
loadMapEntry = lmi .next ();
349
377
List <TabletMetadata > tablets =
350
- findOverlappingTablets (fmtTid , loadMapEntry .getKey (), tabletsMetadata . iterator () );
378
+ findOverlappingTablets (fmtTid , loadMapEntry .getKey (), tabletIter , importTimingStats );
351
379
loader .load (tablets , loadMapEntry .getValue ());
352
380
}
353
381
}
382
+ Duration totalProcessingTime = timer .elapsed ();
354
383
355
384
log .trace ("{}: Completed Finding Overlapping Tablets" , fmtTid );
356
385
386
+ if (importTimingStats .callCount > 0 ) {
387
+ log .debug (
388
+ "Bulk import stats for {} (tid = {}): processed {} tablets in {} calls which took {}ms ({} nanos). Skipped {} iterations which took {}ms ({} nanos) or {}% of the processing time." ,
389
+ bulkInfo .sourceDir , FateTxId .formatTid (tid ), importTimingStats .tabletCount ,
390
+ importTimingStats .callCount , totalProcessingTime .toMillis (),
391
+ totalProcessingTime .toNanos (), importTimingStats .wastedIterations ,
392
+ importTimingStats .totalWastedTime .toMillis (), importTimingStats .totalWastedTime .toNanos (),
393
+ (importTimingStats .totalWastedTime .toNanos () * 100 ) / totalProcessingTime .toNanos ());
394
+ }
395
+
357
396
long sleepTime = loader .finish ();
358
397
if (sleepTime > 0 ) {
359
398
log .trace ("{}: Tablet Max Sleep is {}" , fmtTid , sleepTime );
360
- long scanTime = Math .min (System . currentTimeMillis () - t1 , 30_000 );
399
+ long scanTime = Math .min (totalProcessingTime . toMillis () , 30_000 );
361
400
log .trace ("{}: Scan time is {}" , fmtTid , scanTime );
362
401
sleepTime = Math .max (sleepTime , scanTime * 2 );
363
402
}
@@ -371,8 +410,9 @@ private long loadFiles(TableId tableId, Path bulkDir, LoadMappingIterator loadMa
371
410
/**
372
411
* Find all the tablets within the provided bulk load mapping range.
373
412
*/
374
- private List <TabletMetadata > findOverlappingTablets (String fmtTid , KeyExtent loadRange ,
375
- Iterator <TabletMetadata > tabletIter ) {
413
+ // visible for testing
414
+ static List <TabletMetadata > findOverlappingTablets (String fmtTid , KeyExtent loadRange ,
415
+ Iterator <TabletMetadata > tabletIter , ImportTimingStats importTimingStats ) {
376
416
377
417
TabletMetadata currTablet = null ;
378
418
@@ -384,12 +424,18 @@ private List<TabletMetadata> findOverlappingTablets(String fmtTid, KeyExtent loa
384
424
385
425
int cmp ;
386
426
427
+ long wastedIterations = 0 ;
428
+ Timer timer = Timer .startNew ();
429
+
387
430
// skip tablets until we find the prevEndRow of loadRange
388
431
while ((cmp = PREV_COMP .compare (currTablet .getPrevEndRow (), loadRange .prevEndRow ())) < 0 ) {
432
+ wastedIterations ++;
389
433
log .trace ("{}: Skipping tablet: {}" , fmtTid , currTablet .getExtent ());
390
434
currTablet = tabletIter .next ();
391
435
}
392
436
437
+ Duration wastedTime = timer .elapsed ();
438
+
393
439
if (cmp != 0 ) {
394
440
throw new IllegalStateException (
395
441
"Unexpected prev end row " + currTablet .getExtent () + " " + loadRange );
@@ -410,6 +456,11 @@ private List<TabletMetadata> findOverlappingTablets(String fmtTid, KeyExtent loa
410
456
throw new IllegalStateException ("Unexpected end row " + currTablet + " " + loadRange );
411
457
}
412
458
459
+ importTimingStats .wastedIterations += wastedIterations ;
460
+ importTimingStats .totalWastedTime = importTimingStats .totalWastedTime .plus (wastedTime );
461
+ importTimingStats .tabletCount += tablets .size ();
462
+ importTimingStats .callCount ++;
463
+
413
464
return tablets ;
414
465
} catch (NoSuchElementException e ) {
415
466
NoSuchElementException ne2 = new NoSuchElementException (
0 commit comments