From b09b89571a75e5dafb6765845c0ba370029d7406 Mon Sep 17 00:00:00 2001 From: scwf Date: Fri, 1 May 2015 22:22:35 +0800 Subject: [PATCH 1/3] improve column pruning --- .../sql/catalyst/optimizer/Optimizer.scala | 5 +++++ .../optimizer/FilterPushdownSuite.scala | 19 ++++++++++++++++++- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 709f7d672d931..eaf794e3c1109 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -172,6 +172,11 @@ object ColumnPruning extends Rule[LogicalPlan] { case Project(projectList, Limit(exp, child)) => Limit(exp, Project(projectList, child)) + + // push down project if possible when the child is sort + case p @ Project(projectList, s @ Sort(order, _, grandChild)) + if (s.references -- p.references).size == 0 => + s.copy(child = Project(projectList, grandChild)) // Eliminate no-op Projects case Project(projectList, child) if child.output == projectList => child diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 58d415d9011e1..092cc32611365 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries -import org.apache.spark.sql.catalyst.expressions.{Count, Explode} +import org.apache.spark.sql.catalyst.expressions.{SortOrder, Ascending, Count, Explode} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.{LeftSemi, PlanTest, LeftOuter, RightOuter} import org.apache.spark.sql.catalyst.rules._ @@ -541,4 +541,21 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, originalQuery) } + + test("push down project") { + val x = testRelation.subquery('x) + + val originalQuery = { + x.select('a, 'b) + .sortBy(SortOrder('a, Ascending)) + .select('a) + } + + val optimized = Optimize.execute(originalQuery.analyze) + val correctAnswer = + x.select('a) + .sortBy(SortOrder('a, Ascending)).analyze + + comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer)) + } } From e230155e224f63a2ba5881bdf509b58473f60f0a Mon Sep 17 00:00:00 2001 From: scwf Date: Sat, 2 May 2015 00:55:30 +0800 Subject: [PATCH 2/3] fix tests failure --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index eaf794e3c1109..917e1d5ba9735 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -175,7 +175,7 @@ object ColumnPruning extends Rule[LogicalPlan] { // push down project if possible when the child is sort case p @ Project(projectList, s @ Sort(order, _, grandChild)) - if (s.references -- p.references).size == 0 => + if (s.references -- p.outputSet).isEmpty => s.copy(child = Project(projectList, grandChild)) // Eliminate no-op Projects From b00d833f1b17424d5e3ab17115ff3b2c6e972031 Mon Sep 17 00:00:00 2001 From: scwf Date: Sat, 9 May 2015 09:13:31 +0800 Subject: [PATCH 3/3] address michael's comment --- .../sql/catalyst/optimizer/Optimizer.scala | 4 ++-- .../optimizer/FilterPushdownSuite.scala | 19 ++++++++++++++++++- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 917e1d5ba9735..0bcdcced7a340 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -174,8 +174,8 @@ object ColumnPruning extends Rule[LogicalPlan] { Limit(exp, Project(projectList, child)) // push down project if possible when the child is sort - case p @ Project(projectList, s @ Sort(order, _, grandChild)) - if (s.references -- p.outputSet).isEmpty => + case p @ Project(projectList, s @ Sort(_, _, grandChild)) + if s.references.subsetOf(p.outputSet) => s.copy(child = Project(projectList, grandChild)) // Eliminate no-op Projects diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 092cc32611365..932967d854a31 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -542,9 +542,10 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, originalQuery) } - test("push down project") { + test("push down project past sort") { val x = testRelation.subquery('x) + // push down valid val originalQuery = { x.select('a, 'b) .sortBy(SortOrder('a, Ascending)) @@ -557,5 +558,21 @@ class FilterPushdownSuite extends PlanTest { .sortBy(SortOrder('a, Ascending)).analyze comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer)) + + // push down invalid + val originalQuery1 = { + x.select('a, 'b) + .sortBy(SortOrder('a, Ascending)) + .select('b) + } + + val optimized1 = Optimize.execute(originalQuery1.analyze) + val correctAnswer1 = + x.select('a, 'b) + .sortBy(SortOrder('a, Ascending)) + .select('b).analyze + + comparePlans(optimized1, analysis.EliminateSubQueries(correctAnswer1)) + } }