[SPARK-8968] [SQL] External sort by the partition columns when dynamic partitioning to optimize the memory overhead #7336
Conversation
maybe we can add a config here to control whether to shuffle before insert
Yeah, please add a SQLConf option, and probably make it off by default.
/cc @liancheng
Please make this private.
I'm thinking maybe we should make the change in … Another high-level comment is that, although this change does work for your workload, the following statement made in the PR description isn't correct:
By repartitioning the dataset by the dynamic partition columns, you potentially reduce the number of dynamic partitions handled per task (that's why it reduces GC overhead), but the number can't be guaranteed to be reduced to 1. Actually we are also considering improving dynamic partitioning insertion via local sorting (sort by the partition columns with the spillable …
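The local-sorting idea mentioned above can be sketched outside Spark. This is a minimal illustration in plain Python (not Spark's actual writer code; all names are hypothetical): once a task's rows are sorted by the dynamic partition columns, the task can stream through them group by group and keep at most one output writer open at any moment.

```python
from itertools import groupby

def write_sorted_partitions(rows, part_key):
    """Sort rows by the dynamic partition key, then stream each group
    to its own output; at most one 'writer' is open at any moment."""
    outputs = {}
    for key, group in groupby(sorted(rows, key=part_key), key=part_key):
        outputs[key] = list(group)  # open writer, write the group, close it
    return outputs

# toy rows: (partition directory, payload)
rows = [("dt=2015-07-01", 1), ("dt=2015-07-02", 2), ("dt=2015-07-01", 3)]
outputs = write_sorted_partitions(rows, part_key=lambda r: r[0])
```

Because the rows arrive sorted, every partition directory receives all of its rows in one sequential pass, which is what removes the need to hold many writers (and their buffers) open at once.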
Test build #36989 has finished for PR 7336 at commit
+1 for local sorting. I hit OOM when writing Parquet with many partitions, and my first idea was also to shuffle by the partition columns. But shuffle is expensive; local sort seems better. We may need to profile these two approaches.
@liancheng here I do not mean the partitions of a Spark RDD; I mean each partition directory of the table, such as …
@cloud-fan @liancheng I have tested this patch and it shows that performance does not get worse (in my situation, it improved 20%-30%).
@scwf Ah I see, these two "partition" concepts are really confusing sometimes... Although I mentioned local sorting, I do tend to also include this repartitioning optimization. But we need to add a …
Test build #37110 has finished for PR 7336 at commit
Test build #37121 has finished for PR 7336 at commit
retest this please
Test build #37126 has finished for PR 7336 at commit
@scwf Could you please help verify whether the HiveQL CLUSTER BY clause helps in your scenario? Essentially what CLUSTER BY does is just add an …
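For reference, `CLUSTER BY key` in HiveQL is equivalent to `DISTRIBUTE BY key` plus `SORT BY key`: rows are shuffled across tasks by a hash of the key, and each task then sorts its own rows locally. A toy model of that behavior in plain Python (hypothetical names, not Spark's shuffle implementation):

```python
def cluster_by(rows, key, num_partitions):
    """Toy model of HiveQL CLUSTER BY: hash-distribute rows into
    partitions by key, then sort each partition locally by that key."""
    buckets = [[] for _ in range(num_partitions)]
    for row in rows:
        buckets[hash(key(row)) % num_partitions].append(row)  # DISTRIBUTE BY
    return [sorted(b, key=key) for b in buckets]              # SORT BY

# integer keys keep hash() deterministic across runs
rows = [(3, "a"), (1, "b"), (3, "c"), (2, "d")]
parts = cluster_by(rows, key=lambda r: r[0], num_partitions=2)
```

All rows sharing a key land in the same bucket and arrive sorted, so a downstream writer per bucket sees each partition's rows contiguously.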
Test build #37459 has finished for PR 7336 at commit
Test build #37503 has finished for PR 7336 at commit
Test build #37506 has finished for PR 7336 at commit
Test build #37519 has finished for PR 7336 at commit
Is this related to https://issues.apache.org/jira/browse/SPARK-8890 ?
@rxin Sort of. This PR tries to fix the same issue on the Hive support side.
@rxin, yes, we found this issue when doing dynamic partitioning in our case, so here I locally sort the data on the partition columns to reduce the GC overhead.
@rxin any comments here?
Force-pushed from 5420868 to c75abcb.
Test build #40231 has finished for PR 7336 at commit
Test build #40239 has finished for PR 7336 at commit
/cc @marmbrus can you take a look at this?
retest this please
This looks good, but I'd like to wait until 1.6 since we are now past the deadline and this is a pretty big change.
Test build #46449 timed out for PR 7336 at commit |
Force-pushed from aab3983 to b137a0b.
Force-pushed from b137a0b to 7766247.
Test build #49057 has finished for PR 7336 at commit
Is this covering a different code path from #10638 ?
@rxin, yes. This PR tries to fix the same issue on the Hive support side.
Ping @rxin
cc @cloud-fan
Is the above code duplicated in the parent class?
Yes, extracted a common method for it.
retest this please
Test build #49690 has finished for PR 7336 at commit
retest this please
Test build #49789 has finished for PR 7336 at commit
LGTM, can you update the statistics pictures in your PR description?
I'm going to merge this. The pictures won't really show up in the commit log, so it is not that big of a deal, although in the future we should make sure to update them.
@scwf It breaks the Scala 2.11 build. I am going to fix it.
Fixed by d60f8d7.
@yhuai thanks
Currently the hash-based writer for dynamic partitioning shows bad performance on big data, producing many small files and high GC overhead. With this patch we do an external sort first, so that at any time we only need to keep one writer open.
before this patch: [statistics screenshot]
after this patch: [statistics screenshot]