diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 350fd74173f65..207a45174bce3 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -81,6 +81,28 @@ class TaskMetrics extends Serializable {
    * Storage statuses of any blocks that have been updated as a result of this task.
    */
   var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None
+
+  /**
+   * Custom task-specific metrics. Any piece of Spark machinery that wants to track
+   * custom task-related metrics can call setCustomMetric(name, value) on TaskMetrics.
+   * For example, an RDD can record a metric tied to the execution of a task by calling
+   * setCustomMetric(name, value) inside its compute() method.
+   * Map key   -> name of the custom metric
+   * Map value -> list of numeric values recorded for that metric
+   */
+  private val _customMetrics = scala.collection.mutable.HashMap[String, List[Long]]()
+
+  /** Custom task-specific metrics, keyed by metric name. See [[setCustomMetric]]. */
+  def customMetrics: scala.collection.mutable.HashMap[String, List[Long]] = _customMetrics
+
+  /**
+   * Convenience method for recording a value for a custom metric.
+   * @param metricName name of the custom metric
+   * @param metricValue value to record for the custom metric
+   */
+  def setCustomMetric(metricName: String, metricValue: Long) {
+    _customMetrics(metricName) = _customMetrics.getOrElse(metricName, Nil) :+ metricValue
+  }
 }
 
 private[spark] object TaskMetrics {
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 4bce472036f7d..ddf4f5a295947 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -96,7 +96,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
     // scalastyle:on
     val taskHeaders: Seq[String] =
       Seq("Task Index", "Task ID", "Status", "Locality Level", "Executor", "Launch Time") ++
-      Seq("Duration", "GC Time", "Result Ser Time") ++
+      Seq("Duration", "GC Time", "Result Ser Time", "Custom Metrics") ++
       {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
       {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
       {if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++
@@ -220,6 +220,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
       else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
     val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
     val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
+    val customMetrics = metrics.map(_.customMetrics).getOrElse(scala.collection.mutable.HashMap[String, List[Long]]())
     val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead)
     val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("")
@@ -261,6 +262,11 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
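A minimal usage sketch of the new API, assuming a Spark 1.x `TaskContext` that exposes `taskMetrics` to the running task; `SetupTimingRDD` and the metric name `"setupTimeNs"` are made up for illustration:

```scala
import scala.reflect.ClassTag

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Pass-through RDD that times its per-partition setup work and records the
// elapsed nanoseconds as a custom task metric via the setCustomMetric method
// added by this patch. The class and metric name are hypothetical.
class SetupTimingRDD[T: ClassTag](prev: RDD[T]) extends RDD[T](prev) {

  override protected def getPartitions: Array[Partition] = prev.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    val start = System.nanoTime()
    // ... any task-local setup work would happen here ...
    context.taskMetrics.setCustomMetric("setupTimeNs", System.nanoTime() - start)
    prev.iterator(split, context)
  }
}
```

Because `setCustomMetric` appends to a per-name list rather than overwriting, a task that calls it repeatedly under the same name accumulates every value recorded for that metric.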
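The body of the final StagePage hunk is not shown above, so purely as a hedged illustration of how the new `customMetrics` map might be rendered into the "Custom Metrics" cell of the task row (not the patch's actual rendering code):

```scala
// Hypothetical cell for the "Custom Metrics" column in StagePage.taskRow:
// render each metric as "name: v1, v2, ..." and join the entries with "; ".
val customMetricsReadable = customMetrics.map {
  case (name, values) => name + ": " + values.mkString(", ")
}.mkString("; ")
val customMetricsCell = <td>{customMetricsReadable}</td>
```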