Skip to content

Commit

Permalink
fix: Absorb uncontrolled memory from hash table and PO destination (f…
Browse files Browse the repository at this point in the history
…acebookincubator#12582)

Summary:
Pull Request resolved: facebookincubator#12582

Use memory pool backed velox::raw_vector for hash table and partitioned output destinations.

Reviewed By: xiaoxmeng

Differential Revision: D70744564

fbshipit-source-id: 9968e89a9f0c1cead94034f22f9bd874318615ef
  • Loading branch information
Jialiang Tan authored and facebook-github-bot committed Mar 9, 2025
1 parent c550dab commit 98436ca
Show file tree
Hide file tree
Showing 16 changed files with 55 additions and 40 deletions.
6 changes: 3 additions & 3 deletions velox/common/memory/tests/MemoryCapExceededTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,14 @@ TEST_P(MemoryCapExceededTest, singleDriver) {
"Memory Pool[",
" AGGREGATE root[",
"] parent[null] MALLOC track-usage thread-safe]<max capacity 5.00MB "
"capacity 5.00MB used 3.71MB available 0B reservation [used 0B, reserved "
"capacity 5.00MB used 3.75MB available 0B reservation [used 0B, reserved "
"5.00MB, min 0B] counters [allocs 0, frees 0, reserves 0, releases 0, "
"collisions 0])>"};
std::vector<std::string> expectedDetailedTexts = {
"node.1 usage 12.00KB reserved 1.00MB peak 1.00MB",
"op.1.0.0.FilterProject usage 12.00KB reserved 1.00MB peak 12.00KB",
"node.2 usage 3.70MB reserved 4.00MB peak 4.00MB",
"op.2.0.0.Aggregation usage 3.70MB reserved 4.00MB peak 3.70MB",
"node.2 usage 3.74MB reserved 4.00MB peak 4.00MB",
"op.2.0.0.Aggregation usage 3.74MB reserved 4.00MB peak 3.74MB",
"Top 2 leaf memory pool usages:"};

std::vector<RowVectorPtr> data;
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ void GroupingSet::createHashTable() {
}
}

lookup_ = std::make_unique<HashLookup>(table_->hashers());
lookup_ = std::make_unique<HashLookup>(table_->hashers(), &pool_);
if (!isAdaptive_ && table_->hashMode() != BaseHashTable::HashMode::kHash) {
table_->forceGenericHashMode(BaseHashTable::kNoSpillInputStartPartitionBit);
}
Expand All @@ -427,7 +427,7 @@ void GroupingSet::initializeGlobalAggregation() {
return;
}

lookup_ = std::make_unique<HashLookup>(hashers_);
lookup_ = std::make_unique<HashLookup>(hashers_, &pool_);
lookup_->reset(1);

// Row layout is:
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ void HashProbe::initialize() {
}

