[SPARK-30918][SQL] improve the splitting of skewed partitions #27669
not related to this PR but a small fix: we shouldn't coalesce partitions if the config is off.
nit: can we combine this if with the one below: if (nonSkewPartitionIndices.length == 1)
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
nit: when ... is enabled.
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
A rule to optimize skewed joins to avoid straggler tasks whose share of data is significantly larger than that of the rest of the tasks.
It's not accurate to say "sub-join" here. How about:
The general idea is to divide each skew partition into smaller partitions and replicate its matching partition on the other side of the join so that they can run in parallel tasks. Note that when matching partitions from the left side and the right side both have skew, it will become a cartesian product of splits from left and right joining together.
And let's replace the term "sub-join" accordingly in the example below as well.
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
Test build #118793 has finished for PR 27669 at commit
Test build #118870 has finished for PR 27669 at commit
nonSkewPartitionIndices: Seq[Int]): Seq[ShufflePartitionSpec] = {
assert(nonSkewPartitionIndices.nonEmpty)
if (nonSkewPartitionIndices.length == 1) {
val isEnabled = conf.getConf(SQLConf.REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED)
nit: isCoalesceEnabled
  val avgPartitionSize = mapPartitionSizes.sum / mapPartitionSizes.length
- val advisoryPartitionSize = math.max(avgPartitionSize,
-   conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD))
+ val advisoryPartitionSize = math.max(avgPartitionSize, targetSize)
Do we still need avgPartitionSize? We shouldn't have a problem even if targetSize is smaller than the average, right? The worst case would always be one mapper per task.
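To illustrate the worst-case argument, here is a minimal sketch of size-based splitting. It is not the code in this PR: `SplitSketch` and `splitByTargetSize` are hypothetical names, and the greedy packing of consecutive map outputs is an assumed strategy. It shows that a target smaller than every map output simply degenerates to one mapper per split.

```scala
object SplitSketch {
  /**
   * Greedily packs consecutive map outputs of one skewed partition into
   * splits of roughly `targetSize` bytes. Returns the start index (into
   * `mapSizes`) of each split. Hypothetical helper, for illustration only.
   */
  def splitByTargetSize(mapSizes: Seq[Long], targetSize: Long): Seq[Int] = {
    val starts = scala.collection.mutable.ArrayBuffer(0)
    var current = 0L
    for ((size, i) <- mapSizes.zipWithIndex) {
      // Open a new split when this map output would push the current
      // split over the target. The first map output always stays in split 0.
      if (i > 0 && current + size > targetSize) {
        starts += i
        current = size
      } else {
        current += size
      }
    }
    starts.toSeq
  }
}
```

With four map outputs of 30 bytes each, a target of 60 gives two splits of two mappers, while a target of 10 gives four splits of one mapper each, which is the floor no matter how small the target gets.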
Test build #118877 has finished for PR 27669 at commit
LGTM pending jenkins. And you need to fix the compilation errors first, @cloud-fan :)
 * target post-shuffle partition size if avg size is smaller than it.
 */
private def targetSize(stats: MapOutputStatistics, medianSize: Long): Long = {
  val targetPostShuffleSize = conf.getConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
When a user enables skewed join optimization, they may want to change the skew condition by adjusting targetPostShuffleSize. If we use SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE here, changing it may also affect the number of tasks in the map stage. Would it be better to use the ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD config to set targetPostShuffleSize in skewed join optimization?
Why would a user want the partition size after splitting to differ from the size of the non-skew partitions? The goal of this rule is to bring all partitions to around the same size if possible...
The problem with the old approach was that the size of a skewed partition after splitting could be much smaller than the size of the non-skew partitions. Being small is not a problem in itself, but having more splits may come at a price, especially when both sides are skewed, and if the non-skew partitions take longer to finish anyway, it would not be worth that price.
@JkSelf do you have any real-world use cases for it? I noticed it as well, but I share @maryannxue's feeling: why would users set a different value?
After coming across the config description of ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD, I think I get @JkSelf's point. According to the description, it is meant to test whether a partition is skewed, but the way it is actually used in this class, it is more like the target size for splitting the skewed partitions.
So we need two changes here:
- bring this conf back and use it in isSkewed instead.
- if eventually the entire "skewed" partition is not split at all because its size is smaller than the target size, we need to avoid adding the SkewDesc for that partition.
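A rough sketch of the two roles being separated here, under stated assumptions: `SkewSketch`, `SkewConf`, and its fields are hypothetical stand-ins for the real configs, the median-times-factor test in `isSkewed` is an assumption rather than something taken from this thread, and the sizes are illustrative. The threshold is used only to decide whether a partition is skewed, while the split target comes from the average size of the non-skewed partitions, bounded below by the target post-shuffle size.

```scala
object SkewSketch {
  // Hypothetical stand-in for the relevant SQLConf entries.
  final case class SkewConf(
      skewFactor: Int,              // assumed multiplier over the median size
      skewThresholdBytes: Long,     // role of ..._SKEWED_PARTITION_SIZE_THRESHOLD
      targetPostShuffleBytes: Long) // role of SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE

  /** Upper median of a non-empty list of partition sizes. */
  def median(sizes: Seq[Long]): Long = {
    require(sizes.nonEmpty, "sizes must not be empty")
    sizes.sorted.apply(sizes.length / 2)
  }

  /** The threshold only answers "is this partition skewed?". */
  def isSkewed(size: Long, medianSize: Long, conf: SkewConf): Boolean =
    size > medianSize * conf.skewFactor && size > conf.skewThresholdBytes

  /** The split target is the average non-skewed size, bounded below. */
  def targetSize(sizes: Seq[Long], medianSize: Long, conf: SkewConf): Long = {
    val nonSkewed = sizes.filterNot(isSkewed(_, medianSize, conf))
    val avg = if (nonSkewed.isEmpty) 0L else nonSkewed.sum / nonSkewed.length
    math.max(conf.targetPostShuffleBytes, avg)
  }
}
```

For the second bullet, the caller would record a skew description only when splitting against this target actually yields more than one split.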
Test build #118901 has finished for PR 27669 at commit
retest this please
Test build #118916 has finished for PR 27669 at commit
### What changes were proposed in this pull request? Use the average size of the non-skewed partitions as the target size when splitting skewed partitions, instead of ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD ### Why are the changes needed? The goal of skew join optimization is to make the data distribution more even. So it makes more sense to use the average size of the non-skewed partitions as the target size. ### Does this PR introduce any user-facing change? no ### How was this patch tested? existing tests Closes #27669 from cloud-fan/aqe. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Xiao Li <[email protected]> (cherry picked from commit 8f247e5) Signed-off-by: Xiao Li <[email protected]>
Thanks! Merged to master/3.0
 * right: [R1, R2, R3, R4]
 *
 * Let's say L2, L4 and R3, R4 are skewed, and each of them get split into 2 sub-partitions. This
 * is scheduled to run 4 tasks at the beginning: (L1, R1), (L2, R2), (L2, R2), (L2, R2).
This seems to be a mistake. Did you want to say the following?
- (L1, R1), (L2, R2), (L2, R2), (L2, R2).
+ (L1, R1), (L2, R2), (L3, R3), (L4, R4).
ah yes! will fix it soon
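For readers following the example in that doc comment, here is a small sketch that enumerates the join tasks once the skewed partitions are split. `JoinTaskSketch`, `pieces`, and `joinTasks` are hypothetical names used only for illustration, and the `-1`/`-2` suffixes are an assumed naming for the splits. Each left piece is paired with every right piece of the matching partition, so a partition skewed on both sides yields a cartesian product of its splits.

```scala
object JoinTaskSketch {
  /** Pieces contributed by a partition: itself if unsplit, else numbered splits. */
  def pieces(name: String, numSplits: Int): Seq[String] =
    if (numSplits <= 1) Seq(name) else (1 to numSplits).map(i => s"$name-$i")

  /** Pairs every left piece with every right piece of the matching partition. */
  def joinTasks(
      left: Seq[(String, Int)],
      right: Seq[(String, Int)]): Seq[(String, String)] =
    left.zip(right).flatMap { case ((l, leftSplits), (r, rightSplits)) =>
      for (lp <- pieces(l, leftSplits); rp <- pieces(r, rightSplits)) yield (lp, rp)
    }

  // The example from the doc comment: L2, L4, R3 and R4 are each split in two.
  val left: Seq[(String, Int)] = Seq("L1" -> 1, "L2" -> 2, "L3" -> 1, "L4" -> 2)
  val right: Seq[(String, Int)] = Seq("R1" -> 1, "R2" -> 1, "R3" -> 2, "R4" -> 2)
  val tasks: Seq[(String, String)] = joinTasks(left, right)
}
```

Under these assumptions the original 4 tasks become 9: one for (L1, R1), two for L2 against R2, two for L3 against R3, and four for the L4 and R4 splits.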
cc @dbtsai
### What changes were proposed in this pull request? This is a follow up of #27669 in order to fix a typo. ### Why are the changes needed? N/A ### Does this PR introduce any user-facing change? no ### How was this patch tested? N/A Closes #27714 from cloud-fan/typo. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> (cherry picked from commit eced932) Signed-off-by: Dongjoon Hyun <[email protected]>
What changes were proposed in this pull request?
Use the average size of the non-skewed partitions as the target size when splitting skewed partitions, instead of ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD
Why are the changes needed?
The goal of skew join optimization is to make the data distribution more even. So it makes more sense to use the average size of the non-skewed partitions as the target size.
Does this PR introduce any user-facing change?
no
How was this patch tested?
existing tests