Commit 24ae5ce

Revert optimization for Exists subquery without correlated references.

1 parent: 329f067

3 files changed: +1, -53 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 1 addition & 2 deletions

@@ -65,8 +65,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf)
     Batch("Pullup Correlated Expressions", Once,
       PullupCorrelatedPredicates) ::
     Batch("Subquery", Once,
-      OptimizeSubqueries,
-      RewriteEmptyExists) ::
+      OptimizeSubqueries) ::
     Batch("Replace Operators", fixedPoint,
       ReplaceIntersectWithSemiJoin,
       ReplaceExceptWithAntiJoin,

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala

Lines changed: 0 additions & 29 deletions

@@ -498,32 +498,3 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
     }
   }
 }
-
-/**
- * This rule rewrites a EXISTS predicate sub-queries into an Aggregate with count.
- * So it doesn't be converted to a JOIN later.
- */
-object RewriteEmptyExists extends Rule[LogicalPlan] with PredicateHelper {
-  private def containsAgg(plan: LogicalPlan): Boolean = {
-    plan.collect {
-      case a: Aggregate => a
-    }.nonEmpty
-  }
-
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case Filter(condition, child) =>
-      val (withSubquery, withoutSubquery) =
-        splitConjunctivePredicates(condition).partition(SubqueryExpression.hasInOrExistsSubquery)
-      val newWithSubquery = withSubquery.map(_.transform {
-        case e @ Exists(sub, conditions, exprId) if conditions.isEmpty && !containsAgg(sub) =>
-          val countExpr = Alias(Count(Literal(1)).toAggregateExpression(), "count")()
-          val expr = Alias(GreaterThan(countExpr.toAttribute, Literal(0)), e.toString)()
-          ScalarSubquery(
-            Project(Seq(expr),
-              Aggregate(Nil, Seq(countExpr), LocalLimit(Literal(1), sub))),
-            children = Seq.empty,
-            exprId = exprId)
-      })
-      Filter((newWithSubquery ++ withoutSubquery).reduce(And), child)
-  }
-}
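
For context, the reverted RewriteEmptyExists rule replaced an uncorrelated EXISTS predicate with a scalar subquery that counts at most one row of the subquery and tests whether the count is positive. The sketch below paraphrases that rewrite at the SQL level; it is not code from this commit, and it assumes a SparkSession named `spark` with the `l` and `r` test tables registered as in SubquerySuite.

// Illustrative sketch only (not part of this commit): the effect of the removed rule,
// expressed as a hand-written SQL rewrite. Assumes `spark`, `l`, and `r` as set up
// in SubquerySuite.
val withExists = spark.sql(
  "select * from l where exists (select * from r)")

// RewriteEmptyExists produced a plan semantically close to this form: count at most
// one row of the subquery and check that the count is greater than zero.
val withCount = spark.sql(
  "select * from l where (select count(1) from (select * from r limit 1) t) > 0")

// Both queries return the same rows; the rewrite only changed how the uncorrelated
// EXISTS predicate was planned.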

sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala

Lines changed: 0 additions & 22 deletions

@@ -17,9 +17,6 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, ScalarSubquery}
-import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count}
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join}
 import org.apache.spark.sql.test.SharedSQLContext
 
 class SubquerySuite extends QueryTest with SharedSQLContext {

@@ -857,23 +854,4 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
       sql("select * from l, r where l.a = r.c + 1 AND (exists (select * from r) OR l.a = r.c)"),
       Row(3, 3.0, 2, 3.0) :: Row(3, 3.0, 2, 3.0) :: Nil)
   }
-
-  test("Convert Exists without correlated references to aggregation with count") {
-    val df =
-      sql("select * from l, r where l.a = r.c + 1 AND (exists (select * from r) OR l.a = r.c)")
-    val joinPlan = df.queryExecution.optimizedPlan.asInstanceOf[Join]
-    val scalarSubquery = joinPlan.condition.get.collect {
-      case s: ScalarSubquery => s
-    }
-    assert(scalarSubquery.length == 1)
-    val aggPlan = scalarSubquery.head.plan.collect {
-      case a: Aggregate => a
-    }
-    assert(aggPlan.length == 1)
-    assert(aggPlan.head.aggregateExpressions.length == 1)
-    val countAggExpr = aggPlan.head.aggregateExpressions.collect {
-      case a @ Alias(AggregateExpression(_: Count, _, _, _), _) => a
-    }
-    assert(countAggExpr.length == 1)
-  }
 }
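
With the rule gone, the uncorrelated EXISTS in the disjunctive test query above is no longer rewritten into a count-based ScalarSubquery, which is why the plan-shape assertions were deleted. Below is a minimal sketch of how one might inspect which subquery expressions survive optimization after this revert, assuming the same `spark` session and `l`/`r` fixtures from SubquerySuite; it is illustrative only and not code from the commit.

// Hedged sketch, not part of this commit: inspect the optimized plan of the query used
// in the deleted test and list whatever subquery expressions remain after the revert.
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression

val df = spark.sql(
  "select * from l, r where l.a = r.c + 1 AND (exists (select * from r) OR l.a = r.c)")
val remainingSubqueries = df.queryExecution.optimizedPlan.collect {
  case plan => plan.expressions.flatMap(_.collect { case s: SubqueryExpression => s })
}.flatten
// Without RewriteEmptyExists there is no guarantee of finding a ScalarSubquery with a
// count aggregate here, so the deleted assertions would no longer be meaningful.
remainingSubqueries.map(_.getClass.getSimpleName).foreach(println)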
