From 424a3c8e341747bbe1107866b92f23af4bd3efd2 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Fri, 1 Mar 2019 19:45:26 +0800
Subject: [PATCH 1/6] avoid more than one active task set managers for a stage

---
 .../spark/scheduler/TaskSchedulerImpl.scala   | 20 ++++++++++++-------
 .../spark/scheduler/TaskSetManager.scala      |  2 +-
 2 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 3f23bfe59523..442154fb2fc3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -212,14 +212,20 @@ private[spark] class TaskSchedulerImpl(
       val stage = taskSet.stageId
       val stageTaskSets =
         taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
-      stageTaskSets(taskSet.stageAttemptId) = manager
-      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
-        ts.taskSet != taskSet && !ts.isZombie
-      }
-      if (conflictingTaskSet) {
-        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
-          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
+
+      // Mark all the existing TaskSetManagers of this stage as zombie, as we are adding a new one.
+      // This is necessary to handle a corner case. Let's say a stage has 10 partitions and has 2
+      // TaskSetManagers: TSM1(zombie) and TSM2(active). TSM1 has a running task for partition 10
+      // and it completes. TSM2 finishes tasks for partition 1-19, and thinks he is still active
+      // because partition 10 is not completed yet. However, DAGScheduler gets task completion
+      // events for all the 10 partitions and thinks the stage is finished. If it's a shuffle stage
+      // and somehow it has missing map outputs, then DAGScheduler will resubmit it and create a
+      // TSM3 for it. As a stage can't have more than one active task set managers, we must mark
+      // TSM2 as zombie (it actually is).
+      stageTaskSets.foreach { case (_, ts) =>
+        ts.isZombie = true
       }
+      stageTaskSets(taskSet.stageAttemptId) = manager
       schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
 
       if (!isLocal && !hasReceivedTask) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 453939aaf190..e787e1fee10e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -123,7 +123,7 @@ private[spark] class TaskSetManager(
   // state until all tasks have finished running; we keep TaskSetManagers that are in the zombie
   // state in order to continue to track and account for the running tasks.
   // TODO: We should kill any running task attempts when the task set manager becomes a zombie.
-  private[scheduler] var isZombie = false
+  @transient private[scheduler] var isZombie = false
 
   // Whether the taskSet run tasks from a barrier stage. Spark must launch all the tasks at the
   // same time for a barrier stage.
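Patch 1 replaces the old fail-fast check (throwing IllegalStateException when a second non-zombie TaskSetManager appears for a stage) with unconditional demotion: every manager already registered for the stage is marked zombie before the new attempt is stored. The sketch below models just that state transition outside Spark; `Manager` and `register` are hypothetical stand-ins, not Spark classes, and the map mirrors the inner map of `taskSetsByStageIdAndAttempt`.

```scala
import scala.collection.mutable.HashMap

object ZombieMarkingSketch {
  final class Manager(val stageAttemptId: Int) {
    var isZombie = false
  }

  // attemptId -> manager for one stage, mirroring taskSetsByStageIdAndAttempt's inner map
  val stageTaskSets = new HashMap[Int, Manager]

  def register(manager: Manager): Unit = {
    // Old behavior: throw if any non-zombie manager for another attempt exists.
    // New behavior: demote them all, then store the fresh attempt.
    stageTaskSets.foreach { case (_, ts) => ts.isZombie = true }
    stageTaskSets(manager.stageAttemptId) = manager
  }

  def main(args: Array[String]): Unit = {
    register(new Manager(0))
    register(new Manager(1))
    // Exactly one active manager remains, and it is the latest attempt.
    assert(stageTaskSets.values.count(!_.isZombie) == 1)
    assert(!stageTaskSets(1).isZombie)
  }
}
```

The old code threw only when it noticed a conflict; the new code makes the at-most-one-active invariant hold by construction, which also covers the corner case described in the commit comment, where TSM2 wrongly believes it is still active.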
From 53c6ed8d25d94d917711772f747a40162f1615f0 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Sat, 2 Mar 2019 10:33:44 +0800
Subject: [PATCH 2/6] fix a mistake

---
 .../main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index e787e1fee10e..59b931f33c90 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -123,7 +123,7 @@ private[spark] class TaskSetManager(
   // state until all tasks have finished running; we keep TaskSetManagers that are in the zombie
   // state in order to continue to track and account for the running tasks.
   // TODO: We should kill any running task attempts when the task set manager becomes a zombie.
-  @transient private[scheduler] var isZombie = false
+  @volatile private[scheduler] var isZombie = false
 
   // Whether the taskSet run tasks from a barrier stage. Spark must launch all the tasks at the
   // same time for a barrier stage.

From f94809df47668600c2044bce3ecc2e1650b6e192 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Mon, 4 Mar 2019 11:57:36 +0800
Subject: [PATCH 3/6] fix tests

---
 .../scheduler/TaskSchedulerImplSuite.scala    | 20 -------------------
 1 file changed, 20 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 016adb8b70e7..4f52a1961bd9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -201,30 +201,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     // Even if one of the task sets has not-serializable tasks, the other task set should
     // still be processed without error
     taskScheduler.submitTasks(FakeTask.createTaskSet(1))
-    taskScheduler.submitTasks(taskSet)
     taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
     assert(taskDescriptions.map(_.executorId) === Seq("executor0"))
   }
 
-  test("refuse to schedule concurrent attempts for the same stage (SPARK-8103)") {
-    val taskScheduler = setupScheduler()
-    val attempt1 = FakeTask.createTaskSet(1, 0)
-    val attempt2 = FakeTask.createTaskSet(1, 1)
-    taskScheduler.submitTasks(attempt1)
-    intercept[IllegalStateException] { taskScheduler.submitTasks(attempt2) }
-
-    // OK to submit multiple if previous attempts are all zombie
-    taskScheduler.taskSetManagerForAttempt(attempt1.stageId, attempt1.stageAttemptId)
-      .get.isZombie = true
-    taskScheduler.submitTasks(attempt2)
-    val attempt3 = FakeTask.createTaskSet(1, 2)
-    intercept[IllegalStateException] { taskScheduler.submitTasks(attempt3) }
-    taskScheduler.taskSetManagerForAttempt(attempt2.stageId, attempt2.stageAttemptId)
-      .get.isZombie = true
-    taskScheduler.submitTasks(attempt3)
-    assert(!failedTaskSet)
-  }
-
   test("don't schedule more tasks after a taskset is zombie") {
     val taskScheduler = setupScheduler()
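Patch 2 swaps the accidental @transient (a serialization hint, meaningless for this field) for @volatile. The concern @volatile addresses is cross-thread visibility: if one thread flips isZombie while another polls it without any shared synchronization, the JVM memory model does not guarantee the reader ever observes the write. A minimal, self-contained illustration with made-up names, not Spark code:

```scala
object VolatileSketch {
  final class Flag {
    // With a plain `var` instead of @volatile, the reader thread below is
    // permitted to spin forever without ever seeing the update.
    @volatile var isZombie = false
  }

  def main(args: Array[String]): Unit = {
    val flag = new Flag
    val reader = new Thread(() => {
      // Busy-waits until the writer's update becomes visible.
      while (!flag.isZombie) {}
      println("observed isZombie = true")
    })
    reader.start()
    Thread.sleep(10)
    flag.isZombie = true // published to the reader because the field is volatile
    reader.join()
  }
}
```

Patch 3 then deletes the SPARK-8103 test, which asserted the old throw-on-conflict behavior that patch 1 removed, and drops a now-redundant resubmission from the not-serializable test.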
From 0ca733daadf859736ec3f4a06765c0327786d923 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Tue, 5 Mar 2019 11:56:06 +0800
Subject: [PATCH 4/6] add test

---
 .../scheduler/TaskSchedulerImplSuite.scala    | 31 +++++++++++++++++++
 1 file changed, 31 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 4f52a1961bd9..43b3a42564d4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -201,10 +201,41 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     // Even if one of the task sets has not-serializable tasks, the other task set should
     // still be processed without error
     taskScheduler.submitTasks(FakeTask.createTaskSet(1))
+    val taskSet2 = new TaskSet(
+      Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 1, 0, 0, null)
+    taskScheduler.submitTasks(taskSet2)
     taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
     assert(taskDescriptions.map(_.executorId) === Seq("executor0"))
   }
 
+  test("concurrent attempts for the same stage only have one active taskset") {
+    val taskScheduler = setupScheduler()
+    def isTasksetZombie(taskset: TaskSet): Boolean = {
+      taskScheduler.taskSetManagerForAttempt(taskset.stageId, taskset.stageAttemptId).get.isZombie
+    }
+
+    val attempt1 = FakeTask.createTaskSet(1, 0)
+    taskScheduler.submitTasks(attempt1)
+    // The first submitted taskset is active
+    assert(!isTasksetZombie(attempt1))
+
+    val attempt2 = FakeTask.createTaskSet(1, 1)
+    taskScheduler.submitTasks(attempt2)
+    // The first submitted taskset is zombie now
+    assert(isTasksetZombie(attempt1))
+    // The newly submitted taskset is active
+    assert(!isTasksetZombie(attempt2))
+
+    val attempt3 = FakeTask.createTaskSet(1, 2)
+    taskScheduler.submitTasks(attempt3)
+    // The first submitted taskset remains zombie
+    assert(isTasksetZombie(attempt1))
+    // The second submitted taskset is zombie now
+    assert(isTasksetZombie(attempt2))
+    // The newly submitted taskset is active
+    assert(!isTasksetZombie(attempt3))
+  }
+
   test("don't schedule more tasks after a taskset is zombie") {
     val taskScheduler = setupScheduler()
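The new test pins down the invariant with three hand-rolled attempts. Using only helpers that already appear in this suite (setupScheduler, FakeTask.createTaskSet, taskSetManagerForAttempt), the same check can be written as a loop over an arbitrary number of attempts; the sketch below is assumed to run inside TaskSchedulerImplSuite and is a generalization for illustration, not part of the patch:

```scala
val taskScheduler = setupScheduler()
for (attemptId <- 0 until 5) {
  val attempt = FakeTask.createTaskSet(1, attemptId)
  taskScheduler.submitTasks(attempt)
  // Every earlier attempt for this stage must have been demoted to zombie...
  for (earlier <- 0 until attemptId) {
    assert(taskScheduler.taskSetManagerForAttempt(attempt.stageId, earlier).get.isZombie)
  }
  // ...and only the latest attempt stays active.
  assert(!taskScheduler.taskSetManagerForAttempt(attempt.stageId, attemptId).get.isZombie)
}
```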
From 07d7de9bedbcbd4c5241fe96bd2b386e71b05e8f Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Tue, 5 Mar 2019 19:18:28 +0800
Subject: [PATCH 5/6] fix typo

---
 .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 442154fb2fc3..76c509e5e9e0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -216,7 +216,7 @@ private[spark] class TaskSchedulerImpl(
       // Mark all the existing TaskSetManagers of this stage as zombie, as we are adding a new one.
       // This is necessary to handle a corner case. Let's say a stage has 10 partitions and has 2
       // TaskSetManagers: TSM1(zombie) and TSM2(active). TSM1 has a running task for partition 10
-      // and it completes. TSM2 finishes tasks for partition 1-19, and thinks he is still active
+      // and it completes. TSM2 finishes tasks for partition 1-9, and thinks he is still active
       // because partition 10 is not completed yet. However, DAGScheduler gets task completion
       // events for all the 10 partitions and thinks the stage is finished. If it's a shuffle stage
       // and somehow it has missing map outputs, then DAGScheduler will resubmit it and create a

From 58f646eee88bde40b0f4e6440c2af61a2b90364b Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 6 Mar 2019 00:59:17 +0800
Subject: [PATCH 6/6] revert adding volatile

---
 .../main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 59b931f33c90..453939aaf190 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -123,7 +123,7 @@ private[spark] class TaskSetManager(
   // state until all tasks have finished running; we keep TaskSetManagers that are in the zombie
   // state in order to continue to track and account for the running tasks.
   // TODO: We should kill any running task attempts when the task set manager becomes a zombie.
-  @volatile private[scheduler] var isZombie = false
+  private[scheduler] var isZombie = false
 
   // Whether the taskSet run tasks from a barrier stage. Spark must launch all the tasks at the
   // same time for a barrier stage.
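Patch 6 drops the @volatile again. The series does not record the rationale, but the usual justification for leaving such a field as a plain var is that its reads and writes already happen while holding a common lock (TaskSchedulerImpl guards its state with this.synchronized), and releasing and acquiring the same monitor publishes the write just as volatile would. A self-contained sketch of that locking discipline, with hypothetical names rather than Spark's:

```scala
object LockedFlagSketch {
  final class Manager { var isZombie = false } // plain var, no volatile

  private val lock = new Object // stand-in for the scheduler's monitor
  private val manager = new Manager

  // Writer and reader both synchronize on the same object, so the lock's
  // memory fence makes the write visible without volatile.
  def markZombie(): Unit = lock.synchronized { manager.isZombie = true }
  def isActive: Boolean = lock.synchronized { !manager.isZombie }

  def main(args: Array[String]): Unit = {
    val t = new Thread(() => markZombie())
    t.start()
    t.join()
    assert(!isActive) // visible because both sides used the lock
  }
}
```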