From c51448d1173739dd592895b0902ab61d66da499d Mon Sep 17 00:00:00 2001 From: xin Wu Date: Thu, 5 May 2016 07:50:37 -0700 Subject: [PATCH 1/7] move DistinctAggregateRewrite rule to optimizer --- .../sql/catalyst/analysis/Analyzer.scala | 1 - .../DistinctAggregationRewriter.scala | 17 +---- .../sql/catalyst/optimizer/Optimizer.scala | 3 +- .../execution/AggregationQuerySuite.scala | 63 +++++++++++++++++++ 4 files changed, 68 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index bc62c7fc6a920..04f62d78ea91a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -80,7 +80,6 @@ class Analyzer( ExtractWindowExpressions :: GlobalAggregates :: ResolveAggregateFunctions :: - DistinctAggregationRewriter(conf) :: HiveTypeCoercion.typeCoercionRules ++ extendedResolutionRules : _*), Batch("Nondeterministic", Once, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala index 9c78f6d4cc71b..9f677cf45c680 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala @@ -100,13 +100,10 @@ import org.apache.spark.sql.types.IntegerType * we could improve this in the current rule by applying more advanced expression cannocalization * techniques. */ -case class DistinctAggregationRewriter(conf: CatalystConf) extends Rule[LogicalPlan] { +object DistinctAggregationRewriter extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case p if !p.resolved => p - // We need to wait until this Aggregate operator is resolved. + def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case a: Aggregate => rewrite(a) - case p => p } def rewrite(a: Aggregate): Aggregate = { @@ -123,15 +120,7 @@ case class DistinctAggregationRewriter(conf: CatalystConf) extends Rule[LogicalP .filter(_.isDistinct) .groupBy(_.aggregateFunction.children.toSet) - val shouldRewrite = if (conf.specializeSingleDistinctAggPlanning) { - // When the flag is set to specialize single distinct agg planning, - // we will rely on our Aggregation strategy to handle queries with a single - // distinct column. - distinctAggGroups.size > 1 - } else { - distinctAggGroups.size >= 1 - } - if (shouldRewrite) { + if (distinctAggGroups.size > 1) { // Create the attributes for the grouping id and the group by clause. val gid = new AttributeReference("gid", IntegerType, false)() val groupByMap = a.groupingExpressions.collect { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 682b860672b2d..58b221283a299 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.optimizer import scala.collection.immutable.HashSet -import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubQueries} +import org.apache.spark.sql.catalyst.analysis.{DistinctAggregationRewriter, CleanupAliases, EliminateSubQueries} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.Inner @@ -38,6 +38,7 @@ object DefaultOptimizer extends Optimizer { Batch("Remove SubQueries", FixedPoint(100), EliminateSubQueries) :: Batch("Aggregate", FixedPoint(100), + DistinctAggregationRewriter, ReplaceDistinctWithAggregate, RemoveLiteralFromGroupExpressions) :: Batch("Operator Optimizations", FixedPoint(100), diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index 64bff827aead9..204399958ac0d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -930,6 +930,69 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te Row(11) :: Nil) } } + + test("SPARK-14495: two distinct aggregation with having clause of one distinct aggregation") { + checkAnswer( + sqlContext.sql( + """ + |select key, count(distinct value1), count(distinct value2) + |from agg2 group by key + |having count(distinct value1) > 0 + """.stripMargin), + Seq( + Row(null, 3, 3), + Row(1, 2, 3), + Row(2, 2, 1) + ) + ) + } + + test("SPARK-14495: one distinct aggregration with having clause of one distinct aggregation") { + checkAnswer( + sqlContext.sql( + """ + |select key, count(distinct value1) + |from agg2 group by key + |having count(distinct value1) > 0 + """.stripMargin), + Seq( + Row(null, 3), + Row(1, 2), + Row(2, 2) + ) + ) + } + + test("SPARK-14495: two distinct aggregration with having clause of two distinct aggregation") { + checkAnswer( + sqlContext.sql( + """ + |select key, count(distinct value1), count(distinct value2) + |from agg2 group by key + |having count(distinct value1) > 0 and count(distinct value2) = 3 + """.stripMargin), + Seq( + Row(null, 3, 3), + Row(1, 2, 3) + ) + ) + } + + test("SPARK-14495: two distinct aggregration with having clause of non-distinct aggregation") { + checkAnswer( + sqlContext.sql( + """ + |select key, count(distinct value1), count(distinct value2) + |from agg2 group by key + |having count(value1) > 0 + """.stripMargin), + Seq( + Row(null, 3, 3), + Row(1, 2, 3), + Row(2, 2, 1) + ) + ) + } } From f73428f94746d6d074baf6702589545bdbd11cad Mon Sep 17 00:00:00 2001 From: xin Wu Date: Fri, 6 May 2016 19:23:30 -0700 Subject: [PATCH 2/7] modify testcases and remove property spark.sql.specializeSingleDistinctAggPlanning --- .../spark/sql/catalyst/CatalystConf.scala | 10 +- .../DistinctAggregationRewriter.scala | 1 - .../scala/org/apache/spark/sql/SQLConf.scala | 15 -- .../spark/sql/execution/PlannerSuite.scala | 8 +- .../execution/AggregationQuerySuite.scala | 141 +++++++++--------- 5 files changed, 73 insertions(+), 102 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala index 7c2b8a9407884..3f351b07b37df 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql.catalyst private[spark] trait CatalystConf { def caseSensitiveAnalysis: Boolean - - protected[spark] def specializeSingleDistinctAggPlanning: Boolean } /** @@ -31,13 +29,7 @@ object EmptyConf extends CatalystConf { override def caseSensitiveAnalysis: Boolean = { throw new UnsupportedOperationException } - - protected[spark] override def specializeSingleDistinctAggPlanning: Boolean = { - throw new UnsupportedOperationException - } } /** A CatalystConf that can be used for local testing. */ -case class SimpleCatalystConf(caseSensitiveAnalysis: Boolean) extends CatalystConf { - protected[spark] override def specializeSingleDistinctAggPlanning: Boolean = true -} +case class SimpleCatalystConf(caseSensitiveAnalysis: Boolean) extends CatalystConf diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala index 9f677cf45c680..c27524888b801 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.catalyst.analysis -import org.apache.spark.sql.catalyst.CatalystConf import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, Complete} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, LogicalPlan} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 6cc680a1b2286..e10254d4c25ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -449,18 +449,6 @@ private[spark] object SQLConf { doc = "When true, we could use `datasource`.`path` as table in SQL query" ) - val SPECIALIZE_SINGLE_DISTINCT_AGG_PLANNING = - booleanConf("spark.sql.specializeSingleDistinctAggPlanning", - defaultValue = Some(false), - isPublic = false, - doc = "When true, if a query only has a single distinct column and it has " + - "grouping expressions, we will use our planner rule to handle this distinct " + - "column (other cases are handled by DistinctAggregationRewriter). " + - "When false, we will always use DistinctAggregationRewriter to plan " + - "aggregation queries with DISTINCT keyword. This is an internal flag that is " + - "used to benchmark the performance impact of using DistinctAggregationRewriter to " + - "plan aggregation queries with a single distinct column.") - object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" val EXTERNAL_SORT = "spark.sql.planner.externalSort" @@ -579,9 +567,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf { private[spark] def runSQLOnFile: Boolean = getConf(RUN_SQL_ON_FILES) - protected[spark] override def specializeSingleDistinctAggPlanning: Boolean = - getConf(SPECIALIZE_SINGLE_DISTINCT_AGG_PLANNING) - /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 2fb439f50117a..7d7c39c9d78b1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -44,10 +44,12 @@ class PlannerSuite extends SharedSQLContext { fail(s"Could query play aggregation query $query. Is it an aggregation query?")) val aggregations = planned.collect { case n if n.nodeName contains "Aggregate" => n } - // For the new aggregation code path, there will be four aggregate operator for - // distinct aggregations. + // For the new aggregation code path, there will be three aggregate operator for + // distinct aggregations. There used to be four aggregate operators because single + // distinct aggregate used to trigger DistinctAggregationRewriter rewrite. Now the + // the rewrite only happens when there are multiple distinct aggregations. assert( - aggregations.size == 2 || aggregations.size == 4, + aggregations.size == 2 || aggregations.size == 3, s"The plan of query $query does not have partial aggregations.") } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index 204399958ac0d..41d5c2074a278 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -560,80 +560,73 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te } test("single distinct column set") { - Seq(true, false).foreach { specializeSingleDistinctAgg => - val conf = - (SQLConf.SPECIALIZE_SINGLE_DISTINCT_AGG_PLANNING.key, - specializeSingleDistinctAgg.toString) - withSQLConf(conf) { - // DISTINCT is not meaningful with Max and Min, so we just ignore the DISTINCT keyword. - checkAnswer( - sqlContext.sql( - """ - |SELECT - | min(distinct value1), - | sum(distinct value1), - | avg(value1), - | avg(value2), - | max(distinct value1) - |FROM agg2 - """.stripMargin), - Row(-60, 70.0, 101.0/9.0, 5.6, 100)) - - checkAnswer( - sqlContext.sql( - """ - |SELECT - | mydoubleavg(distinct value1), - | avg(value1), - | avg(value2), - | key, - | mydoubleavg(value1 - 1), - | mydoubleavg(distinct value1) * 0.1, - | avg(value1 + value2) - |FROM agg2 - |GROUP BY key - """.stripMargin), - Row(120.0, 70.0/3.0, -10.0/3.0, 1, 67.0/3.0 + 100.0, 12.0, 20.0) :: - Row(100.0, 1.0/3.0, 1.0, 2, -2.0/3.0 + 100.0, 10.0, 2.0) :: - Row(null, null, 3.0, 3, null, null, null) :: - Row(110.0, 10.0, 20.0, null, 109.0, 11.0, 30.0) :: Nil) - - checkAnswer( - sqlContext.sql( - """ - |SELECT - | key, - | mydoubleavg(distinct value1), - | mydoublesum(value2), - | mydoublesum(distinct value1), - | mydoubleavg(distinct value1), - | mydoubleavg(value1) - |FROM agg2 - |GROUP BY key - """.stripMargin), - Row(1, 120.0, -10.0, 40.0, 120.0, 70.0/3.0 + 100.0) :: - Row(2, 100.0, 3.0, 0.0, 100.0, 1.0/3.0 + 100.0) :: - Row(3, null, 3.0, null, null, null) :: - Row(null, 110.0, 60.0, 30.0, 110.0, 110.0) :: Nil) - - checkAnswer( - sqlContext.sql( - """ - |SELECT - | count(value1), - | count(*), - | count(1), - | count(DISTINCT value1), - | key - |FROM agg2 - |GROUP BY key - """.stripMargin), - Row(3, 3, 3, 2, 1) :: - Row(3, 4, 4, 2, 2) :: - Row(0, 2, 2, 0, 3) :: - Row(3, 4, 4, 3, null) :: Nil) - } - } + // DISTINCT is not meaningful with Max and Min, so we just ignore the DISTINCT keyword. + checkAnswer( + sqlContext.sql( + """ + |SELECT + | min(distinct value1), + | sum(distinct value1), + | avg(value1), + | avg(value2), + | max(distinct value1) + |FROM agg2 + """.stripMargin), + Row(-60, 70.0, 101.0/9.0, 5.6, 100)) + + checkAnswer( + sqlContext.sql( + """ + |SELECT + | mydoubleavg(distinct value1), + | avg(value1), + | avg(value2), + | key, + | mydoubleavg(value1 - 1), + | mydoubleavg(distinct value1) * 0.1, + | avg(value1 + value2) + |FROM agg2 + |GROUP BY key + """.stripMargin), + Row(120.0, 70.0/3.0, -10.0/3.0, 1, 67.0/3.0 + 100.0, 12.0, 20.0) :: + Row(100.0, 1.0/3.0, 1.0, 2, -2.0/3.0 + 100.0, 10.0, 2.0) :: + Row(null, null, 3.0, 3, null, null, null) :: + Row(110.0, 10.0, 20.0, null, 109.0, 11.0, 30.0) :: Nil) + + checkAnswer( + sqlContext.sql( + """ + |SELECT + | key, + | mydoubleavg(distinct value1), + | mydoublesum(value2), + | mydoublesum(distinct value1), + | mydoubleavg(distinct value1), + | mydoubleavg(value1) + |FROM agg2 + |GROUP BY key + """.stripMargin), + Row(1, 120.0, -10.0, 40.0, 120.0, 70.0/3.0 + 100.0) :: + Row(2, 100.0, 3.0, 0.0, 100.0, 1.0/3.0 + 100.0) :: + Row(3, null, 3.0, null, null, null) :: + Row(null, 110.0, 60.0, 30.0, 110.0, 110.0) :: Nil) + + checkAnswer( + sqlContext.sql( + """ + |SELECT + | count(value1), + | count(*), + | count(1), + | count(DISTINCT value1), + | key + |FROM agg2 + |GROUP BY key + """.stripMargin), + Row(3, 3, 3, 2, 1) :: + Row(3, 4, 4, 2, 2) :: + Row(0, 2, 2, 0, 3) :: + Row(3, 4, 4, 3, null) :: Nil) } test("single distinct multiple columns set") { From fb550a14706eb188dda7a1561269acb80a375227 Mon Sep 17 00:00:00 2001 From: xin Wu Date: Fri, 6 May 2016 22:32:54 -0700 Subject: [PATCH 3/7] fix import order --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 58b221283a299..946ccc6aa5c6e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.optimizer import scala.collection.immutable.HashSet -import org.apache.spark.sql.catalyst.analysis.{DistinctAggregationRewriter, CleanupAliases, EliminateSubQueries} +import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, DistinctAggregationRewriter, EliminateSubQueries} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.Inner From 326eb4bc8586be8ce51c3d7c39c96de684321e94 Mon Sep 17 00:00:00 2001 From: xin Wu Date: Mon, 9 May 2016 17:01:02 -0700 Subject: [PATCH 4/7] SPARK-14495: update upon review. keep SQLConf property specializeSingleDistinctAggPlanning --- .../spark/sql/catalyst/CatalystConf.scala | 10 +++++++++- .../DistinctAggregationRewriter.scala | 13 +++++++++++-- .../sql/catalyst/optimizer/Optimizer.scala | 19 +++++++++++++++---- .../expressions/ExpressionEvalHelper.scala | 4 ++-- .../expressions/MathFunctionsSuite.scala | 4 ++-- .../scala/org/apache/spark/sql/SQLConf.scala | 15 +++++++++++++++ .../org/apache/spark/sql/SQLContext.scala | 2 +- 7 files changed, 55 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala index 3f351b07b37df..7c2b8a9407884 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst private[spark] trait CatalystConf { def caseSensitiveAnalysis: Boolean + + protected[spark] def specializeSingleDistinctAggPlanning: Boolean } /** @@ -29,7 +31,13 @@ object EmptyConf extends CatalystConf { override def caseSensitiveAnalysis: Boolean = { throw new UnsupportedOperationException } + + protected[spark] override def specializeSingleDistinctAggPlanning: Boolean = { + throw new UnsupportedOperationException + } } /** A CatalystConf that can be used for local testing. */ -case class SimpleCatalystConf(caseSensitiveAnalysis: Boolean) extends CatalystConf +case class SimpleCatalystConf(caseSensitiveAnalysis: Boolean) extends CatalystConf { + protected[spark] override def specializeSingleDistinctAggPlanning: Boolean = true +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala index c27524888b801..47d6d3640b1af 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.analysis +import org.apache.spark.sql.catalyst.CatalystConf import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, Complete} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, LogicalPlan} @@ -99,7 +100,7 @@ import org.apache.spark.sql.types.IntegerType * we could improve this in the current rule by applying more advanced expression cannocalization * techniques. */ -object DistinctAggregationRewriter extends Rule[LogicalPlan] { +case class DistinctAggregationRewriter(conf: CatalystConf) extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case a: Aggregate => rewrite(a) @@ -119,7 +120,15 @@ object DistinctAggregationRewriter extends Rule[LogicalPlan] { .filter(_.isDistinct) .groupBy(_.aggregateFunction.children.toSet) - if (distinctAggGroups.size > 1) { + val shouldRewrite = if (conf.specializeSingleDistinctAggPlanning) { + // When the flag is set to specialize single distinct agg planning, + // we will rely on our Aggregation strategy to handle queries with a single + // distinct column. + distinctAggGroups.size > 1 + } else { + distinctAggGroups.size >= 1 + } + if (shouldRewrite) { // Create the attributes for the grouping id and the group by clause. val gid = new AttributeReference("gid", IntegerType, false)() val groupByMap = a.groupingExpressions.collect { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 946ccc6aa5c6e..de7dbc0cb0910 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.optimizer import scala.collection.immutable.HashSet +import org.apache.spark.sql.catalyst.{SimpleCatalystConf, CatalystConf} import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, DistinctAggregationRewriter, EliminateSubQueries} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -30,15 +31,13 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.types._ -abstract class Optimizer extends RuleExecutor[LogicalPlan] - -object DefaultOptimizer extends Optimizer { +abstract class Optimizer(conf: CatalystConf) extends RuleExecutor[LogicalPlan] { val batches = // SubQueries are only needed for analysis and can be removed before execution. Batch("Remove SubQueries", FixedPoint(100), EliminateSubQueries) :: Batch("Aggregate", FixedPoint(100), - DistinctAggregationRewriter, + DistinctAggregationRewriter(conf), ReplaceDistinctWithAggregate, RemoveLiteralFromGroupExpressions) :: Batch("Operator Optimizations", FixedPoint(100), @@ -69,6 +68,18 @@ object DefaultOptimizer extends Optimizer { Batch("LocalRelation", FixedPoint(100), ConvertToLocalRelation) :: Nil } +case class DefaultOptimizer(conf: CatalystConf) extends Optimizer(conf) + +/** + * An optimizer used in test code. + * + * To ensure extendability, we leave the standard rules in the abstract optimizer rules, while + * specific rules go to the subclasses + */ +object SimpleTestOptimizer extends SimpleTestOptimizer + +class SimpleTestOptimizer extends Optimizer( + new SimpleCatalystConf(caseSensitiveAnalysis = true)) /** * Pushes operations down into a Sample. diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala index 465f7d08aa142..074785eb467d2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala @@ -24,7 +24,7 @@ import org.scalatest.prop.GeneratorDrivenPropertyChecks import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.expressions.codegen._ -import org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer +import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project} import org.apache.spark.sql.types.DataType @@ -189,7 +189,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks { expected: Any, inputRow: InternalRow = EmptyRow): Unit = { val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, OneRowRelation) - val optimizedPlan = DefaultOptimizer.execute(plan) + val optimizedPlan = SimpleTestOptimizer.execute(plan) checkEvaluationWithoutCodegen(optimizedPlan.expressions.head, expected, inputRow) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala index aacc56fc44186..90f90965697a9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateProjection, GenerateMutableProjection} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer +import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project} import org.apache.spark.sql.types._ @@ -150,7 +150,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { expression: Expression, inputRow: InternalRow = EmptyRow): Unit = { val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, OneRowRelation) - val optimizedPlan = DefaultOptimizer.execute(plan) + val optimizedPlan = SimpleTestOptimizer.execute(plan) checkNaNWithoutCodegen(optimizedPlan.expressions.head, inputRow) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index e10254d4c25ce..6cc680a1b2286 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -449,6 +449,18 @@ private[spark] object SQLConf { doc = "When true, we could use `datasource`.`path` as table in SQL query" ) + val SPECIALIZE_SINGLE_DISTINCT_AGG_PLANNING = + booleanConf("spark.sql.specializeSingleDistinctAggPlanning", + defaultValue = Some(false), + isPublic = false, + doc = "When true, if a query only has a single distinct column and it has " + + "grouping expressions, we will use our planner rule to handle this distinct " + + "column (other cases are handled by DistinctAggregationRewriter). " + + "When false, we will always use DistinctAggregationRewriter to plan " + + "aggregation queries with DISTINCT keyword. This is an internal flag that is " + + "used to benchmark the performance impact of using DistinctAggregationRewriter to " + + "plan aggregation queries with a single distinct column.") + object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" val EXTERNAL_SORT = "spark.sql.planner.externalSort" @@ -567,6 +579,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf { private[spark] def runSQLOnFile: Boolean = getConf(RUN_SQL_ON_FILES) + protected[spark] override def specializeSingleDistinctAggPlanning: Boolean = + getConf(SPECIALIZE_SINGLE_DISTINCT_AGG_PLANNING) + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 47fd7fc1178a9..8a07cee909bc6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -202,7 +202,7 @@ class SQLContext private[sql]( } @transient - protected[sql] lazy val optimizer: Optimizer = DefaultOptimizer + protected[sql] lazy val optimizer: Optimizer = DefaultOptimizer(conf) @transient protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_)) From e3deb13f347978dcff22afd15b6f398eefc3deef Mon Sep 17 00:00:00 2001 From: xin Wu Date: Mon, 9 May 2016 17:08:20 -0700 Subject: [PATCH 5/7] SPARK-14495: reorder import --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index de7dbc0cb0910..676e0b7aceb67 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.optimizer import scala.collection.immutable.HashSet -import org.apache.spark.sql.catalyst.{SimpleCatalystConf, CatalystConf} +import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf} import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, DistinctAggregationRewriter, EliminateSubQueries} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ From e0eeb7d92f397f94698208afa4081a083e835074 Mon Sep 17 00:00:00 2001 From: xin Wu Date: Tue, 10 May 2016 22:18:08 -0700 Subject: [PATCH 6/7] SPARK-14495: update testcase based on review --- .../execution/AggregationQuerySuite.scala | 141 +++++++++--------- 1 file changed, 74 insertions(+), 67 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index 41d5c2074a278..204399958ac0d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -560,73 +560,80 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te } test("single distinct column set") { - // DISTINCT is not meaningful with Max and Min, so we just ignore the DISTINCT keyword. - checkAnswer( - sqlContext.sql( - """ - |SELECT - | min(distinct value1), - | sum(distinct value1), - | avg(value1), - | avg(value2), - | max(distinct value1) - |FROM agg2 - """.stripMargin), - Row(-60, 70.0, 101.0/9.0, 5.6, 100)) - - checkAnswer( - sqlContext.sql( - """ - |SELECT - | mydoubleavg(distinct value1), - | avg(value1), - | avg(value2), - | key, - | mydoubleavg(value1 - 1), - | mydoubleavg(distinct value1) * 0.1, - | avg(value1 + value2) - |FROM agg2 - |GROUP BY key - """.stripMargin), - Row(120.0, 70.0/3.0, -10.0/3.0, 1, 67.0/3.0 + 100.0, 12.0, 20.0) :: - Row(100.0, 1.0/3.0, 1.0, 2, -2.0/3.0 + 100.0, 10.0, 2.0) :: - Row(null, null, 3.0, 3, null, null, null) :: - Row(110.0, 10.0, 20.0, null, 109.0, 11.0, 30.0) :: Nil) - - checkAnswer( - sqlContext.sql( - """ - |SELECT - | key, - | mydoubleavg(distinct value1), - | mydoublesum(value2), - | mydoublesum(distinct value1), - | mydoubleavg(distinct value1), - | mydoubleavg(value1) - |FROM agg2 - |GROUP BY key - """.stripMargin), - Row(1, 120.0, -10.0, 40.0, 120.0, 70.0/3.0 + 100.0) :: - Row(2, 100.0, 3.0, 0.0, 100.0, 1.0/3.0 + 100.0) :: - Row(3, null, 3.0, null, null, null) :: - Row(null, 110.0, 60.0, 30.0, 110.0, 110.0) :: Nil) - - checkAnswer( - sqlContext.sql( - """ - |SELECT - | count(value1), - | count(*), - | count(1), - | count(DISTINCT value1), - | key - |FROM agg2 - |GROUP BY key - """.stripMargin), - Row(3, 3, 3, 2, 1) :: - Row(3, 4, 4, 2, 2) :: - Row(0, 2, 2, 0, 3) :: - Row(3, 4, 4, 3, null) :: Nil) + Seq(true, false).foreach { specializeSingleDistinctAgg => + val conf = + (SQLConf.SPECIALIZE_SINGLE_DISTINCT_AGG_PLANNING.key, + specializeSingleDistinctAgg.toString) + withSQLConf(conf) { + // DISTINCT is not meaningful with Max and Min, so we just ignore the DISTINCT keyword. + checkAnswer( + sqlContext.sql( + """ + |SELECT + | min(distinct value1), + | sum(distinct value1), + | avg(value1), + | avg(value2), + | max(distinct value1) + |FROM agg2 + """.stripMargin), + Row(-60, 70.0, 101.0/9.0, 5.6, 100)) + + checkAnswer( + sqlContext.sql( + """ + |SELECT + | mydoubleavg(distinct value1), + | avg(value1), + | avg(value2), + | key, + | mydoubleavg(value1 - 1), + | mydoubleavg(distinct value1) * 0.1, + | avg(value1 + value2) + |FROM agg2 + |GROUP BY key + """.stripMargin), + Row(120.0, 70.0/3.0, -10.0/3.0, 1, 67.0/3.0 + 100.0, 12.0, 20.0) :: + Row(100.0, 1.0/3.0, 1.0, 2, -2.0/3.0 + 100.0, 10.0, 2.0) :: + Row(null, null, 3.0, 3, null, null, null) :: + Row(110.0, 10.0, 20.0, null, 109.0, 11.0, 30.0) :: Nil) + + checkAnswer( + sqlContext.sql( + """ + |SELECT + | key, + | mydoubleavg(distinct value1), + | mydoublesum(value2), + | mydoublesum(distinct value1), + | mydoubleavg(distinct value1), + | mydoubleavg(value1) + |FROM agg2 + |GROUP BY key + """.stripMargin), + Row(1, 120.0, -10.0, 40.0, 120.0, 70.0/3.0 + 100.0) :: + Row(2, 100.0, 3.0, 0.0, 100.0, 1.0/3.0 + 100.0) :: + Row(3, null, 3.0, null, null, null) :: + Row(null, 110.0, 60.0, 30.0, 110.0, 110.0) :: Nil) + + checkAnswer( + sqlContext.sql( + """ + |SELECT + | count(value1), + | count(*), + | count(1), + | count(DISTINCT value1), + | key + |FROM agg2 + |GROUP BY key + """.stripMargin), + Row(3, 3, 3, 2, 1) :: + Row(3, 4, 4, 2, 2) :: + Row(0, 2, 2, 0, 3) :: + Row(3, 4, 4, 3, null) :: Nil) + } + } } test("single distinct multiple columns set") { From 3782cda0cf5bde4d9c8fd7dadc1313eed134e99b Mon Sep 17 00:00:00 2001 From: xin Wu Date: Tue, 10 May 2016 23:49:02 -0700 Subject: [PATCH 7/7] SPARK-14495: remove unnecessary testcases --- .../execution/AggregationQuerySuite.scala | 49 +------------------ 1 file changed, 1 insertion(+), 48 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index 204399958ac0d..d21227a00fb76 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -931,7 +931,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te } } - test("SPARK-14495: two distinct aggregation with having clause of one distinct aggregation") { + test("SPARK-14495: distinct aggregate in having clause") { checkAnswer( sqlContext.sql( """ @@ -946,53 +946,6 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te ) ) } - - test("SPARK-14495: one distinct aggregration with having clause of one distinct aggregation") { - checkAnswer( - sqlContext.sql( - """ - |select key, count(distinct value1) - |from agg2 group by key - |having count(distinct value1) > 0 - """.stripMargin), - Seq( - Row(null, 3), - Row(1, 2), - Row(2, 2) - ) - ) - } - - test("SPARK-14495: two distinct aggregration with having clause of two distinct aggregation") { - checkAnswer( - sqlContext.sql( - """ - |select key, count(distinct value1), count(distinct value2) - |from agg2 group by key - |having count(distinct value1) > 0 and count(distinct value2) = 3 - """.stripMargin), - Seq( - Row(null, 3, 3), - Row(1, 2, 3) - ) - ) - } - - test("SPARK-14495: two distinct aggregration with having clause of non-distinct aggregation") { - checkAnswer( - sqlContext.sql( - """ - |select key, count(distinct value1), count(distinct value2) - |from agg2 group by key - |having count(value1) > 0 - """.stripMargin), - Seq( - Row(null, 3, 3), - Row(1, 2, 3), - Row(2, 2, 1) - ) - ) - } }