Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,12 @@ trait PredicateHelper {
protected def canEvaluateWithinJoin(expr: Expression): Boolean = expr match {
// Non-deterministic expressions are not allowed as join conditions.
case e if !e.deterministic => false
case l: ListQuery =>
case _: ListQuery | _: Exists =>
// A ListQuery defines the query which we want to search in an IN subquery expression.
// Currently the only way to evaluate an IN subquery is to convert it to a
// LeftSemi/LeftAnti/ExistenceJoin by `RewritePredicateSubquery` rule.
// It cannot be evaluated as part of a Join operator.
// An Exists shouldn't be push into a Join operator too.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dilipbiswal Here is another case of a regression from #16954. Would you think we should just say the following?

case SubqueryExpression => false

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think ScalarSubquery without correlated references can be pushed. Doesn't it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure. The name of this is def canEvaluateWithinJoin so I assume it asks whether an input Expression can be processed as part of a Join operator. Can a ScalarSubquery be processed inside a Join?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nsyca @viirya I just verified and the exists test fails the same way in 2.1. So its not a regression.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nsyca Looking at this further, there is a SubqueryExec operator that can execute a ScalarSubquery and InSubquery (PlanSubqueries). As part of my change, i had removed the case for PredicateSubquery as we removed PredicateSubquery all together. I just quickly tried the following and got the query to work. I haven't verified the semantics but just tried something quickly. Basically if we were to keep the Exists expression as it is and push it down as a join condition and execute it as a InSubquery (possibly with a additional limit clause) there seems to be an infrastructure for it already. Or perhaps we may want to introduce a ExistSubquery exec operator that can work more efficiently.

  case subquery: expressions.Exists =>
        val executedPlan = new QueryExecution(sparkSession, subquery.plan).executedPlan
        InSubquery(Literal.TrueLiteral,
          SubqueryExec(s"subquery${subquery.exprId.id}", executedPlan), subquery.exprId)

What do you think Natt ?

Copy link
Contributor

@nsyca nsyca Mar 31, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What this code does is around the idea of treating an uncorrelated subquery as a black box. The subquery is processed as a self-contained operation and a list of values is returned. After that, the code evaluates as if this is an IN list predicate like IN (). In your code above, is represented as a "true" literal. That means the returned values from the subquery must be in Boolean type too.

Putting a LIMIT does help to short-circuit the processing to the first row. I still think putting a LIMIT explicitly as an extra LogicalPlan operator may have some negative side effect in the way that it prevents other Optimizer rules to further optimize the query. I have not thought about a concrete example to back my belief though.

I feel this optimization could be done better in the run-time area, rather than trying to shoehorn it in the Optimizer phase. What we can do is 1) propagate the notion of "early out" deeper to the operator on the RHS of the outer join. If it's a scan, stop scanning on the first row. 2) one more step further: cache the result of the RHS without a rescan because the next row from the parent table will always get the same answer from rescanning the subquery.

Copy link
Member Author

@viirya viirya Apr 1, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure. The name of this is def canEvaluateWithinJoin so I assume it asks whether an input Expression can be processed as part of a Join operator. Can a ScalarSubquery be processed inside a Join?

I remember ScalarSubquery without correlated reference will be evaluated as individual query plan and get its result back as an expression. So it should be no difference in run time compared with other expressions.

A Limit looks good to me for now. I can't think a negative side effect prevents possible optimization for the subquery plan. Doesn't it just like a re-written query with a limit clause added?

I think this is a corner usage case. To address this in run-time like the introduction of "early out" into physical join operators works, but it may involve a lot of code changes.

  1. one more step further: cache the result of the RHS without a rescan because the next row from the parent table will always get the same answer from rescanning the subquery.

I quickly scan physical SortMergeJoin operator. If the streamed row matches the scanned group of rows, it will reuse the scanned group. Sounds it does what you said, if I don't miss something.

I think current join operators are smart enough that they won't re-scan the subquery if the next row still matches the scaned group of rows.

false
case e: SubqueryExpression =>
// non-correlated subquery will be replaced as literal
Expand Down
10 changes: 10 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -844,4 +844,14 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
Row(0) :: Row(1) :: Nil)
}
}

test("ListQuery and Exists should work even no correlated references") {
checkAnswer(
sql("select * from l, r where l.a = r.c AND (r.d in (select d from r) OR l.a >= 1)"),
Row(2, 1.0, 2, 3.0) :: Row(2, 1.0, 2, 3.0) :: Row(2, 1.0, 2, 3.0) ::
Row(2, 1.0, 2, 3.0) :: Row(3.0, 3.0, 3, 2.0) :: Row(6, null, 6, null) :: Nil)
checkAnswer(
sql("select * from l, r where l.a = r.c + 1 AND (exists (select * from r) OR l.a = r.c)"),
Row(3, 3.0, 2, 3.0) :: Row(3, 3.0, 2, 3.0) :: Nil)
}
}