core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -75,9 +75,11 @@ case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)
@DeveloperApi
case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerApplicationStart(appName: String, time: Long, sparkUser: String)
extends SparkListenerEvent

@DeveloperApi
case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent

/** An event used in the listener to shutdown the listener daemon thread. */
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -106,6 +106,8 @@ private[spark] class Stage(
id
}

def attemptId: Int = nextAttemptId

val name = callSite.short
val details = callSite.long

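The new attemptId accessor exposes the counter that newAttemptId() advances each time the stage is resubmitted (the `id` / `}` context above is the tail of newAttemptId). A minimal standalone sketch of that pattern, not the actual Stage class:

// Hypothetical sketch of the attempt-counter pattern in Stage.
private[spark] class StageSketch {
  private var nextAttemptId = 0
  // Returns the current id, then advances the counter for the next resubmission.
  def newAttemptId(): Int = {
    val id = nextAttemptId
    nextAttemptId += 1
    id
  }
  // New accessor: the number of attempts started so far,
  // i.e. the id the next attempt will receive.
  def attemptId: Int = nextAttemptId
}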
core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -27,10 +27,12 @@ import org.apache.spark.annotation.DeveloperApi
class TaskInfo(
val taskId: Long,
val index: Int,
val attempt: Int,
val launchTime: Long,
val executorId: String,
val host: String,
val taskLocality: TaskLocality.TaskLocality) {
val taskLocality: TaskLocality.TaskLocality,
val speculative: Boolean) {

/**
* The time when the task started remotely getting the result. Will not be set if the
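With attempt and speculative now carried on TaskInfo, listeners can report them directly. A minimal sketch against the SparkListener API as of this change (the listener name is illustrative):

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}

class AttemptLoggingListener extends SparkListener {
  override def onTaskStart(taskStart: SparkListenerTaskStart) {
    val info = taskStart.taskInfo
    val tag = if (info.speculative) " (speculative)" else ""
    println(s"task ${info.taskId} index ${info.index} attempt ${info.attempt}$tag")
  }
}

Registered via sc.addSparkListener(new AttemptLoggingListener), this would print one line per launched task, flagging speculative copies.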
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -335,17 +335,19 @@ private[spark] class TaskSetManager(
/**
* Dequeue a pending task for a given node and return its index and locality level.
* Only search for tasks matching the given locality constraint.
*
* @return An option containing (task index within the task set, locality, is speculative?)
*/
private def findTask(execId: String, host: String, locality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value)] =
: Option[(Int, TaskLocality.Value, Boolean)] =
{
for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
return Some((index, TaskLocality.PROCESS_LOCAL))
return Some((index, TaskLocality.PROCESS_LOCAL, false))
}

if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
return Some((index, TaskLocality.NODE_LOCAL))
return Some((index, TaskLocality.NODE_LOCAL, false))
}
}

@@ -354,23 +356,25 @@ private[spark] class TaskSetManager(
rack <- sched.getRackForHost(host)
index <- findTaskFromList(execId, getPendingTasksForRack(rack))
} {
return Some((index, TaskLocality.RACK_LOCAL))
return Some((index, TaskLocality.RACK_LOCAL, false))
}
}

// Look for no-pref tasks after rack-local tasks since they can run anywhere.
for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL))
return Some((index, TaskLocality.PROCESS_LOCAL, false))
}

if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
for (index <- findTaskFromList(execId, allPendingTasks)) {
return Some((index, TaskLocality.ANY))
return Some((index, TaskLocality.ANY, false))
}
}

// Finally, if all else has failed, find a speculative task
findSpeculativeTask(execId, host, locality)
findSpeculativeTask(execId, host, locality).map { case (taskIndex, allowedLocality) =>
(taskIndex, allowedLocality, true)
}
}

/**
@@ -391,7 +395,7 @@
}

findTask(execId, host, allowedLocality) match {
case Some((index, taskLocality)) => {
case Some((index, taskLocality, speculative)) => {
Contributor commented:
nit: can you change this to isSpeculative?

Contributor Author replied:
there were lots of debates about this so i'm going to keep it for now :)

// Found a task; do some bookkeeping and return a task description
val task = tasks(index)
val taskId = sched.newTaskId()
@@ -400,7 +404,9 @@
taskSet.id, index, taskId, execId, host, taskLocality))
// Do various bookkeeping
copiesRunning(index) += 1
val info = new TaskInfo(taskId, index, curTime, execId, host, taskLocality)
val attemptNum = taskAttempts(index).size
val info = new TaskInfo(
taskId, index, attemptNum + 1, curTime, execId, host, taskLocality, speculative)
taskInfos(taskId) = info
taskAttempts(index) = info :: taskAttempts(index)
// Update our locality level for delay scheduling
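findTask now returns a triple, and only the speculative fallback tags the result true: the non-speculative branches return false inline, while the fallback lifts the flag in with Option.map. A self-contained illustration of that tagging pattern (names invented for the example):

// Tag an optional (index, locality) result with whether it came from the fallback.
def tagged(primary: Option[(Int, String)],
           fallback: => Option[(Int, String)]): Option[(Int, String, Boolean)] =
  primary.map { case (i, loc) => (i, loc, false) }
    .orElse(fallback.map { case (i, loc) => (i, loc, true) })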
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -95,8 +95,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
</div>
// scalastyle:on
val taskHeaders: Seq[String] =
Seq("Task Index", "Task ID", "Status", "Locality Level", "Executor", "Launch Time") ++
Seq("Duration", "GC Time", "Result Ser Time") ++
Seq(
"Index", "ID", "Attempt", "Status", "Locality Level", "Executor",
"Launch Time", "Duration", "GC Time") ++
{if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
{if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
{if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++
@@ -248,6 +249,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
<tr>
<td>{info.index}</td>
<td>{info.taskId}</td>
<td sorttable_customkey={info.attempt.toString}>{
if (info.speculative) s"${info.attempt} (speculative)" else info.attempt.toString
}</td>
<td>{info.status}</td>
<td>{info.taskLocality}</td>
<td>{info.host}</td>
@@ -258,9 +262,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
<td sorttable_customkey={gcTime.toString}>
{if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
</td>
<!--
TODO: Add this back after we add support to hide certain columns.
<td sorttable_customkey={serializationTime.toString}>
{if (serializationTime > 0) UIUtils.formatDuration(serializationTime) else ""}
</td>
-->
{if (shuffleRead) {
<td sorttable_customkey={shuffleReadSortable}>
{shuffleReadReadable}
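The Attempt cell keeps numeric sorting even when its visible text gains a "(speculative)" suffix, because sorttable_customkey carries the sort key separately from the rendered label. The cell in isolation, as a Scala XML literal like the rest of StagePage:

// Sort by the raw attempt number; the display may append "(speculative)".
def attemptCell(attempt: Int, speculative: Boolean) =
  <td sorttable_customkey={attempt.toString}>{
    if (speculative) s"$attempt (speculative)" else attempt.toString
  }</td>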
9 changes: 8 additions & 1 deletion core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -32,6 +32,8 @@ import org.apache.spark.storage._
import org.apache.spark._

private[spark] object JsonProtocol {
// TODO: Remove this file and put JSON serialization into each individual class.

private implicit val format = DefaultFormats

/** ------------------------------------------------- *
@@ -194,10 +196,12 @@
def taskInfoToJson(taskInfo: TaskInfo): JValue = {
("Task ID" -> taskInfo.taskId) ~
("Index" -> taskInfo.index) ~
("Attempt" -> taskInfo.attempt) ~
("Launch Time" -> taskInfo.launchTime) ~
("Executor ID" -> taskInfo.executorId) ~
("Host" -> taskInfo.host) ~
("Locality" -> taskInfo.taskLocality.toString) ~
("Speculative" -> taskInfo.speculative) ~
("Getting Result Time" -> taskInfo.gettingResultTime) ~
("Finish Time" -> taskInfo.finishTime) ~
("Failed" -> taskInfo.failed) ~
@@ -487,16 +491,19 @@
def taskInfoFromJson(json: JValue): TaskInfo = {
val taskId = (json \ "Task ID").extract[Long]
val index = (json \ "Index").extract[Int]
val attempt = (json \ "Attempt").extractOpt[Int].getOrElse(1)
val launchTime = (json \ "Launch Time").extract[Long]
val executorId = (json \ "Executor ID").extract[String]
val host = (json \ "Host").extract[String]
val taskLocality = TaskLocality.withName((json \ "Locality").extract[String])
val speculative = (json \ "Speculative").extractOpt[Boolean].getOrElse(false)
val gettingResultTime = (json \ "Getting Result Time").extract[Long]
val finishTime = (json \ "Finish Time").extract[Long]
val failed = (json \ "Failed").extract[Boolean]
val serializedSize = (json \ "Serialized Size").extract[Int]

val taskInfo = new TaskInfo(taskId, index, launchTime, executorId, host, taskLocality)
val taskInfo =
new TaskInfo(taskId, index, attempt, launchTime, executorId, host, taskLocality, speculative)
taskInfo.gettingResultTime = gettingResultTime
taskInfo.finishTime = finishTime
taskInfo.failed = failed
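Reading the two new fields with extractOpt plus a default keeps JsonProtocol backward compatible: event logs written before this change simply lack "Attempt" and "Speculative", and parse to 1 and false. A minimal sketch with json4s, the library JsonProtocol already uses:

import org.json4s._
import org.json4s.jackson.JsonMethods._

object LegacyParseSketch {
  implicit val formats = DefaultFormats

  def main(args: Array[String]) {
    // A pre-change log entry: no "Attempt", no "Speculative".
    val legacy = parse("""{"Task ID": 1, "Index": 0}""")
    val attempt = (legacy \ "Attempt").extractOpt[Int].getOrElse(1)
    val speculative = (legacy \ "Speculative").extractOpt[Boolean].getOrElse(false)
    println(s"attempt=$attempt speculative=$speculative") // attempt=1 speculative=false
  }
}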
core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -66,7 +66,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
// finish this task, should get updated shuffleRead
shuffleReadMetrics.remoteBytesRead = 1000
taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
var taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
var task = new ShuffleMapTask(0, null, null, 0, null)
val taskType = Utils.getFormattedClassName(task)
@@ -75,7 +75,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
.shuffleRead == 1000)

// finish a task with unknown executor-id, nothing should happen
taskInfo = new TaskInfo(1234L, 0, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL)
taskInfo =
new TaskInfo(1234L, 0, 1, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL, true)
taskInfo.finishTime = 1
task = new ShuffleMapTask(0, null, null, 0, null)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
@@ -84,7 +85,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
// finish this task, should get updated duration
shuffleReadMetrics.remoteBytesRead = 1000
taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
taskInfo = new TaskInfo(1235L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
taskInfo = new TaskInfo(1235L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
task = new ShuffleMapTask(0, null, null, 0, null)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
@@ -94,7 +95,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
// finish this task, should get updated duration
shuffleReadMetrics.remoteBytesRead = 1000
taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
taskInfo = new TaskInfo(1236L, 0, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL)
taskInfo = new TaskInfo(1236L, 0, 2, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
task = new ShuffleMapTask(0, null, null, 0, null)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
@@ -106,7 +107,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
val conf = new SparkConf()
val listener = new JobProgressListener(conf)
val metrics = new TaskMetrics()
val taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
val taskInfo = new TaskInfo(1234L, 0, 3, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
val task = new ShuffleMapTask(0, null, null, 0, null)
val taskType = Utils.getFormattedClassName(task)
88 changes: 57 additions & 31 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -35,10 +35,11 @@ class JsonProtocolSuite extends FunSuite {
val stageSubmitted =
SparkListenerStageSubmitted(makeStageInfo(100, 200, 300, 400L, 500L), properties)
val stageCompleted = SparkListenerStageCompleted(makeStageInfo(101, 201, 301, 401L, 501L))
val taskStart = SparkListenerTaskStart(111, makeTaskInfo(222L, 333, 444L))
val taskGettingResult = SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 3000L))
val taskStart = SparkListenerTaskStart(111, makeTaskInfo(222L, 333, 1, 444L, false))
val taskGettingResult =
SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true))
val taskEnd = SparkListenerTaskEnd(1, "ShuffleMapTask", Success,
makeTaskInfo(123L, 234, 345L), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800))
makeTaskInfo(123L, 234, 67, 345L, false), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800))
val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties)
val jobEnd = SparkListenerJobEnd(20, JobSucceeded)
val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]](
@@ -73,7 +74,7 @@
test("Dependent Classes") {
testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
testTaskInfo(makeTaskInfo(999L, 888, 777L))
testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8))
testBlockManagerId(BlockManagerId("Hong", "Kong", 500, 1000))

@@ -269,10 +270,12 @@
private def assertEquals(info1: TaskInfo, info2: TaskInfo) {
assert(info1.taskId === info2.taskId)
assert(info1.index === info2.index)
assert(info1.attempt === info2.attempt)
assert(info1.launchTime === info2.launchTime)
assert(info1.executorId === info2.executorId)
assert(info1.host === info2.host)
assert(info1.taskLocality === info2.taskLocality)
assert(info1.speculative === info2.speculative)
assert(info1.gettingResultTime === info2.gettingResultTime)
assert(info1.finishTime === info2.finishTime)
assert(info1.failed === info2.failed)
@@ -453,8 +456,8 @@
new StageInfo(a, "greetings", b, rddInfos, "details")
}

private def makeTaskInfo(a: Long, b: Int, c: Long) = {
new TaskInfo(a, b, c, "executor", "your kind sir", TaskLocality.NODE_LOCAL)
private def makeTaskInfo(a: Long, b: Int, c: Int, d: Long, speculative: Boolean) = {
new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative)
}

private def makeTaskMetrics(a: Long, b: Long, c: Long, d: Long, e: Int, f: Int) = {
@@ -510,37 +513,60 @@

private val taskStartJsonString =
"""
{"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task ID":222,
"Index":333,"Launch Time":444,"Executor ID":"executor","Host":"your kind sir",
"Locality":"NODE_LOCAL","Getting Result Time":0,"Finish Time":0,"Failed":false,
"Serialized Size":0}}
"""
|{"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task ID":222,
|"Index":333,"Attempt":1,"Launch Time":444,"Executor ID":"executor","Host":"your kind sir",
|"Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,
|"Failed":false,"Serialized Size":0}}
""".stripMargin

private val taskGettingResultJsonString =
"""
{"Event":"SparkListenerTaskGettingResult","Task Info":{"Task ID":1000,"Index":
2000,"Launch Time":3000,"Executor ID":"executor","Host":"your kind sir",
"Locality":"NODE_LOCAL","Getting Result Time":0,"Finish Time":0,"Failed":false,
"Serialized Size":0}}
"""
|{"Event":"SparkListenerTaskGettingResult","Task Info":
| {"Task ID":1000,"Index":2000,"Attempt":5,"Launch Time":3000,"Executor ID":"executor",
| "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":true,"Getting Result Time":0,
| "Finish Time":0,"Failed":false,"Serialized Size":0
| }
|}
""".stripMargin

private val taskEndJsonString =
"""
{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask",
"Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":123,"Index":
234,"Launch Time":345,"Executor ID":"executor","Host":"your kind sir",
"Locality":"NODE_LOCAL","Getting Result Time":0,"Finish Time":0,"Failed":
false,"Serialized Size":0},"Task Metrics":{"Host Name":"localhost",
"Executor Deserialize Time":300,"Executor Run Time":400,"Result Size":500,
"JVM GC Time":600,"Result Serialization Time":700,"Memory Bytes Spilled":
800,"Disk Bytes Spilled":0,"Shuffle Read Metrics":{"Shuffle Finish Time":
900,"Total Blocks Fetched":1500,"Remote Blocks Fetched":800,"Local Blocks Fetched":
700,"Fetch Wait Time":900,"Remote Bytes Read":1000},"Shuffle Write Metrics":
{"Shuffle Bytes Written":1200,"Shuffle Write Time":1500},"Updated Blocks":
[{"Block ID":"rdd_0_0","Status":{"Storage Level":{"Use Disk":true,"Use Memory":true,
"Use Tachyon":false,"Deserialized":false,"Replication":2},"Memory Size":0,"Tachyon Size":0,
"Disk Size":0}}]}}
"""
|{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask",
|"Task End Reason":{"Reason":"Success"},
|"Task Info":{
| "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
| "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
| "Getting Result Time":0,"Finish Time":0,"Failed":false,"Serialized Size":0
|},
|"Task Metrics":{
| "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,
| "Result Size":500,"JVM GC Time":600,"Result Serialization Time":700,
| "Memory Bytes Spilled":800,"Disk Bytes Spilled":0,
| "Shuffle Read Metrics":{
| "Shuffle Finish Time":900,
| "Total Blocks Fetched":1500,
| "Remote Blocks Fetched":800,
| "Local Blocks Fetched":700,
| "Fetch Wait Time":900,
| "Remote Bytes Read":1000
| },
| "Shuffle Write Metrics":{
| "Shuffle Bytes Written":1200,
| "Shuffle Write Time":1500},
| "Updated Blocks":[
| {"Block ID":"rdd_0_0",
| "Status":{
| "Storage Level":{
| "Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false,
| "Replication":2
| },
| "Memory Size":0,"Tachyon Size":0,"Disk Size":0
| }
| }
| ]
| }
|}
""".stripMargin

private val jobStartJsonString =
"""