[SPARK-35141][SQL] Support two level of hash maps for final hash aggregation #32242
Conversation
cc @cloud-fan and @maropu could you help take a look when you have time? Thanks.

Kubernetes integration test starting

Kubernetes integration test status failure

Test build #137655 has finished for PR 32242 at commit
Examples:
  > SELECT name, _FUNC_(), sum(age), avg(height) FROM VALUES (2, 'Alice', 165), (5, 'Bob', 180) people(age, name, height) GROUP BY cube(name, height);
   Alice 0 2 165.0
   Bob 0 5 180.0
This change is needed because the hash aggregation output order changed, which caused the ExpressionInfoSuite "check outputs of expression examples" test to fail in https://github.com/c21/spark/runs/2386397792?check_suite_focus=true .
val bufferTerm = ctx.freshName("aggBuffer")
val outputFunc = generateResultFunction(ctx)

val limitNotReachedCondition = limitNotReachedCond
This adds limit-based early termination for the first-level map as well. It is needed to fix the test failure SQLMetricsSuite."SPARK-25497: LIMIT within whole stage codegen should not consume all the inputs" in https://github.com/c21/spark/runs/2386397792?check_suite_focus=true, and it is good to have anyway.
|  // freed at the end of the task. This is necessary to avoid memory leaks when the
|  // downstream operator does not fully consume the aggregation map's output
|  // (e.g. aggregate followed by limit).
|  taskContext.addTaskCompletionListener(
This is needed because we have to clean up resources for queries with a limit and a hash aggregate. Generally no query does limit early-stop for a partial aggregate, so this was not a problem before this PR. However, we do limit early-stop for the final aggregate, and it caused the test failure SQLQuerySuite."SPARK-21743: top-most limit should not cause memory leak" in https://github.com/c21/spark/runs/2386397792?check_suite_focus=true . So here we add a listener to clean up the first-level hash map's resources, similar to the second-level hash map UnsafeFixedWidthAggregationMap in #21738 .
We use the older Java syntax (an anonymous class) instead of a lambda expression, as Janino does not support compiling lambda expressions yet - https://github.com/janino-compiler/janino/blob/master/janino/src/main/java/org/codehaus/janino/UnitCompiler.java#L6998 .
Can we do it outside of the fast hash map? Then we can apply it to both the row-based and vectorized fast hash map.
@cloud-fan - sounds good, moved out to HashAggregateExec.
//
// This scenario only happens in unit test with number-of-rows-based fall back.
// There should not be same keys in both maps with size-based fall back in production.
private val isTestFinalAggregateWithFallback: Boolean = testFallbackStartsAt.isDefined &&
instead of merging the hash maps, shall we fix the number-of-rows-based fallback to make it similar to the size-based fallback?
@cloud-fan - I was thinking the same way too, but I found it's quite hard to fix the fallback logic. I tried the approach of adding a find(key): Boolean method in the generated first-level map, to first check whether the key already exists in the first-level map. But I found another case where the key can be put into the second-level map and later added to the first-level map as well (the case where the fallback row counter is reset to 0).
I haven't touched this part of the code for a while; can you briefly introduce how the size-based fallback works?
@cloud-fan - sure. This is how the number-of-rows-based fallback works.
With the internal config spark.sql.TungstenAggregate.testFallbackStartsAt, we can set (1) when to fall back from the first-level hash map to the second-level hash map, and (2) when to fall back from the second-level hash map to sort-based aggregation.
Suppose spark.sql.TungstenAggregate.testFallbackStartsAt = "2, 3".
Then the generated code per input row (aggregating the row into the hash map) looks like:
UnsafeRow agg_buffer = null;
if (counter < 2) {
// 1st level hash map
agg_buffer = fastHashMap.findOrInsert(key);
}
if (agg_buffer == null) {
// generated code for key in unsafe row format
...
if (counter < 3) {
// 2nd level hash map
agg_buffer = regularHashMap.getAggregationBufferFromUnsafeRow(key_in_unsafe_row, ...);
}
if (agg_buffer == null) {
// sort-based fallback
regularHashMap.destructAndCreateExternalSorter();
...
counter = 0;
}
}
counter += 1;
Example generated code is at lines 187-232 in https://gist.github.com/c21/d0f704c0a33c24ec05387ff4df438bff .
I tried to add a method fastHashMap.find(key): boolean, and change code like this:
...
if (fastHashMap.find(key) || counter < 2) {
// 1st level hash map
agg_buffer = fastHashMap.findOrInsert(key);
}
...
But I later found the case as I mentioned above:
- key(a) is inserted into second level hash map (when counter exceeds 1st threshold)
- sort-based fallback happens, and counter is reset to 0 (when counter exceeds 2nd threshold)
- key(a) is not in the first-level hash map, and the counter does not exceed the 1st threshold, so key(a) is inserted into the first-level hash map as well by mistake.
We can further add code like this:
if ((fastHashMap.find(key) && !regularHashMap.find(key_in_unsafe_row)) || counter < 2) {
// 1st level hash map
agg_buffer = fastHashMap.findOrInsert(key);
}
But it introduces a more ad-hoc change and looks pretty ugly, with a lot of code needing to be moved.
@cloud-fan - sorry, I overlooked your question; you are asking how the size-based fallback works.
The size-based fallback works as follows (a self-contained toy sketch of this flow is given below):
- Try to insert into the 1st level hash map, and fall back to the 2nd level hash map when there is no space in the required memory page (RowBasedKeyValueBatch) - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala#L165-L166 .
- Try to insert into the 2nd level hash map, and fall back to sort-based aggregation when there is no space in UnsafeFixedWidthAggregationMap - https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java#L148-L150 .
- The 2nd level hash map will be sorted and spilled, and another new 2nd level hash map will be created. The 1st level hash map cannot be spilled.
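To make that flow concrete, here is a toy, self-contained Scala sketch of the fallback order. It uses plain Scala collections standing in for the real RowBasedHashMapGenerator / UnsafeFixedWidthAggregationMap machinery; the class and method names are invented for illustration, and the "no space" checks are element counts rather than the page/memory checks used in the real code.

```scala
import scala.collection.mutable

// Toy model of the two-level lookup with size-based fallback; NOT Spark's classes.
class TwoLevelAggMapSketch(fastCapacity: Int, regularCapacity: Int) {
  private val fastMap = mutable.LinkedHashMap.empty[String, Double]     // "1st level"
  private val regularMap = mutable.LinkedHashMap.empty[String, Double]  // "2nd level"
  private val spilled = mutable.ArrayBuffer.empty[(String, Double)]     // sort-based fallback

  def update(key: String, value: Double): Unit = {
    if (fastMap.contains(key) || fastMap.size < fastCapacity) {
      // 1st level: accepts the row while the key is present or there is still space.
      fastMap.update(key, fastMap.getOrElse(key, 0.0) + value)
    } else if (regularMap.contains(key) || regularMap.size < regularCapacity) {
      // 2nd level: used once the 1st level map has no space for a new key.
      regularMap.update(key, regularMap.getOrElse(key, 0.0) + value)
    } else {
      // Sort-based fallback: "spill" the 2nd level map and start a fresh one;
      // the 1st level map itself is never spilled.
      spilled ++= regularMap
      regularMap.clear()
      regularMap.update(key, value)
    }
  }
}
```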
It seems to me that the major issue is we use a single counter to control both the fast and regular hash map fallback. My first thought is to add a dedicated counter for the fast hash map fallback, then I realized that the fast hash map has a capacity property. Can we simply set the capacity to testFallbackStartsAt._1?
@cloud-fan - maybe I am missing something, but I am not sure how these two solutions fix the problem.
- dedicated counters for two maps
if (counter1 < 2) {
// 1st level hash map
agg_buffer = fastHashMap.findOrInsert(key);
}
if (agg_buffer == null) {
// generated code for key in unsafe row format
...
if (counter2 < 3) {
// 2nd level hash map
agg_buffer = regularHashMap.getAggregationBufferFromUnsafeRow(key_in_unsafe_row, ...);
}
if (agg_buffer == null) {
// sort-based fallback
regularHashMap.destructAndCreateExternalSorter();
...
counter2 = 0;
}
}
counter1 += 1;
counter2 += 1;
Counter example:
1. key_a is inserted into the 1st level map (counter1 = 0)
2. a couple of keys are inserted into the 1st level map (counter1 = 2)
3. key_a is inserted into the 2nd level map (counter1 = 2, counter2 = 2)
- set 1st level map bitMaxCapacity to be log2(testFallbackStartsAt._1).
if (counter < 2) {
// 1st level hash map
agg_buffer = fastHashMap.findOrInsert(key);
}
if (agg_buffer == null) {
// generated code for key in unsafe row format
...
if (counter < 3) {
// 2nd level hash map
agg_buffer = regularHashMap.getAggregationBufferFromUnsafeRow(key_in_unsafe_row, ...);
}
if (agg_buffer == null) {
// sort-based fallback
regularHashMap.destructAndCreateExternalSorter();
...
counter = 0;
}
}
counter += 1;
Counter example:
1. key_a is inserted into the 1st level map (counter = 0)
2. a couple of NULL keys are inserted into the 2nd level map (counter = 2). Note: the 1st level map does not support NULL keys.
3. key_a is inserted into the 2nd level map (counter = 2)
My idea is to simulate the size-based fallback: "no space" -> "reach the capacity/limit"
@cloud-fan - updated per offline discussion. I changed the first-level fallback to restrict the first-level map capacity instead.
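For reference, a minimal sketch of how the test-only capacity restriction could be derived from the fallback threshold. The names testFallbackStartsAt and bitMaxCapacity come from this discussion, but the exact wiring in the merged code may differ; this is an assumption-labeled illustration, not the merged implementation.

```scala
// Hedged sketch: derive the fast map's capacity bits from the first fallback threshold.
// The fast map's capacity is 2^bitMaxCapacity, so choosing ceil(log2(threshold)) makes
// "the fast map is full" roughly coincide with "threshold rows inserted".
def bitMaxCapacityFor(testFallbackStartsAt: Option[(Int, Int)], defaultBits: Int = 16): Int =
  testFallbackStartsAt match {
    case Some((fastMapThreshold, _)) =>
      math.ceil(math.log(fastMapThreshold.toDouble) / math.log(2.0)).toInt
    case None =>
      defaultBits // assumed default; the real default capacity is defined elsewhere
  }

// Example: a threshold of 2 rows gives 1 capacity bit, i.e. a fast-map capacity of 2.
// bitMaxCapacityFor(Some((2, 3))) == 1
```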
Also, just FYI - I updated the generated code in the PR description, in case you want to check it.
Kubernetes integration test starting

Kubernetes integration test status failure

Test build #137677 has finished for PR 32242 at commit
// Generates the code to register a cleanup task with TaskContext to ensure that memory
// is guaranteed to be freed at the end of the task. This is necessary to avoid memory
// leaks when the downstream operator does not fully consume the aggregation map's
// output (e.g. aggregate followed by limit).
val hookToCloseFastHashMap =
  s"""
     |$thisPlan.getTaskContext().addTaskCompletionListener(
     | new org.apache.spark.util.TaskCompletionListener() {
     |   @Override
     |   public void onTaskCompletion(org.apache.spark.TaskContext context) {
     |     $fastHashMapTerm.close();
     |   }
     |});
   """.stripMargin
(iter, create, hookToCloseFastHashMap)
This is the only real change inside L728-779: adding a new hookToCloseFastHashMap here to clean up the fast hash map. The other code is unchanged except for indentation. Not sure why GitHub highlights so many changes.
Can we change the code in a way that produces a smaller diff?
...
val hookToCloseFastHashMap = if (isFastHashMapEnabled) {
...
} else ""
Or we could put the logic in the base HashMapGenerator as a method, and call the method in both the vectorized and row-based fast hash map generators.
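For illustration, one possible shape of that suggestion - a hedged sketch only, with invented method and field names, grounded in the generated hook shown earlier in this thread rather than the code that was actually merged:

```scala
// Hedged sketch of a shared helper on the base generator; NOT the merged code.
abstract class HashMapGeneratorSketch {
  protected def thisPlan: String        // codegen reference to the plan object
  protected def fastHashMapTerm: String // codegen variable holding the fast hash map

  // Emits Java source that registers a TaskCompletionListener closing the fast map,
  // so both the row-based and vectorized generators can reuse it.
  protected def generateCloseHook(): String =
    s"""
       |$thisPlan.getTaskContext().addTaskCompletionListener(
       |  new org.apache.spark.util.TaskCompletionListener() {
       |    @Override
       |    public void onTaskCompletion(org.apache.spark.TaskContext context) {
       |      $fastHashMapTerm.close();
       |    }
       |  });
     """.stripMargin
}
```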
@cloud-fan - updated to go with #32242 (comment) . Thanks.
| "org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow>", | ||
| "fastHashMapIter", forceInline = true) | ||
| val create = s"$fastHashMapTerm = new $fastHashMapClassName(" + | ||
| s"$thisPlan.getTaskContext().taskMemoryManager(), " + |
This is changed compared to previous code.
if (isFastHashMapEnabled) {
  // Generates the fast hash map class and creates the fast hash map term.
  val fastHashMapClassName = ctx.freshName("FastHashMap")
  val (iter, create) = if (isVectorizedHashMapEnabled) {
This is changed compared to previous code.
Kubernetes integration test unable to build dist. exiting with code: 1

Test build #137705 has finished for PR 32242 at commit
Kubernetes integration test starting

Kubernetes integration test status failure
cloud-fan left a comment
LGTM except for a small comment
Kubernetes integration test starting

Kubernetes integration test status failure

Test build #137775 has finished for PR 32242 at commit

Test build #137793 has finished for PR 32242 at commit
private def enableTwoLevelHashMap(ctx: CodegenContext): Unit = {
  if (!checkIfFastHashMapSupported(ctx)) {
    if (modes.forall(mode => mode == Partial || mode == PartialMerge) && !Utils.isTesting) {
last question: can we search the commit history and figure out why we didn't enable the fast hash map in the final aggregate? It seems we did it on purpose.
@cloud-fan - I was wondering about this before making this PR as well. The decision to only support partial aggregate was made when the first-level hash map was introduced (#12345 and #14176), and it was never changed afterwards. I checked with @sameeragarwal before making this PR; he told me there is no fundamental reason not to support final aggregate.
Just for documentation: I also asked him why we don't support nested types (array/map/struct) as key types for the fast hash map. He told me the reason was that the keys might be too large for long arrays/maps/structs, so the fast hash map might not fit in cache and would lose its benefit.
thanks, merging to master!

Thank you @cloud-fan for the review!
…n by default

### What changes were proposed in this pull request?

This PR is to disable two level of maps for final hash aggregation by default. The feature was introduced in #32242 and we found it can lead to query performance regression when the final aggregation gets rows with a lot of distinct keys: the 1st level hash map is full, so a lot of rows waste the 1st hash map lookup and are inserted into the 2nd hash map anyway. The feature still benefits queries with not so many distinct keys, so this PR introduces a config, `spark.sql.codegen.aggregate.final.map.twolevel.enabled`, to allow a query to enable the feature when it sees a benefit.

### Why are the changes needed?

Fix query regression.

### Does this PR introduce _any_ user-facing change?

Yes, the introduced `spark.sql.codegen.aggregate.final.map.twolevel.enabled` config.

### How was this patch tested?

Existing unit test in `AggregationQuerySuite.scala`. Also verified generated code for an example query in the file:

```
spark.sql(
  """
    |SELECT key, avg(value)
    |FROM agg1
    |GROUP BY key
  """.stripMargin)
```

Verified the generated code for final hash aggregation does not have two level maps by default: https://gist.github.com/c21/d4ce87ef28a22d1ce839e0cda000ce14 .

Verified the generated code for final hash aggregation has two level maps if the config is enabled: https://gist.github.com/c21/4b59752c1f3f98303b60ccff66b5db69 .

Closes #34270 from c21/agg-fix.

Authored-by: Cheng Su <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
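As a usage sketch of the config above (the app name, table name, and data are invented for illustration; the config key is the one introduced by the follow-up PR):

```scala
import org.apache.spark.sql.SparkSession

object TwoLevelFinalAggDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("two-level-final-agg-demo")
      // Opt back in to two-level hash maps for final aggregation (off by default after #34270).
      .config("spark.sql.codegen.aggregate.final.map.twolevel.enabled", "true")
      .getOrCreate()
    import spark.implicits._

    // Small toy table with few distinct keys, the case the feature is meant to help.
    Seq((1, 10.0), (1, 20.0), (2, 30.0)).toDF("key", "value").createOrReplaceTempView("agg1")
    spark.sql("SELECT key, avg(value) FROM agg1 GROUP BY key").show()
    spark.stop()
  }
}
```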
What changes were proposed in this pull request?

For partial hash aggregation (the code-gen path), we have two levels of hash maps for aggregation. The first level is from RowBasedHashMapGenerator, which is computationally faster than the second level from UnsafeFixedWidthAggregationMap. Introducing two levels of hash maps helps improve the CPU performance of a query, as the first-level hash map normally fits in hardware cache and has a cheaper hash function for key lookups.

For final hash aggregation, we can also support two levels of hash maps, to improve query performance further.

The original two-level hash map code works for final aggregation mostly out of the box. The major change here is to support testing fallback of final aggregation (see the changes related to bitMaxCapacity and checkFallbackForGeneratedHashMap).

Example:

An aggregation query:
The generated code for final aggregation is here.

An aggregation query with testing fallback:
The generated code for final aggregation is here. Note there is no longer a counter condition for the first-level fast map.
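For readers who want to reproduce this locally, a hedged sketch of how the generated aggregation code can be inspected (the table name, data, and query are assumptions for illustration; `debugCodegen()` comes from `org.apache.spark.sql.execution.debug` and prints the whole-stage generated code):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.debug._

object InspectFinalAggCodegen {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // Assumed example table: 1000 rows, 100 distinct keys.
    spark.range(0, 1000)
      .selectExpr("id % 100 AS key", "id AS value")
      .createOrReplaceTempView("agg1")

    val df = spark.sql("SELECT key, avg(value) FROM agg1 GROUP BY key")
    // Prints the whole-stage generated Java code, including the fast hash map class,
    // for the partial and (with this PR) final aggregation stages.
    df.debugCodegen()
    spark.stop()
  }
}
```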
Why are the changes needed?

Improve the CPU performance of hash aggregation queries in general.

For AggregateBenchmark."Aggregate w multiple keys", query performance improved by 10%. codegen = T means whole-stage code-gen is enabled. hashmap = T means two-level maps are enabled for partial aggregation. finalhashmap = T means two-level maps are enabled for final aggregation.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing unit tests in HashAggregationQuerySuite and HashAggregationQueryWithControlledFallbackSuite already cover this.