Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
zml1206 committed Jan 8, 2025
1 parent 76d9b6d commit d74dc60
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1281,8 +1281,8 @@ object CollapseProject extends Rule[LogicalPlan] with AliasHelper {
} else {
true
}
// Alias and ExtractValue are very cheap.
case _: Alias | _: ExtractValue => e.children.forall(isCheap)
// Alias, ExtractValue and CreateNamedStruct are very cheap.
case _: Alias | _: ExtractValue | _: CreateNamedStruct => e.children.forall(isCheap)
case _ => false
}

Expand Down Expand Up @@ -1837,7 +1837,7 @@ object PushPredicateThroughNonJoin extends Rule[LogicalPlan] with PredicateHelpe
case a: Alias if exprIdSet.contains(a.exprId) => a.toAttribute
case e => e
}
project.copy(child = Filter(newCondition, grandChild), projectList = newProjectList)
Project(newProjectList, Filter(newCondition, grandChild))

// We can push down deterministic predicate through Aggregate, including throwable predicate.
// If we can push down a filter through Aggregate, it means the filter only references the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ object RewriteWithExpression extends Rule[LogicalPlan] {
"Cannot rewrite canonicalized Common expression definitions")
}

if (CollapseProject.isCheap(child) ||
(originAlias.isEmpty && !commonExprIdSet.contains(id))) {
if (originAlias.isEmpty &&
(CollapseProject.isCheap(child) || !commonExprIdSet.contains(id))) {
refToExpr(id) = child
} else {
val childPlanIndex = inputPlans.indexWhere(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1557,30 +1557,30 @@ class FilterPushdownSuite extends PlanTest {
}

test("SPARK-50589: avoid extra expression duplication when push filter") {
// withSQLConf(SQLConf.USE_COMMON_EXPR_ID_FOR_ALIAS.key -> "false") {
// // through project
// val originalQuery1 = testRelation
// .select($"a" + $"b" as "add", $"a" - $"b" as "sub")
// .where($"add" < 10 && $"add" + $"add" > 10 && $"sub" > 0)
// .analyze
// val optimized1 = Optimize.execute(originalQuery1)
// comparePlans(optimized1, originalQuery1)
//
// // through aggregate
// val originalQuery2 = testRelation
// .groupBy($"a")($"a", $"a" + $"a" as "add", abs($"a") as "abs", count(1) as "ct")
// .where($"add" < 10 && $"add" + $"add" > 10 && $"abs" > 5)
// val optimized2 = Optimize.execute(originalQuery2.analyze)
// val correctAnswer2 = testRelation
// .select($"a", $"a" + $"a" as "_common_expr_0")
// .where($"_common_expr_0" < 10 &&
// $"_common_expr_0" + $"_common_expr_0" > 10 &&
// abs($"a") > 5)
// .select($"a")
// .groupBy($"a")($"a", $"a" + $"a" as "add", abs($"a") as "abs", count(1) as "ct")
// .analyze
// comparePlans(optimized2, correctAnswer2)
// }
withSQLConf(SQLConf.USE_COMMON_EXPR_ID_FOR_ALIAS.key -> "false") {
// through project
val originalQuery1 = testRelation
.select($"a" + $"b" as "add", $"a" - $"b" as "sub")
.where($"add" < 10 && $"add" + $"add" > 10 && $"sub" > 0)
.analyze
val optimized1 = Optimize.execute(originalQuery1)
comparePlans(optimized1, originalQuery1)

// through aggregate
val originalQuery2 = testRelation
.groupBy($"a")($"a", $"a" + $"a" as "add", abs($"a") as "abs", count(1) as "ct")
.where($"add" < 10 && $"add" + $"add" > 10 && $"abs" > 5)
val optimized2 = Optimize.execute(originalQuery2.analyze)
val correctAnswer2 = testRelation
.select($"a", $"a" + $"a" as "_common_expr_0")
.where($"_common_expr_0" < 10 &&
$"_common_expr_0" + $"_common_expr_0" > 10 &&
abs($"a") > 5)
.select($"a")
.groupBy($"a")($"a", $"a" + $"a" as "add", abs($"a") as "abs", count(1) as "ct")
.analyze
comparePlans(optimized2, correctAnswer2)
}
withSQLConf(SQLConf.USE_COMMON_EXPR_ID_FOR_ALIAS.key -> "false") {
// partial push down
val originalQuery3 = testRelation
Expand Down

0 comments on commit d74dc60

Please sign in to comment.