
 package org.apache.spark.sql.execution.adaptive

+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
@@ -95,54 +97,68 @@ case class CustomShuffleReaderExec private(
     case _ => None
   }

-  private def partitionDataSizeMetrics = {
-    val maxSize = SQLMetrics.createSizeMetric(sparkContext, "maximum partition data size")
-    val minSize = SQLMetrics.createSizeMetric(sparkContext, "minimum partition data size")
-    val avgSize = SQLMetrics.createSizeMetric(sparkContext, "average partition data size")
-    val mapStatsOpt = shuffleStage.get.mapStats
-    val sizes = mapStatsOpt.map { mapStats =>
-      val mapSizes = mapStats.bytesByPartitionId
-      partitionSpecs.map {
-        case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
-          startReducerIndex.until(endReducerIndex).map(mapSizes).sum
-        case p: PartialReducerPartitionSpec => p.dataSize
-        case p => throw new IllegalStateException("unexpected " + p)
+  private def sendDriverMetrics(): Unit = {
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    var driverAccumUpdates: Seq[(Long, Long)] = Seq.empty
+
+    val numPartitionsMetric = metrics("numPartitions")
+    numPartitionsMetric.set(partitionSpecs.length)
+    driverAccumUpdates = driverAccumUpdates :+
+      (numPartitionsMetric.id, partitionSpecs.length.toLong)
+
+    if (hasSkewedPartition) {
+      val skewedMetric = metrics("numSkewedPartitions")
+      val numSkewedPartitions = partitionSpecs.collect {
+        case p: PartialReducerPartitionSpec => p.reducerIndex
+      }.distinct.length
+      skewedMetric.set(numSkewedPartitions)
+      driverAccumUpdates = driverAccumUpdates :+ (skewedMetric.id, numSkewedPartitions.toLong)
+    }
+
+    if (!isLocalReader) {
+      val partitionMetrics = metrics("partitionDataSize")
+      val mapStats = shuffleStage.get.mapStats
+
+      if (mapStats.isEmpty) {
+        partitionMetrics.set(0)
+        driverAccumUpdates = driverAccumUpdates :+ (partitionMetrics.id, 0L)
+      } else {
+        var sum = 0L
+        partitionSpecs.foreach {
+          case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
+            val dataSize = startReducerIndex.until(endReducerIndex).map(
+              mapStats.get.bytesByPartitionId(_)).sum
+            driverAccumUpdates = driverAccumUpdates :+ (partitionMetrics.id, dataSize)
+            sum += dataSize
+          case p: PartialReducerPartitionSpec =>
+            driverAccumUpdates = driverAccumUpdates :+ (partitionMetrics.id, p.dataSize)
+            sum += p.dataSize
+          case p => throw new IllegalStateException("unexpected " + p)
+        }
+
+        // Set sum value to "partitionDataSize" metric.
+        partitionMetrics.set(sum)
       }
-    }.getOrElse(Seq(0L))
-
-    maxSize.set(sizes.max)
-    minSize.set(sizes.min)
-    avgSize.set(sizes.sum / sizes.length)
-    Map(
-      "maxPartitionDataSize" -> maxSize,
-      "minPartitionDataSize" -> minSize,
-      "avgPartitionDataSize" -> avgSize)
-  }
+    }

-  private def skewedPartitionMetrics = {
-    val metrics = SQLMetrics.createMetric(sparkContext, "number of skewed partitions")
-    val numSkewedPartitions = partitionSpecs.collect {
-      case p: PartialReducerPartitionSpec => p.reducerIndex
-    }.distinct.length
-    metrics.set(numSkewedPartitions)
-    Map("numSkewedPartitions" -> metrics)
+    SQLMetrics.postDriverMetricsUpdatedByValue(sparkContext, executionId, driverAccumUpdates)
   }

   @transient override lazy val metrics: Map[String, SQLMetric] = {
     if (shuffleStage.isDefined) {
-      val numPartitions = SQLMetrics.createMetric(sparkContext, "number of partitions")
-      numPartitions.set(partitionSpecs.length)
-      Map("numPartitions" -> numPartitions) ++ {
+      Map("numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions")) ++ {
         if (isLocalReader) {
           // We split the mapper partition evenly when creating local shuffle reader, so no
           // data size info is available.
           Map.empty
         } else {
-          partitionDataSizeMetrics
+          Map("partitionDataSize" ->
+            SQLMetrics.createSizeMetric(sparkContext, "partition data size"))
         }
       } ++ {
         if (hasSkewedPartition) {
-          skewedPartitionMetrics
+          Map("numSkewedPartitions" ->
+            SQLMetrics.createMetric(sparkContext, "number of skewed partitions"))
         } else {
           Map.empty
         }
@@ -154,8 +170,8 @@ case class CustomShuffleReaderExec private(
   }

   private lazy val cachedShuffleRDD: RDD[InternalRow] = {
-    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
+    sendDriverMetrics()
+
     shuffleStage.map { stage =>
       new ShuffledRowRDD(
         stage.shuffle.shuffleDependency, stage.shuffle.readMetrics, partitionSpecs.toArray)
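
With this change the reader registers its SQLMetrics once in `metrics` and, when `cachedShuffleRDD` is first built, pushes the computed values to the driver as (accumulator id, value) pairs via `SQLMetrics.postDriverMetricsUpdatedByValue`. The sketch below is a minimal, self-contained illustration of how the per-partition data sizes and the skewed-partition count fall out of the partition specs: a coalesced spec covers a contiguous range of reducers and sums their map-output bytes, while a skew-split spec already carries its own data size. The types `PartitionSpec`, `Coalesced`, and `PartialReducer` are simplified stand-ins invented for this example; they are not Spark's `CoalescedPartitionSpec` / `PartialReducerPartitionSpec` classes.

object PartitionSizeSketch {
  // Simplified stand-ins for the shuffle partition specs used in the diff above.
  sealed trait PartitionSpec
  case class Coalesced(startReducerIndex: Int, endReducerIndex: Int) extends PartitionSpec
  case class PartialReducer(reducerIndex: Int, dataSize: Long) extends PartitionSpec

  // bytesByPartitionId plays the role of MapOutputStatistics.bytesByPartitionId.
  def partitionDataSizes(specs: Seq[PartitionSpec], bytesByPartitionId: Array[Long]): Seq[Long] =
    specs.map {
      case Coalesced(start, end) =>
        // One coalesced reader partition covers reducers [start, end).
        (start until end).map(i => bytesByPartitionId(i)).sum
      case PartialReducer(_, dataSize) =>
        // A skew-split partition reads a slice of one reducer; its size is precomputed.
        dataSize
    }

  def main(args: Array[String]): Unit = {
    val bytes = Array(100L, 200L, 5000L, 300L)            // per-reducer map output bytes
    val specs = Seq(
      Coalesced(0, 2),                                     // reducers 0 and 1 coalesced
      PartialReducer(2, 2500L), PartialReducer(2, 2500L),  // skewed reducer 2 split in two
      Coalesced(3, 4))
    val sizes = partitionDataSizes(specs, bytes)
    println(sizes)       // List(300, 2500, 2500, 300)
    println(sizes.sum)   // 5600, the value set on the "partitionDataSize" metric
    val numSkewed = specs.collect { case PartialReducer(i, _) => i }.distinct.length
    println(numSkewed)   // 1, the value set on the "numSkewedPartitions" metric
  }
}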