From 9e1c3159c0250bb921a83f923d5c9ebea1ffca42 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 1 Sep 2016 14:44:30 +0800
Subject: [PATCH 1/3] Simplified predicates should be pushed down.

---
 .../sql/catalyst/optimizer/Optimizer.scala    |  2 ++
 .../optimizer/FilterPushdownSuite.scala       | 22 ++++++++++++++++++++++
 2 files changed, 24 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 7617d34261807..2a0533c039f8b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -89,6 +89,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
       CombineFilters,
       CombineLimits,
       CombineUnions,
+      // Push down Filters again after combination
+      PushDownPredicate,
       // Constant folding and strength reduction
       NullPropagation,
       FoldablePropagation,
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 55836f96f7e0e..8819938ff77d7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -34,6 +34,7 @@ class FilterPushdownSuite extends PlanTest {
       Batch("Subqueries", Once,
         EliminateSubqueryAliases) ::
       Batch("Filter Pushdown", FixedPoint(10),
+        PushDownPredicate,
         CombineFilters,
         PushDownPredicate,
         BooleanSimplification,
@@ -171,6 +172,27 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("push down filters that are combined") {
+    // The predicate ('a === 2 || 'a === 3) && ('c > 10 || 'a === 2)
+    // will be simplified to ('a === 2) || ('c > 10 && 'a === 3).
+    // ('a === 2 || 'a === 3) can be pushed down, but the simplified form can't.
+    val originalQuery = testRelation
+      .select('a, 'b, ('c + 1) as 'cc)
+      .groupBy('a)('a, count('cc) as 'c)
+      .where('c > 10) // this predicate can't be pushed down.
+      .where(('a === 2 || 'a === 3) && ('c > 10 || 'a === 2))
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer =
+      testRelation
+        .where('a === 2 || 'a === 3)
+        .select('a, 'b, ('c + 1) as 'cc)
+        .groupBy('a)('a, count('cc) as 'c)
+        .where('c > 10).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
   test("joins: push to either side") {
     val x = testRelation.subquery('x)
     val y = testRelation.subquery('y)

From 8f6f91df7fe1d02a69215aea6ca9ae0b37416747 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 5 Sep 2016 14:46:23 +0800
Subject: [PATCH 2/3] Address comment. Consider a more general case.

---
 .../sql/catalyst/optimizer/Optimizer.scala    | 24 ++++++++++++++++++++----
 .../optimizer/FilterPushdownSuite.scala       | 21 ++++++++++++++++++++-
 2 files changed, 40 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 2a0533c039f8b..0b552cbdbc761 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -89,8 +89,6 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
       CombineFilters,
       CombineLimits,
       CombineUnions,
-      // Push down Filters again after combination
-      PushDownPredicate,
       // Constant folding and strength reduction
       NullPropagation,
       FoldablePropagation,
@@ -588,15 +586,33 @@ object CombineUnions extends Rule[LogicalPlan] {
  * one conjunctive predicate.
  */
 object CombineFilters extends Rule[LogicalPlan] with PredicateHelper {
+  private def toCNF(predicate: Expression): Expression = {
+    val disjunctives = splitDisjunctivePredicates(predicate)
+    var finalPredicates = splitConjunctivePredicates(disjunctives.head)
+    disjunctives.tail.foreach { cond =>
+      val predicates = new ArrayBuffer[Expression]()
+      splitConjunctivePredicates(cond).map { p =>
+        predicates ++= finalPredicates.map(Or(_, p))
+      }
+      finalPredicates = predicates.toSeq
+    }
+    finalPredicates.reduce(And)
+  }
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case Filter(fc, nf @ Filter(nc, grandChild)) =>
-      (ExpressionSet(splitConjunctivePredicates(fc)) --
-        ExpressionSet(splitConjunctivePredicates(nc))).reduceOption(And) match {
+      val fcCNF = toCNF(fc)
+      val ncCNF = toCNF(nc)
+      val combinedFilter = (ExpressionSet(splitConjunctivePredicates(fcCNF)) --
+        ExpressionSet(splitConjunctivePredicates(ncCNF))).reduceOption(And) match {
         case Some(ac) =>
           Filter(And(nc, ac), grandChild)
         case None =>
           nf
       }
+      // A [[Filter]] can't be pushed down through another [[Filter]]. Once two Filters are
+      // combined, the [[BooleanSimplification]] rule may simplify the predicate into a form
+      // that can no longer be pushed down. So we push down the combined [[Filter]] immediately.
+      PushDownPredicate(combinedFilter)
   }
 }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 8819938ff77d7..2f95cb4d4ee46 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -36,7 +36,6 @@ class FilterPushdownSuite extends PlanTest {
       Batch("Filter Pushdown", FixedPoint(10),
         PushDownPredicate,
         CombineFilters,
-        PushDownPredicate,
         BooleanSimplification,
         PushPredicateThroughJoin,
         CollapseProject) :: Nil
@@ -193,6 +192,26 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("disjunctive predicates that can be pushed down after conversion should be pushed down") {
+    // (('a === 2) || ('c > 10 && 'a === 3)) can't be pushed down due to its disjunctive form.
+    // However, its conjunctive normal form can be pushed down.
+    val originalQuery = testRelation
+      .select('a, 'b, ('c + 1) as 'cc)
+      .groupBy('a)('a, count('cc) as 'c)
+      .where('c > 10)
+      .where(('a === 2) || ('c > 10 && 'a === 3))
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer =
+      testRelation
+        .where('a === 2 || 'a === 3)
+        .select('a, 'b, ('c + 1) as 'cc)
+        .groupBy('a)('a, count('cc) as 'c)
+        .where('c > 10).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
   test("joins: push to either side") {
     val x = testRelation.subquery('x)
     val y = testRelation.subquery('y)

From f69473fd3f09f8b11fe63eff07ab72dfce9fee96 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 13 Sep 2016 11:56:09 +0800
Subject: [PATCH 3/3] Focus on the first problem of predicate pushdown.

---
 .../sql/catalyst/optimizer/Optimizer.scala    | 18 ++----------------
 .../optimizer/FilterPushdownSuite.scala       |  4 +++-
 2 files changed, 5 insertions(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 0b552cbdbc761..aa00969c9af4b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -586,24 +586,10 @@ object CombineUnions extends Rule[LogicalPlan] {
  * one conjunctive predicate.
  */
 object CombineFilters extends Rule[LogicalPlan] with PredicateHelper {
-  private def toCNF(predicate: Expression): Expression = {
-    val disjunctives = splitDisjunctivePredicates(predicate)
-    var finalPredicates = splitConjunctivePredicates(disjunctives.head)
-    disjunctives.tail.foreach { cond =>
-      val predicates = new ArrayBuffer[Expression]()
-      splitConjunctivePredicates(cond).map { p =>
-        predicates ++= finalPredicates.map(Or(_, p))
-      }
-      finalPredicates = predicates.toSeq
-    }
-    finalPredicates.reduce(And)
-  }
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case Filter(fc, nf @ Filter(nc, grandChild)) =>
-      val fcCNF = toCNF(fc)
-      val ncCNF = toCNF(nc)
-      val combinedFilter = (ExpressionSet(splitConjunctivePredicates(fcCNF)) --
-        ExpressionSet(splitConjunctivePredicates(ncCNF))).reduceOption(And) match {
+      val combinedFilter = (ExpressionSet(splitConjunctivePredicates(fc)) --
+        ExpressionSet(splitConjunctivePredicates(nc))).reduceOption(And) match {
         case Some(ac) =>
           Filter(And(nc, ac), grandChild)
         case None =>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 2f95cb4d4ee46..dfcc29c5feed9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -192,7 +192,9 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
-  test("disjunctive predicates that can be pushed down after conversion should be pushed down") {
+  // TODO: predicate pushdown currently doesn't convert predicates into a pushdown-able form.
+  // We should do so in order to push down more predicates.
+  ignore("predicates that can be pushed down after conversion should be pushed down") {
     // (('a === 2) || ('c > 10 && 'a === 3)) can't be pushed down due to its disjunctive form.
     // However, its conjunctive normal form can be pushed down.
     val originalQuery = testRelation
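
For reference, the CNF conversion that PATCH 3/3 backs out can be sketched independently of Catalyst. The snippet below is a minimal, self-contained approximation only: Expr, Pred, splitConjuncts, splitDisjuncts and this toCNF are toy stand-ins invented for illustration, not Spark's Expression or PredicateHelper APIs.

// Minimal sketch of the CNF expansion idea from PATCH 2/3, on a toy expression type.
object CNFSketch {
  sealed trait Expr
  case class Pred(name: String) extends Expr            // opaque leaf predicate, e.g. "a = 2"
  case class And(left: Expr, right: Expr) extends Expr
  case class Or(left: Expr, right: Expr) extends Expr

  // Flatten nested And/Or trees into their top-level conjuncts/disjuncts.
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other => Seq(other)
  }
  def splitDisjuncts(e: Expr): Seq[Expr] = e match {
    case Or(l, r) => splitDisjuncts(l) ++ splitDisjuncts(r)
    case other => Seq(other)
  }

  // Distribute Or over And: A || (B && C) becomes (A || B) && (A || C).
  // Every conjunct of each disjunct is Or-ed with every clause built so far.
  def toCNF(predicate: Expr): Expr = {
    val disjuncts = splitDisjuncts(predicate)
    var clauses = splitConjuncts(disjuncts.head)
    disjuncts.tail.foreach { d =>
      clauses = for {
        clause <- clauses
        conjunct <- splitConjuncts(d)
      } yield Or(clause, conjunct)
    }
    clauses.reduce(And)
  }

  def main(args: Array[String]): Unit = {
    // ('a === 2) || ('c > 10 && 'a === 3), the predicate used in the ignored test.
    val pred = Or(Pred("a = 2"), And(Pred("c > 10"), Pred("a = 3")))
    // Yields (a = 2 || c > 10) && (a = 2 || a = 3); the second clause references
    // only 'a, which is why the test expects it to end up below the aggregate.
    println(toCNF(pred))
  }
}

The reverted toCNF helper performed the same cross-product over clauses on Catalyst Expressions, using PredicateHelper's splitConjunctivePredicates and splitDisjunctivePredicates, as shown in the PATCH 2/3 hunk above.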