diff --git a/velox/exec/HashProbe.cpp b/velox/exec/HashProbe.cpp index 05fdf6c5f51e..abdb5e5944c2 100644 --- a/velox/exec/HashProbe.cpp +++ b/velox/exec/HashProbe.cpp @@ -832,7 +832,7 @@ void HashProbe::fillOutput(vector_size_t size) { for (auto [in, out] : projectedInputColumns_) { // Load input vector if it is being split into multiple batches. It is not // safe to wrap unloaded LazyVector into two different dictionaries. - ensureLoadedIfNotAtEnd(in); + ensureLoaded(in); auto inputChild = input_->childAt(in); output_->childAt(out) = wrapChild(size, outputRowMapping_, inputChild); } diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index 7349e7a5574b..e5f642694751 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -3458,6 +3458,89 @@ TEST_F(HashJoinTest, lazyVectorPartiallyLoadedInFilterLeftSemiFilter) { "SELECT t.c1, t.c2 FROM t WHERE c0 IN (SELECT u.c0 FROM u WHERE t.c0 = u.c0 AND NOT (t.c1 < 15 AND t.c2 >= 0))"); } +TEST_F(HashJoinTest, lazyVectorLoadedBeforeWrapping) { + const int32_t numSplits = 10; + const int32_t numRowsProbe = 333; + const int32_t numRowsBuild = 100; + + std::vector probeVectors; + probeVectors.reserve(numSplits); + + std::vector> tempFiles; + for (int32_t i = 0; i < numSplits; ++i) { + auto rowVector = makeRowVector({ + makeFlatVector( + numRowsProbe, [&](auto row) { return row - i * 10; }), + makeFlatVector(numRowsProbe, [](auto row) { return row; }), + }); + probeVectors.push_back(rowVector); + tempFiles.push_back(TempFilePath::create()); + writeToFile(tempFiles.back()->getPath(), rowVector); + } + auto makeInputSplits = [&](const core::PlanNodeId& nodeId) { + return [&] { + std::vector probeSplits; + for (auto& file : tempFiles) { + probeSplits.push_back( + exec::Split(makeHiveConnectorSplit(file->getPath()))); + } + SplitInput splits; + splits.emplace(nodeId, probeSplits); + return splits; + }; + }; + + // 100 key values in [35, 233] range. + std::vector buildVectors; + for (int i = 0; i < 5; ++i) { + buildVectors.push_back(makeRowVector({ + makeFlatVector( + numRowsBuild / 5, + [i](auto row) { return 35 + 2 * (row + i * numRowsBuild / 5); }), + makeFlatVector(numRowsBuild / 5, [](auto row) { return row; }), + })); + } + std::vector keyOnlyBuildVectors; + for (int i = 0; i < 5; ++i) { + keyOnlyBuildVectors.push_back( + makeRowVector({makeFlatVector(numRowsBuild / 5, [i](auto row) { + return 35 + 2 * (row + i * numRowsBuild / 5); + })})); + } + + createDuckDbTable("t", probeVectors); + createDuckDbTable("u", buildVectors); + + auto probeType = ROW({"c0", "c1"}, {INTEGER(), BIGINT()}); + + auto planNodeIdGenerator = std::make_shared(); + + auto buildSide = PlanBuilder(planNodeIdGenerator, pool_.get()) + .values(buildVectors) + .project({"c0 AS u_c0"}) + .planNode(); + + { + core::PlanNodeId probeScanId; + core::PlanNodeId joinId; + auto op = + PlanBuilder(planNodeIdGenerator, pool_.get()) + .tableScan(probeType) + .capturePlanNodeId(probeScanId) + .project( + {"cast(c0 + 1 as integer) AS t_key", "c1 AS c2", "c1 AS c3"}) + .hashJoin({"t_key"}, {"u_c0"}, buildSide, "", {"c2", "c3"}) + .planNode(); + + HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get()) + .planNode(std::move(op)) + .makeInputSplits(makeInputSplits(probeScanId)) + .referenceQuery( + "SELECT t1.c2, t1.c3 FROM (SELECT c0, c1 AS c2, c1 AS c3 from t) t1, u WHERE (t1.c0 + 1) = u.c0") + .run(); + } +} + TEST_F(HashJoinTest, dynamicFilters) { const int32_t numSplits = 10; const int32_t numRowsProbe = 333; diff --git a/velox/exec/tests/PrintPlanWithStatsTest.cpp b/velox/exec/tests/PrintPlanWithStatsTest.cpp index bbb4e929264c..a8af79ea2927 100644 --- a/velox/exec/tests/PrintPlanWithStatsTest.cpp +++ b/velox/exec/tests/PrintPlanWithStatsTest.cpp @@ -153,9 +153,6 @@ TEST_F(PrintPlanWithStatsTest, innerJoinWithTableScan) { printPlanWithStats(*op, task->taskStats(), true), {{"-- Project\\[4\\]\\[expressions: \\(c0:INTEGER, ROW\\[\"c0\"\\]\\), \\(p1:BIGINT, plus\\(ROW\\[\"c1\"\\],1\\)\\), \\(p2:BIGINT, plus\\(ROW\\[\"c1\"\\],ROW\\[\"u_c1\"\\]\\)\\)\\] -> c0:INTEGER, p1:BIGINT, p2:BIGINT"}, {" Output: 2000 rows \\(.+\\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1, CPU breakdown: B/I/O/F (.+/.+/.+/.+)"}, - {" dataSourceLazyCpuNanos[ ]* sum: .+, count: .+, min: .+, max: .+"}, - {" dataSourceLazyInputBytes[ ]* sum: .+, count: .+, min: .+, max: .+"}, - {" dataSourceLazyWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningFinishWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningGetOutputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, @@ -179,6 +176,9 @@ TEST_F(PrintPlanWithStatsTest, innerJoinWithTableScan) { true}, {" blockedWaitForJoinBuildWallNanos\\s+sum: .+, count: 1, min: .+, max: .+", true}, + {" dataSourceLazyCpuNanos[ ]* sum: .+, count: .+, min: .+, max: .+"}, + {" dataSourceLazyInputBytes[ ]* sum: .+, count: .+, min: .+, max: .+"}, + {" dataSourceLazyWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" dynamicFiltersProduced\\s+sum: 1, count: 1, min: 1, max: 1"}, {" queuedWallNanos\\s+sum: .+, count: 1, min: .+, max: .+", true}, // This line may or may not appear depending on how the threads