From 255daac2500876c08c138847870ca37d6c1806bf Mon Sep 17 00:00:00 2001
From: Cheng Su
Date: Wed, 13 Oct 2021 01:00:30 -0700
Subject: [PATCH 1/3] Disable two level map for final hash aggregation by default

---
 .../org/apache/spark/sql/internal/SQLConf.scala     | 14 +++++++++++++-
 .../execution/aggregate/HashAggregateExec.scala     |  9 ++++++++-
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1dc4a8eaabaa..6a033b519b03 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1705,11 +1705,21 @@ object SQLConf {
     .doc("Enable two-level aggregate hash map. When enabled, records will first be " +
       "inserted/looked-up at a 1st-level, small, fast map, and then fallback to a " +
       "2nd-level, larger, slower map when 1st level is full or keys cannot be found. " +
-      "When disabled, records go directly to the 2nd level.")
+      "When disabled, records go directly to the 2nd level. Enable for partial aggregate only.")
     .version("2.3.0")
     .booleanConf
     .createWithDefault(true)

+  val ENABLE_TWOLEVEL_FINAL_AGG_MAP =
+    buildConf("spark.sql.codegen.aggregate.final.map.twolevel.enabled")
+      .internal()
+      .doc("Enable two-level aggregate hash map for final aggregate as well. Disable by default " +
+        "because final aggregate might get more distinct keys compared to partial aggregate. " +
+        "Overhead of looking up 1st-level map might dominate when having a lot of distinct keys.")
+      .version("3.2.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val ENABLE_VECTORIZED_HASH_MAP =
     buildConf("spark.sql.codegen.aggregate.map.vectorized.enable")
       .internal()
@@ -3865,6 +3875,8 @@ class SQLConf extends Serializable with Logging {

   def enableTwoLevelAggMap: Boolean = getConf(ENABLE_TWOLEVEL_AGG_MAP)

+  def enableTwoLevelFinalAggMap: Boolean = getConf(ENABLE_TWOLEVEL_FINAL_AGG_MAP)
+
   def enableVectorizedHashMap: Boolean = getConf(ENABLE_VECTORIZED_HASH_MAP)

   def useObjectHashAggregation: Boolean = getConf(USE_OBJECT_HASH_AGG)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index da310b6e4be7..2d7b79957600 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -667,7 +667,14 @@ case class HashAggregateExec(
     val isNotByteArrayDecimalType = bufferSchema.map(_.dataType).filter(_.isInstanceOf[DecimalType])
       .forall(!DecimalType.isByteArrayDecimalType(_))

-    isSupported && isNotByteArrayDecimalType
+    val isEnabledForAggModes =
+      if (modes.forall(mode => mode == Partial || mode == PartialMerge)) {
+        true
+      } else {
+        conf.enableTwoLevelFinalAggMap
+      }
+
+    isSupported && isNotByteArrayDecimalType && isEnabledForAggModes
   }

   private def enableTwoLevelHashMap(ctx: CodegenContext): Unit = {

From d02f1088739b9b6945b4a1bb8b16a14e43a70123 Mon Sep 17 00:00:00 2001
From: Cheng Su
Date: Wed, 13 Oct 2021 16:21:05 -0700
Subject: [PATCH 2/3] Address all comments

---
 .../org/apache/spark/sql/internal/SQLConf.scala     | 14 +++++++-------
 .../execution/aggregate/HashAggregateExec.scala     |  2 +-
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6a033b519b03..d3386f36def2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1705,20 +1705,20 @@ object SQLConf {
     .doc("Enable two-level aggregate hash map. When enabled, records will first be " +
       "inserted/looked-up at a 1st-level, small, fast map, and then fallback to a " +
       "2nd-level, larger, slower map when 1st level is full or keys cannot be found. " +
-      "When disabled, records go directly to the 2nd level. Enable for partial aggregate only.")
+      "When disabled, records go directly to the 2nd level.")
     .version("2.3.0")
     .booleanConf
     .createWithDefault(true)

-  val ENABLE_TWOLEVEL_FINAL_AGG_MAP =
-    buildConf("spark.sql.codegen.aggregate.final.map.twolevel.enabled")
+  val ENABLE_TWOLEVEL_AGG_MAP_PARTIAL_ONLY =
+    buildConf("spark.sql.codegen.aggregate.map.twolevel.partialOnly")
       .internal()
-      .doc("Enable two-level aggregate hash map for final aggregate as well. Disable by default " +
+      .doc("Enable two-level aggregate hash map for partial aggregate only, " +
         "because final aggregate might get more distinct keys compared to partial aggregate. " +
         "Overhead of looking up 1st-level map might dominate when having a lot of distinct keys.")
-      .version("3.2.0")
+      .version("3.2.1")
       .booleanConf
-      .createWithDefault(false)
+      .createWithDefault(true)

   val ENABLE_VECTORIZED_HASH_MAP =
     buildConf("spark.sql.codegen.aggregate.map.vectorized.enable")
       .internal()
@@ -3875,7 +3875,7 @@ class SQLConf extends Serializable with Logging {

   def enableTwoLevelAggMap: Boolean = getConf(ENABLE_TWOLEVEL_AGG_MAP)

-  def enableTwoLevelFinalAggMap: Boolean = getConf(ENABLE_TWOLEVEL_FINAL_AGG_MAP)
+  def enableTwoLevelAggMapPartialOnly: Boolean = getConf(ENABLE_TWOLEVEL_AGG_MAP_PARTIAL_ONLY)

   def enableVectorizedHashMap: Boolean = getConf(ENABLE_VECTORIZED_HASH_MAP)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 2d7b79957600..cfecd66a8879 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -671,7 +671,7 @@ case class HashAggregateExec(
       if (modes.forall(mode => mode == Partial || mode == PartialMerge)) {
         true
       } else {
-        conf.enableTwoLevelFinalAggMap
+        !conf.enableTwoLevelAggMapPartialOnly
       }

     isSupported && isNotByteArrayDecimalType && isEnabledForAggModes

From 889adbeb0923e0f3645816480e41dcc810b5309d Mon Sep 17 00:00:00 2001
From: Cheng Su
Date: Thu, 14 Oct 2021 00:47:11 -0700
Subject: [PATCH 3/3] Address all comments and fix unit test failure

---
 .../org/apache/spark/sql/catalyst/expressions/grouping.scala   | 2 +-
 .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 --
 .../spark/sql/execution/aggregate/HashAggregateExec.scala      | 2 +-
 3 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala
index 8b95ee0c4d3c..8ce0e57b6915 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala
@@ -217,9 +217,9 @@ case class Grouping(child: Expression) extends Expression with Unevaluable
     Examples:
       > SELECT name, _FUNC_(), sum(age), avg(height) FROM VALUES (2, 'Alice', 165), (5, 'Bob', 180) people(age, name, height) GROUP BY cube(name, height);
        Alice	0	2	165.0
-       Bob	0	5	180.0
        Alice	1	2	165.0
        NULL	3	7	172.5
+       Bob	0	5	180.0
        Bob	1	5	180.0
        NULL	2	2	165.0
        NULL	2	5	180.0
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d3386f36def2..f7b9f1f466ef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3875,8 +3875,6 @@ class SQLConf extends Serializable with Logging {

   def enableTwoLevelAggMap: Boolean = getConf(ENABLE_TWOLEVEL_AGG_MAP)

-  def enableTwoLevelAggMapPartialOnly: Boolean = getConf(ENABLE_TWOLEVEL_AGG_MAP_PARTIAL_ONLY)
-
   def enableVectorizedHashMap: Boolean = getConf(ENABLE_VECTORIZED_HASH_MAP)

   def useObjectHashAggregation: Boolean = getConf(USE_OBJECT_HASH_AGG)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index cfecd66a8879..854515402860 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -671,7 +671,7 @@ case class HashAggregateExec(
       if (modes.forall(mode => mode == Partial || mode == PartialMerge)) {
         true
       } else {
-        !conf.enableTwoLevelAggMapPartialOnly
+        !conf.getConf(SQLConf.ENABLE_TWOLEVEL_AGG_MAP_PARTIAL_ONLY)
       }

     isSupported && isNotByteArrayDecimalType && isEnabledForAggModes
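
Note on the end-to-end effect of the series: with spark.sql.codegen.aggregate.map.twolevel.partialOnly left at its default of true, only partial-mode hash aggregation uses the two-level fast hash map, while final-mode aggregation goes straight to the second-level map; setting the flag to false restores the previous behavior where final aggregation also uses the two-level map. Below is a minimal sketch of toggling the flag from a Spark session built against 3.2.1 or later with these patches applied; the application name, local master, and sample data are illustrative assumptions, not part of the patches.

    import org.apache.spark.sql.SparkSession

    object TwoLevelAggMapDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("two-level-agg-map-demo")  // illustrative name
          .master("local[*]")                 // illustrative local master
          // Default is true: only partial aggregation uses the two-level fast hash map.
          // Setting it to false also lets final aggregation use the two-level map,
          // which may help when the final aggregate sees few distinct keys.
          .config("spark.sql.codegen.aggregate.map.twolevel.partialOnly", "false")
          .getOrCreate()

        import spark.implicits._

        // Illustrative data; the groupBy triggers partial and final hash aggregation.
        val df = Seq(("a", 1), ("b", 2), ("a", 3)).toDF("k", "v")
        df.groupBy("k").sum("v").show()

        spark.stop()
      }
    }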