From a1314af708c9aa244250e7a4e5dc74f088c73f2b Mon Sep 17 00:00:00 2001
From: Kalpit Shah
Date: Fri, 30 May 2014 02:04:49 -0700
Subject: [PATCH] SPARK-1972: Added support for tracking custom task-related metrics

Any piece of Spark machinery that cares to track custom task-related metrics
can now use the setCustomMetric(name, value) method on TaskMetrics to do that.
The StagePage in the web UI now shows a Custom Metrics column for every task.
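For illustration, here is a minimal sketch of how an RDD could record a custom
metric from inside compute(). TimedRDD and the "partition compute time (ms)"
metric name are hypothetical, and the sketch assumes the task's TaskMetrics is
reachable as context.taskMetrics (depending on the Spark version that field may
be private[spark], in which case code like this has to live under the
org.apache.spark package):

    import scala.reflect.ClassTag

    import org.apache.spark.{Partition, TaskContext}
    import org.apache.spark.rdd.RDD

    /** Hypothetical RDD that times the computation of each of its partitions. */
    class TimedRDD[T: ClassTag](prev: RDD[T]) extends RDD[T](prev) {

      override def getPartitions: Array[Partition] = firstParent[T].partitions

      override def compute(split: Partition, context: TaskContext): Iterator[T] = {
        val start = System.currentTimeMillis()
        // Materialize the parent iterator so the measurement covers the real work.
        val rows = firstParent[T].iterator(split, context).toArray
        // Repeated calls under the same name append to that metric's list of values.
        context.taskMetrics.setCustomMetric(
          "partition compute time (ms)", System.currentTimeMillis() - start)
        rows.iterator
      }
    }

Each task's row on the StagePage would then show
"partition compute time (ms) = <value>" in the new Custom Metrics column.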
---
 .../apache/spark/executor/TaskMetrics.scala   | 33 +++++++++++++++++++
 .../org/apache/spark/ui/jobs/StagePage.scala  |  8 ++++-
 .../org/apache/spark/util/JsonProtocol.scala  | 18 +++++++++-
 .../apache/spark/util/JsonProtocolSuite.scala |  8 +++++
 4 files changed, 65 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 350fd74173f65..207a45174bce3 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -81,6 +81,39 @@ class TaskMetrics extends Serializable {
    * Storage statuses of any blocks that have been updated as a result of this task.
    */
   var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None
+
+  /**
+   * Custom task-specific metrics. Any piece of Spark machinery that cares to track custom task-related metrics can
+   * now use the setCustomMetric(name,value) method on TaskMetrics to do that.
+   * e.g. Any RDD that wants to track a custom metric of its interest (related to execution of a task) can
+   * call setCustomMetric(name,value) inside its compute() method.
+   * Map Key -> name of custom metric
+   * Map Value -> list of numeric values for custom metric
+   */
+  private val _customMetrics: scala.collection.mutable.HashMap[String, List[Long]] = scala.collection.mutable.HashMap()
+
+  /**
+   * Custom task-specific metrics. Any piece of Spark machinery that cares to track custom task-related metrics can
+   * now use the setCustomMetric(name,value) method on TaskMetrics to do that.
+   * e.g. Any RDD that wants to track a custom metric of its interest (related to execution of a task) can
+   * call setCustomMetric(name,value) inside its compute() method.
+   * Map Key -> name of custom metric
+   * Map Value -> list of numeric values for custom metric
+   */
+  def customMetrics = _customMetrics
+
+  /**
+   * Convenience method for setting a custom metric
+   * @param metricName name of custom metric
+   * @param metricValue value for custom metric
+   */
+  def setCustomMetric(metricName: String, metricValue: Long) {
+    if (_customMetrics.contains(metricName)) {
+      _customMetrics(metricName) = _customMetrics(metricName) ++ List(metricValue)
+    } else {
+      _customMetrics(metricName) = List(metricValue)
+    }
+  }
 }
 
 private[spark] object TaskMetrics {
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 4bce472036f7d..ddf4f5a295947 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -96,7 +96,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
     // scalastyle:on
     val taskHeaders: Seq[String] =
       Seq("Task Index", "Task ID", "Status", "Locality Level", "Executor", "Launch Time") ++
-      Seq("Duration", "GC Time", "Result Ser Time") ++
+      Seq("Duration", "GC Time", "Result Ser Time", "Custom Metrics") ++
       {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
       {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
       {if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++
@@ -220,6 +220,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
       val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
       val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
+      val customMetrics = metrics.map(_.customMetrics).getOrElse(scala.collection.mutable.HashMap())
 
       val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead)
       val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("")
@@ -261,6 +262,11 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         <td sorttable_customkey={serializationTime.toString}>
           {if (serializationTime > 0) UIUtils.formatDuration(serializationTime) else ""}
         </td>
+        <td>
+          {customMetrics.foldLeft("")( (previous, pair) => {
+            (if (previous.isEmpty) "" else previous + "\n") + pair._1 + " = " + pair._2.mkString(",")
+          })}
+        </td>
         {if (shuffleRead) {
           <td sorttable_customkey={shuffleReadSortable}>
             {shuffleReadReadable}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 09825087bb048..c381bb1575f91 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -215,6 +215,10 @@ private[spark] object JsonProtocol {
           ("Status" -> blockStatusToJson(status))
         })
       }.getOrElse(JNothing)
+    val customMetrics =
+      taskMetrics.customMetrics.map { case (name, value) =>
+        customMetricToJson(name, value)
+      }
     ("Host Name" -> taskMetrics.hostname) ~
     ("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~
     ("Executor Run Time" -> taskMetrics.executorRunTime) ~
@@ -225,7 +229,12 @@ private[spark] object JsonProtocol {
     ("Disk Bytes Spilled" -> taskMetrics.diskBytesSpilled) ~
     ("Shuffle Read Metrics" -> shuffleReadMetrics) ~
     ("Shuffle Write Metrics" -> shuffleWriteMetrics) ~
-    ("Updated Blocks" -> updatedBlocks)
+    ("Updated Blocks" -> updatedBlocks) ~
+    ("Custom Metrics" -> customMetrics)
   }
+
+  def customMetricToJson(metricName: String, metricValue: List[Long]): JValue = {
+    metricName -> metricValue
+  }
 
   def shuffleReadMetricsToJson(shuffleReadMetrics: ShuffleReadMetrics): JValue = {
@@ -527,6 +536,13 @@ private[spark] object JsonProtocol {
         (id, status)
       }
     }
+
+    // set customMetrics
+    for (
+      obj <- (json \ "Custom Metrics").extract[List[JValue]];
+      (metricName, JArray(v)) <- obj.asInstanceOf[JObject].obj;
+      metricValue <- v.map(_.extract[Long])
+    ) metrics.setCustomMetric(metricName, metricValue)
     metrics
   }
 
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 3031015256ec9..1a148efe09344 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -70,6 +70,13 @@ class JsonProtocolSuite extends FunSuite {
     testEvent(applicationEnd, applicationEndJsonString)
   }
 
+  test("Custom Metrics in TaskMetrics") {
+    val taskMetric = makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8)
+    taskMetric.setCustomMetric("Custom TaskMetric 1", 1)
+    taskMetric.setCustomMetric("Custom TaskMetric 2", 2)
+    testTaskMetrics(taskMetric)
+  }
+
   test("Dependent Classes") {
     testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
     testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
@@ -280,6 +287,7 @@ class JsonProtocolSuite extends FunSuite {
     assertOptionEquals(
       metrics1.shuffleWriteMetrics, metrics2.shuffleWriteMetrics, assertShuffleWriteEquals)
     assertOptionEquals(metrics1.updatedBlocks, metrics2.updatedBlocks, assertBlocksEquals)
+    assert(metrics1.customMetrics === metrics2.customMetrics)
   }
 
   private def assertEquals(metrics1: ShuffleReadMetrics, metrics2: ShuffleReadMetrics) {
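For reference, with the two metrics recorded by the new test case above, the
"Custom Metrics" field emitted by taskMetricsToJson would look roughly like
this (map iteration order is unspecified, so the entries may appear swapped):

    "Custom Metrics": [
      {"Custom TaskMetric 1": [1]},
      {"Custom TaskMetric 2": [2]}
    ]

Each metric serializes as a single-field object whose value is the list of all
values recorded under that name, which is exactly the shape the deserialization
loop added to taskMetricsFromJson expects.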