
Commit d1636dd

[SPARK-2297][UI] Make task attempt and speculation more explicit in UI.

New UI: ![screen shot 2014-06-26 at 1 43 52 pm](https://cloud.githubusercontent.com/assets/323388/3404643/82b9ddc6-fd73-11e3-96f9-f7592a7aee79.png)

Author: Reynold Xin <[email protected]>

Closes #1236 from rxin/ui-task-attempt and squashes the following commits:

3b645dd [Reynold Xin] Expose attemptId in Stage.
c0474b1 [Reynold Xin] Beefed up unit test.
c404bdd [Reynold Xin] Fix ReplayListenerSuite.
f56be4b [Reynold Xin] Fixed JsonProtocolSuite.
e29e0f7 [Reynold Xin] Minor update.
5e4354a [Reynold Xin] [SPARK-2297][UI] Make task attempt and speculation more explicit in UI.

1 parent bf578de

8 files changed: +102, -49 lines changed

core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala

Lines changed: 2 additions & 0 deletions

@@ -75,9 +75,11 @@ case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)
 @DeveloperApi
 case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
 
+@DeveloperApi
 case class SparkListenerApplicationStart(appName: String, time: Long, sparkUser: String)
   extends SparkListenerEvent
 
+@DeveloperApi
 case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
 
 /** An event used in the listener to shutdown the listener daemon thread. */

core/src/main/scala/org/apache/spark/scheduler/Stage.scala

Lines changed: 2 additions & 0 deletions

@@ -106,6 +106,8 @@ private[spark] class Stage(
     id
   }
 
+  def attemptId: Int = nextAttemptId
+
   val name = callSite.short
   val details = callSite.long
core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala

Lines changed: 3 additions & 1 deletion

@@ -27,10 +27,12 @@ import org.apache.spark.annotation.DeveloperApi
 class TaskInfo(
     val taskId: Long,
     val index: Int,
+    val attempt: Int,
     val launchTime: Long,
     val executorId: String,
     val host: String,
-    val taskLocality: TaskLocality.TaskLocality) {
+    val taskLocality: TaskLocality.TaskLocality,
+    val speculative: Boolean) {
 
   /**
    * The time when the task started remotely getting the result. Will not be set if the
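The new `attempt` parameter sits between `index` and `launchTime`, and `speculative` is appended at the end. A minimal construction sketch with hypothetical values (not taken from this commit), showing a second, speculatively launched attempt of a task:

```scala
import org.apache.spark.scheduler.{TaskInfo, TaskLocality}

// Hypothetical values: attempt 2 of the task at index 3, launched speculatively.
val info = new TaskInfo(
  taskId = 42L,
  index = 3,
  attempt = 2,                              // new in this commit
  launchTime = System.currentTimeMillis(),
  executorId = "exec-1",
  host = "worker-1",
  taskLocality = TaskLocality.NODE_LOCAL,
  speculative = true)                       // new in this commit

assert(info.attempt == 2 && info.speculative)
```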

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 15 additions & 9 deletions

@@ -335,17 +335,19 @@ private[spark] class TaskSetManager(
   /**
    * Dequeue a pending task for a given node and return its index and locality level.
    * Only search for tasks matching the given locality constraint.
+   *
+   * @return An option containing (task index within the task set, locality, is speculative?)
    */
   private def findTask(execId: String, host: String, locality: TaskLocality.Value)
-    : Option[(Int, TaskLocality.Value)] =
+    : Option[(Int, TaskLocality.Value, Boolean)] =
   {
     for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
-      return Some((index, TaskLocality.PROCESS_LOCAL))
+      return Some((index, TaskLocality.PROCESS_LOCAL, false))
     }
 
     if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
       for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
-        return Some((index, TaskLocality.NODE_LOCAL))
+        return Some((index, TaskLocality.NODE_LOCAL, false))
       }
     }
@@ -354,23 +356,25 @@ private[spark] class TaskSetManager(
         rack <- sched.getRackForHost(host)
         index <- findTaskFromList(execId, getPendingTasksForRack(rack))
       } {
-        return Some((index, TaskLocality.RACK_LOCAL))
+        return Some((index, TaskLocality.RACK_LOCAL, false))
       }
     }
 
     // Look for no-pref tasks after rack-local tasks since they can run anywhere.
     for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
-      return Some((index, TaskLocality.PROCESS_LOCAL))
+      return Some((index, TaskLocality.PROCESS_LOCAL, false))
    }
 
     if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
       for (index <- findTaskFromList(execId, allPendingTasks)) {
-        return Some((index, TaskLocality.ANY))
+        return Some((index, TaskLocality.ANY, false))
       }
     }
 
     // Finally, if all else has failed, find a speculative task
-    findSpeculativeTask(execId, host, locality)
+    findSpeculativeTask(execId, host, locality).map { case (taskIndex, allowedLocality) =>
+      (taskIndex, allowedLocality, true)
+    }
   }
@@ -391,7 +395,7 @@ private[spark] class TaskSetManager(
     }
 
     findTask(execId, host, allowedLocality) match {
-      case Some((index, taskLocality)) => {
+      case Some((index, taskLocality, speculative)) => {
         // Found a task; do some bookkeeping and return a task description
         val task = tasks(index)
         val taskId = sched.newTaskId()
@@ -400,7 +404,9 @@ private[spark] class TaskSetManager(
           taskSet.id, index, taskId, execId, host, taskLocality))
         // Do various bookkeeping
         copiesRunning(index) += 1
-        val info = new TaskInfo(taskId, index, curTime, execId, host, taskLocality)
+        val attemptNum = taskAttempts(index).size
+        val info = new TaskInfo(
+          taskId, index, attemptNum + 1, curTime, execId, host, taskLocality, speculative)
         taskInfos(taskId) = info
         taskAttempts(index) = info :: taskAttempts(index)
         // Update our locality level for delay scheduling
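The widening of findTask's return type is the mechanical core of the change: every regular dequeue path returns `false` for the new speculative slot, and only the final `findSpeculativeTask` fallback is mapped to `true`. A self-contained sketch of that idiom with simplified stand-in types (`FindTaskSketch`, `findRegularTask`, and the string locality are hypothetical, not Spark APIs):

```scala
object FindTaskSketch {
  type TaskIndex = Int
  type Locality = String // stand-in for TaskLocality.Value

  // Hypothetical lookups: no regular task is pending, one speculative copy is.
  def findRegularTask(): Option[(TaskIndex, Locality)] = None
  def findSpeculativeTask(): Option[(TaskIndex, Locality)] = Some((7, "ANY"))

  // Regular hits carry speculative = false; the fallback pair is widened
  // into a triple with speculative = true via Option.map, as in findTask.
  def findTask(): Option[(TaskIndex, Locality, Boolean)] =
    findRegularTask().map { case (i, loc) => (i, loc, false) }
      .orElse(findSpeculativeTask().map { case (i, loc) => (i, loc, true) })

  def main(args: Array[String]): Unit =
    println(findTask()) // Some((7,ANY,true)): only the fallback is speculative
}
```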

core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

Lines changed: 9 additions & 2 deletions

@@ -95,8 +95,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
       </div>
     // scalastyle:on
     val taskHeaders: Seq[String] =
-      Seq("Task Index", "Task ID", "Status", "Locality Level", "Executor", "Launch Time") ++
-      Seq("Duration", "GC Time", "Result Ser Time") ++
+      Seq(
+        "Index", "ID", "Attempt", "Status", "Locality Level", "Executor",
+        "Launch Time", "Duration", "GC Time") ++
       {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
       {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
       {if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++
@@ -245,6 +246,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
     <tr>
       <td>{info.index}</td>
       <td>{info.taskId}</td>
+      <td sorttable_customkey={info.attempt.toString}>{
+        if (info.speculative) s"${info.attempt} (speculative)" else info.attempt.toString
+      }</td>
       <td>{info.status}</td>
       <td>{info.taskLocality}</td>
       <td>{info.host}</td>
@@ -255,9 +259,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
       <td sorttable_customkey={gcTime.toString}>
         {if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
       </td>
+      <!--
+       TODO: Add this back after we add support to hide certain columns.
       <td sorttable_customkey={serializationTime.toString}>
         {if (serializationTime > 0) UIUtils.formatDuration(serializationTime) else ""}
       </td>
+      -->
       {if (shuffleRead) {
         <td sorttable_customkey={shuffleReadSortable}>
           {shuffleReadReadable}
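The new Attempt cell encodes both pieces of information at once: `sorttable_customkey` keeps the column numerically sortable while the rendered text flags speculative attempts. A sketch of just the formatting rule; `attemptCell` is a hypothetical helper, not part of StagePage:

```scala
// Hypothetical helper mirroring the cell logic above: the sort key stays
// numeric, the display text gains a "(speculative)" suffix when appropriate.
def attemptCell(attempt: Int, speculative: Boolean): String =
  if (speculative) s"$attempt (speculative)" else attempt.toString

assert(attemptCell(1, speculative = false) == "1")
assert(attemptCell(3, speculative = true) == "3 (speculative)")
```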

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 8 additions & 1 deletion

@@ -32,6 +32,8 @@ import org.apache.spark.storage._
 import org.apache.spark._
 
 private[spark] object JsonProtocol {
+  // TODO: Remove this file and put JSON serialization into each individual class.
+
   private implicit val format = DefaultFormats
 
   /** ------------------------------------------------- *
@@ -194,10 +196,12 @@ private[spark] object JsonProtocol {
   def taskInfoToJson(taskInfo: TaskInfo): JValue = {
     ("Task ID" -> taskInfo.taskId) ~
     ("Index" -> taskInfo.index) ~
+    ("Attempt" -> taskInfo.attempt) ~
     ("Launch Time" -> taskInfo.launchTime) ~
     ("Executor ID" -> taskInfo.executorId) ~
     ("Host" -> taskInfo.host) ~
     ("Locality" -> taskInfo.taskLocality.toString) ~
+    ("Speculative" -> taskInfo.speculative) ~
     ("Getting Result Time" -> taskInfo.gettingResultTime) ~
     ("Finish Time" -> taskInfo.finishTime) ~
     ("Failed" -> taskInfo.failed) ~
@@ -487,16 +491,19 @@ private[spark] object JsonProtocol {
   def taskInfoFromJson(json: JValue): TaskInfo = {
     val taskId = (json \ "Task ID").extract[Long]
     val index = (json \ "Index").extract[Int]
+    val attempt = (json \ "Attempt").extractOpt[Int].getOrElse(1)
     val launchTime = (json \ "Launch Time").extract[Long]
     val executorId = (json \ "Executor ID").extract[String]
     val host = (json \ "Host").extract[String]
     val taskLocality = TaskLocality.withName((json \ "Locality").extract[String])
+    val speculative = (json \ "Speculative").extractOpt[Boolean].getOrElse(false)
     val gettingResultTime = (json \ "Getting Result Time").extract[Long]
     val finishTime = (json \ "Finish Time").extract[Long]
     val failed = (json \ "Failed").extract[Boolean]
     val serializedSize = (json \ "Serialized Size").extract[Int]
 
-    val taskInfo = new TaskInfo(taskId, index, launchTime, executorId, host, taskLocality)
+    val taskInfo =
+      new TaskInfo(taskId, index, attempt, launchTime, executorId, host, taskLocality, speculative)
     taskInfo.gettingResultTime = gettingResultTime
     taskInfo.finishTime = finishTime
     taskInfo.failed = failed
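The `extractOpt(...).getOrElse(...)` defaults are what keep old event logs replayable: a log written before this commit has no "Attempt" or "Speculative" field, so deserialization falls back to attempt 1 and non-speculative. A minimal sketch of that fallback, assuming json4s (which Spark core already depends on) and a truncated, hypothetical task-info fragment:

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

implicit val formats: Formats = DefaultFormats

// Hypothetical fragment from an older Spark version: no "Attempt",
// no "Speculative".
val oldJson = parse("""{"Task ID":123,"Index":0,"Launch Time":345}""")

// extractOpt yields None for the missing fields, so the defaults apply
// and the old log still deserializes cleanly.
val attempt = (oldJson \ "Attempt").extractOpt[Int].getOrElse(1)
val speculative = (oldJson \ "Speculative").extractOpt[Boolean].getOrElse(false)

assert(attempt == 1 && !speculative)
```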

core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala

Lines changed: 6 additions & 5 deletions

@@ -66,7 +66,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     // finish this task, should get updated shuffleRead
     shuffleReadMetrics.remoteBytesRead = 1000
     taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
-    var taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+    var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
     var task = new ShuffleMapTask(0, null, null, 0, null)
     val taskType = Utils.getFormattedClassName(task)
@@ -75,7 +75,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
       .shuffleRead == 1000)
 
     // finish a task with unknown executor-id, nothing should happen
-    taskInfo = new TaskInfo(1234L, 0, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL)
+    taskInfo =
+      new TaskInfo(1234L, 0, 1, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL, true)
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0, null, null, 0, null)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
@@ -84,7 +85,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     // finish this task, should get updated duration
     shuffleReadMetrics.remoteBytesRead = 1000
     taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
-    taskInfo = new TaskInfo(1235L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+    taskInfo = new TaskInfo(1235L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0, null, null, 0, null)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
@@ -94,7 +95,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     // finish this task, should get updated duration
     shuffleReadMetrics.remoteBytesRead = 1000
     taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
-    taskInfo = new TaskInfo(1236L, 0, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL)
+    taskInfo = new TaskInfo(1236L, 0, 2, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0, null, null, 0, null)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
@@ -106,7 +107,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     val conf = new SparkConf()
     val listener = new JobProgressListener(conf)
     val metrics = new TaskMetrics()
-    val taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+    val taskInfo = new TaskInfo(1234L, 0, 3, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
     val task = new ShuffleMapTask(0, null, null, 0, null)
     val taskType = Utils.getFormattedClassName(task)

core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala

Lines changed: 57 additions & 31 deletions

@@ -35,10 +35,11 @@ class JsonProtocolSuite extends FunSuite {
     val stageSubmitted =
       SparkListenerStageSubmitted(makeStageInfo(100, 200, 300, 400L, 500L), properties)
     val stageCompleted = SparkListenerStageCompleted(makeStageInfo(101, 201, 301, 401L, 501L))
-    val taskStart = SparkListenerTaskStart(111, makeTaskInfo(222L, 333, 444L))
-    val taskGettingResult = SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 3000L))
+    val taskStart = SparkListenerTaskStart(111, makeTaskInfo(222L, 333, 1, 444L, false))
+    val taskGettingResult =
+      SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true))
     val taskEnd = SparkListenerTaskEnd(1, "ShuffleMapTask", Success,
-      makeTaskInfo(123L, 234, 345L), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800))
+      makeTaskInfo(123L, 234, 67, 345L, false), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800))
     val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties)
     val jobEnd = SparkListenerJobEnd(20, JobSucceeded)
     val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]](
@@ -73,7 +74,7 @@ class JsonProtocolSuite extends FunSuite {
   test("Dependent Classes") {
     testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
     testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
-    testTaskInfo(makeTaskInfo(999L, 888, 777L))
+    testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
     testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8))
     testBlockManagerId(BlockManagerId("Hong", "Kong", 500, 1000))
 
@@ -269,10 +270,12 @@ class JsonProtocolSuite extends FunSuite {
   private def assertEquals(info1: TaskInfo, info2: TaskInfo) {
     assert(info1.taskId === info2.taskId)
     assert(info1.index === info2.index)
+    assert(info1.attempt === info2.attempt)
     assert(info1.launchTime === info2.launchTime)
     assert(info1.executorId === info2.executorId)
     assert(info1.host === info2.host)
     assert(info1.taskLocality === info2.taskLocality)
+    assert(info1.speculative === info2.speculative)
     assert(info1.gettingResultTime === info2.gettingResultTime)
     assert(info1.finishTime === info2.finishTime)
     assert(info1.failed === info2.failed)
@@ -453,8 +456,8 @@ class JsonProtocolSuite extends FunSuite {
     new StageInfo(a, "greetings", b, rddInfos, "details")
   }
 
-  private def makeTaskInfo(a: Long, b: Int, c: Long) = {
-    new TaskInfo(a, b, c, "executor", "your kind sir", TaskLocality.NODE_LOCAL)
+  private def makeTaskInfo(a: Long, b: Int, c: Int, d: Long, speculative: Boolean) = {
+    new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative)
   }
 
   private def makeTaskMetrics(a: Long, b: Long, c: Long, d: Long, e: Int, f: Int) = {
@@ -510,37 +513,60 @@ class JsonProtocolSuite extends FunSuite {
 
   private val taskStartJsonString =
     """
-      {"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task ID":222,
-      "Index":333,"Launch Time":444,"Executor ID":"executor","Host":"your kind sir",
-      "Locality":"NODE_LOCAL","Getting Result Time":0,"Finish Time":0,"Failed":false,
-      "Serialized Size":0}}
-    """
+      |{"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task ID":222,
+      |"Index":333,"Attempt":1,"Launch Time":444,"Executor ID":"executor","Host":"your kind sir",
+      |"Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,
+      |"Failed":false,"Serialized Size":0}}
+    """.stripMargin
 
   private val taskGettingResultJsonString =
     """
-      {"Event":"SparkListenerTaskGettingResult","Task Info":{"Task ID":1000,"Index":
-      2000,"Launch Time":3000,"Executor ID":"executor","Host":"your kind sir",
-      "Locality":"NODE_LOCAL","Getting Result Time":0,"Finish Time":0,"Failed":false,
-      "Serialized Size":0}}
-    """
+      |{"Event":"SparkListenerTaskGettingResult","Task Info":
+      |  {"Task ID":1000,"Index":2000,"Attempt":5,"Launch Time":3000,"Executor ID":"executor",
+      |  "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":true,"Getting Result Time":0,
+      |  "Finish Time":0,"Failed":false,"Serialized Size":0
+      |  }
+      |}
+    """.stripMargin
 
   private val taskEndJsonString =
    """
-      {"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask",
-      "Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":123,"Index":
-      234,"Launch Time":345,"Executor ID":"executor","Host":"your kind sir",
-      "Locality":"NODE_LOCAL","Getting Result Time":0,"Finish Time":0,"Failed":
-      false,"Serialized Size":0},"Task Metrics":{"Host Name":"localhost",
-      "Executor Deserialize Time":300,"Executor Run Time":400,"Result Size":500,
-      "JVM GC Time":600,"Result Serialization Time":700,"Memory Bytes Spilled":
-      800,"Disk Bytes Spilled":0,"Shuffle Read Metrics":{"Shuffle Finish Time":
-      900,"Total Blocks Fetched":1500,"Remote Blocks Fetched":800,"Local Blocks Fetched":
-      700,"Fetch Wait Time":900,"Remote Bytes Read":1000},"Shuffle Write Metrics":
-      {"Shuffle Bytes Written":1200,"Shuffle Write Time":1500},"Updated Blocks":
-      [{"Block ID":"rdd_0_0","Status":{"Storage Level":{"Use Disk":true,"Use Memory":true,
-      "Use Tachyon":false,"Deserialized":false,"Replication":2},"Memory Size":0,"Tachyon Size":0,
-      "Disk Size":0}}]}}
-    """
+      |{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask",
+      |"Task End Reason":{"Reason":"Success"},
+      |"Task Info":{
+      |  "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
+      |  "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
+      |  "Getting Result Time":0,"Finish Time":0,"Failed":false,"Serialized Size":0
+      |},
+      |"Task Metrics":{
+      |  "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,
+      |  "Result Size":500,"JVM GC Time":600,"Result Serialization Time":700,
+      |  "Memory Bytes Spilled":800,"Disk Bytes Spilled":0,
+      |  "Shuffle Read Metrics":{
+      |    "Shuffle Finish Time":900,
+      |    "Total Blocks Fetched":1500,
+      |    "Remote Blocks Fetched":800,
+      |    "Local Blocks Fetched":700,
+      |    "Fetch Wait Time":900,
+      |    "Remote Bytes Read":1000
+      |  },
+      |  "Shuffle Write Metrics":{
+      |    "Shuffle Bytes Written":1200,
+      |    "Shuffle Write Time":1500},
+      |  "Updated Blocks":[
+      |    {"Block ID":"rdd_0_0",
+      |      "Status":{
+      |        "Storage Level":{
+      |          "Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false,
+      |          "Replication":2
+      |        },
+      |        "Memory Size":0,"Tachyon Size":0,"Disk Size":0
+      |      }
+      |    }
+      |  ]
+      |  }
+      |}
+    """.stripMargin
 
   private val jobStartJsonString =
     """
