From 4adec182144e23d7c7d6422e0090d5b59eb32b86 Mon Sep 17 00:00:00 2001 From: Jiaqi Zhang Date: Fri, 21 Feb 2025 00:41:10 -0800 Subject: [PATCH] fix: Fix grouping key reordering during spilling (#12395) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/12395 When prefix sort is enabled, we sort the grouping keys to maximize the prefixsort benefit as introduced in https://github.com/facebookincubator/velox/pull/11720. However, when spilling happens and when reading the spilled data, there are key order mismatch between the spilled data and the operator output. It can cause segmentation fault when there is RowType mismatch or other type mismatch failures. This PR fixes by adding a spill data loader (`spillResultWitoutAggregates_`) which has the reordered grouping keys, loading the spilled data, and mapping the keys back to result after loading. Reviewed By: xiaoxmeng Differential Revision: D69860326 fbshipit-source-id: ca1aa35cfe08124709a72cf4eeddb0b2b3731102 --- velox/exec/GroupingSet.cpp | 47 ++++++++++++++++++++++++++-- velox/exec/GroupingSet.h | 15 +++++++++ velox/exec/tests/AggregationTest.cpp | 37 +++++++++++++--------- 3 files changed, 82 insertions(+), 17 deletions(-) diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp index eea5c48df7d0..cc4055e484e1 100644 --- a/velox/exec/GroupingSet.cpp +++ b/velox/exec/GroupingSet.cpp @@ -1191,6 +1191,46 @@ bool GroupingSet::mergeNextWithAggregates( VELOX_UNREACHABLE(); } +void GroupingSet::prepareSpillResultWithoutAggregates( + int32_t maxOutputRows, + const RowVectorPtr& result) { + const auto numColumns = result->type()->size(); + if (spillResultWithoutAggregates_ == nullptr) { + std::vector names(numColumns); + VELOX_CHECK_EQ(table_->rows()->keyTypes().size(), numColumns); + std::vector types{table_->rows()->keyTypes()}; + + const auto& resultType = dynamic_cast(result->type().get()); + for (auto i = 0; i < numColumns; ++i) { + names[groupingKeyOutputProjections_[i]] = resultType->nameOf(i); + } + spillResultWithoutAggregates_ = BaseVector::create( + std::make_shared(std::move(names), std::move(types)), + maxOutputRows, + &pool_); + } else { + VectorPtr spillResultWithoutAggregates = + std::move(spillResultWithoutAggregates_); + BaseVector::prepareForReuse(spillResultWithoutAggregates, maxOutputRows); + spillResultWithoutAggregates_ = + std::static_pointer_cast(spillResultWithoutAggregates); + } + + VELOX_CHECK_NOT_NULL(spillResultWithoutAggregates_); + for (auto i = 0; i < numColumns; ++i) { + spillResultWithoutAggregates_->childAt(groupingKeyOutputProjections_[i]) = + std::move(result->childAt(i)); + } +} + +void GroupingSet::projectResult(const RowVectorPtr& result) { + for (auto i = 0; i < result->type()->size(); ++i) { + result->childAt(i) = std::move(spillResultWithoutAggregates_->childAt( + groupingKeyOutputProjections_[i])); + } + result->resize(spillResultWithoutAggregates_->size()); +} + bool GroupingSet::mergeNextWithoutAggregates( int32_t maxOutputRows, const RowVectorPtr& result) { @@ -1215,6 +1255,8 @@ bool GroupingSet::mergeNextWithoutAggregates( // less than 'numDistinctSpillFilesPerPartition_'. bool newDistinct{true}; int32_t numOutputRows{0}; + prepareSpillResultWithoutAggregates(maxOutputRows, result); + while (numOutputRows < maxOutputRows) { const auto next = merge_->nextWithEquals(); auto* stream = next.first; @@ -1239,13 +1281,14 @@ bool GroupingSet::mergeNextWithoutAggregates( } if (newDistinct) { // Yield result for new distinct. - result->copy( + spillResultWithoutAggregates_->copy( &stream->current(), numOutputRows++, stream->currentIndex(), 1); } stream->pop(); newDistinct = true; } - result->resize(numOutputRows); + spillResultWithoutAggregates_->resize(numOutputRows); + projectResult(result); return numOutputRows > 0; } diff --git a/velox/exec/GroupingSet.h b/velox/exec/GroupingSet.h index f91543ce96da..75402a563804 100644 --- a/velox/exec/GroupingSet.h +++ b/velox/exec/GroupingSet.h @@ -189,6 +189,15 @@ class GroupingSet { // index for this aggregation), otherwise it returns reference to activeRows_. const SelectivityVector& getSelectivityVector(size_t aggregateIndex) const; + // Prepare spillResultWithoutAggregates_ for loading spilled data. + void prepareSpillResultWithoutAggregates( + int32_t maxOutputRows, + const RowVectorPtr& result); + + // If prefixsort is enabled, loads the read data from + // spillResultWithoutAggregates_ into result. + void projectResult(const RowVectorPtr& result); + // Checks if input will fit in the existing memory and increases reservation // if not. If reservation cannot be increased, spills enough to make 'input' // fit. @@ -336,6 +345,12 @@ class GroupingSet { // First row in remainingInput_ that needs to be processed. vector_size_t firstRemainingRow_; + // In case of distinct aggregation without aggregates and the grouping key + // reordered, the spilled data is first loaded into + // 'spillResultWithoutAggregates_' and then reordered back and load to + // result. + RowVectorPtr spillResultWithoutAggregates_{nullptr}; + // The value of mayPushdown flag specified in addInput() for the // 'remainingInput_'. bool remainingMayPushdown_; diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp index b19a0a222ec0..0581e41d2273 100644 --- a/velox/exec/tests/AggregationTest.cpp +++ b/velox/exec/tests/AggregationTest.cpp @@ -963,8 +963,13 @@ TEST_F(AggregationTest, partialDistinctWithAbandon) { } TEST_F(AggregationTest, distinctWithGroupingKeysReordered) { - rowType_ = ROW( - {"c0", "c1", "c2", "c3"}, {BIGINT(), INTEGER(), VARCHAR(), VARCHAR()}); + rowType_ = + ROW({"c0", "c1", "c2", "c3", "c4"}, + {BIGINT(), + VARCHAR(), + INTEGER(), + ROW({"a0", "a1", "a2"}, {VARCHAR(), BOOLEAN(), BIGINT()}), + BOOLEAN()}); const int vectorSize = 2'000; VectorFuzzer::Options options; @@ -983,19 +988,21 @@ TEST_F(AggregationTest, distinctWithGroupingKeysReordered) { // Distinct aggregation with grouping key with larger prefix encoded size // first. auto spillDirectory = exec::test::TempDirectoryPath::create(); - auto task = AssertQueryBuilder(duckDbQueryRunner_) - .config(QueryConfig::kAbandonPartialAggregationMinRows, 100) - .config(QueryConfig::kAbandonPartialAggregationMinPct, 50) - .spillDirectory(spillDirectory->getPath()) - .config(QueryConfig::kSpillEnabled, true) - .config(QueryConfig::kAggregationSpillEnabled, true) - .config(QueryConfig::kSpillPrefixSortEnabled, true) - .maxDrivers(1) - .plan(PlanBuilder() - .values(vectors) - .singleAggregation({"c2", "c0"}, {}) - .planNode()) - .assertResults("SELECT distinct c2, c0 FROM tmp"); + TestScopedSpillInjection scopedSpillInjection(100); + auto task = + AssertQueryBuilder(duckDbQueryRunner_) + .config(QueryConfig::kAbandonPartialAggregationMinRows, 100) + .config(QueryConfig::kAbandonPartialAggregationMinPct, 50) + .spillDirectory(spillDirectory->getPath()) + .config(QueryConfig::kSpillEnabled, true) + .config(QueryConfig::kAggregationSpillEnabled, true) + .config(QueryConfig::kSpillPrefixSortEnabled, true) + .maxDrivers(1) + .plan(PlanBuilder() + .values(vectors) + .singleAggregation({"c4", "c1", "c3", "c2", "c0"}, {}) + .planNode()) + .assertResults("SELECT distinct c4, c1, c3, c2, c0 FROM tmp"); } TEST_F(AggregationTest, largeValueRangeArray) {