From 5bc3cba549a586b40e66d5363ba98f18e089cc6c Mon Sep 17 00:00:00 2001
From: Kay Ousterhout
Date: Thu, 16 Oct 2014 18:05:28 -0700
Subject: [PATCH 1/5] [SPARK-3984] [SPARK-3983] Improve UI task metrics.

This commit fixes the scheduler delay shown in the UI, which previously
included time that is not scheduler delay (such as the time to deserialize
the task and to serialize the result). It also adds finer-grained
information to each stage's summary table about task launch overhead,
which is useful for debugging the performance of short jobs, where that
overhead is non-negligible.
---
 .../org/apache/spark/executor/Executor.scala  | 11 +++++++--
 .../apache/spark/executor/TaskMetrics.scala   |  5 ++++
 .../scala/org/apache/spark/ui/ToolTips.scala  |  3 +++
 .../org/apache/spark/ui/jobs/StagePage.scala  | 24 ++++++++++++++++++-
 .../spark/ui/jobs/TaskDetailsClassNames.scala |  2 ++
 .../org/apache/spark/util/JsonProtocol.scala  |  2 ++
 .../apache/spark/util/JsonProtocolSuite.scala | 12 ++++++++++
 7 files changed, 56 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 2889f59e33e8..22666e508fd5 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -110,10 +110,15 @@ private[spark] class Executor(
   // Maintains the list of running tasks.
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
 
+  // Time when the task arrived on the executor. Used to track the overhead of getting a thread for
+  // the task to run in.
+  private val taskStartTimes = new ConcurrentHashMap[Long, Long]
+
   startDriverHeartbeater()
 
   def launchTask(
       context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {
+    taskStartTimes.put(taskId, System.currentTimeMillis)
     val tr = new TaskRunner(context, taskId, taskName, serializedTask)
     runningTasks.put(taskId, tr)
     threadPool.execute(tr)
@@ -152,7 +157,7 @@ private[spark] class Executor(
     }
 
     override def run() {
-      val startTime = System.currentTimeMillis()
+      val deserializeStartTime = System.currentTimeMillis()
       Thread.currentThread.setContextClassLoader(replClassLoader)
       val ser = SparkEnv.get.closureSerializer.newInstance()
       logInfo(s"Running $taskName (TID $taskId)")
@@ -197,7 +202,8 @@ private[spark] class Executor(
         val afterSerialization = System.currentTimeMillis()
 
         for (m <- task.metrics) {
-          m.executorDeserializeTime = taskStart - startTime
+          m.executorLaunchTime = deserializeStartTime - taskStartTimes.get(taskId)
+          m.executorDeserializeTime = taskStart - deserializeStartTime
           m.executorRunTime = taskFinish - taskStart
           m.jvmGCTime = gcTime - startGCTime
           m.resultSerializationTime = afterSerialization - beforeSerialization
@@ -267,6 +273,7 @@ private[spark] class Executor(
         // Release memory used by this thread for unrolling blocks
         env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
         runningTasks.remove(taskId)
+        taskStartTimes.remove(taskId)
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 57bc2b40cec4..9a6154901931 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -41,6 +41,11 @@ class TaskMetrics extends Serializable {
    */
  var hostname: String = _
 
+  /**
+   * Time taken on the executor to launch the task in its own thread.
+   */
+  var executorLaunchTime: Long = _
+
   /**
    * Time taken on the executor to deserialize this task
    */
diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
index f02904df31fc..a3dc13fe3bee 100644
--- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -18,6 +18,9 @@
 package org.apache.spark.ui
 
 private[spark] object ToolTips {
+  val EXECUTOR_LAUNCH_TIME =
+    """Overhead associated with launching the task in its own thread on the executor."""
+
   val SCHEDULER_DELAY =
     """Scheduler delay includes time to ship the task from the scheduler to the
        executor, and time to send the task result from the executor to the scheduler. If
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index bf45272aefde..d70cbfdade41 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -179,6 +179,20 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
           }
         }
 
+        val executorLaunchTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
+          metrics.get.executorLaunchTime.toDouble
+        }
+        val executorLaunchTitle = <td>Launch time</td>
+        val executorLaunchQuantiles =
+          executorLaunchTitle +: getFormattedTimeQuantiles(executorLaunchTimes)
+
+        val deserializationTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
+          metrics.get.executorDeserializeTime.toDouble
+        }
+        val deserializationQuantiles =
+          <td>Task deserialization time</td> +: getFormattedTimeQuantiles(deserializationTimes)
+
         val serviceTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
           metrics.get.executorRunTime.toDouble
         }
@@ -248,6 +262,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         val shuffleWriteSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
           metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
         }
+
         val shuffleWriteQuantiles = <td>Shuffle Write</td> +:
           getFormattedSizeQuantiles(shuffleWriteSizes)
@@ -266,6 +281,10 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         val listings: Seq[Seq[Node]] = Seq(
           <tr>{serviceQuantiles}</tr>,
           <tr class={TaskDetailsClassNames.SCHEDULER_DELAY}>{schedulerDelayQuantiles}</tr>,
+          <tr class={TaskDetailsClassNames.EXECUTOR_LAUNCH_TIME}>{executorLaunchQuantiles}</tr>
+          <tr class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
+            {deserializationQuantiles}
+          </tr>
           <tr class={TaskDetailsClassNames.GC_TIME}>{gcQuantiles}</tr>,
           <tr class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}>{serializationQuantiles}</tr>
@@ -424,6 +443,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         (info.finishTime - info.launchTime)
       }
     }
-    totalExecutionTime - metrics.executorRunTime
+    val executorOverhead = (metrics.executorLaunchTime +
+      metrics.executorDeserializeTime +
+      metrics.resultSerializationTime)
+    totalExecutionTime - metrics.executorRunTime - executorOverhead
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala b/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala
index 23d672cabda0..3c400099f2ca 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala
@@ -24,6 +24,8 @@ private object TaskDetailsClassNames {
   val SCHEDULER_DELAY = "scheduler_delay"
   val GC_TIME = "gc_time"
+  val EXECUTOR_LAUNCH_TIME = "launch_time"
+  val TASK_DESERIALIZATION_TIME = "deserialization_time"
   val RESULT_SERIALIZATION_TIME = "serialization_time"
   val GETTING_RESULT_TIME = "getting_result_time"
 }
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 5b2e7d3a7edb..2c58699d0ddc 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -240,6 +240,7 @@ private[spark] object JsonProtocol {
       })
     }.getOrElse(JNothing)
     ("Host Name" -> taskMetrics.hostname) ~
+    ("Executor Launch Time" -> taskMetrics.executorLaunchTime) ~
     ("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~
     ("Executor Run Time" -> taskMetrics.executorRunTime) ~
     ("Result Size" -> taskMetrics.resultSize) ~
@@ -562,6 +563,7 @@ private[spark] object JsonProtocol {
     }
     val metrics = new TaskMetrics
     metrics.hostname = (json \ "Host Name").extract[String]
+    metrics.executorLaunchTime = (json \ "Executor Launch Time").extractOpt[Long].getOrElse(0)
     metrics.executorDeserializeTime = (json \ "Executor Deserialize Time").extract[Long]
     metrics.executorRunTime = (json \ "Executor Run Time").extract[Long]
     metrics.resultSize = (json \ "Result Size").extract[Long]
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index f1f88c5fd363..6e07c7c3952e 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -154,6 +154,15 @@ class JsonProtocolSuite extends FunSuite {
     assert(newMetrics.inputMetrics.isEmpty)
   }
 
+  test("TaskMetrics.executorLaunchTime backward compatibility") {
+    // executorLaunchTime was added after 1.1.
+    val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true)
+    val newJson = JsonProtocol.taskMetricsToJson(metrics)
+    val oldJson = newJson.removeField { case (field, _) => field == "Executor Launch Time" }
+    val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
+    assert(newMetrics.executorLaunchTime === 0L)
+  }
+
   test("BlockManager events backward compatibility") {
     // SparkListenerBlockManagerAdded/Removed in Spark 1.0.0 do not have a "time" property.
     val blockManagerAdded = SparkListenerBlockManagerAdded(1L,
@@ -554,6 +563,7 @@ class JsonProtocolSuite extends FunSuite {
     val t = new TaskMetrics
     val sw = new ShuffleWriteMetrics
     t.hostname = "localhost"
+    t.executorLaunchTime = c + d
     t.executorDeserializeTime = a
     t.executorRunTime = b
     t.resultSize = c
@@ -796,6 +806,7 @@ class JsonProtocolSuite extends FunSuite {
       |    },
       |    "Task Metrics": {
       |      "Host Name": "localhost",
+      |      "Executor Launch Time": 1100,
       |      "Executor Deserialize Time": 300,
       |      "Executor Run Time": 400,
       |      "Result Size": 500,
@@ -879,6 +890,7 @@ class JsonProtocolSuite extends FunSuite {
       |    },
       |    "Task Metrics": {
       |      "Host Name": "localhost",
+      |      "Executor Launch Time": 1100,
       |      "Executor Deserialize Time": 300,
       |      "Executor Run Time": 400,
       |      "Result Size": 500,
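The heart of patch 1 is the new `getSchedulerDelay` arithmetic: scheduler delay is now whatever remains of a task's wall-clock time after all executor-side work is subtracted. A standalone sketch of that computation (`TaskTimes` is a hypothetical stand-in for the real `TaskInfo`/`TaskMetrics` pair, not a Spark type):

```scala
// Sketch of the corrected scheduler-delay arithmetic from patch 1; TaskTimes
// is a hypothetical stand-in for Spark's TaskInfo/TaskMetrics pair.
case class TaskTimes(
    launchTime: Long,               // driver clock: task handed to the executor backend
    finishTime: Long,               // driver clock: result received back
    executorLaunchTime: Long,       // executor: wait for a free thread in the pool
    executorDeserializeTime: Long,  // executor: deserialize the task closure
    executorRunTime: Long,          // executor: run the task itself
    resultSerializationTime: Long)  // executor: serialize the result

def schedulerDelay(t: TaskTimes): Long = {
  val totalExecutionTime = t.finishTime - t.launchTime
  // All executor-side time is accounted for explicitly, so whatever remains
  // is time spent shipping the task and its result between driver and executor.
  val executorOverhead =
    t.executorLaunchTime + t.executorDeserializeTime + t.resultSerializationTime
  totalExecutionTime - t.executorRunTime - executorOverhead
}
```

For example, a task visible to the driver for 100 ms that spent 5 ms waiting for a thread, 10 ms deserializing, 60 ms running, and 5 ms serializing its result now reports 20 ms of scheduler delay rather than the 40 ms the old `totalExecutionTime - executorRunTime` formula would have shown.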
From 335be4bef9dbf724f7058161057acf1782c92377 Mon Sep 17 00:00:00 2001
From: Kay Ousterhout
Date: Fri, 31 Oct 2014 17:22:05 -0700
Subject: [PATCH 2/5] Made metrics hideable

---
 .../scala/org/apache/spark/ui/ToolTips.scala  |  9 ++--
 .../org/apache/spark/ui/jobs/StagePage.scala  | 43 +++++++++++++++++--
 2 files changed, 46 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
index a3dc13fe3bee..b0510b140bb1 100644
--- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -18,15 +18,18 @@
 package org.apache.spark.ui
 
 private[spark] object ToolTips {
-  val EXECUTOR_LAUNCH_TIME =
-    """Overhead associated with launching the task in its own thread on the executor."""
-
   val SCHEDULER_DELAY =
     """Scheduler delay includes time to ship the task from the scheduler to the
        executor, and time to send the task result from the executor to the scheduler. If
        scheduler delay is large, consider decreasing the size of tasks or decreasing the
        size of task results."""
 
+  val EXECUTOR_LAUNCH_TIME =
+    """Overhead associated with launching the task in its own thread on the executor."""
+
+  val TASK_DESERIALIZATION_TIME =
+    """Time spent deserializing the task closure on the executor."""
+
   val INPUT = "Bytes read from Hadoop or from Spark storage."
 
   val SHUFFLE_WRITE = "Bytes written to disk in order to be read by a shuffle in a future stage."
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index d70cbfdade41..e276907836c6 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -119,6 +119,20 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
                 <span class="additional-metric-title">GC Time</span>
               </span>
             </li>
+            <li>
+              <span data-toggle="tooltip"
+                    title={ToolTips.EXECUTOR_LAUNCH_TIME} data-placement="right">
+                <input type="checkbox" name={TaskDetailsClassNames.EXECUTOR_LAUNCH_TIME}/>
+                <span class="additional-metric-title">Executor Launch Time</span>
+              </span>
+            </li>
+            <li>
+              <span data-toggle="tooltip"
+                    title={ToolTips.TASK_DESERIALIZATION_TIME} data-placement="right">
+                <input type="checkbox" name={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}/>
+                <span class="additional-metric-title">Task Deserialization Time</span>
+              </span>
+            </li>
             <li>
               <span data-toggle="tooltip"
@@ -148,6 +162,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         ("Executor ID / Host", ""), ("Launch Time", ""), ("Duration", ""),
         ("Scheduler Delay", TaskDetailsClassNames.SCHEDULER_DELAY),
         ("GC Time", TaskDetailsClassNames.GC_TIME),
+        ("Executor Launch Time", TaskDetailsClassNames.EXECUTOR_LAUNCH_TIME),
+        ("Task Deserialization Time", TaskDetailsClassNames.TASK_DESERIALIZATION_TIME),
         ("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
         ("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) ++
         {if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++
@@ -182,8 +198,13 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         val executorLaunchTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
           metrics.get.executorLaunchTime.toDouble
         }
-        val executorLaunchTitle = <td>Launch time</td>
+        val executorLaunchTitle =
+          <td>
+            <span data-toggle="tooltip" title={ToolTips.EXECUTOR_LAUNCH_TIME} data-placement="right">
+              Executor Launch Time
+            </span>
+          </td>
         val executorLaunchQuantiles =
           executorLaunchTitle +: getFormattedTimeQuantiles(executorLaunchTimes)
@@ -191,7 +212,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
           metrics.get.executorDeserializeTime.toDouble
         }
         val deserializationQuantiles =
-          <td>Task deserialization time</td> +: getFormattedTimeQuantiles(deserializationTimes)
+          <td>
+            <span data-toggle="tooltip" title={ToolTips.TASK_DESERIALIZATION_TIME}
+                  data-placement="right">
+              Task Deserialization Time
+            </span>
+          </td> +: getFormattedTimeQuantiles(deserializationTimes)
 
         val serviceTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
           metrics.get.executorRunTime.toDouble
@@ -333,7 +359,10 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
       val schedulerDelay = getSchedulerDelay(info, metrics.get)
       val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
+      val executorLaunchTime = metrics.map(_.executorLaunchTime).getOrElse(0L)
+      val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L)
       val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
+
       val gettingResultTime = info.gettingResultTime
 
       val maybeAccumulators = info.accumulables
@@ -389,6 +418,14 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
           <td class={TaskDetailsClassNames.GC_TIME}>
             {if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
           </td>
+          <td class={TaskDetailsClassNames.EXECUTOR_LAUNCH_TIME}>
+            {UIUtils.formatDuration(executorLaunchTime.toLong)}
+          </td>
+          <td class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
+            {UIUtils.formatDuration(taskDeserializationTime.toLong)}
+          </td>
           <td class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}>
             {UIUtils.formatDuration(serializationTime)}
           </td>
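Patch 2 makes each new metric hideable by pairing a checkbox whose `name` is a `TaskDetailsClassNames` entry with table rows and cells carrying that same string as their CSS class; the assumption here is that Spark's existing additional-metrics JavaScript toggles visibility by that class. A small sketch of the `<li>` markup the patch repeats for each optional metric (a hypothetical helper, not code from the patch):

```scala
import scala.xml.Node

// Hypothetical helper capturing the <li> pattern patch 2 repeats for each
// optional metric. Assumption: existing UI JavaScript shows/hides every
// element whose CSS class equals this checkbox's name attribute.
def additionalMetricCheckbox(tooltip: String, cssClass: String, title: String): Node =
  <li>
    <span data-toggle="tooltip" title={tooltip} data-placement="right">
      <input type="checkbox" name={cssClass}/>
      <span class="additional-metric-title">{title}</span>
    </span>
  </li>

// e.g. additionalMetricCheckbox(ToolTips.TASK_DESERIALIZATION_TIME,
//   TaskDetailsClassNames.TASK_DESERIALIZATION_TIME, "Task Deserialization Time")
```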
From 1f13afe3f07b64f94ac49f17f52ce778e7c68c29 Mon Sep 17 00:00:00 2001
From: Kay Ousterhout
Date: Sat, 1 Nov 2014 16:13:21 -0700
Subject: [PATCH 3/5] Minor spacing fixes

---
 core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index e276907836c6..72d85ba9e4f4 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -288,7 +288,6 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         val shuffleWriteSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
           metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
         }
-
         val shuffleWriteQuantiles = <td>Shuffle Write</td> +:
           getFormattedSizeQuantiles(shuffleWriteSizes)
 
@@ -362,7 +361,6 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
       val executorLaunchTime = metrics.map(_.executorLaunchTime).getOrElse(0L)
       val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L)
       val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
-
       val gettingResultTime = info.gettingResultTime
 
       val maybeAccumulators = info.accumulables
From 531575d381b5e4967d5b2f4385c5135040f98165 Mon Sep 17 00:00:00 2001
From: Kay Ousterhout
Date: Tue, 4 Nov 2014 22:52:16 -0800
Subject: [PATCH 4/5] Removed executor launch time

---
 .../org/apache/spark/executor/Executor.scala  |  7 -----
 .../apache/spark/executor/TaskMetrics.scala   |  5 ----
 .../scala/org/apache/spark/ui/ToolTips.scala  |  3 ---
 .../org/apache/spark/ui/jobs/StagePage.scala  | 30 +------------------
 .../spark/ui/jobs/TaskDetailsClassNames.scala |  1 -
 .../org/apache/spark/util/JsonProtocol.scala  |  2 --
 .../apache/spark/util/JsonProtocolSuite.scala | 12 --------
 7 files changed, 1 insertion(+), 59 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 22666e508fd5..3ddfb0a3fd2e 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -110,15 +110,10 @@ private[spark] class Executor(
   // Maintains the list of running tasks.
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
 
-  // Time when the task arrived on the executor. Used to track the overhead of getting a thread for
-  // the task to run in.
-  private val taskStartTimes = new ConcurrentHashMap[Long, Long]
-
   startDriverHeartbeater()
 
   def launchTask(
       context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {
-    taskStartTimes.put(taskId, System.currentTimeMillis)
     val tr = new TaskRunner(context, taskId, taskName, serializedTask)
     runningTasks.put(taskId, tr)
     threadPool.execute(tr)
@@ -202,7 +197,6 @@ private[spark] class Executor(
         val afterSerialization = System.currentTimeMillis()
 
         for (m <- task.metrics) {
-          m.executorLaunchTime = deserializeStartTime - taskStartTimes.get(taskId)
           m.executorDeserializeTime = taskStart - deserializeStartTime
           m.executorRunTime = taskFinish - taskStart
           m.jvmGCTime = gcTime - startGCTime
@@ -273,7 +267,6 @@ private[spark] class Executor(
         // Release memory used by this thread for unrolling blocks
         env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
         runningTasks.remove(taskId)
-        taskStartTimes.remove(taskId)
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 9a6154901931..57bc2b40cec4 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -41,11 +41,6 @@ class TaskMetrics extends Serializable {
    */
   var hostname: String = _
 
-  /**
-   * Time taken on the executor to launch the task in its own thread.
-   */
-  var executorLaunchTime: Long = _
-
   /**
    * Time taken on the executor to deserialize this task
    */
diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
index b0510b140bb1..51dc08f668a4 100644
--- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -24,9 +24,6 @@ private[spark] object ToolTips {
        scheduler delay is large, consider decreasing the size of tasks or decreasing the
        size of task results."""
 
-  val EXECUTOR_LAUNCH_TIME =
-    """Overhead associated with launching the task in its own thread on the executor."""
-
   val TASK_DESERIALIZATION_TIME =
     """Time spent deserializing the task closure on the executor."""
 
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 72d85ba9e4f4..5771778a75b6 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -119,13 +119,6 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
                 <span class="additional-metric-title">GC Time</span>
               </span>
             </li>
-            <li>
-              <span data-toggle="tooltip"
-                    title={ToolTips.EXECUTOR_LAUNCH_TIME} data-placement="right">
-                <input type="checkbox" name={TaskDetailsClassNames.EXECUTOR_LAUNCH_TIME}/>
-                <span class="additional-metric-title">Executor Launch Time</span>
-              </span>
-            </li>
             <li>
               <span data-toggle="tooltip"
                     title={ToolTips.TASK_DESERIALIZATION_TIME} data-placement="right">
@@ -162,7 +155,6 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         ("Executor ID / Host", ""), ("Launch Time", ""), ("Duration", ""),
         ("Scheduler Delay", TaskDetailsClassNames.SCHEDULER_DELAY),
         ("GC Time", TaskDetailsClassNames.GC_TIME),
-        ("Executor Launch Time", TaskDetailsClassNames.EXECUTOR_LAUNCH_TIME),
         ("Task Deserialization Time", TaskDetailsClassNames.TASK_DESERIALIZATION_TIME),
         ("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
         ("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) ++
@@ -195,19 +187,6 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
           }
         }
 
-        val executorLaunchTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
-          metrics.get.executorLaunchTime.toDouble
-        }
-        val executorLaunchTitle =
-          <td>
-            <span data-toggle="tooltip" title={ToolTips.EXECUTOR_LAUNCH_TIME} data-placement="right">
-              Executor Launch Time
-            </span>
-          </td>
-        val executorLaunchQuantiles =
-          executorLaunchTitle +: getFormattedTimeQuantiles(executorLaunchTimes)
-
         val deserializationTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
           metrics.get.executorDeserializeTime.toDouble
         }
@@ -306,7 +285,6 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         val listings: Seq[Seq[Node]] = Seq(
           <tr>{serviceQuantiles}</tr>,
           <tr class={TaskDetailsClassNames.SCHEDULER_DELAY}>{schedulerDelayQuantiles}</tr>,
-          <tr class={TaskDetailsClassNames.EXECUTOR_LAUNCH_TIME}>{executorLaunchQuantiles}</tr>
           <tr class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
             {deserializationQuantiles}
           </tr>
@@ -358,7 +336,6 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
       val schedulerDelay = getSchedulerDelay(info, metrics.get)
       val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
-      val executorLaunchTime = metrics.map(_.executorLaunchTime).getOrElse(0L)
       val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L)
       val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
       val gettingResultTime = info.gettingResultTime
@@ -416,10 +393,6 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
           <td class={TaskDetailsClassNames.GC_TIME}>
             {if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
           </td>
-          <td class={TaskDetailsClassNames.EXECUTOR_LAUNCH_TIME}>
-            {UIUtils.formatDuration(executorLaunchTime.toLong)}
-          </td>
           <td class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
             {UIUtils.formatDuration(taskDeserializationTime.toLong)}
           </td>
@@ -478,8 +451,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         (info.finishTime - info.launchTime)
       }
     }
-    val executorOverhead = (metrics.executorLaunchTime +
-      metrics.executorDeserializeTime +
+    val executorOverhead = (metrics.executorDeserializeTime +
       metrics.resultSerializationTime)
     totalExecutionTime - metrics.executorRunTime - executorOverhead
   }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala b/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala
index 3c400099f2ca..eb371bd0ea7e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala
@@ -24,7 +24,6 @@ private object TaskDetailsClassNames {
   val SCHEDULER_DELAY = "scheduler_delay"
   val GC_TIME = "gc_time"
-  val EXECUTOR_LAUNCH_TIME = "launch_time"
   val TASK_DESERIALIZATION_TIME = "deserialization_time"
   val RESULT_SERIALIZATION_TIME = "serialization_time"
   val GETTING_RESULT_TIME = "getting_result_time"
 }
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 2c58699d0ddc..5b2e7d3a7edb 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -240,7 +240,6 @@ private[spark] object JsonProtocol {
       })
     }.getOrElse(JNothing)
     ("Host Name" -> taskMetrics.hostname) ~
-    ("Executor Launch Time" -> taskMetrics.executorLaunchTime) ~
     ("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~
     ("Executor Run Time" -> taskMetrics.executorRunTime) ~
     ("Result Size" -> taskMetrics.resultSize) ~
@@ -563,7 +562,6 @@ private[spark] object JsonProtocol {
     }
     val metrics = new TaskMetrics
     metrics.hostname = (json \ "Host Name").extract[String]
-    metrics.executorLaunchTime = (json \ "Executor Launch Time").extractOpt[Long].getOrElse(0)
     metrics.executorDeserializeTime = (json \ "Executor Deserialize Time").extract[Long]
     metrics.executorRunTime = (json \ "Executor Run Time").extract[Long]
     metrics.resultSize = (json \ "Result Size").extract[Long]
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 6e07c7c3952e..f1f88c5fd363 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -154,15 +154,6 @@ class JsonProtocolSuite extends FunSuite {
     assert(newMetrics.inputMetrics.isEmpty)
   }
 
-  test("TaskMetrics.executorLaunchTime backward compatibility") {
-    // executorLaunchTime was added after 1.1.
-    val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true)
-    val newJson = JsonProtocol.taskMetricsToJson(metrics)
-    val oldJson = newJson.removeField { case (field, _) => field == "Executor Launch Time" }
-    val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
-    assert(newMetrics.executorLaunchTime === 0L)
-  }
-
   test("BlockManager events backward compatibility") {
     // SparkListenerBlockManagerAdded/Removed in Spark 1.0.0 do not have a "time" property.
     val blockManagerAdded = SparkListenerBlockManagerAdded(1L,
@@ -563,7 +554,6 @@ class JsonProtocolSuite extends FunSuite {
     val t = new TaskMetrics
     val sw = new ShuffleWriteMetrics
     t.hostname = "localhost"
-    t.executorLaunchTime = c + d
     t.executorDeserializeTime = a
     t.executorRunTime = b
     t.resultSize = c
@@ -806,7 +796,6 @@ class JsonProtocolSuite extends FunSuite {
       |    },
       |    "Task Metrics": {
       |      "Host Name": "localhost",
-      |      "Executor Launch Time": 1100,
       |      "Executor Deserialize Time": 300,
       |      "Executor Run Time": 400,
       |      "Result Size": 500,
@@ -890,7 +879,6 @@ class JsonProtocolSuite extends FunSuite {
       |    },
       |    "Task Metrics": {
       |      "Host Name": "localhost",
-      |      "Executor Launch Time": 1100,
       |      "Executor Deserialize Time": 300,
       |      "Executor Run Time": 400,
       |      "Result Size": 500,
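Patch 4 also rips out the JSON field and its compatibility test, but the idiom that test exercised remains the pattern for evolving the event-log schema: fields added in a newer release are read with `extractOpt` so logs written by older releases still parse, defaulting to zero. A self-contained json4s sketch of that idiom (json4s is what `JsonProtocol` builds on; the field name mirrors the one removed here):

```scala
import org.json4s._
import org.json4s.JsonDSL._

object CompatSketch {
  implicit val formats: Formats = DefaultFormats

  // A field introduced in a newer release is optional on the read path, so
  // event logs written before the field existed still deserialize cleanly.
  def readLaunchTime(json: JValue): Long =
    (json \ "Executor Launch Time").extractOpt[Long].getOrElse(0L)

  def main(args: Array[String]): Unit = {
    val newLog: JValue = ("Executor Launch Time" -> 42L) ~ ("Executor Run Time" -> 400L)
    val oldLog: JValue = JObject("Executor Run Time" -> JInt(400))  // field absent
    println(readLaunchTime(newLog))  // 42
    println(readLaunchTime(oldLog))  // 0
  }
}
```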
From 0c1398eebc6ca72522304ca5827234a9a887c056 Mon Sep 17 00:00:00 2001
From: Kay Ousterhout
Date: Wed, 5 Nov 2014 11:47:55 -0800
Subject: [PATCH 5/5] Fixed ordering

---
 .../org/apache/spark/ui/jobs/StagePage.scala | 20 +++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 5771778a75b6..1e57cc42ecf5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -114,16 +114,16 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
               </span>
             </li>
             <li>
               <span data-toggle="tooltip"
-                    title={ToolTips.GC_TIME} data-placement="right">
-                <input type="checkbox" name={TaskDetailsClassNames.GC_TIME}/>
-                <span class="additional-metric-title">GC Time</span>
+                    title={ToolTips.TASK_DESERIALIZATION_TIME} data-placement="right">
+                <input type="checkbox" name={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}/>
+                <span class="additional-metric-title">Task Deserialization Time</span>
               </span>
             </li>
             <li>
               <span data-toggle="tooltip"
-                    title={ToolTips.TASK_DESERIALIZATION_TIME} data-placement="right">
-                <input type="checkbox" name={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}/>
-                <span class="additional-metric-title">Task Deserialization Time</span>
+                    title={ToolTips.GC_TIME} data-placement="right">
+                <input type="checkbox" name={TaskDetailsClassNames.GC_TIME}/>
+                <span class="additional-metric-title">GC Time</span>
               </span>
             </li>
@@ -154,8 +154,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         ("Index", ""), ("ID", ""), ("Attempt", ""), ("Status", ""), ("Locality Level", ""),
         ("Executor ID / Host", ""), ("Launch Time", ""), ("Duration", ""),
         ("Scheduler Delay", TaskDetailsClassNames.SCHEDULER_DELAY),
-        ("GC Time", TaskDetailsClassNames.GC_TIME),
         ("Task Deserialization Time", TaskDetailsClassNames.TASK_DESERIALIZATION_TIME),
+        ("GC Time", TaskDetailsClassNames.GC_TIME),
         ("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
         ("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) ++
         {if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++
@@ -390,13 +390,13 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
           <td class={TaskDetailsClassNames.SCHEDULER_DELAY}>
             {UIUtils.formatDuration(schedulerDelay.toLong)}
           </td>
-          <td class={TaskDetailsClassNames.GC_TIME}>
-            {if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
-          </td>
           <td class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
             {UIUtils.formatDuration(taskDeserializationTime.toLong)}
           </td>
+          <td class={TaskDetailsClassNames.GC_TIME}>
+            {if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
+          </td>
           <td class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}>
             {UIUtils.formatDuration(serializationTime)}
           </td>
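Where the series lands: after patch 4, the executor reports three timestamped phases per task (deserialize, run, serialize result), patch 5 only reorders how they are displayed, and the UI derives scheduler delay from whatever the executor cannot account for. A compressed, standalone sketch of that executor-side bookkeeping (not the actual `TaskRunner.run`, which also tracks GC time and brackets serialization with its own start timestamp):

```scala
// Standalone sketch of the executor-side measurements after patch 4; the real
// TaskRunner.run also subtracts GC time and measures serialization separately.
def timeTask[T](deserialize: () => Unit, run: () => T, serialize: T => Unit)
    : (Long, Long, Long) = {
  val deserializeStartTime = System.currentTimeMillis()
  deserialize()
  val taskStart = System.currentTimeMillis()
  val result = run()
  val taskFinish = System.currentTimeMillis()
  serialize(result)
  val afterSerialization = System.currentTimeMillis()
  (taskStart - deserializeStartTime,  // executorDeserializeTime
    taskFinish - taskStart,           // executorRunTime
    afterSerialization - taskFinish)  // resultSerializationTime (simplified)
}
```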