Skip to content

Commit 192a04d

Browse files
author
Davies Liu
committed
change the default plan for single distinct
1 parent a8ceec5 commit 192a04d

File tree

4 files changed

+14
-21
lines changed

4 files changed

+14
-21
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst
2020
private[spark] trait CatalystConf {
2121
def caseSensitiveAnalysis: Boolean
2222

23-
protected[spark] def specializeSingleDistinctAggPlanning: Boolean
23+
protected[spark] def aggregationPlanning15: Boolean
2424
}
2525

2626
/**
@@ -32,12 +32,12 @@ object EmptyConf extends CatalystConf {
3232
throw new UnsupportedOperationException
3333
}
3434

35-
protected[spark] override def specializeSingleDistinctAggPlanning: Boolean = {
35+
protected[spark] override def aggregationPlanning15: Boolean = {
3636
throw new UnsupportedOperationException
3737
}
3838
}
3939

4040
/** A CatalystConf that can be used for local testing. */
4141
case class SimpleCatalystConf(caseSensitiveAnalysis: Boolean) extends CatalystConf {
42-
protected[spark] override def specializeSingleDistinctAggPlanning: Boolean = true
42+
protected[spark] override def aggregationPlanning15: Boolean = false
4343
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -123,13 +123,11 @@ case class DistinctAggregationRewriter(conf: CatalystConf) extends Rule[LogicalP
123123
.filter(_.isDistinct)
124124
.groupBy(_.aggregateFunction.children.toSet)
125125

126-
val shouldRewrite = if (conf.specializeSingleDistinctAggPlanning) {
127-
// When the flag is set to specialize single distinct agg planning,
128-
// we will rely on our Aggregation strategy to handle queries with a single
129-
// distinct column.
126+
val shouldRewrite = if (conf.aggregationPlanning15) {
127+
// use the same plan as Spark 1.5 (one shuffle) for single distinct
130128
distinctAggGroups.size > 1
131129
} else {
132-
distinctAggGroups.size >= 1
130+
distinctAggGroups.nonEmpty
133131
}
134132
if (shouldRewrite) {
135133
// Create the attributes for the grouping id and the group by clause.

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -449,17 +449,13 @@ private[spark] object SQLConf {
449449
doc = "When true, we could use `datasource`.`path` as table in SQL query"
450450
)
451451

452-
val SPECIALIZE_SINGLE_DISTINCT_AGG_PLANNING =
453-
booleanConf("spark.sql.specializeSingleDistinctAggPlanning",
454-
defaultValue = Some(true),
452+
val AGG_PLANNING_15 =
453+
booleanConf("spark.sql.aggregationPlanning15",
454+
defaultValue = Some(false),
455455
isPublic = false,
456-
doc = "When true, if a query only has a single distinct column and it has " +
457-
"grouping expressions, we will use our planner rule to handle this distinct " +
458-
"column (other cases are handled by DistinctAggregationRewriter). " +
459-
"When false, we will always use DistinctAggregationRewriter to plan " +
460-
"aggregation queries with DISTINCT keyword. This is an internal flag that is " +
461-
"used to benchmark the performance impact of using DistinctAggregationRewriter to " +
462-
"plan aggregation queries with a single distinct column.")
456+
doc = "When true, will generate the plan as Spark 1.5 (using one shuffle for a query have " +
457+
"single distinct aggregation). When false, will generate more robust plan (using two " +
458+
"shuffle)")
463459

464460
object Deprecated {
465461
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
@@ -579,8 +575,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
579575

580576
private[spark] def runSQLOnFile: Boolean = getConf(RUN_SQL_ON_FILES)
581577

582-
protected[spark] override def specializeSingleDistinctAggPlanning: Boolean =
583-
getConf(SPECIALIZE_SINGLE_DISTINCT_AGG_PLANNING)
578+
protected[spark] override def aggregationPlanning15: Boolean = getConf(AGG_PLANNING_15)
584579

585580
/** ********************** SQLConf functionality methods ************ */
586581

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -527,7 +527,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te
527527
test("single distinct column set") {
528528
Seq(true, false).foreach { specializeSingleDistinctAgg =>
529529
val conf =
530-
(SQLConf.SPECIALIZE_SINGLE_DISTINCT_AGG_PLANNING.key,
530+
(SQLConf.AGG_PLANNING_15.key,
531531
specializeSingleDistinctAgg.toString)
532532
withSQLConf(conf) {
533533
// DISTINCT is not meaningful with Max and Min, so we just ignore the DISTINCT keyword.

0 commit comments

Comments
 (0)