
Commit 7b71a0e

[SPARK-1683] Track task read metrics.
This commit adds a new metric in TaskMetrics to record the input data size and displays this information in the UI.

An earlier version of this commit also added the read time, which can be useful for diagnosing straggler problems, but unfortunately that change introduced a significant performance regression for jobs that don't do much computation. In order to track read time, we'll need to do sampling.

The screenshots below show the UI with the new "Input" field, which I added to the stage summary page, the executor summary page, and the per-stage page.

![image](https://cloud.githubusercontent.com/assets/1108612/3167930/2627f92a-eb77-11e3-861c-98ea5bb7a1a2.png)
![image](https://cloud.githubusercontent.com/assets/1108612/3167936/475a889c-eb77-11e3-9706-f11c48751f17.png)
![image](https://cloud.githubusercontent.com/assets/1108612/3167948/80ebcf12-eb77-11e3-87ed-349fce6a770c.png)

Author: Kay Ousterhout <[email protected]>

Closes #962 from kayousterhout/read_metrics and squashes the following commits:

f13b67d [Kay Ousterhout] Correctly format input bytes on executor page
8b70cde [Kay Ousterhout] Added comment about potential inaccuracy of bytesRead
d1016e8 [Kay Ousterhout] Udated SparkListenerSuite test
8461492 [Kay Ousterhout] Miniscule style fix
ae04d99 [Kay Ousterhout] Remove input metrics for parallel collections
719f19d [Kay Ousterhout] Style fixes
bb6ec62 [Kay Ousterhout] Small fixes
869ac7b [Kay Ousterhout] Updated Json tests
44a0301 [Kay Ousterhout] Fixed accidentally added line
4bd0568 [Kay Ousterhout] Added input source, renamed Hdfs to Hadoop.
f27e535 [Kay Ousterhout] Updates based on review comments and to fix rebase
bf41029 [Kay Ousterhout] Updated Json tests to pass
0fc33e0 [Kay Ousterhout] Added explicit backward compatibility test
4e52925 [Kay Ousterhout] Added Json output and associated tests.
365400b [Kay Ousterhout] [SPARK-1683] Track task read metrics.
1 parent: cdf613f · commit: 7b71a0e

20 files changed: +349 −86 lines
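For context before the per-file diffs, here is a minimal sketch (not part of this commit) of how the new metric could be consumed from a SparkListener. The listener class name is hypothetical, and the usage line assumes an existing SparkContext named `sc`:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical listener: logs the input bytes each task reported via the
// inputMetrics field introduced by this commit.
class InputBytesListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    // taskMetrics can be null for some failed tasks, and inputMetrics is only
    // set for tasks that read from a HadoopRDD or from persisted data.
    for (metrics <- Option(taskEnd.taskMetrics); input <- metrics.inputMetrics) {
      println(s"Task read ${input.bytesRead} bytes via ${input.readMethod}")
    }
  }
}

// Usage (assumption: sc is an existing SparkContext):
// sc.addSparkListener(new InputBytesListener)
```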

core/src/main/scala/org/apache/spark/CacheManager.scala

Lines changed: 6 additions & 4 deletions
@@ -19,6 +19,7 @@ package org.apache.spark
 
 import scala.collection.mutable.{ArrayBuffer, HashSet}
 
+import org.apache.spark.executor.InputMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._
 
@@ -41,9 +42,10 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
     val key = RDDBlockId(rdd.id, partition.index)
     logDebug(s"Looking for partition $key")
     blockManager.get(key) match {
-      case Some(values) =>
+      case Some(blockResult) =>
         // Partition is already materialized, so just return its values
-        new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
+        context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
+        new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
 
       case None =>
         // Acquire a lock for loading this partition
@@ -110,7 +112,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
             logInfo(s"Whoever was loading $id failed; we'll try it ourselves")
             loading.add(id)
           }
-          values.map(_.asInstanceOf[Iterator[T]])
+          values.map(_.data.asInstanceOf[Iterator[T]])
        }
      }
    }
@@ -132,7 +134,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
      * exceptions that can be avoided. */
     updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true)
     blockManager.get(key) match {
-      case Some(v) => v.asInstanceOf[Iterator[T]]
+      case Some(v) => v.data.asInstanceOf[Iterator[T]]
       case None =>
         logInfo(s"Failure to store $key")
         throw new BlockException(key, s"Block manager failed to return cached value for $key!")

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 29 additions & 0 deletions
@@ -66,6 +66,12 @@ class TaskMetrics extends Serializable {
    */
   var diskBytesSpilled: Long = _
 
+  /**
+   * If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read
+   * are stored here.
+   */
+  var inputMetrics: Option[InputMetrics] = None
+
   /**
    * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
    */
@@ -87,6 +93,29 @@ private[spark] object TaskMetrics {
   def empty: TaskMetrics = new TaskMetrics
 }
 
+/**
+ * :: DeveloperApi ::
+ * Method by which input data was read. Network means that the data was read over the network
+ * from a remote block manager (which may have stored the data on-disk or in-memory).
+ */
+@DeveloperApi
+object DataReadMethod extends Enumeration with Serializable {
+  type DataReadMethod = Value
+  val Memory, Disk, Hadoop, Network = Value
+}
+
+/**
+ * :: DeveloperApi ::
+ * Metrics about reading input data.
+ */
+@DeveloperApi
+case class InputMetrics(readMethod: DataReadMethod.Value) {
+  /**
+   * Total bytes read.
+   */
+  var bytesRead: Long = 0L
+}
+
 
 /**
  * :: DeveloperApi ::
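As a usage note, the sketch below is an assumption rather than code from this commit: it shows the shape of the new API, where a read path constructs an InputMetrics, records the bytes, and attaches it to the task's metrics. The HadoopRDD and NewHadoopRDD diffs further down do exactly this with DataReadMethod.Hadoop and the split length; the object and helper names here are hypothetical.

```scala
package org.apache.spark

import org.apache.spark.executor.{DataReadMethod, InputMetrics}

// Hypothetical helper living inside Spark's own package (TaskContext.taskMetrics
// is package-private in this version), illustrating the pattern the
// HadoopRDD/NewHadoopRDD changes below follow with DataReadMethod.Hadoop.
private[spark] object InputMetricsExample {
  def recordInputBytes(context: TaskContext, bytesRead: Long) {
    val inputMetrics = new InputMetrics(DataReadMethod.Disk)  // caller picks the read method
    inputMetrics.bytesRead = bytesRead
    context.taskMetrics.inputMetrics = Some(inputMetrics)
  }
}
```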

core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
     val blockManager = SparkEnv.get.blockManager
     val blockId = split.asInstanceOf[BlockRDDPartition].blockId
     blockManager.get(blockId) match {
-      case Some(block) => block.asInstanceOf[Iterator[T]]
+      case Some(block) => block.data.asInstanceOf[Iterator[T]]
       case None =>
         throw new Exception("Could not compute split, block " + blockId + " not found")
     }

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 15 additions & 0 deletions
@@ -38,6 +38,7 @@ import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.{DataReadMethod, InputMetrics}
 import org.apache.spark.util.NextIterator
 
 /**
@@ -196,6 +197,20 @@ class HadoopRDD[K, V](
       context.addOnCompleteCallback{ () => closeIfNeeded() }
       val key: K = reader.createKey()
       val value: V = reader.createValue()
+
+      // Set the task input metrics.
+      val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+      try {
+        /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
+         * always at record boundaries, so tasks may need to read into other splits to complete
+         * a record. */
+        inputMetrics.bytesRead = split.inputSplit.value.getLength()
+      } catch {
+        case e: java.io.IOException =>
+          logWarning("Unable to get input size to set InputMetrics for task", e)
+      }
+      context.taskMetrics.inputMetrics = Some(inputMetrics)
+
       override def getNext() = {
         try {
           finished = !reader.next(key, value)

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 13 additions & 0 deletions
@@ -31,6 +31,7 @@ import org.apache.spark.Logging
 import org.apache.spark.Partition
 import org.apache.spark.SerializableWritable
 import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.executor.{DataReadMethod, InputMetrics}
 
 private[spark] class NewHadoopPartition(
     rddId: Int,
@@ -112,6 +113,18 @@ class NewHadoopRDD[K, V](
         split.serializableHadoopSplit.value, hadoopAttemptContext)
       reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
 
+      val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+      try {
+        /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
+         * always at record boundaries, so tasks may need to read into other splits to complete
+         * a record. */
+        inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength()
+      } catch {
+        case e: Exception =>
+          logWarning("Unable to get input split size in order to set task input bytes", e)
+      }
+      context.taskMetrics.inputMetrics = Some(inputMetrics)
+
       // Register an on-task-completion callback to close the input stream.
       context.addOnCompleteCallback(() => close())
       var havePair = false

core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala

Lines changed: 12 additions & 3 deletions
@@ -26,7 +26,9 @@ import scala.collection.mutable.HashMap
 
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.executor.{DataReadMethod, TaskMetrics}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
 
 /**
  * :: DeveloperApi ::
@@ -160,7 +162,13 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
       " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime +
       " EXECUTOR_ID=" + taskInfo.executorId + " HOST=" + taskMetrics.hostname
     val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime
-    val readMetrics = taskMetrics.shuffleReadMetrics match {
+    val inputMetrics = taskMetrics.inputMetrics match {
+      case Some(metrics) =>
+        " READ_METHOD=" + metrics.readMethod.toString +
+        " INPUT_BYTES=" + metrics.bytesRead
+      case None => ""
+    }
+    val shuffleReadMetrics = taskMetrics.shuffleReadMetrics match {
       case Some(metrics) =>
         " SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime +
         " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched +
@@ -174,7 +182,8 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
       case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten
       case None => ""
     }
-    stageLogInfo(stageId, status + info + executorRunTime + readMetrics + writeMetrics)
+    stageLogInfo(stageId, status + info + executorRunTime + inputMetrics + shuffleReadMetrics +
+      writeMetrics)
   }
 
   /**

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 39 additions & 24 deletions
@@ -29,6 +29,7 @@ import akka.actor.{ActorSystem, Cancellable, Props}
 import sun.nio.ch.DirectBuffer
 
 import org.apache.spark._
+import org.apache.spark.executor.{DataReadMethod, InputMetrics}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.network._
 import org.apache.spark.serializer.Serializer
@@ -39,6 +40,15 @@ private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValu
 private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
 private[spark] case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends BlockValues
 
+/* Class for returning a fetched block and associated metrics. */
+private[spark] class BlockResult(
+    val data: Iterator[Any],
+    readMethod: DataReadMethod.Value,
+    bytes: Long) {
+  val inputMetrics = new InputMetrics(readMethod)
+  inputMetrics.bytesRead = bytes
+}
+
 private[spark] class BlockManager(
     executorId: String,
     actorSystem: ActorSystem,
@@ -334,9 +344,9 @@ private[spark] class BlockManager(
   /**
    * Get block from local block manager.
    */
-  def getLocal(blockId: BlockId): Option[Iterator[Any]] = {
+  def getLocal(blockId: BlockId): Option[BlockResult] = {
     logDebug(s"Getting local block $blockId")
-    doGetLocal(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
+    doGetLocal(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
   }
 
   /**
@@ -355,11 +365,11 @@ private[spark] class BlockManager(
             blockId, s"Block $blockId not found on disk, though it should be")
       }
     } else {
-      doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
+      doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
     }
   }
 
-  private def doGetLocal(blockId: BlockId, asValues: Boolean): Option[Any] = {
+  private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
     val info = blockInfo.get(blockId).orNull
     if (info != null) {
       info.synchronized {
@@ -386,14 +396,14 @@ private[spark] class BlockManager(
         // Look for the block in memory
         if (level.useMemory) {
           logDebug(s"Getting block $blockId from memory")
-          val result = if (asValues) {
-            memoryStore.getValues(blockId)
+          val result = if (asBlockResult) {
+            memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
           } else {
            memoryStore.getBytes(blockId)
          }
          result match {
            case Some(values) =>
-              return Some(values)
+              return result
            case None =>
              logDebug(s"Block $blockId not found in memory")
          }
@@ -405,10 +415,11 @@ private[spark] class BlockManager(
        if (tachyonStore.contains(blockId)) {
          tachyonStore.getBytes(blockId) match {
            case Some(bytes) =>
-              if (!asValues) {
+              if (!asBlockResult) {
                return Some(bytes)
              } else {
-                return Some(dataDeserialize(blockId, bytes))
+                return Some(new BlockResult(
+                  dataDeserialize(blockId, bytes), DataReadMethod.Memory, info.size))
              }
            case None =>
              logDebug(s"Block $blockId not found in tachyon")
@@ -429,14 +440,15 @@ private[spark] class BlockManager(
 
          if (!level.useMemory) {
            // If the block shouldn't be stored in memory, we can just return it
-            if (asValues) {
-              return Some(dataDeserialize(blockId, bytes))
+            if (asBlockResult) {
+              return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk,
+                info.size))
            } else {
              return Some(bytes)
            }
          } else {
            // Otherwise, we also have to store something in the memory store
-            if (!level.deserialized || !asValues) {
+            if (!level.deserialized || !asBlockResult) {
              /* We'll store the bytes in memory if the block's storage level includes
               * "memory serialized", or if it should be cached as objects in memory
               * but we only requested its serialized bytes. */
@@ -445,7 +457,7 @@ private[spark] class BlockManager(
                memoryStore.putBytes(blockId, copyForMemory, level)
                bytes.rewind()
              }
-              if (!asValues) {
+              if (!asBlockResult) {
                return Some(bytes)
              } else {
                val values = dataDeserialize(blockId, bytes)
@@ -457,12 +469,12 @@ private[spark] class BlockManager(
                memoryStore.putValues(blockId, valuesBuffer, level, returnValues = true).data
                  match {
                    case Left(values2) =>
-                      return Some(values2)
+                      return Some(new BlockResult(values2, DataReadMethod.Disk, info.size))
                    case _ =>
-                      throw new SparkException("Memory store did not return an iterator")
+                      throw new SparkException("Memory store did not return back an iterator")
                  }
              } else {
-                return Some(values)
+                return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
              }
            }
          }
@@ -477,29 +489,32 @@ private[spark] class BlockManager(
   /**
    * Get block from remote block managers.
    */
-  def getRemote(blockId: BlockId): Option[Iterator[Any]] = {
+  def getRemote(blockId: BlockId): Option[BlockResult] = {
     logDebug(s"Getting remote block $blockId")
-    doGetRemote(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
+    doGetRemote(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
   }
 
   /**
    * Get block from remote block managers as serialized bytes.
   */
  def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
    logDebug(s"Getting remote block $blockId as bytes")
-    doGetRemote(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
+    doGetRemote(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
  }
 
-  private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = {
+  private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
    require(blockId != null, "BlockId is null")
    val locations = Random.shuffle(master.getLocations(blockId))
    for (loc <- locations) {
      logDebug(s"Getting remote block $blockId from $loc")
      val data = BlockManagerWorker.syncGetBlock(
        GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
      if (data != null) {
-        if (asValues) {
-          return Some(dataDeserialize(blockId, data))
+        if (asBlockResult) {
+          return Some(new BlockResult(
+            dataDeserialize(blockId, data),
+            DataReadMethod.Network,
+            data.limit()))
        } else {
          return Some(data)
        }
@@ -513,7 +528,7 @@ private[spark] class BlockManager(
  /**
   * Get a block from the block manager (either local or remote).
   */
-  def get(blockId: BlockId): Option[Iterator[Any]] = {
+  def get(blockId: BlockId): Option[BlockResult] = {
    val local = getLocal(blockId)
    if (local.isDefined) {
      logInfo(s"Found block $blockId locally")
@@ -792,7 +807,7 @@ private[spark] class BlockManager(
   * Read a block consisting of a single object.
   */
  def getSingle(blockId: BlockId): Option[Any] = {
-    get(blockId).map(_.next())
+    get(blockId).map(_.data.next())
  }
 
  /**

core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala

Lines changed: 1 addition & 1 deletion
@@ -78,7 +78,7 @@ private[spark] object ThreadingTest {
         val startTime = System.currentTimeMillis()
         manager.get(blockId) match {
           case Some(retrievedBlock) =>
-            assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList,
+            assert(retrievedBlock.data.toList.asInstanceOf[List[Int]] == block.toList,
               "Block " + blockId + " did not match")
             println("Got block " + blockId + " in " +
               (System.currentTimeMillis - startTime) + " ms")
