Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
EliminateOuterJoin,
PushPredicateThroughJoin,
PushDownPredicate,
PushDownLeftSemiAntiJoin,
LimitPushDown,
ColumnPruning,
InferFiltersFromConstraints,
Expand Down Expand Up @@ -1016,24 +1017,13 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
// This also applies to Aggregate.
case Filter(condition, project @ Project(fields, grandChild))
if fields.forall(_.deterministic) && canPushThroughCondition(grandChild, condition) =>

// Create a map of Aliases to their values from the child projection.
// e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b).
val aliasMap = AttributeMap(fields.collect {
case a: Alias => (a.toAttribute, a.child)
})

val aliasMap = getAliasMap(project)
project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))

case filter @ Filter(condition, aggregate: Aggregate)
if aggregate.aggregateExpressions.forall(_.deterministic)
&& aggregate.groupingExpressions.nonEmpty =>
// Find all the aliased expressions in the aggregate list that don't include any actual
// AggregateExpression, and create a map from the alias to the expression
val aliasMap = AttributeMap(aggregate.aggregateExpressions.collect {
case a: Alias if a.child.find(_.isInstanceOf[AggregateExpression]).isEmpty =>
(a.toAttribute, a.child)
})
val aliasMap = getAliasMap(aggregate)

// For each filter, expand the alias and check if the filter can be evaluated using
// attributes produced by the aggregate operator's child operator.
Expand Down Expand Up @@ -1131,7 +1121,23 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
}
}

private def canPushThrough(p: UnaryNode): Boolean = p match {
def getAliasMap(plan: Project): AttributeMap[Expression] = {
// Create a map of Aliases to their values from the child projection.
// e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b).
AttributeMap(plan.projectList.collect { case a: Alias => (a.toAttribute, a.child) })
}

def getAliasMap(plan: Aggregate): AttributeMap[Expression] = {
// Find all the aliased expressions in the aggregate list that don't include any actual
// AggregateExpression, and create a map from the alias to the expression
val aliasMap = plan.aggregateExpressions.collect {
case a: Alias if a.child.find(_.isInstanceOf[AggregateExpression]).isEmpty =>
(a.toAttribute, a.child)
}
AttributeMap(aliasMap)
}

def canPushThrough(p: UnaryNode): Boolean = p match {
// Note that some operators (e.g. project, aggregate, union) are being handled separately
// (earlier in this rule).
case _: AppendColumns => true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule

/**
* This rule is a variant of [[PushDownPredicate]] which can handle
* pushing down Left semi and Left Anti joins below the following operators.
* 1) Project
* 2) Window
* 3) Union
* 4) Aggregate
* 5) Other permissible unary operators. please see [[PushDownPredicate.canPushThrough]].
*/
object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// LeftSemi/LeftAnti over Project
case Join(p @ Project(pList, gChild), rightOp, LeftSemiOrAnti(joinType), joinCond, hint)
if pList.forall(_.deterministic) &&
!pList.exists(ScalarSubquery.hasCorrelatedScalarSubquery) &&
canPushThroughCondition(Seq(gChild), joinCond, rightOp) =>
if (joinCond.isEmpty) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just for curiosity, does left anti/semi join always a condition?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Ha.. i had the same question a couple of days back. So i quickly tried :

select * from t1 left semi join t2

We end up getting all the rows from t1 (if i remember correctly).

Copy link
Contributor

@cloud-fan cloud-fan Mar 1, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for left-anti join, it returns no result.

Then it makes me think that, we should always pushdown the join if the condition is empty. For left semi join it's just a noop, and for left-anti join it helps a lot. You already did it in the rule, except https://github.com/apache/spark/pull/23750/files#diff-44d3a3f876bcf811fdbf71fce1f7072aR192

A new optimizer rule can be: we turn left-semi join to the left child if join condition is empty, and turn left-anti to empty relation if join condition is empty.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan When i click on the link it shows me show many diffs. Were you referring me to a few lines when you said "except" ?

2ndly, what can i say ? When i said i did try the left semi join on empty join conditions i wrote this in my notes :
"Explore if there any optimization opportunity when there are empty join condition. Is the join necessary.. need to study more" :-)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For 2ndly, it's just an orthogonal optimizer rule, you are welcome to do it in another PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Sure wenchen. I will do it.

// No join condition, just push down the Join below Project
p.copy(child = Join(gChild, rightOp, joinType, joinCond, hint))
} else {
val aliasMap = PushDownPredicate.getAliasMap(p)
val newJoinCond = if (aliasMap.nonEmpty) {
Option(replaceAlias(joinCond.get, aliasMap))
} else {
joinCond
}
p.copy(child = Join(gChild, rightOp, joinType, newJoinCond, hint))
}

