diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index cf0268773c39..7ac52aaa2bc3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -495,7 +495,7 @@ object SQLConf { .version("3.0.0") .intConf .checkValue(_ > 0, "The skew factor must be positive.") - .createWithDefault(10) + .createWithDefault(5) val SKEW_JOIN_SKEWED_PARTITION_THRESHOLD = buildConf("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala index 6aa34497c9ea..84c65df31a7c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala @@ -27,7 +27,6 @@ import org.apache.spark.sql.internal.SQLConf * avoid many small reduce tasks that hurt performance. */ case class CoalesceShufflePartitions(session: SparkSession) extends Rule[SparkPlan] { - import CoalesceShufflePartitions._ private def conf = session.sessionState.conf override def apply(plan: SparkPlan): SparkPlan = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala index 396c9c9d6b4e..b5d287ca7ac7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala @@ -21,7 +21,7 @@ import scala.collection.mutable import org.apache.commons.io.FileUtils -import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv} +import org.apache.spark.{MapOutputTrackerMaster, SparkEnv} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ @@ -70,9 +70,9 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { size > conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD) } - private def medianSize(stats: MapOutputStatistics): Long = { - val numPartitions = stats.bytesByPartitionId.length - val bytes = stats.bytesByPartitionId.sorted + private def medianSize(sizes: Seq[Long]): Long = { + val numPartitions = sizes.length + val bytes = sizes.sorted numPartitions match { case _ if (numPartitions % 2 == 0) => math.max((bytes(numPartitions / 2) + bytes(numPartitions / 2 - 1)) / 2, 1) @@ -163,16 +163,16 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { if supportedJoinTypes.contains(joinType) => assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length) val numPartitions = left.partitionsWithSizes.length - // We use the median size of the original shuffle partitions to detect skewed partitions. - val leftMedSize = medianSize(left.mapStats) - val rightMedSize = medianSize(right.mapStats) + // Use the median size of the actual (coalesced) partition sizes to detect skewed partitions. + val leftMedSize = medianSize(left.partitionsWithSizes.map(_._2)) + val rightMedSize = medianSize(right.partitionsWithSizes.map(_._2)) logDebug( s""" |Optimizing skewed join. |Left side partitions size info: - |${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)} + |${getSizeInfo(leftMedSize, left.partitionsWithSizes.map(_._2))} |Right side partitions size info: - |${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)} + |${getSizeInfo(rightMedSize, right.partitionsWithSizes.map(_._2))} """.stripMargin) val canSplitLeft = canSplitLeftSide(joinType) val canSplitRight = canSplitRightSide(joinType) @@ -291,17 +291,15 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { private object ShuffleStage { def unapply(plan: SparkPlan): Option[ShuffleStageInfo] = plan match { case s: ShuffleQueryStageExec if s.mapStats.isDefined => - val mapStats = s.mapStats.get - val sizes = mapStats.bytesByPartitionId + val sizes = s.mapStats.get.bytesByPartitionId val partitions = sizes.zipWithIndex.map { case (size, i) => CoalescedPartitionSpec(i, i + 1) -> size } - Some(ShuffleStageInfo(s, mapStats, partitions)) + Some(ShuffleStageInfo(s, partitions)) case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs) if s.mapStats.isDefined && partitionSpecs.nonEmpty => - val mapStats = s.mapStats.get - val sizes = mapStats.bytesByPartitionId + val sizes = s.mapStats.get.bytesByPartitionId val partitions = partitionSpecs.map { case spec @ CoalescedPartitionSpec(start, end) => var sum = 0L @@ -314,7 +312,7 @@ private object ShuffleStage { case other => throw new IllegalArgumentException( s"Expect CoalescedPartitionSpec but got $other") } - Some(ShuffleStageInfo(s, mapStats, partitions)) + Some(ShuffleStageInfo(s, partitions)) case _ => None } @@ -322,5 +320,4 @@ private object ShuffleStage { private case class ShuffleStageInfo( shuffleStage: ShuffleQueryStageExec, - mapStats: MapOutputStatistics, partitionsWithSizes: Seq[(CoalescedPartitionSpec, Long)]) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 3aeb6c5063d2..2ceae557f97d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListe import org.apache.spark.sql.{QueryTest, Row, SparkSession, Strategy} import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} -import org.apache.spark.sql.execution.{ReusedSubqueryExec, ShuffledRowRDD, SparkPlan} +import org.apache.spark.sql.execution.{PartialReducerPartitionSpec, ReusedSubqueryExec, ShuffledRowRDD, SparkPlan} import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec} @@ -648,79 +648,65 @@ class AdaptiveQueryExecSuite } } - // TODO: we need a way to customize data distribution after shuffle, to improve test coverage - // of this case. test("SPARK-29544: adaptive skew join with different join types") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", - SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "2000", - SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "2000") { + SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1", + SQLConf.SHUFFLE_PARTITIONS.key -> "100", + SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800", + SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") { withTempView("skewData1", "skewData2") { spark .range(0, 1000, 1, 10) - .selectExpr("id % 2 as key1", "id as value1") + .select( + when('id < 250, 249) + .when('id >= 750, 1000) + .otherwise('id).as("key1"), + 'id as "value1") .createOrReplaceTempView("skewData1") spark .range(0, 1000, 1, 10) - .selectExpr("id % 1 as key2", "id as value2") + .select( + when('id < 250, 249) + .otherwise('id).as("key2"), + 'id as "value2") .createOrReplaceTempView("skewData2") - def checkSkewJoin(joins: Seq[SortMergeJoinExec], expectedNumPartitions: Int): Unit = { + def checkSkewJoin( + joins: Seq[SortMergeJoinExec], + leftSkewNum: Int, + rightSkewNum: Int): Unit = { assert(joins.size == 1 && joins.head.isSkewJoin) assert(joins.head.left.collect { case r: CustomShuffleReaderExec => r - }.head.partitionSpecs.length == expectedNumPartitions) + }.head.partitionSpecs.collect { + case p: PartialReducerPartitionSpec => p.reducerIndex + }.distinct.length == leftSkewNum) assert(joins.head.right.collect { case r: CustomShuffleReaderExec => r - }.head.partitionSpecs.length == expectedNumPartitions) + }.head.partitionSpecs.collect { + case p: PartialReducerPartitionSpec => p.reducerIndex + }.distinct.length == rightSkewNum) } // skewed inner join optimization val (_, innerAdaptivePlan) = runAdaptiveAndVerifyResult( "SELECT * FROM skewData1 join skewData2 ON key1 = key2") - // left stats: [3496, 0, 0, 0, 4014] - // right stats:[6292, 0, 0, 0, 0] - // Partition 0: both left and right sides are skewed, left side is divided - // into 2 splits and right side is divided into 4 splits, so - // 2 x 4 sub-partitions. - // Partition 1, 2, 3: not skewed, and coalesced into 1 partition, but it's ignored as the - // size is 0. - // Partition 4: only left side is skewed, and divide into 2 splits, so - // 2 sub-partitions. - // So total (8 + 0 + 2) partitions. val innerSmj = findTopLevelSortMergeJoin(innerAdaptivePlan) - checkSkewJoin(innerSmj, 8 + 0 + 2) + checkSkewJoin(innerSmj, 2, 1) // skewed left outer join optimization val (_, leftAdaptivePlan) = runAdaptiveAndVerifyResult( "SELECT * FROM skewData1 left outer join skewData2 ON key1 = key2") - // left stats: [3496, 0, 0, 0, 4014] - // right stats:[6292, 0, 0, 0, 0] - // Partition 0: both left and right sides are skewed, but left join can't split right side, - // so only left side is divided into 2 splits, and thus 2 sub-partitions. - // Partition 1, 2, 3: not skewed, and coalesced into 1 partition, but it's ignored as the - // size is 0. - // Partition 4: only left side is skewed, and divide into 2 splits, so - // 2 sub-partitions. - // So total (2 + 0 + 2) partitions. val leftSmj = findTopLevelSortMergeJoin(leftAdaptivePlan) - checkSkewJoin(leftSmj, 2 + 0 + 2) + checkSkewJoin(leftSmj, 2, 0) // skewed right outer join optimization val (_, rightAdaptivePlan) = runAdaptiveAndVerifyResult( "SELECT * FROM skewData1 right outer join skewData2 ON key1 = key2") - // left stats: [3496, 0, 0, 0, 4014] - // right stats:[6292, 0, 0, 0, 0] - // Partition 0: both left and right sides are skewed, but right join can't split left side, - // so only right side is divided into 4 splits, and thus 4 sub-partitions. - // Partition 1, 2, 3: not skewed, and coalesced into 1 partition, but it's ignored as the - // size is 0. - // Partition 4: only left side is skewed, but right join can't split left side, so just - // 1 partition. - // So total (4 + 0 + 1) partitions. val rightSmj = findTopLevelSortMergeJoin(rightAdaptivePlan) - checkSkewJoin(rightSmj, 4 + 0 + 1) + checkSkewJoin(rightSmj, 0, 1) } } } @@ -874,28 +860,40 @@ class AdaptiveQueryExecSuite withSQLConf( SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", - SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "2000", - SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "2000") { + SQLConf.SHUFFLE_PARTITIONS.key -> "100", + SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800", + SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") { withTempView("skewData1", "skewData2") { spark .range(0, 1000, 1, 10) - .selectExpr("id % 2 as key1", "id as value1") + .select( + when('id < 250, 249) + .when('id >= 750, 1000) + .otherwise('id).as("key1"), + 'id as "value1") .createOrReplaceTempView("skewData1") spark .range(0, 1000, 1, 10) - .selectExpr("id % 1 as key2", "id as value2") + .select( + when('id < 250, 249) + .otherwise('id).as("key2"), + 'id as "value2") .createOrReplaceTempView("skewData2") val (_, adaptivePlan) = runAdaptiveAndVerifyResult( "SELECT * FROM skewData1 join skewData2 ON key1 = key2") - val reader = collect(adaptivePlan) { + val readers = collect(adaptivePlan) { case r: CustomShuffleReaderExec => r - }.head - assert(!reader.isLocalReader) - assert(reader.hasSkewedPartition) - assert(!reader.hasCoalescedPartition) // 0-size partitions are ignored. - assert(reader.metrics.contains("numSkewedPartitions")) - assert(reader.metrics("numSkewedPartitions").value > 0) - assert(reader.metrics("numSkewedSplits").value > 0) + } + readers.foreach { reader => + assert(!reader.isLocalReader) + assert(reader.hasCoalescedPartition) + assert(reader.hasSkewedPartition) + assert(reader.metrics.contains("numSkewedPartitions")) + } + assert(readers(0).metrics("numSkewedPartitions").value == 2) + assert(readers(0).metrics("numSkewedSplits").value == 15) + assert(readers(1).metrics("numSkewedPartitions").value == 1) + assert(readers(1).metrics("numSkewedSplits").value == 12) } } }