Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: HashProbe load LazyVector before wrapping #12563

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ void HashProbe::fillOutput(vector_size_t size) {
for (auto [in, out] : projectedInputColumns_) {
// Load input vector if it is being split into multiple batches. It is not
// safe to wrap unloaded LazyVector into two different dictionaries.
ensureLoadedIfNotAtEnd(in);
ensureLoaded(in);
auto inputChild = input_->childAt(in);
output_->childAt(out) = wrapChild(size, outputRowMapping_, inputChild);
}
Expand Down
83 changes: 83 additions & 0 deletions velox/exec/tests/HashJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3458,6 +3458,89 @@ TEST_F(HashJoinTest, lazyVectorPartiallyLoadedInFilterLeftSemiFilter) {
"SELECT t.c1, t.c2 FROM t WHERE c0 IN (SELECT u.c0 FROM u WHERE t.c0 = u.c0 AND NOT (t.c1 < 15 AND t.c2 >= 0))");
}

// Runs an inner hash join whose probe side is a table scan over several files.
// The scanned column c1 is projected twice (as c2 and c3) and both copies are
// requested as join outputs; the result is compared against DuckDB.
// NOTE(review): presumably this exercises HashProbe wrapping the same lazily
// loaded probe column into more than one output column - confirm against
// HashProbe::fillOutput.
TEST_F(HashJoinTest, lazyVectorLoadedBeforeWrapping) {
  constexpr int32_t numSplits = 10;
  constexpr int32_t numRowsProbe = 333;
  constexpr int32_t numRowsBuild = 100;
  constexpr int32_t numBuildBatches = 5;
  constexpr int32_t numRowsPerBuildBatch = numRowsBuild / numBuildBatches;

  // One probe vector per split; each one is also written to its own temp file
  // so the probe side can be read back through a table scan.
  std::vector<RowVectorPtr> probeVectors;
  probeVectors.reserve(numSplits);

  std::vector<std::shared_ptr<TempFilePath>> tempFiles;
  tempFiles.reserve(numSplits);
  for (int32_t i = 0; i < numSplits; ++i) {
    auto rowVector = makeRowVector({
        makeFlatVector<int32_t>(
            numRowsProbe, [&](auto row) { return row - i * 10; }),
        makeFlatVector<int64_t>(numRowsProbe, [](auto row) { return row; }),
    });
    probeVectors.push_back(rowVector);
    tempFiles.push_back(TempFilePath::create());
    writeToFile(tempFiles.back()->getPath(), rowVector);
  }

  // Returns a factory producing one Hive split per temp file, keyed by the
  // given scan node id. 'nodeId' must outlive the returned factory because it
  // is captured by reference.
  auto makeInputSplits = [&](const core::PlanNodeId& nodeId) {
    return [&] {
      std::vector<exec::Split> probeSplits;
      probeSplits.reserve(tempFiles.size());
      for (auto& file : tempFiles) {
        probeSplits.push_back(
            exec::Split(makeHiveConnectorSplit(file->getPath())));
      }
      SplitInput splits;
      splits.emplace(nodeId, probeSplits);
      return splits;
    };
  };

  // 100 key values in [35, 233] range.
  std::vector<RowVectorPtr> buildVectors;
  buildVectors.reserve(numBuildBatches);
  for (int i = 0; i < numBuildBatches; ++i) {
    buildVectors.push_back(makeRowVector({
        makeFlatVector<int32_t>(
            numRowsPerBuildBatch,
            [i](auto row) {
              return 35 + 2 * (row + i * numRowsPerBuildBatch);
            }),
        makeFlatVector<int64_t>(
            numRowsPerBuildBatch, [](auto row) { return row; }),
    }));
  }

  createDuckDbTable("t", probeVectors);
  createDuckDbTable("u", buildVectors);

  auto probeType = ROW({"c0", "c1"}, {INTEGER(), BIGINT()});

  auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();

  // Only the join key is needed from the build side.
  auto buildSide = PlanBuilder(planNodeIdGenerator, pool_.get())
                       .values(buildVectors)
                       .project({"c0 AS u_c0"})
                       .planNode();

  core::PlanNodeId probeScanId;
  auto op =
      PlanBuilder(planNodeIdGenerator, pool_.get())
          .tableScan(probeType)
          .capturePlanNodeId(probeScanId)
          // c1 is deliberately projected under two names so that both join
          // output columns originate from the same scanned column.
          .project({"cast(c0 + 1 as integer) AS t_key", "c1 AS c2", "c1 AS c3"})
          .hashJoin({"t_key"}, {"u_c0"}, buildSide, "", {"c2", "c3"})
          .planNode();

  HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
      .planNode(std::move(op))
      .makeInputSplits(makeInputSplits(probeScanId))
      .referenceQuery(
          "SELECT t1.c2, t1.c3 FROM (SELECT c0, c1 AS c2, c1 AS c3 from t) t1, u WHERE (t1.c0 + 1) = u.c0")
      .run();
}

TEST_F(HashJoinTest, dynamicFilters) {
const int32_t numSplits = 10;
const int32_t numRowsProbe = 333;
Expand Down
6 changes: 3 additions & 3 deletions velox/exec/tests/PrintPlanWithStatsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,6 @@ TEST_F(PrintPlanWithStatsTest, innerJoinWithTableScan) {
printPlanWithStats(*op, task->taskStats(), true),
{{"-- Project\\[4\\]\\[expressions: \\(c0:INTEGER, ROW\\[\"c0\"\\]\\), \\(p1:BIGINT, plus\\(ROW\\[\"c1\"\\],1\\)\\), \\(p2:BIGINT, plus\\(ROW\\[\"c1\"\\],ROW\\[\"u_c1\"\\]\\)\\)\\] -> c0:INTEGER, p1:BIGINT, p2:BIGINT"},
{" Output: 2000 rows \\(.+\\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1, CPU breakdown: B/I/O/F (.+/.+/.+/.+)"},
{" dataSourceLazyCpuNanos[ ]* sum: .+, count: .+, min: .+, max: .+"},
{" dataSourceLazyInputBytes[ ]* sum: .+, count: .+, min: .+, max: .+"},
{" dataSourceLazyWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"},
{" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"},
{" runningFinishWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"},
{" runningGetOutputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"},
Expand All @@ -179,6 +176,9 @@ TEST_F(PrintPlanWithStatsTest, innerJoinWithTableScan) {
true},
{" blockedWaitForJoinBuildWallNanos\\s+sum: .+, count: 1, min: .+, max: .+",
true},
{" dataSourceLazyCpuNanos[ ]* sum: .+, count: .+, min: .+, max: .+"},
{" dataSourceLazyInputBytes[ ]* sum: .+, count: .+, min: .+, max: .+"},
{" dataSourceLazyWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"},
{" dynamicFiltersProduced\\s+sum: 1, count: 1, min: 1, max: 1"},
{" queuedWallNanos\\s+sum: .+, count: 1, min: .+, max: .+",
true}, // This line may or may not appear depending on how the threads
Expand Down
Loading