Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -447,14 +447,6 @@ object SQLConf {
.intConf
.createWithDefault(10)

// Caps how many splits (and therefore tasks) a single skewed partition may be divided into
// by the adaptive skewed-join optimization. Values below 1 are rejected; the default is 5.
val ADAPTIVE_EXECUTION_SKEWED_PARTITION_MAX_SPLITS =
  buildConf("spark.sql.adaptive.skewedJoinOptimization.skewedPartitionMaxSplits")
    // Keep the trailing space before the line break: the two literals are concatenated
    // verbatim, so omitting it renders "skewedjoin" in the generated documentation.
    .doc("Configures the maximum number of tasks to handle a skewed partition in adaptive " +
      "skewed join.")
    .intConf
    .checkValue(_ >= 1, "The number of splits must be at least 1.")
    .createWithDefault(5)

val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN =
buildConf("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin")
.doc("The relation with a non-empty partition ratio lower than this config will not be " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
private def getMapStartIndices(stage: ShuffleQueryStageExec, partitionId: Int): Array[Int] = {
val shuffleId = stage.shuffle.shuffleDependency.shuffleHandle.shuffleId
val mapPartitionSizes = getMapSizesForReduceId(shuffleId, partitionId)
val maxSplits = math.min(conf.getConf(
SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_MAX_SPLITS), mapPartitionSizes.length)
val avgPartitionSize = mapPartitionSizes.sum / maxSplits
val avgPartitionSize = mapPartitionSizes.sum / mapPartitionSizes.length
val advisoryPartitionSize = math.max(avgPartitionSize,
conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD))
val partitionStartIndices = ArrayBuffer[Int]()
Expand All @@ -95,9 +93,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
i += 1
}

if (partitionStartIndices.size > maxSplits) {
partitionStartIndices.take(maxSplits).toArray
} else partitionStartIndices.toArray
partitionStartIndices.toArray
}

private def getStatistics(stage: ShuffleQueryStageExec): MapOutputStatistics = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ class AdaptiveQueryExecSuite
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.key -> "100",
SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.key -> "2000",
SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "700") {
withTempView("skewData1", "skewData2") {
spark
Expand All @@ -636,42 +636,43 @@ class AdaptiveQueryExecSuite
"SELECT * FROM skewData1 join skewData2 ON key1 = key2")
// left stats: [3496, 0, 0, 0, 4014]
// right stats:[6292, 0, 0, 0, 0]
// Partition 0: both left and right sides are skewed, and divide into 5 splits, so
// 5 x 5 sub-partitions.
// Partition 0: both left and right sides are skewed, left side is divided
// into 2 splits and right side is divided into 4 splits, so
// 2 x 4 sub-partitions.
// Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
// Partition 4: only left side is skewed, and divide into 5 splits, so
// 5 sub-partitions.
// So total (25 + 1 + 5) partitions.
// Partition 4: only left side is skewed, and is divided into 3 splits, so
// 3 sub-partitions.
// So total (8 + 1 + 3) partitions.
val innerSmj = findTopLevelSortMergeJoin(innerAdaptivePlan)
checkSkewJoin(innerSmj, 25 + 1 + 5)
checkSkewJoin(innerSmj, 8 + 1 + 3)

// skewed left outer join optimization
val (_, leftAdaptivePlan) = runAdaptiveAndVerifyResult(
"SELECT * FROM skewData1 left outer join skewData2 ON key1 = key2")
// left stats: [3496, 0, 0, 0, 4014]
// right stats:[6292, 0, 0, 0, 0]
// Partition 0: both left and right sides are skewed, but left join can't split right side,
// so only left side is divided into 5 splits, and thus 5 sub-partitions.
// so only left side is divided into 2 splits, and thus 2 sub-partitions.
// Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
// Partition 4: only left side is skewed, and divide into 5 splits, so
// 5 sub-partitions.
// So total (5 + 1 + 5) partitions.
// Partition 4: only left side is skewed, and is divided into 3 splits, so
// 3 sub-partitions.
// So total (2 + 1 + 3) partitions.
val leftSmj = findTopLevelSortMergeJoin(leftAdaptivePlan)
checkSkewJoin(leftSmj, 5 + 1 + 5)
checkSkewJoin(leftSmj, 2 + 1 + 3)

// skewed right outer join optimization
val (_, rightAdaptivePlan) = runAdaptiveAndVerifyResult(
"SELECT * FROM skewData1 right outer join skewData2 ON key1 = key2")
// left stats: [3496, 0, 0, 0, 4014]
// right stats:[6292, 0, 0, 0, 0]
// Partition 0: both left and right sides are skewed, but right join can't split left side,
// so only right side is divided into 5 splits, and thus 5 sub-partitions.
// so only right side is divided into 4 splits, and thus 4 sub-partitions.
// Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
// Partition 4: only left side is skewed, but right join can't split left side, so just
// 1 partition.
// So total (5 + 1 + 1) partitions.
// So total (4 + 1 + 1) partitions.
val rightSmj = findTopLevelSortMergeJoin(rightAdaptivePlan)
checkSkewJoin(rightSmj, 5 + 1 + 1)
checkSkewJoin(rightSmj, 4 + 1 + 1)
}
}
}
Expand Down