Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
2d762b4
plan exists subquery
AngersZhuuuu Nov 8, 2019
1c577bc
Update subquery.scala
AngersZhuuuu Nov 8, 2019
5fa971b
format import
AngersZhuuuu Nov 8, 2019
1401349
don;t collect executed rdd
AngersZhuuuu Nov 8, 2019
7b943aa
format code
AngersZhuuuu Nov 8, 2019
95e446d
Update predicates.scala
AngersZhuuuu Nov 10, 2019
20cda42
Update subquery.scala
AngersZhuuuu Nov 10, 2019
8e3ce4f
remove ExistsSubquery
AngersZhuuuu Nov 11, 2019
c290411
minimize cost
AngersZhuuuu Nov 11, 2019
866ddc7
follow comment
AngersZhuuuu Nov 11, 2019
3de0ecc
update import
AngersZhuuuu Nov 11, 2019
32f85c3
follow comment
AngersZhuuuu Nov 12, 2019
e47a757
Merge branch 'master' into SPARK-29800
AngersZhuuuu Nov 12, 2019
4c86605
remove broadcaset
AngersZhuuuu Nov 12, 2019
626e41f
Update subquery.scala
AngersZhuuuu Nov 13, 2019
ce76e0c
remove unused import
AngersZhuuuu Nov 13, 2019
4a4ca9b
Update subquery.scala
AngersZhuuuu Nov 13, 2019
88f804d
Merge branch 'master' into SPARK-29800
AngersZhuuuu Nov 21, 2019
7668bd6
ExistsSExec -> ExistsSubqueryExec
AngersZhuuuu Nov 25, 2019
a6b8485
Revert "Merge branch 'master' into SPARK-29800"
AngersZhuuuu Nov 25, 2019
34046be
follow comment
AngersZhuuuu Jan 2, 2020
4c6c04d
follow comment
AngersZhuuuu Jan 2, 2020
ac6a4d2
Update subquery.scala
AngersZhuuuu Jan 2, 2020
59162c6
Update finishAnalysis.scala
AngersZhuuuu Jan 2, 2020
89a1721
Update finishAnalysis.scala
AngersZhuuuu Jan 2, 2020
fb98b54
update
AngersZhuuuu Jan 2, 2020
67b4281
Update finishAnalysis.scala
AngersZhuuuu Jan 3, 2020
821ed40
Update finishAnalysis.scala
AngersZhuuuu Jan 3, 2020
e319fee
fix ut
AngersZhuuuu Jan 3, 2020
2c387f2
Update SubquerySuite.scala
AngersZhuuuu Jan 3, 2020
2aff8eb
Update SubquerySuite.scala
AngersZhuuuu Jan 3, 2020
2b7b417
Update CachedTableSuite.scala
AngersZhuuuu Jan 4, 2020
88fcdbf
Update CachedTableSuite.scala
AngersZhuuuu Jan 4, 2020
9f084ee
Merge branch 'master' into SPARK-29800
AngersZhuuuu Jan 4, 2020
8c6060a
Update CachedTableSuite.scala
AngersZhuuuu Jan 4, 2020
9a9d9d1
fix comment error
AngersZhuuuu Jan 5, 2020
173942d
follow comment
AngersZhuuuu Jan 6, 2020
26258b0
Update finishAnalysis.scala
AngersZhuuuu Jan 6, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@ abstract class SubqueryExpression(

object SubqueryExpression {
/**
* Returns true when an expression contains an IN or EXISTS subquery and false otherwise.
* Returns true when an expression contains an IN or correlated EXISTS subquery
* and false otherwise.
*/
def hasInOrExistsSubquery(e: Expression): Boolean = {
def hasInOrCorrelatedExistsSubquery(e: Expression): Boolean = {
  // Search the whole expression tree rooted at `e`.
  //  - Any IN subquery (ListQuery) qualifies.
  //  - An Exists qualifies only when the matched Exists node itself has children, which is
  //    how this code base distinguishes a correlated EXISTS from an uncorrelated one.
  // The check must be made on the matched node, not on the root `e`: testing `e.children`
  // would classify an uncorrelated Exists as correlated whenever it is nested under any
  // parent expression that has children (for example a NOT around the EXISTS).
  e.find {
    case _: ListQuery => true
    case exists: Exists => exists.children.nonEmpty
    case _ => false
  }.isDefined
}
Expand Down Expand Up @@ -302,7 +304,10 @@ case class ListQuery(
}

/**
* The [[Exists]] expression checks if a row exists in a subquery given some correlated condition.
* The [[Exists]] expression checks if a row exists in a subquery given some correlated condition
* or some uncorrelated condition.
*
* 1. correlated condition:
*
* For example (SQL):
* {{{
Expand All @@ -312,6 +317,17 @@ case class ListQuery(
* FROM b
* WHERE b.id = a.id)
* }}}
*
* 2. uncorrelated condition:
*
* For example (SQL):
* {{{
* SELECT *
* FROM a
* WHERE EXISTS (SELECT *
* FROM b
* WHERE b.id > 10)
* }}}
*/
case class Exists(
plan: LogicalPlan,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
EliminateSubqueryAliases,
EliminateView,
ReplaceExpressions,
RewriteNonCorrelatedExists,
ComputeCurrentTime,
GetCurrentDatabase(catalogManager),
RewriteDistinctAggregates,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,21 @@ object ReplaceExpressions extends Rule[LogicalPlan] {
}
}

/**
* Rewrite an uncorrelated EXISTS subquery to use ScalarSubquery
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

non correlated -> uncorrelated

* WHERE EXISTS (SELECT A FROM TABLE B WHERE COL1 > 10)
* will be rewritten to
* WHERE (SELECT 1 FROM (SELECT A FROM TABLE B WHERE COL1 > 10) LIMIT 1) IS NOT NULL
*/
object RewriteNonCorrelatedExists extends Rule[LogicalPlan] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we add a test for this rule?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we add a test for this rule?

With test case


  test("Rewritten uncorrelated exists subquery to use ScalarSubquery") {
    val relation = LocalRelation('a.int)
    val relExistSubquery = LocalRelation('x.int, 'y.int, 'z.int).where('x > 10)


    val query = relation.where(Exists(relExistSubquery)).select('a)

    val optimized = Optimize.execute(query.analyze)
    val correctAnswer = relation
      .where(IsNotNull(ScalarSubquery(Limit(Literal(1),
        Project(Seq(Alias(Literal(1), "col")()), relExistSubquery)))))
      .analyze

    comparePlans(optimized, correctAnswer)
  }

I get this error:

[info] RewriteSubquerySuite:
[info] - Rewritten uncorrelated exists subquery to use ScalarSubquery *** FAILED *** (852 milliseconds)
[info]   == FAIL: Plans do not match ===
[info]    Filter isnotnull(scalar-subquery#0 [])                     Filter isnotnull(scalar-subquery#0 [])
[info]    :  +- GlobalLimit 1                                        :  +- GlobalLimit 1
[info]    :     +- LocalLimit 1                                      :     +- LocalLimit 1
[info]   !:        +- Project [1 AS col#5]                           :        +- Project [1 AS col#6]
[info]    :           +- Filter (x#1 > 10)                           :           +- Filter (x#1 > 10)
[info]    :              +- LocalRelation <empty>, [x#1, y#2, z#3]   :              +- LocalRelation <empty>, [x#1, y#2, z#3]
[info]    +- LocalRelation <empty>, [a#0]                            +- LocalRelation <empty>, [a#0] (PlanTest.scala:147)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)

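This is because of the Alias created in RewriteNonCorrelatedExists: its expression ID differs between the optimized plan and the expected plan (col#5 vs. col#6 above).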

Any good advice for this test case? Is there a place where I can add it that avoids this problem? @cloud-fan @viirya

// Replaces every Exists expression that has no children with an IS NOT NULL check over a
// scalar subquery that projects the literal 1 from the original subquery plan, limited to
// a single row. The scalar subquery reuses the Exists expression's exprId.
override def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
  case uncorrelated: Exists if uncorrelated.children.isEmpty =>
    // SELECT 1 AS col FROM (<exists plan>) LIMIT 1
    val constantColumn = Alias(Literal(1), "col")()
    val oneRowProbe = Limit(Literal(1), Project(Seq(constantColumn), uncorrelated.plan))
    IsNotNull(ScalarSubquery(plan = oneRowProbe, exprId = uncorrelated.exprId))
}
}

/**
* Computes the current date and time to make sure we return the same result in a single query.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case Filter(condition, child) =>
val (withSubquery, withoutSubquery) =
splitConjunctivePredicates(condition).partition(SubqueryExpression.hasInOrExistsSubquery)
splitConjunctivePredicates(condition)
.partition(SubqueryExpression.hasInOrCorrelatedExistsSubquery)
Comment on lines +99 to +100
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated change?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, nvm, I saw it.


// Construct the pruned filter condition.
val newFilter: LogicalPlan = withoutSubquery match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join, JoinStrategyHint, SHUFFLE_HASH}
import org.apache.spark.sql.catalyst.util.DateTimeConstants
import org.apache.spark.sql.execution.{RDDScanExec, SparkPlan}
import org.apache.spark.sql.execution.{ExecSubqueryExpression, RDDScanExec, SparkPlan}
import org.apache.spark.sql.execution.columnar._
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.functions._
Expand Down Expand Up @@ -89,10 +89,19 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSparkSessi
sum
}

// Sums the in-memory table counts of every subquery plan referenced from the expressions
// of the given physical plan node. Only the expressions of `plan` itself are inspected here.
private def getNumInMemoryTablesInSubquery(plan: SparkPlan): Int = {
  val countsPerSubquery = for {
    expression <- plan.expressions
    count <- expression.collect {
      case subquery: ExecSubqueryExpression => getNumInMemoryTablesRecursively(subquery.plan)
    }
  } yield count
  countsPerSubquery.sum
}

private def getNumInMemoryTablesRecursively(plan: SparkPlan): Int = {
plan.collect {
case InMemoryTableScanExec(_, _, relation) =>
getNumInMemoryTablesRecursively(relation.cachedPlan) + 1
case inMemoryTable @ InMemoryTableScanExec(_, _, relation) =>
getNumInMemoryTablesRecursively(relation.cachedPlan) +
getNumInMemoryTablesInSubquery(inMemoryTable) + 1
case p =>
getNumInMemoryTablesInSubquery(p)
Comment on lines +100 to +104
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change needed for this PR? Looks like not directly related?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change needed for this PR? Looks like not directly related?

#26437 (comment)

}.sum
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -891,9 +891,9 @@ class SubquerySuite extends QueryTest with SharedSparkSession {

val sqlText =
"""
|SELECT * FROM t1
|SELECT * FROM t1 a
|WHERE
|NOT EXISTS (SELECT * FROM t1)
|NOT EXISTS (SELECT * FROM t1 b WHERE a.i = b.i)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why need to change the existing test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why need to change the existing test?

#26437 (comment)

""".stripMargin
val optimizedPlan = sql(sqlText).queryExecution.optimizedPlan
val join = optimizedPlan.collectFirst { case j: Join => j }.get
Expand Down