diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 7617d34261807..aa00969c9af4b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -588,13 +588,17 @@ object CombineUnions extends Rule[LogicalPlan] { object CombineFilters extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case Filter(fc, nf @ Filter(nc, grandChild)) => - (ExpressionSet(splitConjunctivePredicates(fc)) -- + val combinedFilter = (ExpressionSet(splitConjunctivePredicates(fc)) -- ExpressionSet(splitConjunctivePredicates(nc))).reduceOption(And) match { case Some(ac) => Filter(And(nc, ac), grandChild) case None => nf } + // [[Filter]] can't pushdown through another [[Filter]]. Once they are combined, + // [[BooleanSimplification]] rule will possibly simplify the predicate to the form that + // will not be able to pushdown. So we pushdown the combined [[Filter]] immediately. + PushDownPredicate(combinedFilter) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 55836f96f7e0e..dfcc29c5feed9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -34,8 +34,8 @@ class FilterPushdownSuite extends PlanTest { Batch("Subqueries", Once, EliminateSubqueryAliases) :: Batch("Filter Pushdown", FixedPoint(10), - CombineFilters, PushDownPredicate, + CombineFilters, BooleanSimplification, PushPredicateThroughJoin, CollapseProject) :: Nil @@ -171,6 +171,49 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + test("push down filters that are combined") { + // The following predicate ('a === 2 || 'a === 3) && ('c > 10 || 'a === 2) + // will be simplified as ('a == 2) || ('c > 10 && 'a == 3). + // ('a === 2 || 'a === 3) can be pushed down. But the simplified one can't. + val originalQuery = testRelation + .select('a, 'b, ('c + 1) as 'cc) + .groupBy('a)('a, count('cc) as 'c) + .where('c > 10) // this predicate can't be pushed down. + .where(('a === 2 || 'a === 3) && ('c > 10 || 'a === 2)) + + val optimized = Optimize.execute(originalQuery.analyze) + val correctAnswer = + testRelation + .where('a === 2 || 'a === 3) + .select('a, 'b, ('c + 1) as 'cc) + .groupBy('a)('a, count('cc) as 'c) + .where('c > 10).analyze + + comparePlans(optimized, correctAnswer) + } + + // TODO: currently predicate pushdown doesn't convert predicates to pushdown-able form. + // We should do it to push down more predicates. + ignore("predicates which are able to pushdown should be pushed down after converted") { + // (('a === 2) || ('c > 10 || 'a === 3)) can't be pushdown due to the disjunctive form. + // However, its conjunctive normal form can be pushdown. + val originalQuery = testRelation + .select('a, 'b, ('c + 1) as 'cc) + .groupBy('a)('a, count('cc) as 'c) + .where('c > 10) + .where(('a === 2) || ('c > 10 && 'a === 3)) + + val optimized = Optimize.execute(originalQuery.analyze) + val correctAnswer = + testRelation + .where('a === 2 || 'a === 3) + .select('a, 'b, ('c + 1) as 'cc) + .groupBy('a)('a, count('cc) as 'c) + .where('c > 10).analyze + + comparePlans(optimized, correctAnswer) + } + test("joins: push to either side") { val x = testRelation.subquery('x) val y = testRelation.subquery('y)