@@ -495,7 +495,7 @@ object SQLConf {
       .version("3.0.0")
       .intConf
       .checkValue(_ > 0, "The skew factor must be positive.")
-      .createWithDefault(10)
+      .createWithDefault(5)
 
   val SKEW_JOIN_SKEWED_PARTITION_THRESHOLD =
     buildConf("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes")
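For context on what the lowered default changes: a shuffle partition is treated as skewed only when it is both several times larger than the median partition size (this factor, now 5 rather than 10) and larger than an absolute byte threshold (spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes, defined right after it). A minimal sketch of that check, written as a free-standing helper rather than the rule's private method:

```scala
// Hedged sketch, not the verbatim rule code: a partition counts as "skewed"
// when its size exceeds factor * median AND an absolute byte threshold.
def isSkewed(size: Long, medianSize: Long, factor: Int, thresholdInBytes: Long): Boolean =
  size > medianSize * factor && size > thresholdInBytes

// e.g. with the new default factor 5 and a 256 MB threshold:
// isSkewed(2L << 30, 100L << 20, 5, 256L << 20)  // true: 2 GB > 5 * 100 MB and > 256 MB
```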
@@ -27,7 +27,6 @@ import org.apache.spark.sql.internal.SQLConf
  * avoid many small reduce tasks that hurt performance.
  */
 case class CoalesceShufflePartitions(session: SparkSession) extends Rule[SparkPlan] {
-  import CoalesceShufflePartitions._
   private def conf = session.sessionState.conf
 
   override def apply(plan: SparkPlan): SparkPlan = {
@@ -21,7 +21,7 @@ import scala.collection.mutable
 
 import org.apache.commons.io.FileUtils
 
-import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv}
+import org.apache.spark.{MapOutputTrackerMaster, SparkEnv}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
@@ -70,9 +70,9 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
       size > conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD)
   }
 
-  private def medianSize(stats: MapOutputStatistics): Long = {
-    val numPartitions = stats.bytesByPartitionId.length
-    val bytes = stats.bytesByPartitionId.sorted
+  private def medianSize(sizes: Seq[Long]): Long = {
+    val numPartitions = sizes.length
+    val bytes = sizes.sorted
     numPartitions match {
       case _ if (numPartitions % 2 == 0) =>
         math.max((bytes(numPartitions / 2) + bytes(numPartitions / 2 - 1)) / 2, 1)
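The hunk above cuts off before the odd-length branch of the match. A self-contained sketch of the full helper under the new Seq[Long] signature; the odd-length and empty-input cases are reconstructed assumptions, not copied from the file:

```scala
// Sketch of the median-of-sizes helper; it returns at least 1 byte so that the
// factor-based skew test never compares against zero.
def medianSize(sizes: Seq[Long]): Long = {
  val numPartitions = sizes.length
  val bytes = sizes.sorted
  numPartitions match {
    case 0 => 1L  // assumption: not reachable from the rule, guarded here for safety
    case n if n % 2 == 0 =>
      math.max((bytes(n / 2) + bytes(n / 2 - 1)) / 2, 1)
    case n =>
      math.max(bytes(n / 2), 1)
  }
}

// medianSize(Seq(0L, 0L, 0L, 100L, 4000L)) == 1
// (a shuffle with mostly empty partitions gets a clamped median of 1 byte)
```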
@@ -163,16 +163,16 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
         if supportedJoinTypes.contains(joinType) =>
       assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
       val numPartitions = left.partitionsWithSizes.length
-      // We use the median size of the original shuffle partitions to detect skewed partitions.
-      val leftMedSize = medianSize(left.mapStats)
-      val rightMedSize = medianSize(right.mapStats)
+      // Use the median size of the actual (coalesced) partition sizes to detect skewed partitions.
+      val leftMedSize = medianSize(left.partitionsWithSizes.map(_._2))
+      val rightMedSize = medianSize(right.partitionsWithSizes.map(_._2))
       logDebug(
         s"""
           |Optimizing skewed join.
           |Left side partitions size info:
-          |${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)}
+          |${getSizeInfo(leftMedSize, left.partitionsWithSizes.map(_._2))}
           |Right side partitions size info:
-          |${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)}
+          |${getSizeInfo(rightMedSize, right.partitionsWithSizes.map(_._2))}
         """.stripMargin)
       val canSplitLeft = canSplitLeftSide(joinType)
       val canSplitRight = canSplitRightSide(joinType)
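Which side may be split depends on the join type: splitting a side means replicating the other side's partition for every split, and that is only safe when the replicated rows cannot produce extra output rows on their own. The comments removed from the test further down state the same restriction (a left join can't split the right side, a right join can't split the left side). A hedged sketch of that mapping; the exact type lists are an assumption rather than a copy of the rule's helpers:

```scala
import org.apache.spark.sql.catalyst.plans._

// Hedged sketch (assumption): which join types allow splitting which side.
def canSplitLeftSide(joinType: JoinType): Boolean =
  joinType == Inner || joinType == Cross || joinType == LeftSemi ||
    joinType == LeftAnti || joinType == LeftOuter

def canSplitRightSide(joinType: JoinType): Boolean =
  joinType == Inner || joinType == Cross || joinType == RightOuter
```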
@@ -291,17 +291,15 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
 private object ShuffleStage {
   def unapply(plan: SparkPlan): Option[ShuffleStageInfo] = plan match {
     case s: ShuffleQueryStageExec if s.mapStats.isDefined =>
-      val mapStats = s.mapStats.get
-      val sizes = mapStats.bytesByPartitionId
+      val sizes = s.mapStats.get.bytesByPartitionId
       val partitions = sizes.zipWithIndex.map {
         case (size, i) => CoalescedPartitionSpec(i, i + 1) -> size
       }
-      Some(ShuffleStageInfo(s, mapStats, partitions))
+      Some(ShuffleStageInfo(s, partitions))
 
     case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs)
         if s.mapStats.isDefined && partitionSpecs.nonEmpty =>
-      val mapStats = s.mapStats.get
-      val sizes = mapStats.bytesByPartitionId
+      val sizes = s.mapStats.get.bytesByPartitionId
       val partitions = partitionSpecs.map {
         case spec @ CoalescedPartitionSpec(start, end) =>
           var sum = 0L
@@ -314,13 +312,12 @@ private object ShuffleStage {
         case other => throw new IllegalArgumentException(
           s"Expect CoalescedPartitionSpec but got $other")
       }
-      Some(ShuffleStageInfo(s, mapStats, partitions))
+      Some(ShuffleStageInfo(s, partitions))
 
     case _ => None
   }
 }
 
 private case class ShuffleStageInfo(
     shuffleStage: ShuffleQueryStageExec,
-    mapStats: MapOutputStatistics,
     partitionsWithSizes: Seq[(CoalescedPartitionSpec, Long)])
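With MapOutputStatistics dropped from ShuffleStageInfo, skew detection now works purely off partitionsWithSizes. A standalone sketch of how those per-spec sizes are derived from the map output sizes; the summation is reconstructed because the hunk is truncated, and CoalescedPartitionSpec below is a simplified stand-in rather than the Spark class:

```scala
// Simplified stand-in: a coalesced partition covers reducer ids
// [startReducerIndex, endReducerIndex).
case class CoalescedPartitionSpec(startReducerIndex: Int, endReducerIndex: Int)

// For each coalesced spec, sum the map-output bytes of the reducer partitions it covers.
def partitionsWithSizes(
    specs: Seq[CoalescedPartitionSpec],
    bytesByPartitionId: Array[Long]): Seq[(CoalescedPartitionSpec, Long)] =
  specs.map { spec =>
    spec -> bytesByPartitionId.slice(spec.startReducerIndex, spec.endReducerIndex).sum
  }

// e.g. partitionsWithSizes(Seq(CoalescedPartitionSpec(0, 3)), Array(10L, 0L, 5L, 7L))
//      == Seq(CoalescedPartitionSpec(0, 3) -> 15L)
```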
@@ -26,7 +26,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListe
 import org.apache.spark.sql.{QueryTest, Row, SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
-import org.apache.spark.sql.execution.{ReusedSubqueryExec, ShuffledRowRDD, SparkPlan}
+import org.apache.spark.sql.execution.{PartialReducerPartitionSpec, ReusedSubqueryExec, ShuffledRowRDD, SparkPlan}
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
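The new import matters because a skewed reducer partition is split into several PartialReducerPartitionSpec entries, one per slice of map outputs, and the updated test counts distinct reducerIndex values to recover the number of skewed partitions. A simplified stand-in for that spec, with the reducerIndex field taken from the PR's usage and the remaining fields assumed:

```scala
// Simplified stand-in, not the Spark class: one slice of a skewed reducer
// partition, reading only the map outputs in [startMapIndex, endMapIndex).
case class PartialReducerPartitionSpec(
    reducerIndex: Int,
    startMapIndex: Int,
    endMapIndex: Int)

// Counting how many reducer partitions were split, as the test's checkSkewJoin does:
def skewedPartitionCount(specs: Seq[PartialReducerPartitionSpec]): Int =
  specs.map(_.reducerIndex).distinct.length
```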
@@ -648,79 +648,65 @@ class AdaptiveQueryExecSuite
     }
   }
 
-  // TODO: we need a way to customize data distribution after shuffle, to improve test coverage
-  // of this case.
   test("SPARK-29544: adaptive skew join with different join types") {
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "2000",
-      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "2000") {
+      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "100",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
       withTempView("skewData1", "skewData2") {
         spark
           .range(0, 1000, 1, 10)
-          .selectExpr("id % 2 as key1", "id as value1")
+          .select(
+            when('id < 250, 249)
+              .when('id >= 750, 1000)
+              .otherwise('id).as("key1"),
+            'id as "value1")
           .createOrReplaceTempView("skewData1")
         spark
           .range(0, 1000, 1, 10)
-          .selectExpr("id % 1 as key2", "id as value2")
+          .select(
+            when('id < 250, 249)
+              .otherwise('id).as("key2"),
+            'id as "value2")
           .createOrReplaceTempView("skewData2")
 
-        def checkSkewJoin(joins: Seq[SortMergeJoinExec], expectedNumPartitions: Int): Unit = {
+        def checkSkewJoin(
+            joins: Seq[SortMergeJoinExec],
+            leftSkewNum: Int,
+            rightSkewNum: Int): Unit = {
           assert(joins.size == 1 && joins.head.isSkewJoin)
           assert(joins.head.left.collect {
             case r: CustomShuffleReaderExec => r
-          }.head.partitionSpecs.length == expectedNumPartitions)
+          }.head.partitionSpecs.collect {
+            case p: PartialReducerPartitionSpec => p.reducerIndex
+          }.distinct.length == leftSkewNum)
           assert(joins.head.right.collect {
             case r: CustomShuffleReaderExec => r
-          }.head.partitionSpecs.length == expectedNumPartitions)
+          }.head.partitionSpecs.collect {
+            case p: PartialReducerPartitionSpec => p.reducerIndex
+          }.distinct.length == rightSkewNum)
         }
 
         // skewed inner join optimization
         val (_, innerAdaptivePlan) = runAdaptiveAndVerifyResult(
           "SELECT * FROM skewData1 join skewData2 ON key1 = key2")
-        // left stats: [3496, 0, 0, 0, 4014]
-        // right stats:[6292, 0, 0, 0, 0]
-        // Partition 0: both left and right sides are skewed, left side is divided
-        //              into 2 splits and right side is divided into 4 splits, so
-        //              2 x 4 sub-partitions.
-        // Partition 1, 2, 3: not skewed, and coalesced into 1 partition, but it's ignored as the
-        //                    size is 0.
-        // Partition 4: only left side is skewed, and divide into 2 splits, so
-        //              2 sub-partitions.
-        // So total (8 + 0 + 2) partitions.
         val innerSmj = findTopLevelSortMergeJoin(innerAdaptivePlan)
-        checkSkewJoin(innerSmj, 8 + 0 + 2)
+        checkSkewJoin(innerSmj, 2, 1)
 
         // skewed left outer join optimization
         val (_, leftAdaptivePlan) = runAdaptiveAndVerifyResult(
           "SELECT * FROM skewData1 left outer join skewData2 ON key1 = key2")
-        // left stats: [3496, 0, 0, 0, 4014]
-        // right stats:[6292, 0, 0, 0, 0]
-        // Partition 0: both left and right sides are skewed, but left join can't split right side,
-        //              so only left side is divided into 2 splits, and thus 2 sub-partitions.
-        // Partition 1, 2, 3: not skewed, and coalesced into 1 partition, but it's ignored as the
-        //                    size is 0.
-        // Partition 4: only left side is skewed, and divide into 2 splits, so
-        //              2 sub-partitions.
-        // So total (2 + 0 + 2) partitions.
         val leftSmj = findTopLevelSortMergeJoin(leftAdaptivePlan)
-        checkSkewJoin(leftSmj, 2 + 0 + 2)
+        checkSkewJoin(leftSmj, 2, 0)
 
         // skewed right outer join optimization
         val (_, rightAdaptivePlan) = runAdaptiveAndVerifyResult(
           "SELECT * FROM skewData1 right outer join skewData2 ON key1 = key2")
-        // left stats: [3496, 0, 0, 0, 4014]
-        // right stats:[6292, 0, 0, 0, 0]
-        // Partition 0: both left and right sides are skewed, but right join can't split left side,
-        //              so only right side is divided into 4 splits, and thus 4 sub-partitions.
-        // Partition 1, 2, 3: not skewed, and coalesced into 1 partition, but it's ignored as the
-        //                    size is 0.
-        // Partition 4: only left side is skewed, but right join can't split left side, so just
-        //              1 partition.
-        // So total (4 + 0 + 1) partitions.
         val rightSmj = findTopLevelSortMergeJoin(rightAdaptivePlan)
-        checkSkewJoin(rightSmj, 4 + 0 + 1)
+        checkSkewJoin(rightSmj, 0, 1)
       }
     }
   }
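A quick way to see why the expected skew counts become (2, 1), (2, 0) and (0, 1): the rewritten views pile rows onto a few keys, so skewData1 ends up with two heavy keys (249 and 1000) and skewData2 with one (249). A plain-Scala sketch of the key distribution the select expressions produce; the actual partition byte sizes observed at runtime are not reproduced here:

```scala
// Mirror of the test's key expressions over ids 0..999.
val key1Counts = (0 until 1000)
  .map(id => if (id < 250) 249 else if (id >= 750) 1000 else id)
  .groupBy(identity)
  .map { case (k, ids) => k -> ids.size }
val key2Counts = (0 until 1000)
  .map(id => if (id < 250) 249 else id)
  .groupBy(identity)
  .map { case (k, ids) => k -> ids.size }

assert(key1Counts(249) == 250 && key1Counts(1000) == 250)  // two skewed keys on the left
assert(key2Counts(249) == 250)                             // one skewed key on the right
assert(key1Counts.values.count(_ == 1) == 500)             // the remaining keys are uniform
```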
@@ -874,28 +860,40 @@ class AdaptiveQueryExecSuite
 
     withSQLConf(
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "2000",
-      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "2000") {
+      SQLConf.SHUFFLE_PARTITIONS.key -> "100",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
       withTempView("skewData1", "skewData2") {
         spark
           .range(0, 1000, 1, 10)
-          .selectExpr("id % 2 as key1", "id as value1")
+          .select(
+            when('id < 250, 249)
+              .when('id >= 750, 1000)
+              .otherwise('id).as("key1"),
+            'id as "value1")
           .createOrReplaceTempView("skewData1")
         spark
           .range(0, 1000, 1, 10)
-          .selectExpr("id % 1 as key2", "id as value2")
+          .select(
+            when('id < 250, 249)
+              .otherwise('id).as("key2"),
+            'id as "value2")
           .createOrReplaceTempView("skewData2")
         val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
           "SELECT * FROM skewData1 join skewData2 ON key1 = key2")
-        val reader = collect(adaptivePlan) {
+        val readers = collect(adaptivePlan) {
           case r: CustomShuffleReaderExec => r
-        }.head
-        assert(!reader.isLocalReader)
-        assert(reader.hasSkewedPartition)
-        assert(!reader.hasCoalescedPartition) // 0-size partitions are ignored.
-        assert(reader.metrics.contains("numSkewedPartitions"))
-        assert(reader.metrics("numSkewedPartitions").value > 0)
-        assert(reader.metrics("numSkewedSplits").value > 0)
+        }
+        readers.foreach { reader =>
+          assert(!reader.isLocalReader)
+          assert(reader.hasCoalescedPartition)
+          assert(reader.hasSkewedPartition)
+          assert(reader.metrics.contains("numSkewedPartitions"))
+        }
+        assert(readers(0).metrics("numSkewedPartitions").value == 2)
+        assert(readers(0).metrics("numSkewedSplits").value == 15)
+        assert(readers(1).metrics("numSkewedPartitions").value == 1)
+        assert(readers(1).metrics("numSkewedSplits").value == 12)
       }
     }
   }