41
41
import org .apache .accumulo .core .clientImpl .bulk .LoadMappingIterator ;
42
42
import org .apache .accumulo .core .conf .Property ;
43
43
import org .apache .accumulo .core .data .Mutation ;
44
- import org .apache .accumulo .core .data .TableId ;
45
44
import org .apache .accumulo .core .dataImpl .KeyExtent ;
46
45
import org .apache .accumulo .core .dataImpl .thrift .MapFileInfo ;
47
46
import org .apache .accumulo .core .dataImpl .thrift .TKeyExtent ;
82
81
*/
83
82
class LoadFiles extends ManagerRepo {
84
83
84
+ // visible for testing
85
+ interface TabletsMetadataFactory {
86
+
87
+ TabletsMetadata newTabletsMetadata (Text startRow );
88
+
89
+ }
90
+
85
91
private static final long serialVersionUID = 1L ;
86
92
87
93
private static final Logger log = LoggerFactory .getLogger (LoadFiles .class );
@@ -105,7 +111,19 @@ public long isReady(long tid, Manager manager) throws Exception {
105
111
manager .updateBulkImportStatus (bulkInfo .sourceDir , BulkImportState .LOADING );
106
112
try (LoadMappingIterator lmi =
107
113
BulkSerialize .getUpdatedLoadMapping (bulkDir .toString (), bulkInfo .tableId , fs ::open )) {
108
- return loadFiles (bulkInfo .tableId , bulkDir , lmi , manager , tid );
114
+
115
+ Loader loader ;
116
+ if (bulkInfo .tableState == TableState .ONLINE ) {
117
+ loader = new OnlineLoader ();
118
+ } else {
119
+ loader = new OfflineLoader ();
120
+ }
121
+
122
+ TabletsMetadataFactory tmf = (startRow ) -> TabletsMetadata .builder (manager .getContext ())
123
+ .forTable (bulkInfo .tableId ).overlapping (startRow , null ).checkConsistency ()
124
+ .fetch (PREV_ROW , LOCATION , LOADED ).build ();
125
+
126
+ return loadFiles (loader , bulkInfo , bulkDir , lmi , tmf , manager , tid );
109
127
}
110
128
}
111
129
@@ -118,7 +136,8 @@ public Repo<Manager> call(final long tid, final Manager manager) {
118
136
}
119
137
}
120
138
121
- private abstract static class Loader {
139
+ // visible for testing
140
+ public abstract static class Loader {
122
141
protected Path bulkDir ;
123
142
protected Manager manager ;
124
143
protected long tid ;
@@ -318,7 +337,7 @@ long finish() throws Exception {
318
337
/**
319
338
* Stats for the loadFiles method. Helps track wasted time and iterations.
320
339
*/
321
- private static class ImportTimingStats {
340
+ static class ImportTimingStats {
322
341
Duration totalWastedTime = Duration .ZERO ;
323
342
long wastedIterations = 0 ;
324
343
long tabletCount = 0 ;
@@ -331,8 +350,10 @@ private static class ImportTimingStats {
331
350
* scan the metadata table getting Tablet range and location information. It will return 0 when
332
351
* all files have been loaded.
333
352
*/
334
- private long loadFiles (TableId tableId , Path bulkDir , LoadMappingIterator loadMapIter ,
335
- Manager manager , long tid ) throws Exception {
353
+ // visible for testing
354
+ static long loadFiles (Loader loader , BulkInfo bulkInfo , Path bulkDir ,
355
+ LoadMappingIterator loadMapIter , TabletsMetadataFactory factory , Manager manager , long tid )
356
+ throws Exception {
336
357
PeekingIterator <Map .Entry <KeyExtent ,Bulk .Files >> lmi = new PeekingIterator <>(loadMapIter );
337
358
Map .Entry <KeyExtent ,Bulk .Files > loadMapEntry = lmi .peek ();
338
359
@@ -341,27 +362,19 @@ private long loadFiles(TableId tableId, Path bulkDir, LoadMappingIterator loadMa
341
362
String fmtTid = FateTxId .formatTid (tid );
342
363
log .trace ("{}: Starting bulk load at row: {}" , fmtTid , startRow );
343
364
344
- Iterator <TabletMetadata > tabletIter =
345
- TabletsMetadata .builder (manager .getContext ()).forTable (tableId ).overlapping (startRow , null )
346
- .checkConsistency ().fetch (PREV_ROW , LOCATION , LOADED ).build ().iterator ();
347
-
348
- Loader loader ;
349
- if (bulkInfo .tableState == TableState .ONLINE ) {
350
- loader = new OnlineLoader ();
351
- } else {
352
- loader = new OfflineLoader ();
353
- }
354
-
355
365
loader .start (bulkDir , manager , tid , bulkInfo .setTime );
356
366
357
367
ImportTimingStats importTimingStats = new ImportTimingStats ();
358
-
359
368
Timer timer = Timer .startNew ();
360
- while (lmi .hasNext ()) {
361
- loadMapEntry = lmi .next ();
362
- List <TabletMetadata > tablets =
363
- findOverlappingTablets (fmtTid , loadMapEntry .getKey (), tabletIter , importTimingStats );
364
- loader .load (tablets , loadMapEntry .getValue ());
369
+ try (TabletsMetadata tabletsMetadata = factory .newTabletsMetadata (startRow )) {
370
+
371
+ Iterator <TabletMetadata > tabletIter = tabletsMetadata .iterator ();
372
+ while (lmi .hasNext ()) {
373
+ loadMapEntry = lmi .next ();
374
+ List <TabletMetadata > tablets =
375
+ findOverlappingTablets (fmtTid , loadMapEntry .getKey (), tabletIter , importTimingStats );
376
+ loader .load (tablets , loadMapEntry .getValue ());
377
+ }
365
378
}
366
379
Duration totalProcessingTime = timer .elapsed ();
367
380
@@ -394,7 +407,8 @@ private long loadFiles(TableId tableId, Path bulkDir, LoadMappingIterator loadMa
394
407
/**
395
408
* Find all the tablets within the provided bulk load mapping range.
396
409
*/
397
- private List <TabletMetadata > findOverlappingTablets (String fmtTid , KeyExtent loadRange ,
410
+ // visible for testing
411
+ static List <TabletMetadata > findOverlappingTablets (String fmtTid , KeyExtent loadRange ,
398
412
Iterator <TabletMetadata > tabletIter , ImportTimingStats importTimingStats ) {
399
413
400
414
TabletMetadata currTablet = null ;
0 commit comments