
Commit 03d77af

ooq authored and davies committed
[SPARK-16525] [SQL] Enable Row Based HashMap in HashAggregateExec
## What changes were proposed in this pull request?

This PR is the second step for the following feature:

For hash aggregation in Spark SQL, we use a fast aggregation hashmap to act as a "cache" in order to boost aggregation performance. Previously, the hashmap was backed by a `ColumnarBatch`. This has performance issues when the aggregation table has a wide schema (a large number of key or value fields). In this JIRA, we support another fast hashmap implementation, backed by a `RowBatch`. We then automatically pick between the two implementations based on certain knobs.

In this second-step PR, we enable `RowBasedHashMapGenerator` in `HashAggregateExec`.

## How was this patch tested?

Added tests: `RowBasedAggregateHashMapSuite` and `VectorizedAggregateHashMapSuite`.

Additional micro-benchmark and TPCDS results will be added in a separate PR in the series.

Author: Qifan Pu <[email protected]>
Author: ooq <[email protected]>

Closes #14176 from ooq/rowbasedfastaggmap-pr2.
1 parent 15539e5 commit 03d77af
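
The "knobs" mentioned above correspond to the SQL configs touched in this diff. Below is a minimal usage sketch (not part of the patch) showing how the flags could be toggled on a `SparkSession`; the config names are taken from the test and benchmark changes further down, and the behavior of the vectorized flag is inferred from the PR description.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Minimal sketch, assuming a local session; defaults may differ across Spark versions.
val spark = SparkSession.builder().master("local[*]").appName("agg-fast-map").getOrCreate()

// Use the two-level aggregate hash map (small, fast first-level map with fallback).
spark.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "true")

// Back the first level with the vectorized (ColumnarBatch) generator when "true";
// per the PR description, "false" selects the row-based (RowBatch) generator instead.
spark.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false")

spark.range(0, 1000000).groupBy(col("id") % 100).count().show()
```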

File tree

8 files changed: +326 -119 lines changed


sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java

Lines changed: 4 additions & 4 deletions
@@ -165,10 +165,10 @@ private void freeCurrentPage() {
   protected FixedLengthRowBasedKeyValueBatch(StructType keySchema, StructType valueSchema,
       int maxRows, TaskMemoryManager manager) {
     super(keySchema, valueSchema, maxRows, manager);
-    klen = keySchema.defaultSize()
-        + UnsafeRow.calculateBitSetWidthInBytes(keySchema.length());
-    vlen = valueSchema.defaultSize()
-        + UnsafeRow.calculateBitSetWidthInBytes(valueSchema.length());
+    int keySize = keySchema.size() * 8; // each fixed-length field is stored in a 8-byte word
+    int valueSize = valueSchema.size() * 8;
+    klen = keySize + UnsafeRow.calculateBitSetWidthInBytes(keySchema.length());
+    vlen = valueSize + UnsafeRow.calculateBitSetWidthInBytes(valueSchema.length());
     recordLength = klen + vlen + 8;
   }
 }
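
The change above replaces the `defaultSize()`-based key/value lengths with a flat 8 bytes per fixed-length field. As an illustrative check of the arithmetic (not code from the patch), the record length for a small hypothetical schema works out as follows:

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Hypothetical schema: 2 fixed-length key fields and 3 fixed-length value fields.
val numKeyFields = 2
val numValueFields = 3

// Each fixed-length field occupies one 8-byte word; the null-tracking bitset is
// rounded up to whole 8-byte words (one word covers up to 64 fields).
val klen = numKeyFields * 8 + UnsafeRow.calculateBitSetWidthInBytes(numKeyFields)     // 16 + 8 = 24
val vlen = numValueFields * 8 + UnsafeRow.calculateBitSetWidthInBytes(numValueFields) // 24 + 8 = 32

// The constructor adds a further fixed 8 bytes of per-record overhead.
val recordLength = klen + vlen + 8 // 64 bytes per record
println(s"klen=$klen vlen=$vlen recordLength=$recordLength")
```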

sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala

Lines changed: 198 additions & 92 deletions
Large diffs are not rendered by default.
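
Since the `HashAggregateExec` diff is not rendered here, the following is only a hedged sketch of the selection the PR description implies: a fast first-level map is generated only when the two-level flag is on and the schema is supported, and it is backed by either the vectorized or the row-based generator. All names below (`FastMapKind`, `chooseFastMap`) are illustrative, not the actual `HashAggregateExec` API.

```scala
sealed trait FastMapKind
case object NoFastMap extends FastMapKind         // all records go straight to the 2nd-level map
case object RowBasedFastMap extends FastMapKind   // RowBatch-backed first level (RowBasedHashMapGenerator)
case object VectorizedFastMap extends FastMapKind // ColumnarBatch-backed first level (VectorizedHashMapGenerator)

def chooseFastMap(twoLevelEnabled: Boolean,
                  vectorizedEnabled: Boolean,
                  schemaSupported: Boolean): FastMapKind = {
  if (!twoLevelEnabled || !schemaSupported) NoFastMap
  else if (vectorizedEnabled) VectorizedFastMap
  else RowBasedFastMap
}

// e.g. chooseFastMap(twoLevelEnabled = true, vectorizedEnabled = false, schemaSupported = true)
//   == RowBasedFastMap
```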

sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala

Lines changed: 10 additions & 2 deletions
@@ -141,8 +141,16 @@ class RowBasedHashMapGenerator(
   }

   val createUnsafeRowForKey = groupingKeys.zipWithIndex.map { case (key: Buffer, ordinal: Int) =>
-    s"agg_rowWriter.write(${ordinal}, ${key.name})"}
-    .mkString(";\n")
+    key.dataType match {
+      case t: DecimalType =>
+        s"agg_rowWriter.write(${ordinal}, ${key.name}, ${t.precision}, ${t.scale})"
+      case t: DataType =>
+        if (!t.isInstanceOf[StringType] && !ctx.isPrimitiveType(t)) {
+          throw new IllegalArgumentException(s"cannot generate code for unsupported type: $t")
+        }
+        s"agg_rowWriter.write(${ordinal}, ${key.name})"
+    }
+  }.mkString(";\n")

   s"""
   |public org.apache.spark.sql.catalyst.expressions.UnsafeRow findOrInsert(${
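
The generator now emits one `agg_rowWriter.write(...)` call per grouping key, with decimal keys carrying their precision and scale. A small standalone sketch of the same dispatch (approximating `ctx.isPrimitiveType` with a few explicit cases; `writeCall` is an illustrative helper, not part of the patch):

```scala
import org.apache.spark.sql.types._

// Illustrative helper mirroring the dispatch added above; not the generator itself.
def writeCall(ordinal: Int, name: String, dt: DataType): String = dt match {
  case t: DecimalType =>
    // Decimals need precision and scale passed through to the UnsafeRow writer.
    s"agg_rowWriter.write($ordinal, $name, ${t.precision}, ${t.scale})"
  case StringType | IntegerType | LongType | DoubleType =>
    s"agg_rowWriter.write($ordinal, $name)"
  case other =>
    throw new IllegalArgumentException(s"cannot generate code for unsupported type: $other")
}

// e.g.
// writeCall(0, "agg_key_id", LongType)              == "agg_rowWriter.write(0, agg_key_id)"
// writeCall(1, "agg_key_price", DecimalType(10, 2)) == "agg_rowWriter.write(1, agg_key_price, 10, 2)"
```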

sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 8 deletions
@@ -509,14 +509,15 @@ object SQLConf {
     .intConf
     .createWithDefault(40)

-  val VECTORIZED_AGG_MAP_MAX_COLUMNS =
-    SQLConfigBuilder("spark.sql.codegen.aggregate.map.columns.max")
+  val ENABLE_TWOLEVEL_AGG_MAP =
+    SQLConfigBuilder("spark.sql.codegen.aggregate.map.twolevel.enable")
       .internal()
-      .doc("Sets the maximum width of schema (aggregate keys + values) for which aggregate with" +
-        "keys uses an in-memory columnar map to speed up execution. Setting this to 0 effectively" +
-        "disables the columnar map")
-      .intConf
-      .createWithDefault(3)
+      .doc("Enable two-level aggregate hash map. When enabled, records will first be " +
+        "inserted/looked-up at a 1st-level, small, fast map, and then fallback to a " +
+        "2nd-level, larger, slower map when 1st level is full or keys cannot be found. " +
+        "When disabled, records go directly to the 2nd level. Defaults to true.")
+      .booleanConf
+      .createWithDefault(true)

   val FILE_SINK_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSink.log.deletion")
     .internal()

@@ -687,7 +688,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

   override def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES)

-  def vectorizedAggregateMapMaxColumns: Int = getConf(VECTORIZED_AGG_MAP_MAX_COLUMNS)
+  def enableTwoLevelAggMap: Boolean = getConf(ENABLE_TWOLEVEL_AGG_MAP)

   def variableSubstituteEnabled: Boolean = getConf(VARIABLE_SUBSTITUTE_ENABLED)
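
To make the new doc string concrete, here is a toy (non-Spark) sketch of the two-level lookup it describes: probe a small, bounded first-level map, and fall back to an unbounded second-level map when the key is absent from the first level and the first level is full. The real first-level map is a code-generated hash map backed by a `RowBatch` or `ColumnarBatch`; this is only an illustration of the lookup policy.

```scala
import scala.collection.mutable

class TwoLevelMap[K, V](fastCapacity: Int) {
  private val fast = mutable.HashMap.empty[K, V] // small, fast 1st-level map
  private val slow = mutable.HashMap.empty[K, V] // larger, slower 2nd-level map

  def getOrElseUpdate(key: K, default: => V): V =
    fast.get(key).orElse(slow.get(key)).getOrElse {
      val v = default
      // Insert into the 1st level while it has room; otherwise spill to the 2nd level.
      if (fast.size < fastCapacity) fast(key) = v else slow(key) = v
      v
    }
}

// e.g. a map whose first level holds only two keys; the third key lands in the 2nd level.
val m = new TwoLevelMap[String, Long](fastCapacity = 2)
Seq("a", "b", "c", "a").foreach(k => m.getOrElseUpdate(k, 0L))
```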

Lines changed: 79 additions & 0 deletions
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.scalatest.BeforeAndAfter
+
+class SingleLevelAggregateHashMapSuite extends DataFrameAggregateSuite with BeforeAndAfter {
+
+  protected override def beforeAll(): Unit = {
+    sparkConf.set("spark.sql.codegen.fallback", "false")
+    sparkConf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "false")
+    super.beforeAll()
+  }
+
+  // adding some checking after each test is run, assuring that the configs are not changed
+  // in test code
+  after {
+    assert(sparkConf.get("spark.sql.codegen.fallback") == "false",
+      "configuration parameter changed in test body")
+    assert(sparkConf.get("spark.sql.codegen.aggregate.map.twolevel.enable") == "false",
+      "configuration parameter changed in test body")
+  }
+}
+
+class TwoLevelAggregateHashMapSuite extends DataFrameAggregateSuite with BeforeAndAfter {
+
+  protected override def beforeAll(): Unit = {
+    sparkConf.set("spark.sql.codegen.fallback", "false")
+    sparkConf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "true")
+    super.beforeAll()
+  }
+
+  // adding some checking after each test is run, assuring that the configs are not changed
+  // in test code
+  after {
+    assert(sparkConf.get("spark.sql.codegen.fallback") == "false",
+      "configuration parameter changed in test body")
+    assert(sparkConf.get("spark.sql.codegen.aggregate.map.twolevel.enable") == "true",
+      "configuration parameter changed in test body")
+  }
+}
+
+class TwoLevelAggregateHashMapWithVectorizedMapSuite extends DataFrameAggregateSuite with
+  BeforeAndAfter {
+
+  protected override def beforeAll(): Unit = {
+    sparkConf.set("spark.sql.codegen.fallback", "false")
+    sparkConf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "true")
+    sparkConf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
+    super.beforeAll()
+  }
+
+  // adding some checking after each test is run, assuring that the configs are not changed
+  // in test code
+  after {
+    assert(sparkConf.get("spark.sql.codegen.fallback") == "false",
+      "configuration parameter changed in test body")
+    assert(sparkConf.get("spark.sql.codegen.aggregate.map.twolevel.enable") == "true",
+      "configuration parameter changed in test body")
+    assert(sparkConf.get("spark.sql.codegen.aggregate.map.vectorized.enable") == "true",
+      "configuration parameter changed in test body")
+  }
+}

sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala

Lines changed: 8 additions & 0 deletions
@@ -485,4 +485,12 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
       spark.sql("select avg(a) over () from values 1.0, 2.0, 3.0 T(a)"),
       Row(2.0) :: Row(2.0) :: Row(2.0) :: Nil)
   }
+
+  test("SQL decimal test (used for catching certain decimal handling bugs in aggregates)") {
+    checkAnswer(
+      decimalData.groupBy('a cast DecimalType(10, 2)).agg(avg('b cast DecimalType(10, 2))),
+      Seq(Row(new java.math.BigDecimal(1.0), new java.math.BigDecimal(1.5)),
+        Row(new java.math.BigDecimal(2.0), new java.math.BigDecimal(1.5)),
+        Row(new java.math.BigDecimal(3.0), new java.math.BigDecimal(1.5))))
+  }
 }
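
A self-contained variant of the same kind of check, for reproducing decimal-key aggregation outside the shared `decimalData` fixture. It assumes a local `SparkSession` named `spark`; the data here is made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.avg
import org.apache.spark.sql.types.DecimalType

val spark = SparkSession.builder().master("local[*]").appName("decimal-agg").getOrCreate()
import spark.implicits._

// Three keys, each paired with values 1 and 2, so every group's average is 1.50.
val df = Seq((1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2)).toDF("a", "b")

df.groupBy($"a".cast(DecimalType(10, 2)))
  .agg(avg($"b".cast(DecimalType(10, 2))))
  .show()
```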

sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala

Lines changed: 15 additions & 10 deletions
@@ -106,13 +106,14 @@ class AggregateBenchmark extends BenchmarkBase {

     benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
       sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", "0")
+      sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "false")
       f()
     }

     benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
       sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", "3")
+      sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "true")
+      sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
       f()
     }

@@ -146,13 +147,14 @@ class AggregateBenchmark extends BenchmarkBase {

     benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
       sparkSession.conf.set("spark.sql.codegen.wholeStage", value = true)
-      sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", 0)
+      sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "false")
       f()
     }

     benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
       sparkSession.conf.set("spark.sql.codegen.wholeStage", value = true)
-      sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", 3)
+      sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "true")
+      sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
       f()
     }

@@ -184,13 +186,14 @@ class AggregateBenchmark extends BenchmarkBase {

     benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
       sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", "0")
+      sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "false")
       f()
     }

     benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
       sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", "3")
+      sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "true")
+      sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
       f()
     }

@@ -221,13 +224,14 @@ class AggregateBenchmark extends BenchmarkBase {

     benchmark.addCase(s"codegen = T hashmap = F") { iter =>
       sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", "0")
+      sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "false")
       f()
     }

     benchmark.addCase(s"codegen = T hashmap = T") { iter =>
       sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", "3")
+      sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "true")
+      sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
       f()
     }

@@ -268,13 +272,14 @@ class AggregateBenchmark extends BenchmarkBase {

     benchmark.addCase(s"codegen = T hashmap = F") { iter =>
       sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", "0")
+      sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "false")
       f()
     }

     benchmark.addCase(s"codegen = T hashmap = T") { iter =>
       sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", "10")
+      sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "true")
+      sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
       f()
     }

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala

Lines changed: 3 additions & 3 deletions
@@ -998,9 +998,9 @@ class HashAggregationQuerySuite extends AggregationQuerySuite
 class HashAggregationQueryWithControlledFallbackSuite extends AggregationQuerySuite {

   override protected def checkAnswer(actual: => DataFrame, expectedAnswer: Seq[Row]): Unit = {
-    Seq(0, 10).foreach { maxColumnarHashMapColumns =>
-      withSQLConf("spark.sql.codegen.aggregate.map.columns.max" ->
-        maxColumnarHashMapColumns.toString) {
+    Seq("true", "false").foreach { enableTwoLevelMaps =>
+      withSQLConf("spark.sql.codegen.aggregate.map.twolevel.enable" ->
+        enableTwoLevelMaps) {
         (1 to 3).foreach { fallbackStartsAt =>
           withSQLConf("spark.sql.TungstenAggregate.testFallbackStartsAt" ->
             s"${(fallbackStartsAt - 1).toString}, ${fallbackStartsAt.toString}") {