// LeftSemi/LeftAnti over Aggregate
case join @ Join(agg: Aggregate, rightOp, LeftSemiOrAnti(joinType), joinCond, hint)
if agg.aggregateExpressions.forall(_.deterministic) && agg.groupingExpressions.nonEmpty &&
!agg.aggregateExpressions.exists(ScalarSubquery.hasCorrelatedScalarSubquery) =>
if (joinCond.isEmpty) {
// No join condition, just push down Join below Aggregate
agg.copy(child = Join(agg.child, rightOp, joinType, joinCond, hint))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @maryannxue shall we keep the join hint when pushdown join through an operator?

} else {
val aliasMap = PushDownPredicate.getAliasMap(agg)

// For each join condition, expand the alias and check if the condition can be evaluated
// using attributes produced by the aggregate operator's child operator.
val (pushDown, stayUp) = splitConjunctivePredicates(joinCond.get).partition { cond =>
val replaced = replaceAlias(cond, aliasMap)
cond.references.nonEmpty &&
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left the same comment in the previous review though, I still have a question here....: Is it ok to push down non-deterministic exprs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu In my knowledge, join conditions cannot have non-deterministic expressions ? Its ensured in checkAnalysis.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, I see. Thanks!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why can't we pushdown constant join conditions?

Copy link
Contributor Author

@dilipbiswal dilipbiswal Mar 1, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Hmmn.. thats how the original logic was.. coming to think of it wenchen, wouldn't we have close to 0% chance of ever having a join conditions with constants only :-) ?

replaced.references.subsetOf(agg.child.outputSet ++ rightOp.outputSet)
}

// Check if the remaining predicates do not contain columns from the right
// hand side of the join. Since the remaining predicates will be kept
// as a filter over aggregate, this check is necessary after the left semi
// or left anti join is moved below aggregate. The reason is, for this kind
// of join, we only output from the left leg of the join.
val rightOpColumns = AttributeSet(stayUp.toSet).intersect(rightOp.outputSet)

if (pushDown.nonEmpty && rightOpColumns.isEmpty) {
val pushDownPredicate = pushDown.reduce(And)
val replaced = replaceAlias(pushDownPredicate, aliasMap)
val newAgg = agg.copy(child = Join(agg.child, rightOp, joinType, Option(replaced), hint))
// If there is no more filter to stay up, just return the Aggregate over Join.
// Otherwise, create "Filter(stayUp) <- Aggregate <- Join(pushDownPredicate)".
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left-anti join outputs records that do NOT satisfy the join condition. Let's say the join condition is a && b, and this rule turns Join(Aggregate, ..., a && b) to Filter(b, Aggregate(Join(..., ..., a))).

This seems problematic. Previously we get result satisfying Not(a && b), now we get Not(a) && b. @hvanhovell is this your concern?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that, we can't push down partial left-anti join

Copy link
Contributor Author

@dilipbiswal dilipbiswal Mar 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Thank you. I understand it better now.Let me test this out a bit and plan a follow-up.
@hvanhovell Thanks a lot for pointing this out.

if (stayUp.isEmpty) newAgg else Filter(stayUp.reduce(And), newAgg)
} else {
// The join condition is not a subset of the Aggregate's GROUP BY columns,
// no push down.
join
}
}

// LeftSemi/LeftAnti over Window
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

case join @ Join(w: Window, rightOp, LeftSemiOrAnti(joinType), joinCond, hint)
if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will Window.windowExpressions contain correlated subqueries?

e.g.

SELECT (SELECT min(k) FROM t2 WHERE t2.k = t1.k) min_t2 + max(k) over (...) FROM t1

Copy link
Contributor Author

@dilipbiswal dilipbiswal Mar 1, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan No.. windows expression can't contain correlated subqueries.
Edit: Actually i am not sure Wenchen. to the best of my knowledge only a few operators can host correlated subquery expressions. Project, Filter and Aggregate is the ones i know of.
Edit2:
project, filter and aggregate can have correlated scalar subqueries
only filter can have correlated in/exists subqueries.

if (joinCond.isEmpty) {
// No join condition, just push down Join below Window
w.copy(child = Join(w.child, rightOp, joinType, joinCond, hint))
} else {
val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references)) ++
rightOp.outputSet

val (pushDown, stayUp) = splitConjunctivePredicates(joinCond.get).partition { cond =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto: Is it ok to push down non-deterministic exprs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu Please see my answer above.

cond.references.subsetOf(partitionAttrs)
}

// Check if the remaining predicates do not contain columns from the right
// hand side of the join. Since the remaining predicates will be kept
// as a filter over window, this check is necessary after the left semi
// or left anti join is moved below window. The reason is, for this kind
// of join, we only output from the left leg of the join.
val rightOpColumns = AttributeSet(stayUp.toSet).intersect(rightOp.outputSet)

