From e50d307ad7d3347b66938f267a601ea67107f563 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 26 Feb 2016 19:07:08 -0800 Subject: [PATCH 1/8] prune Filter based on Constraints --- .../sql/catalyst/optimizer/Optimizer.scala | 23 +++++ .../optimizer/FilterPushdownSuite.scala | 97 ++++++++++++++++++- 2 files changed, 119 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 2aeb9575f1dd2..150fe4fa40e7d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -83,6 +83,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { BooleanSimplification, SimplifyConditionals, RemoveDispensableExpressions, + PruneFilters, SimplifyFilters, SimplifyCasts, SimplifyCaseConversionExpressions, @@ -768,6 +769,28 @@ object CombineFilters extends Rule[LogicalPlan] { } } +/** + * Remove all the deterministic conditions in a [[Filter]] that are contained in the Child. + */ +object PruneFilters extends Rule[LogicalPlan] with PredicateHelper { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case f @ Filter(fc, p: LogicalPlan) + if splitConjunctivePredicates(fc).filter(_.deterministic).exists(p.constraints.contains) => + val (prunedPredicates, remainingPredicates) = + splitConjunctivePredicates(fc).partition { cond => + cond.deterministic && p.constraints.contains(cond) + } + if (prunedPredicates.isEmpty) { + f + } else if (remainingPredicates.isEmpty) { + p + } else { + val newCond = remainingPredicates.reduceOption(And).getOrElse(Literal(true)) + Filter(newCond, p: LogicalPlan) + } + } +} + /** * Removes filters that can be evaluated trivially. This is done either by eliding the filter for * cases where it will always evaluate to `true`, or substituting a dummy empty relation when the diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 1292aa0003dd7..8a505a2af7326 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.{LeftOuter, LeftSemi, PlanTest, RightOuter} +import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.types.IntegerType @@ -36,6 +36,7 @@ class FilterPushdownSuite extends PlanTest { Batch("Filter Pushdown", Once, SamplePushDown, CombineFilters, + PruneFilters, PushPredicateThroughProject, BooleanSimplification, PushPredicateThroughJoin, @@ -64,6 +65,100 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + test("Filter removal #1 -- isNull + LeftOuter") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val query = x.where("x.b".attr.isNull).join(y, LeftOuter) + val queryWithUselessFilter = query.where("x.b".attr.isNull) + + val optimized = Optimize.execute(queryWithUselessFilter.analyze) + val correctAnswer = query.analyze + + comparePlans(optimized, correctAnswer) + } + + test("Filter removal #2 -- unionall") { + val tr1 = LocalRelation('a.int, 'b.int, 'c.int) + val tr2 = LocalRelation('d.int, 'e.int, 'f.int) + val tr3 = LocalRelation('g.int, 'h.int, 'i.int) + + val query = + tr1.where('a.attr > 10) + .unionAll(tr2.where('d.attr > 10) + .unionAll(tr3.where('g.attr > 10))) + val queryWithUselessFilter = query.where('a.attr > 10) + + val optimized = Optimize.execute(queryWithUselessFilter.analyze) + val correctAnswer = query.analyze + + comparePlans(optimized, correctAnswer) + } + + test("Filter removal #3 -- multiple constraints") { + val tr1 = LocalRelation('a.int, 'b.int, 'c.int).subquery('tr1) + val tr2 = LocalRelation('a.int, 'd.int, 'e.int).subquery('tr2) + + val query = tr1 + .where("tr1.a".attr > 10 || "tr1.c".attr < 10) + .join(tr2.where('d.attr < 100), Inner, Some("tr1.a".attr === "tr2.a".attr)) + // different order of "tr2.a" and "tr1.a" + val queryWithUselessFilter = + query.where( + ("tr1.a".attr > 10 || "tr1.c".attr < 10) && + 'd.attr < 100 && + "tr2.a".attr === "tr1.a".attr) + + val optimized = Optimize.execute(queryWithUselessFilter.analyze) + val correctAnswer = query.analyze + + comparePlans(optimized, correctAnswer) + } + + test("Filter removal #4 -- partial pruned") { + val tr1 = LocalRelation('a.int, 'b.int, 'c.int).subquery('tr1) + val tr2 = LocalRelation('a.int, 'd.int, 'e.int).subquery('tr2) + + // One of the filter condition does not exist in the constraints of its child + // Thus, the filter is not removed + val query = tr1 + .where("tr1.a".attr > 10) + .join(tr2.where('d.attr < 100), Inner, Some("tr1.a".attr === "tr2.d".attr)) + val queryWithExtraFilters = + query.where("tr1.a".attr > 10 && 'd.attr < 100 && "tr1.a".attr === "tr2.a".attr) + + val optimized = Optimize.execute(queryWithExtraFilters.analyze) + val correctAnswer = tr1 + .where("tr1.a".attr > 10) + .join(tr2.where('d.attr < 100), + Inner, + Some("tr1.a".attr === "tr2.a".attr && "tr1.a".attr === "tr2.d".attr)).analyze + + comparePlans(optimized, correctAnswer) + } + + test("Filter removal #5 -- no predicate is pruned") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val query = x.where("x.b".attr.isNull).join(y, LeftOuter) + val queryWithExtraFilters = query.where("x.b".attr.isNotNull) + + val optimized = Optimize.execute(queryWithExtraFilters.analyze) + val correctAnswer = + testRelation.where("b".attr.isNull).where("b".attr.isNotNull) + .join(testRelation, LeftOuter).analyze + + comparePlans(optimized, correctAnswer) + } + + test("Filter removal #6 -- nondeterministic predicate is not pruned") { + val originalQuery = testRelation.where(Rand(10) > 5).select('a).where(Rand(10) > 5).analyze + val optimized = Optimize.execute(originalQuery) + val correctAnswer = testRelation.where(Rand(10) > 5).where(Rand(10) > 5).select('a).analyze + comparePlans(optimized, correctAnswer) + } + // After this line is unimplemented. test("simple push down") { val originalQuery = From f889c912a8569fae4158b51afac5b3821cc54f5b Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 26 Feb 2016 19:18:20 -0800 Subject: [PATCH 2/8] remove the useless if condition --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 150fe4fa40e7d..5b97b4dd13f7b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -774,8 +774,7 @@ object CombineFilters extends Rule[LogicalPlan] { */ object PruneFilters extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case f @ Filter(fc, p: LogicalPlan) - if splitConjunctivePredicates(fc).filter(_.deterministic).exists(p.constraints.contains) => + case f @ Filter(fc, p: LogicalPlan) => val (prunedPredicates, remainingPredicates) = splitConjunctivePredicates(fc).partition { cond => cond.deterministic && p.constraints.contains(cond) From 2442093adfad70c6541e6b0fb69f97d5168a0f5c Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sat, 27 Feb 2016 00:19:35 -0800 Subject: [PATCH 3/8] move buckets to a new suite --- .../optimizer/FilterPushdownSuite.scala | 95 ------------------- 1 file changed, 95 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 8a505a2af7326..46df930bf25c0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -36,7 +36,6 @@ class FilterPushdownSuite extends PlanTest { Batch("Filter Pushdown", Once, SamplePushDown, CombineFilters, - PruneFilters, PushPredicateThroughProject, BooleanSimplification, PushPredicateThroughJoin, @@ -65,100 +64,6 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } - test("Filter removal #1 -- isNull + LeftOuter") { - val x = testRelation.subquery('x) - val y = testRelation.subquery('y) - - val query = x.where("x.b".attr.isNull).join(y, LeftOuter) - val queryWithUselessFilter = query.where("x.b".attr.isNull) - - val optimized = Optimize.execute(queryWithUselessFilter.analyze) - val correctAnswer = query.analyze - - comparePlans(optimized, correctAnswer) - } - - test("Filter removal #2 -- unionall") { - val tr1 = LocalRelation('a.int, 'b.int, 'c.int) - val tr2 = LocalRelation('d.int, 'e.int, 'f.int) - val tr3 = LocalRelation('g.int, 'h.int, 'i.int) - - val query = - tr1.where('a.attr > 10) - .unionAll(tr2.where('d.attr > 10) - .unionAll(tr3.where('g.attr > 10))) - val queryWithUselessFilter = query.where('a.attr > 10) - - val optimized = Optimize.execute(queryWithUselessFilter.analyze) - val correctAnswer = query.analyze - - comparePlans(optimized, correctAnswer) - } - - test("Filter removal #3 -- multiple constraints") { - val tr1 = LocalRelation('a.int, 'b.int, 'c.int).subquery('tr1) - val tr2 = LocalRelation('a.int, 'd.int, 'e.int).subquery('tr2) - - val query = tr1 - .where("tr1.a".attr > 10 || "tr1.c".attr < 10) - .join(tr2.where('d.attr < 100), Inner, Some("tr1.a".attr === "tr2.a".attr)) - // different order of "tr2.a" and "tr1.a" - val queryWithUselessFilter = - query.where( - ("tr1.a".attr > 10 || "tr1.c".attr < 10) && - 'd.attr < 100 && - "tr2.a".attr === "tr1.a".attr) - - val optimized = Optimize.execute(queryWithUselessFilter.analyze) - val correctAnswer = query.analyze - - comparePlans(optimized, correctAnswer) - } - - test("Filter removal #4 -- partial pruned") { - val tr1 = LocalRelation('a.int, 'b.int, 'c.int).subquery('tr1) - val tr2 = LocalRelation('a.int, 'd.int, 'e.int).subquery('tr2) - - // One of the filter condition does not exist in the constraints of its child - // Thus, the filter is not removed - val query = tr1 - .where("tr1.a".attr > 10) - .join(tr2.where('d.attr < 100), Inner, Some("tr1.a".attr === "tr2.d".attr)) - val queryWithExtraFilters = - query.where("tr1.a".attr > 10 && 'd.attr < 100 && "tr1.a".attr === "tr2.a".attr) - - val optimized = Optimize.execute(queryWithExtraFilters.analyze) - val correctAnswer = tr1 - .where("tr1.a".attr > 10) - .join(tr2.where('d.attr < 100), - Inner, - Some("tr1.a".attr === "tr2.a".attr && "tr1.a".attr === "tr2.d".attr)).analyze - - comparePlans(optimized, correctAnswer) - } - - test("Filter removal #5 -- no predicate is pruned") { - val x = testRelation.subquery('x) - val y = testRelation.subquery('y) - - val query = x.where("x.b".attr.isNull).join(y, LeftOuter) - val queryWithExtraFilters = query.where("x.b".attr.isNotNull) - - val optimized = Optimize.execute(queryWithExtraFilters.analyze) - val correctAnswer = - testRelation.where("b".attr.isNull).where("b".attr.isNotNull) - .join(testRelation, LeftOuter).analyze - - comparePlans(optimized, correctAnswer) - } - - test("Filter removal #6 -- nondeterministic predicate is not pruned") { - val originalQuery = testRelation.where(Rand(10) > 5).select('a).where(Rand(10) > 5).analyze - val optimized = Optimize.execute(originalQuery) - val correctAnswer = testRelation.where(Rand(10) > 5).where(Rand(10) > 5).select('a).analyze - comparePlans(optimized, correctAnswer) - } - // After this line is unimplemented. test("simple push down") { val originalQuery = From b9e68b9d105bdf6f6aede1c0e232ea2621b12e3a Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sat, 27 Feb 2016 00:19:49 -0800 Subject: [PATCH 4/8] move buckets to a new suite --- .../optimizer/PruneFiltersSuite.scala | 136 ++++++++++++++++++ 1 file changed, 136 insertions(+) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala new file mode 100644 index 0000000000000..84ad39e0593bf --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules._ + +class PruneFiltersSuite extends PlanTest { + + object Optimize extends RuleExecutor[LogicalPlan] { + val batches = + Batch("Subqueries", Once, + EliminateSubqueryAliases) :: + Batch("Filter Pushdown and Pruning", Once, + CombineFilters, + PruneFilters, + PushPredicateThroughProject, + PushPredicateThroughJoin) :: Nil + } + + val testRelation = LocalRelation('a.int, 'b.int, 'c.int) + + test("Constraints of isNull + LeftOuter") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val query = x.where("x.b".attr.isNull).join(y, LeftOuter) + val queryWithUselessFilter = query.where("x.b".attr.isNull) + + val optimized = Optimize.execute(queryWithUselessFilter.analyze) + val correctAnswer = query.analyze + + comparePlans(optimized, correctAnswer) + } + + test("Constraints of unionall") { + val tr1 = LocalRelation('a.int, 'b.int, 'c.int) + val tr2 = LocalRelation('d.int, 'e.int, 'f.int) + val tr3 = LocalRelation('g.int, 'h.int, 'i.int) + + val query = + tr1.where('a.attr > 10) + .unionAll(tr2.where('d.attr > 10) + .unionAll(tr3.where('g.attr > 10))) + val queryWithUselessFilter = query.where('a.attr > 10) + + val optimized = Optimize.execute(queryWithUselessFilter.analyze) + val correctAnswer = query.analyze + + comparePlans(optimized, correctAnswer) + } + + test("Pruning multiple constraints in the same run") { + val tr1 = LocalRelation('a.int, 'b.int, 'c.int).subquery('tr1) + val tr2 = LocalRelation('a.int, 'd.int, 'e.int).subquery('tr2) + + val query = tr1 + .where("tr1.a".attr > 10 || "tr1.c".attr < 10) + .join(tr2.where('d.attr < 100), Inner, Some("tr1.a".attr === "tr2.a".attr)) + // different order of "tr2.a" and "tr1.a" + val queryWithUselessFilter = + query.where( + ("tr1.a".attr > 10 || "tr1.c".attr < 10) && + 'd.attr < 100 && + "tr2.a".attr === "tr1.a".attr) + + val optimized = Optimize.execute(queryWithUselessFilter.analyze) + val correctAnswer = query.analyze + + comparePlans(optimized, correctAnswer) + } + + test("Partial pruning") { + val tr1 = LocalRelation('a.int, 'b.int, 'c.int).subquery('tr1) + val tr2 = LocalRelation('a.int, 'd.int, 'e.int).subquery('tr2) + + // One of the filter condition does not exist in the constraints of its child + // Thus, the filter is not removed + val query = tr1 + .where("tr1.a".attr > 10) + .join(tr2.where('d.attr < 100), Inner, Some("tr1.a".attr === "tr2.d".attr)) + val queryWithExtraFilters = + query.where("tr1.a".attr > 10 && 'd.attr < 100 && "tr1.a".attr === "tr2.a".attr) + + val optimized = Optimize.execute(queryWithExtraFilters.analyze) + val correctAnswer = tr1 + .where("tr1.a".attr > 10) + .join(tr2.where('d.attr < 100), + Inner, + Some("tr1.a".attr === "tr2.a".attr && "tr1.a".attr === "tr2.d".attr)).analyze + + comparePlans(optimized, correctAnswer) + } + + test("No predicate is pruned") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val query = x.where("x.b".attr.isNull).join(y, LeftOuter) + val queryWithExtraFilters = query.where("x.b".attr.isNotNull) + + val optimized = Optimize.execute(queryWithExtraFilters.analyze) + val correctAnswer = + testRelation.where("b".attr.isNull).where("b".attr.isNotNull) + .join(testRelation, LeftOuter).analyze + + comparePlans(optimized, correctAnswer) + } + + test("Nondeterministic predicate is not pruned") { + val originalQuery = testRelation.where(Rand(10) > 5).select('a).where(Rand(10) > 5).analyze + val optimized = Optimize.execute(originalQuery) + val correctAnswer = testRelation.where(Rand(10) > 5).where(Rand(10) > 5).select('a).analyze + comparePlans(optimized, correctAnswer) + } +} From c3a504edac2bfb049732b85e864ff145000608d7 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sat, 27 Feb 2016 00:25:16 -0800 Subject: [PATCH 5/8] style correction. --- .../apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala index 84ad39e0593bf..0ee7cf92097e1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala @@ -61,7 +61,7 @@ class PruneFiltersSuite extends PlanTest { val query = tr1.where('a.attr > 10) .unionAll(tr2.where('d.attr > 10) - .unionAll(tr3.where('g.attr > 10))) + .unionAll(tr3.where('g.attr > 10))) val queryWithUselessFilter = query.where('a.attr > 10) val optimized = Optimize.execute(queryWithUselessFilter.analyze) From b71c0844279b533b30e98e670481a99b1139cace Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 4 Mar 2016 23:35:42 -0800 Subject: [PATCH 6/8] address comments. --- .../apache/spark/sql/catalyst/optimizer/Optimizer.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 5b97b4dd13f7b..360cf6cf7f24b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -770,7 +770,8 @@ object CombineFilters extends Rule[LogicalPlan] { } /** - * Remove all the deterministic conditions in a [[Filter]] that are contained in the Child. + * Remove all the deterministic conditions in a [[Filter]] that are guaranteed to be true + * given the constraints on the child's output. */ object PruneFilters extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { @@ -784,8 +785,8 @@ object PruneFilters extends Rule[LogicalPlan] with PredicateHelper { } else if (remainingPredicates.isEmpty) { p } else { - val newCond = remainingPredicates.reduceOption(And).getOrElse(Literal(true)) - Filter(newCond, p: LogicalPlan) + val newCond = remainingPredicates.reduce(And) + Filter(newCond, p) } } } From 83e0f4a2b88625065802e50c5c02ead8e13a71e4 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 7 Mar 2016 11:37:10 -0800 Subject: [PATCH 7/8] revert the change in import. --- .../spark/sql/catalyst/optimizer/FilterPushdownSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 46df930bf25c0..1292aa0003dd7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.{LeftOuter, LeftSemi, PlanTest, RightOuter} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.types.IntegerType From 7b7b41134257bf18169f28b648a00daf711bc2ab Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 7 Mar 2016 22:30:55 -0800 Subject: [PATCH 8/8] combine two rules --- .../sql/catalyst/optimizer/Optimizer.scala | 31 +++++++------------ .../BooleanSimplificationSuite.scala | 2 +- .../optimizer/SetOperationSuite.scala | 2 +- .../datasources/parquet/ParquetFilters.scala | 2 +- 4 files changed, 15 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 360cf6cf7f24b..c5631a87cfcf9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -84,7 +84,6 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { SimplifyConditionals, RemoveDispensableExpressions, PruneFilters, - SimplifyFilters, SimplifyCasts, SimplifyCaseConversionExpressions, EliminateSerialization) :: @@ -770,11 +769,21 @@ object CombineFilters extends Rule[LogicalPlan] { } /** - * Remove all the deterministic conditions in a [[Filter]] that are guaranteed to be true - * given the constraints on the child's output. + * Removes filters that can be evaluated trivially. This can be done through the following ways: + * 1) by eliding the filter for cases where it will always evaluate to `true`. + * 2) by substituting a dummy empty relation when the filter will always evaluate to `false`. + * 3) by eliminating the always-true conditions given the constraints on the child's output. */ object PruneFilters extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { + // If the filter condition always evaluate to true, remove the filter. + case Filter(Literal(true, BooleanType), child) => child + // If the filter condition always evaluate to null or false, + // replace the input with an empty relation. + case Filter(Literal(null, _), child) => LocalRelation(child.output, data = Seq.empty) + case Filter(Literal(false, BooleanType), child) => LocalRelation(child.output, data = Seq.empty) + // If any deterministic condition is guaranteed to be true given the constraints on the child's + // output, remove the condition case f @ Filter(fc, p: LogicalPlan) => val (prunedPredicates, remainingPredicates) = splitConjunctivePredicates(fc).partition { cond => @@ -791,22 +800,6 @@ object PruneFilters extends Rule[LogicalPlan] with PredicateHelper { } } -/** - * Removes filters that can be evaluated trivially. This is done either by eliding the filter for - * cases where it will always evaluate to `true`, or substituting a dummy empty relation when the - * filter will always evaluate to `false`. - */ -object SimplifyFilters extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = plan transform { - // If the filter condition always evaluate to true, remove the filter. - case Filter(Literal(true, BooleanType), child) => child - // If the filter condition always evaluate to null or false, - // replace the input with an empty relation. - case Filter(Literal(null, _), child) => LocalRelation(child.output, data = Seq.empty) - case Filter(Literal(false, BooleanType), child) => LocalRelation(child.output, data = Seq.empty) - } -} - /** * Pushes [[Filter]] operators through [[Project]] operators, in-lining any [[Alias Aliases]] * that were defined in the projection. diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala index 3e52441519ae2..da43751b0a310 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala @@ -36,7 +36,7 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper { NullPropagation, ConstantFolding, BooleanSimplification, - SimplifyFilters) :: Nil + PruneFilters) :: Nil } val testRelation = LocalRelation('a.int, 'b.int, 'c.int, 'd.string) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala index 50f3b512d94ed..b08cdc8a3658e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala @@ -32,7 +32,7 @@ class SetOperationSuite extends PlanTest { Batch("Union Pushdown", Once, CombineUnions, SetOperationPushDown, - SimplifyFilters) :: Nil + PruneFilters) :: Nil } val testRelation = LocalRelation('a.int, 'b.int, 'c.int) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 5a5cb5cf03d4a..95afdc789f322 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -234,7 +234,7 @@ private[sql] object ParquetFilters { // // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`, // which can be casted to `false` implicitly. Please refer to the `eval` method of these - // operators and the `SimplifyFilters` rule for details. + // operators and the `PruneFilters` rule for details. // Hyukjin: // I added [[EqualNullSafe]] with [[org.apache.parquet.filter2.predicate.Operators.Eq]].