From 6b26c629439973045da77f7bcd4b852afe8ebd8b Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Sun, 2 Dec 2018 20:19:55 +0800 Subject: [PATCH 01/11] Commit for fist time success --- .../scala/org/apache/spark/Dependency.scala | 5 ++- .../spark/executor/ShuffleWriteMetrics.scala | 25 +++++++++-- .../spark/scheduler/ShuffleMapTask.scala | 7 ++- .../spark/sql/execution/SparkPlan.scala | 29 +++++++++++- .../spark/sql/execution/SparkPlanInfo.scala | 14 +++++- .../exchange/ShuffleExchangeExec.scala | 14 ++++-- .../apache/spark/sql/execution/limit.scala | 12 ++++- .../sql/execution/metric/SQLMetrics.scala | 12 +++++ .../metric/SQLShuffleMetricsReporter.scala | 45 +++++++++++++++++++ .../execution/metric/SQLMetricsSuite.scala | 11 ++--- 10 files changed, 152 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala index 9ea6d2fa2fd95..57a89dd5cfeb6 100644 --- a/core/src/main/scala/org/apache/spark/Dependency.scala +++ b/core/src/main/scala/org/apache/spark/Dependency.scala @@ -22,7 +22,7 @@ import scala.reflect.ClassTag import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer -import org.apache.spark.shuffle.ShuffleHandle +import org.apache.spark.shuffle.{ShuffleHandle, ShuffleWriteMetricsReporter} /** * :: DeveloperApi :: @@ -73,7 +73,8 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( val serializer: Serializer = SparkEnv.get.serializer, val keyOrdering: Option[Ordering[K]] = None, val aggregator: Option[Aggregator[K, V, C]] = None, - val mapSideCombine: Boolean = false) + val mapSideCombine: Boolean = false, + val shuffleWriteMetricsReporter: Option[ShuffleWriteMetricsReporter] = None) extends Dependency[Product2[K, V]] { if (mapSideCombine) { diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala index d0b0e7da079c9..32d5e47931360 100644 --- a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala @@ -21,6 +21,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.shuffle.ShuffleWriteMetricsReporter import org.apache.spark.util.LongAccumulator +import scala.collection.mutable.ArrayBuffer /** * :: DeveloperApi :: @@ -33,6 +34,9 @@ class ShuffleWriteMetrics private[spark] () extends ShuffleWriteMetricsReporter private[executor] val _recordsWritten = new LongAccumulator private[executor] val _writeTime = new LongAccumulator + @transient private[this] lazy val externalReporters = + new ArrayBuffer[ShuffleWriteMetricsReporter] + /** * Number of bytes written for the shuffle by this task. 
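// Illustrative sketch (not part of the patch): commit 01's approach is a simple fan-out --
// the task-level write metrics keep a transient buffer of extra reporters and forward every
// update to them. The trait and class below are simplified stand-ins for Spark's internal
// ShuffleWriteMetricsReporter and ShuffleWriteMetrics, kept self-contained for illustration.
import scala.collection.mutable.ArrayBuffer

trait SimpleWriteReporter {
  def incBytesWritten(v: Long): Unit
  def incRecordsWritten(v: Long): Unit
}

class FanOutWriteMetrics extends SimpleWriteReporter with Serializable {
  private var bytesWritten = 0L
  private var recordsWritten = 0L
  // External reporters (e.g. a SQL metrics reporter) registered before the shuffle write starts;
  // transient because they only exist on the executor running the task.
  @transient private lazy val externalReporters = new ArrayBuffer[SimpleWriteReporter]

  def registerExternalReporter(reporter: SimpleWriteReporter): Unit =
    externalReporters += reporter

  override def incBytesWritten(v: Long): Unit = {
    bytesWritten += v
    externalReporters.foreach(_.incBytesWritten(v))
  }
  override def incRecordsWritten(v: Long): Unit = {
    recordsWritten += v
    externalReporters.foreach(_.incRecordsWritten(v))
  }
}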
*/ @@ -48,13 +52,28 @@ class ShuffleWriteMetrics private[spark] () extends ShuffleWriteMetricsReporter */ def writeTime: Long = _writeTime.sum - private[spark] override def incBytesWritten(v: Long): Unit = _bytesWritten.add(v) - private[spark] override def incRecordsWritten(v: Long): Unit = _recordsWritten.add(v) - private[spark] override def incWriteTime(v: Long): Unit = _writeTime.add(v) + private[spark] override def incBytesWritten(v: Long): Unit = { + _bytesWritten.add(v) + externalReporters.foreach(_.incBytesWritten(v)) + } + private[spark] override def incRecordsWritten(v: Long): Unit = { + _recordsWritten.add(v) + externalReporters.foreach(_.incRecordsWritten(v)) + } + private[spark] override def incWriteTime(v: Long): Unit = { + _writeTime.add(v) + externalReporters.foreach(_.incWriteTime(v)) + } private[spark] override def decBytesWritten(v: Long): Unit = { _bytesWritten.setValue(bytesWritten - v) + externalReporters.foreach(_.decBytesWritten(v)) } private[spark] override def decRecordsWritten(v: Long): Unit = { _recordsWritten.setValue(recordsWritten - v) + externalReporters.foreach(_.decRecordsWritten(v)) + } + private[spark] def registerExternalShuffleWriteReporter( + reporter: ShuffleWriteMetricsReporter): Unit = { + externalReporters.append(reporter) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 5412717d61988..25f57e6ef5c74 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -27,7 +27,7 @@ import org.apache.spark._ import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD -import org.apache.spark.shuffle.ShuffleWriter +import org.apache.spark.shuffle.{ProxyShuffleWriteMetricsReporter, ShuffleWriter} /** * A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on a partitioner @@ -92,6 +92,11 @@ private[spark] class ShuffleMapTask( threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime } else 0L + if (dep.shuffleWriteMetricsReporter.isDefined) { + context.taskMetrics().shuffleWriteMetrics.registerExternalShuffleWriteReporter( + dep.shuffleWriteMetricsReporter.get) + } + var writer: ShuffleWriter[Any, Any] = null try { val manager = SparkEnv.get.shuffleManager diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 9d9b020309d9f..2aa1de3c44598 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{Predicate => GenPredicate, _} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleWriteMetricsReporter} import org.apache.spark.sql.types.DataType import org.apache.spark.util.ThreadUtils @@ -78,6 +78,12 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ */ def metrics: Map[String, SQLMetric] = Map.empty + def metricsWithShuffleWrite: Map[String, SQLMetric] = if (shuffleWriteMetricsCreated) { + metrics ++ shuffleWriteMetrics + } else { + 
metrics + } + /** * Resets all the metrics. */ @@ -421,6 +427,27 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ } newOrdering(order, Seq.empty) } + + private var shuffleWriteMetrics: Map[String, SQLMetric] = Map.empty + private var shuffleWriteMetricsCreated: Boolean = false + + final def createShuffleWriteMetrics(): Unit = { + if (!shuffleWriteMetricsCreated) { + shuffleWriteMetricsCreated = true + shuffleWriteMetrics = + SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) + } + } + + final def createShuffleWriteMetricsReporter(): SQLShuffleWriteMetricsReporter = { + if (shuffleWriteMetrics.nonEmpty) { + SQLShuffleWriteMetricsReporter(shuffleWriteMetrics) + } else { + require(sqlContext.conf.wholeStageEnabled && this.isInstanceOf[WholeStageCodegenExec]) + SQLShuffleWriteMetricsReporter( + this.asInstanceOf[WholeStageCodegenExec].child.shuffleWriteMetrics) + } + } } object SparkPlan { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala index 59ffd16381116..3da164e59dad8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.sql.execution.exchange.ReusedExchangeExec +import org.apache.spark.internal.Logging +import org.apache.spark.sql.execution.exchange.{ShuffleExchangeExec, ReusedExchangeExec} import org.apache.spark.sql.execution.metric.SQLMetricInfo /** @@ -53,10 +54,19 @@ private[execution] object SparkPlanInfo { case ReusedExchangeExec(_, child) => child :: Nil case _ => plan.children ++ plan.subqueries } - val metrics = plan.metrics.toSeq.map { case (key, metric) => + val metrics = plan.metricsWithShuffleWrite.toSeq.map { case (key, metric) => new SQLMetricInfo(metric.name.getOrElse(key), metric.id, metric.metricType) } + // create shuffle write metrics for child of ShuffleExchangeExec + plan match { + case ShuffleExchangeExec(_, WholeStageCodegenExec(child), _) => + child.createShuffleWriteMetrics() + case ShuffleExchangeExec(_, child, _) => + child.createShuffleWriteMetrics() + case _ => + } + // dump the file scan metadata (e.g file path) to event log val metadata = plan match { case fileScan: FileSourceScanExec => fileScan.metadata diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index c9ca395bceaa4..27187b8155367 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -23,6 +23,7 @@ import java.util.function.Supplier import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer +import org.apache.spark.shuffle.ShuffleWriteMetricsReporter import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.errors._ @@ -90,7 +91,11 @@ case class ShuffleExchangeExec( private[exchange] def prepareShuffleDependency() : ShuffleDependency[Int, InternalRow, InternalRow] = { ShuffleExchangeExec.prepareShuffleDependency( - child.execute(), child.output, newPartitioning, serializer) + 
child.execute(), + child.output, + newPartitioning, + serializer, + child.createShuffleWriteMetricsReporter()) } /** @@ -204,7 +209,9 @@ object ShuffleExchangeExec { rdd: RDD[InternalRow], outputAttributes: Seq[Attribute], newPartitioning: Partitioning, - serializer: Serializer): ShuffleDependency[Int, InternalRow, InternalRow] = { + serializer: Serializer, + shuffleWriteMetricsReporter: ShuffleWriteMetricsReporter) + : ShuffleDependency[Int, InternalRow, InternalRow] = { val part: Partitioner = newPartitioning match { case RoundRobinPartitioning(numPartitions) => new HashPartitioner(numPartitions) case HashPartitioning(_, n) => @@ -333,7 +340,8 @@ object ShuffleExchangeExec { new ShuffleDependency[Int, InternalRow, InternalRow]( rddWithPartitionIds, new PartitionIdPassthrough(part.numPartitions), - serializer) + serializer, + shuffleWriteMetricsReporter = Some(shuffleWriteMetricsReporter)) dependency } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index e9ab7cd138d99..3b7ab3c8c65f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -43,7 +43,11 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit)) val shuffled = new ShuffledRowRDD( ShuffleExchangeExec.prepareShuffleDependency( - locallyLimited, child.output, SinglePartition, serializer), + locallyLimited, + child.output, + SinglePartition, + serializer, + child.createShuffleWriteMetricsReporter()), metrics) shuffled.mapPartitionsInternal(_.take(limit)) } @@ -165,7 +169,11 @@ case class TakeOrderedAndProjectExec( } val shuffled = new ShuffledRowRDD( ShuffleExchangeExec.prepareShuffleDependency( - localTopK, child.output, SinglePartition, serializer), + localTopK, + child.output, + SinglePartition, + serializer, + child.createShuffleWriteMetricsReporter()), metrics) shuffled.mapPartitions { iter => val topK = org.apache.spark.util.collection.Utils.takeOrdered(iter.map(_.copy()), limit)(ord) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index cbf707f4a9cfd..e21e39a5ff9b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -78,6 +78,7 @@ object SQLMetrics { private val SUM_METRIC = "sum" private val SIZE_METRIC = "size" private val TIMING_METRIC = "timing" + private val NANO_TIMING_METRIC = "nanosecond" private val AVERAGE_METRIC = "average" private val baseForAvgMetric: Int = 10 @@ -121,6 +122,15 @@ object SQLMetrics { acc } + def createNanoTimingMetric(sc: SparkContext, name: String): SQLMetric = { + // The final result of this metric in physical operator UI may looks like: + // duration(min, med, max): + // 5s (800ms, 1s, 2s) + val acc = new SQLMetric(NANO_TIMING_METRIC, -1) + acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = false) + acc + } + /** * Create a metric to report the average information (including min, med, max) like * avg hash probe. 
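// Illustrative sketch (not part of the patch): the new nanosecond timing metric exists because
// shuffle write time is accumulated as nanosecond deltas (e.g. around System.nanoTime), unlike
// the existing millisecond timing metric. A self-contained sketch of feeding such a metric;
// the names below are hypothetical stand-ins, not Spark API.
import java.util.concurrent.atomic.AtomicLong

object NanoTimingSketch {
  // Stand-in for the "shuffle write time" SQLMetric, accumulated in nanoseconds.
  val shuffleWriteTimeNs = new AtomicLong(0L)

  def timedWrite[T](writeBlock: => T): T = {
    val start = System.nanoTime()
    try writeBlock
    finally shuffleWriteTimeNs.addAndGet(System.nanoTime() - start)
  }
}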
As average metrics are double values, this kind of metrics should be @@ -163,6 +173,8 @@ object SQLMetrics { Utils.bytesToString } else if (metricsType == TIMING_METRIC) { Utils.msDurationToString + } else if (metricsType == NANO_TIMING_METRIC) { + duration => Utils.msDurationToString(duration / 100000) } else { throw new IllegalStateException("unexpected metrics type: " + metricsType) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala index 780f0d7622294..59b5d0bd488bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.metric import org.apache.spark.SparkContext import org.apache.spark.executor.TempShuffleReadMetrics +import org.apache.spark.shuffle.ShuffleWriteMetricsReporter /** * A shuffle metrics reporter for SQL exchange operators. @@ -95,3 +96,47 @@ private[spark] object SQLShuffleMetricsReporter { FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait time"), RECORDS_READ -> SQLMetrics.createMetric(sc, "records read")) } + +private[spark] class SQLShuffleWriteMetricsReporter( + metrics: Map[String, SQLMetric]) extends ShuffleWriteMetricsReporter with Serializable { + @transient private[spark] lazy val _bytesWritten = + metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_BYTES_WRITTEN) + @transient private[spark] lazy val _recordsWritten = + metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN) + @transient private[spark] lazy val _writeTime = + metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME) + + override private[spark] def incBytesWritten(v: Long): Unit = { + _bytesWritten.add(v) + } + override private[spark] def decRecordsWritten(v: Long): Unit = { + _recordsWritten.set(_recordsWritten.value - v) + } + override private[spark] def incRecordsWritten(v: Long): Unit = { + _recordsWritten.add(v) + } + override private[spark] def incWriteTime(v: Long): Unit = { + _writeTime.add(v) + } + override private[spark] def decBytesWritten(v: Long): Unit = { + _bytesWritten.set(_bytesWritten.value - v) + } +} + +private[spark] object SQLShuffleWriteMetricsReporter { + val SHUFFLE_BYTES_WRITTEN = "shuffleBytesWritten" + val SHUFFLE_RECORDS_WRITTEN = "shuffleRecordsWritten" + val SHUFFLE_WRITE_TIME = "shuffleWriteTime" + + def apply(metrics: Map[String, SQLMetric]): SQLShuffleWriteMetricsReporter = { + new SQLShuffleWriteMetricsReporter(metrics) + } + + def createShuffleWriteMetrics(sc: SparkContext): Map[String, SQLMetric] = Map( + SHUFFLE_BYTES_WRITTEN -> + SQLMetrics.createSizeMetric(sc, "shuffle bytes written"), + SHUFFLE_RECORDS_WRITTEN -> + SQLMetrics.createMetric(sc, "shuffle records written"), + SHUFFLE_WRITE_TIME -> + SQLMetrics.createNanoTimingMetric(sc, "shuffle write time")) +} \ No newline at end of file diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 0f1d08b6af5d5..7a60cde3fc844 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -207,6 +207,7 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared 
"local blocks fetched" -> 2L, "remote blocks fetched" -> 0L)))) ) + Thread.sleep(30000) } } @@ -225,14 +226,7 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared // It's 8 because we read 6 rows in the left and 2 row in the right one "number of output rows" -> 8L)))) ) - - val df2 = spark.sql( - "SELECT * FROM testDataForJoin right JOIN testData2 ON testData2.a = testDataForJoin.a") - testSparkPlanMetrics(df2, 1, Map( - 0L -> (("SortMergeJoin", Map( - // It's 8 because we read 6 rows in the left and 2 row in the right one - "number of output rows" -> 8L)))) - ) + Thread.sleep(30000) } } @@ -609,5 +603,6 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared // The final Filter should produce 1 rows, because the input is just one row. assert(filters.head.metrics("numOutputRows").value == 1) } + Thread.sleep(100000) } } From a8a1225837419c99a3d9941046a2ca6b501f6dc8 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Mon, 3 Dec 2018 20:06:34 +0800 Subject: [PATCH 02/11] Simplify implement by add metrics in ShuffleExchangeExec --- .../spark/sql/execution/SparkPlan.scala | 29 +----------- .../spark/sql/execution/SparkPlanInfo.scala | 14 +----- .../exchange/ShuffleExchangeExec.scala | 8 ++-- .../apache/spark/sql/execution/limit.scala | 15 ++++-- .../execution/metric/SQLMetricsSuite.scala | 47 +++++++++++++++---- 5 files changed, 57 insertions(+), 56 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 2aa1de3c44598..9d9b020309d9f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{Predicate => GenPredicate, _} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleWriteMetricsReporter} +import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types.DataType import org.apache.spark.util.ThreadUtils @@ -78,12 +78,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ */ def metrics: Map[String, SQLMetric] = Map.empty - def metricsWithShuffleWrite: Map[String, SQLMetric] = if (shuffleWriteMetricsCreated) { - metrics ++ shuffleWriteMetrics - } else { - metrics - } - /** * Resets all the metrics. 
*/ @@ -427,27 +421,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ } newOrdering(order, Seq.empty) } - - private var shuffleWriteMetrics: Map[String, SQLMetric] = Map.empty - private var shuffleWriteMetricsCreated: Boolean = false - - final def createShuffleWriteMetrics(): Unit = { - if (!shuffleWriteMetricsCreated) { - shuffleWriteMetricsCreated = true - shuffleWriteMetrics = - SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) - } - } - - final def createShuffleWriteMetricsReporter(): SQLShuffleWriteMetricsReporter = { - if (shuffleWriteMetrics.nonEmpty) { - SQLShuffleWriteMetricsReporter(shuffleWriteMetrics) - } else { - require(sqlContext.conf.wholeStageEnabled && this.isInstanceOf[WholeStageCodegenExec]) - SQLShuffleWriteMetricsReporter( - this.asInstanceOf[WholeStageCodegenExec].child.shuffleWriteMetrics) - } - } } object SparkPlan { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala index 3da164e59dad8..59ffd16381116 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -18,8 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.internal.Logging -import org.apache.spark.sql.execution.exchange.{ShuffleExchangeExec, ReusedExchangeExec} +import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.metric.SQLMetricInfo /** @@ -54,19 +53,10 @@ private[execution] object SparkPlanInfo { case ReusedExchangeExec(_, child) => child :: Nil case _ => plan.children ++ plan.subqueries } - val metrics = plan.metricsWithShuffleWrite.toSeq.map { case (key, metric) => + val metrics = plan.metrics.toSeq.map { case (key, metric) => new SQLMetricInfo(metric.name.getOrElse(key), metric.id, metric.metricType) } - // create shuffle write metrics for child of ShuffleExchangeExec - plan match { - case ShuffleExchangeExec(_, WholeStageCodegenExec(child), _) => - child.createShuffleWriteMetrics() - case ShuffleExchangeExec(_, child, _) => - child.createShuffleWriteMetrics() - case _ => - } - // dump the file scan metadata (e.g file path) to event log val metadata = plan match { case fileScan: FileSourceScanExec => fileScan.metadata diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index 27187b8155367..1b717cddb324e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Uns import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.metric.{SQLMetrics, SQLShuffleMetricsReporter} +import org.apache.spark.sql.execution.metric.{SQLMetrics, SQLShuffleMetricsReporter, SQLShuffleWriteMetricsReporter} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType import org.apache.spark.util.MutablePair @@ -48,9 +48,11 @@ case class ShuffleExchangeExec( // NOTE: 
coordinator can be null after serialization/deserialization, // e.g. it can be null on the Executor side + private val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) + override lazy val metrics = Map( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size") - ) ++ SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext) + ) ++ SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext) ++ writeMetrics override def nodeName: String = { val extraInfo = coordinator match { @@ -95,7 +97,7 @@ case class ShuffleExchangeExec( child.output, newPartitioning, serializer, - child.createShuffleWriteMetricsReporter()) + SQLShuffleWriteMetricsReporter(writeMetrics)) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index 3b7ab3c8c65f9..cdff5fc07aecc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGe import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec -import org.apache.spark.sql.execution.metric.SQLShuffleMetricsReporter +import org.apache.spark.sql.execution.metric.{SQLShuffleWriteMetricsReporter, SQLShuffleMetricsReporter} /** * Take the first `limit` elements and collect them to a single partition. @@ -38,7 +38,9 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode override def outputPartitioning: Partitioning = SinglePartition override def executeCollect(): Array[InternalRow] = child.executeTake(limit) private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override lazy val metrics = SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext) + private val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) + override lazy val metrics = + SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext) ++ writeMetrics protected override def doExecute(): RDD[InternalRow] = { val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit)) val shuffled = new ShuffledRowRDD( @@ -47,7 +49,7 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode child.output, SinglePartition, serializer, - child.createShuffleWriteMetricsReporter()), + SQLShuffleWriteMetricsReporter(writeMetrics)), metrics) shuffled.mapPartitionsInternal(_.take(limit)) } @@ -158,7 +160,10 @@ case class TakeOrderedAndProjectExec( private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override lazy val metrics = SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext) + private val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) + + override lazy val metrics = + SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext) ++ writeMetrics protected override def doExecute(): RDD[InternalRow] = { val ord = new LazilyGeneratedOrdering(sortOrder, child.output) @@ -173,7 +178,7 @@ case class TakeOrderedAndProjectExec( child.output, SinglePartition, serializer, - child.createShuffleWriteMetricsReporter()), + SQLShuffleWriteMetricsReporter(writeMetrics)), metrics) shuffled.mapPartitions { iter => val topK = 
org.apache.spark.util.collection.Utils.takeOrdered(iter.map(_.copy()), limit)(ord) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 7a60cde3fc844..2251607e76af8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -97,7 +97,8 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared val shuffleExpected1 = Map( "records read" -> 2L, "local blocks fetched" -> 2L, - "remote blocks fetched" -> 0L) + "remote blocks fetched" -> 0L, + "shuffle records written" -> 2L) testSparkPlanMetrics(df, 1, Map( 2L -> (("HashAggregate", expected1(0))), 1L -> (("Exchange", shuffleExpected1)), @@ -114,7 +115,8 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared val shuffleExpected2 = Map( "records read" -> 4L, "local blocks fetched" -> 4L, - "remote blocks fetched" -> 0L) + "remote blocks fetched" -> 0L, + "shuffle records written" -> 4L) testSparkPlanMetrics(df2, 1, Map( 2L -> (("HashAggregate", expected2(0))), 1L -> (("Exchange", shuffleExpected2)), @@ -170,6 +172,11 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared val df = testData2.groupBy().agg(collect_set('a)) // 2 partitions testSparkPlanMetrics(df, 1, Map( 2L -> (("ObjectHashAggregate", Map("number of output rows" -> 2L))), + 1L -> (("Exchange", Map( + "shuffle records written" -> 2L, + "records read" -> 2L, + "local blocks fetched" -> 2L, + "remote blocks fetched" -> 0L))), 0L -> (("ObjectHashAggregate", Map("number of output rows" -> 1L)))) ) @@ -177,6 +184,11 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared val df2 = testData2.groupBy('a).agg(collect_set('a)) testSparkPlanMetrics(df2, 1, Map( 2L -> (("ObjectHashAggregate", Map("number of output rows" -> 4L))), + 1L -> (("Exchange", Map( + "shuffle records written" -> 4L, + "records read" -> 4L, + "local blocks fetched" -> 4L, + "remote blocks fetched" -> 0L))), 0L -> (("ObjectHashAggregate", Map("number of output rows" -> 3L)))) ) } @@ -205,9 +217,9 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared 2L -> (("Exchange", Map( "records read" -> 4L, "local blocks fetched" -> 2L, - "remote blocks fetched" -> 0L)))) + "remote blocks fetched" -> 0L, + "shuffle records written" -> 2L)))) ) - Thread.sleep(30000) } } @@ -226,7 +238,14 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared // It's 8 because we read 6 rows in the left and 2 row in the right one "number of output rows" -> 8L)))) ) - Thread.sleep(30000) + + val df2 = spark.sql( + "SELECT * FROM testDataForJoin right JOIN testData2 ON testData2.a = testDataForJoin.a") + testSparkPlanMetrics(df2, 1, Map( + 0L -> (("SortMergeJoin", Map( + // It's 8 because we read 6 rows in the left and 2 row in the right one + "number of output rows" -> 8L)))) + ) } } @@ -293,12 +312,25 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value") val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key", "value") // Assume the execution plan is - // ... 
-> ShuffledHashJoin(nodeId = 1) -> Project(nodeId = 0) + // Project(nodeId = 0) + // +- ShuffledHashJoin(nodeId = 1) + // :- Exchange(nodeId = 2) + // : +- Project(nodeId = 3) + // : +- LocalTableScan(nodeId = 4) + // +- Exchange(nodeId = 5) + // +- Project(nodeId = 6) + // +- LocalTableScan(nodeId = 7) val df = df1.join(df2, "key") testSparkPlanMetrics(df, 1, Map( 1L -> (("ShuffledHashJoin", Map( "number of output rows" -> 2L, - "avg hash probe (min, med, max)" -> "\n(1, 1, 1)")))) + "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))), + 2L -> (("Exchange", Map( + "shuffle records written" -> 2L, + "records read" -> 2L))), + 5L -> (("Exchange", Map( + "shuffle records written" -> 10L, + "records read" -> 10L)))) ) } } @@ -603,6 +635,5 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared // The final Filter should produce 1 rows, because the input is just one row. assert(filters.head.metrics("numOutputRows").value == 1) } - Thread.sleep(100000) } } From 7c8e5161904f1fd0fa4d99e6c497ef1be3542bdb Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Mon, 3 Dec 2018 20:40:41 +0800 Subject: [PATCH 03/11] code clean and comments --- .../scala/org/apache/spark/Dependency.scala | 1 + .../spark/executor/ShuffleWriteMetrics.scala | 7 ++-- .../spark/scheduler/ShuffleMapTask.scala | 3 +- project/MimaExcludes.scala | 3 ++ .../spark/sql/execution/ShuffledRowRDD.scala | 6 +-- .../exchange/ShuffleExchangeExec.scala | 4 +- .../apache/spark/sql/execution/limit.scala | 6 +-- .../sql/execution/metric/SQLMetrics.scala | 4 +- .../metric/SQLShuffleMetricsReporter.scala | 37 ++++++++++++------- .../execution/UnsafeRowSerializerSuite.scala | 4 +- 10 files changed, 44 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala index 57a89dd5cfeb6..254a9d795572a 100644 --- a/core/src/main/scala/org/apache/spark/Dependency.scala +++ b/core/src/main/scala/org/apache/spark/Dependency.scala @@ -65,6 +65,7 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] { * @param keyOrdering key ordering for RDD's shuffles * @param aggregator map/reduce-side aggregator for RDD's shuffle * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine) + * @param shuffleWriteMetricsReporter the shuffle write metrics reporter for this shuffle stage. */ @DeveloperApi class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala index 32d5e47931360..2bfacccdd8f57 100644 --- a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala @@ -17,16 +17,17 @@ package org.apache.spark.executor +import scala.collection.mutable.ArrayBuffer + import org.apache.spark.annotation.DeveloperApi import org.apache.spark.shuffle.ShuffleWriteMetricsReporter import org.apache.spark.util.LongAccumulator -import scala.collection.mutable.ArrayBuffer - /** * :: DeveloperApi :: * A collection of accumulators that represent metrics about writing shuffle data. - * Operations are not thread-safe. + * Operations are not thread-safe. Also response for updating external reporters of + * [[ShuffleWriteMetricsReporter]]. 
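// Illustrative sketch (not part of the patch): the SQLMetricsSuite changes above pair each plan
// nodeId with the operator name and the metric values it should report after one job, now
// including "shuffle records written" on Exchange nodes. A self-contained sketch of that shape
// (the real suite passes such a map to testSparkPlanMetrics):
object ExpectedExchangeMetricsSketch {
  type NodeId = Long
  // (operator name, expected metric name -> expected value)
  type ExpectedNode = (String, Map[String, Any])

  val expected: Map[NodeId, ExpectedNode] = Map(
    1L -> (("Exchange", Map(
      "shuffle records written" -> 2L,
      "records read" -> 2L,
      "local blocks fetched" -> 2L,
      "remote blocks fetched" -> 0L))))
}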
*/ @DeveloperApi class ShuffleWriteMetrics private[spark] () extends ShuffleWriteMetricsReporter with Serializable { diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 25f57e6ef5c74..6e1784307434b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -27,7 +27,7 @@ import org.apache.spark._ import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD -import org.apache.spark.shuffle.{ProxyShuffleWriteMetricsReporter, ShuffleWriter} +import org.apache.spark.shuffle.ShuffleWriter /** * A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on a partitioner @@ -92,6 +92,7 @@ private[spark] class ShuffleMapTask( threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime } else 0L + // Register the shuffle write metrics reporter to shuffleWriteMetrics. if (dep.shuffleWriteMetricsReporter.isDefined) { context.taskMetrics().shuffleWriteMetrics.registerExternalShuffleWriteReporter( dep.shuffleWriteMetricsReporter.get) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 1c83cf5860c58..336787f5f2570 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -214,6 +214,9 @@ object MimaExcludes { ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.shuffle.sort.UnsafeShuffleWriter.this"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.storage.TimeTrackingOutputStream.this"), + // [SPARK-26139] Implement shuffle write metrics in SQL + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ShuffleDependency.this"), + // Data Source V2 API changes (problem: Problem) => problem match { case MissingClassProblem(cls) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala index 9b05faaed0459..079ff25fcb67e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala @@ -22,7 +22,7 @@ import java.util.Arrays import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleMetricsReporter} +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter} /** * The [[Partition]] used by [[ShuffledRowRDD]]. A post-shuffle partition @@ -157,9 +157,9 @@ class ShuffledRowRDD( override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition] val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics() - // `SQLShuffleMetricsReporter` will update its own metrics for SQL exchange operator, + // `SQLShuffleReadMetricsReporter` will update its own metrics for SQL exchange operator, // as well as the `tempMetrics` for basic shuffle metrics. - val sqlMetricsReporter = new SQLShuffleMetricsReporter(tempMetrics, metrics) + val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics) // The range of pre-shuffle partitions that we are fetching at here is // [startPreShufflePartitionIndex, endPreShufflePartitionIndex - 1]. 
val reader = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index 1b717cddb324e..dd6e5f609c47e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Uns import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.metric.{SQLMetrics, SQLShuffleMetricsReporter, SQLShuffleWriteMetricsReporter} +import org.apache.spark.sql.execution.metric.{SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType import org.apache.spark.util.MutablePair @@ -52,7 +52,7 @@ case class ShuffleExchangeExec( override lazy val metrics = Map( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size") - ) ++ SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext) ++ writeMetrics + ) ++ SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) ++ writeMetrics override def nodeName: String = { val extraInfo = coordinator match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index cdff5fc07aecc..a634495fa4558 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGe import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec -import org.apache.spark.sql.execution.metric.{SQLShuffleWriteMetricsReporter, SQLShuffleMetricsReporter} +import org.apache.spark.sql.execution.metric.{SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter} /** * Take the first `limit` elements and collect them to a single partition. 
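// Illustrative sketch (not part of the patch): as the ShuffledRowRDD comment above says, the
// renamed SQLShuffleReadMetricsReporter updates two sinks per event -- the operator's SQL metrics
// and the task-level temp read metrics -- so the SQL UI and ordinary task metrics stay in sync.
// Simplified stand-ins below; only one counter is shown.
trait SimpleReadReporter {
  def incRecordsRead(v: Long): Unit
}

class DualSinkReadReporter(
    sqlRecordsRead: Long => Unit,       // stand-in for SQLMetric.add on "records read"
    tempMetrics: SimpleReadReporter)    // stand-in for TempShuffleReadMetrics
  extends SimpleReadReporter {

  override def incRecordsRead(v: Long): Unit = {
    sqlRecordsRead(v)                   // shows up on the Exchange node in the SQL UI
    tempMetrics.incRecordsRead(v)       // keeps the regular task shuffle read metrics intact
  }
}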
@@ -40,7 +40,7 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) private val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) override lazy val metrics = - SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext) ++ writeMetrics + SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) ++ writeMetrics protected override def doExecute(): RDD[InternalRow] = { val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit)) val shuffled = new ShuffledRowRDD( @@ -163,7 +163,7 @@ case class TakeOrderedAndProjectExec( private val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) override lazy val metrics = - SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext) ++ writeMetrics + SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) ++ writeMetrics protected override def doExecute(): RDD[InternalRow] = { val ord = new LazilyGeneratedOrdering(sortOrder, child.output) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index e21e39a5ff9b1..0fdc3e4ea8940 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -123,9 +123,7 @@ object SQLMetrics { } def createNanoTimingMetric(sc: SparkContext, name: String): SQLMetric = { - // The final result of this metric in physical operator UI may looks like: - // duration(min, med, max): - // 5s (800ms, 1s, 2s) + // Same with createTimingMetric, just mark the unit of time to nanosecond. val acc = new SQLMetric(NANO_TIMING_METRIC, -1) acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = false) acc diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala index 59b5d0bd488bd..4f8b087d06362 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala @@ -22,28 +22,28 @@ import org.apache.spark.executor.TempShuffleReadMetrics import org.apache.spark.shuffle.ShuffleWriteMetricsReporter /** - * A shuffle metrics reporter for SQL exchange operators. + * A shuffle read metrics reporter for SQL exchange operators. * @param tempMetrics [[TempShuffleReadMetrics]] created in TaskContext. * @param metrics All metrics in current SparkPlan. This param should not empty and * contains all shuffle metrics defined in createShuffleReadMetrics. 
*/ -private[spark] class SQLShuffleMetricsReporter( +private[spark] class SQLShuffleReadMetricsReporter( tempMetrics: TempShuffleReadMetrics, metrics: Map[String, SQLMetric]) extends TempShuffleReadMetrics { private[this] val _remoteBlocksFetched = - metrics(SQLShuffleMetricsReporter.REMOTE_BLOCKS_FETCHED) + metrics(SQLShuffleReadMetricsReporter.REMOTE_BLOCKS_FETCHED) private[this] val _localBlocksFetched = - metrics(SQLShuffleMetricsReporter.LOCAL_BLOCKS_FETCHED) + metrics(SQLShuffleReadMetricsReporter.LOCAL_BLOCKS_FETCHED) private[this] val _remoteBytesRead = - metrics(SQLShuffleMetricsReporter.REMOTE_BYTES_READ) + metrics(SQLShuffleReadMetricsReporter.REMOTE_BYTES_READ) private[this] val _remoteBytesReadToDisk = - metrics(SQLShuffleMetricsReporter.REMOTE_BYTES_READ_TO_DISK) + metrics(SQLShuffleReadMetricsReporter.REMOTE_BYTES_READ_TO_DISK) private[this] val _localBytesRead = - metrics(SQLShuffleMetricsReporter.LOCAL_BYTES_READ) + metrics(SQLShuffleReadMetricsReporter.LOCAL_BYTES_READ) private[this] val _fetchWaitTime = - metrics(SQLShuffleMetricsReporter.FETCH_WAIT_TIME) + metrics(SQLShuffleReadMetricsReporter.FETCH_WAIT_TIME) private[this] val _recordsRead = - metrics(SQLShuffleMetricsReporter.RECORDS_READ) + metrics(SQLShuffleReadMetricsReporter.RECORDS_READ) override def incRemoteBlocksFetched(v: Long): Unit = { _remoteBlocksFetched.add(v) @@ -75,7 +75,7 @@ private[spark] class SQLShuffleMetricsReporter( } } -private[spark] object SQLShuffleMetricsReporter { +private[spark] object SQLShuffleReadMetricsReporter { val REMOTE_BLOCKS_FETCHED = "remoteBlocksFetched" val LOCAL_BLOCKS_FETCHED = "localBlocksFetched" val REMOTE_BYTES_READ = "remoteBytesRead" @@ -97,13 +97,19 @@ private[spark] object SQLShuffleMetricsReporter { RECORDS_READ -> SQLMetrics.createMetric(sc, "records read")) } +/** + * A shuffle write metrics reporter for SQL exchange operators. Different with + * [[SQLShuffleReadMetricsReporter]], write metrics reporter will be set and serialized + * in ShuffleDependency, so the local SQLMetric should transient and create on executor. + * @param metrics Shuffle write metrics in current SparkPlan. + */ private[spark] class SQLShuffleWriteMetricsReporter( metrics: Map[String, SQLMetric]) extends ShuffleWriteMetricsReporter with Serializable { - @transient private[spark] lazy val _bytesWritten = + @transient private[this] lazy val _bytesWritten = metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_BYTES_WRITTEN) - @transient private[spark] lazy val _recordsWritten = + @transient private[this] lazy val _recordsWritten = metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN) - @transient private[spark] lazy val _writeTime = + @transient private[this] lazy val _writeTime = metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME) override private[spark] def incBytesWritten(v: Long): Unit = { @@ -132,6 +138,9 @@ private[spark] object SQLShuffleWriteMetricsReporter { new SQLShuffleWriteMetricsReporter(metrics) } + /** + * Create all shuffle write relative metrics and return the Map. 
+ */ def createShuffleWriteMetrics(sc: SparkContext): Map[String, SQLMetric] = Map( SHUFFLE_BYTES_WRITTEN -> SQLMetrics.createSizeMetric(sc, "shuffle bytes written"), @@ -139,4 +148,4 @@ private[spark] object SQLShuffleWriteMetricsReporter { SQLMetrics.createMetric(sc, "shuffle records written"), SHUFFLE_WRITE_TIME -> SQLMetrics.createNanoTimingMetric(sc, "shuffle write time")) -} \ No newline at end of file +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala index 1ad5713ab8ae6..ca8692290edb2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{LocalSparkSession, Row, SparkSession} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow} -import org.apache.spark.sql.execution.metric.SQLShuffleMetricsReporter +import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter import org.apache.spark.sql.types._ import org.apache.spark.storage.ShuffleBlockId import org.apache.spark.util.collection.ExternalSorter @@ -140,7 +140,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkSession { new UnsafeRowSerializer(2)) val shuffled = new ShuffledRowRDD( dependency, - SQLShuffleMetricsReporter.createShuffleReadMetrics(spark.sparkContext)) + SQLShuffleReadMetricsReporter.createShuffleReadMetrics(spark.sparkContext)) shuffled.count() } } From cf35b9f948f174a5726a7feba611224c4ac495e7 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Wed, 5 Dec 2018 20:35:36 +0800 Subject: [PATCH 04/11] Address comments and change the ShuffleDependency --- .../scala/org/apache/spark/Dependency.scala | 6 ++- .../spark/executor/ShuffleWriteMetrics.scala | 30 +++------------ .../spark/scheduler/ShuffleMapTask.scala | 13 ++++--- .../exchange/ShuffleExchangeExec.scala | 4 +- .../sql/execution/metric/SQLMetrics.scala | 8 ++-- .../metric/SQLShuffleMetricsReporter.scala | 37 ++++++++++--------- 6 files changed, 43 insertions(+), 55 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala index 254a9d795572a..4c4ea9d1d13e3 100644 --- a/core/src/main/scala/org/apache/spark/Dependency.scala +++ b/core/src/main/scala/org/apache/spark/Dependency.scala @@ -65,7 +65,8 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] { * @param keyOrdering key ordering for RDD's shuffles * @param aggregator map/reduce-side aggregator for RDD's shuffle * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine) - * @param shuffleWriteMetricsReporter the shuffle write metrics reporter for this shuffle stage. + * @param writeMetricsReporterCreator the function to create an external shuffle write metrics + * reporter for this shuffle stage. 
*/ @DeveloperApi class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( @@ -75,7 +76,8 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( val keyOrdering: Option[Ordering[K]] = None, val aggregator: Option[Aggregator[K, V, C]] = None, val mapSideCombine: Boolean = false, - val shuffleWriteMetricsReporter: Option[ShuffleWriteMetricsReporter] = None) + val writeMetricsReporterCreator + : Option[ShuffleWriteMetricsReporter => ShuffleWriteMetricsReporter] = None) extends Dependency[Product2[K, V]] { if (mapSideCombine) { diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala index 2bfacccdd8f57..d0b0e7da079c9 100644 --- a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala @@ -17,17 +17,15 @@ package org.apache.spark.executor -import scala.collection.mutable.ArrayBuffer - import org.apache.spark.annotation.DeveloperApi import org.apache.spark.shuffle.ShuffleWriteMetricsReporter import org.apache.spark.util.LongAccumulator + /** * :: DeveloperApi :: * A collection of accumulators that represent metrics about writing shuffle data. - * Operations are not thread-safe. Also response for updating external reporters of - * [[ShuffleWriteMetricsReporter]]. + * Operations are not thread-safe. */ @DeveloperApi class ShuffleWriteMetrics private[spark] () extends ShuffleWriteMetricsReporter with Serializable { @@ -35,9 +33,6 @@ class ShuffleWriteMetrics private[spark] () extends ShuffleWriteMetricsReporter private[executor] val _recordsWritten = new LongAccumulator private[executor] val _writeTime = new LongAccumulator - @transient private[this] lazy val externalReporters = - new ArrayBuffer[ShuffleWriteMetricsReporter] - /** * Number of bytes written for the shuffle by this task. 
*/ @@ -53,28 +48,13 @@ class ShuffleWriteMetrics private[spark] () extends ShuffleWriteMetricsReporter */ def writeTime: Long = _writeTime.sum - private[spark] override def incBytesWritten(v: Long): Unit = { - _bytesWritten.add(v) - externalReporters.foreach(_.incBytesWritten(v)) - } - private[spark] override def incRecordsWritten(v: Long): Unit = { - _recordsWritten.add(v) - externalReporters.foreach(_.incRecordsWritten(v)) - } - private[spark] override def incWriteTime(v: Long): Unit = { - _writeTime.add(v) - externalReporters.foreach(_.incWriteTime(v)) - } + private[spark] override def incBytesWritten(v: Long): Unit = _bytesWritten.add(v) + private[spark] override def incRecordsWritten(v: Long): Unit = _recordsWritten.add(v) + private[spark] override def incWriteTime(v: Long): Unit = _writeTime.add(v) private[spark] override def decBytesWritten(v: Long): Unit = { _bytesWritten.setValue(bytesWritten - v) - externalReporters.foreach(_.decBytesWritten(v)) } private[spark] override def decRecordsWritten(v: Long): Unit = { _recordsWritten.setValue(recordsWritten - v) - externalReporters.foreach(_.decRecordsWritten(v)) - } - private[spark] def registerExternalShuffleWriteReporter( - reporter: ShuffleWriteMetricsReporter): Unit = { - externalReporters.append(reporter) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 6e1784307434b..60333db44b3fa 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -92,17 +92,20 @@ private[spark] class ShuffleMapTask( threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime } else 0L - // Register the shuffle write metrics reporter to shuffleWriteMetrics. - if (dep.shuffleWriteMetricsReporter.isDefined) { - context.taskMetrics().shuffleWriteMetrics.registerExternalShuffleWriteReporter( - dep.shuffleWriteMetricsReporter.get) + // Get `SQLShuffleWriteMetricsReporter` if needed, it will update its own metrics for + // SQL exchange operator, as well as the shuffle write metrics in task context. 
+ val contextMetricsReporter = context.taskMetrics().shuffleWriteMetrics + val metricsReporter = if (dep.writeMetricsReporterCreator.isDefined) { + dep.writeMetricsReporterCreator.get.apply(contextMetricsReporter) + } else { + contextMetricsReporter } var writer: ShuffleWriter[Any, Any] = null try { val manager = SparkEnv.get.shuffleManager writer = manager.getWriter[Any, Any]( - dep.shuffleHandle, partitionId, context, context.taskMetrics().shuffleWriteMetrics) + dep.shuffleHandle, partitionId, context, metricsReporter) writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) writer.stop(success = true).get } catch { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index dd6e5f609c47e..984a43d617225 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -212,7 +212,7 @@ object ShuffleExchangeExec { outputAttributes: Seq[Attribute], newPartitioning: Partitioning, serializer: Serializer, - shuffleWriteMetricsReporter: ShuffleWriteMetricsReporter) + writeMetricsReporterCreator: ShuffleWriteMetricsReporter => ShuffleWriteMetricsReporter) : ShuffleDependency[Int, InternalRow, InternalRow] = { val part: Partitioner = newPartitioning match { case RoundRobinPartitioning(numPartitions) => new HashPartitioner(numPartitions) @@ -343,7 +343,7 @@ object ShuffleExchangeExec { rddWithPartitionIds, new PartitionIdPassthrough(part.numPartitions), serializer, - shuffleWriteMetricsReporter = Some(shuffleWriteMetricsReporter)) + writeMetricsReporterCreator = Some(writeMetricsReporterCreator)) dependency } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index 0fdc3e4ea8940..fba32cc742c2f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -78,7 +78,7 @@ object SQLMetrics { private val SUM_METRIC = "sum" private val SIZE_METRIC = "size" private val TIMING_METRIC = "timing" - private val NANO_TIMING_METRIC = "nanosecond" + private val NS_TIMING_METRIC = "nanosecond" private val AVERAGE_METRIC = "average" private val baseForAvgMetric: Int = 10 @@ -124,7 +124,7 @@ object SQLMetrics { def createNanoTimingMetric(sc: SparkContext, name: String): SQLMetric = { // Same with createTimingMetric, just mark the unit of time to nanosecond. 
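// Illustrative sketch (not part of the patch): the accumulated value is in nanoseconds; a later
// commit in this series renders it for the UI by going through scala.concurrent.duration
// (duration.nanos.toMillis) before formatting. A self-contained sketch of that conversion:
import scala.concurrent.duration._

object NanosToMillisSketch {
  def toMillisString(ns: Long): String = s"${ns.nanos.toMillis} ms"
}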
- val acc = new SQLMetric(NANO_TIMING_METRIC, -1) + val acc = new SQLMetric(NS_TIMING_METRIC, -1) acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = false) acc } @@ -171,8 +171,8 @@ object SQLMetrics { Utils.bytesToString } else if (metricsType == TIMING_METRIC) { Utils.msDurationToString - } else if (metricsType == NANO_TIMING_METRIC) { - duration => Utils.msDurationToString(duration / 100000) + } else if (metricsType == NS_TIMING_METRIC) { + duration => Utils.msDurationToString(duration / 1000 / 1000) } else { throw new IllegalStateException("unexpected metrics type: " + metricsType) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala index 4f8b087d06362..3cff8f66aa126 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.metric import org.apache.spark.SparkContext import org.apache.spark.executor.TempShuffleReadMetrics -import org.apache.spark.shuffle.ShuffleWriteMetricsReporter +import org.apache.spark.shuffle.{ShuffleReadMetricsReporter, ShuffleWriteMetricsReporter} /** * A shuffle read metrics reporter for SQL exchange operators. @@ -29,7 +29,7 @@ import org.apache.spark.shuffle.ShuffleWriteMetricsReporter */ private[spark] class SQLShuffleReadMetricsReporter( tempMetrics: TempShuffleReadMetrics, - metrics: Map[String, SQLMetric]) extends TempShuffleReadMetrics { + metrics: Map[String, SQLMetric]) extends ShuffleReadMetricsReporter { private[this] val _remoteBlocksFetched = metrics(SQLShuffleReadMetricsReporter.REMOTE_BLOCKS_FETCHED) private[this] val _localBlocksFetched = @@ -45,31 +45,31 @@ private[spark] class SQLShuffleReadMetricsReporter( private[this] val _recordsRead = metrics(SQLShuffleReadMetricsReporter.RECORDS_READ) - override def incRemoteBlocksFetched(v: Long): Unit = { + override private[spark] def incRemoteBlocksFetched(v: Long): Unit = { _remoteBlocksFetched.add(v) tempMetrics.incRemoteBlocksFetched(v) } - override def incLocalBlocksFetched(v: Long): Unit = { + override private[spark] def incLocalBlocksFetched(v: Long): Unit = { _localBlocksFetched.add(v) tempMetrics.incLocalBlocksFetched(v) } - override def incRemoteBytesRead(v: Long): Unit = { + override private[spark] def incRemoteBytesRead(v: Long): Unit = { _remoteBytesRead.add(v) tempMetrics.incRemoteBytesRead(v) } - override def incRemoteBytesReadToDisk(v: Long): Unit = { + override private[spark] def incRemoteBytesReadToDisk(v: Long): Unit = { _remoteBytesReadToDisk.add(v) tempMetrics.incRemoteBytesReadToDisk(v) } - override def incLocalBytesRead(v: Long): Unit = { + override private[spark] def incLocalBytesRead(v: Long): Unit = { _localBytesRead.add(v) tempMetrics.incLocalBytesRead(v) } - override def incFetchWaitTime(v: Long): Unit = { + override private[spark] def incFetchWaitTime(v: Long): Unit = { _fetchWaitTime.add(v) tempMetrics.incFetchWaitTime(v) } - override def incRecordsRead(v: Long): Unit = { + override private[spark] def incRecordsRead(v: Long): Unit = { _recordsRead.add(v) tempMetrics.incRecordsRead(v) } @@ -99,12 +99,14 @@ private[spark] object SQLShuffleReadMetricsReporter { /** * A shuffle write metrics reporter for SQL exchange operators. 
Different with - * [[SQLShuffleReadMetricsReporter]], write metrics reporter will be set and serialized - * in ShuffleDependency, so the local SQLMetric should transient and create on executor. + * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => reporter) set in + * shuffle dependency, so the local SQLMetric should transient and create on executor. * @param metrics Shuffle write metrics in current SparkPlan. + * @param metricsReporter Other reporter need to be updated in this SQLShuffleWriteMetricsReporter. */ -private[spark] class SQLShuffleWriteMetricsReporter( - metrics: Map[String, SQLMetric]) extends ShuffleWriteMetricsReporter with Serializable { +private[spark] case class SQLShuffleWriteMetricsReporter( + metrics: Map[String, SQLMetric])(metricsReporter: ShuffleWriteMetricsReporter) + extends ShuffleWriteMetricsReporter with Serializable { @transient private[this] lazy val _bytesWritten = metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_BYTES_WRITTEN) @transient private[this] lazy val _recordsWritten = @@ -113,18 +115,23 @@ private[spark] class SQLShuffleWriteMetricsReporter( metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME) override private[spark] def incBytesWritten(v: Long): Unit = { + metricsReporter.incBytesWritten(v) _bytesWritten.add(v) } override private[spark] def decRecordsWritten(v: Long): Unit = { + metricsReporter.decBytesWritten(v) _recordsWritten.set(_recordsWritten.value - v) } override private[spark] def incRecordsWritten(v: Long): Unit = { + metricsReporter.incRecordsWritten(v) _recordsWritten.add(v) } override private[spark] def incWriteTime(v: Long): Unit = { + metricsReporter.incWriteTime(v) _writeTime.add(v) } override private[spark] def decBytesWritten(v: Long): Unit = { + metricsReporter.decBytesWritten(v) _bytesWritten.set(_bytesWritten.value - v) } } @@ -134,10 +141,6 @@ private[spark] object SQLShuffleWriteMetricsReporter { val SHUFFLE_RECORDS_WRITTEN = "shuffleRecordsWritten" val SHUFFLE_WRITE_TIME = "shuffleWriteTime" - def apply(metrics: Map[String, SQLMetric]): SQLShuffleWriteMetricsReporter = { - new SQLShuffleWriteMetricsReporter(metrics) - } - /** * Create all shuffle write relative metrics and return the Map. 
*/ From 76d1ca0036bbb50a005e9d12f8b22bf21697af7f Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Wed, 5 Dec 2018 23:37:06 +0800 Subject: [PATCH 05/11] better way to deal with duration --- .../org/apache/spark/sql/execution/metric/SQLMetrics.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index fba32cc742c2f..0fbba85efc043 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.metric import java.text.NumberFormat import java.util.Locale +import scala.concurrent.duration._ + import org.apache.spark.SparkContext import org.apache.spark.scheduler.AccumulableInfo import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates @@ -172,7 +174,7 @@ object SQLMetrics { } else if (metricsType == TIMING_METRIC) { Utils.msDurationToString } else if (metricsType == NS_TIMING_METRIC) { - duration => Utils.msDurationToString(duration / 1000 / 1000) + duration => Utils.msDurationToString(duration.nanos.toMillis) } else { throw new IllegalStateException("unexpected metrics type: " + metricsType) } From 9966c2abc821492d5f5c6c74034407879c764573 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Thu, 6 Dec 2018 11:17:20 +0800 Subject: [PATCH 06/11] separate read/write metrics --- .../exchange/ShuffleExchangeExec.scala | 11 +++++----- .../apache/spark/sql/execution/limit.scala | 21 +++++++++++-------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index 984a43d617225..a1bb53923b8b0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -47,12 +47,13 @@ case class ShuffleExchangeExec( // NOTE: coordinator can be null after serialization/deserialization, // e.g. 
it can be null on the Executor side - - private val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) - + private lazy val writeMetrics = + SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) + private lazy val readMetrics = + SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) override lazy val metrics = Map( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size") - ) ++ SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) ++ writeMetrics + ) ++ readMetrics ++ writeMetrics override def nodeName: String = { val extraInfo = coordinator match { @@ -116,7 +117,7 @@ case class ShuffleExchangeExec( assert(newPartitioning.isInstanceOf[HashPartitioning]) newPartitioning = UnknownPartitioning(indices.length) } - new ShuffledRowRDD(shuffleDependency, metrics, specifiedPartitionStartIndices) + new ShuffledRowRDD(shuffleDependency, readMetrics, specifiedPartitionStartIndices) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index a634495fa4558..7dfc28ecb36e9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -38,9 +38,11 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode override def outputPartitioning: Partitioning = SinglePartition override def executeCollect(): Array[InternalRow] = child.executeTake(limit) private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - private val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) - override lazy val metrics = - SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) ++ writeMetrics + private lazy val writeMetrics = + SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) + private lazy val readMetrics = + SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) + override lazy val metrics = readMetrics ++ writeMetrics protected override def doExecute(): RDD[InternalRow] = { val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit)) val shuffled = new ShuffledRowRDD( @@ -50,7 +52,7 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode SinglePartition, serializer, SQLShuffleWriteMetricsReporter(writeMetrics)), - metrics) + readMetrics) shuffled.mapPartitionsInternal(_.take(limit)) } } @@ -160,10 +162,11 @@ case class TakeOrderedAndProjectExec( private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - private val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) - - override lazy val metrics = - SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) ++ writeMetrics + private lazy val writeMetrics = + SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) + private lazy val readMetrics = + SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) + override lazy val metrics = readMetrics ++ writeMetrics protected override def doExecute(): RDD[InternalRow] = { val ord = new LazilyGeneratedOrdering(sortOrder, child.output) @@ -179,7 +182,7 @@ case class TakeOrderedAndProjectExec( SinglePartition, serializer, SQLShuffleWriteMetricsReporter(writeMetrics)), - metrics) + readMetrics) shuffled.mapPartitions { iter => val topK = 
org.apache.spark.util.collection.Utils.takeOrdered(iter.map(_.copy()), limit)(ord) if (projectList != child.output) { From a780b706c607e4363030a9693f2b7c887e81750f Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Fri, 7 Dec 2018 01:13:44 +0800 Subject: [PATCH 07/11] Create ShuffleWriteProcessor --- .../scala/org/apache/spark/Dependency.scala | 8 +- .../spark/scheduler/ShuffleMapTask.scala | 29 +------ .../shuffle/ShuffleWriterProcessor.scala | 83 +++++++++++++++++++ .../exchange/ShuffleExchangeExec.scala | 20 +++-- .../apache/spark/sql/execution/limit.scala | 4 +- .../metric/SQLShuffleMetricsReporter.scala | 18 ++-- 6 files changed, 112 insertions(+), 50 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/shuffle/ShuffleWriterProcessor.scala diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala index 4c4ea9d1d13e3..e9be03925fdb0 100644 --- a/core/src/main/scala/org/apache/spark/Dependency.scala +++ b/core/src/main/scala/org/apache/spark/Dependency.scala @@ -22,7 +22,7 @@ import scala.reflect.ClassTag import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer -import org.apache.spark.shuffle.{ShuffleHandle, ShuffleWriteMetricsReporter} +import org.apache.spark.shuffle.{DefaultShuffleWriteProcessor, ShuffleHandle, ShuffleWriteProcessor} /** * :: DeveloperApi :: @@ -65,8 +65,7 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] { * @param keyOrdering key ordering for RDD's shuffles * @param aggregator map/reduce-side aggregator for RDD's shuffle * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine) - * @param writeMetricsReporterCreator the function to create an external shuffle write metrics - * reporter for this shuffle stage. + * @param shuffleWriterProcessor the processor to control the write behavior in ShuffleMapTask. */ @DeveloperApi class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( @@ -76,8 +75,7 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( val keyOrdering: Option[Ordering[K]] = None, val aggregator: Option[Aggregator[K, V, C]] = None, val mapSideCombine: Boolean = false, - val writeMetricsReporterCreator - : Option[ShuffleWriteMetricsReporter => ShuffleWriteMetricsReporter] = None) + val shuffleWriterProcessor: ShuffleWriteProcessor = new DefaultShuffleWriteProcessor) extends Dependency[Product2[K, V]] { if (mapSideCombine) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 60333db44b3fa..2a8d1dd995e27 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -92,34 +92,7 @@ private[spark] class ShuffleMapTask( threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime } else 0L - // Get `SQLShuffleWriteMetricsReporter` if needed, it will update its own metrics for - // SQL exchange operator, as well as the shuffle write metrics in task context. 
- val contextMetricsReporter = context.taskMetrics().shuffleWriteMetrics - val metricsReporter = if (dep.writeMetricsReporterCreator.isDefined) { - dep.writeMetricsReporterCreator.get.apply(contextMetricsReporter) - } else { - contextMetricsReporter - } - - var writer: ShuffleWriter[Any, Any] = null - try { - val manager = SparkEnv.get.shuffleManager - writer = manager.getWriter[Any, Any]( - dep.shuffleHandle, partitionId, context, metricsReporter) - writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) - writer.stop(success = true).get - } catch { - case e: Exception => - try { - if (writer != null) { - writer.stop(success = false) - } - } catch { - case e: Exception => - log.debug("Could not stop writer", e) - } - throw e - } + dep.shuffleWriterProcessor.writeProcess(rdd, dep, partitionId, context, partition) } override def preferredLocations: Seq[TaskLocation] = preferredLocs diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriterProcessor.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriterProcessor.scala new file mode 100644 index 0000000000000..580183419e35f --- /dev/null +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriterProcessor.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.MapStatus + + +/** + * The interface for customizing shuffle write process. The driver create a ShuffleWriteProcessor + * and put it into [[ShuffleDependency]], and executors use it for write processing. + */ +private[spark] trait ShuffleWriteProcessor extends Serializable with Logging { + + /** + * Create a [[ShuffleWriteMetricsReporter]] from the default reporter, always return a proxy + * reporter for both local accumulator and original reporter updating. As the reporter is a + * per-row operator, here need a careful consideration on performance. + */ + def createMetricsReporter(reporter: ShuffleWriteMetricsReporter): ShuffleWriteMetricsReporter + + /** + * The write process for particular partition, it controls the life circle of [[ShuffleWriter]] + * get from [[ShuffleManager]] and triggers rdd compute, finally return the [[MapStatus]] for + * this task. 
+ */ + def writeProcess( + rdd: RDD[_], + dep: ShuffleDependency[_, _, _], + partitionId: Int, + context: TaskContext, + partition: Partition): MapStatus = { + var writer: ShuffleWriter[Any, Any] = null + try { + val manager = SparkEnv.get.shuffleManager + writer = manager.getWriter[Any, Any]( + dep.shuffleHandle, + partitionId, + context, + createMetricsReporter(context.taskMetrics().shuffleWriteMetrics)) + writer.write( + rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) + writer.stop(success = true).get + } catch { + case e: Exception => + try { + if (writer != null) { + writer.stop(success = false) + } + } catch { + case e: Exception => + log.debug("Could not stop writer", e) + } + throw e + } + } +} + + +/** + * Default shuffle write processor use the shuffle write metrics reporter in context. + */ +private[spark] class DefaultShuffleWriteProcessor extends ShuffleWriteProcessor { + override def createMetricsReporter( + reporter: ShuffleWriteMetricsReporter): ShuffleWriteMetricsReporter = reporter +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index a1bb53923b8b0..6c511acf62f41 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -23,7 +23,7 @@ import java.util.function.Supplier import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer -import org.apache.spark.shuffle.ShuffleWriteMetricsReporter +import org.apache.spark.shuffle.{ShuffleWriteMetricsReporter, ShuffleWriteProcessor} import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.errors._ @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Uns import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.metric.{SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter} +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType import org.apache.spark.util.MutablePair @@ -98,7 +98,7 @@ case class ShuffleExchangeExec( child.output, newPartitioning, serializer, - SQLShuffleWriteMetricsReporter(writeMetrics)) + writeMetrics) } /** @@ -213,7 +213,7 @@ object ShuffleExchangeExec { outputAttributes: Seq[Attribute], newPartitioning: Partitioning, serializer: Serializer, - writeMetricsReporterCreator: ShuffleWriteMetricsReporter => ShuffleWriteMetricsReporter) + writeMetrics: Map[String, SQLMetric]) : ShuffleDependency[Int, InternalRow, InternalRow] = { val part: Partitioner = newPartitioning match { case RoundRobinPartitioning(numPartitions) => new HashPartitioner(numPartitions) @@ -344,8 +344,18 @@ object ShuffleExchangeExec { rddWithPartitionIds, new PartitionIdPassthrough(part.numPartitions), serializer, - writeMetricsReporterCreator = Some(writeMetricsReporterCreator)) + shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics)) dependency } + + /** + * Create a customized [[ShuffleWriteProcessor]] for 
SQL which wrapping the default metrics + * reporter with [[SQLShuffleWriteMetricsReporter]]. + */ + def createShuffleWriteProcessor(metrics: Map[String, SQLMetric]): ShuffleWriteProcessor = { + (reporter: ShuffleWriteMetricsReporter) => { + new SQLShuffleWriteMetricsReporter(reporter, metrics) + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index 7dfc28ecb36e9..bfaf080292bce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -51,7 +51,7 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode child.output, SinglePartition, serializer, - SQLShuffleWriteMetricsReporter(writeMetrics)), + writeMetrics), readMetrics) shuffled.mapPartitionsInternal(_.take(limit)) } @@ -181,7 +181,7 @@ case class TakeOrderedAndProjectExec( child.output, SinglePartition, serializer, - SQLShuffleWriteMetricsReporter(writeMetrics)), + writeMetrics), readMetrics) shuffled.mapPartitions { iter => val topK = org.apache.spark.util.collection.Utils.takeOrdered(iter.map(_.copy()), limit)(ord) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala index 3cff8f66aa126..d75f62f240cb7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala @@ -98,20 +98,18 @@ private[spark] object SQLShuffleReadMetricsReporter { } /** - * A shuffle write metrics reporter for SQL exchange operators. Different with - * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => reporter) set in - * shuffle dependency, so the local SQLMetric should transient and create on executor. - * @param metrics Shuffle write metrics in current SparkPlan. + * A shuffle write metrics reporter for SQL exchange operators. * @param metricsReporter Other reporter need to be updated in this SQLShuffleWriteMetricsReporter. + * @param metrics Shuffle write metrics in current SparkPlan. 
*/ -private[spark] case class SQLShuffleWriteMetricsReporter( - metrics: Map[String, SQLMetric])(metricsReporter: ShuffleWriteMetricsReporter) - extends ShuffleWriteMetricsReporter with Serializable { - @transient private[this] lazy val _bytesWritten = +private[spark] class SQLShuffleWriteMetricsReporter( + metricsReporter: ShuffleWriteMetricsReporter, + metrics: Map[String, SQLMetric]) extends ShuffleWriteMetricsReporter { + private[this] val _bytesWritten = metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_BYTES_WRITTEN) - @transient private[this] lazy val _recordsWritten = + private[this] val _recordsWritten = metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN) - @transient private[this] lazy val _writeTime = + private[this] val _writeTime = metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME) override private[spark] def incBytesWritten(v: Long): Unit = { From 7d104ebe854effb3d8ceb63cd408b9749cee1a8a Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Fri, 7 Dec 2018 01:30:19 +0800 Subject: [PATCH 08/11] Revert the changes about shuffle read metrics reaname --- .../spark/sql/execution/ShuffledRowRDD.scala | 6 +-- .../exchange/ShuffleExchangeExec.scala | 4 +- .../apache/spark/sql/execution/limit.scala | 6 +-- .../metric/SQLShuffleMetricsReporter.scala | 38 +++++++++---------- .../execution/UnsafeRowSerializerSuite.scala | 4 +- 5 files changed, 29 insertions(+), 29 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala index 079ff25fcb67e..9b05faaed0459 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala @@ -22,7 +22,7 @@ import java.util.Arrays import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter} +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleMetricsReporter} /** * The [[Partition]] used by [[ShuffledRowRDD]]. A post-shuffle partition @@ -157,9 +157,9 @@ class ShuffledRowRDD( override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition] val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics() - // `SQLShuffleReadMetricsReporter` will update its own metrics for SQL exchange operator, + // `SQLShuffleMetricsReporter` will update its own metrics for SQL exchange operator, // as well as the `tempMetrics` for basic shuffle metrics. - val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics) + val sqlMetricsReporter = new SQLShuffleMetricsReporter(tempMetrics, metrics) // The range of pre-shuffle partitions that we are fetching at here is // [startPreShufflePartitionIndex, endPreShufflePartitionIndex - 1]. 
val reader = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index 6c511acf62f41..0d01db2be82b8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Uns import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter} +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleMetricsReporter, SQLShuffleWriteMetricsReporter} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType import org.apache.spark.util.MutablePair @@ -50,7 +50,7 @@ case class ShuffleExchangeExec( private lazy val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) private lazy val readMetrics = - SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) + SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext) override lazy val metrics = Map( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size") ) ++ readMetrics ++ writeMetrics diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index bfaf080292bce..1f2fdde538645 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGe import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec -import org.apache.spark.sql.execution.metric.{SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter} +import org.apache.spark.sql.execution.metric.{SQLShuffleMetricsReporter, SQLShuffleWriteMetricsReporter} /** * Take the first `limit` elements and collect them to a single partition. 
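As an aside on the reporter-forwarding pattern used by SQLShuffleMetricsReporter above (every increment is applied both to the operator's own SQLMetric and to the task-level shuffle read metrics), the following is a minimal, self-contained sketch of that idea. SimpleMetric, TaskReadMetrics and ProxyReadMetrics are stand-in names invented for this sketch only; they are not Spark classes.

// Invented stand-ins for this sketch only -- not Spark classes.
class SimpleMetric {                                   // plays the role of a SQLMetric accumulator
  private var v = 0L
  def add(d: Long): Unit = v += d
  def value: Long = v
}

class TaskReadMetrics {                                // plays the role of TempShuffleReadMetrics
  var recordsRead = 0L
  var localBytesRead = 0L
  def incRecordsRead(v: Long): Unit = recordsRead += v
  def incLocalBytesRead(v: Long): Unit = localBytesRead += v
}

// The proxy: each increment is forwarded to the operator-local SQL metric
// *and* to the task-level metrics, so the two views never diverge.
class ProxyReadMetrics(task: TaskReadMetrics, sql: Map[String, SimpleMetric]) {
  def incRecordsRead(v: Long): Unit = { sql("recordsRead").add(v); task.incRecordsRead(v) }
  def incLocalBytesRead(v: Long): Unit = { sql("localBytesRead").add(v); task.incLocalBytesRead(v) }
}

object ProxyReadMetricsDemo extends App {
  val task = new TaskReadMetrics
  val sql = Map("recordsRead" -> new SimpleMetric, "localBytesRead" -> new SimpleMetric)
  val reporter = new ProxyReadMetrics(task, sql)
  (1 to 3).foreach(_ => reporter.incRecordsRead(10))
  reporter.incLocalBytesRead(4096)
  assert(task.recordsRead == 30 && sql("recordsRead").value == 30)
  assert(task.localBytesRead == 4096 && sql("localBytesRead").value == 4096)
}

Keeping both sinks updated is what lets the SQL UI show per-operator shuffle read metrics without changing what the stage page reports.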
@@ -41,7 +41,7 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode private lazy val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) private lazy val readMetrics = - SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) + SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext) override lazy val metrics = readMetrics ++ writeMetrics protected override def doExecute(): RDD[InternalRow] = { val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit)) @@ -165,7 +165,7 @@ case class TakeOrderedAndProjectExec( private lazy val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) private lazy val readMetrics = - SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) + SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext) override lazy val metrics = readMetrics ++ writeMetrics protected override def doExecute(): RDD[InternalRow] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala index d75f62f240cb7..ff7941e3b3e8d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala @@ -19,63 +19,63 @@ package org.apache.spark.sql.execution.metric import org.apache.spark.SparkContext import org.apache.spark.executor.TempShuffleReadMetrics -import org.apache.spark.shuffle.{ShuffleReadMetricsReporter, ShuffleWriteMetricsReporter} +import org.apache.spark.shuffle.ShuffleWriteMetricsReporter /** - * A shuffle read metrics reporter for SQL exchange operators. + * A shuffle metrics reporter for SQL exchange operators. * @param tempMetrics [[TempShuffleReadMetrics]] created in TaskContext. * @param metrics All metrics in current SparkPlan. This param should not empty and * contains all shuffle metrics defined in createShuffleReadMetrics. 
*/ -private[spark] class SQLShuffleReadMetricsReporter( +private[spark] class SQLShuffleMetricsReporter( tempMetrics: TempShuffleReadMetrics, - metrics: Map[String, SQLMetric]) extends ShuffleReadMetricsReporter { + metrics: Map[String, SQLMetric]) extends TempShuffleReadMetrics { private[this] val _remoteBlocksFetched = - metrics(SQLShuffleReadMetricsReporter.REMOTE_BLOCKS_FETCHED) + metrics(SQLShuffleMetricsReporter.REMOTE_BLOCKS_FETCHED) private[this] val _localBlocksFetched = - metrics(SQLShuffleReadMetricsReporter.LOCAL_BLOCKS_FETCHED) + metrics(SQLShuffleMetricsReporter.LOCAL_BLOCKS_FETCHED) private[this] val _remoteBytesRead = - metrics(SQLShuffleReadMetricsReporter.REMOTE_BYTES_READ) + metrics(SQLShuffleMetricsReporter.REMOTE_BYTES_READ) private[this] val _remoteBytesReadToDisk = - metrics(SQLShuffleReadMetricsReporter.REMOTE_BYTES_READ_TO_DISK) + metrics(SQLShuffleMetricsReporter.REMOTE_BYTES_READ_TO_DISK) private[this] val _localBytesRead = - metrics(SQLShuffleReadMetricsReporter.LOCAL_BYTES_READ) + metrics(SQLShuffleMetricsReporter.LOCAL_BYTES_READ) private[this] val _fetchWaitTime = - metrics(SQLShuffleReadMetricsReporter.FETCH_WAIT_TIME) + metrics(SQLShuffleMetricsReporter.FETCH_WAIT_TIME) private[this] val _recordsRead = - metrics(SQLShuffleReadMetricsReporter.RECORDS_READ) + metrics(SQLShuffleMetricsReporter.RECORDS_READ) - override private[spark] def incRemoteBlocksFetched(v: Long): Unit = { + override def incRemoteBlocksFetched(v: Long): Unit = { _remoteBlocksFetched.add(v) tempMetrics.incRemoteBlocksFetched(v) } - override private[spark] def incLocalBlocksFetched(v: Long): Unit = { + override def incLocalBlocksFetched(v: Long): Unit = { _localBlocksFetched.add(v) tempMetrics.incLocalBlocksFetched(v) } - override private[spark] def incRemoteBytesRead(v: Long): Unit = { + override def incRemoteBytesRead(v: Long): Unit = { _remoteBytesRead.add(v) tempMetrics.incRemoteBytesRead(v) } - override private[spark] def incRemoteBytesReadToDisk(v: Long): Unit = { + override def incRemoteBytesReadToDisk(v: Long): Unit = { _remoteBytesReadToDisk.add(v) tempMetrics.incRemoteBytesReadToDisk(v) } - override private[spark] def incLocalBytesRead(v: Long): Unit = { + override def incLocalBytesRead(v: Long): Unit = { _localBytesRead.add(v) tempMetrics.incLocalBytesRead(v) } - override private[spark] def incFetchWaitTime(v: Long): Unit = { + override def incFetchWaitTime(v: Long): Unit = { _fetchWaitTime.add(v) tempMetrics.incFetchWaitTime(v) } - override private[spark] def incRecordsRead(v: Long): Unit = { + override def incRecordsRead(v: Long): Unit = { _recordsRead.add(v) tempMetrics.incRecordsRead(v) } } -private[spark] object SQLShuffleReadMetricsReporter { +private[spark] object SQLShuffleMetricsReporter { val REMOTE_BLOCKS_FETCHED = "remoteBlocksFetched" val LOCAL_BLOCKS_FETCHED = "localBlocksFetched" val REMOTE_BYTES_READ = "remoteBytesRead" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala index ca8692290edb2..1ad5713ab8ae6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{LocalSparkSession, Row, SparkSession} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import 
org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow} -import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter +import org.apache.spark.sql.execution.metric.SQLShuffleMetricsReporter import org.apache.spark.sql.types._ import org.apache.spark.storage.ShuffleBlockId import org.apache.spark.util.collection.ExternalSorter @@ -140,7 +140,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkSession { new UnsafeRowSerializer(2)) val shuffled = new ShuffledRowRDD( dependency, - SQLShuffleReadMetricsReporter.createShuffleReadMetrics(spark.sparkContext)) + SQLShuffleMetricsReporter.createShuffleReadMetrics(spark.sparkContext)) shuffled.count() } } From d5ee2493478d11ba688172d4b27a15b18beaf559 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Fri, 7 Dec 2018 01:42:39 +0800 Subject: [PATCH 09/11] self check --- core/src/main/scala/org/apache/spark/Dependency.scala | 2 +- .../org/apache/spark/shuffle/ShuffleWriterProcessor.scala | 5 ++--- .../spark/sql/execution/exchange/ShuffleExchangeExec.scala | 4 ++-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala index e9be03925fdb0..a2ebb95f13cbd 100644 --- a/core/src/main/scala/org/apache/spark/Dependency.scala +++ b/core/src/main/scala/org/apache/spark/Dependency.scala @@ -65,7 +65,7 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] { * @param keyOrdering key ordering for RDD's shuffles * @param aggregator map/reduce-side aggregator for RDD's shuffle * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine) - * @param shuffleWriterProcessor the processor to control the write behavior in ShuffleMapTask. + * @param shuffleWriterProcessor the processor to control the write behavior in ShuffleMapTask */ @DeveloperApi class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriterProcessor.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriterProcessor.scala index 580183419e35f..92c71ecf0bac3 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriterProcessor.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriterProcessor.scala @@ -22,10 +22,9 @@ import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.MapStatus - /** * The interface for customizing shuffle write process. The driver create a ShuffleWriteProcessor - * and put it into [[ShuffleDependency]], and executors use it for write processing. + * and put it into [[ShuffleDependency]], and executors use it in each ShuffleMapTask. */ private[spark] trait ShuffleWriteProcessor extends Serializable with Logging { @@ -75,7 +74,7 @@ private[spark] trait ShuffleWriteProcessor extends Serializable with Logging { /** - * Default shuffle write processor use the shuffle write metrics reporter in context. + * Default shuffle write processor which use the shuffle write metrics reporter in context. 
*/ private[spark] class DefaultShuffleWriteProcessor extends ShuffleWriteProcessor { override def createMetricsReporter( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index 0d01db2be82b8..b8fc231bafc87 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -350,8 +350,8 @@ object ShuffleExchangeExec { } /** - * Create a customized [[ShuffleWriteProcessor]] for SQL which wrapping the default metrics - * reporter with [[SQLShuffleWriteMetricsReporter]]. + * Create a customized [[ShuffleWriteProcessor]] for SQL which wrap the default metrics reporter + * with [[SQLShuffleWriteMetricsReporter]] as new reporter for [[ShuffleWriteProcessor]]. */ def createShuffleWriteProcessor(metrics: Map[String, SQLMetric]): ShuffleWriteProcessor = { (reporter: ShuffleWriteMetricsReporter) => { From 6378a3d4707b0d7559fca20220229cde71f9a64b Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Fri, 7 Dec 2018 13:20:31 +0800 Subject: [PATCH 10/11] Delete DefaultShuffleWriteProcessor --- .../scala/org/apache/spark/Dependency.scala | 4 ++-- ...ssor.scala => ShuffleWriteProcessor.scala} | 19 ++++++------------- .../exchange/ShuffleExchangeExec.scala | 7 +++++-- .../sql/execution/metric/SQLMetrics.scala | 8 ++++---- 4 files changed, 17 insertions(+), 21 deletions(-) rename core/src/main/scala/org/apache/spark/shuffle/{ShuffleWriterProcessor.scala => ShuffleWriteProcessor.scala} (79%) diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala index a2ebb95f13cbd..fb051a8c0db8e 100644 --- a/core/src/main/scala/org/apache/spark/Dependency.scala +++ b/core/src/main/scala/org/apache/spark/Dependency.scala @@ -22,7 +22,7 @@ import scala.reflect.ClassTag import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer -import org.apache.spark.shuffle.{DefaultShuffleWriteProcessor, ShuffleHandle, ShuffleWriteProcessor} +import org.apache.spark.shuffle.{ShuffleHandle, ShuffleWriteProcessor} /** * :: DeveloperApi :: @@ -75,7 +75,7 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( val keyOrdering: Option[Ordering[K]] = None, val aggregator: Option[Aggregator[K, V, C]] = None, val mapSideCombine: Boolean = false, - val shuffleWriterProcessor: ShuffleWriteProcessor = new DefaultShuffleWriteProcessor) + val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor) extends Dependency[Product2[K, V]] { if (mapSideCombine) { diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriterProcessor.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala similarity index 79% rename from core/src/main/scala/org/apache/spark/shuffle/ShuffleWriterProcessor.scala rename to core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala index 92c71ecf0bac3..476a5698582a9 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriterProcessor.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala @@ -26,14 +26,16 @@ import org.apache.spark.scheduler.MapStatus * The interface for customizing shuffle write process. 
The driver create a ShuffleWriteProcessor * and put it into [[ShuffleDependency]], and executors use it in each ShuffleMapTask. */ -private[spark] trait ShuffleWriteProcessor extends Serializable with Logging { +private[spark] class ShuffleWriteProcessor extends Serializable with Logging { /** - * Create a [[ShuffleWriteMetricsReporter]] from the default reporter, always return a proxy + * Create a [[ShuffleWriteMetricsReporter]] from the task context, always return a proxy * reporter for both local accumulator and original reporter updating. As the reporter is a * per-row operator, here need a careful consideration on performance. */ - def createMetricsReporter(reporter: ShuffleWriteMetricsReporter): ShuffleWriteMetricsReporter + def createMetricsReporter(context: TaskContext): ShuffleWriteMetricsReporter = { + context.taskMetrics().shuffleWriteMetrics + } /** * The write process for particular partition, it controls the life circle of [[ShuffleWriter]] @@ -53,7 +55,7 @@ private[spark] trait ShuffleWriteProcessor extends Serializable with Logging { dep.shuffleHandle, partitionId, context, - createMetricsReporter(context.taskMetrics().shuffleWriteMetrics)) + createMetricsReporter(context)) writer.write( rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) writer.stop(success = true).get @@ -71,12 +73,3 @@ private[spark] trait ShuffleWriteProcessor extends Serializable with Logging { } } } - - -/** - * Default shuffle write processor which use the shuffle write metrics reporter in context. - */ -private[spark] class DefaultShuffleWriteProcessor extends ShuffleWriteProcessor { - override def createMetricsReporter( - reporter: ShuffleWriteMetricsReporter): ShuffleWriteMetricsReporter = reporter -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index b8fc231bafc87..e74fb27f89818 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -354,8 +354,11 @@ object ShuffleExchangeExec { * with [[SQLShuffleWriteMetricsReporter]] as new reporter for [[ShuffleWriteProcessor]]. 
*/ def createShuffleWriteProcessor(metrics: Map[String, SQLMetric]): ShuffleWriteProcessor = { - (reporter: ShuffleWriteMetricsReporter) => { - new SQLShuffleWriteMetricsReporter(reporter, metrics) + new ShuffleWriteProcessor { + override def createMetricsReporter( + context: TaskContext): ShuffleWriteMetricsReporter = { + new SQLShuffleWriteMetricsReporter(context.taskMetrics().shuffleWriteMetrics, metrics) + } } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index 0fbba85efc043..51789cb76b4c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -80,7 +80,7 @@ object SQLMetrics { private val SUM_METRIC = "sum" private val SIZE_METRIC = "size" private val TIMING_METRIC = "timing" - private val NS_TIMING_METRIC = "nanosecond" + private val NORMALIZE_TIMING_METRIC = "normalizeTiming" private val AVERAGE_METRIC = "average" private val baseForAvgMetric: Int = 10 @@ -125,8 +125,8 @@ object SQLMetrics { } def createNanoTimingMetric(sc: SparkContext, name: String): SQLMetric = { - // Same with createTimingMetric, just mark the unit of time to nanosecond. - val acc = new SQLMetric(NS_TIMING_METRIC, -1) + // Same with createTimingMetric, just normalize the unit of time to millisecond. + val acc = new SQLMetric(NORMALIZE_TIMING_METRIC, -1) acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = false) acc } @@ -173,7 +173,7 @@ object SQLMetrics { Utils.bytesToString } else if (metricsType == TIMING_METRIC) { Utils.msDurationToString - } else if (metricsType == NS_TIMING_METRIC) { + } else if (metricsType == NORMALIZE_TIMING_METRIC) { duration => Utils.msDurationToString(duration.nanos.toMillis) } else { throw new IllegalStateException("unexpected metrics type: " + metricsType) From bc2c4f187f8037aa540b0e6bae5d90d7d6e3509d Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Fri, 7 Dec 2018 17:52:40 +0800 Subject: [PATCH 11/11] Address comments --- .../org/apache/spark/shuffle/ShuffleWriteProcessor.scala | 5 ++--- .../spark/sql/execution/exchange/ShuffleExchangeExec.scala | 2 +- .../org/apache/spark/sql/execution/metric/SQLMetrics.scala | 6 +++--- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala index 476a5698582a9..f5213157a9a85 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala @@ -29,11 +29,10 @@ import org.apache.spark.scheduler.MapStatus private[spark] class ShuffleWriteProcessor extends Serializable with Logging { /** - * Create a [[ShuffleWriteMetricsReporter]] from the task context, always return a proxy - * reporter for both local accumulator and original reporter updating. As the reporter is a + * Create a [[ShuffleWriteMetricsReporter]] from the task context. As the reporter is a * per-row operator, here need a careful consideration on performance. 
*/ - def createMetricsReporter(context: TaskContext): ShuffleWriteMetricsReporter = { + protected def createMetricsReporter(context: TaskContext): ShuffleWriteMetricsReporter = { context.taskMetrics().shuffleWriteMetrics } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index e74fb27f89818..0c2020572e721 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -355,7 +355,7 @@ object ShuffleExchangeExec { */ def createShuffleWriteProcessor(metrics: Map[String, SQLMetric]): ShuffleWriteProcessor = { new ShuffleWriteProcessor { - override def createMetricsReporter( + override protected def createMetricsReporter( context: TaskContext): ShuffleWriteMetricsReporter = { new SQLShuffleWriteMetricsReporter(context.taskMetrics().shuffleWriteMetrics, metrics) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index 51789cb76b4c7..19809b07508d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -80,7 +80,7 @@ object SQLMetrics { private val SUM_METRIC = "sum" private val SIZE_METRIC = "size" private val TIMING_METRIC = "timing" - private val NORMALIZE_TIMING_METRIC = "normalizeTiming" + private val NS_TIMING_METRIC = "nsTiming" private val AVERAGE_METRIC = "average" private val baseForAvgMetric: Int = 10 @@ -126,7 +126,7 @@ object SQLMetrics { def createNanoTimingMetric(sc: SparkContext, name: String): SQLMetric = { // Same with createTimingMetric, just normalize the unit of time to millisecond. - val acc = new SQLMetric(NORMALIZE_TIMING_METRIC, -1) + val acc = new SQLMetric(NS_TIMING_METRIC, -1) acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = false) acc } @@ -173,7 +173,7 @@ object SQLMetrics { Utils.bytesToString } else if (metricsType == TIMING_METRIC) { Utils.msDurationToString - } else if (metricsType == NORMALIZE_TIMING_METRIC) { + } else if (metricsType == NS_TIMING_METRIC) { duration => Utils.msDurationToString(duration.nanos.toMillis) } else { throw new IllegalStateException("unexpected metrics type: " + metricsType)
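To summarize where the series lands: ShuffleDependency now carries a ShuffleWriteProcessor, and the SQL layer customizes write metrics by overriding createMetricsReporter so that every write feeds both the task-level reporter and the operator's SQLMetrics. Below is a minimal, self-contained sketch of that extension point; WriteProcessorSketch, TaskWriteReporter and the other names are invented for illustration and are not the Spark API.

// Invented stand-ins for this sketch only -- not the Spark API.
class WriteMetric {                                     // plays the role of a SQLMetric
  private var v = 0L
  def add(d: Long): Unit = v += d
  def value: Long = v
}

trait WriteReporter {                                   // plays the role of ShuffleWriteMetricsReporter
  def incBytesWritten(v: Long): Unit
}

class TaskWriteReporter extends WriteReporter {         // plays the role of the task-level reporter
  var bytes = 0L
  override def incBytesWritten(v: Long): Unit = bytes += v
}

// Plays the role of ShuffleWriteProcessor: it owns the write loop and exposes a
// hook for choosing the metrics reporter, defaulting to the task-level one.
class WriteProcessorSketch extends Serializable {
  protected def createMetricsReporter(task: TaskWriteReporter): WriteReporter = task
  def write(task: TaskWriteReporter, recordSizes: Seq[Long]): Unit = {
    val reporter = createMetricsReporter(task)
    recordSizes.foreach(reporter.incBytesWritten)       // stands in for the real shuffle write
  }
}

object SqlWriteProcessorDemo extends App {
  val sqlBytesWritten = new WriteMetric                 // operator-local metric for the SQL UI
  // In the spirit of ShuffleExchangeExec.createShuffleWriteProcessor: override the
  // hook so the reporter updates the SQL metric and still forwards to the task reporter.
  val processor = new WriteProcessorSketch {
    override protected def createMetricsReporter(task: TaskWriteReporter): WriteReporter =
      new WriteReporter {
        override def incBytesWritten(v: Long): Unit = {
          sqlBytesWritten.add(v)
          task.incBytesWritten(v)
        }
      }
  }
  val task = new TaskWriteReporter
  processor.write(task, Seq(100L, 200L))
  assert(task.bytes == 300L && sqlBytesWritten.value == 300L)
}

In the actual patch the hook receives a TaskContext and the SQL override wraps context.taskMetrics().shuffleWriteMetrics in a SQLShuffleWriteMetricsReporter, which is the same update-both-sinks behaviour sketched here. The nanosecond write-time metric is rendered for display via scala.concurrent.duration (for example, 123456789L.nanos.toMillis yields 123) rather than hand-rolled division.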