Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1144,13 +1144,13 @@ class DAGScheduler(
null
}

// The success case is dealt with separately below.
// TODO: Why post it only for failed tasks in cancelled stages? Clarify semantics here.
if (event.reason != Success) {
val attemptId = task.stageAttemptId
listenerBus.post(SparkListenerTaskEnd(
stageId, attemptId, taskType, event.reason, event.taskInfo, taskMetrics))
}
// The stage may have already finished when we get this event -- eg. maybe it was a
// speculative task. It is important that we send the TaskEnd event in any case, so listeners
// are properly notified and can chose to handle it. For instance, some listeners are
// doing their own accounting and if they don't get the task end event they think
// tasks are still running when they really aren't.
listenerBus.post(SparkListenerTaskEnd(
stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))

if (!stageIdToStage.contains(task.stageId)) {
// Skip all the actions if the stage has been cancelled.
Expand All @@ -1160,8 +1160,6 @@ class DAGScheduler(
val stage = stageIdToStage(task.stageId)
event.reason match {
case Success =>
listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
event.reason, event.taskInfo, taskMetrics))
stage.pendingPartitions -= task.partitionId
task match {
case rt: ResultTask[_, _] =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
val successfulStages = new HashSet[Int]
val failedStages = new ArrayBuffer[Int]
val stageByOrderOfExecution = new ArrayBuffer[Int]
val endedTasks = new HashSet[Long]

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
submittedStageInfos += stageSubmitted.stageInfo
Expand All @@ -148,6 +149,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
failedStages += stageInfo.stageId
}
}

override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
endedTasks += taskEnd.taskInfo.taskId
}
}

var mapOutputTracker: MapOutputTrackerMaster = null
Expand Down Expand Up @@ -195,6 +200,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
sparkListener.submittedStageInfos.clear()
sparkListener.successfulStages.clear()
sparkListener.failedStages.clear()
sparkListener.endedTasks.clear()
failure = null
sc.addSparkListener(sparkListener)
taskSets.clear()
Expand Down Expand Up @@ -982,6 +988,52 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
assert(countSubmittedMapStageAttempts() === 2)
}

test("task events always posted in speculation / when stage is killed") {
val baseRdd = new MyRDD(sc, 4, Nil)
val finalRdd = new MyRDD(sc, 4, List(new OneToOneDependency(baseRdd)))
submit(finalRdd, Array(0, 1, 2, 3))

// complete two tasks
runEvent(makeCompletionEvent(
taskSets(0).tasks(0), Success, 42,
Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(0)))
runEvent(makeCompletionEvent(
taskSets(0).tasks(1), Success, 42,
Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(1)))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
// verify stage exists
assert(scheduler.stageIdToStage.contains(0))
assert(sparkListener.endedTasks.size == 2)

// finish other 2 tasks
runEvent(makeCompletionEvent(
taskSets(0).tasks(2), Success, 42,
Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(2)))
runEvent(makeCompletionEvent(
taskSets(0).tasks(3), Success, 42,
Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(3)))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.endedTasks.size == 4)

// verify the stage is done
assert(!scheduler.stageIdToStage.contains(0))

// Stage should be complete. Finish one other Successful task to simulate what can happen
// with a speculative task and make sure the event is sent out
runEvent(makeCompletionEvent(
taskSets(0).tasks(3), Success, 42,
Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(5)))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.endedTasks.size == 5)

// make sure non successful tasks also send out event
runEvent(makeCompletionEvent(
taskSets(0).tasks(3), UnknownReason, 42,
Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(6)))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.endedTasks.size == 6)
}

test("ignore late map task completions") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
Expand Down Expand Up @@ -1947,6 +1999,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
info
}

private def createFakeTaskInfoWithId(taskId: Long): TaskInfo = {
val info = new TaskInfo(taskId, 0, 0, 0L, "", "", TaskLocality.ANY, false)
info.finishTime = 1 // to prevent spurious errors in JobProgressListener
info
}

private def makeCompletionEvent(
task: Task[_],
reason: TaskEndReason,
Expand Down