
Commit 92e385e

liujianhui authored and kayousterhout committed
[SPARK-19868] Conflicting TaskSetManagers can lead to Spark being stopped
## What changes were proposed in this pull request?

We must set the taskset to zombie before the DAGScheduler handles the taskEnded event. It's possible the taskEnded event will cause the DAGScheduler to launch a new stage attempt (this happens when map output data was lost), and if this happens before the taskSet has been set to zombie, it will appear that we have conflicting task sets.

Author: liujianhui <liujianhui@didichuxing>

Closes #17208 from liujianhuiouc/spark-19868.
1 parent d4fac41 commit 92e385e
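The failure mode described above is easier to see with a toy model. The sketch below is illustrative only: `ConflictingTaskSetSketch`, `FakeTaskSetManager`, `submit()` and `onTaskEnded()` are invented names, not Spark's API; only the `require(...)` message echoes the "more than one active taskSet for stage" guard that the real TaskSchedulerImpl applies (and which, in the real system, ends up stopping the SparkContext).

```scala
import scala.collection.mutable

object ConflictingTaskSetSketch {
  // Stand-in for a TaskSetManager: one per stage attempt, retired by flipping isZombie.
  final class FakeTaskSetManager(val stageId: Int) {
    var isZombie = false
  }

  private val taskSetsByStage = mutable.Map.empty[Int, mutable.Buffer[FakeTaskSetManager]]

  // A stage may accumulate many task sets (one per attempt), but only one may be non-zombie.
  def submit(manager: FakeTaskSetManager): Unit = {
    val attempts = taskSetsByStage.getOrElseUpdate(manager.stageId, mutable.Buffer.empty)
    require(!attempts.exists(!_.isZombie),
      s"more than one active taskSet for stage ${manager.stageId}")
    attempts += manager
  }

  def main(args: Array[String]): Unit = {
    val attempt0 = new FakeTaskSetManager(stageId = 0)
    submit(attempt0)

    // Pretend the last task of attempt 0 finished but its map output had been lost, so the
    // "task ended" handler immediately resubmits the stage as a new attempt.
    def onTaskEnded(): Unit = submit(new FakeTaskSetManager(stageId = 0))

    // Buggy ordering (pre-fix): notify first, mark zombie afterwards. The resubmission then
    // still sees a non-zombie attempt 0 and the require(...) above fails:
    //   onTaskEnded(); attempt0.isZombie = true

    // Fixed ordering (what this commit enforces): mark zombie first, then notify.
    attempt0.isZombie = true
    onTaskEnded()
    println("no conflicting task sets")
  }
}
```

In the real scheduler the notification is also asynchronous (it goes through the DAGScheduler event loop), so the fix is to finish all TaskSetManager bookkeeping, including the zombie flag, before the taskEnded event is posted.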

2 files changed: +34, -8 lines


core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 8 additions & 7 deletions
@@ -713,13 +713,7 @@ private[spark] class TaskSetManager(
       successfulTaskDurations.insert(info.duration)
     }
     removeRunningTask(tid)
-    // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
-    // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
-    // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
-    // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
-    // Note: "result.value()" only deserializes the value when it's called at the first time, so
-    // here "result.value()" just returns the value and won't block other threads.
-    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
+
     // Kill any other attempts for the same task (since those are unnecessary now that one
     // attempt completed successfully).
     for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
@@ -746,6 +740,13 @@
       logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
         " because task " + index + " has already completed successfully")
     }
+    // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
+    // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
+    // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
+    // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
+    // Note: "result.value()" only deserializes the value when it's called at the first time, so
+    // here "result.value()" just returns the value and won't block other threads.
+    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
     maybeFinishTaskSet()
   }

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 26 additions & 1 deletion
@@ -22,8 +22,10 @@ import java.util.{Properties, Random}
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
-import org.mockito.Matchers.{anyInt, anyString}
+import org.mockito.Matchers.{any, anyInt, anyString}
 import org.mockito.Mockito.{mock, never, spy, verify, when}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
 
 import org.apache.spark._
 import org.apache.spark.internal.config
@@ -1056,6 +1058,29 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(manager.isZombie)
   }
 
+
+  test("SPARK-19868: DagScheduler only notified of taskEnd when state is ready") {
+    // dagScheduler.taskEnded() is async, so it may *seem* ok to call it before we've set all
+    // appropriate state, eg. isZombie. However, this sets up a race that could go the wrong way.
+    // This is a super-focused regression test which checks the zombie state as soon as
+    // dagScheduler.taskEnded() is called, to ensure we haven't introduced a race.
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    val mockDAGScheduler = mock(classOf[DAGScheduler])
+    sched.dagScheduler = mockDAGScheduler
+    val taskSet = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = new ManualClock(1))
+    when(mockDAGScheduler.taskEnded(any(), any(), any(), any(), any())).then(new Answer[Unit] {
+      override def answer(invocationOnMock: InvocationOnMock): Unit = {
+        assert(manager.isZombie === true)
+      }
+    })
+    val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
+    assert(taskOption.isDefined)
+    // this would fail, inside our mock dag scheduler, if it calls dagScheduler.taskEnded() too soon
+    manager.handleSuccessfulTask(0, createTaskResult(0))
+  }
+
   test("SPARK-17894: Verify TaskSetManagers for different stage attempts have unique names") {
     sc = new SparkContext("local", "test")
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
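The test's key trick is stubbing a Unit-returning method with a Mockito `Answer`, so an assertion runs synchronously at the exact moment `taskEnded` is invoked. A stripped-down sketch of that pattern outside Spark follows; the `Listener` trait and `stateReady` flag are invented for illustration, and it assumes the same Mockito 1.x API (`org.mockito.Matchers`) the suite already uses.

```scala
import org.mockito.Matchers.anyString
import org.mockito.Mockito.{mock, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer

// Invented for illustration; stands in for DAGScheduler.taskEnded().
trait Listener {
  def onTaskEnd(reason: String): Unit
}

object AnswerPatternSketch {
  def main(args: Array[String]): Unit = {
    var stateReady = false
    val listener = mock(classOf[Listener])

    // The Answer body runs inside the call to onTaskEnd, so it observes exactly the state the
    // caller had set up before notifying the listener -- the same way the Spark test checks
    // manager.isZombie from inside the mocked taskEnded().
    when(listener.onTaskEnd(anyString())).then(new Answer[Unit] {
      override def answer(invocation: InvocationOnMock): Unit = {
        assert(stateReady, "listener notified before the caller finished updating its state")
      }
    })

    stateReady = true           // update internal state first...
    listener.onTaskEnd("done")  // ...then notify; the assertion inside the Answer passes
  }
}
```

Because the assertion fires during the notification itself (not afterwards), the test catches an out-of-order `taskEnded` call even though the real DAGScheduler would only process the event asynchronously.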
