
Commit 144426c

cxzl25 authored and tgravescs committed
[SPARK-24677][CORE] Avoid NoSuchElementException from MedianHeap
## What changes were proposed in this pull request?

When speculation is enabled, TaskSetManager#markPartitionCompleted should write the successful task's duration to the MedianHeap, not just increment tasksSuccessful. Otherwise, when TaskSetManager#checkSpeculatableTasks runs, tasksSuccessful is non-zero but the MedianHeap is empty, and successfulTaskDurations.median throws java.util.NoSuchElementException: MedianHeap is empty. This ultimately led to the SparkContext being stopped.

## How was this patch tested?

Unit test in TaskSetManagerSuite.scala: "[SPARK-24677] MedianHeap should not be empty when speculation is enabled".

Author: sychen <[email protected]>

Closes #21656 from cxzl25/fix_MedianHeap_empty.

(cherry picked from commit c8bee93)

Signed-off-by: Thomas Graves <[email protected]>
1 parent 17db572 · commit 144426c
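To make the failure mode concrete, here is a minimal, self-contained Scala sketch of the pre-patch behaviour. `SimpleMedianHeap` is a hypothetical stand-in for Spark's `org.apache.spark.util.collection.MedianHeap` (same empty-heap contract, much simpler internals), and the two local variables mimic the relevant TaskSetManager state.

```scala
import java.util.NoSuchElementException

import scala.collection.mutable

// Hypothetical stand-in for Spark's MedianHeap: insert durations, read the median,
// and throw if the heap is empty -- the only part of the contract that matters here.
class SimpleMedianHeap {
  private val values = mutable.ArrayBuffer.empty[Double]

  def insert(x: Double): Unit = values += x

  def isEmpty: Boolean = values.isEmpty

  def median: Double = {
    if (values.isEmpty) {
      throw new NoSuchElementException("MedianHeap is empty.")
    }
    val sorted = values.sorted
    sorted(sorted.size / 2)
  }
}

object MedianHeapBugSketch {
  def main(args: Array[String]): Unit = {
    val successfulTaskDurations = new SimpleMedianHeap
    var tasksSuccessful = 0

    // Pre-patch behaviour: a partition completed by another stage attempt only
    // bumped the counter and never inserted a duration.
    tasksSuccessful += 1

    // The speculation check guards on the counter, then reads the median ...
    if (tasksSuccessful > 0) {
      try {
        println(successfulTaskDurations.median)
      } catch {
        case e: NoSuchElementException =>
          // ... which is the exception that took down the SparkContext in the real bug.
          println(s"caught: $e")
      }
    }
  }
}
```

With the patch, markPartitionCompleted inserts taskInfo.duration alongside the counter bump whenever speculation is enabled and the task set is not a zombie, so the median read can no longer race ahead of the inserts.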

File tree: 3 files changed, +59 −4 lines changed


core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 5 additions & 2 deletions
@@ -701,9 +701,12 @@ private[spark] class TaskSchedulerImpl private[scheduler](
    * do not also submit those same tasks. That also means that a task completion from an earlier
    * attempt can lead to the entire stage getting marked as successful.
    */
-  private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int) = {
+  private[scheduler] def markPartitionCompletedInAllTaskSets(
+      stageId: Int,
+      partitionId: Int,
+      taskInfo: TaskInfo) = {
     taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
-      tsm.markPartitionCompleted(partitionId)
+      tsm.markPartitionCompleted(partitionId, taskInfo)
     }
   }
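For context on the scheduler side of the change, below is a simplified, hypothetical sketch of the forwarding pattern this hunk modifies; `FakeScheduler`, `FakeTaskSetManager`, and `FakeTaskInfo` are stand-ins, not Spark's real classes. The essential point is that the completed task's info now travels with the notification, so every attempt of the stage can record the duration, not only the attempt that ran the task.

```scala
import scala.collection.mutable

// Stand-ins for the real TaskInfo / TaskSetManager, reduced to what this hunk touches.
final case class FakeTaskInfo(taskId: Long, duration: Long)

class FakeTaskSetManager(val stageAttemptId: Int) {
  def markPartitionCompleted(partitionId: Int, taskInfo: FakeTaskInfo): Unit =
    println(s"attempt $stageAttemptId: partition $partitionId completed in ${taskInfo.duration} ms")
}

class FakeScheduler {
  // stageId -> (stageAttemptId -> manager), mirroring taskSetsByStageIdAndAttempt.
  private val managersByStageAndAttempt =
    mutable.HashMap.empty[Int, mutable.HashMap[Int, FakeTaskSetManager]]

  def register(stageId: Int, tsm: FakeTaskSetManager): Unit =
    managersByStageAndAttempt
      .getOrElseUpdate(stageId, mutable.HashMap.empty)
      .update(tsm.stageAttemptId, tsm)

  // After the patch, the TaskInfo rides along so each attempt can record the duration.
  def markPartitionCompletedInAllTaskSets(
      stageId: Int,
      partitionId: Int,
      taskInfo: FakeTaskInfo): Unit = {
    managersByStageAndAttempt.get(stageId).foreach { attempts =>
      attempts.values.foreach(_.markPartitionCompleted(partitionId, taskInfo))
    }
  }
}
```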

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 5 additions & 2 deletions
@@ -748,7 +748,7 @@ private[spark] class TaskSetManager(
     }
     // There may be multiple tasksets for this stage -- we let all of them know that the partition
     // was completed. This may result in some of the tasksets getting completed.
-    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId)
+    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId, info)
     // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
     // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
     // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
@@ -759,9 +759,12 @@ private[spark] class TaskSetManager(
     maybeFinishTaskSet()
   }
 
-  private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
+  private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
+        if (speculationEnabled && !isZombie) {
+          successfulTaskDurations.insert(taskInfo.duration)
+        }
         tasksSuccessful += 1
         successful(index) = true
         if (tasksSuccessful == numTasks) {
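To spell out the invariant this hunk restores: in a speculation-enabled, non-zombie task set, every increment of tasksSuccessful is now paired with a duration insert, so the median read in checkSpeculatableTasks can never hit an empty heap. The sketch below is a simplified, hypothetical restatement of that bookkeeping (plain Scala collections, no Spark types); the quantile/multiplier arithmetic is only an assumption about how the speculation threshold is derived, included for illustration.

```scala
import scala.collection.mutable

// Simplified stand-in for the TaskSetManager bookkeeping touched by this patch.
class SpeculationBookkeeping(
    speculationEnabled: Boolean,
    speculationQuantile: Double,
    numTasks: Int) {

  private val successfulTaskDurations = mutable.ArrayBuffer.empty[Long]
  private var tasksSuccessful = 0
  private var isZombie = false

  def markZombie(): Unit = { isZombie = true }

  // Mirrors the fixed markPartitionCompleted: record the duration *and* bump the counter,
  // keeping the two in lockstep whenever speculation is on and the task set is live.
  def markPartitionCompleted(durationMs: Long): Unit = {
    if (speculationEnabled && !isZombie) {
      successfulTaskDurations += durationMs
    }
    tasksSuccessful += 1
  }

  // Rough, assumed shape of the speculation check: only look at the median once enough
  // tasks have finished, then scale it into a slow-task threshold.
  def speculationThreshold(multiplier: Double): Option[Double] = {
    val minFinishedForSpeculation = math.max(1, (speculationQuantile * numTasks).floor.toInt)
    if (tasksSuccessful >= minFinishedForSpeculation && successfulTaskDurations.nonEmpty) {
      val sorted = successfulTaskDurations.sorted
      val medianDuration = sorted(sorted.size / 2).toDouble
      Some(medianDuration * multiplier)
    } else {
      None // not enough finished tasks yet; nothing to speculate against
    }
  }
}
```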

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 49 additions & 0 deletions
@@ -1214,6 +1214,55 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
   }
 
+  test("[SPARK-24677] Avoid NoSuchElementException from MedianHeap") {
+    val conf = new SparkConf().set("spark.speculation", "true")
+    sc = new SparkContext("local", "test", conf)
+    // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
+    sc.conf.set("spark.speculation.multiplier", "0.0")
+    sc.conf.set("spark.speculation.quantile", "0.1")
+    sc.conf.set("spark.speculation", "true")
+
+    sched = new FakeTaskScheduler(sc)
+    sched.initialize(new FakeSchedulerBackend())
+
+    val dagScheduler = new FakeDAGScheduler(sc, sched)
+    sched.setDAGScheduler(dagScheduler)
+
+    val taskSet1 = FakeTask.createTaskSet(10)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet1.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+
+    sched.submitTasks(taskSet1)
+    sched.resourceOffers(
+      (0 until 10).map { idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) })
+
+    val taskSetManager1 = sched.taskSetManagerForAttempt(0, 0).get
+
+    // fail fetch
+    taskSetManager1.handleFailedTask(
+      taskSetManager1.taskAttempts.head.head.taskId, TaskState.FAILED,
+      FetchFailed(null, 0, 0, 0, "fetch failed"))
+
+    assert(taskSetManager1.isZombie)
+    assert(taskSetManager1.runningTasks === 9)
+
+    val taskSet2 = FakeTask.createTaskSet(10, stageAttemptId = 1)
+    sched.submitTasks(taskSet2)
+    sched.resourceOffers(
+      (11 until 20).map { idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) })
+
+    // Complete the 2 tasks and leave 8 task in running
+    for (id <- Set(0, 1)) {
+      taskSetManager1.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    val taskSetManager2 = sched.taskSetManagerForAttempt(0, 1).get
+    assert(!taskSetManager2.successfulTaskDurations.isEmpty())
+    taskSetManager2.checkSpeculatableTasks(0)
+  }
+
   private def createTaskResult(
       id: Int,
       accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {
