Skip to content

Conversation

@dilipbiswal
Copy link
Contributor

@dilipbiswal dilipbiswal commented Feb 9, 2019

What changes were proposed in this pull request?

This PR adds support for pushing down LeftSemi and LeftAnti joins below operators such as Project, Aggregate, Window, Union etc. This is the initial piece of work that will be needed for
the subsequent work of moving the subquery rewrites to the beginning of optimization phase.

The larger PR is here . This PR addresses the comment at link.

How was this patch tested?

Added a new test suite LeftSemiAntiJoinPushDownSuite.

@dilipbiswal dilipbiswal changed the title [SPARK-19712] Pushing down Left Semi and Left Anti joins [SPARK-19712][SQL] Pushing down Left Semi and Left Anti joins Feb 9, 2019
@dilipbiswal dilipbiswal changed the title [SPARK-19712][SQL] Pushing down Left Semi and Left Anti joins [SPARK-19712][SQL] Pushing Left Semi and Left Anti joins through Project, Aggregate, Window, Union etc. Feb 9, 2019
@SparkQA
Copy link

SparkQA commented Feb 9, 2019

Test build #102109 has finished for PR 23750 at commit f819ced.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dilipbiswal
Copy link
Contributor Author

cc @cloud-fan @gatorsmile

@dilipbiswal
Copy link
Contributor Author

gentle ping @cloud-fan

Copy link
Member

@maropu maropu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PushDownLeftSemiAntiJoin and PushDownPredicate have a lot of the similar parts, so could you brush up code to share these logics as much as possible?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s not used.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about removing this method, and then just writing like this in Optimizer.scala#1197?

      if pList.forall(_.deterministic) && !pList.find(hasScalarSubquery(_)).isDefined &&

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this empty line

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this rule is so big, how about moving this into a separate file?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

join and p not used.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code and comment are duplicate in PushDownPredicate, so can we share it between them?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we share the code and comment with PushDownPredicate?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this empty line

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to consider deterministic here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu Thanks for reviewing. I have addressed your comments. Please look through it when you get a chance. Thanks.

@SparkQA
Copy link

SparkQA commented Feb 16, 2019

Test build #102410 has finished for PR 23750 at commit edfe3d7.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dilipbiswal
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Feb 16, 2019

Test build #102411 has finished for PR 23750 at commit edfe3d7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dilipbiswal dilipbiswal force-pushed the SPARK-19712-pushleftsemi branch from edfe3d7 to 488eda8 Compare February 16, 2019 20:13
@SparkQA
Copy link

SparkQA commented Feb 17, 2019

Test build #102421 has finished for PR 23750 at commit 488eda8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


private def canPushThrough(p: UnaryNode): Boolean = p match {

def getAliasMap(plan: LogicalPlan): AttributeMap[Expression] = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: How about splitting this function into pieces like getAliasMap(p: Project) and getAliasMap(agg: Aggregate)? I a bit worry that other developers wrongly use this function like getAliasMap(non-project/aggregate plan) and this throws an unmatched exception....

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu Good idea. Thanks.

* 4) Aggregate
* 5) Other permissible unary operators. please see [[PushDownPredicate.canPushThrough]].
*/

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove this line

// LeftSemi/LeftAnti over Project
case Join(p @ Project(pList, gChild), rightOp, LeftSemiOrAnti(joinType), joinCond, hint)
if pList.forall(_.deterministic) &&
!pList.find(ScalarSubquery.hasScalarSubquery(_)).isDefined &&
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!pList.exists(ScalarSubquery.hasScalarSubquery)?


object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// Similar to the above Filter over Project
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Update the comment

p.copy(child = Join(gChild, rightOp, joinType, newJoinCond, hint))
}

// Similar to the above Filter over Aggregate
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

