fix: Positional delete bug when base row number is greater than the largest delete position

nmahadevuni committed Dec 5, 2024
1 parent a0bbea2 commit 0d90de6
Showing 2 changed files with 139 additions and 88 deletions.
21 changes: 18 additions & 3 deletions velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp
@@ -221,12 +221,26 @@ void PositionalDeleteFileReader::updateDeleteBitmap(
// split.
const int64_t* deletePositions =
deletePositionsVector->as<FlatVector<int64_t>>()->rawValues();
int64_t offset = baseReadOffset + splitOffset_;
int64_t rowNumberLowerBound = baseReadOffset + splitOffset_;

// If the rowNumberLowerBound is greater than the last position in this delete
// rows batch, nothing to delete.
if (rowNumberLowerBound >
deletePositions[deletePositionsVector->size() - 1]) {
return;
}

// Skip the delete positions in deletePositionsVector until they are in the
// [rowNumberLowerBound, rowNumberUpperBound) range.
while (deletePositionsOffset_ < deletePositionsVector->size() &&
deletePositions[deletePositionsOffset_] < rowNumberLowerBound) {
deletePositionsOffset_++;
}
while (deletePositionsOffset_ < deletePositionsVector->size() &&
deletePositions[deletePositionsOffset_] < rowNumberUpperBound) {
bits::setBit(
deleteBitmap, deletePositions[deletePositionsOffset_] - offset);
deleteBitmap,
deletePositions[deletePositionsOffset_] - rowNumberLowerBound);
deletePositionsOffset_++;
}

@@ -239,7 +253,8 @@ void PositionalDeleteFileReader::updateDeleteBitmap(
deletePositions[deletePositionsOffset_] > rowNumberUpperBound)
? 0
: bits::nbytes(
deletePositions[deletePositionsOffset_ - 1] + 1 - offset)));
deletePositions[deletePositionsOffset_ - 1] + 1 -
rowNumberLowerBound)));
}

bool PositionalDeleteFileReader::readFinishedForBatch(
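For reference, a minimal standalone sketch of the guarded update above, using std::vector in place of Velox vectors and the raw bitmap buffer; the function and parameter names are illustrative, not the reader's API. It shows the two steps the hunk adds: return early when the batch's first row number is already greater than the largest delete position (the old code went straight to deletePositions[deletePositionsOffset_] - offset, which could produce a negative bit index in that case), then skip positions below the batch before setting bits.

// Sketch only (assumed names, simplified types): mirrors the logic of
// updateDeleteBitmap above.
#include <cstddef>
#include <cstdint>
#include <vector>

void updateDeleteBitmapSketch(
    const std::vector<int64_t>& deletePositions, // sorted ascending
    int64_t rowNumberLowerBound,   // first row number covered by this batch
    int64_t rowNumberUpperBound,   // one past the last row of this batch
    size_t& deletePositionsOffset, // persists across batches, like the member
    std::vector<bool>& deleteBitmap) {
  // Nothing in this delete batch can fall inside the range: return early
  // instead of deriving a negative bit index.
  if (deletePositions.empty() ||
      rowNumberLowerBound > deletePositions.back()) {
    return;
  }
  // Skip delete positions that belong to rows before this batch.
  while (deletePositionsOffset < deletePositions.size() &&
         deletePositions[deletePositionsOffset] < rowNumberLowerBound) {
    ++deletePositionsOffset;
  }
  // Mark delete positions inside [rowNumberLowerBound, rowNumberUpperBound).
  while (deletePositionsOffset < deletePositions.size() &&
         deletePositions[deletePositionsOffset] < rowNumberUpperBound) {
    deleteBitmap[deletePositions[deletePositionsOffset] - rowNumberLowerBound] =
        true;
    ++deletePositionsOffset;
  }
}

With delete positions {0, 9999, 19999} and a batch covering rows [30000, 40000), the guard returns immediately, which is the scenario described in the commit title.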
206 changes: 121 additions & 85 deletions velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
@@ -47,7 +47,8 @@ class HiveIcebergTest : public HiveConnectorTestBase {
/// Also create 1 delete file delete_file_1 which contains delete positions
/// for data_file_1.
void assertSingleBaseFileSingleDeleteFile(
const std::vector<int64_t>& deletePositionsVec) {
const std::vector<int64_t>& deletePositionsVec,
const uint32_t splitCount = 1) {
std::map<std::string, std::vector<int64_t>> rowGroupSizesForFiles = {
{"data_file_1", {10000, 10000}}};
std::unordered_map<
@@ -57,7 +58,7 @@ class HiveIcebergTest : public HiveConnectorTestBase {
{"delete_file_1", {{"data_file_1", deletePositionsVec}}}};

assertPositionalDeletes(
rowGroupSizesForFiles, deleteFilesForBaseDatafiles, 0);
rowGroupSizesForFiles, deleteFilesForBaseDatafiles, 0, splitCount);
}

/// Create 3 base data files, where the first file data_file_0 has 500 rows,
@@ -66,7 +67,8 @@ class HiveIcebergTest : public HiveConnectorTestBase {
/// delete file delete_file_1, which contains delete positions for
/// data_file_1.
void assertMultipleBaseFileSingleDeleteFile(
const std::vector<int64_t>& deletePositionsVec) {
const std::vector<int64_t>& deletePositionsVec,
const uint32_t splitCount = 1) {
int64_t previousFileRowCount = 500;
int64_t afterFileRowCount = 500;

@@ -77,13 +79,15 @@ class HiveIcebergTest : public HiveConnectorTestBase {
{"data_file_2", {afterFileRowCount}},
},
{{"delete_file_1", {{"data_file_1", deletePositionsVec}}}},
0);
0,
splitCount);
}

/// Create 1 base data file data_file_1 with 2 RowGroups of 10000 rows each.
/// Create multiple delete files named delete_file_1, delete_file_2, and so on.
void assertSingleBaseFileMultipleDeleteFiles(
const std::vector<std::vector<int64_t>>& deletePositionsVecs) {
const std::vector<std::vector<int64_t>>& deletePositionsVecs,
const uint32_t splitCount = 1) {
std::map<std::string, std::vector<int64_t>> rowGroupSizesForFiles = {
{"data_file_1", {10000, 10000}}};

@@ -97,15 +101,16 @@ class HiveIcebergTest : public HiveConnectorTestBase {
{"data_file_1", deletePositionsVecs[i]}};
}
assertPositionalDeletes(
rowGroupSizesForFiles, deleteFilesForBaseDatafiles, 0);
rowGroupSizesForFiles, deleteFilesForBaseDatafiles, 0, splitCount);
}

void assertMultipleSplits(
const std::vector<int64_t>& deletePositions,
int32_t splitCount,
int32_t numPrefetchSplits) {
int32_t fileCount,
int32_t numPrefetchSplits,
const uint32_t splitCount = 1) {
std::map<std::string, std::vector<int64_t>> rowGroupSizesForFiles;
for (int32_t i = 0; i < splitCount; i++) {
for (int32_t i = 0; i < fileCount; i++) {
std::string dataFileName = fmt::format("data_file_{}", i);
rowGroupSizesForFiles[dataFileName] = {rowCount};
}
@@ -114,7 +119,7 @@ class HiveIcebergTest : public HiveConnectorTestBase {
std::string,
std::multimap<std::string, std::vector<int64_t>>>
deleteFilesForBaseDatafiles;
for (int i = 0; i < splitCount; i++) {
for (int i = 0; i < fileCount; i++) {
std::string deleteFileName = fmt::format("delete_file_{}", i);
deleteFilesForBaseDatafiles[deleteFileName] = {
{fmt::format("data_file_{}", i), deletePositions}};
@@ -169,7 +174,8 @@ class HiveIcebergTest : public HiveConnectorTestBase {
std::string,
std::multimap<std::string, std::vector<int64_t>>>&
deleteFilesForBaseDatafiles,
int32_t numPrefetchSplits = 0) {
int32_t numPrefetchSplits = 0,
const uint32_t splitCount = 1) {
// Keep the reference to the deleteFilePath, otherwise the corresponding
// file will be deleted.
std::map<std::string, std::shared_ptr<TempFilePath>> dataFilePaths =
@@ -209,7 +215,9 @@ class HiveIcebergTest : public HiveConnectorTestBase {
}
}

splits.emplace_back(makeIcebergSplit(baseFilePath, deleteFiles));
auto icebergSplits =
makeIcebergSplits(baseFilePath, splitCount, deleteFiles);
splits.insert(splits.end(), icebergSplits.begin(), icebergSplits.end());
}

std::string duckdbSql =
@@ -335,8 +343,9 @@ class HiveIcebergTest : public HiveConnectorTestBase {
return vectors;
}

std::shared_ptr<ConnectorSplit> makeIcebergSplit(
std::vector<std::shared_ptr<ConnectorSplit>> makeIcebergSplits(
const std::string& dataFilePath,
const uint32_t splitCount = 1,
const std::vector<IcebergDeleteFile>& deleteFiles = {}) {
std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
std::unordered_map<std::string, std::string> customSplitInfo;
@@ -346,17 +355,24 @@ class HiveIcebergTest : public HiveConnectorTestBase {
->openFileForRead(dataFilePath);
const int64_t fileSize = file->size();

return std::make_shared<HiveIcebergSplit>(
kHiveConnectorId,
dataFilePath,
fileFomat_,
0,
fileSize,
partitionKeys,
std::nullopt,
customSplitInfo,
nullptr,
deleteFiles);
std::vector<std::shared_ptr<ConnectorSplit>> splits;
const uint64_t splitSize = std::ceil((fileSize) / splitCount);

for (int i = 0; i < splitCount; ++i) {
splits.emplace_back(std::make_shared<HiveIcebergSplit>(
kHiveConnectorId,
dataFilePath,
fileFomat_,
i * splitSize,
splitSize,
partitionKeys,
std::nullopt,
customSplitInfo,
nullptr,
deleteFiles));
}

return splits;
}

std::string getDuckDBQuery(
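To make the new splitCount parameter concrete, here is a small standalone sketch of the byte-range arithmetic with assumed example numbers; it is not the test helper itself (the helper above computes splitSize with std::ceil on the quotient), just an illustration of carving one data file into splitCount contiguous ranges.

// Sketch only: dividing a file of fileSize bytes into splitCount ranges.
#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

int main() {
  const uint64_t fileSize = 1000; // assumed example size in bytes
  const uint32_t splitCount = 3;

  // Ceiling division so that splitCount ranges cover the whole file.
  const uint64_t splitSize = (fileSize + splitCount - 1) / splitCount;

  std::vector<std::pair<uint64_t, uint64_t>> ranges; // {start, length}
  for (uint32_t i = 0; i < splitCount; ++i) {
    ranges.emplace_back(i * splitSize, splitSize);
  }

  // Prints 0/334, 334/334, 668/334. The last range may extend past the end
  // of the file, which is typically harmless because split ranges are only
  // used to decide which row groups a split owns.
  for (const auto& [start, length] : ranges) {
    std::cout << "split start=" << start << " length=" << length << "\n";
  }
}

Splitting each base data file this way is how the tests now exercise splits whose base row number lies beyond every recorded delete position.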
@@ -492,20 +508,23 @@ class HiveIcebergTest : public HiveConnectorTestBase {
TEST_F(HiveIcebergTest, singleBaseFileSinglePositionalDeleteFile) {
folly::SingletonVault::singleton()->registrationComplete();

assertSingleBaseFileSingleDeleteFile({{0, 1, 2, 3}});
// Delete the first and last row in each batch (10000 rows per batch)
assertSingleBaseFileSingleDeleteFile({{0, 9999, 10000, 19999}});
// Delete several rows in the second batch (10000 rows per batch)
assertSingleBaseFileSingleDeleteFile({{10000, 10002, 19999}});
// Delete random rows
assertSingleBaseFileSingleDeleteFile({makeRandomIncreasingValues(0, 20000)});
// Delete 0 rows
assertSingleBaseFileSingleDeleteFile({});
// Delete all rows
assertSingleBaseFileSingleDeleteFile(
{makeContinuousIncreasingValues(0, 20000)});
// Delete rows that don't exist
assertSingleBaseFileSingleDeleteFile({{20000, 29999}});
for (uint32_t splitCount = 1; splitCount <= 3; ++splitCount) {
assertSingleBaseFileSingleDeleteFile({{0, 1, 2, 3}}, splitCount);
// Delete the first and last row in each batch (10000 rows per batch)
assertSingleBaseFileSingleDeleteFile({{0, 9999, 10000, 19999}}, splitCount);
// Delete several rows in the second batch (10000 rows per batch)
assertSingleBaseFileSingleDeleteFile({{10000, 10002, 19999}}, splitCount);
// Delete random rows
assertSingleBaseFileSingleDeleteFile(
{makeRandomIncreasingValues(0, 20000)}, splitCount);
// Delete 0 rows
assertSingleBaseFileSingleDeleteFile({}, splitCount);
// Delete all rows
assertSingleBaseFileSingleDeleteFile(
{makeContinuousIncreasingValues(0, 20000)}, splitCount);
// Delete rows that don't exist
assertSingleBaseFileSingleDeleteFile({{20000, 29999}}, splitCount);
}
}

/// This test creates 3 base data files, only the middle one has corresponding
@@ -515,15 +534,17 @@ TEST_F(HiveIcebergTest, MultipleBaseFilesSinglePositionalDeleteFile) {
TEST_F(HiveIcebergTest, MultipleBaseFilesSinglePositionalDeleteFile) {
folly::SingletonVault::singleton()->registrationComplete();

assertMultipleBaseFileSingleDeleteFile({0, 1, 2, 3});
assertMultipleBaseFileSingleDeleteFile({0, 9999, 10000, 19999});
assertMultipleBaseFileSingleDeleteFile({10000, 10002, 19999});
assertMultipleBaseFileSingleDeleteFile({10000, 10002, 19999});
assertMultipleBaseFileSingleDeleteFile(
makeRandomIncreasingValues(0, rowCount));
assertMultipleBaseFileSingleDeleteFile({});
assertMultipleBaseFileSingleDeleteFile(
makeContinuousIncreasingValues(0, rowCount));
for (uint32_t splitCount = 1; splitCount <= 3; ++splitCount) {
assertMultipleBaseFileSingleDeleteFile({0, 1, 2, 3}, splitCount);
assertMultipleBaseFileSingleDeleteFile({0, 9999, 10000, 19999}, splitCount);
assertMultipleBaseFileSingleDeleteFile({10000, 10002, 19999}, splitCount);
assertMultipleBaseFileSingleDeleteFile({10000, 10002, 19999}, splitCount);
assertMultipleBaseFileSingleDeleteFile(
makeRandomIncreasingValues(0, rowCount), splitCount);
assertMultipleBaseFileSingleDeleteFile({}, splitCount);
assertMultipleBaseFileSingleDeleteFile(
makeContinuousIncreasingValues(0, rowCount), splitCount);
}
}

/// This test creates one base data file/split with multiple delete files. The
@@ -533,37 +554,45 @@ TEST_F(HiveIcebergTest, MultipleBaseFilesSinglePositionalDeleteFile) {
TEST_F(HiveIcebergTest, singleBaseFileMultiplePositionalDeleteFiles) {
folly::SingletonVault::singleton()->registrationComplete();

// Delete row 0, 1, 2, 3 from the first batch out of two.
assertSingleBaseFileMultipleDeleteFiles({{1}, {2}, {3}, {4}});
// Delete the first and last row in each batch (10000 rows per batch).
assertSingleBaseFileMultipleDeleteFiles({{0}, {9999}, {10000}, {19999}});

assertSingleBaseFileMultipleDeleteFiles({{500, 21000}});

assertSingleBaseFileMultipleDeleteFiles(
{makeRandomIncreasingValues(0, 10000),
makeRandomIncreasingValues(10000, 20000),
makeRandomIncreasingValues(5000, 15000)});

assertSingleBaseFileMultipleDeleteFiles(
{makeContinuousIncreasingValues(0, 10000),
makeContinuousIncreasingValues(10000, 20000)});

assertSingleBaseFileMultipleDeleteFiles(
{makeContinuousIncreasingValues(0, 10000),
makeContinuousIncreasingValues(10000, 20000),
makeRandomIncreasingValues(5000, 15000)});

assertSingleBaseFileMultipleDeleteFiles(
{makeContinuousIncreasingValues(0, 20000),
makeContinuousIncreasingValues(0, 20000)});

assertSingleBaseFileMultipleDeleteFiles(
{makeRandomIncreasingValues(0, 20000),
{},
makeRandomIncreasingValues(5000, 15000)});

assertSingleBaseFileMultipleDeleteFiles({{}, {}});
for (uint32_t splitCount = 1; splitCount <= 3; ++splitCount) {
// Delete row 0, 1, 2, 3 from the first batch out of two.
assertSingleBaseFileMultipleDeleteFiles({{1}, {2}, {3}, {4}}, splitCount);
// Delete the first and last row in each batch (10000 rows per batch).
assertSingleBaseFileMultipleDeleteFiles(
{{0}, {9999}, {10000}, {19999}}, splitCount);

assertSingleBaseFileMultipleDeleteFiles({{500, 21000}}, splitCount);

assertSingleBaseFileMultipleDeleteFiles(
{makeRandomIncreasingValues(0, 10000),
makeRandomIncreasingValues(10000, 20000),
makeRandomIncreasingValues(5000, 15000)},
splitCount);

assertSingleBaseFileMultipleDeleteFiles(
{makeContinuousIncreasingValues(0, 10000),
makeContinuousIncreasingValues(10000, 20000)},
splitCount);

assertSingleBaseFileMultipleDeleteFiles(
{makeContinuousIncreasingValues(0, 10000),
makeContinuousIncreasingValues(10000, 20000),
makeRandomIncreasingValues(5000, 15000)},
splitCount);

assertSingleBaseFileMultipleDeleteFiles(
{makeContinuousIncreasingValues(0, 20000),
makeContinuousIncreasingValues(0, 20000)},
splitCount);

assertSingleBaseFileMultipleDeleteFiles(
{makeRandomIncreasingValues(0, 20000),
{},
makeRandomIncreasingValues(5000, 15000)},
splitCount);

assertSingleBaseFileMultipleDeleteFiles({{}, {}}, splitCount);
}
}

/// This test creates 2 base data files, and 1 or 2 delete files, with unaligned
@@ -650,13 +679,20 @@ TEST_F(HiveIcebergTest, multipleBaseFileMultiplePositionalDeleteFiles) {
TEST_F(HiveIcebergTest, positionalDeletesMultipleSplits) {
folly::SingletonVault::singleton()->registrationComplete();

assertMultipleSplits({1, 2, 3, 4}, 10, 5);
assertMultipleSplits({1, 2, 3, 4}, 10, 0);
assertMultipleSplits({1, 2, 3, 4}, 10, 10);
assertMultipleSplits({0, 9999, 10000, 19999}, 10, 3);
assertMultipleSplits(makeRandomIncreasingValues(0, 20000), 10, 3);
assertMultipleSplits(makeContinuousIncreasingValues(0, 20000), 10, 3);
assertMultipleSplits({}, 10, 3);
for (uint32_t splitCount = 1; splitCount <= 3; ++splitCount) {
assertMultipleSplits({1, 2, 3, 4}, 10, 5, splitCount);
assertMultipleSplits({1, 2, 3, 4}, 10, 0, splitCount);
assertMultipleSplits({1, 2, 3, 4}, 10, 10, splitCount);
assertMultipleSplits({0, 9999, 10000, 19999}, 10, 3, splitCount);
assertMultipleSplits(
makeRandomIncreasingValues(0, 20000), 10, 3, splitCount);
assertMultipleSplits(
makeContinuousIncreasingValues(0, 20000), 10, 3, splitCount);
assertMultipleSplits({}, 10, 3, splitCount);

assertMultipleSplits(
makeContinuousIncreasingValues(0, 5000), 1, 3, splitCount);
}
}

} // namespace facebook::velox::connector::hive::iceberg
