From 192a04dfa9f13829744c3734b6efb44caf6e8e3a Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 1 Dec 2015 13:26:31 -0800 Subject: [PATCH 1/4] change the default plan for single distinct --- .../spark/sql/catalyst/CatalystConf.scala | 6 +++--- .../DistinctAggregationRewriter.scala | 8 +++----- .../scala/org/apache/spark/sql/SQLConf.scala | 19 +++++++------------ .../execution/AggregationQuerySuite.scala | 2 +- 4 files changed, 14 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala index 7c2b8a940788..bc5f5d6d56eb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst private[spark] trait CatalystConf { def caseSensitiveAnalysis: Boolean - protected[spark] def specializeSingleDistinctAggPlanning: Boolean + protected[spark] def aggregationPlanning15: Boolean } /** @@ -32,12 +32,12 @@ object EmptyConf extends CatalystConf { throw new UnsupportedOperationException } - protected[spark] override def specializeSingleDistinctAggPlanning: Boolean = { + protected[spark] override def aggregationPlanning15: Boolean = { throw new UnsupportedOperationException } } /** A CatalystConf that can be used for local testing. */ case class SimpleCatalystConf(caseSensitiveAnalysis: Boolean) extends CatalystConf { - protected[spark] override def specializeSingleDistinctAggPlanning: Boolean = true + protected[spark] override def aggregationPlanning15: Boolean = false } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala index 9c78f6d4cc71..2030260b5802 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala @@ -123,13 +123,11 @@ case class DistinctAggregationRewriter(conf: CatalystConf) extends Rule[LogicalP .filter(_.isDistinct) .groupBy(_.aggregateFunction.children.toSet) - val shouldRewrite = if (conf.specializeSingleDistinctAggPlanning) { - // When the flag is set to specialize single distinct agg planning, - // we will rely on our Aggregation strategy to handle queries with a single - // distinct column. + val shouldRewrite = if (conf.aggregationPlanning15) { + // use the same plan as Spark 1.5 (one shuffle) for single distinct distinctAggGroups.size > 1 } else { - distinctAggGroups.size >= 1 + distinctAggGroups.nonEmpty } if (shouldRewrite) { // Create the attributes for the grouping id and the group by clause. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 5ef3a48c56a8..b7e3210704a3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -449,17 +449,13 @@ private[spark] object SQLConf { doc = "When true, we could use `datasource`.`path` as table in SQL query" ) - val SPECIALIZE_SINGLE_DISTINCT_AGG_PLANNING = - booleanConf("spark.sql.specializeSingleDistinctAggPlanning", - defaultValue = Some(true), + val AGG_PLANNING_15 = + booleanConf("spark.sql.aggregationPlanning15", + defaultValue = Some(false), isPublic = false, - doc = "When true, if a query only has a single distinct column and it has " + - "grouping expressions, we will use our planner rule to handle this distinct " + - "column (other cases are handled by DistinctAggregationRewriter). " + - "When false, we will always use DistinctAggregationRewriter to plan " + - "aggregation queries with DISTINCT keyword. This is an internal flag that is " + - "used to benchmark the performance impact of using DistinctAggregationRewriter to " + - "plan aggregation queries with a single distinct column.") + doc = "When true, will generate the plan as Spark 1.5 (using one shuffle for a query have " + + "single distinct aggregation). When false, will generate more robust plan (using two " + + "shuffle)") object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" @@ -579,8 +575,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf { private[spark] def runSQLOnFile: Boolean = getConf(RUN_SQL_ON_FILES) - protected[spark] override def specializeSingleDistinctAggPlanning: Boolean = - getConf(SPECIALIZE_SINGLE_DISTINCT_AGG_PLANNING) + protected[spark] override def aggregationPlanning15: Boolean = getConf(AGG_PLANNING_15) /** ********************** SQLConf functionality methods ************ */ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index 39c0a2a0de04..c2b5c2c50f50 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -527,7 +527,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te test("single distinct column set") { Seq(true, false).foreach { specializeSingleDistinctAgg => val conf = - (SQLConf.SPECIALIZE_SINGLE_DISTINCT_AGG_PLANNING.key, + (SQLConf.AGG_PLANNING_15.key, specializeSingleDistinctAgg.toString) withSQLConf(conf) { // DISTINCT is not meaningful with Max and Min, so we just ignore the DISTINCT keyword. From 893b327bd71aeb7bb707cca6ce2d3c921a04382b Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 1 Dec 2015 15:26:47 -0800 Subject: [PATCH 2/4] fix test --- .../scala/org/apache/spark/sql/execution/PlannerSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index dfec139985f7..a4626259b282 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -44,10 +44,10 @@ class PlannerSuite extends SharedSQLContext { fail(s"Could query play aggregation query $query. Is it an aggregation query?")) val aggregations = planned.collect { case n if n.nodeName contains "Aggregate" => n } - // For the new aggregation code path, there will be three aggregate operator for + // For the new aggregation code path, there will be four aggregate operator for // distinct aggregations. assert( - aggregations.size == 2 || aggregations.size == 3, + aggregations.size == 2 || aggregations.size == 4, s"The plan of query $query does not have partial aggregations.") } From 306b8de30210ab5e15a66121ba4a1b9b7c96c6ac Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 1 Dec 2015 16:55:33 -0800 Subject: [PATCH 3/4] Revert "change the default plan for single distinct" This reverts commit 192a04dfa9f13829744c3734b6efb44caf6e8e3a. --- .../spark/sql/catalyst/CatalystConf.scala | 6 +++--- .../DistinctAggregationRewriter.scala | 8 +++++--- .../scala/org/apache/spark/sql/SQLConf.scala | 19 ++++++++++++------- .../execution/AggregationQuerySuite.scala | 2 +- 4 files changed, 21 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala index bc5f5d6d56eb..7c2b8a940788 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst private[spark] trait CatalystConf { def caseSensitiveAnalysis: Boolean - protected[spark] def aggregationPlanning15: Boolean + protected[spark] def specializeSingleDistinctAggPlanning: Boolean } /** @@ -32,12 +32,12 @@ object EmptyConf extends CatalystConf { throw new UnsupportedOperationException } - protected[spark] override def aggregationPlanning15: Boolean = { + protected[spark] override def specializeSingleDistinctAggPlanning: Boolean = { throw new UnsupportedOperationException } } /** A CatalystConf that can be used for local testing. */ case class SimpleCatalystConf(caseSensitiveAnalysis: Boolean) extends CatalystConf { - protected[spark] override def aggregationPlanning15: Boolean = false + protected[spark] override def specializeSingleDistinctAggPlanning: Boolean = true } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala index 2030260b5802..9c78f6d4cc71 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala @@ -123,11 +123,13 @@ case class DistinctAggregationRewriter(conf: CatalystConf) extends Rule[LogicalP .filter(_.isDistinct) .groupBy(_.aggregateFunction.children.toSet) - val shouldRewrite = if (conf.aggregationPlanning15) { - // use the same plan as Spark 1.5 (one shuffle) for single distinct + val shouldRewrite = if (conf.specializeSingleDistinctAggPlanning) { + // When the flag is set to specialize single distinct agg planning, + // we will rely on our Aggregation strategy to handle queries with a single + // distinct column. distinctAggGroups.size > 1 } else { - distinctAggGroups.nonEmpty + distinctAggGroups.size >= 1 } if (shouldRewrite) { // Create the attributes for the grouping id and the group by clause. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index b7e3210704a3..5ef3a48c56a8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -449,13 +449,17 @@ private[spark] object SQLConf { doc = "When true, we could use `datasource`.`path` as table in SQL query" ) - val AGG_PLANNING_15 = - booleanConf("spark.sql.aggregationPlanning15", - defaultValue = Some(false), + val SPECIALIZE_SINGLE_DISTINCT_AGG_PLANNING = + booleanConf("spark.sql.specializeSingleDistinctAggPlanning", + defaultValue = Some(true), isPublic = false, - doc = "When true, will generate the plan as Spark 1.5 (using one shuffle for a query have " + - "single distinct aggregation). When false, will generate more robust plan (using two " + - "shuffle)") + doc = "When true, if a query only has a single distinct column and it has " + + "grouping expressions, we will use our planner rule to handle this distinct " + + "column (other cases are handled by DistinctAggregationRewriter). " + + "When false, we will always use DistinctAggregationRewriter to plan " + + "aggregation queries with DISTINCT keyword. This is an internal flag that is " + + "used to benchmark the performance impact of using DistinctAggregationRewriter to " + + "plan aggregation queries with a single distinct column.") object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" @@ -575,7 +579,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf { private[spark] def runSQLOnFile: Boolean = getConf(RUN_SQL_ON_FILES) - protected[spark] override def aggregationPlanning15: Boolean = getConf(AGG_PLANNING_15) + protected[spark] override def specializeSingleDistinctAggPlanning: Boolean = + getConf(SPECIALIZE_SINGLE_DISTINCT_AGG_PLANNING) /** ********************** SQLConf functionality methods ************ */ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index c2b5c2c50f50..39c0a2a0de04 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -527,7 +527,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te test("single distinct column set") { Seq(true, false).foreach { specializeSingleDistinctAgg => val conf = - (SQLConf.AGG_PLANNING_15.key, + (SQLConf.SPECIALIZE_SINGLE_DISTINCT_AGG_PLANNING.key, specializeSingleDistinctAgg.toString) withSQLConf(conf) { // DISTINCT is not meaningful with Max and Min, so we just ignore the DISTINCT keyword. From 0a237b983939fb0da1a512c8a23ac32785229812 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 1 Dec 2015 16:56:30 -0800 Subject: [PATCH 4/4] change default --- sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 5ef3a48c56a8..58adf64e4986 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -451,7 +451,7 @@ private[spark] object SQLConf { val SPECIALIZE_SINGLE_DISTINCT_AGG_PLANNING = booleanConf("spark.sql.specializeSingleDistinctAggPlanning", - defaultValue = Some(true), + defaultValue = Some(false), isPublic = false, doc = "When true, if a query only has a single distinct column and it has " + "grouping expressions, we will use our planner rule to handle this distinct " +