From 68e2034670afd38837d786bc8a0c132d69e4c122 Mon Sep 17 00:00:00 2001 From: Mahadevuni Naveen Kumar Date: Mon, 10 Mar 2025 22:21:34 +0530 Subject: [PATCH] refactor(iceberg): Simplify positional delete read --- .../hive/iceberg/IcebergSplitReader.cpp | 57 ++++++++----------- .../hive/iceberg/IcebergSplitReader.h | 3 - .../iceberg/PositionalDeleteFileReader.cpp | 12 ---- 3 files changed, 24 insertions(+), 48 deletions(-) diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp index b8694eaef3c2..c3e00af7ab8d 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -51,8 +51,7 @@ IcebergSplitReader::IcebergSplitReader( scanSpec), baseReadOffset_(0), splitOffset_(0), - deleteBitmap_(nullptr), - deleteBitmapBitOffset_(0) {} + deleteBitmap_(nullptr) {} void IcebergSplitReader::prepareSplit( std::shared_ptr metadataFilter, @@ -105,39 +104,32 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) { mutation.randomSkip = baseReaderOpts_.randomSkip().get(); mutation.deletedRows = nullptr; - if (deleteBitmap_ && deleteBitmapBitOffset_ > 0) { - // There are unconsumed bits from last batch - if (deleteBitmapBitOffset_ < deleteBitmap_->size() * 8) { - bits::copyBits( - deleteBitmap_->as(), - deleteBitmapBitOffset_, - deleteBitmap_->asMutable(), - 0, - deleteBitmap_->size() * 8 - deleteBitmapBitOffset_); - - uint64_t newBitMapSizeInBytes = - deleteBitmap_->size() - deleteBitmapBitOffset_ / 8; - if (deleteBitmapBitOffset_ % 8 != 0) { - newBitMapSizeInBytes--; - } - deleteBitmap_->setSize(newBitMapSizeInBytes); - } else { - // All bits were consumed, reset to 0 for all bits - std::memset( - (void*)(deleteBitmap_->asMutable()), - 0L, - deleteBitmap_->size()); - } + if (deleteBitmap_) { + std::memset( + (void*)(deleteBitmap_->asMutable()), 0L, deleteBitmap_->size()); + } + + auto actualSize = baseRowReader_->nextReadSize(size); + + if (actualSize == dwio::common::RowReader::kAtEnd) { + return 0; } if (!positionalDeleteFileReaders_.empty()) { - auto numBytes = bits::nbytes(size); - dwio::common::ensureCapacity( - deleteBitmap_, numBytes, connectorQueryCtx_->memoryPool(), true, true); + auto numBytes = bits::nbytes(actualSize); + + if (!deleteBitmap_ || deleteBitmap_->size() < numBytes) { + dwio::common::ensureCapacity( + deleteBitmap_, + numBytes, + connectorQueryCtx_->memoryPool(), + false, + true); + } for (auto iter = positionalDeleteFileReaders_.begin(); iter != positionalDeleteFileReaders_.end();) { - (*iter)->readDeletePositions(baseReadOffset_, size, deleteBitmap_); + (*iter)->readDeletePositions(baseReadOffset_, actualSize, deleteBitmap_); if ((*iter)->noMoreData()) { iter = positionalDeleteFileReaders_.erase(iter); @@ -151,11 +143,10 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) { ? deleteBitmap_->as() : nullptr; - auto rowsScanned = baseRowReader_->next(size, output, &mutation); - baseReadOffset_ += rowsScanned; - deleteBitmapBitOffset_ = rowsScanned; + baseRowReader_->next(actualSize, output, &mutation); + baseReadOffset_ += actualSize; - return rowsScanned; + return actualSize; } } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.h b/velox/connectors/hive/iceberg/IcebergSplitReader.h index 72f1f8eca31c..795912159b96 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.h +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.h @@ -57,8 +57,5 @@ class IcebergSplitReader : public SplitReader { std::list> positionalDeleteFileReaders_; BufferPtr deleteBitmap_; - // The offset in bits of the deleteBitmap_ starting from where the bits shall - // be consumed - uint64_t deleteBitmapBitOffset_; }; } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp b/velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp index 9cfbbb62400a..4af6b0063bbd 100644 --- a/velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp +++ b/velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp @@ -247,18 +247,6 @@ void PositionalDeleteFileReader::updateDeleteBitmap( deletePositions[deletePositionsOffset_] - rowNumberLowerBound); deletePositionsOffset_++; } - - // There might be multiple delete files for a single base file. The size of - // the deleteBitmapBuffer should be the largest position among all delte files - deleteBitmapBuffer->setSize(std::max( - static_cast(deleteBitmapBuffer->size()), - deletePositionsOffset_ == 0 || - (deletePositionsOffset_ < deletePositionsVector->size() && - deletePositions[deletePositionsOffset_] >= rowNumberUpperBound) - ? 0 - : bits::nbytes( - deletePositions[deletePositionsOffset_ - 1] + 1 - - rowNumberLowerBound))); } bool PositionalDeleteFileReader::readFinishedForBatch(