From 743a1e62b4a91bddeec2d3b3a48a3c0843c58d73 Mon Sep 17 00:00:00 2001
From: hushan
Date: Tue, 10 May 2016 21:05:54 +0800
Subject: [PATCH 1/2] Resubmit a running shuffle map stage when executor loss
 removes map output computed by zombie or removed task sets

---
 .../apache/spark/scheduler/DAGScheduler.scala      | 44 ++++++++-
 .../spark/scheduler/TaskScheduler.scala            |  2 +
 .../spark/scheduler/TaskSchedulerImpl.scala        | 11 +++
 .../spark/scheduler/TaskSetManager.scala           |  8 +-
 .../spark/scheduler/DAGSchedulerSuite.scala        | 95 ++++++++++++++++---
 .../ExternalClusterManagerSuite.scala              |  1 +
 6 files changed, 143 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b7fb608ea5064..51b6d680472ef 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1157,9 +1157,9 @@ class DAGScheduler(
     val stage = stageIdToStage(task.stageId)
     event.reason match {
       case Success =>
-        stage.pendingPartitions -= task.partitionId
         task match {
           case rt: ResultTask[_, _] =>
+            stage.pendingPartitions -= task.partitionId
             // Cast to ResultStage here because it's part of the ResultTask
             // TODO Refactor this out to a function that accepts a ResultStage
             val resultStage = stage.asInstanceOf[ResultStage]
@@ -1200,6 +1200,7 @@ class DAGScheduler(
             if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
               logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
             } else {
+              stage.pendingPartitions -= task.partitionId
               shuffleStage.addOutputLoc(smt.partitionId, status)
             }
 
@@ -1339,19 +1340,51 @@ class DAGScheduler(
       logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
       blockManagerMaster.removeExecutor(execId)
 
+      val resubmitStages: HashSet[Int] = HashSet.empty
       if (!env.blockManager.externalShuffleServiceEnabled || fetchFailed) {
         // TODO: This will be really slow if we keep accumulating shuffle map stages
         for ((shuffleId, stage) <- shuffleToMapStage) {
           stage.removeOutputsOnExecutor(execId)
-          mapOutputTracker.registerMapOutputs(
-            shuffleId,
-            stage.outputLocInMapOutputTrackerFormat(),
-            changeEpoch = true)
+          val locs = stage.outputLocInMapOutputTrackerFormat()
+          if (runningStages.contains(stage)) {
+            // Assumption: 1) this executor loss is not due to a FetchFailed, and 2) a running
+            // shuffle map stage may have several task sets: one active, some zombie, and some
+            // already removed as finished. The lost executor may hold output produced only by
+            // the removed or zombie task sets, so check whether every missing output partition
+            // is still pending in the running stage; if not, the lost locations belong to
+            // removed or zombie task sets, so mark the active task set as zombie and resubmit
+            // the stage.
+            if (!fetchFailed && stage.findMissingPartitions()
+              .exists(!stage.pendingPartitions.contains(_))) {
+              resubmitStages += stage.id
+            }
+            mapOutputTracker.incrementEpoch()
+          } else {
+            mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true)
+          }
         }
+
         if (shuffleToMapStage.isEmpty) {
           mapOutputTracker.incrementEpoch()
         }
+
         clearCacheLocs()
+
+        if (!fetchFailed) {
+          // If failedStages is not empty, a ResubmitFailedStages event has already been
+          // scheduled, so there is no need to schedule another one here.
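+          // The resubmit is delayed by RESUBMIT_TIMEOUT so that failures arriving close
+          // together can be handled in a single resubmit rather than one stage at a time.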
+          if (failedStages.isEmpty) {
+            messageScheduler.schedule(new Runnable {
+              override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
+            }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+          }
+          resubmitStages.foreach { stageId =>
+            val stage = stageIdToStage(stageId)
+            logWarning(s"Executor $execId caused stage $stageId to lose partitions, resubmitting")
+            markStageAsFinished(stage, Some(s"Executor $execId lost"))
+            failedStages += stage
+          }
+        }
       }
     } else {
       logDebug("Additional executor lost message for " + execId +
@@ -1416,6 +1449,7 @@ class DAGScheduler(
 
     outputCommitCoordinator.stageEnd(stage.id)
     listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
+    taskScheduler.zombieTasks(stage.id)
     runningStages -= stage
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 647d44a0f0680..f8ee0e464d066 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -53,6 +53,8 @@ private[spark] trait TaskScheduler {
   // Cancel a stage.
   def cancelTasks(stageId: Int, interruptThread: Boolean): Unit
 
+  def zombieTasks(stageId: Int): Unit
+
   // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
   def setDAGScheduler(dagScheduler: DAGScheduler): Unit
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index f31ec2af4ebd6..5c4228cb76f72 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -224,6 +224,17 @@ private[spark] class TaskSchedulerImpl(
     }
   }
 
+  override def zombieTasks(stageId: Int): Unit = synchronized {
+    taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
+      attempts.foreach { case (stageAttemptId, tsm) =>
+        if (!tsm.isZombie) {
+          logInfo(s"Marking TaskSet ${tsm.taskSet.id} of stage $stageId as zombie")
+          tsm.isZombie = true
+        }
+      }
+    }
+  }
+
   /**
    * Called to indicate that all task attempts (including speculated tasks) associated with the
    * given TaskSetManager have completed, so state associated with the TaskSetManager should be
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 6e08cdd87a8d1..62fa5ef895ba3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -787,8 +787,12 @@ private[spark] class TaskSetManager(
           addPendingTask(index)
           // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
           // stage finishes when a total of tasks.size tasks finish.
-          sched.dagScheduler.taskEnded(
-            tasks(index), Resubmitted, null, Seq.empty[AccumulableInfo], info)
+          // Do not report Resubmitted for tasks from a zombie task set, so that the DAGScheduler
+          // can tell whether the lost partition can still be re-run by the current active task set.
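+          // A task set becomes a zombie once a newer attempt of the stage has been submitted or
+          // the stage has completed; its lost output is handled via the executor-lost path instead.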
+          if (!isZombie) {
+            sched.dagScheduler.taskEnded(
+              tasks(index), Resubmitted, null, Seq.empty[AccumulableInfo], info)
+          }
         }
       }
     }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index b76c0a4bd1dde..b8c2ee410759c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -122,6 +122,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     override def cancelTasks(stageId: Int, interruptThread: Boolean) {
       cancelledStages += stageId
     }
+    override def zombieTasks(stageId: Int): Unit = {}
    override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
    override def defaultParallelism() = 2
    override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
@@ -480,6 +481,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     override def cancelTasks(stageId: Int, interruptThread: Boolean) {
       throw new UnsupportedOperationException
     }
+    override def zombieTasks(stageId: Int): Unit = {}
     override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
     override def defaultParallelism(): Int = 2
     override def executorHeartbeatReceived(
@@ -1272,13 +1274,14 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
       Success,
       makeMapStatus("hostA", reduceRdd.partitions.length)))
 
-    // now that host goes down
-    runEvent(ExecutorLost("exec-hostA"))
-
-    // so we resubmit those tasks
+    // note that these Resubmitted events arrive before the ExecutorLost event
     runEvent(makeCompletionEvent(taskSets(0).tasks(0), Resubmitted, null))
     runEvent(makeCompletionEvent(taskSets(0).tasks(1), Resubmitted, null))
 
+    // now that host goes down
+    runEvent(ExecutorLost("exec-hostA"))
+
     // now complete everything on a different host
     complete(taskSets(0), Seq(
       (Success, makeMapStatus("hostB", reduceRdd.partitions.length)),
@@ -1304,6 +1307,72 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     assert(stage1TaskSet.stageAttemptId == 0)
   }
 
+  test("Resubmit stage when a lost partition was computed by a zombie or removed TaskSet") {
+    val firstRDD = new MyRDD(sc, 3, Nil)
+    val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(3))
+    val firstShuffleId = firstShuffleDep.shuffleId
+    val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3))
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+    submit(reduceRdd, Array(0))
+
+    // things start out smoothly, stage 0 completes with no issues
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
+      (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
+      (Success, makeMapStatus("hostA", shuffleMapRdd.partitions.length))
+    ))
+
+    // then start running stage 1
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0),
+      Success,
+      makeMapStatus("hostD", shuffleMapRdd.partitions.length)))
+
+    // simulate a fetch failure so that stage 1 gets resubmitted. Note that in stage 1.0 the
+    // first partition (tasks(0)) already finished on hostD, so the resubmitted stage 1.1
+    // only contains tasks for the two remaining partitions
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1),
+      FetchFailed(null, firstShuffleId, 2, 1, "Fetch failed"), null))
+    scheduler.resubmitFailedStages()
+
+    val stage1Resubmit1 = taskSets(2)
+    assert(stage1Resubmit1.stageId == 1)
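+    // only the two partitions that are still missing get new tasks; the partition that
+    // already succeeded on hostD keeps its output for now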
+    assert(stage1Resubmit1.tasks.size == 2)
+
+    // now exec-hostD is lost, so the output location of stage 1's first partition (computed
+    // by the now-zombie attempt 0) is lost as well
+    // runEvent(makeCompletionEvent(taskSets(1).tasks(0), Resubmitted, null))
+    runEvent(ExecutorLost("exec-hostD"))
+    scheduler.resubmitFailedStages()
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+
+    assert(taskSets(3).tasks.size == 3) // all of stage 1's partitions 0/1/2
+
+    // let stage1Resubmit1 complete
+    complete(taskSets(2), Seq(
+      (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
+      (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length))
+    ))
+
+    // and complete the tasks still running in stage 1's original attempt (taskSets(1))
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1),
+      Success,
+      makeMapStatus("hostC", shuffleMapRdd.partitions.length)))
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(2),
+      Success,
+      makeMapStatus("hostC", shuffleMapRdd.partitions.length)))
+
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(0),
+      Success,
+      makeMapStatus("hostC", shuffleMapRdd.partitions.length)))
+
+    assert(scheduler.runningStages.head.isInstanceOf[ResultStage])
+  }
+
   /**
    * Makes sure that failures of stage used by multiple jobs are correctly handled.
    *
@@ -1467,16 +1536,20 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     // blockManagerMaster.removeExecutor("exec-hostA")
     // pretend we were told hostA went away
     runEvent(ExecutorLost("exec-hostA"))
+
     // DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks
     // rather than marking it is as failed and waiting.
     complete(taskSets(0), Seq(
       (Success, makeMapStatus("hostA", 1)),
       (Success, makeMapStatus("hostB", 1))))
+
+    // Previously, pendingPartitions -= expiredTask.partitionId made the stage look finished and
+    // triggered a resubmit; now the completion from the lost executor is ignored instead.
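+    // With no resubmit, the map task is re-run within taskSets(0) and the reduce stage below
+    // becomes taskSets(1) instead of taskSets(2).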
     // have hostC complete the resubmitted task
-    complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1))))
+    complete(taskSets(0), Seq((Success, makeMapStatus("hostC", 1))))
     assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
       HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
-    complete(taskSets(2), Seq((Success, 42)))
+    complete(taskSets(1), Seq((Success, 42)))
     assert(results === Map(0 -> 42))
     assertDataStructuresEmpty()
   }
@@ -1927,8 +2000,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     runEvent(makeCompletionEvent(oldTaskSet.tasks(0), Success, makeMapStatus("hostA", 2)))
     assert(results.size === 0)    // Map stage job should not be complete yet
+
     // Pretend host A was lost
     val oldEpoch = mapOutputTracker.getEpoch
+    runEvent(makeCompletionEvent(taskSets(0).tasks(0), Resubmitted, null))
     runEvent(ExecutorLost("exec-hostA"))
     val newEpoch = mapOutputTracker.getEpoch
     assert(newEpoch > oldEpoch)
@@ -1941,12 +2016,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     runEvent(makeCompletionEvent(oldTaskSet.tasks(2), Success, makeMapStatus("hostB", 2)))
     assert(results.size === 0)    // Map stage job should not be complete yet
 
-    // Now complete tasks in the second task set
-    val newTaskSet = taskSets(1)
-    assert(newTaskSet.tasks.size === 2)     // Both tasks 0 and 1 were on on hostA
-    runEvent(makeCompletionEvent(newTaskSet.tasks(0), Success, makeMapStatus("hostB", 2)))
+    assert(scheduler.runningStages.head.pendingPartitions.size === 2) // Both tasks 0 and 1
+    runEvent(makeCompletionEvent(oldTaskSet.tasks(0), Success, makeMapStatus("hostB", 2)))
     assert(results.size === 0)    // Map stage job should not be complete yet
-    runEvent(makeCompletionEvent(newTaskSet.tasks(1), Success, makeMapStatus("hostB", 2)))
+    runEvent(makeCompletionEvent(oldTaskSet.tasks(1), Success, makeMapStatus("hostB", 2)))
     assert(results.size === 1)    // Map stage job should now finally be complete
     assertDataStructuresEmpty()
 
@@ -1954,7 +2027,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     val reduceRDD = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
     results.clear()
     submit(reduceRDD, Array(0, 1))
-    complete(taskSets(2), Seq((Success, 42), (Success, 43)))
+    complete(taskSets(1), Seq((Success, 42), (Success, 43)))
     assert(results === Map(0 -> 42, 1 -> 43))
     results.clear()
     assertDataStructuresEmpty()
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
index 9971d48a52ce7..f3878b5023721 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -61,6 +61,7 @@ private class DummyTaskScheduler extends TaskScheduler {
   override def start(): Unit = {}
   override def stop(): Unit = {}
   override def submitTasks(taskSet: TaskSet): Unit = {}
+  override def zombieTasks(stage: Int): Unit = {}
   override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = {}
   override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
   override def defaultParallelism(): Int = 2

From 3bf1eaaddc3661241b7558abd4f74cc3173aba34 Mon Sep 17 00:00:00 2001
From: hushan
Date: Tue, 10 May 2016 21:12:14 +0800
Subject: [PATCH 2/2] Ignore status updates for tasks no longer tracked as
 running by their TaskSetManager

---
 .../apache/spark/scheduler/TaskSchedulerImpl.scala | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 5c4228cb76f72..6aa9c22e0dbd2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -364,12 +364,14 @@ private[spark] class TaskSchedulerImpl(
               }
             }
           }
-          if (state == TaskState.FINISHED) {
-            taskSet.removeRunningTask(tid)
-            taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
-          } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
-            taskSet.removeRunningTask(tid)
-            taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
+          if (taskSet.runningTasksSet.contains(tid)) {
+            if (state == TaskState.FINISHED) {
+              taskSet.removeRunningTask(tid)
+              taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
+            } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
+              taskSet.removeRunningTask(tid)
+              taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
+            }
           }
         case None =>
           logError(