-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-13404] [SQL] Create variables for input row when it's actually used #11274
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
This is generated code for Q55: select i_brand_id brand_id, i_brand brand,
sum(ss_ext_sales_price) ext_price
from date_dim, store_sales, item
where d_date_sk = ss_sold_date_sk
and ss_item_sk = i_item_sk
and i_manager_id=28
and d_moy=11
and d_year=1999
group by i_brand, i_brand_id
order by ext_price desc, brand_id
limit 100 |
|
Test build #51564 has finished for PR 11274 at commit
|
|
Test build #51569 has finished for PR 11274 at commit
|
Conflicts: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
|
This PR make codegen harder to understand (easy to be wrong), the performance improvements is not that much currently (maybe because the generated part does not take a majority part), I'd like to hold this PR for a while. |
|
Test build #51668 has finished for PR 11274 at commit
|
Conflicts: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
|
Test build #51814 has finished for PR 11274 at commit
|
|
Test build #51912 has finished for PR 11274 at commit
|
|
Test build #51948 has finished for PR 11274 at commit
|
|
Test build #2595 has finished for PR 11274 at commit
|
|
Test build #52224 has finished for PR 11274 at commit
|
|
Test build #2594 has finished for PR 11274 at commit
|
## What changes were proposed in this pull request? This PR defer the resolution from a id of dictionary to value until the column is actually accessed (inside getInt/getLong), this is very useful for those columns and rows that are filtered out. It's also useful for binary type, we will not need to copy all the byte arrays. This PR also change the underlying type for small decimal that could be fit within a Int, in order to use getInt() to lookup the value from IntDictionary. ## How was this patch tested? Manually test TPCDS Q7 with scale factor 10, saw about 30% improvements (after PR #11274). Author: Davies Liu <[email protected]> Closes #11437 from davies/decode_dict.
|
@nongli This one seems important for us now, could you start to review it? |
| */ | ||
| def usedInputs: AttributeSet = references | ||
|
|
||
| /** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you comment what the semantics are if row == null vs not null?
Conflicts: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
|
Test build #52495 has finished for PR 11274 at commit
|
|
Test build #52507 has finished for PR 11274 at commit
|
| | private void $scanRows(InternalRow $row) throws java.io.IOException { | ||
| | while (true) { | ||
| | boolean firstRow = true; | ||
| | while (!shouldStop() && (firstRow || $input.hasNext())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nongli Since we changed to use continue for predicates, it's tricky to get this right.
|
@davies can you update the gist with the new output? |
| * Returns source code to evaluate all the variables, and clear the code of them, to prevent | ||
| * them to be evaluated twice. | ||
| */ | ||
| protected def evaluateVariables(variables: Seq[ExprCode]): String = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you update the comment for ExprCode.code to specify what it means when it is empty.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
Patch looks good. Just a few documentation suggestions. |
|
@nongli I had updated the doc and commit message (PR description), once it pass the tests, I will merge this into master. |
|
sounds good. |
|
Test build #52612 has finished for PR 11274 at commit
|
|
Merging this into master, thanks! |
## What changes were proposed in this pull request? This PR defer the resolution from a id of dictionary to value until the column is actually accessed (inside getInt/getLong), this is very useful for those columns and rows that are filtered out. It's also useful for binary type, we will not need to copy all the byte arrays. This PR also change the underlying type for small decimal that could be fit within a Int, in order to use getInt() to lookup the value from IntDictionary. ## How was this patch tested? Manually test TPCDS Q7 with scale factor 10, saw about 30% improvements (after PR apache#11274). Author: Davies Liu <[email protected]> Closes apache#11437 from davies/decode_dict.
… used
## What changes were proposed in this pull request?
This PR change the way how we generate the code for the output variables passing from a plan to it's parent.
Right now, they are generated before call consume() of it's parent. It's not efficient, if the parent is a Filter or Join, which could filter out most the rows, the time to access some of the columns that are not used by the Filter or Join are wasted.
This PR try to improve this by defering the access of columns until they are actually used by a plan. After this PR, a plan does not need to generate code to evaluate the variables for output, just passing the ExprCode to its parent by `consume()`. In `parent.consumeChild()`, it will check the output from child and `usedInputs`, generate the code for those columns that is part of `usedInputs` before calling `doConsume()`.
This PR also change the `if` from
```
if (cond) {
xxx
}
```
to
```
if (!cond) continue;
xxx
```
The new one could help to reduce the nested indents for multiple levels of Filter and BroadcastHashJoin.
It also added some comments for operators.
## How was the this patch tested?
Unit tests. Manually ran TPCDS Q55, this PR improve the performance about 30% (scale=10, from 2.56s to 1.96s)
Author: Davies Liu <[email protected]>
Closes apache#11274 from davies/gen_defer.
## What changes were proposed in this pull request? This PR rollback some changes in apache#11274 , which introduced some performance regression when do a simple aggregation on parquet scan with one integer column. Does not really understand how this change introduce this huge impact, maybe related show JIT compiler inline functions. (saw very different stats from profiling). ## How was this patch tested? Manually run the parquet reader benchmark, before this change: ``` Intel(R) Core(TM) i7-4558U CPU 2.80GHz Int and String Scan: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- SQL Parquet Vectorized 2391 / 3107 43.9 22.8 1.0X ``` After this change ``` Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 Intel(R) Core(TM) i7-4558U CPU 2.80GHz Int and String Scan: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- SQL Parquet Vectorized 2032 / 2626 51.6 19.4 1.0X``` Author: Davies Liu <[email protected]> Closes apache#11912 from davies/fix_regression.
| required: AttributeSet): String = { | ||
| var evaluateVars = "" | ||
| variables.zipWithIndex.foreach { case (ev, i) => | ||
| if (ev.code != "" && required.contains(attributes(i))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@davies I was just reviewing build warnings, and it flags this line. ev.code is a Block rather than String. Should it be ev.code.nonEmpty && ... instead?
What changes were proposed in this pull request?
This PR change the way how we generate the code for the output variables passing from a plan to it's parent.
Right now, they are generated before call consume() of it's parent. It's not efficient, if the parent is a Filter or Join, which could filter out most the rows, the time to access some of the columns that are not used by the Filter or Join are wasted.
This PR try to improve this by defering the access of columns until they are actually used by a plan. After this PR, a plan does not need to generate code to evaluate the variables for output, just passing the ExprCode to its parent by
consume(). Inparent.consumeChild(), it will check the output from child andusedInputs, generate the code for those columns that is part ofusedInputsbefore callingdoConsume().This PR also change the
iffromto
The new one could help to reduce the nested indents for multiple levels of Filter and BroadcastHashJoin.
It also added some comments for operators.
How was the this patch tested?
Unit tests. Manually ran TPCDS Q55, this PR improve the performance about 30% (scale=10, from 2.56s to 1.96s)