@@ -432,19 +432,13 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

- val ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD =
- buildConf("spark.sql.adaptive.skewedJoinOptimization.skewedPartitionSizeThreshold")
- .doc("Configures the minimum size in bytes for a partition that is considered as a skewed " +
- "partition in adaptive skewed join.")
- .bytesConf(ByteUnit.BYTE)
- .createWithDefaultString("64MB")

val ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR =
buildConf("spark.sql.adaptive.skewedJoinOptimization.skewedPartitionFactor")
.doc("A partition is considered as a skewed partition if its size is larger than" +
" this factor multiple the median partition size and also larger than " +
- s" ${ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.key}")
+ s" ${SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key}")
.intConf
.checkValue(_ > 0, "The skew factor must be positive.")
.createWithDefault(10)

val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN =
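
As a rough illustration of how the skew settings above are meant to be tuned after this change, the following sketch sets them on a Spark session. It assumes an active SparkSession named spark, that org.apache.spark.sql.internal.SQLConf is accessible from the application, and purely illustrative values; key names and defaults can differ between Spark versions.

import org.apache.spark.sql.internal.SQLConf

// Illustrative values only: flag a partition as skewed once it is more than 5x the
// median partition size and also above the 128MB advisory post-shuffle size, which
// (after this patch) doubles as the lower bound of the target size for splitting.
spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR.key, "5")
spark.conf.set(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "128MB")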
@@ -34,6 +34,30 @@ import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExcha
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.spark.sql.internal.SQLConf

+ /**
+  * A rule to optimize skewed joins to avoid straggler tasks whose share of data is significantly
+  * larger than that of the rest of the tasks.
+  *
+  * The general idea is to divide each skewed partition into smaller partitions and replicate its
+  * matching partition on the other side of the join so that they can run in parallel tasks.
+  * Note that when matching partitions from the left side and the right side both have skew,
+  * it will become a cartesian product of splits from left and right joining together.
+  *
+  * For example, assume the Sort-Merge join has 4 partitions:
+  * left: [L1, L2, L3, L4]
+  * right: [R1, R2, R3, R4]
+  *
+  * Let's say L2, L4 and R3, R4 are skewed, and each of them gets split into 2 sub-partitions. This
+  * is scheduled to run 4 tasks at the beginning: (L1, R1), (L2, R2), (L2, R2), (L2, R2).

Review comment (Member):
This seems to be a mistake. Did you want to say the following?

- (L1, R1), (L2, R2), (L2, R2), (L2, R2).
+ (L1, R1), (L2, R2), (L3, R3), (L4, R4).

Reply (Contributor Author):
Ah yes! Will fix it soon.

+  * This rule expands it to 9 tasks to increase parallelism:
+  * (L1, R1),
+  * (L2-1, R2), (L2-2, R2),
+  * (L3, R3-1), (L3, R3-2),
+  * (L4-1, R4-1), (L4-2, R4-1), (L4-1, R4-2), (L4-2, R4-2)
+  *
+  * Note that, when this rule is enabled, it also coalesces non-skewed partitions like
+  * `ReduceNumShufflePartitions` does.
+  */
case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {

private val ensureRequirements = EnsureRequirements(conf)
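
A small self-contained sketch of the expansion described in the OptimizeSkewedJoin doc comment above. The split names and task ordering are illustrative only, not the rule's actual data structures.

// Hypothetical illustration of the 4-partition example: a skewed side contributes its
// splits, a non-skewed side contributes itself, and when both sides of a partition are
// skewed the splits are joined as a cartesian product.
val leftSplits  = Map(2 -> Seq("L2-1", "L2-2"), 4 -> Seq("L4-1", "L4-2"))
val rightSplits = Map(3 -> Seq("R3-1", "R3-2"), 4 -> Seq("R4-1", "R4-2"))
val tasks = (1 to 4).flatMap { i =>
  for {
    l <- leftSplits.getOrElse(i, Seq(s"L$i"))
    r <- rightSplits.getOrElse(i, Seq(s"R$i"))
  } yield (l, r)
}
// 9 tasks: (L1,R1), (L2-1,R2), (L2-2,R2), (L3,R3-1), (L3,R3-2),
//          (L4-1,R4-1), (L4-1,R4-2), (L4-2,R4-1), (L4-2,R4-2)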
@@ -43,12 +67,12 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {

/**
* A partition is considered as a skewed partition if its size is larger than the median
- * partition size * spark.sql.adaptive.skewedPartitionFactor and also larger than
- * spark.sql.adaptive.skewedPartitionSizeThreshold.
+ * partition size * ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR and also larger than
+ * SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.
*/
private def isSkewed(size: Long, medianSize: Long): Boolean = {
size > medianSize * conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR) &&
- size > conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD)
+ size > conf.getConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
}
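
A tiny worked version of this check, assuming the default factor of 10 and a 64MB advisory size (defaults can differ by Spark version); it mirrors isSkewed without going through the real config machinery.

// Hypothetical standalone form of the check above; sizes are in bytes.
def isSkewedDemo(size: Long, medianSize: Long,
    factor: Int = 10, advisorySize: Long = 64L << 20): Boolean =
  size > medianSize * factor && size > advisorySize

isSkewedDemo(1L << 30, 80L << 20)   // true: 1GB exceeds both 10 x 80MB and 64MB
isSkewedDemo(500L << 20, 80L << 20) // false: 500MB does not exceed 10 x 80MB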

private def medianSize(stats: MapOutputStatistics): Long = {
@@ -61,6 +85,19 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
}
}

+ /**
+  * The goal of skew join optimization is to make the data distribution more even. The target size
+  * to split skewed partitions is the average size of the non-skewed partitions, or the
+  * target post-shuffle partition size if the average is smaller than it.
+  */
+ private def targetSize(stats: MapOutputStatistics, medianSize: Long): Long = {
+ val targetPostShuffleSize = conf.getConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)

Review comment (Contributor):
When users enable the skewed join optimization, they may want to change the skew condition by adjusting the target post-shuffle size. If we use SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE here, it may also affect the number of tasks in the map stage. Would it be better to use the ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD config to set the target size in the skewed join optimization?

Reply (Contributor):
Why would a user want the new partition size after the split to be different from the size of the non-skewed partitions? The goal of this rule is to bring all partitions to roughly the same size where possible...

Reply (Contributor):
The problem with the old approach was that the new skewed-partition size after the split could be much smaller than the non-skewed partition size. Being small is not a problem in itself, but having more splits may come at a price, especially when both sides are skewed, and if the non-skewed partitions take longer to finish anyway, it wouldn't be worth that price.

Reply (Contributor Author):
@JkSelf do you have any real-world use cases for it? I noticed it as well but have the same feeling as @maryannxue: why would users set a different value?

Reply (Contributor):
After coming across the config description of ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD, I probably get @JkSelf's point. In the description it is meant to test whether a partition is skewed, but the way it is actually used in this class, it is more like the target size for splitting the skewed partitions.
So we need two changes here:

  1. Bring this conf back and use it in isSkewed instead.
  2. If the entire "skewed" partition ends up not being split at all because its size is smaller than the target size, avoid adding the SkewDesc for that partition.

+ val nonSkewSizes = stats.bytesByPartitionId.filterNot(isSkewed(_, medianSize))
+ // It's impossible that all the partitions are skewed, as we use median size to define skew.
+ assert(nonSkewSizes.nonEmpty)
+ math.max(targetPostShuffleSize, nonSkewSizes.sum / nonSkewSizes.length)
+ }
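
A rough numeric illustration of the rule above, with made-up sizes: if the non-skewed partitions average 90MB and the advisory post-shuffle size is 64MB, skewed partitions are split into roughly 90MB chunks rather than 64MB ones.

// Illustrative numbers only, mirroring targetSize above (sizes in bytes).
val nonSkewSizes = Seq(80L << 20, 100L << 20, 90L << 20) // 80MB, 100MB, 90MB
val advisorySize = 64L << 20                             // 64MB
val target = math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length)
// target == 90MB: splitting skewed partitions finer than the non-skewed ones would
// add tasks without making the stage finish earlier, since the non-skewed partitions
// would still be the slowest.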

/**
* Get the map size of the specific reduce shuffle Id.
*/
@@ -72,19 +109,19 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
/**
* Split the skewed partition based on the map size and the max split number.
*/
- private def getMapStartIndices(stage: ShuffleQueryStageExec, partitionId: Int): Array[Int] = {
+ private def getMapStartIndices(
+ stage: ShuffleQueryStageExec,
+ partitionId: Int,
+ targetSize: Long): Array[Int] = {
val shuffleId = stage.shuffle.shuffleDependency.shuffleHandle.shuffleId
val mapPartitionSizes = getMapSizesForReduceId(shuffleId, partitionId)
- val avgPartitionSize = mapPartitionSizes.sum / mapPartitionSizes.length
- val advisoryPartitionSize = math.max(avgPartitionSize,
- conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD))
val partitionStartIndices = ArrayBuffer[Int]()
partitionStartIndices += 0
var i = 0
var postMapPartitionSize = 0L
while (i < mapPartitionSizes.length) {
val nextMapPartitionSize = mapPartitionSizes(i)
- if (i > 0 && postMapPartitionSize + nextMapPartitionSize > advisoryPartitionSize) {
+ if (i > 0 && postMapPartitionSize + nextMapPartitionSize > targetSize) {
partitionStartIndices += i
postMapPartitionSize = nextMapPartitionSize
} else {
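
A self-contained sketch of the greedy packing performed by this loop, with made-up per-mapper sizes; the real method reads the sizes from the map output statistics of the given shuffle and reduce partition.

// Illustrative only: pack consecutive map outputs into splits of roughly targetSize.
// Each entry of startIndices is the index of the first mapper in a split.
val mapSizes = Seq(40L, 50L, 30L, 40L, 20L, 60L) // per-mapper output, in MB here
val target = 90L
val startIndices = scala.collection.mutable.ArrayBuffer(0)
var running = 0L
for ((size, i) <- mapSizes.zipWithIndex) {
  if (i > 0 && running + size > target) {
    startIndices += i // adding this mapper would exceed the target: start a new split
    running = size
  } else {
    running += size
  }
}
// startIndices == ArrayBuffer(0, 2, 5): splits cover mappers [0,1], [2,3,4] and [5],
// with total sizes 90, 90 and 60.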
@@ -152,6 +189,8 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
""".stripMargin)
val canSplitLeft = canSplitLeftSide(joinType)
val canSplitRight = canSplitRightSide(joinType)
+ val leftTargetSize = targetSize(leftStats, leftMedSize)
+ val rightTargetSize = targetSize(rightStats, rightMedSize)

val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
@@ -179,7 +218,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
leftSkewDesc.addPartitionSize(leftSize)
createSkewPartitions(
partitionIndex,
- getMapStartIndices(left, partitionIndex),
+ getMapStartIndices(left, partitionIndex, leftTargetSize),
getNumMappers(left))
} else {
Seq(SinglePartitionSpec(partitionIndex))
@@ -189,7 +228,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
rightSkewDesc.addPartitionSize(rightSize)
createSkewPartitions(
partitionIndex,
- getMapStartIndices(right, partitionIndex),
+ getMapStartIndices(right, partitionIndex, rightTargetSize),
getNumMappers(right))
} else {
Seq(SinglePartitionSpec(partitionIndex))
@@ -236,7 +275,8 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
rightStats: MapOutputStatistics,
nonSkewPartitionIndices: Seq[Int]): Seq[ShufflePartitionSpec] = {
assert(nonSkewPartitionIndices.nonEmpty)
- if (nonSkewPartitionIndices.length == 1) {
+ val shouldCoalesce = conf.getConf(SQLConf.REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED)
+ if (!shouldCoalesce || nonSkewPartitionIndices.length == 1) {
Seq(SinglePartitionSpec(nonSkewPartitionIndices.head))
} else {
val startIndices = ShufflePartitionsCoalescer.coalescePartitions(
@@ -583,7 +583,6 @@ class AdaptiveQueryExecSuite
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
- SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.key -> "100",
SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "700") {
withTempView("skewData1", "skewData2") {
spark
@@ -609,8 +608,7 @@ class AdaptiveQueryExecSuite
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
- SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.key -> "2000",
- SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "700") {
+ SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "2000") {
withTempView("skewData1", "skewData2") {
spark
.range(0, 1000, 1, 10)