VELOX_CHECK_NULL(lookup_);
lookup_ = std::make_unique<HashLookup>(hashers_);
lookup_ = std::make_unique<HashLookup>(hashers_, pool());
auto buildType = joinNode_->sources()[1]->outputType();
auto tableType = makeTableType(buildType.get(), joinNode_->rightKeys());
if (joinNode_->filter()) {
Expand Down
6 changes: 4 additions & 2 deletions velox/exec/HashTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ HashTable<ignoreNullKeys>::HashTable(
uint32_t minTableSizeForParallelJoinBuild,
memory::MemoryPool* pool)
: BaseHashTable(std::move(hashers)),
pool_(pool),
minTableSizeForParallelJoinBuild_(minTableSizeForParallelJoinBuild),
isJoinBuild_(isJoinBuild) {
isJoinBuild_(isJoinBuild),
buildPartitionBounds_(raw_vector<PartitionBoundIndexType>(pool)) {
std::vector<TypePtr> keys;
for (auto& hasher : hashers_) {
keys.push_back(hasher->type());
Expand Down Expand Up @@ -2041,7 +2043,7 @@ int32_t HashTable<false>::listNullKeyRows(
// Null-aware joins allow only one join key.
VELOX_CHECK_EQ(hashers_.size(), 1);
VELOX_CHECK_EQ(hashers_.size(), hashers.size());
HashLookup lookup(hashers);
HashLookup lookup(hashers, pool_);
if (hashMode_ == HashMode::kHash) {
lookup.hashes.push_back(VectorHasher::kNullHash);
} else {
Expand Down
12 changes: 10 additions & 2 deletions velox/exec/HashTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,14 @@ struct TableInsertPartitionInfo {

/// Contains input and output parameters for groupProbe and joinProbe APIs.
struct HashLookup {
explicit HashLookup(const std::vector<std::unique_ptr<VectorHasher>>& h)
: hashers(h) {}
HashLookup(
const std::vector<std::unique_ptr<VectorHasher>>& h,
memory::MemoryPool* pool)
: hashers(h),
rows(raw_vector<vector_size_t>(pool)),
hashes(raw_vector<uint64_t>(pool)),
hits(raw_vector<char*>(pool)),
normalizedKeys(raw_vector<uint64_t>(pool)) {}

void reset(vector_size_t size) {
rows.resize(size);
Expand Down Expand Up @@ -1035,6 +1041,8 @@ class HashTable : public BaseHashTable {
// time and block driver threads.
void checkHashBitsOverlap(int8_t spillInputStartPartitionBit);

memory::MemoryPool* const pool_;

// The min table size in row to trigger parallel join table build.
const uint32_t minTableSizeForParallelJoinBuild_;

Expand Down
3 changes: 2 additions & 1 deletion velox/exec/PartitionedOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ Destination::Destination(
serdeOptions_(serdeOptions),
pool_(pool),
eagerFlush_(eagerFlush),
recordEnqueued_(std::move(recordEnqueued)) {
recordEnqueued_(std::move(recordEnqueued)),
rows_(raw_vector<vector_size_t>(pool)) {
setTargetSizePct();
}

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/RowNumber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ RowNumber::RowNumber(
false, // hasProbedFlag
0, // minTableSizeForParallelJoinBuild
pool());
lookup_ = std::make_unique<HashLookup>(table_->hashers());
lookup_ = std::make_unique<HashLookup>(table_->hashers(), pool());

const auto numRowsColumn = table_->rows()->columnAt(numKeys);
numRowsOffset_ = numRowsColumn.offset();
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/TopNRowNumber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ TopNRowNumber::TopNRowNumber(
0, // minTableSizeForParallelJoinBuild
pool());
partitionOffset_ = table_->rows()->columnAt(numKeys).offset();
lookup_ = std::make_unique<HashLookup>(table_->hashers());
lookup_ = std::make_unique<HashLookup>(table_->hashers(), pool());
} else {
allocator_ = std::make_unique<HashStringAllocator>(pool());
singlePartition_ = std::make_unique<TopRows>(allocator_.get(), comparator_);
Expand Down
6 changes: 4 additions & 2 deletions velox/exec/benchmarks/HashJoinListResultBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,8 @@ class HashTableListJoinResultBenchmark : public VectorTestBase {

// Hash probe and list join result.
int64_t probeTableAndListResult() {
auto lookup = std::make_unique<HashLookup>(topTable_->hashers());
auto lookup =
std::make_unique<HashLookup>(topTable_->hashers(), pool_.get());
const auto numBatch = params_.probeSize / params_.hashTableSize;
const auto batchSize = params_.hashTableSize;
BufferPtr outputRowMapping;
Expand Down Expand Up @@ -476,7 +477,8 @@ class HashTableListJoinResultBenchmark : public VectorTestBase {
}

void eraseTable() {
auto lookup = std::make_unique<HashLookup>(topTable_->hashers());
auto lookup =
std::make_unique<HashLookup>(topTable_->hashers(), pool_.get());
auto batchSize = 10000;
auto mode = topTable_->hashMode();
BufferPtr outputRowMapping;
Expand Down
8 changes: 5 additions & 3 deletions velox/exec/benchmarks/HashTableBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,8 @@ class HashTableBenchmark : public VectorTestBase {
}

void testProbe() {
auto lookup = std::make_unique<HashLookup>(topTable_->hashers());
auto lookup =
std::make_unique<HashLookup>(topTable_->hashers(), pool_.get());
auto batchSize = batches_[0]->size();
SelectivityVector rows(batchSize);
auto mode = topTable_->hashMode();
Expand Down Expand Up @@ -493,7 +494,8 @@ class HashTableBenchmark : public VectorTestBase {

// Same as testProbe for normalized keys, uses F14Set instead.
void testF14Probe() {
auto lookup = std::make_unique<HashLookup>(topTable_->hashers());
auto lookup =
std::make_unique<HashLookup>(topTable_->hashers(), pool_.get());

auto batchSize = batches_[0]->size();
SelectivityVector rows(batchSize);
Expand Down Expand Up @@ -623,7 +625,7 @@ int main(int argc, char** argv) {
folly::Init init{&argc, &argv};
memory::MemoryManagerOptions options;
options.useMmapAllocator = true;
options.allocatorCapacity = 10UL << 30;
options.allocatorCapacity = 64UL << 30;
options.useMmapArena = true;
options.mmapArenaCapacityRatio = 1;
memory::MemoryManager::initialize(options);
Expand Down
9 changes: 5 additions & 4 deletions velox/exec/tests/AggregationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1248,7 +1248,7 @@ TEST_F(AggregationTest, memoryAllocations) {
// Verify memory allocations. Aggregation should make 2 allocations: 1 for the
// RowContainer holding single accumulator and 1 for the result.
auto planStats = toPlanStats(task->taskStats());
ASSERT_EQ(2, planStats.at(aggNodeId).numMemoryAllocations);
ASSERT_EQ(5, planStats.at(aggNodeId).numMemoryAllocations);

plan = PlanBuilder()
.values(data)
Expand All @@ -1264,7 +1264,7 @@ TEST_F(AggregationTest, memoryAllocations) {
// hash table, 1 for the RowContainer holding accumulators, 2 for results (1
// for values of the grouping key column, 1 for sum column).
planStats = toPlanStats(task->taskStats());
ASSERT_EQ(4, planStats.at(aggNodeId).numMemoryAllocations);
ASSERT_EQ(7, planStats.at(aggNodeId).numMemoryAllocations);
}

TEST_F(AggregationTest, groupingSets) {
Expand Down Expand Up @@ -2446,8 +2446,9 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringInputProcessing) {
ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0);
ASSERT_GT(reclaimerStats_.reclaimedBytes, 0);
reclaimerStats_.reset();
// We expect all the memory has been freed from the hash table.
ASSERT_EQ(op->pool()->usedBytes(), 0);
// We expect all the memory has been freed from the hash table, except for
// the ones used by raw_vector.
ASSERT_EQ(op->pool()->usedBytes(), 28672);
} else {
{
memory::ScopedMemoryArbitrationContext ctx(op->pool());
Expand Down
9 changes: 4 additions & 5 deletions velox/exec/tests/HashJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,8 @@ DEBUG_ONLY_TEST_P(MultiThreadedHashJoinTest, filterSpillOnFirstProbeInput) {
return;
}
testingRunArbitration(op->pool());
ASSERT_EQ(op->pool()->usedBytes(), 0);
ASSERT_EQ(op->pool()->reservedBytes(), 0);
ASSERT_EQ(op->pool()->usedBytes(), 40960);
ASSERT_EQ(op->pool()->reservedBytes(), 1048576);
}));

HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
Expand Down Expand Up @@ -7020,7 +7020,6 @@ DEBUG_ONLY_TEST_F(HashJoinTest, probeSpillOnWaitForPeers) {
ASSERT_EQ(opStats.at("HashBuild").spilledBytes, 0);

const auto* arbitrator = memory::memoryManager()->arbitrator();
ASSERT_GT(arbitrator->stats().numRequests, 0);
ASSERT_GT(arbitrator->stats().reclaimedUsedBytes, 0);
}
waitForAllTasksToBeDeleted();
Expand Down Expand Up @@ -8067,8 +8066,8 @@ DEBUG_ONLY_TEST_F(HashJoinTest, probeReclaimedMemoryReport) {
ASSERT_GT(reclaimedBytes, 0);
ASSERT_EQ(nodeMemoryUsage - nodePool->reservedBytes(), reclaimedBytes);
}
// Verify all the memory has been freed.
ASSERT_EQ(nodePool->reservedBytes(), 0);
// Verify all the memory has been freed, except for the ones for hash lookup.
ASSERT_EQ(nodePool->reservedBytes(), 1048576);

driverWaitFlag = false;
driverWait.notifyAll();
Expand Down
18 changes: 9 additions & 9 deletions velox/exec/tests/HashTableTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ class HashTableTest : public testing::TestWithParam<bool>,
int32_t sequence = 0;
std::vector<RowVectorPtr> batches;
auto table = createHashTableForAggregation(tableType, numKeys);
auto lookup = std::make_unique<HashLookup>(table->hashers());
auto lookup = std::make_unique<HashLookup>(table->hashers(), pool());
std::vector<char*> allInserted;
int32_t numErased = 0;
// We insert 1000 and delete 500.
Expand Down Expand Up @@ -455,7 +455,7 @@ class HashTableTest : public testing::TestWithParam<bool>,
}

void testProbe() {
auto lookup = std::make_unique<HashLookup>(topTable_->hashers());
auto lookup = std::make_unique<HashLookup>(topTable_->hashers(), pool());
const auto batchSize = batches_[0]->size();
SelectivityVector rows(batchSize);
const auto mode = topTable_->hashMode();
Expand Down Expand Up @@ -679,7 +679,7 @@ TEST_P(HashTableTest, clearAfterInsert) {
}
for (const bool clearTable : {false, true}) {
const auto table = createHashTableForAggregation(rowType, numKeys);
auto lookup = std::make_unique<HashLookup>(table->hashers());
auto lookup = std::make_unique<HashLookup>(table->hashers(), pool());
for (const auto& batch : inputBatches) {
lookup->reset(batch->size());
insertGroups(*batch, *lookup, *table);
Expand All @@ -704,7 +704,7 @@ TEST_P(HashTableTest, bestWithReserveOverflow) {
ROW({"a", "b", "c", "d"}, {BIGINT(), BIGINT(), BIGINT(), BIGINT()});
const auto numKeys = 4;
auto table = createHashTableForAggregation(rowType, numKeys);
auto lookup = std::make_unique<HashLookup>(table->hashers());
auto lookup = std::make_unique<HashLookup>(table->hashers(), pool());

// Make sure rangesWithReserve overflows.
// Ranges for keys are: 200K, 200K, 200K, 100K.
Expand Down Expand Up @@ -765,7 +765,7 @@ TEST_P(HashTableTest, bestWithReserveOverflow) {
TEST_P(HashTableTest, enableRangeWhereCan) {
auto rowType = ROW({"a", "b", "c"}, {BIGINT(), VARCHAR(), VARCHAR()});
auto table = createHashTableForAggregation(rowType, 3);
auto lookup = std::make_unique<HashLookup>(table->hashers());
auto lookup = std::make_unique<HashLookup>(table->hashers(), pool());

// Generate 3 keys with the following ranges and number of distinct values
// (ndv):
Expand Down Expand Up @@ -804,7 +804,7 @@ TEST_P(HashTableTest, enableRangeWhereCan) {

TEST_P(HashTableTest, arrayProbeNormalizedKey) {
auto table = createHashTableForAggregation(ROW({"a"}, {BIGINT()}), 1);
auto lookup = std::make_unique<HashLookup>(table->hashers());
auto lookup = std::make_unique<HashLookup>(table->hashers(), pool());

for (auto i = 0; i < 200; ++i) {
auto data = makeRowVector({
Expand Down Expand Up @@ -887,7 +887,7 @@ TEST_P(HashTableTest, listJoinResultsSize) {
outputRowsBuf.resize(kNumRows);
auto outputRows = folly::Range(outputRowsBuf.data(), kNumRows);

HashLookup lookup(table->hashers());
HashLookup lookup(table->hashers(), pool());
lookup.rows.reserve(kNumRows);
lookup.hits.reserve(kNumRows);
for (auto i = 0; i < kNumRows; i++) {
Expand Down Expand Up @@ -968,7 +968,7 @@ TEST_P(HashTableTest, groupBySpill) {
TEST_P(HashTableTest, checkSizeValidation) {
auto rowType = ROW({"a"}, {BIGINT()});
auto table = createHashTableForAggregation(rowType, 1);
auto lookup = std::make_unique<HashLookup>(table->hashers());
auto lookup = std::make_unique<HashLookup>(table->hashers(), pool());
auto testHelper = HashTableTestHelper<false>::create(table.get());

// The initial set hash mode with table size of 256K entries.
Expand Down Expand Up @@ -1105,7 +1105,7 @@ TEST_P(HashTableTest, offsetOverflowLoadTags) {
auto rowType = ROW({"a"}, {BIGINT()});
auto table = createHashTableForAggregation(rowType, rowType->size());
table->hashMode();
auto lookup = std::make_unique<HashLookup>(table->hashers());
auto lookup = std::make_unique<HashLookup>(table->hashers(), pool());
auto batchSize = 1 << 25;
for (auto i = 0; i < 64; ++i) {
std::vector<RowVectorPtr> batches;
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/RowNumberTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ DEBUG_ONLY_TEST_F(RowNumberTest, spillOnlyDuringInputOrOutput) {

testingRunArbitration(op->pool(), 0);
// We expect all the memory to be freed after the spill.
ASSERT_EQ(op->pool()->usedBytes(), 0);
ASSERT_EQ(op->pool()->usedBytes(), 40960);
})));

core::PlanNodeId rowNumberPlanNodeId;
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/tests/TopNRowNumberTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,8 +395,8 @@ DEBUG_ONLY_TEST_F(TopNRowNumberTest, memoryUsageCheckAfterReclaim) {
return;
}
testingRunArbitration(op->pool());
ASSERT_EQ(op->pool()->usedBytes(), 0);
ASSERT_EQ(op->pool()->reservedBytes(), 0);
ASSERT_EQ(op->pool()->usedBytes(), 20480);
ASSERT_EQ(op->pool()->reservedBytes(), 1048576);
})));

const vector_size_t size = 10'000;
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/utils/TestIndexStorageConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ TestIndexSource::lookup(const LookupRequest& request) {
checkNotFailed();
const auto numInputRows = request.input->size();
auto& hashTable = tableHandle_->indexTable()->table;
auto lookup = std::make_unique<HashLookup>(hashTable->hashers());
auto lookup = std::make_unique<HashLookup>(hashTable->hashers(), pool_.get());
SelectivityVector activeRows(numInputRows);
VELOX_CHECK(activeRows.isAllSelected());
hashTable->prepareForJoinProbe(
Expand Down

0 comments on commit 98436ca

Please sign in to comment.