From 365400b6f15d030647046bf2d20597c60576cde0 Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Thu, 10 Apr 2014 13:55:44 -0700 Subject: [PATCH 01/15] [SPARK-1683] Track task read metrics. This commit adds a new metric in TaskMetrics to record the input data size and displays this information in the UI. An earlier version of this commit also added the read time, which can be useful for diagnosing straggler problems, but unfortunately that change introduced a significant performance regression for jobs that don't do much computation. In order to track read time, we'll need to do sampling. --- .../scala/org/apache/spark/CacheManager.scala | 10 ++- .../apache/spark/executor/TaskMetrics.scala | 29 +++++++ .../scala/org/apache/spark/rdd/BlockRDD.scala | 2 +- .../org/apache/spark/rdd/HadoopRDD.scala | 12 +++ .../org/apache/spark/rdd/NewHadoopRDD.scala | 10 +++ .../spark/rdd/ParallelCollectionRDD.scala | 12 ++- .../apache/spark/scheduler/JobLogger.scala | 15 +++- .../apache/spark/storage/BlockManager.scala | 61 ++++++++------ .../apache/spark/storage/ThreadingTest.scala | 2 +- .../apache/spark/ui/exec/ExecutorsPage.scala | 4 + .../apache/spark/ui/exec/ExecutorsTab.scala | 5 ++ .../spark/ui/jobs/ExecutorSummary.scala | 1 + .../apache/spark/ui/jobs/ExecutorTable.scala | 2 + .../spark/ui/jobs/JobProgressListener.scala | 15 ++-- .../org/apache/spark/ui/jobs/StagePage.scala | 36 ++++++-- .../org/apache/spark/ui/jobs/StageTable.scala | 7 ++ .../org/apache/spark/CacheManagerSuite.scala | 4 +- .../spark/scheduler/SparkListenerSuite.scala | 2 + .../spark/storage/BlockManagerSuite.scala | 84 ++++++++++++++----- 19 files changed, 240 insertions(+), 73 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 3f667a4a0f9c..7807a5103878 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -19,6 +19,7 @@ package org.apache.spark import scala.collection.mutable.{ArrayBuffer, HashSet} +import org.apache.spark.executor.InputMetrics import org.apache.spark.rdd.RDD import org.apache.spark.storage._ @@ -41,9 +42,10 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { val key = RDDBlockId(rdd.id, partition.index) logDebug(s"Looking for partition $key") blockManager.get(key) match { - case Some(values) => + case Some(blockResult) => // Partition is already materialized, so just return its values - new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]]) + context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics) + new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]]) case None => // Acquire a lock for loading this partition @@ -110,7 +112,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { logInfo(s"Whoever was loading $id failed; we'll try it ourselves") loading.add(id) } - values.map(_.asInstanceOf[Iterator[T]]) + values.data.map(_.asInstanceOf[Iterator[T]]) } } } @@ -132,7 +134,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { * exceptions that can be avoided. 
*/ updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true) blockManager.get(key) match { - case Some(v) => v.asInstanceOf[Iterator[T]] + case Some(v) => v.data.asInstanceOf[Iterator[T]] case None => logInfo(s"Failure to store $key") throw new BlockException(key, s"Block manager failed to return cached value for $key!") diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 350fd74173f6..dbfd44547dfe 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -66,6 +66,12 @@ class TaskMetrics extends Serializable { */ var diskBytesSpilled: Long = _ + /** + * If this task reads from a HadoopRDD, from cached data, or from a parallelized collection, + * metrics on how much data was read are stored here. + */ + var inputMetrics: Option[InputMetrics] = None + /** * If this task reads from shuffle output, metrics on getting shuffle data will be collected here */ @@ -87,6 +93,29 @@ private[spark] object TaskMetrics { def empty: TaskMetrics = new TaskMetrics } +/** + * :: DeveloperApi :: + * Method by which input data was read. Network means that the data was read over the network + * from a remote block manager. + */ +@DeveloperApi +private[spark] object DataReadMethod extends Enumeration with Serializable { + type DataReadMethod = Value + val Memory, Disk, Hdfs, Network = Value +} + +/** + * :: DeveloperApi :: + * Metrics about reading input data. + */ +@DeveloperApi +case class InputMetrics(val readMethod: DataReadMethod.Value) { + /** + * Total bytes read. + */ + var bytesRead: Long = 0L +} + /** * :: DeveloperApi :: diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala index c64da8804d16..2673ec22509e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala @@ -46,7 +46,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds val blockManager = SparkEnv.get.blockManager val blockId = split.asInstanceOf[BlockRDDPartition].blockId blockManager.get(blockId) match { - case Some(block) => block.asInstanceOf[Iterator[T]] + case Some(block) => block.data.asInstanceOf[Iterator[T]] case None => throw new Exception("Could not compute split, block " + blockId + " not found") } diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 2aa111d600e9..ce72b6b32f90 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -38,6 +38,7 @@ import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.executor.{DataReadMethod, InputMetrics} import org.apache.spark.util.NextIterator /** @@ -196,6 +197,17 @@ class HadoopRDD[K, V]( context.addOnCompleteCallback{ () => closeIfNeeded() } val key: K = reader.createKey() val value: V = reader.createValue() + + // Set the task input metrics. 
+ val inputMetrics = new InputMetrics(DataReadMethod.Hdfs) + try { + inputMetrics.bytesRead = split.inputSplit.value.getLength() + } catch { + case e: java.io.IOException => + logWarning("Unable to get input size to set InputMetrics for task", e) + } + context.taskMetrics.inputMetrics = Some(inputMetrics) + override def getNext() = { try { finished = !reader.next(key, value) diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index ac1ccc06f238..ddbc099d90a6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -31,6 +31,7 @@ import org.apache.spark.Logging import org.apache.spark.Partition import org.apache.spark.SerializableWritable import org.apache.spark.{SparkContext, TaskContext} +import org.apache.spark.executor.{DataReadMethod, InputMetrics} private[spark] class NewHadoopPartition( rddId: Int, @@ -112,6 +113,15 @@ class NewHadoopRDD[K, V]( split.serializableHadoopSplit.value, hadoopAttemptContext) reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext) + val inputMetrics = new InputMetrics(DataReadMethod.Hdfs) + try { + inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength() + } catch { + case e: Exception => + logWarning("Unable to get input split size in order to set task input bytes", e) + } + context.taskMetrics.inputMetrics = Some(inputMetrics) + // Register an on-task-completion callback to close the input stream. context.addOnCompleteCallback(() => close()) var havePair = false diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala index 66c71bf7e8bb..7e240ca9ddf5 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala @@ -26,8 +26,9 @@ import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag import org.apache.spark._ +import org.apache.spark.executor.{DataReadMethod, InputMetrics} import org.apache.spark.serializer.JavaSerializer -import org.apache.spark.util.Utils +import org.apache.spark.util.{SizeEstimator, Utils} private[spark] class ParallelCollectionPartition[T: ClassTag]( var rddId: Long, @@ -99,7 +100,14 @@ private[spark] class ParallelCollectionRDD[T: ClassTag]( } override def compute(s: Partition, context: TaskContext) = { - new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator) + val parallelCollectionsPartition = s.asInstanceOf[ParallelCollectionPartition[T]] + + // Set the input metrics for the task. 
+ val inputMetrics = new InputMetrics(DataReadMethod.Memory) + inputMetrics.bytesRead = SizeEstimator.estimate(parallelCollectionsPartition.values) + context.taskMetrics.inputMetrics = Some(inputMetrics) + + new InterruptibleIterator(context, parallelCollectionsPartition.iterator) } override def getPreferredLocations(s: Partition): Seq[String] = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala index a1e21cad48b9..47dd112f6832 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala @@ -26,7 +26,9 @@ import scala.collection.mutable.HashMap import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.executor.TaskMetrics +import org.apache.spark.executor.{DataReadMethod, TaskMetrics} +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel /** * :: DeveloperApi :: @@ -160,7 +162,13 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime + " EXECUTOR_ID=" + taskInfo.executorId + " HOST=" + taskMetrics.hostname val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime - val readMetrics = taskMetrics.shuffleReadMetrics match { + val inputMetrics = taskMetrics.inputMetrics match { + case Some(metrics) => + " READ_METHOD=" + metrics.readMethod.toString + + " INPUT_BYTES=" + metrics.bytesRead + case None => "" + } + val shuffleReadMetrics = taskMetrics.shuffleReadMetrics match { case Some(metrics) => " SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime + " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched + @@ -174,7 +182,8 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten case None => "" } - stageLogInfo(stageId, status + info + executorRunTime + readMetrics + writeMetrics) + stageLogInfo(stageId, status + info + executorRunTime + inputMetrics + shuffleReadMetrics + + writeMetrics) } /** diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index d2f7baf928b6..a67cef44c0f4 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -29,6 +29,7 @@ import akka.actor.{ActorSystem, Cancellable, Props} import sun.nio.ch.DirectBuffer import org.apache.spark._ +import org.apache.spark.executor.{DataReadMethod, InputMetrics} import org.apache.spark.io.CompressionCodec import org.apache.spark.network._ import org.apache.spark.serializer.Serializer @@ -39,6 +40,13 @@ private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValu private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues private[spark] case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends BlockValues +/* Class for returning a fetched block and associated metrics. */ +private[spark] class BlockResult(val data: Iterator[Any], readMethod: DataReadMethod.Value, + bytes: Long) { + val inputMetrics = new InputMetrics(readMethod) + inputMetrics.bytesRead = bytes +} + private[spark] class BlockManager( executorId: String, actorSystem: ActorSystem, @@ -334,9 +342,9 @@ private[spark] class BlockManager( /** * Get block from local block manager. 
*/ - def getLocal(blockId: BlockId): Option[Iterator[Any]] = { + def getLocal(blockId: BlockId): Option[BlockResult] = { logDebug(s"Getting local block $blockId") - doGetLocal(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]] + doGetLocal(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]] } /** @@ -355,11 +363,11 @@ private[spark] class BlockManager( blockId, s"Block $blockId not found on disk, though it should be") } } else { - doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]] + doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]] } } - private def doGetLocal(blockId: BlockId, asValues: Boolean): Option[Any] = { + private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = { val info = blockInfo.get(blockId).orNull if (info != null) { info.synchronized { @@ -386,14 +394,14 @@ private[spark] class BlockManager( // Look for the block in memory if (level.useMemory) { logDebug(s"Getting block $blockId from memory") - val result = if (asValues) { - memoryStore.getValues(blockId) + val result = if (asBlockResult) { + memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size)) } else { memoryStore.getBytes(blockId) } result match { case Some(values) => - return Some(values) + return result case None => logDebug(s"Block $blockId not found in memory") } @@ -405,10 +413,11 @@ private[spark] class BlockManager( if (tachyonStore.contains(blockId)) { tachyonStore.getBytes(blockId) match { case Some(bytes) => - if (!asValues) { + if (!asBlockResult) { return Some(bytes) } else { - return Some(dataDeserialize(blockId, bytes)) + return Some(new BlockResult( + dataDeserialize(blockId, bytes), DataReadMethod.Memory, info.size)) } case None => logDebug(s"Block $blockId not found in tachyon") @@ -429,14 +438,15 @@ private[spark] class BlockManager( if (!level.useMemory) { // If the block shouldn't be stored in memory, we can just return it - if (asValues) { - return Some(dataDeserialize(blockId, bytes)) + if (asBlockResult) { + return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk, + info.size)) } else { return Some(bytes) } } else { // Otherwise, we also have to store something in the memory store - if (!level.deserialized || !asValues) { + if (!level.deserialized || !asBlockResult) { /* We'll store the bytes in memory if the block's storage level includes * "memory serialized", or if it should be cached as objects in memory * but we only requested its serialized bytes. */ @@ -445,7 +455,7 @@ private[spark] class BlockManager( memoryStore.putBytes(blockId, copyForMemory, level) bytes.rewind() } - if (!asValues) { + if (!asBlockResult) { return Some(bytes) } else { val values = dataDeserialize(blockId, bytes) @@ -457,12 +467,12 @@ private[spark] class BlockManager( memoryStore.putValues(blockId, valuesBuffer, level, returnValues = true).data match { case Left(values2) => - return Some(values2) + return Some(new BlockResult(values2, DataReadMethod.Disk, info.size)) case _ => - throw new SparkException("Memory store did not return an iterator") + throw new Exception("Memory store did not return back an iterator") } } else { - return Some(values) + return Some(new BlockResult(values, DataReadMethod.Disk, info.size)) } } } @@ -477,9 +487,9 @@ private[spark] class BlockManager( /** * Get block from remote block managers. 
*/ - def getRemote(blockId: BlockId): Option[Iterator[Any]] = { + def getRemote(blockId: BlockId): Option[BlockResult] = { logDebug(s"Getting remote block $blockId") - doGetRemote(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]] + doGetRemote(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]] } /** @@ -487,10 +497,10 @@ private[spark] class BlockManager( */ def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = { logDebug(s"Getting remote block $blockId as bytes") - doGetRemote(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]] + doGetRemote(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]] } - private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = { + private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = { require(blockId != null, "BlockId is null") val locations = Random.shuffle(master.getLocations(blockId)) for (loc <- locations) { @@ -498,8 +508,11 @@ private[spark] class BlockManager( val data = BlockManagerWorker.syncGetBlock( GetBlock(blockId), ConnectionManagerId(loc.host, loc.port)) if (data != null) { - if (asValues) { - return Some(dataDeserialize(blockId, data)) + if (asBlockResult) { + return Some(new BlockResult( + dataDeserialize(blockId, data), + DataReadMethod.Network, + data.limit())) } else { return Some(data) } @@ -513,7 +526,7 @@ private[spark] class BlockManager( /** * Get a block from the block manager (either local or remote). */ - def get(blockId: BlockId): Option[Iterator[Any]] = { + def get(blockId: BlockId): Option[BlockResult] = { val local = getLocal(blockId) if (local.isDefined) { logInfo(s"Found block $blockId locally") @@ -792,7 +805,7 @@ private[spark] class BlockManager( * Read a block consisting of a single object. 
*/ def getSingle(blockId: BlockId): Option[Any] = { - get(blockId).map(_.next()) + get(blockId).map(_.data.next()) } /** diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala index a107c5182b3b..328be158db68 100644 --- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala +++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala @@ -78,7 +78,7 @@ private[spark] object ThreadingTest { val startTime = System.currentTimeMillis() manager.get(blockId) match { case Some(retrievedBlock) => - assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList, + assert(retrievedBlock.data.toList.asInstanceOf[List[Int]] == block.toList, "Block " + blockId + " did not match") println("Got block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms") diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 2d8c3b949c1a..fc85e4eebec4 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -71,6 +71,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { "Complete Tasks", "Total Tasks", "Task Time", + "Input Bytes", "Shuffle Read", "Shuffle Write") @@ -96,6 +97,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { {values("Complete Tasks")} {values("Total Tasks")} {Utils.msDurationToString(values("Task Time").toLong)} + {Utils.msDurationToString(values("Input Bytes").toLong)} {Utils.bytesToString(values("Shuffle Read").toLong)} {Utils.bytesToString(values("Shuffle Write").toLong)} @@ -116,6 +118,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0) val totalTasks = activeTasks + failedTasks + completedTasks val totalDuration = listener.executorToDuration.getOrElse(execId, 0) + val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0) val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0) val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0) @@ -133,6 +136,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { completedTasks, totalTasks, totalDuration, + totalInputBytes, totalShuffleRead, totalShuffleWrite, maxMem diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 91d37b835b19..58eeb86bf9a3 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -46,6 +46,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) val executorToTasksComplete = HashMap[String, Int]() val executorToTasksFailed = HashMap[String, Int]() val executorToDuration = HashMap[String, Long]() + val executorToInputBytes = HashMap[String, Long]() val executorToShuffleRead = HashMap[String, Long]() val executorToShuffleWrite = HashMap[String, Long]() @@ -72,6 +73,10 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) // Update shuffle read/write val metrics = taskEnd.taskMetrics if (metrics != null) { + metrics.inputMetrics.foreach { inputMetrics => + executorToInputBytes(eid) = + executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead + } 
metrics.shuffleReadMetrics.foreach { shuffleRead => executorToShuffleRead(eid) = executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala index 2aaf6329b792..c4a8996c0b9a 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala @@ -28,6 +28,7 @@ class ExecutorSummary { var taskTime : Long = 0 var failedTasks : Int = 0 var succeededTasks : Int = 0 + var inputBytes: Long = 0 var shuffleRead : Long = 0 var shuffleWrite : Long = 0 var memoryBytesSpilled : Long = 0 diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index add0e9878a54..2a34a9af925d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -43,6 +43,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) { Total Tasks Failed Tasks Succeeded Tasks + Input Bytes Shuffle Read Shuffle Write Shuffle Spill (Memory) @@ -75,6 +76,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) { {v.failedTasks + v.succeededTasks} {v.failedTasks} {v.succeededTasks} + {Utils.bytesToString(v.inputBytes)} {Utils.bytesToString(v.shuffleRead)} {Utils.bytesToString(v.shuffleWrite)} {Utils.bytesToString(v.memoryBytesSpilled)} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 381a5443df8b..2286a7f952f2 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -46,13 +46,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener { val completedStages = ListBuffer[StageInfo]() val failedStages = ListBuffer[StageInfo]() - // Total metrics reflect metrics only for completed tasks - var totalTime = 0L - var totalShuffleRead = 0L - var totalShuffleWrite = 0L - // TODO: Should probably consolidate all following into a single hash map. 
val stageIdToTime = HashMap[Int, Long]() + val stageIdToInputBytes = HashMap[Int, Long]() val stageIdToShuffleRead = HashMap[Int, Long]() val stageIdToShuffleWrite = HashMap[Int, Long]() val stageIdToMemoryBytesSpilled = HashMap[Int, Long]() @@ -93,6 +89,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener { val toRemove = math.max(retainedStages / 10, 1) stages.take(toRemove).foreach { s => stageIdToTime.remove(s.stageId) + stageIdToInputBytes.remove(s.stageId) stageIdToShuffleRead.remove(s.stageId) stageIdToShuffleWrite.remove(s.stageId) stageIdToMemoryBytesSpilled.remove(s.stageId) @@ -171,6 +168,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener { val metrics = taskEnd.taskMetrics if (metrics != null) { + metrics.inputMetrics.foreach { y.inputBytes += _.bytesRead } metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead } metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten } y.memoryBytesSpilled += metrics.memoryBytesSpilled @@ -200,18 +198,19 @@ class JobProgressListener(conf: SparkConf) extends SparkListener { stageIdToTime.getOrElseUpdate(sid, 0L) val time = metrics.map(_.executorRunTime).getOrElse(0L) stageIdToTime(sid) += time - totalTime += time + + stageIdToInputBytes.getOrElseUpdate(sid, 0L) + val inputBytes = metrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L) + stageIdToInputBytes(sid) += inputBytes stageIdToShuffleRead.getOrElseUpdate(sid, 0L) val shuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L) stageIdToShuffleRead(sid) += shuffleRead - totalShuffleRead += shuffleRead stageIdToShuffleWrite.getOrElseUpdate(sid, 0L) val shuffleWrite = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten).getOrElse(0L) stageIdToShuffleWrite(sid) += shuffleWrite - totalShuffleWrite += shuffleWrite stageIdToMemoryBytesSpilled.getOrElseUpdate(sid, 0L) val memoryBytesSpilled = metrics.map(_.memoryBytesSpilled).getOrElse(0L) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 8e3d5d1cd4c6..0d6c8af4690c 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -48,6 +48,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { val tasks = listener.stageIdToTaskData(stageId).values.toSeq.sortBy(_.taskInfo.launchTime) val numCompleted = tasks.count(_.taskInfo.finished) + val inputBytes = listener.stageIdToInputBytes.getOrElse(stageId, 0L) + val hasInput = inputBytes > 0 val shuffleReadBytes = listener.stageIdToShuffleRead.getOrElse(stageId, 0L) val hasShuffleRead = shuffleReadBytes > 0 val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L) @@ -69,6 +71,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { Total task time across all tasks: {UIUtils.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)} + {if (hasInput) +
<li> + <strong>Input: </strong> + {Utils.bytesToString(inputBytes)} +
</li> + } {if (hasShuffleRead)
  • Shuffle read: @@ -96,15 +104,16 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { // scalastyle:on val taskHeaders: Seq[String] = Seq( - "Index", "ID", "Attempt", "Status", "Locality Level", "Executor", + "Task Index", "Task ID", "Attempt", "Status", "Locality Level", "Executor", "Launch Time", "Duration", "GC Time") ++ + {if (hasInput) Seq("Input") else Nil} ++ {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++ {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++ {if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++ Seq("Errors") val taskTable = UIUtils.listingTable( - taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks) + taskHeaders, taskRow(hasInput, hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks) // Excludes tasks which failed and have incomplete metrics val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.taskMetrics.isDefined) @@ -159,6 +168,11 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { def getQuantileCols(data: Seq[Double]) = Distribution(data).get.getQuantiles().map(d => Utils.bytesToString(d.toLong)) + val inputSizes = validTasks.map { case TaskUIData(_, metrics, _) => + metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble + } + val inputQuantiles = "Input" +: getQuantileCols(inputSizes) + val shuffleReadSizes = validTasks.map { case TaskUIData(_, metrics, _) => metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble } @@ -186,6 +200,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { serviceQuantiles, gettingResultQuantiles, schedulerDelayQuantiles, + if (hasInput) inputQuantiles else Nil, if (hasShuffleRead) shuffleReadQuantiles else Nil, if (hasShuffleWrite) shuffleWriteQuantiles else Nil, if (hasBytesSpilled) memoryBytesSpilledQuantiles else Nil, @@ -211,7 +226,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean, bytesSpilled: Boolean) (taskData: TaskUIData): Seq[Node] = { - taskData match { case TaskUIData(info, metrics, errorMessage) => + taskData match { case TaskUIData(info, metrics, exception) => val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis()) else metrics.map(_.executorRunTime).getOrElse(1L) val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration) @@ -219,6 +234,10 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L) val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L) + val maybeInput = metrics.flatMap(_.inputMetrics).map(_.bytesRead) + val inputSortable = maybeInput.map(_.toString).getOrElse("") + val inputReadable = maybeInput.map(Utils.bytesToString).getOrElse("") + val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead) val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("") val shuffleReadReadable = maybeShuffleRead.map(Utils.bytesToString).getOrElse("") @@ -265,12 +284,17 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { {if (serializationTime > 0) UIUtils.formatDuration(serializationTime) else ""} --> - {if (shuffleRead) { + {if (hasInput) { + + {inputReadable} + + }} + {if (hasShuffleRead) { {shuffleReadReadable} }} - {if (shuffleWrite) { + {if (hasShuffleWrite) { 
{writeTimeReadable} @@ -278,7 +302,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { {shuffleWriteReadable} }} - {if (bytesSpilled) { + {if (hasBytesSpilled) { {memoryBytesSpilledReadable} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 30971f769682..e7ba81927ebe 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -43,6 +43,7 @@ private[ui] class StageTableBase( Submitted Duration Tasks: Succeeded/Total + Input Shuffle Read Shuffle Write } @@ -123,6 +124,11 @@ private[ui] class StageTableBase( case _ => "" } val totalTasks = s.numTasks + val inputSortable = listener.stageIdToInputBytes.getOrElse(s.stageId, 0L) + val inputRead = inputSortable match { + case 0 => "" + case b => Utils.bytesToString(b) + } val shuffleReadSortable = listener.stageIdToShuffleRead.getOrElse(s.stageId, 0L) val shuffleRead = shuffleReadSortable match { case 0 => "" @@ -150,6 +156,7 @@ private[ui] class StageTableBase( {makeProgressBar(startedTasks, completedTasks, failedTasks, totalTasks)} + {inputRead} {shuffleRead} {shuffleWrite} } diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala index 4f178db40f63..7f5d0b061e8b 100644 --- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala @@ -22,6 +22,7 @@ import scala.collection.mutable.ArrayBuffer import org.scalatest.{BeforeAndAfter, FunSuite} import org.scalatest.mock.EasyMockSugar +import org.apache.spark.executor.{DataReadMethod, TaskMetrics} import org.apache.spark.rdd.RDD import org.apache.spark.storage._ @@ -66,7 +67,8 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar test("get cached rdd") { expecting { - blockManager.get(RDDBlockId(0, 0)).andReturn(Some(ArrayBuffer(5, 6, 7).iterator)) + val result = new BlockResult(ArrayBuffer(5, 6, 7).iterator, DataReadMethod.Memory, 12) + blockManager.get(RDDBlockId(0, 0)).andReturn(Some(result)) } whenExecuting(blockManager) { diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 6df0a080961b..9f3d4d15c299 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -251,6 +251,8 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers taskInfoMetrics.foreach { case (taskInfo, taskMetrics) => taskMetrics.resultSize should be > (0l) if (stageInfo.rddInfos.exists(info => info.name == d2.name || info.name == d3.name)) { + taskMetrics.inputMetrics should be ('defined) + taskMetrics.inputMetrics.get.bytesRead should be > (0l) taskMetrics.shuffleWriteMetrics should be ('defined) taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0l) } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index d7dbe5164b7f..23cb6905bfde 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -31,11 +31,13 @@ import org.scalatest.concurrent.Timeouts._ import org.scalatest.Matchers 
import org.scalatest.time.SpanSugar._ -import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf} +import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf, SparkContext} +import org.apache.spark.executor.DataReadMethod import org.apache.spark.scheduler.LiveListenerBus import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils} +import scala.collection.mutable.ArrayBuffer import scala.language.implicitConversions import scala.language.postfixOps @@ -415,6 +417,39 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } } + test("correct BlockResult returned from get() calls") { + store = new BlockManager("", actorSystem, master, serializer, 1200, conf, securityMgr, + mapOutputTracker) + val list1 = List(new Array[Byte](200), new Array[Byte](200)) + val list1ForSizeEstimate = new ArrayBuffer[Any] + list1ForSizeEstimate ++= list1.iterator + val list1SizeEstimate = SizeEstimator.estimate(list1ForSizeEstimate) + val list2 = List(new Array[Byte](50), new Array[Byte](100), new Array[Byte](150)) + val list2ForSizeEstimate = new ArrayBuffer[Any] + list2ForSizeEstimate ++= list2.iterator + val list2SizeEstimate = SizeEstimator.estimate(list2ForSizeEstimate) + store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.put("list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.put("list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true) + val list1Get = store.get("list1") + assert(list1Get.isDefined, "list1 expected to be in store") + assert(list1Get.get.data.size === 2) + assert(list1Get.get.inputMetrics.bytesRead === list1SizeEstimate) + assert(list1Get.get.inputMetrics.readMethod === DataReadMethod.Memory) + val list2MemoryGet = store.get("list2memory") + assert(list2MemoryGet.isDefined, "list2memory expected to be in store") + assert(list2MemoryGet.get.data.size === 3) + assert(list2MemoryGet.get.inputMetrics.bytesRead === list2SizeEstimate) + assert(list2MemoryGet.get.inputMetrics.readMethod === DataReadMethod.Memory) + val list2DiskGet = store.get("list2disk") + assert(list2DiskGet.isDefined, "list2memory expected to be in store") + assert(list2DiskGet.get.data.size === 3) + System.out.println(list2DiskGet) + // We don't know the exact size of the data on disk, but it should certainly be > 0. 
+ assert(list2DiskGet.get.inputMetrics.bytesRead > 0) + assert(list2DiskGet.get.inputMetrics.readMethod === DataReadMethod.Disk) + } + test("in-memory LRU storage") { store = new BlockManager("", actorSystem, master, serializer, 1200, conf, securityMgr, mapOutputTracker) @@ -630,18 +665,18 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) assert(store.get("list2").isDefined, "list2 was not in store") - assert(store.get("list2").get.size == 2) + assert(store.get("list2").get.data.size === 2) assert(store.get("list3").isDefined, "list3 was not in store") - assert(store.get("list3").get.size == 2) + assert(store.get("list3").get.data.size === 2) assert(store.get("list1") === None, "list1 was in store") assert(store.get("list2").isDefined, "list2 was not in store") - assert(store.get("list2").get.size == 2) + assert(store.get("list2").get.data.size === 2) // At this point list2 was gotten last, so LRU will getSingle rid of list3 store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) assert(store.get("list1").isDefined, "list1 was not in store") - assert(store.get("list1").get.size == 2) + assert(store.get("list1").get.data.size === 2) assert(store.get("list2").isDefined, "list2 was not in store") - assert(store.get("list2").get.size == 2) + assert(store.get("list2").get.data.size === 2) assert(store.get("list3") === None, "list1 was in store") } @@ -656,28 +691,31 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true) store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true) store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true) + val listForSizeEstimate = new ArrayBuffer[Any] + listForSizeEstimate ++= list1.iterator + val listSize = SizeEstimator.estimate(listForSizeEstimate) // At this point LRU should not kick in because list3 is only on disk - assert(store.get("list1").isDefined, "list2 was not in store") - assert(store.get("list1").get.size === 2) - assert(store.get("list2").isDefined, "list3 was not in store") - assert(store.get("list2").get.size === 2) - assert(store.get("list3").isDefined, "list1 was not in store") - assert(store.get("list3").get.size === 2) - assert(store.get("list1").isDefined, "list2 was not in store") - assert(store.get("list1").get.size === 2) - assert(store.get("list2").isDefined, "list3 was not in store") - assert(store.get("list2").get.size === 2) - assert(store.get("list3").isDefined, "list1 was not in store") - assert(store.get("list3").get.size === 2) + assert(store.get("list1").isDefined, "list1 was not in store") + assert(store.get("list1").get.data.size === 2) + assert(store.get("list2").isDefined, "list2 was not in store") + assert(store.get("list2").get.data.size === 2) + assert(store.get("list3").isDefined, "list3 was not in store") + assert(store.get("list3").get.data.size === 2) + assert(store.get("list1").isDefined, "list1 was not in store") + assert(store.get("list1").get.data.size === 2) + assert(store.get("list2").isDefined, "list2 was not in store") + assert(store.get("list2").get.data.size === 2) + assert(store.get("list3").isDefined, "list3 was not in store") + assert(store.get("list3").get.data.size === 2) // Now let's add in list4, which uses both disk and memory; list1 should 
drop out store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true) assert(store.get("list1") === None, "list1 was in store") - assert(store.get("list2").isDefined, "list3 was not in store") - assert(store.get("list2").get.size === 2) - assert(store.get("list3").isDefined, "list1 was not in store") - assert(store.get("list3").get.size === 2) + assert(store.get("list2").isDefined, "list2 was not in store") + assert(store.get("list2").get.data.size === 2) + assert(store.get("list3").isDefined, "list3 was not in store") + assert(store.get("list3").get.data.size === 2) assert(store.get("list4").isDefined, "list4 was not in store") - assert(store.get("list4").get.size === 2) + assert(store.get("list4").get.data.size === 2) } test("negative byte values in ByteBufferInputStream") { From 4e5292515e22551aaf207478e6f93d0e7eb7f08b Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Tue, 24 Jun 2014 14:15:02 -0700 Subject: [PATCH 02/15] Added Json output and associated tests. --- .../scala/org/apache/spark/CacheManager.scala | 2 +- .../apache/spark/executor/TaskMetrics.scala | 2 +- .../org/apache/spark/util/JsonProtocol.scala | 20 +++++- .../apache/spark/util/JsonProtocolSuite.scala | 71 +++++++++++++++---- 4 files changed, 79 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 7807a5103878..8f867686a044 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -112,7 +112,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { logInfo(s"Whoever was loading $id failed; we'll try it ourselves") loading.add(id) } - values.data.map(_.asInstanceOf[Iterator[T]]) + values.map(_.data.asInstanceOf[Iterator[T]]) } } } diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index dbfd44547dfe..8c07b0305639 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -109,7 +109,7 @@ private[spark] object DataReadMethod extends Enumeration with Serializable { * Metrics about reading input data. */ @DeveloperApi -case class InputMetrics(val readMethod: DataReadMethod.Value) { +case class InputMetrics(readMethod: DataReadMethod.Value) { /** * Total bytes read. 
*/ diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 6245b4b8023c..26c9c9d6034e 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -26,7 +26,8 @@ import org.json4s.DefaultFormats import org.json4s.JsonDSL._ import org.json4s.JsonAST._ -import org.apache.spark.executor.{ShuffleReadMetrics, ShuffleWriteMetrics, TaskMetrics} +import org.apache.spark.executor.{DataReadMethod, InputMetrics, ShuffleReadMetrics, + ShuffleWriteMetrics, TaskMetrics} import org.apache.spark.scheduler._ import org.apache.spark.storage._ import org.apache.spark._ @@ -213,6 +214,8 @@ private[spark] object JsonProtocol { taskMetrics.shuffleReadMetrics.map(shuffleReadMetricsToJson).getOrElse(JNothing) val shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics.map(shuffleWriteMetricsToJson).getOrElse(JNothing) + val inputMetrics = + taskMetrics.inputMetrics.map(inputMetricsToJson).getOrElse(JNothing) val updatedBlocks = taskMetrics.updatedBlocks.map { blocks => JArray(blocks.toList.map { case (id, status) => @@ -230,6 +233,7 @@ private[spark] object JsonProtocol { ("Disk Bytes Spilled" -> taskMetrics.diskBytesSpilled) ~ ("Shuffle Read Metrics" -> shuffleReadMetrics) ~ ("Shuffle Write Metrics" -> shuffleWriteMetrics) ~ + ("Input Metrics" -> inputMetrics) ~ ("Updated Blocks" -> updatedBlocks) } @@ -247,6 +251,11 @@ private[spark] object JsonProtocol { ("Shuffle Write Time" -> shuffleWriteMetrics.shuffleWriteTime) } + def inputMetricsToJson(inputMetrics: InputMetrics): JValue = { + ("Data Read Method" -> inputMetrics.readMethod.toString) ~ + ("Bytes Read" -> inputMetrics.bytesRead) + } + def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = { val reason = Utils.getFormattedClassName(taskEndReason) val json = taskEndReason match { @@ -528,6 +537,8 @@ private[spark] object JsonProtocol { Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson) metrics.shuffleWriteMetrics = Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson) + metrics.inputMetrics = + Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson) metrics.updatedBlocks = Utils.jsonOption(json \ "Updated Blocks").map { value => value.extract[List[JValue]].map { block => @@ -557,6 +568,13 @@ private[spark] object JsonProtocol { metrics } + def inputMetricsFromJson(json: JValue): InputMetrics = { + val metrics = new InputMetrics( + DataReadMethod.withName((json \ "Data Read Method").extract[String])) + metrics.bytesRead = (json \ "Bytes Read").extract[Long] + metrics + } + def taskEndReasonFromJson(json: JValue): TaskEndReason = { val success = Utils.getFormattedClassName(Success) val resubmitted = Utils.getFormattedClassName(Resubmitted) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 6c4987045587..249cd31168c1 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -39,7 +39,11 @@ class JsonProtocolSuite extends FunSuite { val taskGettingResult = SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true)) val taskEnd = SparkListenerTaskEnd(1, "ShuffleMapTask", Success, - makeTaskInfo(123L, 234, 67, 345L, false), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800)) + makeTaskInfo(123L, 234, 67, 345L, 
false), + makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHdfsInput = false)) + val taskEndWithHdfsInput = SparkListenerTaskEnd(1, "ShuffleMapTask", Success, + makeTaskInfo(123L, 234, 345L), + makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHdfsInput = true)) val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties) val jobEnd = SparkListenerJobEnd(20, JobSucceeded) val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]]( @@ -61,6 +65,7 @@ class JsonProtocolSuite extends FunSuite { testEvent(taskStart, taskStartJsonString) testEvent(taskGettingResult, taskGettingResultJsonString) testEvent(taskEnd, taskEndJsonString) + testEvent(taskEndWithHdfsInput, taskEndWithHdfsInputJsonString) testEvent(jobStart, jobStartJsonString) testEvent(jobEnd, jobEndJsonString) testEvent(environmentUpdate, environmentUpdateJsonString) @@ -74,8 +79,8 @@ class JsonProtocolSuite extends FunSuite { test("Dependent Classes") { testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L)) testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L)) - testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false)) - testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8)) + testTaskInfo(makeTaskInfo(999L, 888, 777L, false)) + testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8, hasHdfsInput = false)) testBlockManagerId(BlockManagerId("Hong", "Kong", 500, 1000)) // StorageLevel @@ -118,7 +123,7 @@ class JsonProtocolSuite extends FunSuite { testBlockId(StreamBlockId(1, 2L)) } - test("Backward compatibility") { + test("backward compatibility") { // StageInfo.details was added after 1.0.0. val info = makeStageInfo(1, 2, 3, 4L, 5L) assert(info.details.nonEmpty) @@ -294,6 +299,8 @@ class JsonProtocolSuite extends FunSuite { metrics1.shuffleReadMetrics, metrics2.shuffleReadMetrics, assertShuffleReadEquals) assertOptionEquals( metrics1.shuffleWriteMetrics, metrics2.shuffleWriteMetrics, assertShuffleWriteEquals) + assertOptionEquals( + metrics1.inputMetrics, metrics2.inputMetrics, assertInputMetricsEquals) assertOptionEquals(metrics1.updatedBlocks, metrics2.updatedBlocks, assertBlocksEquals) } @@ -311,6 +318,11 @@ class JsonProtocolSuite extends FunSuite { assert(metrics1.shuffleWriteTime === metrics2.shuffleWriteTime) } + private def assertEquals(metrics1: InputMetrics, metrics2: InputMetrics) { + assert(metrics1.readMethod === metrics2.readMethod) + assert(metrics1.bytesRead === metrics2.bytesRead) + } + private def assertEquals(bm1: BlockManagerId, bm2: BlockManagerId) { assert(bm1.executorId === bm2.executorId) assert(bm1.host === bm2.host) @@ -403,6 +415,10 @@ class JsonProtocolSuite extends FunSuite { assertEquals(w1, w2) } + private def assertInputMetricsEquals(i1: InputMetrics, i2: InputMetrics) { + assertEquals(i1, i2) + } + private def assertTaskMetricsEquals(t1: TaskMetrics, t2: TaskMetrics) { assertEquals(t1, t2) } @@ -460,9 +476,13 @@ class JsonProtocolSuite extends FunSuite { new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative) } - private def makeTaskMetrics(a: Long, b: Long, c: Long, d: Long, e: Int, f: Int) = { + /** + * Creates a TaskMetrics object describing a task that read data from HDFS (if hasHdfsInput is + * set to true) or read data from a shuffle otherwise. 
+ */ + private def makeTaskMetrics( + a: Long, b: Long, c: Long, d: Long, e: Int, f: Int, hasHdfsInput: Boolean) = { val t = new TaskMetrics - val sr = new ShuffleReadMetrics val sw = new ShuffleWriteMetrics t.hostname = "localhost" t.executorDeserializeTime = a @@ -471,15 +491,23 @@ class JsonProtocolSuite extends FunSuite { t.jvmGCTime = d t.resultSerializationTime = a + b t.memoryBytesSpilled = a + c - sr.shuffleFinishTime = b + c - sr.totalBlocksFetched = e + f - sr.remoteBytesRead = b + d - sr.localBlocksFetched = e - sr.fetchWaitTime = a + d - sr.remoteBlocksFetched = f + + if (hasHdfsInput) { + val inputMetrics = new InputMetrics(DataReadMethod.Hdfs) + inputMetrics.bytesRead = d + e + f + t.inputMetrics = Some(inputMetrics) + } else { + val sr = new ShuffleReadMetrics + sr.shuffleFinishTime = b + c + sr.totalBlocksFetched = e + f + sr.remoteBytesRead = b + d + sr.localBlocksFetched = e + sr.fetchWaitTime = a + d + sr.remoteBlocksFetched = f + t.shuffleReadMetrics = Some(sr) + } sw.shuffleBytesWritten = a + b + c sw.shuffleWriteTime = b + c + d - t.shuffleReadMetrics = Some(sr) t.shuffleWriteMetrics = Some(sw) // Make at most 6 blocks t.updatedBlocks = Some((1 to (e % 5 + 1)).map { i => @@ -568,6 +596,23 @@ class JsonProtocolSuite extends FunSuite { |} """.stripMargin + private val taskEndWithHdfsInputJsonString = + """ + {"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask", + "Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":123,"Index": + 234,"Launch Time":345,"Executor ID":"executor","Host":"your kind sir", + "Locality":"NODE_LOCAL","Getting Result Time":0,"Finish Time":0,"Failed": + false,"Serialized Size":0},"Task Metrics":{"Host Name":"localhost", + "Executor Deserialize Time":300,"Executor Run Time":400,"Result Size":500, + "JVM GC Time":600,"Result Serialization Time":700,"Memory Bytes Spilled": + 800,"Disk Bytes Spilled":0,"Shuffle Read Metrics":[],"Shuffle Write Metrics": + {"Shuffle Bytes Written":1200,"Shuffle Write Time":1500},"Input Metrics": + {"Data Read Method":Hdfs,"Bytes Read":1500},"Updated Blocks": + [{"Block ID":{"Type":"RDDBlockId","RDD ID":0,"Split Index":0},"Status": + {"Storage Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false, + "Deserialized":false,"Replication":2},"Memory Size":0,"Disk Size":0,"Tachyon Size":0}}]}} + """ + private val jobStartJsonString = """ {"Event":"SparkListenerJobStart","Job ID":10,"Stage IDs":[1,2,3,4],"Properties": From 0fc33e055d4356e602c6b24f17c4b842c1791e3c Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Tue, 24 Jun 2014 16:07:05 -0700 Subject: [PATCH 03/15] Added explicit backward compatibility test --- .../org/apache/spark/util/JsonProtocolSuite.scala | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 249cd31168c1..84f9bcefabcd 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -123,7 +123,7 @@ class JsonProtocolSuite extends FunSuite { testBlockId(StreamBlockId(1, 2L)) } - test("backward compatibility") { + test("StageInfo.details backward compatibility") { // StageInfo.details was added after 1.0.0. 
val info = makeStageInfo(1, 2, 3, 4L, 5L) assert(info.details.nonEmpty) @@ -134,6 +134,16 @@ class JsonProtocolSuite extends FunSuite { assert("" === newInfo.details) } + test("InputMetrics backward compatibility") { + // InputMetrics were added after 1.0.1. + val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHdfsInput = true) + assert(metrics.inputMetrics.nonEmpty) + val newJson = JsonProtocol.taskMetricsToJson(metrics) + val oldJson = newJson.removeField { case (field, _) => field == "Input Metrics" } + val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson) + assert(newMetrics.inputMetrics.isEmpty) + } + /** -------------------------- * | Helper test running methods | From bf41029ff9a0bcd4f8c1d5a796c7551e1573d4dc Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Tue, 24 Jun 2014 17:05:28 -0700 Subject: [PATCH 04/15] Updated Json tests to pass --- .../org/apache/spark/util/JsonProtocolSuite.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 84f9bcefabcd..6af0e147c5b8 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -615,12 +615,11 @@ class JsonProtocolSuite extends FunSuite { false,"Serialized Size":0},"Task Metrics":{"Host Name":"localhost", "Executor Deserialize Time":300,"Executor Run Time":400,"Result Size":500, "JVM GC Time":600,"Result Serialization Time":700,"Memory Bytes Spilled": - 800,"Disk Bytes Spilled":0,"Shuffle Read Metrics":[],"Shuffle Write Metrics": - {"Shuffle Bytes Written":1200,"Shuffle Write Time":1500},"Input Metrics": - {"Data Read Method":Hdfs,"Bytes Read":1500},"Updated Blocks": - [{"Block ID":{"Type":"RDDBlockId","RDD ID":0,"Split Index":0},"Status": - {"Storage Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false, - "Deserialized":false,"Replication":2},"Memory Size":0,"Disk Size":0,"Tachyon Size":0}}]}} + 800,"Disk Bytes Spilled":0,"Shuffle Write Metrics":{"Shuffle Bytes Written":1200, + "Shuffle Write Time":1500},"Input Metrics":{"Data Read Method":"Hdfs","Bytes Read":2100}, + "Updated Blocks":[{"Block ID":"rdd_0_0","Status":{"Storage Level":{"Use Disk":true, + "Use Memory":true,"Use Tachyon":false,"Deserialized":false,"Replication":2},"Memory Size":0, + "Tachyon Size":0,"Disk Size":0}}]}} """ private val jobStartJsonString = From f27e535e7e489ec4a499e909e6eba3a8b0b0d919 Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Thu, 26 Jun 2014 22:51:11 -0700 Subject: [PATCH 05/15] Updates based on review comments and to fix rebase --- .../org/apache/spark/storage/BlockManager.scala | 6 ++++-- .../org/apache/spark/ui/jobs/StagePage.scala | 17 ++++++++++------- .../apache/spark/util/JsonProtocolSuite.scala | 12 +++++++++--- 3 files changed, 23 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index a67cef44c0f4..0db0a5bc7341 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -41,7 +41,9 @@ private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockV private[spark] case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends BlockValues /* Class for returning a fetched block and associated metrics. 
*/ -private[spark] class BlockResult(val data: Iterator[Any], readMethod: DataReadMethod.Value, +private[spark] class BlockResult( + val data: Iterator[Any], + readMethod: DataReadMethod.Value, bytes: Long) { val inputMetrics = new InputMetrics(readMethod) inputMetrics.bytesRead = bytes @@ -469,7 +471,7 @@ private[spark] class BlockManager( case Left(values2) => return Some(new BlockResult(values2, DataReadMethod.Disk, info.size)) case _ => - throw new Exception("Memory store did not return back an iterator") + throw new SparkException("Memory store did not return back an iterator") } } else { return Some(new BlockResult(values, DataReadMethod.Disk, info.size)) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 0d6c8af4690c..1c878142379c 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -72,10 +72,10 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { {UIUtils.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
             {if (hasInput)
-              <li>
-                <strong>Input: </strong>
-                {Utils.bytesToString(inputBytes)}
-              </li>
+              <li>
+                <strong>Input: </strong>
+                {Utils.bytesToString(inputBytes)}
+              </li>
             }
             {if (hasShuffleRead)
  • @@ -224,9 +224,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { } } - def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean, bytesSpilled: Boolean) - (taskData: TaskUIData): Seq[Node] = { - taskData match { case TaskUIData(info, metrics, exception) => + def taskRow( + hasInput: Boolean, + hasShuffleRead: Boolean, + hasShuffleWrite: Boolean, + hasBytesSpilled: Boolean)(taskData: TaskUIData): Seq[Node] = { + taskData match { case TaskUIData(info, metrics, errorMessage) => val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis()) else metrics.map(_.executorRunTime).getOrElse(1L) val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 6af0e147c5b8..2a62b1558d09 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -42,7 +42,7 @@ class JsonProtocolSuite extends FunSuite { makeTaskInfo(123L, 234, 67, 345L, false), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHdfsInput = false)) val taskEndWithHdfsInput = SparkListenerTaskEnd(1, "ShuffleMapTask", Success, - makeTaskInfo(123L, 234, 345L), + makeTaskInfo(123L, 234, 67, 345L, false), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHdfsInput = true)) val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties) val jobEnd = SparkListenerJobEnd(20, JobSucceeded) @@ -79,7 +79,7 @@ class JsonProtocolSuite extends FunSuite { test("Dependent Classes") { testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L)) testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L)) - testTaskInfo(makeTaskInfo(999L, 888, 777L, false)) + testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false)) testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8, hasHdfsInput = false)) testBlockManagerId(BlockManagerId("Hong", "Kong", 500, 1000)) @@ -491,7 +491,13 @@ class JsonProtocolSuite extends FunSuite { * set to true) or read data from a shuffle otherwise. */ private def makeTaskMetrics( - a: Long, b: Long, c: Long, d: Long, e: Int, f: Int, hasHdfsInput: Boolean) = { + a: Long, + b: Long, + c: Long, + d: Long, + e: Int, + f: Int, + hasHdfsInput: Boolean) = { val t = new TaskMetrics val sw = new ShuffleWriteMetrics t.hostname = "localhost" From 4bd056866ce4ffde9ded5cadb37fb88ae687359b Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Fri, 27 Jun 2014 09:23:34 -0700 Subject: [PATCH 06/15] Added input source, renamed Hdfs to Hadoop. --- .../scala/org/apache/spark/executor/TaskMetrics.scala | 8 ++++---- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 3 ++- .../main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 2 +- .../main/scala/org/apache/spark/ui/jobs/StagePage.scala | 8 +++++--- .../scala/org/apache/spark/util/JsonProtocolSuite.scala | 2 +- 5 files changed, 13 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 8c07b0305639..941ec6580f74 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -96,12 +96,12 @@ private[spark] object TaskMetrics { /** * :: DeveloperApi :: * Method by which input data was read. 
Network means that the data was read over the network - * from a remote block manager. - */ + * from a remote block manager. which may have stored the data on-disk or in-memory. + @DeveloperApi private[spark] object DataReadMethod extends Enumeration with Serializable { type DataReadMethod = Value - val Memory, Disk, Hdfs, Network = Value + val Memory, Disk, Hadoop, Network = Value } /** @@ -171,4 +171,4 @@ class ShuffleWriteMetrics extends Serializable { * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds */ var shuffleWriteTime: Long = _ -} +} \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index ce72b6b32f90..377966f9fd5d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -198,8 +198,9 @@ class HadoopRDD[K, V]( val key: K = reader.createKey() val value: V = reader.createValue() + if (split.inputSplit.isInstanceOf[FileSplit]) // Set the task input metrics. - val inputMetrics = new InputMetrics(DataReadMethod.Hdfs) + val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) try { inputMetrics.bytesRead = split.inputSplit.value.getLength() } catch { diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index ddbc099d90a6..d50d34f6292e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -113,7 +113,7 @@ class NewHadoopRDD[K, V]( split.serializableHadoopSplit.value, hadoopAttemptContext) reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext) - val inputMetrics = new InputMetrics(DataReadMethod.Hdfs) + val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) try { inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength() } catch { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 1c878142379c..eb89d4bdf4a4 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -237,9 +237,11 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L) val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L) - val maybeInput = metrics.flatMap(_.inputMetrics).map(_.bytesRead) - val inputSortable = maybeInput.map(_.toString).getOrElse("") - val inputReadable = maybeInput.map(Utils.bytesToString).getOrElse("") + val maybeInput = metrics.flatMap(_.inputMetrics) + val inputSortable = maybeInput.map(_.bytesRead.toString).getOrElse("") + val inputReadable = maybeInput + .map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})") + .getOrElse("") val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead) val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("") diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 2a62b1558d09..c759f6112810 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -509,7 +509,7 @@ class JsonProtocolSuite extends FunSuite { 
t.memoryBytesSpilled = a + c if (hasHdfsInput) { - val inputMetrics = new InputMetrics(DataReadMethod.Hdfs) + val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) inputMetrics.bytesRead = d + e + f t.inputMetrics = Some(inputMetrics) } else { From 44a03018cf7829f2c3c4c08613ba7692e3cd4468 Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Fri, 27 Jun 2014 09:25:13 -0700 Subject: [PATCH 07/15] Fixed accidentally added line --- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 377966f9fd5d..b57753e3d20e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -198,7 +198,6 @@ class HadoopRDD[K, V]( val key: K = reader.createKey() val value: V = reader.createValue() - if (split.inputSplit.isInstanceOf[FileSplit]) // Set the task input metrics. val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) try { From 869ac7bb8107bad74c070748541fdc895999e425 Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Fri, 27 Jun 2014 09:46:17 -0700 Subject: [PATCH 08/15] Updated Json tests --- .../apache/spark/executor/TaskMetrics.scala | 2 +- .../apache/spark/util/JsonProtocolSuite.scala | 62 ++++++++++++------- 2 files changed, 39 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 941ec6580f74..d3bf5716c817 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -97,7 +97,7 @@ private[spark] object TaskMetrics { * :: DeveloperApi :: * Method by which input data was read. Network means that the data was read over the network * from a remote block manager. which may have stored the data on-disk or in-memory. 
- + */ @DeveloperApi private[spark] object DataReadMethod extends Enumeration with Serializable { type DataReadMethod = Value diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index c759f6112810..7ab38f8afb9a 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -40,10 +40,10 @@ class JsonProtocolSuite extends FunSuite { SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true)) val taskEnd = SparkListenerTaskEnd(1, "ShuffleMapTask", Success, makeTaskInfo(123L, 234, 67, 345L, false), - makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHdfsInput = false)) - val taskEndWithHdfsInput = SparkListenerTaskEnd(1, "ShuffleMapTask", Success, + makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = false)) + val taskEndWithHadoopInput = SparkListenerTaskEnd(1, "ShuffleMapTask", Success, makeTaskInfo(123L, 234, 67, 345L, false), - makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHdfsInput = true)) + makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true)) val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties) val jobEnd = SparkListenerJobEnd(20, JobSucceeded) val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]]( @@ -65,7 +65,7 @@ class JsonProtocolSuite extends FunSuite { testEvent(taskStart, taskStartJsonString) testEvent(taskGettingResult, taskGettingResultJsonString) testEvent(taskEnd, taskEndJsonString) - testEvent(taskEndWithHdfsInput, taskEndWithHdfsInputJsonString) + testEvent(taskEndWithHadoopInput, taskEndWithHadoopInputJsonString) testEvent(jobStart, jobStartJsonString) testEvent(jobEnd, jobEndJsonString) testEvent(environmentUpdate, environmentUpdateJsonString) @@ -80,7 +80,7 @@ class JsonProtocolSuite extends FunSuite { testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L)) testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L)) testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false)) - testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8, hasHdfsInput = false)) + testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false)) testBlockManagerId(BlockManagerId("Hong", "Kong", 500, 1000)) // StorageLevel @@ -136,7 +136,7 @@ class JsonProtocolSuite extends FunSuite { test("InputMetrics backward compatibility") { // InputMetrics were added after 1.0.1. - val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHdfsInput = true) + val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true) assert(metrics.inputMetrics.nonEmpty) val newJson = JsonProtocol.taskMetricsToJson(metrics) val oldJson = newJson.removeField { case (field, _) => field == "Input Metrics" } @@ -487,7 +487,7 @@ class JsonProtocolSuite extends FunSuite { } /** - * Creates a TaskMetrics object describing a task that read data from HDFS (if hasHdfsInput is + * Creates a TaskMetrics object describing a task that read data from Hadoop (if hasHadoopInput is * set to true) or read data from a shuffle otherwise. 
*/ private def makeTaskMetrics( @@ -497,7 +497,7 @@ class JsonProtocolSuite extends FunSuite { d: Long, e: Int, f: Int, - hasHdfsInput: Boolean) = { + hasHadoopInput: Boolean) = { val t = new TaskMetrics val sw = new ShuffleWriteMetrics t.hostname = "localhost" @@ -508,7 +508,7 @@ class JsonProtocolSuite extends FunSuite { t.resultSerializationTime = a + b t.memoryBytesSpilled = a + c - if (hasHdfsInput) { + if (hasHadoopInput) { val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) inputMetrics.bytesRead = d + e + f t.inputMetrics = Some(inputMetrics) @@ -596,8 +596,9 @@ class JsonProtocolSuite extends FunSuite { | }, | "Shuffle Write Metrics":{ | "Shuffle Bytes Written":1200, - | "Shuffle Write Time":1500}, - | "Updated Blocks":[ + | "Shuffle Write Time":1500 + | }, + | "Updated Blocks":[ | {"Block ID":"rdd_0_0", | "Status":{ | "Storage Level":{ @@ -612,20 +613,33 @@ class JsonProtocolSuite extends FunSuite { |} """.stripMargin - private val taskEndWithHdfsInputJsonString = + private val taskEndWithHadoopInputJsonString = """ - {"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask", - "Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":123,"Index": - 234,"Launch Time":345,"Executor ID":"executor","Host":"your kind sir", - "Locality":"NODE_LOCAL","Getting Result Time":0,"Finish Time":0,"Failed": - false,"Serialized Size":0},"Task Metrics":{"Host Name":"localhost", - "Executor Deserialize Time":300,"Executor Run Time":400,"Result Size":500, - "JVM GC Time":600,"Result Serialization Time":700,"Memory Bytes Spilled": - 800,"Disk Bytes Spilled":0,"Shuffle Write Metrics":{"Shuffle Bytes Written":1200, - "Shuffle Write Time":1500},"Input Metrics":{"Data Read Method":"Hdfs","Bytes Read":2100}, - "Updated Blocks":[{"Block ID":"rdd_0_0","Status":{"Storage Level":{"Use Disk":true, - "Use Memory":true,"Use Tachyon":false,"Deserialized":false,"Replication":2},"Memory Size":0, - "Tachyon Size":0,"Disk Size":0}}]}} + |{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask", + |"Task End Reason":{"Reason":"Success"}, + |"Task Info":{ + | "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor", + | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false, + | "Getting Result Time":0,"Finish Time":0,"Failed":false,"Serialized Size":0 + |}, + |"Task Metrics":{ + | "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400, + | "Result Size":500,"JVM GC Time":600,"Result Serialization Time":700, + | "Memory Bytes Spilled":800,"Disk Bytes Spilled":0, + | "Shuffle Write Metrics":{"Shuffle Bytes Written":1200,"Shuffle Write Time":1500}, + | "Input Metrics":{"Data Read Method":"Hadoop","Bytes Read":2100}, + | "Updated Blocks":[ + | {"Block ID":"rdd_0_0", + | "Status":{ + | "Storage Level":{ + | "Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false, + | "Replication":2 + | }, + | "Memory Size":0,"Tachyon Size":0,"Disk Size":0 + | } + | } + | ]} + | } """ private val jobStartJsonString = From bb6ec625671465401628e4d5d3714e192404ebde Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Fri, 27 Jun 2014 09:52:31 -0700 Subject: [PATCH 09/15] Small fixes --- .../main/scala/org/apache/spark/executor/TaskMetrics.scala | 4 ++-- core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 
d3bf5716c817..481d5a47181c 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -96,7 +96,7 @@ private[spark] object TaskMetrics { /** * :: DeveloperApi :: * Method by which input data was read. Network means that the data was read over the network - * from a remote block manager. which may have stored the data on-disk or in-memory. + * from a remote block manager (which may have stored the data on-disk or in-memory). */ @DeveloperApi private[spark] object DataReadMethod extends Enumeration with Serializable { @@ -171,4 +171,4 @@ class ShuffleWriteMetrics extends Serializable { * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds */ var shuffleWriteTime: Long = _ -} \ No newline at end of file +} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index eb89d4bdf4a4..f9609836042c 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -104,7 +104,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { // scalastyle:on val taskHeaders: Seq[String] = Seq( - "Task Index", "Task ID", "Attempt", "Status", "Locality Level", "Executor", + "Index", "ID", "Attempt", "Status", "Locality Level", "Executor", "Launch Time", "Duration", "GC Time") ++ {if (hasInput) Seq("Input") else Nil} ++ {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++ From 719f19dd86febe83da8186fa1daa2b310f16dbff Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Fri, 27 Jun 2014 11:44:11 -0700 Subject: [PATCH 10/15] Style fixes --- .../scala/org/apache/spark/executor/TaskMetrics.scala | 2 +- .../main/scala/org/apache/spark/ui/jobs/StagePage.scala | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 481d5a47181c..cab1df86a6ad 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -99,7 +99,7 @@ private[spark] object TaskMetrics { * from a remote block manager (which may have stored the data on-disk or in-memory). 
*/ @DeveloperApi -private[spark] object DataReadMethod extends Enumeration with Serializable { +object DataReadMethod extends Enumeration with Serializable { type DataReadMethod = Value val Memory, Disk, Hadoop, Network = Value } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index f9609836042c..afb8ed754ff8 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -225,10 +225,10 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { } def taskRow( - hasInput: Boolean, - hasShuffleRead: Boolean, - hasShuffleWrite: Boolean, - hasBytesSpilled: Boolean)(taskData: TaskUIData): Seq[Node] = { + hasInput: Boolean, + hasShuffleRead: Boolean, + hasShuffleWrite: Boolean, + hasBytesSpilled: Boolean)(taskData: TaskUIData): Seq[Node] = { taskData match { case TaskUIData(info, metrics, errorMessage) => val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis()) else metrics.map(_.executorRunTime).getOrElse(1L) From ae04d99476e34f4fbf65c8575a021a5398fb8edb Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Fri, 27 Jun 2014 12:18:08 -0700 Subject: [PATCH 11/15] Remove input metrics for parallel collections --- .../org/apache/spark/executor/TaskMetrics.scala | 4 ++-- .../org/apache/spark/rdd/ParallelCollectionRDD.scala | 12 ++---------- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index cab1df86a6ad..ac73288442a7 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -67,8 +67,8 @@ class TaskMetrics extends Serializable { var diskBytesSpilled: Long = _ /** - * If this task reads from a HadoopRDD, from cached data, or from a parallelized collection, - * metrics on how much data was read are stored here. + * If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read + * are stored here. */ var inputMetrics: Option[InputMetrics] = None diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala index 7e240ca9ddf5..66c71bf7e8bb 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala @@ -26,9 +26,8 @@ import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag import org.apache.spark._ -import org.apache.spark.executor.{DataReadMethod, InputMetrics} import org.apache.spark.serializer.JavaSerializer -import org.apache.spark.util.{SizeEstimator, Utils} +import org.apache.spark.util.Utils private[spark] class ParallelCollectionPartition[T: ClassTag]( var rddId: Long, @@ -100,14 +99,7 @@ private[spark] class ParallelCollectionRDD[T: ClassTag]( } override def compute(s: Partition, context: TaskContext) = { - val parallelCollectionsPartition = s.asInstanceOf[ParallelCollectionPartition[T]] - - // Set the input metrics for the task. 
- val inputMetrics = new InputMetrics(DataReadMethod.Memory) - inputMetrics.bytesRead = SizeEstimator.estimate(parallelCollectionsPartition.values) - context.taskMetrics.inputMetrics = Some(inputMetrics) - - new InterruptibleIterator(context, parallelCollectionsPartition.iterator) + new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator) } override def getPreferredLocations(s: Partition): Seq[String] = { From 8461492df78f0e1294a4902d9e4218c79a0a3f84 Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Fri, 27 Jun 2014 12:34:15 -0700 Subject: [PATCH 12/15] Miniscule style fix --- .../test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 7ab38f8afb9a..316e14100e40 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -639,7 +639,7 @@ class JsonProtocolSuite extends FunSuite { | } | } | ]} - | } + |} """ private val jobStartJsonString = From d1016e88bac28678426414acc52212cf41d81b8e Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Fri, 27 Jun 2014 13:13:23 -0700 Subject: [PATCH 13/15] Udated SparkListenerSuite test --- .../scala/org/apache/spark/scheduler/SparkListenerSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 9f3d4d15c299..71f48e295ecc 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -251,8 +251,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers taskInfoMetrics.foreach { case (taskInfo, taskMetrics) => taskMetrics.resultSize should be > (0l) if (stageInfo.rddInfos.exists(info => info.name == d2.name || info.name == d3.name)) { - taskMetrics.inputMetrics should be ('defined) - taskMetrics.inputMetrics.get.bytesRead should be > (0l) + taskMetrics.inputMetrics should not be ('defined) taskMetrics.shuffleWriteMetrics should be ('defined) taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0l) } From 8b70cdec6e9944aabff5544c228d5f389bc6541b Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Sun, 29 Jun 2014 15:00:17 -0700 Subject: [PATCH 14/15] Added comment about potential inaccuracy of bytesRead --- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 3 +++ core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 3 +++ 2 files changed, 6 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index b57753e3d20e..98dcbf4e2dbf 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -201,6 +201,9 @@ class HadoopRDD[K, V]( // Set the task input metrics. val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) try { + /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't + * always at record boundaries, so tasks may need to read into other splits to complete + * a record. 
*/ inputMetrics.bytesRead = split.inputSplit.value.getLength() } catch { case e: java.io.IOException => diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index d50d34f6292e..f2b3a64bf134 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -115,6 +115,9 @@ class NewHadoopRDD[K, V]( val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) try { + /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't + * always at record boundaries, so tasks may need to read into other splits to complete + * a record. */ inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength() } catch { case e: Exception => From f13b67da7592d77c9e0d01f12980b2ae8caeb893 Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Sun, 29 Jun 2014 16:56:31 -0700 Subject: [PATCH 15/15] Correctly format input bytes on executor page --- .../src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index fc85e4eebec4..de90d0b606bb 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -97,7 +97,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { {values("Complete Tasks")} {values("Total Tasks")} {Utils.msDurationToString(values("Task Time").toLong)} - {Utils.msDurationToString(values("Input Bytes").toLong)} + {Utils.bytesToString(values("Input Bytes").toLong)} {Utils.bytesToString(values("Shuffle Read").toLong)} {Utils.bytesToString(values("Shuffle Write").toLong)}
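
A minimal sketch of how the new input metrics can be consumed once this series is applied. It uses only the pieces added above (TaskMetrics.inputMetrics, InputMetrics.bytesRead and readMethod) together with the existing SparkListener / SparkListenerTaskEnd hooks; the InputMetricsLogger name and the log format are illustrative, not part of the patches.

    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    // Illustrative listener: report how much input data each finished task read
    // and which DataReadMethod (Memory, Disk, Hadoop, or Network) was used.
    class InputMetricsLogger extends SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
        // taskMetrics can be null for failed tasks; inputMetrics is None for tasks
        // that read shuffle output rather than input data.
        for (metrics <- Option(taskEnd.taskMetrics); input <- metrics.inputMetrics) {
          println(s"Task ${taskEnd.taskInfo.taskId} read ${input.bytesRead} bytes " +
            s"(${input.readMethod})")
        }
      }
    }

    // Register before running jobs, e.g.: sc.addSparkListener(new InputMetricsLogger)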