@@ -39,7 +39,11 @@ class JsonProtocolSuite extends FunSuite {
     val taskGettingResult =
       SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true))
     val taskEnd = SparkListenerTaskEnd(1, "ShuffleMapTask", Success,
-      makeTaskInfo(123L, 234, 67, 345L, false), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800))
+      makeTaskInfo(123L, 234, 67, 345L, false),
+      makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHdfsInput = false))
+    val taskEndWithHdfsInput = SparkListenerTaskEnd(1, "ShuffleMapTask", Success,
+      makeTaskInfo(123L, 234, 67, 345L, false),
+      makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHdfsInput = true))
     val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties)
     val jobEnd = SparkListenerJobEnd(20, JobSucceeded)
     val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]](
@@ -61,6 +65,7 @@ class JsonProtocolSuite extends FunSuite {
     testEvent(taskStart, taskStartJsonString)
     testEvent(taskGettingResult, taskGettingResultJsonString)
     testEvent(taskEnd, taskEndJsonString)
+    testEvent(taskEndWithHdfsInput, taskEndWithHdfsInputJsonString)
     testEvent(jobStart, jobStartJsonString)
     testEvent(jobEnd, jobEndJsonString)
     testEvent(environmentUpdate, environmentUpdateJsonString)
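
For context, testEvent is defined elsewhere in this suite and is not part of the diff. A minimal sketch of its assumed shape: round-trip the event through JsonProtocol and compare against the hand-written expected string.

    // Assumed shape of the testEvent helper (not shown in this diff): serialize
    // the event, compare against the expected JSON string ignoring whitespace,
    // then deserialize and compare the reconstructed event with the original.
    // Requires: import org.json4s.jackson.JsonMethods._
    private def testEvent(event: SparkListenerEvent, jsonString: String) {
      val actualJsonString = compact(render(JsonProtocol.sparkEventToJson(event)))
      val newEvent = JsonProtocol.sparkEventFromJson(parse(actualJsonString))
      assertJsonStringEquals(jsonString, actualJsonString)
      assertEquals(event, newEvent)
    }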
@@ -74,8 +79,8 @@ class JsonProtocolSuite extends FunSuite {
   test("Dependent Classes") {
     testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
     testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
     testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
-    testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8))
+    testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8, hasHdfsInput = false))
     testBlockManagerId(BlockManagerId("Hong", "Kong", 500, 1000))

     // StorageLevel
@@ -118,7 +123,7 @@ class JsonProtocolSuite extends FunSuite {
     testBlockId(StreamBlockId(1, 2L))
   }

-  test("Backward compatibility") {
+  test("backward compatibility") {
     // StageInfo.details was added after 1.0.0.
     val info = makeStageInfo(1, 2, 3, 4L, 5L)
     assert(info.details.nonEmpty)
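
The hunk truncates this test mid-body. An assumed continuation, sketched from the comment above about StageInfo.details: serialize the StageInfo, strip the newer "Details" field the way an older writer would have omitted it, and check that deserialization still succeeds with an empty default.

    // Assumed continuation of the backward compatibility test (not shown in
    // this diff): drop "Details" from the JSON and re-parse.
    val newJson = JsonProtocol.stageInfoToJson(info)
    val oldJson = newJson.removeField { case (field, _) => field == "Details" }
    val newInfo = JsonProtocol.stageInfoFromJson(oldJson)
    assert("" === newInfo.details)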
@@ -294,6 +299,8 @@ class JsonProtocolSuite extends FunSuite {
       metrics1.shuffleReadMetrics, metrics2.shuffleReadMetrics, assertShuffleReadEquals)
     assertOptionEquals(
       metrics1.shuffleWriteMetrics, metrics2.shuffleWriteMetrics, assertShuffleWriteEquals)
+    assertOptionEquals(
+      metrics1.inputMetrics, metrics2.inputMetrics, assertInputMetricsEquals)
     assertOptionEquals(metrics1.updatedBlocks, metrics2.updatedBlocks, assertBlocksEquals)
   }

@@ -311,6 +318,11 @@ class JsonProtocolSuite extends FunSuite {
     assert(metrics1.shuffleWriteTime === metrics2.shuffleWriteTime)
   }

+  private def assertEquals(metrics1: InputMetrics, metrics2: InputMetrics) {
+    assert(metrics1.readMethod === metrics2.readMethod)
+    assert(metrics1.bytesRead === metrics2.bytesRead)
+  }
+
   private def assertEquals(bm1: BlockManagerId, bm2: BlockManagerId) {
     assert(bm1.executorId === bm2.executorId)
     assert(bm1.host === bm2.host)
@@ -403,6 +415,10 @@ class JsonProtocolSuite extends FunSuite {
     assertEquals(w1, w2)
   }

+  private def assertInputMetricsEquals(i1: InputMetrics, i2: InputMetrics) {
+    assertEquals(i1, i2)
+  }
+
   private def assertTaskMetricsEquals(t1: TaskMetrics, t2: TaskMetrics) {
     assertEquals(t1, t2)
   }
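
These one-line wrappers exist, presumably, because assertEquals is overloaded and so cannot be passed directly where assertOptionEquals expects a function value; a named wrapper with a concrete signature resolves the ambiguity. assertOptionEquals itself is defined elsewhere in the suite; its assumed shape:

    // Assumed shape of assertOptionEquals (not shown in this diff): both
    // options must agree on being defined, and defined values are compared
    // with the supplied assertion.
    private def assertOptionEquals[T](
        opt1: Option[T],
        opt2: Option[T],
        assertEquals: (T, T) => Unit) {
      if (opt1.isDefined) {
        assert(opt2.isDefined)
        assertEquals(opt1.get, opt2.get)
      } else {
        assert(!opt2.isDefined)
      }
    }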
@@ -460,9 +476,13 @@ class JsonProtocolSuite extends FunSuite {
     new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative)
   }

-  private def makeTaskMetrics(a: Long, b: Long, c: Long, d: Long, e: Int, f: Int) = {
+  /**
+   * Creates a TaskMetrics object describing a task that read data from HDFS (if hasHdfsInput
+   * is set to true) or read data from a shuffle otherwise.
+   */
+  private def makeTaskMetrics(
+      a: Long, b: Long, c: Long, d: Long, e: Int, f: Int, hasHdfsInput: Boolean) = {
     val t = new TaskMetrics
-    val sr = new ShuffleReadMetrics
     val sw = new ShuffleWriteMetrics
     t.hostname = "localhost"
     t.executorDeserializeTime = a
@@ -471,15 +491,23 @@ class JsonProtocolSuite extends FunSuite {
     t.jvmGCTime = d
     t.resultSerializationTime = a + b
     t.memoryBytesSpilled = a + c
-    sr.shuffleFinishTime = b + c
-    sr.totalBlocksFetched = e + f
-    sr.remoteBytesRead = b + d
-    sr.localBlocksFetched = e
-    sr.fetchWaitTime = a + d
-    sr.remoteBlocksFetched = f
+
+    if (hasHdfsInput) {
+      val inputMetrics = new InputMetrics(DataReadMethod.Hdfs)
+      inputMetrics.bytesRead = d + e + f
+      t.inputMetrics = Some(inputMetrics)
+    } else {
+      val sr = new ShuffleReadMetrics
+      sr.shuffleFinishTime = b + c
+      sr.totalBlocksFetched = e + f
+      sr.remoteBytesRead = b + d
+      sr.localBlocksFetched = e
+      sr.fetchWaitTime = a + d
+      sr.remoteBlocksFetched = f
+      t.shuffleReadMetrics = Some(sr)
+    }
     sw.shuffleBytesWritten = a + b + c
     sw.shuffleWriteTime = b + c + d
-    t.shuffleReadMetrics = Some(sr)
     t.shuffleWriteMetrics = Some(sw)
     // Make at most 6 blocks
     t.updatedBlocks = Some((1 to (e % 5 + 1)).map { i =>
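
To make the helper's two modes concrete, a hypothetical check using the same argument values as the taskEndWithHdfsInput fixture above:

    // Illustration only (not part of this diff): exactly one of inputMetrics
    // and shuffleReadMetrics is populated per call.
    val hdfs = makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHdfsInput = true)
    assert(hdfs.inputMetrics.get.readMethod === DataReadMethod.Hdfs)
    assert(hdfs.inputMetrics.get.bytesRead === 2100)  // d + e + f = 600 + 700 + 800
    assert(hdfs.shuffleReadMetrics.isEmpty)

    val shuffle = makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHdfsInput = false)
    assert(shuffle.inputMetrics.isEmpty)
    assert(shuffle.shuffleReadMetrics.get.remoteBytesRead === 1000)  // b + d = 400 + 600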
@@ -568,6 +596,23 @@ class JsonProtocolSuite extends FunSuite {
       |}
     """.stripMargin

+  private val taskEndWithHdfsInputJsonString =
+    """
+      {"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask",
+      "Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":123,"Index":
+      234,"Launch Time":345,"Executor ID":"executor","Host":"your kind sir",
+      "Locality":"NODE_LOCAL","Getting Result Time":0,"Finish Time":0,"Failed":
+      false,"Serialized Size":0},"Task Metrics":{"Host Name":"localhost",
+      "Executor Deserialize Time":300,"Executor Run Time":400,"Result Size":500,
+      "JVM GC Time":600,"Result Serialization Time":700,"Memory Bytes Spilled":
+      800,"Disk Bytes Spilled":0,"Shuffle Read Metrics":[],"Shuffle Write Metrics":
+      {"Shuffle Bytes Written":1200,"Shuffle Write Time":1500},"Input Metrics":
+      {"Data Read Method":"Hdfs","Bytes Read":2100},"Updated Blocks":
+      [{"Block ID":{"Type":"RDDBlockId","RDD ID":0,"Split Index":0},"Status":
+      {"Storage Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,
+      "Deserialized":false,"Replication":2},"Memory Size":0,"Disk Size":0,"Tachyon Size":0}}]}}
+    """
+
   private val jobStartJsonString =
     """
       {"Event":"SparkListenerJobStart","Job ID":10,"Stage IDs":[1,2,3,4],"Properties":