diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java index dfa4b982789..8cd5af62817 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java @@ -366,16 +366,24 @@ static long loadFiles(Loader loader, BulkInfo bulkInfo, Path bulkDir, ImportTimingStats importTimingStats = new ImportTimingStats(); Timer timer = Timer.startNew(); - try (TabletsMetadata tabletsMetadata = factory.newTabletsMetadata(startRow)) { - - Iterator tabletIter = tabletsMetadata.iterator(); - while (lmi.hasNext()) { - loadMapEntry = lmi.next(); - List tablets = - findOverlappingTablets(fmtTid, loadMapEntry.getKey(), tabletIter, importTimingStats); - loader.load(tablets, loadMapEntry.getValue()); + KeyExtent prevLastExtent = null; // KeyExtent of last tablet from prior loadMapEntry + + TabletsMetadata tabletsMetadata = factory.newTabletsMetadata(startRow); + Iterator tabletIter = tabletsMetadata.iterator(); + while (lmi.hasNext()) { + loadMapEntry = lmi.next(); + KeyExtent loadMapKey = loadMapEntry.getKey(); + if (prevLastExtent != null && !loadMapKey.isPreviousExtent(prevLastExtent)) { + tabletsMetadata.close(); + tabletsMetadata = factory.newTabletsMetadata(startRow); + tabletIter = tabletsMetadata.iterator(); } + List tablets = + findOverlappingTablets(fmtTid, loadMapEntry.getKey(), tabletIter, importTimingStats); + loader.load(tablets, loadMapEntry.getValue()); + prevLastExtent = tablets.get(tablets.size() - 1).getExtent(); } + tabletsMetadata.close(); Duration totalProcessingTime = timer.elapsed(); log.trace("{}: Completed Finding Overlapping Tablets", fmtTid); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java index 56116365b49..165785991bb 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java @@ -104,6 +104,8 @@ public long isReady(long tid, Manager manager) throws Exception { @VisibleForTesting interface TabletIterFactory { Iterator newTabletIter(Text startRow); + + void close(); } private static boolean equals(Function extractor, KeyExtent ke1, KeyExtent ke2) { @@ -123,75 +125,81 @@ static KeyExtent validateLoadMapping(String tableId, LoadMappingIterator lmi, Iterator tabletIter = tabletIterFactory.newTabletIter(startRow); - KeyExtent currTablet = tabletIter.next(); + try { + KeyExtent currTablet = tabletIter.next(); - var fileCounts = new HashMap(); - int count; + var fileCounts = new HashMap(); + int count; - KeyExtent firstTablet = currRange.getKey(); - KeyExtent lastTablet = currRange.getKey(); + KeyExtent firstTablet = currRange.getKey(); + KeyExtent lastTablet = currRange.getKey(); - if (!tabletIter.hasNext() && equals(KeyExtent::prevEndRow, currTablet, currRange.getKey()) - && equals(KeyExtent::endRow, currTablet, currRange.getKey())) { - currRange = null; - } + if (!tabletIter.hasNext() && equals(KeyExtent::prevEndRow, currTablet, currRange.getKey()) + && equals(KeyExtent::endRow, currTablet, currRange.getKey())) { + currRange = null; + } - while (tabletIter.hasNext()) { + while (tabletIter.hasNext()) { - if (currRange == null) { - if (!lmi.hasNext()) { - break; + if (currRange == null) { + if (!lmi.hasNext()) { + break; + } + currRange = lmi.next(); + lastTablet = currRange.getKey(); } - currRange = lmi.next(); - lastTablet = currRange.getKey(); - } - while (!equals(KeyExtent::prevEndRow, currTablet, currRange.getKey()) - && tabletIter.hasNext()) { - currTablet = tabletIter.next(); - } + while (!equals(KeyExtent::prevEndRow, currTablet, currRange.getKey()) + && tabletIter.hasNext()) { + currTablet = tabletIter.next(); + } - boolean matchedPrevRow = equals(KeyExtent::prevEndRow, currTablet, currRange.getKey()); + boolean matchedPrevRow = equals(KeyExtent::prevEndRow, currTablet, currRange.getKey()); - if (matchedPrevRow && firstTablet == null) { - firstTablet = currTablet; - } + if (matchedPrevRow && firstTablet == null) { + firstTablet = currTablet; + } - count = matchedPrevRow ? 1 : 0; + count = matchedPrevRow ? 1 : 0; + + while (!equals(KeyExtent::endRow, currTablet, currRange.getKey()) && tabletIter.hasNext()) { + currTablet = tabletIter.next(); + count++; + } + + if (!matchedPrevRow || !equals(KeyExtent::endRow, currTablet, currRange.getKey())) { + break; + } - while (!equals(KeyExtent::endRow, currTablet, currRange.getKey()) && tabletIter.hasNext()) { - currTablet = tabletIter.next(); - count++; + if (maxNumTablets > 0) { + int fc = count; + currRange.getValue() + .forEach(fileInfo -> fileCounts.merge(fileInfo.getFileName(), fc, Integer::sum)); + } + currRange = null; } - if (!matchedPrevRow || !equals(KeyExtent::endRow, currTablet, currRange.getKey())) { - break; + if (currRange != null || lmi.hasNext()) { + // merge happened after the mapping was generated and before the table lock was acquired + throw new AcceptableThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, + TableOperationExceptionType.BULK_CONCURRENT_MERGE, "Concurrent merge happened"); } if (maxNumTablets > 0) { - int fc = count; - currRange.getValue() - .forEach(fileInfo -> fileCounts.merge(fileInfo.getFileName(), fc, Integer::sum)); + fileCounts.values().removeIf(c -> c <= maxNumTablets); + if (!fileCounts.isEmpty()) { + throw new AcceptableThriftTableOperationException(tableId, null, + TableOperation.BULK_IMPORT, TableOperationExceptionType.OTHER, + "Files overlap the configured max (" + maxNumTablets + ") number of tablets: " + + new TreeMap<>(fileCounts)); + } } - currRange = null; - } - if (currRange != null || lmi.hasNext()) { - // merge happened after the mapping was generated and before the table lock was acquired - throw new AcceptableThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, - TableOperationExceptionType.BULK_CONCURRENT_MERGE, "Concurrent merge happened"); - } + return new KeyExtent(firstTablet.tableId(), lastTablet.endRow(), firstTablet.prevEndRow()); - if (maxNumTablets > 0) { - fileCounts.values().removeIf(c -> c <= maxNumTablets); - if (!fileCounts.isEmpty()) { - throw new AcceptableThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, - TableOperationExceptionType.OTHER, "Files overlap the configured max (" + maxNumTablets - + ") number of tablets: " + new TreeMap<>(fileCounts)); - } + } finally { + tabletIterFactory.close(); } - - return new KeyExtent(firstTablet.tableId(), lastTablet.endRow(), firstTablet.prevEndRow()); } private KeyExtent checkForMerge(final long tid, final Manager manager) throws Exception { @@ -205,10 +213,24 @@ private KeyExtent checkForMerge(final long tid, final Manager manager) throws Ex try (LoadMappingIterator lmi = BulkSerialize.readLoadMapping(bulkDir.toString(), bulkInfo.tableId, fs::open)) { - TabletIterFactory tabletIterFactory = - startRow -> TabletsMetadata.builder(manager.getContext()).forTable(bulkInfo.tableId) - .overlapping(startRow, null).checkConsistency().fetch(PREV_ROW).build().stream() - .map(TabletMetadata::getExtent).iterator(); + TabletIterFactory tabletIterFactory = new TabletIterFactory() { + + TabletsMetadata tm = null; + + @Override + public Iterator newTabletIter(Text startRow) { + tm = TabletsMetadata.builder(manager.getContext()).forTable(bulkInfo.tableId) + .overlapping(startRow, null).checkConsistency().fetch(PREV_ROW).build(); + return tm.stream().map(TabletMetadata::getExtent).iterator(); + } + + @Override + public void close() { + if (tm != null) { + tm.close(); + } + } + }; return validateLoadMapping(bulkInfo.tableId.canonical(), lmi, tabletIterFactory, maxTablets, tid); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java index 9ff5945e219..4d21d4e5841 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImportTest.java @@ -31,6 +31,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -96,21 +97,28 @@ private void runTest(List loadRanges, List tabletRanges) t public void runTest(Map loadRanges, List tabletRanges, int maxTablets) throws Exception { - TabletIterFactory tabletIterFactory = startRow -> { - int start = -1; - - if (startRow == null) { - start = 0; - } else { - for (int i = 0; i < tabletRanges.size(); i++) { - if (tabletRanges.get(i).contains(startRow)) { - start = i; - break; + TabletIterFactory tabletIterFactory = new TabletIterFactory() { + + @Override + public Iterator newTabletIter(Text startRow) { + int start = -1; + + if (startRow == null) { + start = 0; + } else { + for (int i = 0; i < tabletRanges.size(); i++) { + if (tabletRanges.get(i).contains(startRow)) { + start = i; + break; + } } } + + return tabletRanges.subList(start, tabletRanges.size()).iterator(); } - return tabletRanges.subList(start, tabletRanges.size()).iterator(); + @Override + public void close() {} }; var sortedExtents = loadRanges.keySet().stream().sorted().collect(Collectors.toList());