diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 3f667a4a0f9c..8f867686a044 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -19,6 +19,7 @@ package org.apache.spark
import scala.collection.mutable.{ArrayBuffer, HashSet}
+import org.apache.spark.executor.InputMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
@@ -41,9 +42,10 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
val key = RDDBlockId(rdd.id, partition.index)
logDebug(s"Looking for partition $key")
blockManager.get(key) match {
- case Some(values) =>
+ case Some(blockResult) =>
// Partition is already materialized, so just return its values
- new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
+ context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
+ new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
case None =>
// Acquire a lock for loading this partition
@@ -110,7 +112,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
logInfo(s"Whoever was loading $id failed; we'll try it ourselves")
loading.add(id)
}
- values.map(_.asInstanceOf[Iterator[T]])
+ values.map(_.data.asInstanceOf[Iterator[T]])
}
}
}
@@ -132,7 +134,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
* exceptions that can be avoided. */
updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true)
blockManager.get(key) match {
- case Some(v) => v.asInstanceOf[Iterator[T]]
+ case Some(v) => v.data.asInstanceOf[Iterator[T]]
case None =>
logInfo(s"Failure to store $key")
throw new BlockException(key, s"Block manager failed to return cached value for $key!")
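
With this change, a task that reads a cached partition carries the block's input metrics through TaskMetrics. A rough sketch of how those metrics surface on the driver once the task finishes; the listener name and output format are illustrative, not part of this patch:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    // Hypothetical listener: report how each finished task obtained its input.
    class InputReadLogger extends SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
        val metrics = taskEnd.taskMetrics
        if (metrics != null) {
          metrics.inputMetrics.foreach { im =>
            println(s"task ${taskEnd.taskInfo.taskId} read ${im.bytesRead} bytes (${im.readMethod})")
          }
        }
      }
    }
    // Register with: sc.addSparkListener(new InputReadLogger)
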
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 350fd74173f6..ac73288442a7 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -66,6 +66,12 @@ class TaskMetrics extends Serializable {
*/
var diskBytesSpilled: Long = _
+ /**
+ * If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read
+ * are stored here.
+ */
+ var inputMetrics: Option[InputMetrics] = None
+
/**
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here
*/
@@ -87,6 +93,29 @@ private[spark] object TaskMetrics {
def empty: TaskMetrics = new TaskMetrics
}
+/**
+ * :: DeveloperApi ::
+ * Method by which input data was read. Network means that the data was read over the network
+ * from a remote block manager (which may have stored the data on-disk or in-memory).
+ */
+@DeveloperApi
+object DataReadMethod extends Enumeration with Serializable {
+ type DataReadMethod = Value
+ val Memory, Disk, Hadoop, Network = Value
+}
+
+/**
+ * :: DeveloperApi ::
+ * Metrics about reading input data.
+ */
+@DeveloperApi
+case class InputMetrics(readMethod: DataReadMethod.Value) {
+ /**
+ * Total bytes read.
+ */
+ var bytesRead: Long = 0L
+}
+
/**
* :: DeveloperApi ::
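
A minimal sketch of the new developer-facing types on their own; the byte count is a made-up value:

    import org.apache.spark.executor.{DataReadMethod, InputMetrics}

    // Build the metrics the way a data-reading RDD would.
    val metrics = new InputMetrics(DataReadMethod.Hadoop)
    metrics.bytesRead = 64L * 1024 * 1024

    // Consumers can branch on how the bytes were obtained.
    metrics.readMethod match {
      case DataReadMethod.Memory | DataReadMethod.Disk => println("served by the local block manager")
      case DataReadMethod.Hadoop => println("read from a Hadoop input source")
      case DataReadMethod.Network => println("fetched from a remote block manager")
    }
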
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index c64da8804d16..2673ec22509e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -46,7 +46,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
val blockManager = SparkEnv.get.blockManager
val blockId = split.asInstanceOf[BlockRDDPartition].blockId
blockManager.get(blockId) match {
- case Some(block) => block.asInstanceOf[Iterator[T]]
+ case Some(block) => block.data.asInstanceOf[Iterator[T]]
case None =>
throw new Exception("Could not compute split, block " + blockId + " not found")
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 2aa111d600e9..98dcbf4e2dbf 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -38,6 +38,7 @@ import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.{DataReadMethod, InputMetrics}
import org.apache.spark.util.NextIterator
/**
@@ -196,6 +197,20 @@ class HadoopRDD[K, V](
context.addOnCompleteCallback{ () => closeIfNeeded() }
val key: K = reader.createKey()
val value: V = reader.createValue()
+
+ // Set the task input metrics.
+ val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+ try {
+ /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
+ * always at record boundaries, so tasks may need to read into other splits to complete
+ * a record. */
+ inputMetrics.bytesRead = split.inputSplit.value.getLength()
+ } catch {
+ case e: java.io.IOException =>
+ logWarning("Unable to get input size to set InputMetrics for task", e)
+ }
+ context.taskMetrics.inputMetrics = Some(inputMetrics)
+
override def getNext() = {
try {
finished = !reader.next(key, value)
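
To make the caveat in the comment concrete: bytesRead is the split's declared length, so the per-task values sum to roughly the total input size even when a record reader runs slightly past its split boundary. A sketch of that total computed up front with the standard mapred APIs (this helper is not part of the patch):

    import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
    import org.apache.hadoop.util.ReflectionUtils

    // Roughly what the per-task InputMetrics.bytesRead values of a stage add up to.
    def approximateStageInputBytes(path: String, minSplits: Int): Long = {
      val jobConf = new JobConf()
      FileInputFormat.setInputPaths(jobConf, path)
      val inputFormat = ReflectionUtils.newInstance(classOf[TextInputFormat], jobConf)
      inputFormat.getSplits(jobConf, minSplits).map(_.getLength).sum
    }
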
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index ac1ccc06f238..f2b3a64bf134 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -31,6 +31,7 @@ import org.apache.spark.Logging
import org.apache.spark.Partition
import org.apache.spark.SerializableWritable
import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.executor.{DataReadMethod, InputMetrics}
private[spark] class NewHadoopPartition(
rddId: Int,
@@ -112,6 +113,18 @@ class NewHadoopRDD[K, V](
split.serializableHadoopSplit.value, hadoopAttemptContext)
reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
+ val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+ try {
+ /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
+ * always at record boundaries, so tasks may need to read into other splits to complete
+ * a record. */
+ inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength()
+ } catch {
+ case e: Exception =>
+ logWarning("Unable to get input split size in order to set task input bytes", e)
+ }
+ context.taskMetrics.inputMetrics = Some(inputMetrics)
+
// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback(() => close())
var havePair = false
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index a1e21cad48b9..47dd112f6832 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -26,7 +26,9 @@ import scala.collection.mutable.HashMap
import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.executor.{DataReadMethod, TaskMetrics}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
/**
* :: DeveloperApi ::
@@ -160,7 +162,13 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
" START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime +
" EXECUTOR_ID=" + taskInfo.executorId + " HOST=" + taskMetrics.hostname
val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime
- val readMetrics = taskMetrics.shuffleReadMetrics match {
+ val inputMetrics = taskMetrics.inputMetrics match {
+ case Some(metrics) =>
+ " READ_METHOD=" + metrics.readMethod.toString +
+ " INPUT_BYTES=" + metrics.bytesRead
+ case None => ""
+ }
+ val shuffleReadMetrics = taskMetrics.shuffleReadMetrics match {
case Some(metrics) =>
" SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime +
" BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched +
@@ -174,7 +182,8 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten
case None => ""
}
- stageLogInfo(stageId, status + info + executorRunTime + readMetrics + writeMetrics)
+ stageLogInfo(stageId, status + info + executorRunTime + inputMetrics + shuffleReadMetrics +
+ writeMetrics)
}
/**
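
A quick illustration of what the new fields contribute to a stage log line; the byte count is made up:

    import org.apache.spark.executor.DataReadMethod

    // The fragment the code above appends for a task that read 64 MB from Hadoop.
    val sampleInputFragment = " READ_METHOD=" + DataReadMethod.Hadoop + " INPUT_BYTES=" + (64L * 1024 * 1024)
    // sampleInputFragment == " READ_METHOD=Hadoop INPUT_BYTES=67108864"
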
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index d2f7baf928b6..0db0a5bc7341 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -29,6 +29,7 @@ import akka.actor.{ActorSystem, Cancellable, Props}
import sun.nio.ch.DirectBuffer
import org.apache.spark._
+import org.apache.spark.executor.{DataReadMethod, InputMetrics}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.network._
import org.apache.spark.serializer.Serializer
@@ -39,6 +40,15 @@ private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValu
private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
private[spark] case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends BlockValues
+/* Class for returning a fetched block and associated metrics. */
+private[spark] class BlockResult(
+ val data: Iterator[Any],
+ readMethod: DataReadMethod.Value,
+ bytes: Long) {
+ val inputMetrics = new InputMetrics(readMethod)
+ inputMetrics.bytesRead = bytes
+}
+
private[spark] class BlockManager(
executorId: String,
actorSystem: ActorSystem,
@@ -334,9 +344,9 @@ private[spark] class BlockManager(
/**
* Get block from local block manager.
*/
- def getLocal(blockId: BlockId): Option[Iterator[Any]] = {
+ def getLocal(blockId: BlockId): Option[BlockResult] = {
logDebug(s"Getting local block $blockId")
- doGetLocal(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
+ doGetLocal(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
}
/**
@@ -355,11 +365,11 @@ private[spark] class BlockManager(
blockId, s"Block $blockId not found on disk, though it should be")
}
} else {
- doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
+ doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
}
}
- private def doGetLocal(blockId: BlockId, asValues: Boolean): Option[Any] = {
+ private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
@@ -386,14 +396,14 @@ private[spark] class BlockManager(
// Look for the block in memory
if (level.useMemory) {
logDebug(s"Getting block $blockId from memory")
- val result = if (asValues) {
- memoryStore.getValues(blockId)
+ val result = if (asBlockResult) {
+ memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
} else {
memoryStore.getBytes(blockId)
}
result match {
case Some(values) =>
- return Some(values)
+ return result
case None =>
logDebug(s"Block $blockId not found in memory")
}
@@ -405,10 +415,11 @@ private[spark] class BlockManager(
if (tachyonStore.contains(blockId)) {
tachyonStore.getBytes(blockId) match {
case Some(bytes) =>
- if (!asValues) {
+ if (!asBlockResult) {
return Some(bytes)
} else {
- return Some(dataDeserialize(blockId, bytes))
+ return Some(new BlockResult(
+ dataDeserialize(blockId, bytes), DataReadMethod.Memory, info.size))
}
case None =>
logDebug(s"Block $blockId not found in tachyon")
@@ -429,14 +440,15 @@ private[spark] class BlockManager(
if (!level.useMemory) {
// If the block shouldn't be stored in memory, we can just return it
- if (asValues) {
- return Some(dataDeserialize(blockId, bytes))
+ if (asBlockResult) {
+ return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk,
+ info.size))
} else {
return Some(bytes)
}
} else {
// Otherwise, we also have to store something in the memory store
- if (!level.deserialized || !asValues) {
+ if (!level.deserialized || !asBlockResult) {
/* We'll store the bytes in memory if the block's storage level includes
* "memory serialized", or if it should be cached as objects in memory
* but we only requested its serialized bytes. */
@@ -445,7 +457,7 @@ private[spark] class BlockManager(
memoryStore.putBytes(blockId, copyForMemory, level)
bytes.rewind()
}
- if (!asValues) {
+ if (!asBlockResult) {
return Some(bytes)
} else {
val values = dataDeserialize(blockId, bytes)
@@ -457,12 +469,12 @@ private[spark] class BlockManager(
memoryStore.putValues(blockId, valuesBuffer, level, returnValues = true).data
match {
case Left(values2) =>
- return Some(values2)
+ return Some(new BlockResult(values2, DataReadMethod.Disk, info.size))
case _ =>
- throw new SparkException("Memory store did not return an iterator")
+ throw new SparkException("Memory store did not return back an iterator")
}
} else {
- return Some(values)
+ return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
}
}
}
@@ -477,9 +489,9 @@ private[spark] class BlockManager(
/**
* Get block from remote block managers.
*/
- def getRemote(blockId: BlockId): Option[Iterator[Any]] = {
+ def getRemote(blockId: BlockId): Option[BlockResult] = {
logDebug(s"Getting remote block $blockId")
- doGetRemote(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
+ doGetRemote(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
}
/**
@@ -487,10 +499,10 @@ private[spark] class BlockManager(
*/
def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
logDebug(s"Getting remote block $blockId as bytes")
- doGetRemote(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
+ doGetRemote(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
}
- private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = {
+ private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
require(blockId != null, "BlockId is null")
val locations = Random.shuffle(master.getLocations(blockId))
for (loc <- locations) {
@@ -498,8 +510,11 @@ private[spark] class BlockManager(
val data = BlockManagerWorker.syncGetBlock(
GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
if (data != null) {
- if (asValues) {
- return Some(dataDeserialize(blockId, data))
+ if (asBlockResult) {
+ return Some(new BlockResult(
+ dataDeserialize(blockId, data),
+ DataReadMethod.Network,
+ data.limit()))
} else {
return Some(data)
}
@@ -513,7 +528,7 @@ private[spark] class BlockManager(
/**
* Get a block from the block manager (either local or remote).
*/
- def get(blockId: BlockId): Option[Iterator[Any]] = {
+ def get(blockId: BlockId): Option[BlockResult] = {
val local = getLocal(blockId)
if (local.isDefined) {
logInfo(s"Found block $blockId locally")
@@ -792,7 +807,7 @@ private[spark] class BlockManager(
* Read a block consisting of a single object.
*/
def getSingle(blockId: BlockId): Option[Any] = {
- get(blockId).map(_.next())
+ get(blockId).map(_.data.next())
}
/**
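
Since get(), getLocal() and getRemote() now return a BlockResult rather than a bare iterator, callers unwrap the values from .data and can forward .inputMetrics to TaskMetrics. A minimal caller-side sketch (internal API, so this only compiles inside the org.apache.spark package; the block id is illustrative):

    import org.apache.spark.SparkEnv
    import org.apache.spark.storage.RDDBlockId

    val blockManager = SparkEnv.get.blockManager
    blockManager.get(RDDBlockId(rddId = 0, splitIndex = 0)) match {
      case Some(result) =>
        // result.data is the Iterator[Any] that get() used to return directly;
        // result.inputMetrics says how the bytes were obtained and how many.
        println(s"read ${result.inputMetrics.bytesRead} bytes via ${result.inputMetrics.readMethod}")
        result.data.foreach(println)
      case None =>
        println("block not found locally or remotely")
    }
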
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index a107c5182b3b..328be158db68 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -78,7 +78,7 @@ private[spark] object ThreadingTest {
val startTime = System.currentTimeMillis()
manager.get(blockId) match {
case Some(retrievedBlock) =>
- assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList,
+ assert(retrievedBlock.data.toList.asInstanceOf[List[Int]] == block.toList,
"Block " + blockId + " did not match")
println("Got block " + blockId + " in " +
(System.currentTimeMillis - startTime) + " ms")
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 2d8c3b949c1a..de90d0b606bb 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -71,6 +71,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
"Complete Tasks",
"Total Tasks",
"Task Time",
+ "Input Bytes",
"Shuffle Read",
"Shuffle Write")
@@ -96,6 +97,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
{values("Complete Tasks")} |
{values("Total Tasks")} |
{Utils.msDurationToString(values("Task Time").toLong)} |
+ {Utils.bytesToString(values("Input Bytes").toLong)} |
{Utils.bytesToString(values("Shuffle Read").toLong)} |
{Utils.bytesToString(values("Shuffle Write").toLong)} |
@@ -116,6 +118,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
val totalTasks = activeTasks + failedTasks + completedTasks
val totalDuration = listener.executorToDuration.getOrElse(execId, 0)
+ val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0)
val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0)
val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0)
@@ -133,6 +136,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
completedTasks,
totalTasks,
totalDuration,
+ totalInputBytes,
totalShuffleRead,
totalShuffleWrite,
maxMem
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 91d37b835b19..58eeb86bf9a3 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -46,6 +46,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener)
val executorToTasksComplete = HashMap[String, Int]()
val executorToTasksFailed = HashMap[String, Int]()
val executorToDuration = HashMap[String, Long]()
+ val executorToInputBytes = HashMap[String, Long]()
val executorToShuffleRead = HashMap[String, Long]()
val executorToShuffleWrite = HashMap[String, Long]()
@@ -72,6 +73,10 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener)
// Update shuffle read/write
val metrics = taskEnd.taskMetrics
if (metrics != null) {
+ metrics.inputMetrics.foreach { inputMetrics =>
+ executorToInputBytes(eid) =
+ executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead
+ }
metrics.shuffleReadMetrics.foreach { shuffleRead =>
executorToShuffleRead(eid) =
executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
index 2aaf6329b792..c4a8996c0b9a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
@@ -28,6 +28,7 @@ class ExecutorSummary {
var taskTime : Long = 0
var failedTasks : Int = 0
var succeededTasks : Int = 0
+ var inputBytes: Long = 0
var shuffleRead : Long = 0
var shuffleWrite : Long = 0
var memoryBytesSpilled : Long = 0
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index add0e9878a54..2a34a9af925d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -43,6 +43,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
Total Tasks |
Failed Tasks |
Succeeded Tasks |
+ Input Bytes |
Shuffle Read |
Shuffle Write |
Shuffle Spill (Memory) |
@@ -75,6 +76,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
{v.failedTasks + v.succeededTasks} |
{v.failedTasks} |
{v.succeededTasks} |
+ {Utils.bytesToString(v.inputBytes)} |
{Utils.bytesToString(v.shuffleRead)} |
{Utils.bytesToString(v.shuffleWrite)} |
{Utils.bytesToString(v.memoryBytesSpilled)} |
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 381a5443df8b..2286a7f952f2 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -46,13 +46,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
val completedStages = ListBuffer[StageInfo]()
val failedStages = ListBuffer[StageInfo]()
- // Total metrics reflect metrics only for completed tasks
- var totalTime = 0L
- var totalShuffleRead = 0L
- var totalShuffleWrite = 0L
-
// TODO: Should probably consolidate all following into a single hash map.
val stageIdToTime = HashMap[Int, Long]()
+ val stageIdToInputBytes = HashMap[Int, Long]()
val stageIdToShuffleRead = HashMap[Int, Long]()
val stageIdToShuffleWrite = HashMap[Int, Long]()
val stageIdToMemoryBytesSpilled = HashMap[Int, Long]()
@@ -93,6 +89,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
val toRemove = math.max(retainedStages / 10, 1)
stages.take(toRemove).foreach { s =>
stageIdToTime.remove(s.stageId)
+ stageIdToInputBytes.remove(s.stageId)
stageIdToShuffleRead.remove(s.stageId)
stageIdToShuffleWrite.remove(s.stageId)
stageIdToMemoryBytesSpilled.remove(s.stageId)
@@ -171,6 +168,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
val metrics = taskEnd.taskMetrics
if (metrics != null) {
+ metrics.inputMetrics.foreach { y.inputBytes += _.bytesRead }
metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
y.memoryBytesSpilled += metrics.memoryBytesSpilled
@@ -200,18 +198,19 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
stageIdToTime.getOrElseUpdate(sid, 0L)
val time = metrics.map(_.executorRunTime).getOrElse(0L)
stageIdToTime(sid) += time
- totalTime += time
+
+ stageIdToInputBytes.getOrElseUpdate(sid, 0L)
+ val inputBytes = metrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L)
+ stageIdToInputBytes(sid) += inputBytes
stageIdToShuffleRead.getOrElseUpdate(sid, 0L)
val shuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L)
stageIdToShuffleRead(sid) += shuffleRead
- totalShuffleRead += shuffleRead
stageIdToShuffleWrite.getOrElseUpdate(sid, 0L)
val shuffleWrite =
metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten).getOrElse(0L)
stageIdToShuffleWrite(sid) += shuffleWrite
- totalShuffleWrite += shuffleWrite
stageIdToMemoryBytesSpilled.getOrElseUpdate(sid, 0L)
val memoryBytesSpilled = metrics.map(_.memoryBytesSpilled).getOrElse(0L)
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 8e3d5d1cd4c6..afb8ed754ff8 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -48,6 +48,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
val tasks = listener.stageIdToTaskData(stageId).values.toSeq.sortBy(_.taskInfo.launchTime)
val numCompleted = tasks.count(_.taskInfo.finished)
+ val inputBytes = listener.stageIdToInputBytes.getOrElse(stageId, 0L)
+ val hasInput = inputBytes > 0
val shuffleReadBytes = listener.stageIdToShuffleRead.getOrElse(stageId, 0L)
val hasShuffleRead = shuffleReadBytes > 0
val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L)
@@ -69,6 +71,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
Total task time across all tasks:
{UIUtils.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
+ {if (hasInput)
+ <li>
+ <strong>Input: </strong>
+ {Utils.bytesToString(inputBytes)}
+ </li>
+ }
{if (hasShuffleRead)
Shuffle read:
@@ -98,13 +106,14 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
Seq(
"Index", "ID", "Attempt", "Status", "Locality Level", "Executor",
"Launch Time", "Duration", "GC Time") ++
+ {if (hasInput) Seq("Input") else Nil} ++
{if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
{if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
{if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++
Seq("Errors")
val taskTable = UIUtils.listingTable(
- taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks)
+ taskHeaders, taskRow(hasInput, hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks)
// Excludes tasks which failed and have incomplete metrics
val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.taskMetrics.isDefined)
@@ -159,6 +168,11 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
def getQuantileCols(data: Seq[Double]) =
Distribution(data).get.getQuantiles().map(d => Utils.bytesToString(d.toLong))
+ val inputSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
+ metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble
+ }
+ val inputQuantiles = "Input" +: getQuantileCols(inputSizes)
+
val shuffleReadSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
}
@@ -186,6 +200,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
serviceQuantiles,
gettingResultQuantiles,
schedulerDelayQuantiles,
+ if (hasInput) inputQuantiles else Nil,
if (hasShuffleRead) shuffleReadQuantiles else Nil,
if (hasShuffleWrite) shuffleWriteQuantiles else Nil,
if (hasBytesSpilled) memoryBytesSpilledQuantiles else Nil,
@@ -209,8 +224,11 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
}
}
- def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean, bytesSpilled: Boolean)
- (taskData: TaskUIData): Seq[Node] = {
+ def taskRow(
+ hasInput: Boolean,
+ hasShuffleRead: Boolean,
+ hasShuffleWrite: Boolean,
+ hasBytesSpilled: Boolean)(taskData: TaskUIData): Seq[Node] = {
taskData match { case TaskUIData(info, metrics, errorMessage) =>
val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis())
else metrics.map(_.executorRunTime).getOrElse(1L)
@@ -219,6 +237,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
+ val maybeInput = metrics.flatMap(_.inputMetrics)
+ val inputSortable = maybeInput.map(_.bytesRead.toString).getOrElse("")
+ val inputReadable = maybeInput
+ .map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})")
+ .getOrElse("")
+
val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead)
val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("")
val shuffleReadReadable = maybeShuffleRead.map(Utils.bytesToString).getOrElse("")
@@ -265,12 +289,17 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
{if (serializationTime > 0) UIUtils.formatDuration(serializationTime) else ""}
-->
- {if (shuffleRead) {
+ {if (hasInput) {
+ <td sorttable_customkey={inputSortable}>
+ {inputReadable}
+ </td>
+ }}
+ {if (hasShuffleRead) {
{shuffleReadReadable}
|
}}
- {if (shuffleWrite) {
+ {if (hasShuffleWrite) {
{writeTimeReadable}
|
@@ -278,7 +307,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
{shuffleWriteReadable}
}}
- {if (bytesSpilled) {
+ {if (hasBytesSpilled) {
{memoryBytesSpilledReadable}
|
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 30971f769682..e7ba81927ebe 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -43,6 +43,7 @@ private[ui] class StageTableBase(
Submitted |
Duration |
Tasks: Succeeded/Total |
+ Input |
Shuffle Read |
Shuffle Write |
}
@@ -123,6 +124,11 @@ private[ui] class StageTableBase(
case _ => ""
}
val totalTasks = s.numTasks
+ val inputSortable = listener.stageIdToInputBytes.getOrElse(s.stageId, 0L)
+ val inputRead = inputSortable match {
+ case 0 => ""
+ case b => Utils.bytesToString(b)
+ }
val shuffleReadSortable = listener.stageIdToShuffleRead.getOrElse(s.stageId, 0L)
val shuffleRead = shuffleReadSortable match {
case 0 => ""
@@ -150,6 +156,7 @@ private[ui] class StageTableBase(
{makeProgressBar(startedTasks, completedTasks, failedTasks, totalTasks)}
|
+ {inputRead} |
{shuffleRead} |
{shuffleWrite} |
}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 6245b4b8023c..26c9c9d6034e 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -26,7 +26,8 @@ import org.json4s.DefaultFormats
import org.json4s.JsonDSL._
import org.json4s.JsonAST._
-import org.apache.spark.executor.{ShuffleReadMetrics, ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.executor.{DataReadMethod, InputMetrics, ShuffleReadMetrics,
+ ShuffleWriteMetrics, TaskMetrics}
import org.apache.spark.scheduler._
import org.apache.spark.storage._
import org.apache.spark._
@@ -213,6 +214,8 @@ private[spark] object JsonProtocol {
taskMetrics.shuffleReadMetrics.map(shuffleReadMetricsToJson).getOrElse(JNothing)
val shuffleWriteMetrics =
taskMetrics.shuffleWriteMetrics.map(shuffleWriteMetricsToJson).getOrElse(JNothing)
+ val inputMetrics =
+ taskMetrics.inputMetrics.map(inputMetricsToJson).getOrElse(JNothing)
val updatedBlocks =
taskMetrics.updatedBlocks.map { blocks =>
JArray(blocks.toList.map { case (id, status) =>
@@ -230,6 +233,7 @@ private[spark] object JsonProtocol {
("Disk Bytes Spilled" -> taskMetrics.diskBytesSpilled) ~
("Shuffle Read Metrics" -> shuffleReadMetrics) ~
("Shuffle Write Metrics" -> shuffleWriteMetrics) ~
+ ("Input Metrics" -> inputMetrics) ~
("Updated Blocks" -> updatedBlocks)
}
@@ -247,6 +251,11 @@ private[spark] object JsonProtocol {
("Shuffle Write Time" -> shuffleWriteMetrics.shuffleWriteTime)
}
+ def inputMetricsToJson(inputMetrics: InputMetrics): JValue = {
+ ("Data Read Method" -> inputMetrics.readMethod.toString) ~
+ ("Bytes Read" -> inputMetrics.bytesRead)
+ }
+
def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = {
val reason = Utils.getFormattedClassName(taskEndReason)
val json = taskEndReason match {
@@ -528,6 +537,8 @@ private[spark] object JsonProtocol {
Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson)
metrics.shuffleWriteMetrics =
Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson)
+ metrics.inputMetrics =
+ Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson)
metrics.updatedBlocks =
Utils.jsonOption(json \ "Updated Blocks").map { value =>
value.extract[List[JValue]].map { block =>
@@ -557,6 +568,13 @@ private[spark] object JsonProtocol {
metrics
}
+ def inputMetricsFromJson(json: JValue): InputMetrics = {
+ val metrics = new InputMetrics(
+ DataReadMethod.withName((json \ "Data Read Method").extract[String]))
+ metrics.bytesRead = (json \ "Bytes Read").extract[Long]
+ metrics
+ }
+
def taskEndReasonFromJson(json: JValue): TaskEndReason = {
val success = Utils.getFormattedClassName(Success)
val resubmitted = Utils.getFormattedClassName(Resubmitted)
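
A quick round trip through the two new methods (internal API; the values are made up). The compact rendering matches the "Input Metrics" object asserted in the updated JsonProtocolSuite below:

    import org.apache.spark.executor.{DataReadMethod, InputMetrics}
    import org.apache.spark.util.JsonProtocol
    import org.json4s.jackson.JsonMethods.{compact, render}

    val in = new InputMetrics(DataReadMethod.Hadoop)
    in.bytesRead = 2100L
    val json = JsonProtocol.inputMetricsToJson(in)
    // compact(render(json)) == """{"Data Read Method":"Hadoop","Bytes Read":2100}"""
    val out = JsonProtocol.inputMetricsFromJson(json)
    assert(out.readMethod == in.readMethod && out.bytesRead == in.bytesRead)
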
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index 4f178db40f63..7f5d0b061e8b 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -22,6 +22,7 @@ import scala.collection.mutable.ArrayBuffer
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.mock.EasyMockSugar
+import org.apache.spark.executor.{DataReadMethod, TaskMetrics}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
@@ -66,7 +67,8 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
test("get cached rdd") {
expecting {
- blockManager.get(RDDBlockId(0, 0)).andReturn(Some(ArrayBuffer(5, 6, 7).iterator))
+ val result = new BlockResult(ArrayBuffer(5, 6, 7).iterator, DataReadMethod.Memory, 12)
+ blockManager.get(RDDBlockId(0, 0)).andReturn(Some(result))
}
whenExecuting(blockManager) {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 6df0a080961b..71f48e295ecc 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -251,6 +251,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
taskInfoMetrics.foreach { case (taskInfo, taskMetrics) =>
taskMetrics.resultSize should be > (0l)
if (stageInfo.rddInfos.exists(info => info.name == d2.name || info.name == d3.name)) {
+ taskMetrics.inputMetrics should not be ('defined)
taskMetrics.shuffleWriteMetrics should be ('defined)
taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0l)
}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index d7dbe5164b7f..23cb6905bfde 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -31,11 +31,13 @@ import org.scalatest.concurrent.Timeouts._
import org.scalatest.Matchers
import org.scalatest.time.SpanSugar._
-import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
+import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.executor.DataReadMethod
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils}
+import scala.collection.mutable.ArrayBuffer
import scala.language.implicitConversions
import scala.language.postfixOps
@@ -415,6 +417,39 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
}
+ test("correct BlockResult returned from get() calls") {
+ store = new BlockManager("", actorSystem, master, serializer, 1200, conf, securityMgr,
+ mapOutputTracker)
+ val list1 = List(new Array[Byte](200), new Array[Byte](200))
+ val list1ForSizeEstimate = new ArrayBuffer[Any]
+ list1ForSizeEstimate ++= list1.iterator
+ val list1SizeEstimate = SizeEstimator.estimate(list1ForSizeEstimate)
+ val list2 = List(new Array[Byte](50), new Array[Byte](100), new Array[Byte](150))
+ val list2ForSizeEstimate = new ArrayBuffer[Any]
+ list2ForSizeEstimate ++= list2.iterator
+ val list2SizeEstimate = SizeEstimator.estimate(list2ForSizeEstimate)
+ store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.put("list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.put("list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+ val list1Get = store.get("list1")
+ assert(list1Get.isDefined, "list1 expected to be in store")
+ assert(list1Get.get.data.size === 2)
+ assert(list1Get.get.inputMetrics.bytesRead === list1SizeEstimate)
+ assert(list1Get.get.inputMetrics.readMethod === DataReadMethod.Memory)
+ val list2MemoryGet = store.get("list2memory")
+ assert(list2MemoryGet.isDefined, "list2memory expected to be in store")
+ assert(list2MemoryGet.get.data.size === 3)
+ assert(list2MemoryGet.get.inputMetrics.bytesRead === list2SizeEstimate)
+ assert(list2MemoryGet.get.inputMetrics.readMethod === DataReadMethod.Memory)
+ val list2DiskGet = store.get("list2disk")
+ assert(list2DiskGet.isDefined, "list2disk expected to be in store")
+ assert(list2DiskGet.get.data.size === 3)
+ // We don't know the exact size of the data on disk, but it should certainly be > 0.
+ assert(list2DiskGet.get.inputMetrics.bytesRead > 0)
+ assert(list2DiskGet.get.inputMetrics.readMethod === DataReadMethod.Disk)
+ }
+
test("in-memory LRU storage") {
store = new BlockManager("", actorSystem, master, serializer, 1200, conf,
securityMgr, mapOutputTracker)
@@ -630,18 +665,18 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(store.get("list2").isDefined, "list2 was not in store")
- assert(store.get("list2").get.size == 2)
+ assert(store.get("list2").get.data.size === 2)
assert(store.get("list3").isDefined, "list3 was not in store")
- assert(store.get("list3").get.size == 2)
+ assert(store.get("list3").get.data.size === 2)
assert(store.get("list1") === None, "list1 was in store")
assert(store.get("list2").isDefined, "list2 was not in store")
- assert(store.get("list2").get.size == 2)
+ assert(store.get("list2").get.data.size === 2)
// At this point list2 was gotten last, so LRU will get rid of list3
store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(store.get("list1").isDefined, "list1 was not in store")
- assert(store.get("list1").get.size == 2)
+ assert(store.get("list1").get.data.size === 2)
assert(store.get("list2").isDefined, "list2 was not in store")
- assert(store.get("list2").get.size == 2)
+ assert(store.get("list2").get.data.size === 2)
assert(store.get("list3") === None, "list1 was in store")
}
@@ -656,28 +691,31 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+ val listForSizeEstimate = new ArrayBuffer[Any]
+ listForSizeEstimate ++= list1.iterator
+ val listSize = SizeEstimator.estimate(listForSizeEstimate)
// At this point LRU should not kick in because list3 is only on disk
- assert(store.get("list1").isDefined, "list2 was not in store")
- assert(store.get("list1").get.size === 2)
- assert(store.get("list2").isDefined, "list3 was not in store")
- assert(store.get("list2").get.size === 2)
- assert(store.get("list3").isDefined, "list1 was not in store")
- assert(store.get("list3").get.size === 2)
- assert(store.get("list1").isDefined, "list2 was not in store")
- assert(store.get("list1").get.size === 2)
- assert(store.get("list2").isDefined, "list3 was not in store")
- assert(store.get("list2").get.size === 2)
- assert(store.get("list3").isDefined, "list1 was not in store")
- assert(store.get("list3").get.size === 2)
+ assert(store.get("list1").isDefined, "list1 was not in store")
+ assert(store.get("list1").get.data.size === 2)
+ assert(store.get("list2").isDefined, "list2 was not in store")
+ assert(store.get("list2").get.data.size === 2)
+ assert(store.get("list3").isDefined, "list3 was not in store")
+ assert(store.get("list3").get.data.size === 2)
+ assert(store.get("list1").isDefined, "list1 was not in store")
+ assert(store.get("list1").get.data.size === 2)
+ assert(store.get("list2").isDefined, "list2 was not in store")
+ assert(store.get("list2").get.data.size === 2)
+ assert(store.get("list3").isDefined, "list3 was not in store")
+ assert(store.get("list3").get.data.size === 2)
// Now let's add in list4, which uses both disk and memory; list1 should drop out
store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
assert(store.get("list1") === None, "list1 was in store")
- assert(store.get("list2").isDefined, "list3 was not in store")
- assert(store.get("list2").get.size === 2)
- assert(store.get("list3").isDefined, "list1 was not in store")
- assert(store.get("list3").get.size === 2)
+ assert(store.get("list2").isDefined, "list2 was not in store")
+ assert(store.get("list2").get.data.size === 2)
+ assert(store.get("list3").isDefined, "list3 was not in store")
+ assert(store.get("list3").get.data.size === 2)
assert(store.get("list4").isDefined, "list4 was not in store")
- assert(store.get("list4").get.size === 2)
+ assert(store.get("list4").get.data.size === 2)
}
test("negative byte values in ByteBufferInputStream") {
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 6c4987045587..316e14100e40 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -39,7 +39,11 @@ class JsonProtocolSuite extends FunSuite {
val taskGettingResult =
SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true))
val taskEnd = SparkListenerTaskEnd(1, "ShuffleMapTask", Success,
- makeTaskInfo(123L, 234, 67, 345L, false), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800))
+ makeTaskInfo(123L, 234, 67, 345L, false),
+ makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = false))
+ val taskEndWithHadoopInput = SparkListenerTaskEnd(1, "ShuffleMapTask", Success,
+ makeTaskInfo(123L, 234, 67, 345L, false),
+ makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true))
val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties)
val jobEnd = SparkListenerJobEnd(20, JobSucceeded)
val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]](
@@ -61,6 +65,7 @@ class JsonProtocolSuite extends FunSuite {
testEvent(taskStart, taskStartJsonString)
testEvent(taskGettingResult, taskGettingResultJsonString)
testEvent(taskEnd, taskEndJsonString)
+ testEvent(taskEndWithHadoopInput, taskEndWithHadoopInputJsonString)
testEvent(jobStart, jobStartJsonString)
testEvent(jobEnd, jobEndJsonString)
testEvent(environmentUpdate, environmentUpdateJsonString)
@@ -75,7 +80,7 @@ class JsonProtocolSuite extends FunSuite {
testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
- testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8))
+ testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false))
testBlockManagerId(BlockManagerId("Hong", "Kong", 500, 1000))
// StorageLevel
@@ -118,7 +123,7 @@ class JsonProtocolSuite extends FunSuite {
testBlockId(StreamBlockId(1, 2L))
}
- test("Backward compatibility") {
+ test("StageInfo.details backward compatibility") {
// StageInfo.details was added after 1.0.0.
val info = makeStageInfo(1, 2, 3, 4L, 5L)
assert(info.details.nonEmpty)
@@ -129,6 +134,16 @@ class JsonProtocolSuite extends FunSuite {
assert("" === newInfo.details)
}
+ test("InputMetrics backward compatibility") {
+ // InputMetrics were added after 1.0.1.
+ val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true)
+ assert(metrics.inputMetrics.nonEmpty)
+ val newJson = JsonProtocol.taskMetricsToJson(metrics)
+ val oldJson = newJson.removeField { case (field, _) => field == "Input Metrics" }
+ val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
+ assert(newMetrics.inputMetrics.isEmpty)
+ }
+
/** -------------------------- *
| Helper test running methods |
@@ -294,6 +309,8 @@ class JsonProtocolSuite extends FunSuite {
metrics1.shuffleReadMetrics, metrics2.shuffleReadMetrics, assertShuffleReadEquals)
assertOptionEquals(
metrics1.shuffleWriteMetrics, metrics2.shuffleWriteMetrics, assertShuffleWriteEquals)
+ assertOptionEquals(
+ metrics1.inputMetrics, metrics2.inputMetrics, assertInputMetricsEquals)
assertOptionEquals(metrics1.updatedBlocks, metrics2.updatedBlocks, assertBlocksEquals)
}
@@ -311,6 +328,11 @@ class JsonProtocolSuite extends FunSuite {
assert(metrics1.shuffleWriteTime === metrics2.shuffleWriteTime)
}
+ private def assertEquals(metrics1: InputMetrics, metrics2: InputMetrics) {
+ assert(metrics1.readMethod === metrics2.readMethod)
+ assert(metrics1.bytesRead === metrics2.bytesRead)
+ }
+
private def assertEquals(bm1: BlockManagerId, bm2: BlockManagerId) {
assert(bm1.executorId === bm2.executorId)
assert(bm1.host === bm2.host)
@@ -403,6 +425,10 @@ class JsonProtocolSuite extends FunSuite {
assertEquals(w1, w2)
}
+ private def assertInputMetricsEquals(i1: InputMetrics, i2: InputMetrics) {
+ assertEquals(i1, i2)
+ }
+
private def assertTaskMetricsEquals(t1: TaskMetrics, t2: TaskMetrics) {
assertEquals(t1, t2)
}
@@ -460,9 +486,19 @@ class JsonProtocolSuite extends FunSuite {
new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative)
}
- private def makeTaskMetrics(a: Long, b: Long, c: Long, d: Long, e: Int, f: Int) = {
+ /**
+ * Creates a TaskMetrics object describing a task that read data from Hadoop (if hasHadoopInput is
+ * set to true) or read data from a shuffle otherwise.
+ */
+ private def makeTaskMetrics(
+ a: Long,
+ b: Long,
+ c: Long,
+ d: Long,
+ e: Int,
+ f: Int,
+ hasHadoopInput: Boolean) = {
val t = new TaskMetrics
- val sr = new ShuffleReadMetrics
val sw = new ShuffleWriteMetrics
t.hostname = "localhost"
t.executorDeserializeTime = a
@@ -471,15 +507,23 @@ class JsonProtocolSuite extends FunSuite {
t.jvmGCTime = d
t.resultSerializationTime = a + b
t.memoryBytesSpilled = a + c
- sr.shuffleFinishTime = b + c
- sr.totalBlocksFetched = e + f
- sr.remoteBytesRead = b + d
- sr.localBlocksFetched = e
- sr.fetchWaitTime = a + d
- sr.remoteBlocksFetched = f
+
+ if (hasHadoopInput) {
+ val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+ inputMetrics.bytesRead = d + e + f
+ t.inputMetrics = Some(inputMetrics)
+ } else {
+ val sr = new ShuffleReadMetrics
+ sr.shuffleFinishTime = b + c
+ sr.totalBlocksFetched = e + f
+ sr.remoteBytesRead = b + d
+ sr.localBlocksFetched = e
+ sr.fetchWaitTime = a + d
+ sr.remoteBlocksFetched = f
+ t.shuffleReadMetrics = Some(sr)
+ }
sw.shuffleBytesWritten = a + b + c
sw.shuffleWriteTime = b + c + d
- t.shuffleReadMetrics = Some(sr)
t.shuffleWriteMetrics = Some(sw)
// Make at most 6 blocks
t.updatedBlocks = Some((1 to (e % 5 + 1)).map { i =>
@@ -552,8 +596,9 @@ class JsonProtocolSuite extends FunSuite {
| },
| "Shuffle Write Metrics":{
| "Shuffle Bytes Written":1200,
- | "Shuffle Write Time":1500},
- | "Updated Blocks":[
+ | "Shuffle Write Time":1500
+ | },
+ | "Updated Blocks":[
| {"Block ID":"rdd_0_0",
| "Status":{
| "Storage Level":{
@@ -568,6 +613,35 @@ class JsonProtocolSuite extends FunSuite {
|}
""".stripMargin
+ private val taskEndWithHadoopInputJsonString =
+ """
+ |{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask",
+ |"Task End Reason":{"Reason":"Success"},
+ |"Task Info":{
+ | "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
+ | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
+ | "Getting Result Time":0,"Finish Time":0,"Failed":false,"Serialized Size":0
+ |},
+ |"Task Metrics":{
+ | "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,
+ | "Result Size":500,"JVM GC Time":600,"Result Serialization Time":700,
+ | "Memory Bytes Spilled":800,"Disk Bytes Spilled":0,
+ | "Shuffle Write Metrics":{"Shuffle Bytes Written":1200,"Shuffle Write Time":1500},
+ | "Input Metrics":{"Data Read Method":"Hadoop","Bytes Read":2100},
+ | "Updated Blocks":[
+ | {"Block ID":"rdd_0_0",
+ | "Status":{
+ | "Storage Level":{
+ | "Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false,
+ | "Replication":2
+ | },
+ | "Memory Size":0,"Tachyon Size":0,"Disk Size":0
+ | }
+ | }
+ | ]}
+ |}
+ """
+
private val jobStartJsonString =
"""
{"Event":"SparkListenerJobStart","Job ID":10,"Stage IDs":[1,2,3,4],"Properties":