From 467f0bccb7d1940bed4f1b2e633c9374b0e654f2 Mon Sep 17 00:00:00 2001
From: sychen
Date: Thu, 28 Jun 2018 15:34:38 +0800
Subject: [PATCH 1/5] MedianHeap is empty when speculation is enabled, causing the SparkContext to stop.

---
 .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a18c66596852..40a757574808 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -772,6 +772,12 @@ private[spark] class TaskSetManager(
   private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
+        if (speculationEnabled) {
+          taskAttempts(index).headOption.map { info =>
+            info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
+            successfulTaskDurations.insert(info.duration)
+          }
+        }
         tasksSuccessful += 1
         successful(index) = true
         if (tasksSuccessful == numTasks) {

From 55ddbeb26085c9d8cd9c1768479d9b9acdacda2b Mon Sep 17 00:00:00 2001
From: sychen
Date: Thu, 28 Jun 2018 19:15:12 +0800
Subject: [PATCH 2/5] Add a unit test: [SPARK-24677] MedianHeap should not be empty when speculation is enabled

---
 .../spark/scheduler/TaskSetManagerSuite.scala | 49 +++++++++++++++++++
 1 file changed, 49 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index ca6a7e5db3b1..cdb1401506bd 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1365,6 +1365,55 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
   }
 
+  test("[SPARK-24677] MedianHeap should not be empty when speculation is enabled") {
+    val conf = new SparkConf().set("spark.speculation", "true")
+    sc = new SparkContext("local", "test", conf)
+    // Set the speculation multiplier to 0 so speculative tasks are launched immediately
+    sc.conf.set("spark.speculation.multiplier", "0.0")
+    sc.conf.set("spark.speculation.quantile", "0.1")
+    sc.conf.set("spark.speculation", "true")
+
+    sched = new FakeTaskScheduler(sc)
+    sched.initialize(new FakeSchedulerBackend())
+
+    val dagScheduler = new FakeDAGScheduler(sc, sched)
+    sched.setDAGScheduler(dagScheduler)
+
+    val taskSet1 = FakeTask.createTaskSet(10)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet1.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+
+    sched.submitTasks(taskSet1)
+    sched.resourceOffers(
+      (0 until 10).map { idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) })
+
+    val taskSetManager1 = sched.taskSetManagerForAttempt(0, 0).get
+
+    // Fail a task with a FetchFailed so taskSetManager1 becomes a zombie
+    taskSetManager1.handleFailedTask(
+      taskSetManager1.taskAttempts.head.head.taskId, TaskState.FAILED,
+      FetchFailed(null, 0, 0, 0, "fetch failed"))
+
+    assert(taskSetManager1.isZombie)
+    assert(taskSetManager1.runningTasks === 9)
+
+    val taskSet2 = FakeTask.createTaskSet(10, stageAttemptId = 1)
+    sched.submitTasks(taskSet2)
+    sched.resourceOffers(
+      (11 until 20).map { idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) })
+
+    // Complete 2 tasks from the first attempt and leave the other 8 unfinished
+    for (id <- Set(0, 1)) {
+      taskSetManager1.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    val taskSetManager2 = sched.taskSetManagerForAttempt(0, 1).get
+    assert(!taskSetManager2.successfulTaskDurations.isEmpty())
+    taskSetManager2.checkSpeculatableTasks(0)
+  }
+
   private def createTaskResult(
       id: Int,
       accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {
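The test above reproduces the failure end to end. Reduced to its core, the bug is a counter and a heap drifting apart: a completion reported by the zombie attempt bumps tasksSuccessful in the retried TaskSetManager without ever inserting a duration, so the next speculation pass asks an empty MedianHeap for its median. The standalone sketch below is an editor's illustration of that failure mode, not code from the patch series; the names mirror TaskSetManager fields, but the classes are simplified stand-ins and the median lookup uses a plain sorted buffer instead of the real MedianHeap.

    // Editor's sketch, not part of the patch series: a reduced model of the
    // SPARK-24677 failure. Names mirror TaskSetManager fields, but these are
    // simplified stand-ins, not the real Spark classes.
    import scala.collection.mutable.ArrayBuffer

    object EmptyMedianHeapSketch {
      val numTasks = 10
      val speculationQuantile = 0.1   // spark.speculation.quantile
      val speculationMultiplier = 0.0 // spark.speculation.multiplier

      var tasksSuccessful = 0
      val successfulTaskDurations = ArrayBuffer.empty[Long] // stands in for MedianHeap

      // Before PATCH 1: a completion reported by another attempt only bumped the counter.
      def markPartitionCompletedBuggy(): Unit = tasksSuccessful += 1

      // checkSpeculatableTasks, reduced: once "enough" tasks look successful,
      // the scheduler asks the duration pool for its median.
      def checkSpeculatableTasks(): Unit = {
        val minFinishedForSpeculation =
          math.max((speculationQuantile * numTasks).floor.toInt, 1)
        if (tasksSuccessful >= minFinishedForSpeculation) {
          val sorted = successfulTaskDurations.sorted
          // MedianHeap.median throws when nothing was ever inserted; this indexing
          // fails the same way (the counter says "1 successful", the pool is empty).
          val median = sorted(sorted.size / 2)
          println(s"speculation threshold = ${median * speculationMultiplier} ms")
        }
      }

      def main(args: Array[String]): Unit = {
        markPartitionCompletedBuggy() // partition finished in the zombie attempt
        checkSpeculatableTasks()      // blows up, mirroring the reported NoSuchElementException
      }
    }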
From 3d6682f1c9c1a71af6e232b02c323541ad5cf3bd Mon Sep 17 00:00:00 2001
From: sychen
Date: Tue, 10 Jul 2018 20:25:51 +0800
Subject: [PATCH 3/5] speculationEnabled && !isZombie

---
 .../main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 40a757574808..105587de0963 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -772,7 +772,7 @@ private[spark] class TaskSetManager(
   private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
-        if (speculationEnabled) {
+        if (speculationEnabled && !isZombie) {
           taskAttempts(index).headOption.map { info =>
             info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
             successfulTaskDurations.insert(info.duration)

From d8fdceb1ac86746c9c99b2dde833a62007a30f60 Mon Sep 17 00:00:00 2001
From: sychen
Date: Tue, 10 Jul 2018 20:34:20 +0800
Subject: [PATCH 4/5] change test name

---
 .../scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index cdb1401506bd..ae571e5a3583 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1365,7 +1365,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
   }
 
-  test("[SPARK-24677] MedianHeap should not be empty when speculation is enabled") {
+  test("[SPARK-24677] Avoid NoSuchElementException from MedianHeap") {
     val conf = new SparkConf().set("spark.speculation", "true")
     sc = new SparkContext("local", "test", conf)
     // Set the speculation multiplier to 0 so speculative tasks are launched immediately
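PATCH 3 narrows the bookkeeping to live attempts. The intent, as far as the diff shows, is that a zombie TaskSetManager (one that has already hit a fetch failure or completed) launches no speculative copies, so it has no use for the duration statistics. The sketch below is an editor's illustration of that reading, with simplified stand-ins and an assumed early return for zombie attempts; it is not the actual Spark method.

    // Editor's sketch, not part of the patch series: one reading of the
    // "speculationEnabled && !isZombie" guard. Simplified stand-in, assuming a
    // zombie attempt never runs the speculation check, which is why it can skip
    // the duration bookkeeping entirely.
    import scala.collection.mutable.ArrayBuffer

    final class SpeculationGuardSketch(numTasks: Int, speculationEnabled: Boolean) {
      var isZombie = false
      private val durations = ArrayBuffer.empty[Long]

      // Mirrors the PATCH 3 condition: only live, speculation-enabled attempts
      // keep duration statistics for a later median computation.
      def recordCompletedPartition(durationMs: Long): Unit = {
        if (speculationEnabled && !isZombie) {
          durations += durationMs
        }
      }

      def checkSpeculatableTasks(): Boolean = {
        if (isZombie || numTasks == 1) {
          // Assumed early return: a zombie attempt launches no speculative copies,
          // so the (possibly empty) duration pool is never consulted for it.
          return false
        }
        durations.nonEmpty // stand-in for the real median/threshold logic
      }
    }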
From 1c1df5c4e421eed08fe31426400da6148f9c1842 Mon Sep 17 00:00:00 2001
From: sychen
Date: Tue, 10 Jul 2018 21:47:17 +0800
Subject: [PATCH 5/5] use the actual duration of the successful task

---
 .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 7 +++++--
 .../org/apache/spark/scheduler/TaskSetManager.scala    | 9 +++------
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 598b62f85a1f..56c0bf6c0935 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -697,9 +697,12 @@ private[spark] class TaskSchedulerImpl(
    * do not also submit those same tasks. That also means that a task completion from an earlier
    * attempt can lead to the entire stage getting marked as successful.
    */
-  private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int) = {
+  private[scheduler] def markPartitionCompletedInAllTaskSets(
+      stageId: Int,
+      partitionId: Int,
+      taskInfo: TaskInfo) = {
     taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
-      tsm.markPartitionCompleted(partitionId)
+      tsm.markPartitionCompleted(partitionId, taskInfo)
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 105587de0963..6071605ad7f9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -758,7 +758,7 @@ private[spark] class TaskSetManager(
     }
     // There may be multiple tasksets for this stage -- we let all of them know that the partition
     // was completed. This may result in some of the tasksets getting completed.
-    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId)
+    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId, info)
     // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
     // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
     // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
@@ -769,14 +769,11 @@ private[spark] class TaskSetManager(
     maybeFinishTaskSet()
   }
 
-  private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
+  private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
         if (speculationEnabled && !isZombie) {
-          taskAttempts(index).headOption.map { info =>
-            info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
-            successfulTaskDurations.insert(info.duration)
-          }
+          successfulTaskDurations.insert(taskInfo.duration)
         }
         tasksSuccessful += 1
         successful(index) = true
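With PATCH 5 the series settles on passing along the TaskInfo of the task that actually succeeded, so the value inserted into successfulTaskDurations is a measured duration rather than a finish time fabricated from clock.getTimeMillis() for a task this attempt may never have run. The sketch below, an editor's illustration with simplified stand-in types rather than the Spark classes, shows the resulting invariant: whenever a live, speculation-enabled attempt records a newly completed partition, tasksSuccessful and the duration pool grow together, which is what the test's assert(!taskSetManager2.successfulTaskDurations.isEmpty()) checks.

    // Editor's sketch, not part of the patch series: the invariant after PATCH 5.
    // Every increment of tasksSuccessful on a live, speculation-enabled attempt is
    // paired with one real duration, so a later median query never sees an empty
    // pool while the counter claims otherwise. Simplified stand-in types.
    import scala.collection.mutable.ArrayBuffer

    final case class TaskInfoSketch(durationMs: Long)

    final class PartitionCompletionSketch(speculationEnabled: Boolean) {
      var isZombie = false
      var tasksSuccessful = 0
      val successfulTaskDurations = ArrayBuffer.empty[Long]

      def markPartitionCompleted(alreadySuccessful: Boolean, taskInfo: TaskInfoSketch): Unit = {
        if (!alreadySuccessful) {
          if (speculationEnabled && !isZombie) {
            // Duration of the task that really finished, possibly in another attempt,
            // instead of a timestamp invented for a task this attempt never ran.
            successfulTaskDurations += taskInfo.durationMs
          }
          tasksSuccessful += 1
        }
      }
    }

    object PartitionCompletionSketchDemo {
      def main(args: Array[String]): Unit = {
        val tsm = new PartitionCompletionSketch(speculationEnabled = true)
        tsm.markPartitionCompleted(alreadySuccessful = false, TaskInfoSketch(1200L))
        tsm.markPartitionCompleted(alreadySuccessful = false, TaskInfoSketch(800L))
        // The counter and the pool move together.
        assert(tsm.tasksSuccessful == tsm.successfulTaskDurations.size)
        println(s"successful = ${tsm.tasksSuccessful}, durations = ${tsm.successfulTaskDurations}")
      }
    }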