if (pushDown.nonEmpty && rightOpColumns.isEmpty) {
val predicate = pushDown.reduce(And)
val newPlan = w.copy(child = Join(w.child, rightOp, joinType, Option(predicate), hint))
if (stayUp.isEmpty) newPlan else Filter(stayUp.reduce(And), newPlan)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dilipbiswal this does hold with left anti joins? If a predicate is part of the condition then it means it should be filtered out right, and not retained?

Copy link
Contributor Author

@dilipbiswal dilipbiswal Mar 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hvanhovell Thanks for reviewing. Can you please help illustrate the problem with an example ? So if the join was in filter form (in an subquery expression), we do push it down, right ? We don't distinguish between semi or anti joins ?

} else {
// The join condition is not a subset of the Window's PARTITION BY clause,
// no push down.
join
}
}

// LeftSemi/LeftAnti over Union
case join @ Join(union: Union, rightOp, LeftSemiOrAnti(joinType), joinCond, hint)
if canPushThroughCondition(union.children, joinCond, rightOp) =>
if (joinCond.isEmpty) {
// Push down the Join below Union
val newGrandChildren = union.children.map { Join(_, rightOp, joinType, joinCond, hint) }
union.withNewChildren(newGrandChildren)
} else {
val output = union.output
val newGrandChildren = union.children.map { grandchild =>
val newCond = joinCond.get transform {
case e if output.exists(_.semanticEquals(e)) =>
grandchild.output(output.indexWhere(_.semanticEquals(e)))
}
assert(newCond.references.subsetOf(grandchild.outputSet ++ rightOp.outputSet))
Join(grandchild, rightOp, joinType, Option(newCond), hint)
}
union.withNewChildren(newGrandChildren)
}

// LeftSemi/LeftAnti over UnaryNode
case join @ Join(u: UnaryNode, rightOp, LeftSemiOrAnti(joinType), joinCond, hint)
if PushDownPredicate.canPushThrough(u) && u.expressions.forall(_.deterministic) =>
pushDownJoin(join, u.child) { joinCond =>
u.withNewChildren(Seq(Join(u.child, rightOp, joinType, joinCond, hint)))
}
}

/**
* Check if we can safely push a join through a project or union by making sure that attributes
* referred in join condition do not contain the same attributes as the plan they are moved
* into. This can happen when both sides of join refers to the same source (self join). This
* function makes sure that the join condition refers to attributes that are not ambiguous (i.e
* present in both the legs of the join) or else the resultant plan will be invalid.
*/
private def canPushThroughCondition(
plans: Seq[LogicalPlan],
condition: Option[Expression],
rightOp: LogicalPlan): Boolean = {
val attributes = AttributeSet(plans.flatMap(_.output))
if (condition.isDefined) {
val matched = condition.get.references.intersect(rightOp.outputSet).intersect(attributes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be rightOp.outputSet.intersect(attributes).isEmpty? It's a self-join even if there is no join condition.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm, the self-join is problematic only if we can't rewrite join condition correctly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should mention it in the method doc, so that other reviewers won't get confused.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan I had this in the function prologue. Did you want it improved ?

This function makes sure that the join condition refers to attributes that are not 
ambiguous(i.e present in both the legs of the join) or else the resultant plan will be invalid.

matched.isEmpty
} else {
true
}
}


private def pushDownJoin(
join: Join,
grandchild: LogicalPlan)(insertJoin: Option[Expression] => LogicalPlan): LogicalPlan = {
if (join.condition.isEmpty) {
insertJoin(None)
} else {
val (pushDown, stayUp) = splitConjunctivePredicates(join.condition.get)
.partition {_.references.subsetOf(grandchild.outputSet ++ join.right.outputSet)}

val rightOpColumns = AttributeSet(stayUp.toSet).intersect(join.right.outputSet)
if (pushDown.nonEmpty && rightOpColumns.isEmpty) {
val newChild = insertJoin(Option(pushDown.reduceLeft(And)))
if (stayUp.nonEmpty) {
Filter(stayUp.reduceLeft(And), newChild)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wait, can we safely do this? What if the stayUp refers to the attributes from the right child?

Copy link
Contributor

@cloud-fan cloud-fan Mar 2, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to create a join instead of Filter here, if stayUp refers to the attributes from the right child

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or we shouldn't do pushdown if stayUp refers to the attributes from the right child

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan You are right.. I missed this case.

} else {
newChild
}
} else {
join
}
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,10 @@ object LeftExistence {
case _ => None
}
}

object LeftSemiOrAnti {
def unapply(joinType: JoinType): Option[JoinType] = joinType match {
case LeftSemi | LeftAnti => Some(joinType)
case _ => None
}
}
Loading