core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (44 changes: 39 additions & 5 deletions)
@@ -1157,9 +1157,9 @@ class DAGScheduler(
val stage = stageIdToStage(task.stageId)
event.reason match {
case Success =>
stage.pendingPartitions -= task.partitionId
task match {
case rt: ResultTask[_, _] =>
stage.pendingPartitions -= task.partitionId
// Cast to ResultStage here because it's part of the ResultTask
// TODO Refactor this out to a function that accepts a ResultStage
val resultStage = stage.asInstanceOf[ResultStage]
@@ -1200,6 +1200,7 @@ class DAGScheduler(
if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
} else {
stage.pendingPartitions -= task.partitionId
shuffleStage.addOutputLoc(smt.partitionId, status)
}

@@ -1339,19 +1340,51 @@ class DAGScheduler(
logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
blockManagerMaster.removeExecutor(execId)

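// Shuffle map stages that must be resubmitted because finished output lost with this
// executor cannot be recomputed by their currently active task set.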
val resubmitStages: HashSet[Int] = HashSet.empty
if (!env.blockManager.externalShuffleServiceEnabled || fetchFailed) {
// TODO: This will be really slow if we keep accumulating shuffle map stages
for ((shuffleId, stage) <- shuffleToMapStage) {
stage.removeOutputsOnExecutor(execId)
mapOutputTracker.registerMapOutputs(
shuffleId,
stage.outputLocInMapOutputTrackerFormat(),
changeEpoch = true)
val locs = stage.outputLocInMapOutputTrackerFormat()
if (runningStages.contains(stage)) {
// Assumption: this is not a FetchFailed-triggered executor loss. A running shuffle map
// stage can have multiple task sets: one active, some zombie, and some already removed
// as finished. The lost executor may hold output only for partitions that were finished
// by the removed or zombie task sets, so check whether every missing partition of the
// stage is still pending in the active task set. If not, the lost output belongs to
// removed or zombie task sets, so mark the active task set as zombie and resubmit the stage.
if (!fetchFailed && stage.findMissingPartitions()
.exists(!stage.pendingPartitions.contains(_))) {
resubmitStages += stage.id
}
mapOutputTracker.incrementEpoch()
} else {
mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true)
}
}

if (shuffleToMapStage.isEmpty) {
mapOutputTracker.incrementEpoch()
}

clearCacheLocs()

if (!fetchFailed) {
// If failedStages is not empty, a ResubmitFailedStages event has already been scheduled.
if (failedStages.isEmpty) {
messageScheduler.schedule(new Runnable {
override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
}, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
}
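// For each stage that lost finished output held only by removed or zombie task sets,
// mark the current attempt as finished and add the stage to failedStages so that the
// ResubmitFailedStages event re-runs it.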
resubmitStages.foreach {
case stageId =>
val stage = stageIdToStage(stageId)
logWarning(s"Loss of executor $execId also lost finished output for stage $stageId; resubmitting")
markStageAsFinished(stage, Some(s"Executor $execId lost"))
failedStages += stage
}
}
}
} else {
logDebug("Additional executor lost message for " + execId +
@@ -1416,6 +1449,7 @@ class DAGScheduler(

outputCommitCoordinator.stageEnd(stage.id)
listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
taskScheduler.zombieTasks(stage.id)
Review comment (author): Once a stage has finished, its previous task sets should be marked as zombie.

runningStages -= stage
}

core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -53,6 +53,8 @@ private[spark] trait TaskScheduler {
// Cancel a stage.
def cancelTasks(stageId: Int, interruptThread: Boolean): Unit

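// Mark all task sets for the given stage as zombie so that they stop launching new tasks.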
def zombieTasks(stageId: Int): Unit

// Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
def setDAGScheduler(dagScheduler: DAGScheduler): Unit

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -224,6 +224,17 @@ private[spark] class TaskSchedulerImpl(
}
}

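// Called by the DAGScheduler when a stage completes: mark every task set attempt for the
// given stage as zombie so that stale attempts stop launching new tasks.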
override def zombieTasks(stageId: Int): Unit = synchronized {
taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
attempts.foreach { case (stageAttemptId, tsm) =>
if (!tsm.isZombie) {
logInfo(s"Marking task set ${tsm.taskSet.id} of stage $stageId as zombie")
tsm.isZombie = true
}
}
}
}

/**
* Called to indicate that all task attempts (including speculated tasks) associated with the
* given TaskSetManager have completed, so state associated with the TaskSetManager should be
@@ -353,12 +364,14 @@ private[spark] class TaskSchedulerImpl(
}
}
}
if (state == TaskState.FINISHED) {
taskSet.removeRunningTask(tid)
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
taskSet.removeRunningTask(tid)
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
if (taskSet.runningTasksSet.contains(tid)) {
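// Only process the terminal state if this task is still tracked as running for the
// task set; status updates for tasks no longer in runningTasksSet are ignored.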
if (state == TaskState.FINISHED) {
taskSet.removeRunningTask(tid)
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
taskSet.removeRunningTask(tid)
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
}
}
case None =>
logError(
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -787,8 +787,12 @@ private[spark] class TaskSetManager(
addPendingTask(index)
// Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
// stage finishes when a total of tasks.size tasks finish.
sched.dagScheduler.taskEnded(
tasks(index), Resubmitted, null, Seq.empty[AccumulableInfo], info)
// Do not report Resubmitted task-end events for zombie task sets; this lets the
// DAGScheduler know whether the lost partition can be re-run by the currently active task set.
if (!isZombie) {
sched.dagScheduler.taskEnded(
tasks(index), Resubmitted, null, Seq.empty[AccumulableInfo], info)
}
}
}
}
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -122,6 +122,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
override def cancelTasks(stageId: Int, interruptThread: Boolean) {
cancelledStages += stageId
}
override def zombieTasks(stageId: Int): Unit = {}
override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
override def defaultParallelism() = 2
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
@@ -480,6 +481,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
override def cancelTasks(stageId: Int, interruptThread: Boolean) {
throw new UnsupportedOperationException
}
override def zombieTasks(stageId: Int): Unit = {}
override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
override def defaultParallelism(): Int = 2
override def executorHeartbeatReceived(
@@ -1272,13 +1274,14 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
Success,
makeMapStatus("hostA", reduceRdd.partitions.length)))

// now that host goes down
runEvent(ExecutorLost("exec-hostA"))

// so we resubmit those tasks
// note that these resubmit events arrive before the ExecutorLost event
runEvent(makeCompletionEvent(taskSets(0).tasks(0), Resubmitted, null))
runEvent(makeCompletionEvent(taskSets(0).tasks(1), Resubmitted, null))

// now that host goes down
runEvent(ExecutorLost("exec-hostA"))

// now complete everything on a different host
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostB", reduceRdd.partitions.length)),
@@ -1304,6 +1307,72 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
assert(stage1TaskSet.stageAttemptId == 0)
}

test("Resubmit stage when lost partitions were finished by zombie or removed task sets") {
val firstRDD = new MyRDD(sc, 3, Nil)
val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(3))
val firstShuffleId = firstShuffleDep.shuffleId
val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3))
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
submit(reduceRdd, Array(0))

// things start out smoothly, stage 0 completes with no issues
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
(Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
(Success, makeMapStatus("hostA", shuffleMapRdd.partitions.length))
))

// then start running stage 1
runEvent(makeCompletionEvent(
taskSets(1).tasks(0),
Success,
makeMapStatus("hostD", shuffleMapRdd.partitions.length)))

// Trigger a resubmit of stage 1. Note that in attempt stage 1.0, partition 1 already
// finished on hostD, so the resubmitted attempt (stage 1.1) only contains tasks for
// partitions 0 and 2.
runEvent(makeCompletionEvent(
taskSets(1).tasks(1),
FetchFailed(null, firstShuffleId, 2, 1, "Fetch failed"), null))
scheduler.resubmitFailedStages()

val stage1Resubmit1 = taskSets(2)
assert(stage1Resubmit1.stageId == 1)
assert(stage1Resubmit1.tasks.size == 2)

// now exec-hostD is lost, so the output location of stage 1 partition 1 will be lost.
// runEvent(makeCompletionEvent(taskSets(1).tasks(0), Resubmitted, null))
runEvent(ExecutorLost("exec-hostD"))
scheduler.resubmitFailedStages()
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)

assert(taskSets(3).tasks.size == 3) // all of stage 1's partitions: 0, 1 and 2

// let stage1Resubmit1 complete
complete(taskSets(2), Seq(
(Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
(Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length))
))

// and complete the tasks still running in stage 1's original attempt (taskSets(1))
runEvent(makeCompletionEvent(
taskSets(1).tasks(1),
Success,
makeMapStatus("hostC", shuffleMapRdd.partitions.length)))
runEvent(makeCompletionEvent(
taskSets(1).tasks(2),
Success,
makeMapStatus("hostC", shuffleMapRdd.partitions.length)))

runEvent(makeCompletionEvent(
taskSets(3).tasks(0),
Success,
makeMapStatus("hostC", shuffleMapRdd.partitions.length)))

assert(scheduler.runningStages.head.isInstanceOf[ResultStage])
}

/**
* Makes sure that failures of stage used by multiple jobs are correctly handled.
*
@@ -1467,16 +1536,20 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
// blockManagerMaster.removeExecutor("exec-hostA")
// pretend we were told hostA went away
runEvent(ExecutorLost("exec-hostA"))

// DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks
// rather than marking it as failed and waiting.
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))

// Previously, pendingPartitions -= expiredTask.partitionId caused the stage to be
// resubmitted; now the expired task's partition is simply ignored.
// have hostC complete the resubmitted task
complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1))))
complete(taskSets(0), Seq((Success, makeMapStatus("hostC", 1))))
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
complete(taskSets(2), Seq((Success, 42)))
complete(taskSets(1), Seq((Success, 42)))
assert(results === Map(0 -> 42))
assertDataStructuresEmpty()
}
@@ -1927,8 +2000,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
runEvent(makeCompletionEvent(oldTaskSet.tasks(0), Success, makeMapStatus("hostA", 2)))
assert(results.size === 0) // Map stage job should not be complete yet


// Pretend host A was lost
val oldEpoch = mapOutputTracker.getEpoch
runEvent(makeCompletionEvent(taskSets(0).tasks(0), Resubmitted, null))
runEvent(ExecutorLost("exec-hostA"))
val newEpoch = mapOutputTracker.getEpoch
assert(newEpoch > oldEpoch)
@@ -1941,20 +2016,18 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
runEvent(makeCompletionEvent(oldTaskSet.tasks(2), Success, makeMapStatus("hostB", 2)))
assert(results.size === 0) // Map stage job should not be complete yet

// Now complete tasks in the second task set
val newTaskSet = taskSets(1)
assert(newTaskSet.tasks.size === 2) // Both tasks 0 and 1 were on hostA
runEvent(makeCompletionEvent(newTaskSet.tasks(0), Success, makeMapStatus("hostB", 2)))
assert(scheduler.runningStages.head.pendingPartitions.size === 2) // Both tasks 0 and 1
runEvent(makeCompletionEvent(oldTaskSet.tasks(0), Success, makeMapStatus("hostB", 2)))
assert(results.size === 0) // Map stage job should not be complete yet
runEvent(makeCompletionEvent(newTaskSet.tasks(1), Success, makeMapStatus("hostB", 2)))
runEvent(makeCompletionEvent(oldTaskSet.tasks(1), Success, makeMapStatus("hostB", 2)))
assert(results.size === 1) // Map stage job should now finally be complete
assertDataStructuresEmpty()

// Also test that a reduce stage using this shuffled data can immediately run
val reduceRDD = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
results.clear()
submit(reduceRDD, Array(0, 1))
complete(taskSets(2), Seq((Success, 42), (Success, 43)))
complete(taskSets(1), Seq((Success, 42), (Success, 43)))
assert(results === Map(0 -> 42, 1 -> 43))
results.clear()
assertDataStructuresEmpty()
core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -61,6 +61,7 @@ private class DummyTaskScheduler extends TaskScheduler {
override def start(): Unit = {}
override def stop(): Unit = {}
override def submitTasks(taskSet: TaskSet): Unit = {}
override def zombieTasks(stage: Int): Unit = {}
override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = {}
override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
override def defaultParallelism(): Int = 2