Recreate TabletsMetadata iterator when file ranges are not contiguous #5341

Open · wants to merge 5 commits into 2.1
@@ -366,16 +366,24 @@ static long loadFiles(Loader loader, BulkInfo bulkInfo, Path bulkDir,

ImportTimingStats importTimingStats = new ImportTimingStats();
Timer timer = Timer.startNew();
try (TabletsMetadata tabletsMetadata = factory.newTabletsMetadata(startRow)) {

Iterator<TabletMetadata> tabletIter = tabletsMetadata.iterator();
while (lmi.hasNext()) {
loadMapEntry = lmi.next();
List<TabletMetadata> tablets =
findOverlappingTablets(fmtTid, loadMapEntry.getKey(), tabletIter, importTimingStats);
loader.load(tablets, loadMapEntry.getValue());
KeyExtent prevLastExtent = null; // KeyExtent of last tablet from prior loadMapEntry

TabletsMetadata tabletsMetadata = factory.newTabletsMetadata(startRow);
Iterator<TabletMetadata> tabletIter = tabletsMetadata.iterator();
while (lmi.hasNext()) {
loadMapEntry = lmi.next();
KeyExtent loadMapKey = loadMapEntry.getKey();
if (prevLastExtent != null && !loadMapKey.isPreviousExtent(prevLastExtent)) {
keith-turner (Contributor) commented on Feb 25, 2025:
In some cases this strategy could potentially make performance worse, like the case of importing into every 3rd tablet. The underlying scanner has already made an RPC and fetched some number of key values. Not sure of the best way to do this, but ideally we would only reset the scanner if the needed data is not already sitting in that batch of key values that was already read.
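
A minimal sketch of that idea, using a bounded skip budget as a stand-in for "is the needed data already in the fetched batch". The constant `MAX_LOCAL_SKIP`, the helper class, and the package imports (assuming the 2.1 layout) are illustrative assumptions, not code from this PR:

```java
import java.util.Iterator;

import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;

// Hypothetical helper: before closing and recreating the TabletsMetadata scanner,
// try to reach the tablet preceding the next load range by consuming a bounded
// number of tablets from the existing iterator. Entries already buffered by the
// scanner cost no extra RPC to consume; only fall back to a new scanner when the
// budget runs out.
class LocalSkipSketch {

  // Assumed tuning constant; a real implementation would likely derive this from
  // the scanner's batch size.
  private static final int MAX_LOCAL_SKIP = 1_000;

  static boolean reachedByLocalSkip(Iterator<TabletMetadata> tabletIter, KeyExtent loadMapKey) {
    for (int skipped = 0; skipped < MAX_LOCAL_SKIP && tabletIter.hasNext(); skipped++) {
      KeyExtent candidate = tabletIter.next().getExtent();
      if (loadMapKey.isPreviousExtent(candidate)) {
        // iterator is now positioned just before the tablets that overlap loadMapKey
        return true;
      }
      // a fuller version would also stop once a tablet at or past the range is seen,
      // to handle boundaries that do not line up exactly
    }
    return false; // gap too large; caller should close and recreate the scanner
  }
}
```

In loadFiles this would replace the unconditional reset: only when the helper returns false would `tabletsMetadata` be closed and a new scan started.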

keith-turner (Contributor) commented on Feb 25, 2025:
Wondering if using a batch scanner would be better here to minimize the overall number of RPCs made. It would be a large change to the code. The current code, even if we optimize the use of the scanner, will make a lot of RPCs for some cases (like importing into every 100th tablet in a million-tablet table) and those RPCs will be made serially. A batch scanner would minimize the number of RPCs made for these cases and could also make them in parallel. The Scanner is probably more efficient for reading tablets sequentially, so I suspect the batch scanner would not be much slower when data is contiguous and would be much better when data is sparse.

Would be good to gather some performance data before making large changes to improve performance, to ensure they are needed. Can not do that in 2.1, but in main we could experiment with the SplitMillionIT and try things like importing into every 10th tablet for 1000 tablets, every 100th tablet for 1000 tablets, etc.
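
For reference, a rough sketch of what the batch-scanner variant might look like. The class name, the thread count of 8, and the assumption that the caller has already converted each load range into its metadata-table `Range` are illustrative; decoding the returned key/values is omitted and this is not an implementation from this PR:

```java
import java.util.Collection;
import java.util.Map.Entry;

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.security.Authorizations;

// Sketch only: fetch tablet metadata for all non-contiguous load ranges in one
// pass, letting the BatchScanner issue the RPCs in parallel instead of serially.
class BatchScanSketch {

  static void scanLoadRanges(AccumuloClient client, Collection<Range> metadataRanges)
      throws TableNotFoundException {
    try (BatchScanner bs =
        client.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 8)) {
      bs.setRanges(metadataRanges);
      for (Entry<Key,Value> entry : bs) {
        // decode tablet metadata from entry and group it per load range (omitted)
      }
    }
  }
}
```

One wrinkle: a BatchScanner returns results in no particular order, so loadFiles would need to regroup tablets per load range before calling loader.load(), whereas the current Scanner-based code can rely on metadata arriving sorted.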

tabletsMetadata.close();
tabletsMetadata = factory.newTabletsMetadata(startRow);
tabletIter = tabletsMetadata.iterator();
}
List<TabletMetadata> tablets =
findOverlappingTablets(fmtTid, loadMapEntry.getKey(), tabletIter, importTimingStats);
loader.load(tablets, loadMapEntry.getValue());
prevLastExtent = tablets.get(tablets.size() - 1).getExtent();
}
tabletsMetadata.close();
Duration totalProcessingTime = timer.elapsed();

log.trace("{}: Completed Finding Overlapping Tablets", fmtTid);
@@ -104,6 +104,8 @@ public long isReady(long tid, Manager manager) throws Exception {
@VisibleForTesting
interface TabletIterFactory {
Iterator<KeyExtent> newTabletIter(Text startRow);

void close();
}

private static boolean equals(Function<KeyExtent,Text> extractor, KeyExtent ke1, KeyExtent ke2) {
@@ -123,75 +125,81 @@ static KeyExtent validateLoadMapping(String tableId, LoadMappingIterator lmi,

Iterator<KeyExtent> tabletIter = tabletIterFactory.newTabletIter(startRow);

KeyExtent currTablet = tabletIter.next();
try {
KeyExtent currTablet = tabletIter.next();

var fileCounts = new HashMap<String,Integer>();
int count;
var fileCounts = new HashMap<String,Integer>();
int count;

KeyExtent firstTablet = currRange.getKey();
KeyExtent lastTablet = currRange.getKey();
KeyExtent firstTablet = currRange.getKey();
KeyExtent lastTablet = currRange.getKey();

if (!tabletIter.hasNext() && equals(KeyExtent::prevEndRow, currTablet, currRange.getKey())
&& equals(KeyExtent::endRow, currTablet, currRange.getKey())) {
currRange = null;
}
if (!tabletIter.hasNext() && equals(KeyExtent::prevEndRow, currTablet, currRange.getKey())
&& equals(KeyExtent::endRow, currTablet, currRange.getKey())) {
currRange = null;
}

while (tabletIter.hasNext()) {
while (tabletIter.hasNext()) {

if (currRange == null) {
if (!lmi.hasNext()) {
break;
if (currRange == null) {
if (!lmi.hasNext()) {
break;
}
currRange = lmi.next();
lastTablet = currRange.getKey();
}
currRange = lmi.next();
lastTablet = currRange.getKey();
}

while (!equals(KeyExtent::prevEndRow, currTablet, currRange.getKey())
&& tabletIter.hasNext()) {
currTablet = tabletIter.next();
}
while (!equals(KeyExtent::prevEndRow, currTablet, currRange.getKey())
&& tabletIter.hasNext()) {
currTablet = tabletIter.next();
}

boolean matchedPrevRow = equals(KeyExtent::prevEndRow, currTablet, currRange.getKey());
boolean matchedPrevRow = equals(KeyExtent::prevEndRow, currTablet, currRange.getKey());

if (matchedPrevRow && firstTablet == null) {
firstTablet = currTablet;
}
if (matchedPrevRow && firstTablet == null) {
firstTablet = currTablet;
}

count = matchedPrevRow ? 1 : 0;
count = matchedPrevRow ? 1 : 0;

while (!equals(KeyExtent::endRow, currTablet, currRange.getKey()) && tabletIter.hasNext()) {
currTablet = tabletIter.next();
count++;
}

if (!matchedPrevRow || !equals(KeyExtent::endRow, currTablet, currRange.getKey())) {
break;
}

while (!equals(KeyExtent::endRow, currTablet, currRange.getKey()) && tabletIter.hasNext()) {
currTablet = tabletIter.next();
count++;
if (maxNumTablets > 0) {
int fc = count;
currRange.getValue()
.forEach(fileInfo -> fileCounts.merge(fileInfo.getFileName(), fc, Integer::sum));
}
currRange = null;
}

if (!matchedPrevRow || !equals(KeyExtent::endRow, currTablet, currRange.getKey())) {
break;
if (currRange != null || lmi.hasNext()) {
// merge happened after the mapping was generated and before the table lock was acquired
throw new AcceptableThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT,
TableOperationExceptionType.BULK_CONCURRENT_MERGE, "Concurrent merge happened");
}

if (maxNumTablets > 0) {
int fc = count;
currRange.getValue()
.forEach(fileInfo -> fileCounts.merge(fileInfo.getFileName(), fc, Integer::sum));
fileCounts.values().removeIf(c -> c <= maxNumTablets);
if (!fileCounts.isEmpty()) {
throw new AcceptableThriftTableOperationException(tableId, null,
TableOperation.BULK_IMPORT, TableOperationExceptionType.OTHER,
"Files overlap the configured max (" + maxNumTablets + ") number of tablets: "
+ new TreeMap<>(fileCounts));
}
}
currRange = null;
}

if (currRange != null || lmi.hasNext()) {
// merge happened after the mapping was generated and before the table lock was acquired
throw new AcceptableThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT,
TableOperationExceptionType.BULK_CONCURRENT_MERGE, "Concurrent merge happened");
}
return new KeyExtent(firstTablet.tableId(), lastTablet.endRow(), firstTablet.prevEndRow());

if (maxNumTablets > 0) {
fileCounts.values().removeIf(c -> c <= maxNumTablets);
if (!fileCounts.isEmpty()) {
throw new AcceptableThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT,
TableOperationExceptionType.OTHER, "Files overlap the configured max (" + maxNumTablets
+ ") number of tablets: " + new TreeMap<>(fileCounts));
}
} finally {
tabletIterFactory.close();
}

return new KeyExtent(firstTablet.tableId(), lastTablet.endRow(), firstTablet.prevEndRow());
}

private KeyExtent checkForMerge(final long tid, final Manager manager) throws Exception {
@@ -205,10 +213,24 @@ private KeyExtent checkForMerge(final long tid, final Manager manager) throws Ex
try (LoadMappingIterator lmi =
BulkSerialize.readLoadMapping(bulkDir.toString(), bulkInfo.tableId, fs::open)) {

TabletIterFactory tabletIterFactory =
startRow -> TabletsMetadata.builder(manager.getContext()).forTable(bulkInfo.tableId)
.overlapping(startRow, null).checkConsistency().fetch(PREV_ROW).build().stream()
.map(TabletMetadata::getExtent).iterator();
TabletIterFactory tabletIterFactory = new TabletIterFactory() {

TabletsMetadata tm = null;

@Override
public Iterator<KeyExtent> newTabletIter(Text startRow) {
tm = TabletsMetadata.builder(manager.getContext()).forTable(bulkInfo.tableId)
.overlapping(startRow, null).checkConsistency().fetch(PREV_ROW).build();
return tm.stream().map(TabletMetadata::getExtent).iterator();
}

@Override
public void close() {
if (tm != null) {
tm.close();
}
}
};

return validateLoadMapping(bulkInfo.tableId.canonical(), lmi, tabletIterFactory, maxTablets,
tid);
@@ -31,6 +31,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -96,21 +97,28 @@ private void runTest(List<KeyExtent> loadRanges, List<KeyExtent> tabletRanges) t

public void runTest(Map<KeyExtent,String> loadRanges, List<KeyExtent> tabletRanges,
int maxTablets) throws Exception {
TabletIterFactory tabletIterFactory = startRow -> {
int start = -1;

if (startRow == null) {
start = 0;
} else {
for (int i = 0; i < tabletRanges.size(); i++) {
if (tabletRanges.get(i).contains(startRow)) {
start = i;
break;
TabletIterFactory tabletIterFactory = new TabletIterFactory() {

@Override
public Iterator<KeyExtent> newTabletIter(Text startRow) {
int start = -1;

if (startRow == null) {
start = 0;
} else {
for (int i = 0; i < tabletRanges.size(); i++) {
if (tabletRanges.get(i).contains(startRow)) {
start = i;
break;
}
}
}

return tabletRanges.subList(start, tabletRanges.size()).iterator();
}

return tabletRanges.subList(start, tabletRanges.size()).iterator();
@Override
public void close() {}
};

var sortedExtents = loadRanges.keySet().stream().sorted().collect(Collectors.toList());