Skip to content

Commit de6ede1

Browse files
committed
optimize Subquery
1 parent 9683b68 commit de6ede1

File tree

1 file changed

+30
-30
lines changed
  • sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer

1 file changed

+30
-30
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

+30-30
Original file line numberDiff line numberDiff line change
@@ -1820,8 +1820,7 @@ object PushPredicateThroughNonJoin extends Rule[LogicalPlan] with PredicateHelpe
18201820
case Filter(condition, project @ Project(fields, grandChild))
18211821
if fields.forall(_.deterministic) && canPushThroughCondition(grandChild, condition) =>
18221822
val aliasMap = getAliasMap(project)
1823-
val replacedByWith = rewriteConditionByWith(condition, aliasMap)
1824-
project.copy(child = Filter(replaceAlias(replacedByWith, aliasMap), grandChild))
1823+
project.copy(child = Filter(rewriteCondition(condition, aliasMap), grandChild))
18251824

18261825
// We can push down deterministic predicate through Aggregate, including throwable predicate.
18271826
// If we can push down a filter through Aggregate, it means the filter only references the
@@ -1841,8 +1840,7 @@ object PushPredicateThroughNonJoin extends Rule[LogicalPlan] with PredicateHelpe
18411840
}
18421841

18431842
if (pushDown.nonEmpty) {
1844-
val replacedByWith = rewriteConditionByWith(pushDown, aliasMap)
1845-
val replaced = replaceAlias(replacedByWith.reduce(And), aliasMap)
1843+
val replaced = rewriteCondition(pushDown.reduce(And), aliasMap)
18461844
val newAggregate = aggregate.copy(child = Filter(replaced, aggregate.child))
18471845
// If there is no more filter to stay up, just eliminate the filter.
18481846
// Otherwise, create "Filter(stayUp) <- Aggregate <- Filter(pushDownPredicate)".
@@ -1989,51 +1987,53 @@ object PushPredicateThroughNonJoin extends Rule[LogicalPlan] with PredicateHelpe
19891987
}
19901988
}
19911989

1990+
private def rewriteCondition(
1991+
cond: Expression,
1992+
aliasMap: AttributeMap[Alias]): Expression = {
1993+
replaceAlias(rewriteConditionByWith(cond, aliasMap), aliasMap)
1994+
}
1995+
19921996
/**
19931997
* Use [[With]] to rewrite a condition containing attributes that are not cheap and are consumed
19941998
* multiple times. Each predicate generates at most one [[With]]. To ease subsequent merging of
19951999
* [[With]], use the same CommonExpressionDef ids for different [[With]].
19962000
*/
19972001
private def rewriteConditionByWith(
1998-
cond: Seq[Expression],
1999-
aliasMap: AttributeMap[Alias]): Seq[Expression] = {
2002+
cond: Expression,
2003+
aliasMap: AttributeMap[Alias]): Expression = {
20002004
if (!SQLConf.get.getConf(SQLConf.ALWAYS_INLINE_COMMON_EXPR)) {
2001-
val canRewriteCond = cond.filter(canRewriteByWith)
2002-
if (canRewriteCond.nonEmpty) {
2003-
val replaceWithMap = canRewriteCond
2004-
.flatMap(_.collect {case a: Attribute => a })
2005-
.groupBy(identity)
2006-
.transform((_, v) => v.size)
2007-
.filter(m => aliasMap.contains(m._1) && m._2 > 1)
2008-
.map(m => m._1 -> trimAliases(aliasMap.getOrElse(m._1, m._1)))
2009-
.filter(m => !CollapseProject.isCheap(m._2))
2005+
// A SubqueryExpression can't contain a common expression ref, so replace its aliases first.
2006+
val newCond = replaceAliasForSubqueryExpression(cond, aliasMap)
2007+
val replaceWithMap = newCond.collect {case a: Attribute => a }
2008+
.groupBy(identity)
2009+
.transform((_, v) => v.size)
2010+
.filter(m => aliasMap.contains(m._1) && m._2 > 1)
2011+
.map(m => m._1 -> trimAliases(aliasMap.getOrElse(m._1, m._1)))
2012+
.filter(m => !CollapseProject.isCheap(m._2))
2013+
if (replaceWithMap.isEmpty) {
2014+
newCond
2015+
} else {
20102016
val defsMap = AttributeMap(replaceWithMap.map(m => m._1 -> CommonExpressionDef(m._2)))
20112017
val refsMap = AttributeMap(defsMap.map(m => m._1 -> new CommonExpressionRef(m._2)))
2012-
cond.map(rewriteByWith(_, defsMap, refsMap))
2013-
} else cond
2018+
splitConjunctivePredicates(newCond)
2019+
.map(rewriteByWith(_, defsMap, refsMap))
2020+
.reduce(And)
2021+
}
20142022
} else cond
20152023
}
20162024

2017-
private def rewriteConditionByWith(
2018-
cond: Expression,
2025+
private def replaceAliasForSubqueryExpression(
2026+
expr: Expression,
20192027
aliasMap: AttributeMap[Alias]): Expression = {
2020-
if (!SQLConf.get.getConf(SQLConf.ALWAYS_INLINE_COMMON_EXPR)) {
2021-
rewriteConditionByWith(splitConjunctivePredicates(cond), aliasMap).reduce(And)
2022-
} else cond
2023-
}
2024-
2025-
// With does not support inline subquery
2026-
private def canRewriteByWith(expr: Expression): Boolean = {
2027-
!expr.containsPattern(PLAN_EXPRESSION)
2028+
expr.transform {
2029+
case s: SubqueryExpression => replaceAlias(s, aliasMap)
2030+
}
20282031
}
20292032

20302033
private def rewriteByWith(
20312034
expr: Expression,
20322035
defsMap: AttributeMap[CommonExpressionDef],
20332036
refsMap: AttributeMap[CommonExpressionRef]): Expression = {
2034-
if (!canRewriteByWith(expr)) {
2035-
return expr
2036-
}
20372037
val defs = mutable.HashSet.empty[CommonExpressionDef]
20382038
val replaced = expr.transform {
20392039
case a: Attribute if refsMap.contains(a) =>

0 commit comments

Comments
 (0)