From deff6d5bdd91631d6a64ed223bd84be1ecd5da40 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Tue, 11 Dec 2018 19:56:52 +0800
Subject: [PATCH 1/3] rename

---
 .../spark/sql/execution/ShuffledRowRDD.scala  |  6 ++--
 .../exchange/ShuffleExchangeExec.scala        |  4 +--
 .../apache/spark/sql/execution/limit.scala    |  6 ++--
 ...la => SQLShuffleReadMetricsReporter.scala} | 32 +++++++++----------
 .../execution/UnsafeRowSerializerSuite.scala  |  4 +--
 5 files changed, 26 insertions(+), 26 deletions(-)
 rename sql/core/src/main/scala/org/apache/spark/sql/execution/metric/{SQLShuffleMetricsReporter.scala => SQLShuffleReadMetricsReporter.scala} (84%)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
index 9b05faaed0459..079ff25fcb67e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
@@ -22,7 +22,7 @@ import java.util.Arrays
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleMetricsReporter}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter}
 
 /**
  * The [[Partition]] used by [[ShuffledRowRDD]]. A post-shuffle partition
@@ -157,9 +157,9 @@ class ShuffledRowRDD(
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition]
     val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
-    // `SQLShuffleMetricsReporter` will update its own metrics for SQL exchange operator,
+    // `SQLShuffleReadMetricsReporter` will update its own metrics for SQL exchange operator,
     // as well as the `tempMetrics` for basic shuffle metrics.
-    val sqlMetricsReporter = new SQLShuffleMetricsReporter(tempMetrics, metrics)
+    val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics)
     // The range of pre-shuffle partitions that we are fetching at here is
     // [startPreShufflePartitionIndex, endPreShufflePartitionIndex - 1].
     val reader =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 0c2020572e721..da7b0c6f43fbc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Uns
 import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleMetricsReporter, SQLShuffleWriteMetricsReporter}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.MutablePair
@@ -50,7 +50,7 @@ case class ShuffleExchangeExec(
   private lazy val writeMetrics =
     SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
   private lazy val readMetrics =
-    SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+    SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
   override lazy val metrics = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")
   ) ++ readMetrics ++ writeMetrics
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 1f2fdde538645..bfaf080292bce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGe
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
-import org.apache.spark.sql.execution.metric.{SQLShuffleMetricsReporter, SQLShuffleWriteMetricsReporter}
+import org.apache.spark.sql.execution.metric.{SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
 
 /**
  * Take the first `limit` elements and collect them to a single partition.
@@ -41,7 +41,7 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
   private lazy val writeMetrics =
     SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
   private lazy val readMetrics =
-    SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+    SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
   override lazy val metrics = readMetrics ++ writeMetrics
   protected override def doExecute(): RDD[InternalRow] = {
     val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
@@ -165,7 +165,7 @@ case class TakeOrderedAndProjectExec(
   private lazy val writeMetrics =
     SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
   private lazy val readMetrics =
-    SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+    SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
   override lazy val metrics = readMetrics ++ writeMetrics
 
   protected override def doExecute(): RDD[InternalRow] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleReadMetricsReporter.scala
similarity index 84%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleReadMetricsReporter.scala
index ff7941e3b3e8d..a06352833bf55 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleReadMetricsReporter.scala
@@ -27,23 +27,23 @@ import org.apache.spark.shuffle.ShuffleWriteMetricsReporter
  * @param metrics All metrics in current SparkPlan. This param should not empty and
  *   contains all shuffle metrics defined in createShuffleReadMetrics.
  */
-private[spark] class SQLShuffleMetricsReporter(
+class SQLShuffleReadMetricsReporter(
     tempMetrics: TempShuffleReadMetrics,
     metrics: Map[String, SQLMetric]) extends TempShuffleReadMetrics {
   private[this] val _remoteBlocksFetched =
-    metrics(SQLShuffleMetricsReporter.REMOTE_BLOCKS_FETCHED)
+    metrics(SQLShuffleReadMetricsReporter.REMOTE_BLOCKS_FETCHED)
   private[this] val _localBlocksFetched =
-    metrics(SQLShuffleMetricsReporter.LOCAL_BLOCKS_FETCHED)
+    metrics(SQLShuffleReadMetricsReporter.LOCAL_BLOCKS_FETCHED)
   private[this] val _remoteBytesRead =
-    metrics(SQLShuffleMetricsReporter.REMOTE_BYTES_READ)
+    metrics(SQLShuffleReadMetricsReporter.REMOTE_BYTES_READ)
   private[this] val _remoteBytesReadToDisk =
-    metrics(SQLShuffleMetricsReporter.REMOTE_BYTES_READ_TO_DISK)
+    metrics(SQLShuffleReadMetricsReporter.REMOTE_BYTES_READ_TO_DISK)
   private[this] val _localBytesRead =
-    metrics(SQLShuffleMetricsReporter.LOCAL_BYTES_READ)
+    metrics(SQLShuffleReadMetricsReporter.LOCAL_BYTES_READ)
   private[this] val _fetchWaitTime =
-    metrics(SQLShuffleMetricsReporter.FETCH_WAIT_TIME)
+    metrics(SQLShuffleReadMetricsReporter.FETCH_WAIT_TIME)
   private[this] val _recordsRead =
-    metrics(SQLShuffleMetricsReporter.RECORDS_READ)
+    metrics(SQLShuffleReadMetricsReporter.RECORDS_READ)
 
   override def incRemoteBlocksFetched(v: Long): Unit = {
     _remoteBlocksFetched.add(v)
@@ -75,7 +75,7 @@ private[spark] class SQLShuffleMetricsReporter(
   }
 }
 
-private[spark] object SQLShuffleMetricsReporter {
+object SQLShuffleReadMetricsReporter {
   val REMOTE_BLOCKS_FETCHED = "remoteBlocksFetched"
   val LOCAL_BLOCKS_FETCHED = "localBlocksFetched"
   val REMOTE_BYTES_READ = "remoteBytesRead"
@@ -102,7 +102,7 @@ private[spark] object SQLShuffleMetricsReporter {
  * @param metricsReporter Other reporter need to be updated in this SQLShuffleWriteMetricsReporter.
 * @param metrics Shuffle write metrics in current SparkPlan.
  */
-private[spark] class SQLShuffleWriteMetricsReporter(
+class SQLShuffleWriteMetricsReporter(
    metricsReporter: ShuffleWriteMetricsReporter,
    metrics: Map[String, SQLMetric]) extends ShuffleWriteMetricsReporter {
   private[this] val _bytesWritten =
@@ -112,29 +112,29 @@ private[spark] class SQLShuffleWriteMetricsReporter(
   private[this] val _writeTime =
     metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)
 
-  override private[spark] def incBytesWritten(v: Long): Unit = {
+  override def incBytesWritten(v: Long): Unit = {
     metricsReporter.incBytesWritten(v)
     _bytesWritten.add(v)
   }
-  override private[spark] def decRecordsWritten(v: Long): Unit = {
+  override def decRecordsWritten(v: Long): Unit = {
     metricsReporter.decBytesWritten(v)
     _recordsWritten.set(_recordsWritten.value - v)
   }
-  override private[spark] def incRecordsWritten(v: Long): Unit = {
+  override def incRecordsWritten(v: Long): Unit = {
     metricsReporter.incRecordsWritten(v)
     _recordsWritten.add(v)
   }
-  override private[spark] def incWriteTime(v: Long): Unit = {
+  override def incWriteTime(v: Long): Unit = {
     metricsReporter.incWriteTime(v)
     _writeTime.add(v)
   }
-  override private[spark] def decBytesWritten(v: Long): Unit = {
+  override def decBytesWritten(v: Long): Unit = {
     metricsReporter.decBytesWritten(v)
     _bytesWritten.set(_bytesWritten.value - v)
   }
 }
 
-private[spark] object SQLShuffleWriteMetricsReporter {
+object SQLShuffleWriteMetricsReporter {
   val SHUFFLE_BYTES_WRITTEN = "shuffleBytesWritten"
   val SHUFFLE_RECORDS_WRITTEN = "shuffleRecordsWritten"
   val SHUFFLE_WRITE_TIME = "shuffleWriteTime"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index 1ad5713ab8ae6..ca8692290edb2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{LocalSparkSession, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
-import org.apache.spark.sql.execution.metric.SQLShuffleMetricsReporter
+import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.ShuffleBlockId
 import org.apache.spark.util.collection.ExternalSorter
@@ -140,7 +140,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkSession {
       new UnsafeRowSerializer(2))
     val shuffled = new ShuffledRowRDD(
       dependency,
-      SQLShuffleMetricsReporter.createShuffleReadMetrics(spark.sparkContext))
+      SQLShuffleReadMetricsReporter.createShuffleReadMetrics(spark.sparkContext))
     shuffled.count()
   }
 }

From 3b3dfd95a538563077d0cc536ef252f4af2c2dcf Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Tue, 11 Dec 2018 20:10:53 +0800
Subject: [PATCH 2/3] function rename in ShuffleWriteProcessor

---
 .../main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala | 2 +-
 .../scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 2a8d1dd995e27..35664ff515d4b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -92,7 +92,7 @@ private[spark] class ShuffleMapTask(
       threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
     } else 0L
 
-    dep.shuffleWriterProcessor.writeProcess(rdd, dep, partitionId, context, partition)
+    dep.shuffleWriterProcessor.write(rdd, dep, partitionId, context, partition)
   }
 
   override def preferredLocations: Seq[TaskLocation] = preferredLocs
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala
index f5213157a9a85..5b0c7e9f2b0b4 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala
@@ -41,7 +41,7 @@ private[spark] class ShuffleWriteProcessor extends Serializable with Logging {
    * get from [[ShuffleManager]] and triggers rdd compute, finally return the [[MapStatus]] for
    * this task.
    */
-  def writeProcess(
+  def write(
       rdd: RDD[_],
       dep: ShuffleDependency[_, _, _],
       partitionId: Int,

From 8d6dac92d8787dbd7f9b576545a6fbe6999861af Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Tue, 11 Dec 2018 20:21:30 +0800
Subject: [PATCH 3/3] Change the display name

---
 ....scala => SQLShuffleMetricsReporter.scala} |  4 ++--
 .../execution/metric/SQLMetricsSuite.scala    | 20 +++++++++----------
 2 files changed, 12 insertions(+), 12 deletions(-)
 rename sql/core/src/main/scala/org/apache/spark/sql/execution/metric/{SQLShuffleReadMetricsReporter.scala => SQLShuffleMetricsReporter.scala} (99%)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleReadMetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
similarity index 99%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleReadMetricsReporter.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
index a06352833bf55..2c0ea80495abb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleReadMetricsReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
@@ -88,8 +88,8 @@ object SQLShuffleReadMetricsReporter {
    * Create all shuffle read relative metrics and return the Map.
    */
   def createShuffleReadMetrics(sc: SparkContext): Map[String, SQLMetric] = Map(
-    REMOTE_BLOCKS_FETCHED -> SQLMetrics.createMetric(sc, "remote blocks fetched"),
-    LOCAL_BLOCKS_FETCHED -> SQLMetrics.createMetric(sc, "local blocks fetched"),
+    REMOTE_BLOCKS_FETCHED -> SQLMetrics.createMetric(sc, "remote blocks read"),
+    LOCAL_BLOCKS_FETCHED -> SQLMetrics.createMetric(sc, "local blocks read"),
     REMOTE_BYTES_READ -> SQLMetrics.createSizeMetric(sc, "remote bytes read"),
     REMOTE_BYTES_READ_TO_DISK -> SQLMetrics.createSizeMetric(sc, "remote bytes read to disk"),
     LOCAL_BYTES_READ -> SQLMetrics.createSizeMetric(sc, "local bytes read"),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 4a80638f68858..afae914ff8a5b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -96,8 +96,8 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))
     val shuffleExpected1 = Map(
       "records read" -> 2L,
-      "local blocks fetched" -> 2L,
-      "remote blocks fetched" -> 0L,
+      "local blocks read" -> 2L,
+      "remote blocks read" -> 0L,
       "shuffle records written" -> 2L)
     testSparkPlanMetrics(df, 1, Map(
       2L -> (("HashAggregate", expected1(0))),
@@ -114,8 +114,8 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))
     val shuffleExpected2 = Map(
       "records read" -> 4L,
-      "local blocks fetched" -> 4L,
-      "remote blocks fetched" -> 0L,
+      "local blocks read" -> 4L,
+      "remote blocks read" -> 0L,
       "shuffle records written" -> 4L)
     testSparkPlanMetrics(df2, 1, Map(
       2L -> (("HashAggregate", expected2(0))),
@@ -175,8 +175,8 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       1L -> (("Exchange", Map(
         "shuffle records written" -> 2L,
         "records read" -> 2L,
-        "local blocks fetched" -> 2L,
-        "remote blocks fetched" -> 0L))),
+        "local blocks read" -> 2L,
+        "remote blocks read" -> 0L))),
       0L -> (("ObjectHashAggregate", Map("number of output rows" -> 1L))))
     )
 
@@ -187,8 +187,8 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       1L -> (("Exchange", Map(
         "shuffle records written" -> 4L,
         "records read" -> 4L,
-        "local blocks fetched" -> 4L,
-        "remote blocks fetched" -> 0L))),
+        "local blocks read" -> 4L,
+        "remote blocks read" -> 0L))),
       0L -> (("ObjectHashAggregate", Map("number of output rows" -> 3L))))
     )
   }
@@ -216,8 +216,8 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
         "number of output rows" -> 4L))),
       2L -> (("Exchange", Map(
         "records read" -> 4L,
-        "local blocks fetched" -> 2L,
-        "remote blocks fetched" -> 0L,
+        "local blocks read" -> 2L,
+        "remote blocks read" -> 0L,
         "shuffle records written" -> 2L))))
     )
   }
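Note on usage (outside the patch series): the snippet below is a minimal, hypothetical sketch of how the renamed helpers fit together from an operator's point of view; the object ExampleShuffleMetrics and the method metricsFor are illustrative names only and not part of the patches or of Spark's API, while createShuffleReadMetrics, createShuffleWriteMetrics, and createSizeMetric are the calls already shown in the hunks above.

    package org.apache.spark.sql.execution

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics,
      SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}

    // Hypothetical helper mirroring ShuffleExchangeExec.metrics in PATCH 1/3:
    // one map holding the data-size metric plus every shuffle read and write metric.
    object ExampleShuffleMetrics {
      def metricsFor(sc: SparkContext): Map[String, SQLMetric] = {
        // Write-side metrics ("shuffle bytes written", "shuffle records written", ...).
        val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sc)
        // Read-side metrics; after PATCH 3/3 the block counters display as
        // "remote blocks read" / "local blocks read" instead of "... fetched".
        val readMetrics = SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sc)
        Map("dataSize" -> SQLMetrics.createSizeMetric(sc, "data size")) ++
          readMetrics ++ writeMetrics
      }
    }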