[SPARK-31070][SQL] make skew join split skewed partitions more evenly #27833
Test build #119464 has finished for PR 27833 at commit
    val diffIfMergeLastPartition = math.abs(
      lastPackagedPartitionSize + postMapPartitionSize - targetSize)
    // If the last partition is very small, we should merge it to the previous partition.
    if (lastPartitionDiff > diffIfMergeLastPartition * 2) {
I get your point here, but using the target size with this formula doesn't always make sense, e.g.,
targetSize = 7
lastButOneSize = 4
lastSize = 4
You'd get:
diffLastPartition = 3
diffLastTwoPartitions = 1
This satisfies your "very small" condition. Yet I'd argue that, first, the last partition is well over half the target size, so I wouldn't consider it too small; and second, what if the previous partitions are mostly around size 4 too? It would be more even not to merge them, right?
A simple way is probably to take the average size of all partitions except the last one and merge if (avgSize - lastSize) > (lastSize + lastButOneSize - avgSize).
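To make the comparison concrete, here is a minimal sketch of the two rules applied to the numbers above. The object and method names are hypothetical, and the assumption that `diffLastPartition` is `|lastSize - targetSize|` is inferred from the example values rather than from the full diff.

```scala
object LastSplitCheck {
  // Rule from the diff under review (as read from the snippet): merge when the
  // last split is more than twice as far from the target as the merged split.
  def mergeByTargetDiff(targetSize: Long, lastButOneSize: Long, lastSize: Long): Boolean = {
    val diffLastPartition = math.abs(lastSize - targetSize)
    val diffLastTwoPartitions = math.abs(lastButOneSize + lastSize - targetSize)
    diffLastPartition > diffLastTwoPartitions * 2
  }

  // Alternative suggested in the comment: compare against the average size of
  // all splits except the last one (avgSize is supplied by the caller).
  def mergeByAverage(avgSize: Double, lastButOneSize: Long, lastSize: Long): Boolean =
    (avgSize - lastSize) > (lastSize + lastButOneSize - avgSize)
}
```

With targetSize = 7 and two trailing splits of size 4, the first rule says "merge" (3 > 1 * 2), while the average-based rule with avgSize = 4 says "keep" (0 > 4 is false).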
Here, we may also need to consider whether (lastSize + lastButOneSize) is larger than the targetSize:
((avgSize - lastSize) > (lastSize + lastButOneSize - avgSize)) && (lastSize + lastButOneSize) < targetSize
Shouldn't it always be larger than targetSize? Otherwise the two splits would have been one in the first place.
Good discussion here! I'm thinking about more cases:
targetSize = 7
lastButOneSize = 1
lastSize = 7
Shall we merge the last partition into the previous partition?
Maybe we can use a simple heuristic: merge 2 adjacent partitions if they don't exceed the target size by too much (say 20%?). And we can apply it to partitions in the middle too, not only to the last 2 partitions.
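One way to read this heuristic is as a greedy left-to-right pass over the split sizes. The sketch below is an illustration only, not the code that was merged; `mergeAdjacent` and its `tolerance` parameter are hypothetical names.

```scala
object AdjacentMerge {
  // Fold each split into the previous one while the combined size stays
  // within targetSize * (1 + tolerance); otherwise start a new split.
  def mergeAdjacent(sizes: Seq[Long], targetSize: Long, tolerance: Double = 0.2): Seq[Long] = {
    val limit = targetSize * (1 + tolerance)
    sizes.foldLeft(Vector.empty[Long]) { (merged, size) =>
      merged.lastOption match {
        case Some(prev) if prev + size <= limit => merged.init :+ (prev + size)
        case _ => merged :+ size
      }
    }
  }
}
```

For the case above (sizes 1 and 7, targetSize 7) the combined size 8 is within 7 * 1.2, so the two splits become one. A small split in the middle, such as sizes 6, 1, 6, is absorbed by its left neighbour and yields splits of 7 and 6.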
Yes, this would work for extreme cases where you could have a very small split in between two large splits very close to the target size.
    // the previous partition.
    val shouldMergePartitions = lastPartitionSize > -1 &&
      ((currentSizeSum + lastPartitionSize) < targetSize * 1.3 ||
      (currentSizeSum < targetSize * 0.3 || lastPartitionSize < targetSize * 0.3))
Why pick 1.3? The worst case is: we merge 2 partitions with size 0.65 and 0.65 to a single 1.3 partition, which is acceptable.
Why pick 0.3? personal preference :)
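For readers who want to see how such a condition could drive the whole split, here is a self-contained sketch built around the `shouldMergePartitions` expression quoted above. Everything outside that expression (the object name, the loop, the parameter names `mergedFactor` and `smallFactor`) is an assumption made for illustration and should not be taken as the merged Spark code.

```scala
import scala.collection.mutable.ArrayBuffer

object SplitSketch {
  // Returns the start index of each split. The factors default to the 1.3 and
  // 0.3 values discussed in this thread; treat them as tunable assumptions.
  def splitByTargetSize(
      sizes: Seq[Long],
      targetSize: Long,
      mergedFactor: Double = 1.3,
      smallFactor: Double = 0.3): Seq[Int] = {
    val startIndices = ArrayBuffer(0)
    var currentSizeSum = 0L
    var lastPartitionSize = -1L // -1 means there is no previous split yet

    // Called when the current split is complete: either fold it into the
    // previous split (dropping its start index) or keep it as its own split.
    def tryMergePartitions(): Unit = {
      val shouldMergePartitions = lastPartitionSize > -1 &&
        ((currentSizeSum + lastPartitionSize) < targetSize * mergedFactor ||
          (currentSizeSum < targetSize * smallFactor ||
            lastPartitionSize < targetSize * smallFactor))
      if (shouldMergePartitions) {
        startIndices.remove(startIndices.length - 1)
        lastPartitionSize += currentSizeSum
      } else {
        lastPartitionSize = currentSizeSum
      }
    }

    for (i <- sizes.indices) {
      if (i > 0 && currentSizeSum + sizes(i) > targetSize) {
        // Adding sizes(i) would overshoot the target: close the current split.
        tryMergePartitions()
        startIndices += i
        currentSizeSum = sizes(i)
      } else {
        currentSizeSum += sizes(i)
      }
    }
    tryMergePartitions()
    startIndices.toSeq
  }
}
```

With targetSize = 100, sizes 90, 90, 20 give start indices 0 and 1 (the trailing 20 joins the second split because 110 < 130), while sizes 90, 90, 50 keep all three splits. Sizes 120, 20 collapse into a single split through the small-partition clause, even though the result exceeds 1.3 times the target.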
46a68ac to 761c8a8
    -    // Partition 4: only left side is skewed, and divide into 3 splits, so
    -    // 3 sub-partitions.
    +    // Partition 4: only left side is skewed, and divide into 2 splits, so
    +    // 2 sub-partitions.
This is definitely better with 2 splits, as the target size is 2000 and the total size is 4014.
    assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList2, targetSize).toSeq ==
      Seq(0, 2, 4, 5))

    // merge the small partition even if it leads to a very large partition
nit: merge small partitions if the partition itself is smaller than targetSize * SMALL_PARTITION_FACTOR
    assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList3, targetSize).toSeq ==
      Seq(0, 3))

    // merge the small partitions even if it exceeds targetSize * 0.2
nit: merge small partitions if the combined size is smaller than targetSize * MERGED_PARTITION_FACTOR.
maryannxue left a comment
LGTM
Test build #119576 has finished for PR 27833 at commit
Test build #119577 has finished for PR 27833 at commit
Test build #119572 has finished for PR 27833 at commit
Test build #119575 has finished for PR 27833 at commit
Test build #119608 has finished for PR 27833 at commit
Test build #119618 has finished for PR 27833 at commit
### What changes were proposed in this pull request?

There are two problems when splitting skewed partitions:
1. It's possible that we can't split the skewed partition, in which case we shouldn't create a skew join.
2. When splitting, it's possible that we create a partition for a very small amount of data.

This PR fixes them:
1. Don't create `PartialReducerPartitionSpec` if we can't split.
2. Merge small partitions into the previous partition.

### Why are the changes needed?

Make skew join split skewed partitions more evenly.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Updated tests.

Closes #27833 from cloud-fan/aqe.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
(cherry picked from commit d5f5720)
Signed-off-by: gatorsmile <[email protected]>
Thanks! Merged to master/3.0
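What changes were proposed in this pull request?
There are two problems when splitting skewed partitions:
1. It's possible that we can't split the skewed partition, in which case we shouldn't create a skew join.
2. When splitting, it's possible that we create a partition for a very small amount of data.
This PR fixes them:
1. Don't create PartialReducerPartitionSpec if we can't split.
2. Merge small partitions into the previous partition.
Why are the changes needed?
Make skew join split skewed partitions more evenly.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Updated tests.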
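The first fix in the description (no skew-join specs when a partition cannot actually be split) can be sketched as a function that returns `None` when the split produced a single range. `MapRangeSpec` is a hypothetical stand-in for Spark's `PartialReducerPartitionSpec`, and `createSkewSpecs` is an illustrative name, not the method in the PR.

```scala
object SkewSpecSketch {
  // Hypothetical stand-in for PartialReducerPartitionSpec: one reducer
  // partition restricted to the mapper range [startMapIndex, endMapIndex).
  final case class MapRangeSpec(reducerIndex: Int, startMapIndex: Int, endMapIndex: Int)

  // mapStartIndices is the output of a split step such as
  // splitSizeListByTargetSize; numMappers closes the last range.
  def createSkewSpecs(
      reducerIndex: Int,
      mapStartIndices: Seq[Int],
      numMappers: Int): Option[Seq[MapRangeSpec]] = {
    if (mapStartIndices.length <= 1) {
      None // A single range means nothing was split, so skip skew handling.
    } else {
      Some(mapStartIndices.indices.map { i =>
        val end =
          if (i == mapStartIndices.length - 1) numMappers else mapStartIndices(i + 1)
        MapRangeSpec(reducerIndex, mapStartIndices(i), end)
      })
    }
  }
}
```

A caller would treat `None` as "leave this partition to the regular join path" and only rewrite the plan when at least two ranges come back.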