From 0d90de6c0419ba14f85e11d19e417e54a1cc168b Mon Sep 17 00:00:00 2001
From: Mahadevuni Naveen Kumar
Date: Thu, 21 Nov 2024 22:57:32 +0530
Subject: [PATCH] fix: Positional delete bug when base row number is greater than the largest delete position

---
 .../iceberg/PositionalDeleteFileReader.cpp    |  21 +-
 .../hive/iceberg/tests/IcebergReadTest.cpp    | 206 ++++++++++--------
 2 files changed, 139 insertions(+), 88 deletions(-)

diff --git a/velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp b/velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp
index 94828d136e6b..8de18c70859c 100644
--- a/velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp
+++ b/velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp
@@ -221,12 +221,26 @@ void PositionalDeleteFileReader::updateDeleteBitmap(
   // split.
   const int64_t* deletePositions =
       deletePositionsVector->as<FlatVector<int64_t>>()->rawValues();
-  int64_t offset = baseReadOffset + splitOffset_;
+  int64_t rowNumberLowerBound = baseReadOffset + splitOffset_;
+
+  // If rowNumberLowerBound is greater than the last position in this batch
+  // of delete rows, there is nothing to delete.
+  if (rowNumberLowerBound >
+      deletePositions[deletePositionsVector->size() - 1]) {
+    return;
+  }
+
+  // Skip the delete positions in deletePositionsVector until they are in the
+  // [rowNumberLowerBound, rowNumberUpperBound) range.
+  while (deletePositionsOffset_ < deletePositionsVector->size() &&
+         deletePositions[deletePositionsOffset_] < rowNumberLowerBound) {
+    deletePositionsOffset_++;
+  }
 
   while (deletePositionsOffset_ < deletePositionsVector->size() &&
          deletePositions[deletePositionsOffset_] < rowNumberUpperBound) {
     bits::setBit(
-        deleteBitmap, deletePositions[deletePositionsOffset_] - offset);
+        deleteBitmap,
+        deletePositions[deletePositionsOffset_] - rowNumberLowerBound);
     deletePositionsOffset_++;
   }
 
@@ -239,7 +253,8 @@ void PositionalDeleteFileReader::updateDeleteBitmap(
           deletePositions[deletePositionsOffset_] > rowNumberUpperBound)
              ? 0
              : bits::nbytes(
-                  deletePositions[deletePositionsOffset_ - 1] + 1 - offset)));
+                  deletePositions[deletePositionsOffset_ - 1] + 1 -
+                  rowNumberLowerBound)));
 }
 
 bool PositionalDeleteFileReader::readFinishedForBatch(
diff --git a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
index d79e21b73343..a61e6f6f4abe 100644
--- a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
+++ b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
@@ -47,7 +47,8 @@ class HiveIcebergTest : public HiveConnectorTestBase {
   /// Also create 1 delete file delete_file_1 which contains delete positions
   /// for data_file_1.
   void assertSingleBaseFileSingleDeleteFile(
-      const std::vector<int64_t>& deletePositionsVec) {
+      const std::vector<int64_t>& deletePositionsVec,
+      const uint32_t splitCount = 1) {
     std::map<std::string, std::vector<int64_t>> rowGroupSizesForFiles = {
         {"data_file_1", {10000, 10000}}};
     std::unordered_map<
         std::string,
         std::multimap<std::string, std::vector<int64_t>>>
         deleteFilesForBaseDatafiles = {
            {"delete_file_1", {{"data_file_1", deletePositionsVec}}}};
 
     assertPositionalDeletes(
-        rowGroupSizesForFiles, deleteFilesForBaseDatafiles, 0);
+        rowGroupSizesForFiles, deleteFilesForBaseDatafiles, 0, splitCount);
   }
 
   /// Create 3 base data files, where the first file data_file_0 has 500 rows,
   /// the second file data_file_1 contains 2 RowGroups of 10000 rows each, and
   /// the third file data_file_2 contains 500 rows. Also create 1 positional
   /// delete file delete_file_1, which contains delete positions for
   /// data_file_1.
   void assertMultipleBaseFileSingleDeleteFile(
-      const std::vector<int64_t>& deletePositionsVec) {
+      const std::vector<int64_t>& deletePositionsVec,
+      const uint32_t splitCount = 1) {
     int64_t previousFileRowCount = 500;
     int64_t afterFileRowCount = 500;
@@ -77,13 +79,15 @@ class HiveIcebergTest : public HiveConnectorTestBase {
             {"data_file_2", {afterFileRowCount}},
         },
         {{"delete_file_1", {{"data_file_1", deletePositionsVec}}}},
-        0);
+        0,
+        splitCount);
   }
 
   /// Create 1 base data file data_file_1 with 2 RowGroups of 10000 rows each.
   /// Create multiple delete files named delete_file_1, delete_file_2, and so
   /// on, which all contain delete positions for data_file_1.
   void assertSingleBaseFileMultipleDeleteFiles(
-      const std::vector<std::vector<int64_t>>& deletePositionsVecs) {
+      const std::vector<std::vector<int64_t>>& deletePositionsVecs,
+      const uint32_t splitCount = 1) {
     std::map<std::string, std::vector<int64_t>> rowGroupSizesForFiles = {
         {"data_file_1", {10000, 10000}}};
@@ -97,15 +101,16 @@ class HiveIcebergTest : public HiveConnectorTestBase {
           {"data_file_1", deletePositionsVecs[i]}};
     }
     assertPositionalDeletes(
-        rowGroupSizesForFiles, deleteFilesForBaseDatafiles, 0);
+        rowGroupSizesForFiles, deleteFilesForBaseDatafiles, 0, splitCount);
   }
 
   void assertMultipleSplits(
       const std::vector<int64_t>& deletePositions,
-      int32_t splitCount,
-      int32_t numPrefetchSplits) {
+      int32_t fileCount,
+      int32_t numPrefetchSplits,
+      const uint32_t splitCount = 1) {
     std::map<std::string, std::vector<int64_t>> rowGroupSizesForFiles;
-    for (int32_t i = 0; i < splitCount; i++) {
+    for (int32_t i = 0; i < fileCount; i++) {
       std::string dataFileName = fmt::format("data_file_{}", i);
       rowGroupSizesForFiles[dataFileName] = {rowCount};
     }
@@ -114,7 +119,7 @@ class HiveIcebergTest : public HiveConnectorTestBase {
         std::string,
         std::multimap<std::string, std::vector<int64_t>>>
         deleteFilesForBaseDatafiles;
-    for (int i = 0; i < splitCount; i++) {
+    for (int i = 0; i < fileCount; i++) {
       std::string deleteFileName = fmt::format("delete_file_{}", i);
       deleteFilesForBaseDatafiles[deleteFileName] = {
           {fmt::format("data_file_{}", i), deletePositions}};
     }
@@ -169,7 +174,8 @@ class HiveIcebergTest : public HiveConnectorTestBase {
           std::string,
           std::multimap<std::string, std::vector<int64_t>>>&
           deleteFilesForBaseDatafiles,
-      int32_t numPrefetchSplits = 0) {
+      int32_t numPrefetchSplits = 0,
+      const uint32_t splitCount = 1) {
     // Keep the reference to the deleteFilePath, otherwise the corresponding
     // file will be deleted.
     std::map<std::string, std::shared_ptr<TempFilePath>> dataFilePaths =
@@ -209,7 +215,9 @@ class HiveIcebergTest : public HiveConnectorTestBase {
         }
       }
 
-      splits.emplace_back(makeIcebergSplit(baseFilePath, deleteFiles));
+      auto icebergSplits =
+          makeIcebergSplits(baseFilePath, splitCount, deleteFiles);
+      splits.insert(splits.end(), icebergSplits.begin(), icebergSplits.end());
     }
 
     std::string duckdbSql =
@@ -335,8 +343,9 @@ class HiveIcebergTest : public HiveConnectorTestBase {
     return vectors;
   }
 
-  std::shared_ptr<ConnectorSplit> makeIcebergSplit(
+  std::vector<std::shared_ptr<ConnectorSplit>> makeIcebergSplits(
       const std::string& dataFilePath,
+      const uint32_t splitCount = 1,
       const std::vector<IcebergDeleteFile>& deleteFiles = {}) {
     std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
     std::unordered_map<std::string, std::string> customSplitInfo;
@@ -346,17 +355,24 @@ class HiveIcebergTest : public HiveConnectorTestBase {
         ->openFileForRead(dataFilePath);
     const int64_t fileSize = file->size();
 
-    return std::make_shared<HiveConnectorSplit>(
-        kHiveConnectorId,
-        dataFilePath,
-        fileFomat_,
-        0,
-        fileSize,
-        partitionKeys,
-        std::nullopt,
-        customSplitInfo,
-        nullptr,
-        deleteFiles);
+    std::vector<std::shared_ptr<ConnectorSplit>> splits;
+    const uint64_t splitSize = (fileSize + splitCount - 1) / splitCount;
+
+    for (uint32_t i = 0; i < splitCount; ++i) {
+      splits.emplace_back(std::make_shared<HiveConnectorSplit>(
+          kHiveConnectorId,
+          dataFilePath,
+          fileFomat_,
+          i * splitSize,
+          splitSize,
+          partitionKeys,
+          std::nullopt,
+          customSplitInfo,
+          nullptr,
+          deleteFiles));
+    }
+
+    return splits;
   }
 
   std::string getDuckDBQuery(
@@ -492,20 +508,23 @@ class HiveIcebergTest : public HiveConnectorTestBase {
 TEST_F(HiveIcebergTest, singleBaseFileSinglePositionalDeleteFile) {
   folly::SingletonVault::singleton()->registrationComplete();
 
-  assertSingleBaseFileSingleDeleteFile({{0, 1, 2, 3}});
-  // Delete the first and last row in each batch (10000 rows per batch)
-  assertSingleBaseFileSingleDeleteFile({{0, 9999, 10000, 19999}});
-  // Delete several rows in the second batch (10000 rows per batch)
-  assertSingleBaseFileSingleDeleteFile({{10000, 10002, 19999}});
-  // Delete random rows
-  assertSingleBaseFileSingleDeleteFile({makeRandomIncreasingValues(0, 20000)});
-  // Delete 0 rows
-  assertSingleBaseFileSingleDeleteFile({});
-  // Delete all rows
-  assertSingleBaseFileSingleDeleteFile(
-      {makeContinuousIncreasingValues(0, 20000)});
-  // Delete rows that don't exist
-  assertSingleBaseFileSingleDeleteFile({{20000, 29999}});
+  for (uint32_t splitCount = 1; splitCount <= 3; ++splitCount) {
+    assertSingleBaseFileSingleDeleteFile({{0, 1, 2, 3}}, splitCount);
+    // Delete the first and last row in each batch (10000 rows per batch)
+    assertSingleBaseFileSingleDeleteFile({{0, 9999, 10000, 19999}}, splitCount);
+    // Delete several rows in the second batch (10000 rows per batch)
+    assertSingleBaseFileSingleDeleteFile({{10000, 10002, 19999}}, splitCount);
+    // Delete random rows
+    assertSingleBaseFileSingleDeleteFile(
+        {makeRandomIncreasingValues(0, 20000)}, splitCount);
+    // Delete 0 rows
+    assertSingleBaseFileSingleDeleteFile({}, splitCount);
+    // Delete all rows
+    assertSingleBaseFileSingleDeleteFile(
+        {makeContinuousIncreasingValues(0, 20000)}, splitCount);
+    // Delete rows that don't exist
+    assertSingleBaseFileSingleDeleteFile({{20000, 29999}}, splitCount);
+  }
 }
 
 /// This test creates 3 base data files, only the middle one has corresponding
@@ -515,15 +534,17 @@ TEST_F(HiveIcebergTest, singleBaseFileSinglePositionalDeleteFile) {
 TEST_F(HiveIcebergTest, MultipleBaseFilesSinglePositionalDeleteFile) {
   folly::SingletonVault::singleton()->registrationComplete();
 
-  assertMultipleBaseFileSingleDeleteFile({0, 1, 2, 3});
-  assertMultipleBaseFileSingleDeleteFile({0, 9999, 10000, 19999});
-  assertMultipleBaseFileSingleDeleteFile({10000, 10002, 19999});
-  assertMultipleBaseFileSingleDeleteFile({10000, 10002, 19999});
-  assertMultipleBaseFileSingleDeleteFile(
-      makeRandomIncreasingValues(0, rowCount));
-  assertMultipleBaseFileSingleDeleteFile({});
-  assertMultipleBaseFileSingleDeleteFile(
-      makeContinuousIncreasingValues(0, rowCount));
+  for (uint32_t splitCount = 1; splitCount <= 3; ++splitCount) {
+    assertMultipleBaseFileSingleDeleteFile({0, 1, 2, 3}, splitCount);
+    assertMultipleBaseFileSingleDeleteFile({0, 9999, 10000, 19999}, splitCount);
+    assertMultipleBaseFileSingleDeleteFile({10000, 10002, 19999}, splitCount);
+    assertMultipleBaseFileSingleDeleteFile({10000, 10002, 19999}, splitCount);
+    assertMultipleBaseFileSingleDeleteFile(
+        makeRandomIncreasingValues(0, rowCount), splitCount);
+    assertMultipleBaseFileSingleDeleteFile({}, splitCount);
+    assertMultipleBaseFileSingleDeleteFile(
+        makeContinuousIncreasingValues(0, rowCount), splitCount);
+  }
 }
 
 /// This test creates one base data file/split with multiple delete files. The
@@ -533,37 +554,45 @@ TEST_F(HiveIcebergTest, MultipleBaseFilesSinglePositionalDeleteFile) {
 TEST_F(HiveIcebergTest, singleBaseFileMultiplePositionalDeleteFiles) {
   folly::SingletonVault::singleton()->registrationComplete();
 
-  // Delete row 0, 1, 2, 3 from the first batch out of two.
-  assertSingleBaseFileMultipleDeleteFiles({{1}, {2}, {3}, {4}});
-  // Delete the first and last row in each batch (10000 rows per batch).
-  assertSingleBaseFileMultipleDeleteFiles({{0}, {9999}, {10000}, {19999}});
-
-  assertSingleBaseFileMultipleDeleteFiles({{500, 21000}});
-
-  assertSingleBaseFileMultipleDeleteFiles(
-      {makeRandomIncreasingValues(0, 10000),
-       makeRandomIncreasingValues(10000, 20000),
-       makeRandomIncreasingValues(5000, 15000)});
-
-  assertSingleBaseFileMultipleDeleteFiles(
-      {makeContinuousIncreasingValues(0, 10000),
-       makeContinuousIncreasingValues(10000, 20000)});
-
-  assertSingleBaseFileMultipleDeleteFiles(
-      {makeContinuousIncreasingValues(0, 10000),
-       makeContinuousIncreasingValues(10000, 20000),
-       makeRandomIncreasingValues(5000, 15000)});
-
-  assertSingleBaseFileMultipleDeleteFiles(
-      {makeContinuousIncreasingValues(0, 20000),
-       makeContinuousIncreasingValues(0, 20000)});
-
-  assertSingleBaseFileMultipleDeleteFiles(
-      {makeRandomIncreasingValues(0, 20000),
-       {},
-       makeRandomIncreasingValues(5000, 15000)});
-
-  assertSingleBaseFileMultipleDeleteFiles({{}, {}});
+  for (uint32_t splitCount = 1; splitCount <= 3; ++splitCount) {
+    // Delete rows 1, 2, 3, 4 from the first batch out of two.
+    assertSingleBaseFileMultipleDeleteFiles({{1}, {2}, {3}, {4}}, splitCount);
+    // Delete the first and last row in each batch (10000 rows per batch).
+    assertSingleBaseFileMultipleDeleteFiles(
+        {{0}, {9999}, {10000}, {19999}}, splitCount);
+
+    assertSingleBaseFileMultipleDeleteFiles({{500, 21000}}, splitCount);
+
+    assertSingleBaseFileMultipleDeleteFiles(
+        {makeRandomIncreasingValues(0, 10000),
+         makeRandomIncreasingValues(10000, 20000),
+         makeRandomIncreasingValues(5000, 15000)},
+        splitCount);
+
+    assertSingleBaseFileMultipleDeleteFiles(
+        {makeContinuousIncreasingValues(0, 10000),
+         makeContinuousIncreasingValues(10000, 20000)},
+        splitCount);
+
+    assertSingleBaseFileMultipleDeleteFiles(
+        {makeContinuousIncreasingValues(0, 10000),
+         makeContinuousIncreasingValues(10000, 20000),
+         makeRandomIncreasingValues(5000, 15000)},
+        splitCount);
+
+    assertSingleBaseFileMultipleDeleteFiles(
+        {makeContinuousIncreasingValues(0, 20000),
+         makeContinuousIncreasingValues(0, 20000)},
+        splitCount);
+
+    assertSingleBaseFileMultipleDeleteFiles(
+        {makeRandomIncreasingValues(0, 20000),
+         {},
+         makeRandomIncreasingValues(5000, 15000)},
+        splitCount);
+
+    assertSingleBaseFileMultipleDeleteFiles({{}, {}}, splitCount);
+  }
 }
 
 /// This test creates 2 base data files, and 1 or 2 delete files, with unaligned
@@ -650,13 +679,20 @@ TEST_F(HiveIcebergTest, multipleBaseFileMultiplePositionalDeleteFiles) {
 TEST_F(HiveIcebergTest, positionalDeletesMultipleSplits) {
   folly::SingletonVault::singleton()->registrationComplete();
 
-  assertMultipleSplits({1, 2, 3, 4}, 10, 5);
-  assertMultipleSplits({1, 2, 3, 4}, 10, 0);
-  assertMultipleSplits({1, 2, 3, 4}, 10, 10);
-  assertMultipleSplits({0, 9999, 10000, 19999}, 10, 3);
-  assertMultipleSplits(makeRandomIncreasingValues(0, 20000), 10, 3);
-  assertMultipleSplits(makeContinuousIncreasingValues(0, 20000), 10, 3);
-  assertMultipleSplits({}, 10, 3);
+  for (uint32_t splitCount = 1; splitCount <= 3; ++splitCount) {
+    assertMultipleSplits({1, 2, 3, 4}, 10, 5, splitCount);
+    assertMultipleSplits({1, 2, 3, 4}, 10, 0, splitCount);
+    assertMultipleSplits({1, 2, 3, 4}, 10, 10, splitCount);
+    assertMultipleSplits({0, 9999, 10000, 19999}, 10, 3, splitCount);
+    assertMultipleSplits(
+        makeRandomIncreasingValues(0, 20000), 10, 3, splitCount);
+    assertMultipleSplits(
+        makeContinuousIncreasingValues(0, 20000), 10, 3, splitCount);
+    assertMultipleSplits({}, 10, 3, splitCount);
+
+    assertMultipleSplits(
+        makeContinuousIncreasingValues(0, 5000), 1, 3, splitCount);
+  }
 }
 } // namespace facebook::velox::connector::hive::iceberg
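
Reviewer note (not part of the patch): the standalone sketch below condenses
the updateDeleteBitmap() change above. applyDeletes() and every name in it
are illustrative only, not Velox APIs. The early-return guard and the skip
loop are the fix: without them, a delete position smaller than the split's
first row number would produce a negative bitmap index.

// Standalone sketch of the skip logic in updateDeleteBitmap() above.
#include <cstdint>
#include <cstdio>
#include <vector>

// Marks the bits of `deleteBitmap` for every sorted delete position that
// falls in [lowerBound, upperBound); positions behind the split are skipped.
// Returns the index of the first unconsumed delete position.
size_t applyDeletes(
    const std::vector<int64_t>& deletePositions,
    size_t offset,
    int64_t lowerBound,
    int64_t upperBound,
    std::vector<bool>& deleteBitmap) {
  // Nothing to delete if the whole batch of positions precedes this split.
  if (deletePositions.empty() || lowerBound > deletePositions.back()) {
    return offset;
  }
  // Skip positions below the split's first row number. Without this loop,
  // deletePositions[offset] - lowerBound could go negative (the fixed bug).
  while (offset < deletePositions.size() &&
         deletePositions[offset] < lowerBound) {
    ++offset;
  }
  // Mark the positions inside [lowerBound, upperBound).
  while (offset < deletePositions.size() &&
         deletePositions[offset] < upperBound) {
    deleteBitmap[deletePositions[offset] - lowerBound] = true;
    ++offset;
  }
  return offset;
}

int main() {
  // Positions 2 and 5 belong to an earlier split; 120 falls in this split's
  // [100, 200) row range; 250 belongs to a later one.
  const std::vector<int64_t> positions{2, 5, 120, 250};
  std::vector<bool> bitmap(100, false);
  const size_t next = applyDeletes(positions, 0, 100, 200, bitmap);
  std::printf("consumed=%zu bit20=%d\n", next, (int)bitmap[20]);
}

Running it prints consumed=3 bit20=1: the two stale positions are skipped,
position 120 maps to bit 20 of the split-local bitmap, and 250 is left for
the next split.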
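A second illustrative sketch (also not part of the patch) motivates the
ceiling division used for splitSize in makeIcebergSplits() above: rounding
up guarantees splitCount * splitSize >= fileSize, so the generated byte
ranges cover the whole data file, whereas truncating division could leave a
tail of bytes that no split ever reads.

// Standalone sketch of the split sizing in makeIcebergSplits() above.
#include <algorithm>
#include <cstdint>
#include <cstdio>

int main() {
  const uint64_t fileSize = 1001; // bytes in the base data file
  const uint32_t splitCount = 3;  // number of splits to carve it into
  // Integer ceiling division: truncation would yield 333 and leave bytes
  // [999, 1001) uncovered by any of the three splits.
  const uint64_t splitSize = (fileSize + splitCount - 1) / splitCount; // 334
  for (uint32_t i = 0; i < splitCount; ++i) {
    const uint64_t start = i * splitSize;
    const uint64_t length = std::min(splitSize, fileSize - start);
    std::printf("split %u: offset=%llu length=%llu\n",
                i,
                static_cast<unsigned long long>(start),
                static_cast<unsigned long long>(length));
  }
}

With fileSize = 1001 and splitCount = 3 this prints offsets 0/334/668 with
lengths 334/334/333, covering all 1001 bytes.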