Skip to content

Commit

Permalink
update comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zml1206 committed Jan 16, 2025
1 parent 6f369c9 commit b0e07cb
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2038,7 +2038,7 @@ object PushPredicateThroughNonJoin extends Rule[LogicalPlan] with PredicateHelpe
}

/**
* Use [[With]] to rewrite condition which contains attribute that are not cheap.
* Use [[With]] to rewrite condition which contains non-cheap common expression.
*/
private def rewriteConditionByWith(
cond: Expression,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, PROJECT}

/**
* This rule occurs after Push Down Predicates and rewrite with. For the Project generated by With,
* it is judged whether to push down the predicate based on the cost.
* Using With, we can perfectly push the filter along with the project, but if we actually create
* the project in the end, there may be a risk of regression, because some rules cannot match the
* filter using PhysicalOperation. This rule is added after rewrite With, which uses the cost model
* to determine whether to push the predicate down through the project generated by With.
*/
object PushPredicateThroughProject extends Rule[LogicalPlan] with PredicateHelper {
override def apply(plan: LogicalPlan): LogicalPlan = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ object RewriteWithExpression extends Rule[LogicalPlan] {
if (commonExprs.isEmpty) {
inputPlan
} else {
Project(inputPlan.output ++ commonExprs.map(_._1).sortWith(_.exprId.id < _.exprId.id),
inputPlan)
// Sort is required to ensure idempotence.
Project(inputPlan.output ++ commonExprs.map(_._1).sortBy(_.exprId.id), inputPlan)
}
}
newPlan = newPlan.withNewChildren(newChildren)
Expand Down Expand Up @@ -150,8 +150,8 @@ object RewriteWithExpression extends Rule[LogicalPlan] {
} else if (originalAttr.nonEmpty &&
inputPlans.head.output.contains(originalAttr.get.toAttribute)) {
// originAttr only exists in Project or Filter. If the child already contains this
// attribute, extend it.
refToExpr(id) = originalAttr.get.toAttribute
// attribute, extend it to avoid duplication.
refToExpr(id) = originalAttr.get
} else {
val commonExprs = commonExprsPerChild(childPlanIndex)
val existingCommonExpr = commonExprs.find(_._2 == id.id)
Expand Down

0 comments on commit b0e07cb

Please sign in to comment.