Skip to content

Commit

Permalink
refactor(iceberg): Simplify positional delete read
Browse files Browse the repository at this point in the history
  • Loading branch information
nmahadevuni committed Mar 10, 2025
1 parent 98436ca commit 68e2034
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 48 deletions.
57 changes: 24 additions & 33 deletions velox/connectors/hive/iceberg/IcebergSplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ IcebergSplitReader::IcebergSplitReader(
scanSpec),
baseReadOffset_(0),
splitOffset_(0),
deleteBitmap_(nullptr),
deleteBitmapBitOffset_(0) {}
deleteBitmap_(nullptr) {}

void IcebergSplitReader::prepareSplit(
std::shared_ptr<common::MetadataFilter> metadataFilter,
Expand Down Expand Up @@ -105,39 +104,32 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) {
mutation.randomSkip = baseReaderOpts_.randomSkip().get();
mutation.deletedRows = nullptr;

if (deleteBitmap_ && deleteBitmapBitOffset_ > 0) {
// There are unconsumed bits from last batch
if (deleteBitmapBitOffset_ < deleteBitmap_->size() * 8) {
bits::copyBits(
deleteBitmap_->as<uint64_t>(),
deleteBitmapBitOffset_,
deleteBitmap_->asMutable<uint64_t>(),
0,
deleteBitmap_->size() * 8 - deleteBitmapBitOffset_);

uint64_t newBitMapSizeInBytes =
deleteBitmap_->size() - deleteBitmapBitOffset_ / 8;
if (deleteBitmapBitOffset_ % 8 != 0) {
newBitMapSizeInBytes--;
}
deleteBitmap_->setSize(newBitMapSizeInBytes);
} else {
// All bits were consumed, reset to 0 for all bits
std::memset(
(void*)(deleteBitmap_->asMutable<int8_t>()),
0L,
deleteBitmap_->size());
}
if (deleteBitmap_) {
std::memset(
(void*)(deleteBitmap_->asMutable<int8_t>()), 0L, deleteBitmap_->size());
}

auto actualSize = baseRowReader_->nextReadSize(size);

if (actualSize == dwio::common::RowReader::kAtEnd) {
return 0;
}

if (!positionalDeleteFileReaders_.empty()) {
auto numBytes = bits::nbytes(size);
dwio::common::ensureCapacity<int8_t>(
deleteBitmap_, numBytes, connectorQueryCtx_->memoryPool(), true, true);
auto numBytes = bits::nbytes(actualSize);

if (!deleteBitmap_ || deleteBitmap_->size() < numBytes) {
dwio::common::ensureCapacity<int8_t>(
deleteBitmap_,
numBytes,
connectorQueryCtx_->memoryPool(),
false,
true);
}

for (auto iter = positionalDeleteFileReaders_.begin();
iter != positionalDeleteFileReaders_.end();) {
(*iter)->readDeletePositions(baseReadOffset_, size, deleteBitmap_);
(*iter)->readDeletePositions(baseReadOffset_, actualSize, deleteBitmap_);

if ((*iter)->noMoreData()) {
iter = positionalDeleteFileReaders_.erase(iter);
Expand All @@ -151,11 +143,10 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) {
? deleteBitmap_->as<uint64_t>()
: nullptr;

auto rowsScanned = baseRowReader_->next(size, output, &mutation);
baseReadOffset_ += rowsScanned;
deleteBitmapBitOffset_ = rowsScanned;
baseRowReader_->next(actualSize, output, &mutation);
baseReadOffset_ += actualSize;

return rowsScanned;
return actualSize;
}

} // namespace facebook::velox::connector::hive::iceberg
3 changes: 0 additions & 3 deletions velox/connectors/hive/iceberg/IcebergSplitReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,5 @@ class IcebergSplitReader : public SplitReader {
std::list<std::unique_ptr<PositionalDeleteFileReader>>
positionalDeleteFileReaders_;
BufferPtr deleteBitmap_;
// The offset in bits of the deleteBitmap_ starting from where the bits shall
// be consumed
uint64_t deleteBitmapBitOffset_;
};
} // namespace facebook::velox::connector::hive::iceberg
12 changes: 0 additions & 12 deletions velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,18 +247,6 @@ void PositionalDeleteFileReader::updateDeleteBitmap(
deletePositions[deletePositionsOffset_] - rowNumberLowerBound);
deletePositionsOffset_++;
}

// There might be multiple delete files for a single base file. The size of
// the deleteBitmapBuffer should be the largest position among all delte files
deleteBitmapBuffer->setSize(std::max(
static_cast<uint64_t>(deleteBitmapBuffer->size()),
deletePositionsOffset_ == 0 ||
(deletePositionsOffset_ < deletePositionsVector->size() &&
deletePositions[deletePositionsOffset_] >= rowNumberUpperBound)
? 0
: bits::nbytes(
deletePositions[deletePositionsOffset_ - 1] + 1 -
rowNumberLowerBound)));
}

bool PositionalDeleteFileReader::readFinishedForBatch(
Expand Down

0 comments on commit 68e2034

Please sign in to comment.