From b69d385b1a0cf0b8745a85427c77cbc8cb21e627 Mon Sep 17 00:00:00 2001 From: Daoyuan Wang Date: Thu, 5 Feb 2015 19:01:27 -0800 Subject: [PATCH 1/4] add a prune rule for grouping set --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 0da081ed1a6e..57d26aae982f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -119,6 +119,10 @@ object ColumnPruning extends Rule[LogicalPlan] { case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty => a.copy(child = Project(a.references.toSeq, child)) + case p @ Project(_, Aggregate(ge, ec, child)) + if (ec.map(_.references).toSet -- p.references).nonEmpty => + Aggregate(ge, ec.filter(e => p.references.contains(e)), child) + // Eliminate unneeded attributes from either side of a Join. case Project(projectList, Join(left, right, joinType, condition)) => // Collect the list of all references required either above or to evaluate the condition. From 80ddcc6491d3865b5d4c24f2e4ae95d7e1330d62 Mon Sep 17 00:00:00 2001 From: Daoyuan Wang Date: Thu, 5 Feb 2015 21:37:35 -0800 Subject: [PATCH 2/4] keep project --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 57d26aae982f..bd8eeed4b0cc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -119,9 +119,9 @@ object ColumnPruning extends Rule[LogicalPlan] { case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty => a.copy(child = Project(a.references.toSeq, child)) - case p @ Project(_, Aggregate(ge, ec, child)) + case p @ Project(pl, Aggregate(ge, ec, child)) if (ec.map(_.references).toSet -- p.references).nonEmpty => - Aggregate(ge, ec.filter(e => p.references.contains(e)), child) + Project(pl, Aggregate(ge, ec.filter(e => p.references.contains(e)), child)) // Eliminate unneeded attributes from either side of a Join. case Project(projectList, Join(left, right, joinType, condition)) => From 61f8ef7f96ebb7c8bc9fb4a34eb31d78a1da083a Mon Sep 17 00:00:00 2001 From: Daoyuan Wang Date: Sun, 8 Feb 2015 20:58:10 -0800 Subject: [PATCH 3/4] add a unit test --- .../optimizer/FilterPushdownSuite.scala | 21 +++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 0b74bacb18f4..a704199fda18 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries -import org.apache.spark.sql.catalyst.expressions.Explode +import org.apache.spark.sql.catalyst.expressions.{Count, Explode} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.{PlanTest, LeftOuter, RightOuter} import org.apache.spark.sql.catalyst.rules._ @@ -37,7 +37,8 @@ class FilterPushdownSuite extends PlanTest { CombineFilters, PushPredicateThroughProject, PushPredicateThroughJoin, - PushPredicateThroughGenerate) :: Nil + PushPredicateThroughGenerate, + ColumnPruning) :: Nil } val testRelation = LocalRelation('a.int, 'b.int, 'c.int) @@ -58,6 +59,22 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + test("column pruning for group") { + val originalQuery = + testRelation + .groupBy('a)('a, Count('b)) + .select('a) + + val optimized = Optimize(originalQuery.analyze) + val correctAnswer = + testRelation + .select('a) + .groupBy('a)('a) + .select('a).analyze + + comparePlans(optimized, correctAnswer) + } + // After this line is unimplemented. test("simple push down") { val originalQuery = From 5d2d8a3a8b1a7d5afea1c08933d7891c758c31a3 Mon Sep 17 00:00:00 2001 From: Daoyuan Wang Date: Mon, 9 Feb 2015 19:03:40 -0800 Subject: [PATCH 4/4] address Michael's comments --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index bd8eeed4b0cc..1a75fcf3545b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -119,9 +119,14 @@ object ColumnPruning extends Rule[LogicalPlan] { case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty => a.copy(child = Project(a.references.toSeq, child)) - case p @ Project(pl, Aggregate(ge, ec, child)) - if (ec.map(_.references).toSet -- p.references).nonEmpty => - Project(pl, Aggregate(ge, ec.filter(e => p.references.contains(e)), child)) + case p @ Project(projectList, a @ Aggregate(groupingExpressions, aggregateExpressions, child)) + if (a.outputSet -- p.references).nonEmpty => + Project( + projectList, + Aggregate( + groupingExpressions, + aggregateExpressions.filter(e => p.references.contains(e)), + child)) // Eliminate unneeded attributes from either side of a Join. case Project(projectList, Join(left, right, joinType, condition)) =>