[SPARK-16525][SQL] Enable Row Based HashMap in HashAggregateExec #14176
Conversation
Test build #62227 has finished for PR 14176 at commit
Test build #62229 has finished for PR 14176 at commit
Test build #62349 has finished for PR 14176 at commit
Test build #62382 has finished for PR 14176 at commit
Test build #62440 has finished for PR 14176 at commit
Test build #62452 has finished for PR 14176 at commit
Test build #62482 has finished for PR 14176 at commit
Force-pushed from 461028e to 5fae053
Test build #62934 has finished for PR 14176 at commit
Force-pushed from 5fae053 to 41192e8
Force-pushed from 41192e8 to 7194394
Test build #62944 has finished for PR 14176 at commit
Test build #62952 has finished for PR 14176 at commit
Test build #62990 has finished for PR 14176 at commit
```java
int keySize = 0;
int valueSize = 0;
for (String name : keySchema.fieldNames()) {
  keySize += (keySchema.apply(name).dataType().defaultSize() + 7) / 8 * 8;
```
Let's add a small comment about this implicit ceiling logic and the reason why schema.defaultSize() doesn't work.
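For illustration, here is a minimal sketch (with a helper name I made up) of the rounding that comment would document; the stated rationale — that fixed-width fields occupy 8-byte-aligned slots, as in UnsafeRow — is my assumption from context:

```scala
// Ceiling logic from the reviewed line: round a field's default size up to
// the next multiple of 8 bytes. Summing schema.defaultSize() alone would
// undercount, e.g. IntegerType reports 4 bytes while the row layout is
// assumed to reserve a full 8-byte word per fixed-width field.
def alignedFieldSize(defaultSize: Int): Int = (defaultSize + 7) / 8 * 8

assert(alignedFieldSize(4) == 8)   // 4 rounds up to 8
assert(alignedFieldSize(8) == 8)   // already a multiple of 8
assert(alignedFieldSize(16) == 16) // multiples of 8 pass through unchanged
```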
Added the explicit SQL tests for both hash map implementations. The test suites extend DataFrameAggregateSuite.
Test build #63164 has finished for PR 14176 at commit
```scala
  super.beforeAll()
}

test("SQL decimal test") {
```
can we just add this in DataFrameAggregateSuite?
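As a rough illustration of the pattern under discussion, a suite that flips a fast-map codegen conf before exercising a shared aggregate test might look like the sketch below; the suite name and conf key are placeholders, not identifiers from the patch:

```scala
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.test.SharedSQLContext

// Placeholder suite; the PR's real suites extend DataFrameAggregateSuite so
// that every shared aggregate test runs against the chosen hash map impl.
class FastHashMapSmokeSuite extends QueryTest with SharedSQLContext {
  import testImplicits._

  test("group-by sum with the fast hash map enabled") {
    // Hypothetical conf key standing in for the flag added by this PR.
    spark.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "true")
    val df = Seq((1, 10), (1, 20), (2, 5)).toDF("k", "v")
    checkAnswer(df.groupBy("k").sum("v"), Seq(Row(1, 30L), Row(2, 5L)))
  }
}
```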
LGTM
```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DecimalType

abstract class AggregateHashMapSuite extends DataFrameAggregateSuite {
```
As discussed offline, let's just move this into DataFrameAggregateSuite to prevent inadvertent overrides.
Let's hold off on this: if we are going to have a single implementation for the fast hash map (based on the benchmark results in another PR), there's no need to merge this fancy implementation-choosing logic. cc @rxin
@davies @sameeragarwal I updated more results in the benchmark PR #14266.
Thanks for the comments @davies @sameeragarwal. This PR has been updated. Basically, the only public boolean flag now is called
Test build #64387 has finished for PR 14176 at commit
Can we make this
@davies I guess there is still a benefit to making it public? If the user knows that their workload would always run faster with a single-level map, e.g., many distinct keys. I thought about
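For context, flipping such a session-level flag might look like the sketch below; the conf key is hypothetical, standing in for the flag name elided in this thread:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("agg-demo").getOrCreate()

// Hypothetical conf key standing in for the public flag under discussion.
// A workload with many distinct keys, where the bounded first level mostly
// misses, could turn the two-level map off for the session:
spark.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "false")

// ...and turn it back on for low-cardinality group-bys:
spark.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "true")
```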
```scala
private var vectorizedHashMapTerm: String = _
private var isVectorizedHashMapEnabled: Boolean = _
// The name for Fast HashMap
private var fastHashMapTerm: String = _
```
Should we use a more descriptive name than "fast"? There can always be a faster implementation.
Thanks for the comments @clockfly! As per the discussion with @sameeragarwal, the plan is to give users the option to turn the two-level hashmap on/off; this is why we have the first-level logic for enabling the two-level/fast map. We also want to keep both implementations (vectorized/row-based) for a while before deleting the vectorized one in the future, which leads to the internal flags that pick between the two. If you decide otherwise, I'm happy to update the PR accordingly. @clockfly @sameeragarwal @davies Thanks!
@clockfly as Qifan said, the rationale for not deleting the old vectorized hashmap code in the short term was to let us quickly benchmark and compare the two implementations across a wide variety of workloads. That said, I think the high-level issue is that we don't currently expose good interfaces/hooks in our generated code that could be used to test custom operator implementations while running benchmarks or tests (... and given that these first-level aggregate hashmaps are entirely generated during query compilation, injecting a class that works for all schema types during testing isn't very straightforward).
LGTM, I will merge this one to master (this enables us to do more benchmarks with these two implementations).
@ooq @sameeragarwal, it looks like this patch is the culprit behind some OOMs that I'm observing with random queries; see https://issues.apache.org/jira/browse/SPARK-17405
Thanks. I will take a look tonight.
What changes were proposed in this pull request?
This PR is the second step for the following feature:

For hash aggregation in Spark SQL, we use a fast aggregation hashmap to act as a "cache" in order to boost aggregation performance. Previously, the hashmap was backed by a ColumnarBatch. This has performance issues when the aggregation table has a wide schema (a large number of key fields or value fields).

In this JIRA, we support another implementation of the fast hashmap, which is backed by a RowBatch. We then automatically pick between the two implementations based on certain knobs.

In this second-step PR, we enable RowBasedHashMapGenerator in HashAggregateExec.
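To make the "cache" framing concrete, here is a minimal, self-contained sketch of the two-level lookup described above; the class, its names, and the use of Scala HashMaps are illustrative stand-ins (the real first-level map is code-generated per query and backed by a ColumnarBatch or RowBatch, with a general-purpose map behind it):

```scala
import scala.collection.mutable

// Toy two-level aggregation map: probe a bounded fast map first; entries
// that miss a full fast map go to the general map, and the fast map's
// contents are merged into the general map when results are produced.
class TwoLevelAggMap[K, V](fastCapacity: Int, zero: V, merge: (V, V) => V) {
  private val fast = mutable.HashMap.empty[K, V]    // stand-in for the generated fast map
  private val general = mutable.HashMap.empty[K, V] // stand-in for the general-purpose map

  def update(key: K, value: V): Unit = {
    if (fast.contains(key) || fast.size < fastCapacity) {
      fast(key) = merge(fast.getOrElse(key, zero), value)
    } else {
      general(key) = merge(general.getOrElse(key, zero), value)
    }
  }

  def result(): Map[K, V] = {
    fast.foreach { case (k, v) => general(k) = merge(general.getOrElse(k, zero), v) }
    general.toMap
  }
}

// Usage: sum values per key with the first level capped at two entries.
val m = new TwoLevelAggMap[Int, Long](2, 0L, _ + _)
Seq(1 -> 1L, 2 -> 1L, 3 -> 1L, 1 -> 1L).foreach { case (k, v) => m.update(k, v) }
assert(m.result() == Map(1 -> 2L, 2 -> 1L, 3 -> 1L))
```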
How was this patch tested?
Added tests: RowBasedAggregateHashMapSuite and VectorizedAggregateHashMapSuite. Additional micro-benchmark tests and TPC-DS results will be added in a separate PR in the series.