// attributes produced by the aggregate operator's child operator.
val (pushDown, stayUp) = splitConjunctivePredicates(joinCond.get).partition { cond =>
val replaced = replaceAlias(cond, aliasMap)
cond.references.nonEmpty &&
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left the same comment in the previous review though, I still have a question here....: Is it ok to push down non-deterministic exprs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu In my knowledge, join conditions cannot have non-deterministic expressions ? Its ensured in checkAnalysis.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, I see. Thanks!

}

// Similar to the above Filter over Window
// LeftSemi/LeftAnti over Window
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references)) ++
rightOp.outputSet

val (pushDown, stayUp) = splitConjunctivePredicates(joinCond.get).partition { cond =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto: Is it ok to push down non-deterministic exprs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu Please see my answer above.

val newGrandChildren = union.children.map { Join(_, rightOp, joinType, joinCond, hint) }
union.withNewChildren(newGrandChildren)
} else {
val pushDown = splitConjunctivePredicates(joinCond.get)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto: Is it ok to push down non-deterministic exprs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu Please see my answer above.

// Similar to the above Filter over UnaryNode
// LeftSemi/LeftAnti over UnaryNode
case join @ Join(u: UnaryNode, rightOp, LeftSemiOrAnti(joinType), joinCond, hint)
if PushDownPredicate.canPushThrough(u) =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to check u.expressions.forall(_.deterministic) here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu In my understanding, the operators that can host non-deterministic expressions are Project, Filter, Aggregate and Window. These are already handled in cases above. So my thinking is that we don't strictly need a check here. But i think, keeping future in mind, in case this assumption changes, we should have a check. I will add the check.

@dilipbiswal
Copy link
Contributor Author

retest this please

// LeftSemi/LeftAnti over Project
case Join(p @ Project(pList, gChild), rightOp, LeftSemiOrAnti(joinType), joinCond, hint)
if pList.forall(_.deterministic) &&
!pList.exists(ScalarSubquery.hasScalarSubquery)&&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we will remove this line after we finish refactoring the subquery rewrite, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan We would keep this after the refactoring. Currently , except is planned using anti join. Here is the test that exhibits the problem.

SELECT (SELECT min(k) FROM t2 WHERE t2.k = t1.k) min_t2 FROM t1
MINUS
SELECT (SELECT min(k) FROM t2) abs_min_t2 FROM t1 WHERE  t1.k = 'one'

After the except operator is replaced .. the plan is :

GlobalLimit 21
+- LocalLimit 21
   +- Project [cast(min_t2#245 as string) AS min_t2#254]
      +- Distinct
         +- Join LeftAnti, (min_t2#245 <=> abs_min_t2#247)
            :- Project [scalar-subquery#244 [(k#242 = k#240)] AS min_t2#245]
            :  :  +- Aggregate [k#242], [min(k#242) AS min(k)#249, k#242]
            :  :     +- Project [k#242]
            :  :        +- LocalRelation [k#242, v#243]
            :  +- Project [k#240, v#241]
            :     +- LocalRelation [k#240, v#241]
            +- Project [scalar-subquery#246 [] AS abs_min_t2#247]
               :  +- Aggregate [min(k#242) AS min(k)#251]
               :     +- Project [k#242]
               :        +- LocalRelation [k#242, v#243]
               +- Filter (k#240 = one)
                  +- Project [k#240, v#241]
                     +- LocalRelation [k#240, v#241]

Here we are not pushing down the leftanti operator below project.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we convert all correlated subqueries to joins before we go to the main optimizer batch?

Copy link
Contributor Author

@dilipbiswal dilipbiswal Feb 20, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan We will be moving the RewritePredicateSubquery which does the work for converting IN and EXISTS subqueries to semi/anti joins. However, Scalar subquery are treated differently in the sense that its handled by a different rule (RewriteCorrelatedScalarSubquery) and are planned using Left outer joins. As part of this work, i wasn't planning on changing Scalar subquery code. Also one thing to note is that, the rule to rewrite scalar subquery is already in the default batch and is run as a fixedPoint.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah sorry I misread the code. Yea scalar subquery will still be there.

Then why can't we push left anti join through project with scalar subqurey? scalr subquery is similar to a literal which doesn't depend on anything from the child plan.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan So letting join pass through scalar subquery ends up with plan like following :

   +- Project [scalar-subquery#244 [(k#242 = k#240)] AS min_t2#245]
            :  +- Aggregate [k#242], [min(k#242) AS min(k)#249, k#242]
            :     +- Project [k#242]
            :        +- LocalRelation [k#242, v#243]
            +- Project [k#240, v#241]
               +- Join LeftAnti, (scalar-subquery#244 [(k#242 = k#240)] <=> abs_min_t2#247)
                  :  +- Aggregate [k#242], [min(k#242) AS min(k)#249, k#242]
                  :     +- Project [k#242]
                  :        +- LocalRelation [k#242, v#243]
                  :- LocalRelation [k#240, v#241]
                  +- Project [scalar-subquery#246 [] AS abs_min_t2#247]
                     :  +- Aggregate [min(k#242) AS min(k)#251]
                     :     +- Project [k#242]
                     :        +- LocalRelation [k#242, v#243]
                     +- Project [k#240, v#241]
                        +- Filter (k#240 = one)
                           +- LocalRelation [k#240, v#241]

And things go totally wrong :-). firstly, join ends up hosting a scalar-sub expression which we don't allow (Project, Aggregate, Filter are the only ones that allow). We get a TreeNodeException..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan One thing i forgot Wenchen is that, we will be introducing a new rule that pushes down LeftSemi and LeftAnti joins below Join. Once we have that, even though we will skip scalar subqueries in this rule, once they are changed to left outer join after RewriteCorrelatedScalarSubquery, we will be able to push LeftSemi and Left anti joins when applicable. So i think we will get a good plan eventually.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RewriteCorrelatedScalarSubquery only touches correlated scalar subquery, what about non-correlated scalar subquery? I think it's fine to have non-correlated scalar subquery in the project list, we can still pushdown left anti/semi joins.

Copy link
Contributor Author

@dilipbiswal dilipbiswal Feb 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Yeah.. We should be able to pushdown in the non-correlated case. If it is okay with you, i would like to study this and take it as a follow-up. The reason is, at the moment, i don't know if we are generating an optimal plan in this case. Here is the plan before and after for your reference.
Before :

Join LeftSemi, (sum#7L = cast(d#3 as bigint))
:- Project [scalar-subquery#6 [] AS sum#7L]
:  :  +- Aggregate [b#1], [sum(cast(c#2 as bigint)) AS sum#5L]
:  :     +- LocalRelation <empty>, [a#0, b#1, c#2]
:  +- LocalRelation <empty>, [a#0, b#1, c#2]
+- LocalRelation <empty>, [d#3]

After the pushdown

Project [scalar-subquery#6 [] AS sum#7L]
:  +- Aggregate [b#1], [sum(cast(c#2 as bigint)) AS sum#5L]
:     +- LocalRelation <empty>, [a#0, b#1, c#2]
+- Join LeftSemi, (scalar-subquery#6 [] = cast(d#3 as bigint))
   :  +- Aggregate [b#1], [sum(cast(c#2 as bigint)) AS sum#5L]
   :     +- LocalRelation <empty>, [a#0, b#1, c#2]
   :- LocalRelation <empty>, [a#0, b#1, c#2]
   +- LocalRelation <empty>, [d#3]

I wanted to go a little defensive in the first pass. If the plan looks okay to you then
i can make the change. Please let me know.

*/
private def canPushThroughCondition(plans: Seq[LogicalPlan], condition: Option[Expression],
rightOp: LogicalPlan): Boolean = {
val attributes = AttributeSet(plans.flatMap (_.output))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: no space between flatMap and (

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (condition.isDefined) {
val matched = condition.get.references.intersect(rightOp.outputSet).intersect(attributes)
matched.isEmpty
} else true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

else {
  true
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


/**
* Check if we can safely push a join through a project or union by making sure that predicate
* subqueries in the condition do not contain the same attributes as the plan they are moved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we mention subquery here while the code below doesn't deal with subquery at all?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan will change.


// LeftSemi/LeftAnti over Aggregate
case join @ Join(agg: Aggregate, rightOp, LeftSemiOrAnti(joinType), joinCond, hint)
if agg.aggregateExpressions.forall(_.deterministic) && agg.groupingExpressions.nonEmpty =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will aggregateExpressions contain subquery?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Yeah.. aggregate expressions can host scalar subqueries. In and Exists can be hosted in only filter. I am thinking, if i need to stop the pushdown if scalar sub queries just like we do for project. Let me test this some more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you have a conclusion now?

Copy link
Contributor Author

@dilipbiswal dilipbiswal Feb 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Yeah.. we do need the same check for aggregate expressions as well. I have already added the check.

import org.apache.spark.sql.catalyst.rules.Rule

/**
* Pushes Left semi and Left Anti joins below the following operators.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should mention that this rule is a variant of PushDownPredicate, which can pushdown letft semi/anti joins

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan good idea. I will mention it.

val df = df1.join(broadcast(df2), $"key" === $"key2", "leftsemi")
testSparkPlanMetrics(df, 2, Map(
0L -> (("BroadcastHashJoin", Map(
1L -> (("BroadcastHashJoin", Map(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this change? what's changed in the plan?

Copy link
Contributor Author

@dilipbiswal dilipbiswal Feb 19, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan In this case, we pushdown the leftsemi join below project.
plan before:

== Optimized Logical Plan ==
Join LeftSemi, (key#247 = key2#258), rightHint=(broadcast)
:- Project [_1#242 AS key#247, _2#243 AS value#248]
:  +- LocalRelation [_1#242, _2#243]
+- Project [_1#253 AS key2#258]
   +- LocalRelation [_1#253, _2#254]

plan after:

== Optimized Logical Plan ==
Project [_1#242 AS key#247, _2#243 AS value#248]
+- Join LeftSemi, (_1#242 = key2#258), rightHint=(broadcast)
   :- LocalRelation [_1#242, _2#243]
   +- Project [_1#253 AS key2#258]
      +- LocalRelation [_1#253, _2#254]

@SparkQA
Copy link

SparkQA commented Feb 19, 2019

Test build #102503 has finished for PR 23750 at commit ea76e29.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Feb 20, 2019

Test build #102521 has finished for PR 23750 at commit 43e9eef.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dilipbiswal
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Feb 20, 2019

Test build #102526 has finished for PR 23750 at commit 43e9eef.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

replaced.references.subsetOf(agg.child.outputSet ++ rightOp.outputSet)
}

// Check if the remaining predicates do not contain columns from subquery
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we shouldn't mention subquery here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should also explain what can go wrong if remaining predicates contain columns from right side.

cond.references.subsetOf(partitionAttrs)
}

// Check if the remaining predicates do not contain columns from subquery
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

// using attributes produced by the aggregate operator's child operator.
val (pushDown, stayUp) = splitConjunctivePredicates(joinCond.get).partition { cond =>
val replaced = replaceAlias(cond, aliasMap)
cond.references.nonEmpty &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why can't we pushdown constant join conditions?

Copy link
Contributor Author

@dilipbiswal dilipbiswal Mar 1, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Hmmn.. thats how the original logic was.. coming to think of it wenchen, wouldn't we have close to 0% chance of ever having a join conditions with constants only :-) ?


// LeftSemi/LeftAnti over Window
case join @ Join(w: Window, rightOp, LeftSemiOrAnti(joinType), joinCond, hint)
if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will Window.windowExpressions contain correlated subqueries?

e.g.

SELECT (SELECT min(k) FROM t2 WHERE t2.k = t1.k) min_t2 + max(k) over (...) FROM t1

Copy link
Contributor Author

@dilipbiswal dilipbiswal Mar 1, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan No.. windows expression can't contain correlated subqueries.
Edit: Actually i am not sure Wenchen. to the best of my knowledge only a few operators can host correlated subquery expressions. Project, Filter and Aggregate is the ones i know of.
Edit2:
project, filter and aggregate can have correlated scalar subqueries
only filter can have correlated in/exists subqueries.

} else {
val pushDown = splitConjunctivePredicates(joinCond.get)

if (pushDown.nonEmpty) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how could pushDown be empty?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's just val pushDown = splitConjunctivePredicates(joinCond.get)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan You r right. I will change.

if pList.forall(_.deterministic) &&
!pList.exists(ScalarSubquery.hasCorrelatedScalarSubquery) &&
canPushThroughCondition(Seq(gChild), joinCond, rightOp) =>
if (joinCond.isEmpty) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just for curiosity, does left anti/semi join always a condition?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Ha.. i had the same question a couple of days back. So i quickly tried :

select * from t1 left semi join t2

We end up getting all the rows from t1 (if i remember correctly).

Copy link
Contributor

@cloud-fan cloud-fan Mar 1, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for left-anti join, it returns no result.

Then it makes me think that, we should always pushdown the join if the condition is empty. For left semi join it's just a noop, and for left-anti join it helps a lot. You already did it in the rule, except https://github.com/apache/spark/pull/23750/files#diff-44d3a3f876bcf811fdbf71fce1f7072aR192

A new optimizer rule can be: we turn left-semi join to the left child if join condition is empty, and turn left-anti to empty relation if join condition is empty.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan When i click on the link it shows me show many diffs. Were you referring me to a few lines when you said "except" ?

2ndly, what can i say ? When i said i did try the left semi join on empty join conditions i wrote this in my notes :
"Explore if there any optimization opportunity when there are empty join condition. Is the join necessary.. need to study more" :-)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For 2ndly, it's just an orthogonal optimizer rule, you are welcome to do it in another PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Sure wenchen. I will do it.

(Nil, Nil)
}

if (pushDown.nonEmpty) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't match the other cases, that we always push down the join if join condition is empty.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Can you please explain a bit ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan I will fix it wenchen.

@SparkQA
Copy link

SparkQA commented Mar 1, 2019

Test build #102921 has finished for PR 23750 at commit 76e7203.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


private def pushDownJoin(
join: Join,
grandchild: LogicalPlan)(insertFilter: Expression => LogicalPlan): LogicalPlan = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: insertFilter -> insertJoin. Expression => LogicalPlan -> Option[Expression] => LogicalPlan

if (pushDown.nonEmpty) {
val newChild = insertFilter(pushDown.reduceLeft(And))
if (stayUp.nonEmpty) {
Filter(stayUp.reduceLeft(And), newChild)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wait, can we safely do this? What if the stayUp refers to the attributes from the right child?

Copy link
Contributor

@cloud-fan cloud-fan Mar 2, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to create a join instead of Filter here, if stayUp refers to the attributes from the right child

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or we shouldn't do pushdown if stayUp refers to the attributes from the right child

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan You are right.. I missed this case.

.join(testRelation1, joinType = LeftSemi, condition = Some('b === 'd && 'sum === 'd))

val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer = testRelation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: val correctAnswer = originalQuery.analyzed

}

test("Union: LeftSemiAnti join pushdown") {
val testRelation2 = LocalRelation('x.int, 'y.int, 'z.int)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the indentation is wrong here

comparePlans(optimized, correctAnswer)
}

test("Unary: LeftSemiAnti join pushdown - empty join condition") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add one more test: join condition refers to the join right child.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan The first test "Unary: LeftSemiAnti join pushdown" has join condition that refers to attributes from both left and right ? We want it to refer to only right hand side ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have a test that the stayUp is not empty, to cover https://github.com/apache/spark/pull/23750/files#r261810108 ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Generate can trigger this case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Thanks.. i have added tests and fixed the code as well :-)

@SparkQA
Copy link

SparkQA commented Mar 2, 2019

Test build #102937 has finished for PR 23750 at commit 79579c8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

.join(testRelation1, joinType = LeftSemi, condition = Some('b === 'd))

val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer = testRelation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to check no pushdown, it's more clear to write

val correctAnswer = originalQuery.analyzed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan you meant, `originalQuery.analyze" , correct ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea

.join(testRelation1, joinType = LeftSemi, condition = Some('sum === 'd))

val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer = testRelation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto, val correctAnswer = originalQuery.analyzed

.join(testRelation1, joinType = LeftSemi, condition = Some('b === 'd))

val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer = testRelation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

.join(testRelation1, joinType = LeftSemi, condition = Some('sum === 'd && 'a === 'd))

val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer = testRelation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

rightOp: LogicalPlan): Boolean = {
val attributes = AttributeSet(plans.flatMap(_.output))
if (condition.isDefined) {
val matched = condition.get.references.intersect(rightOp.outputSet).intersect(attributes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be rightOp.outputSet.intersect(attributes).isEmpty? It's a self-join even if there is no join condition.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm, the self-join is problematic only if we can't rewrite join condition correctly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should mention it in the method doc, so that other reviewers won't get confused.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan I had this in the function prologue. Did you want it improved ?

This function makes sure that the join condition refers to attributes that are not 
ambiguous(i.e present in both the legs of the join) or else the resultant plan will be invalid.

@cloud-fan
Copy link
Contributor

LGTM except a few minor comments

@SparkQA
Copy link

SparkQA commented Mar 4, 2019

Test build #102963 has finished for PR 23750 at commit 5ea6a4a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Mar 4, 2019

Test build #102964 has finished for PR 23750 at commit 68e7268.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in ad4823c Mar 4, 2019
@dilipbiswal
Copy link
Contributor Author

@cloud-fan @maropu Thank you very much !!

if (pushDown.nonEmpty && rightOpColumns.isEmpty) {
val predicate = pushDown.reduce(And)
val newPlan = w.copy(child = Join(w.child, rightOp, joinType, Option(predicate), hint))
if (stayUp.isEmpty) newPlan else Filter(stayUp.reduce(And), newPlan)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dilipbiswal this does hold with left anti joins? If a predicate is part of the condition then it means it should be filtered out right, and not retained?

Copy link
Contributor Author

@dilipbiswal dilipbiswal Mar 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hvanhovell Thanks for reviewing. Can you please help illustrate the problem with an example ? So if the join was in filter form (in an subquery expression), we do push it down, right ? We don't distinguish between semi or anti joins ?

val replaced = replaceAlias(pushDownPredicate, aliasMap)
val newAgg = agg.copy(child = Join(agg.child, rightOp, joinType, Option(replaced), hint))
// If there is no more filter to stay up, just return the Aggregate over Join.
// Otherwise, create "Filter(stayUp) <- Aggregate <- Join(pushDownPredicate)".
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left-anti join outputs records that do NOT satisfy the join condition. Let's say the join condition is a && b, and this rule turns Join(Aggregate, ..., a && b) to Filter(b, Aggregate(Join(..., ..., a))).

This seems problematic. Previously we get result satisfying Not(a && b), now we get Not(a) && b. @hvanhovell is this your concern?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that, we can't push down partial left-anti join

Copy link
Contributor Author

@dilipbiswal dilipbiswal Mar 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Thank you. I understand it better now.Let me test this out a bit and plan a follow-up.
@hvanhovell Thanks a lot for pointing this out.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants