From b012550d6981d3b49aac78432dc42e2974fd3649 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 31 Mar 2017 04:07:38 +0000 Subject: [PATCH 1/3] Exists should not be evaluated in Join operator either. --- .../sql/catalyst/expressions/predicates.scala | 3 +- .../sql/catalyst/optimizer/Optimizer.scala | 3 +- .../sql/catalyst/optimizer/subquery.scala | 28 ++++++++++++++++ .../org/apache/spark/sql/SubquerySuite.scala | 32 +++++++++++++++++++ 4 files changed, 64 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index 1235204591bb..77a0ff26bca8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -90,11 +90,12 @@ trait PredicateHelper { * Returns true iff `expr` could be evaluated as a condition within join. */ protected def canEvaluateWithinJoin(expr: Expression): Boolean = expr match { - case l: ListQuery => + case _: ListQuery | _: Exists => // A ListQuery defines the query which we want to search in an IN subquery expression. // Currently the only way to evaluate an IN subquery is to convert it to a // LeftSemi/LeftAnti/ExistenceJoin by `RewritePredicateSubquery` rule. // It cannot be evaluated as part of a Join operator. + // An Exists shouldn't be pushed into a Join operator either. 
false case e: SubqueryExpression => // non-correlated subquery will be replaced as literal diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index dbf479d21513..7e49a8cbca40 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -65,7 +65,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf) Batch("Pullup Correlated Expressions", Once, PullupCorrelatedPredicates) :: Batch("Subquery", Once, - OptimizeSubqueries) :: + OptimizeSubqueries, + RewriteEmptyExists) :: Batch("Replace Operators", fixedPoint, ReplaceIntersectWithSemiJoin, ReplaceExceptWithAntiJoin, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index 2a3e07aebe70..6e3882712481 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -498,3 +498,31 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { } } } + +/** + * This rule rewrites a EXISTS predicate sub-queries into an Aggregate with count. + * So it doesn't be converted to a JOIN later. 
+ */ +object RewriteEmptyExists extends Rule[LogicalPlan] with PredicateHelper { + private def containsAgg(plan: LogicalPlan): Boolean = { + plan.collect { + case a: Aggregate => a + }.nonEmpty + } + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case Filter(condition, child) => + val (withSubquery, withoutSubquery) = + splitConjunctivePredicates(condition).partition(SubqueryExpression.hasInOrExistsSubquery) + val newWithSubquery = withSubquery.map(_.transform { + case e @ Exists(sub, conditions, exprId) if conditions.isEmpty && !containsAgg(sub) => + val countExpr = Alias(Count(Literal(1)).toAggregateExpression(), "count")() + val expr = Alias(GreaterThan(countExpr.toAttribute, Literal(0)), e.toString)() + ScalarSubquery( + Project(Seq(expr), Aggregate(Nil, Seq(countExpr), sub)), + children = Seq.empty, + exprId = exprId) + }) + Filter((newWithSubquery ++ withoutSubquery).reduce(And), child) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 5fe6667ceca1..d342648fd844 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql +import org.apache.spark.sql.catalyst.expressions.{Alias, ScalarSubquery} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join} import org.apache.spark.sql.test.SharedSQLContext class SubquerySuite extends QueryTest with SharedSQLContext { @@ -844,4 +847,33 @@ class SubquerySuite extends QueryTest with SharedSQLContext { Row(0) :: Row(1) :: Nil) } } + + test("ListQuery and Exists should work even no correlated references") { + checkAnswer( + sql("select * from l, r where l.a = r.c AND (r.d in (select d from r) OR l.a >= 1)"), + Row(2, 1.0, 2, 3.0) :: Row(2, 1.0, 2, 3.0) :: Row(2, 
1.0, 2, 3.0) :: + Row(2, 1.0, 2, 3.0) :: Row(3.0, 3.0, 3, 2.0) :: Row(6, null, 6, null) :: Nil) + checkAnswer( + sql("select * from l, r where l.a = r.c + 1 AND (exists (select * from r) OR l.a = r.c)"), + Row(3, 3.0, 2, 3.0) :: Row(3, 3.0, 2, 3.0) :: Nil) + } + + test("Convert Exists without correlated references to aggregation with count") { + val df = + sql("select * from l, r where l.a = r.c + 1 AND (exists (select * from r) OR l.a = r.c)") + val joinPlan = df.queryExecution.optimizedPlan.asInstanceOf[Join] + val scalarSubquery = joinPlan.condition.get.collect { + case s: ScalarSubquery => s + } + assert(scalarSubquery.length == 1) + val aggPlan = scalarSubquery.head.plan.collect { + case a: Aggregate => a + } + assert(aggPlan.length == 1) + assert(aggPlan.head.aggregateExpressions.length == 1) + val countAggExpr = aggPlan.head.aggregateExpressions.collect { + case a @ Alias(AggregateExpression(_: Count, _, _, _), _) => a + } + assert(countAggExpr.length == 1) + } } From 88016bfc5a4fa72f4513fd2e8ec9a439c2d77ee6 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 1 Apr 2017 02:49:12 +0000 Subject: [PATCH 2/3] Add a local limit operator to limit data scan. 
--- .../org/apache/spark/sql/catalyst/optimizer/subquery.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index 6e3882712481..6ccbf98405c9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -519,7 +519,8 @@ object RewriteEmptyExists extends Rule[LogicalPlan] with PredicateHelper { val countExpr = Alias(Count(Literal(1)).toAggregateExpression(), "count")() val expr = Alias(GreaterThan(countExpr.toAttribute, Literal(0)), e.toString)() ScalarSubquery( - Project(Seq(expr), Aggregate(Nil, Seq(countExpr), sub)), + Project(Seq(expr), + Aggregate(Nil, Seq(countExpr), LocalLimit(Literal(1), sub))), children = Seq.empty, exprId = exprId) }) From 24ae5ce866f82641470ed9598fad9fece450313c Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 11 Apr 2017 09:50:55 +0000 Subject: [PATCH 3/3] Revert optimization for Exists subquery without correlated references. 
--- .../sql/catalyst/optimizer/Optimizer.scala | 3 +- .../sql/catalyst/optimizer/subquery.scala | 29 ------------------- .../org/apache/spark/sql/SubquerySuite.scala | 22 -------------- 3 files changed, 1 insertion(+), 53 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 71f530958e2d..d221b0611a89 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -65,8 +65,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf) Batch("Pullup Correlated Expressions", Once, PullupCorrelatedPredicates) :: Batch("Subquery", Once, - OptimizeSubqueries, - RewriteEmptyExists) :: + OptimizeSubqueries) :: Batch("Replace Operators", fixedPoint, ReplaceIntersectWithSemiJoin, ReplaceExceptWithAntiJoin, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index 6ccbf98405c9..2a3e07aebe70 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -498,32 +498,3 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { } } } - -/** - * This rule rewrites a EXISTS predicate sub-queries into an Aggregate with count. - * So it doesn't be converted to a JOIN later. 
- */ -object RewriteEmptyExists extends Rule[LogicalPlan] with PredicateHelper { - private def containsAgg(plan: LogicalPlan): Boolean = { - plan.collect { - case a: Aggregate => a - }.nonEmpty - } - - def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case Filter(condition, child) => - val (withSubquery, withoutSubquery) = - splitConjunctivePredicates(condition).partition(SubqueryExpression.hasInOrExistsSubquery) - val newWithSubquery = withSubquery.map(_.transform { - case e @ Exists(sub, conditions, exprId) if conditions.isEmpty && !containsAgg(sub) => - val countExpr = Alias(Count(Literal(1)).toAggregateExpression(), "count")() - val expr = Alias(GreaterThan(countExpr.toAttribute, Literal(0)), e.toString)() - ScalarSubquery( - Project(Seq(expr), - Aggregate(Nil, Seq(countExpr), LocalLimit(Literal(1), sub))), - children = Seq.empty, - exprId = exprId) - }) - Filter((newWithSubquery ++ withoutSubquery).reduce(And), child) - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index d342648fd844..0f0199cbe277 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -17,9 +17,6 @@ package org.apache.spark.sql -import org.apache.spark.sql.catalyst.expressions.{Alias, ScalarSubquery} -import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count} -import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join} import org.apache.spark.sql.test.SharedSQLContext class SubquerySuite extends QueryTest with SharedSQLContext { @@ -857,23 +854,4 @@ class SubquerySuite extends QueryTest with SharedSQLContext { sql("select * from l, r where l.a = r.c + 1 AND (exists (select * from r) OR l.a = r.c)"), Row(3, 3.0, 2, 3.0) :: Row(3, 3.0, 2, 3.0) :: Nil) } - - test("Convert Exists without correlated references to aggregation with count") { - val df = 
- sql("select * from l, r where l.a = r.c + 1 AND (exists (select * from r) OR l.a = r.c)") - val joinPlan = df.queryExecution.optimizedPlan.asInstanceOf[Join] - val scalarSubquery = joinPlan.condition.get.collect { - case s: ScalarSubquery => s - } - assert(scalarSubquery.length == 1) - val aggPlan = scalarSubquery.head.plan.collect { - case a: Aggregate => a - } - assert(aggPlan.length == 1) - assert(aggPlan.head.aggregateExpressions.length == 1) - val countAggExpr = aggPlan.head.aggregateExpressions.collect { - case a @ Alias(AggregateExpression(_: Count, _, _, _), _) => a - } - assert(countAggExpr.length == 1) - } }