
Commit e5c6143

Ngone51 authored and squito committed
[SPARK-23433][SPARK-25250][CORE] Later created TaskSet should learn about the finished partitions
## What changes were proposed in this pull request?

This is an alternative solution for #22806. #21131 first made it possible for a task that completed successfully in a zombie TaskSetManager to also be marked as successful in the active TaskSetManager, based on the assumption that an active TaskSetManager always exists for that stage when this happens. That is not always true: the active TaskSetManager may not have been created yet when the earlier task succeeds, which is why #22806 hit the issue.

This PR extends the behavior of #21131 by adding `stageIdToFinishedPartitions` to TaskSchedulerImpl, which records the finished partitions whenever a task (from a zombie or active attempt) succeeds. A later-created active TaskSetManager can then learn about the finished partitions by looking into `stageIdToFinishedPartitions` and will not launch any duplicate tasks.

## How was this patch tested?

Updated the existing test in TaskSchedulerImplSuite (see the test changes below).

Closes #23871 from Ngone51/dev-23433-25250.

Lead-authored-by: wuyi <[email protected]>
Co-authored-by: Ngone51 <[email protected]>
Signed-off-by: Imran Rashid <[email protected]>
1 parent 9bddf71 commit e5c6143

File tree: 3 files changed (+79, -20 lines)
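To make the mechanism described in the commit message concrete, here is a minimal, self-contained Scala sketch of the bookkeeping idea. It is not the real Spark code: `SimpleScheduler`, `SimpleTaskSetManager`, and `taskSucceeded` are hypothetical stand-ins for `TaskSchedulerImpl`, `TaskSetManager`, and the actual task-completion path.

```scala
import scala.collection.mutable.{BitSet, HashMap}

// Hypothetical, stripped-down stand-in for TaskSetManager: it only tracks which
// partitions of its stage are already done.
class SimpleTaskSetManager(val stageId: Int, numPartitions: Int) {
  private val successful = new Array[Boolean](numPartitions)

  def markPartitionCompleted(partitionId: Int): Unit = {
    successful(partitionId) = true
  }

  def pendingPartitions: Seq[Int] = successful.indices.filterNot(i => successful(i))
}

// Hypothetical stand-in for TaskSchedulerImpl: it remembers finished partitions per
// stage so that a TaskSetManager created *after* some completions still learns about them.
class SimpleScheduler {
  private val stageIdToFinishedPartitions = new HashMap[Int, BitSet]

  // Called whenever any attempt (zombie or active) reports a successful task.
  def taskSucceeded(stageId: Int, partitionId: Int): Unit = synchronized {
    stageIdToFinishedPartitions.getOrElseUpdate(stageId, new BitSet) += partitionId
  }

  // Called when a (possibly later) attempt for the stage is submitted: replay the
  // finished partitions into the new manager so it never launches duplicate tasks.
  def createTaskSetManager(stageId: Int, numPartitions: Int): SimpleTaskSetManager = synchronized {
    val tsm = new SimpleTaskSetManager(stageId, numPartitions)
    stageIdToFinishedPartitions.get(stageId).foreach { finished =>
      finished.foreach(tsm.markPartitionCompleted)
    }
    tsm
  }
}

object FinishedPartitionsDemo extends App {
  val sched = new SimpleScheduler
  // Partitions 1 and 2 finish before the next attempt is submitted (the SPARK-25250 scenario).
  sched.taskSucceeded(stageId = 0, partitionId = 1)
  sched.taskSucceeded(stageId = 0, partitionId = 2)
  val tsm = sched.createTaskSetManager(stageId = 0, numPartitions = 5)
  println(tsm.pendingPartitions) // only partitions 0, 3 and 4 remain to be launched
}
```

With bookkeeping of this shape, duplicate launches are avoided even when no active TaskSetManager exists at the moment a zombie attempt's task succeeds, which is the gap that #22806 exposed. The actual changes follow.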

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 32 additions & 4 deletions
@@ -23,7 +23,7 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.Set
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, BitSet, HashMap, HashSet}
 import scala.util.Random
 
 import org.apache.spark._
@@ -102,6 +102,9 @@ private[spark] class TaskSchedulerImpl(
   // Protected by `this`
   val taskIdToExecutorId = new HashMap[Long, String]
 
+  // Protected by `this`
+  private[scheduler] val stageIdToFinishedPartitions = new HashMap[Int, BitSet]
+
   @volatile private var hasReceivedTask = false
   @volatile private var hasLaunchedTask = false
   private val starvationTimer = new Timer(true)
@@ -244,7 +247,20 @@ private[spark] class TaskSchedulerImpl(
   private[scheduler] def createTaskSetManager(
       taskSet: TaskSet,
       maxTaskFailures: Int): TaskSetManager = {
-    new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
+    // only create a BitSet once for a certain stage since we only remove
+    // that stage when an active TaskSetManager succeed.
+    stageIdToFinishedPartitions.getOrElseUpdate(taskSet.stageId, new BitSet)
+    val tsm = new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
+    // TaskSet got submitted by DAGScheduler may have some already completed
+    // tasks since DAGScheduler does not always know all the tasks that have
+    // been completed by other tasksets when completing a stage, so we mark
+    // those tasks as finished here to avoid launching duplicate tasks, while
+    // holding the TaskSchedulerImpl lock.
+    // See SPARK-25250 and `markPartitionCompletedInAllTaskSets()`
+    stageIdToFinishedPartitions.get(taskSet.stageId).foreach {
+      finishedPartitions => finishedPartitions.foreach(tsm.markPartitionCompleted(_, None))
+    }
+    tsm
   }
 
   override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
@@ -847,19 +863,31 @@ private[spark] class TaskSchedulerImpl(
   }
 
   /**
-   * Marks the task has completed in all TaskSetManagers for the given stage.
+   * Marks the task has completed in all TaskSetManagers(active / zombie) for the given stage.
    *
    * After stage failure and retry, there may be multiple TaskSetManagers for the stage.
    * If an earlier attempt of a stage completes a task, we should ensure that the later attempts
    * do not also submit those same tasks. That also means that a task completion from an earlier
    * attempt can lead to the entire stage getting marked as successful.
+   * And there is also the possibility that the DAGScheduler submits another taskset at the same
+   * time as we're marking a task completed here -- that taskset would have a task for a partition
+   * that was already completed. We maintain the set of finished partitions in
+   * stageIdToFinishedPartitions, protected by this, so we can detect those tasks when the taskset
+   * is submitted. See SPARK-25250 for more details.
+   *
+   * note: this method must be called with a lock on this.
    */
   private[scheduler] def markPartitionCompletedInAllTaskSets(
       stageId: Int,
       partitionId: Int,
       taskInfo: TaskInfo) = {
+    // if we do not find a BitSet for this stage, which means an active TaskSetManager
+    // has already succeeded and removed the stage.
+    stageIdToFinishedPartitions.get(stageId).foreach{
+      finishedPartitions => finishedPartitions += partitionId
+    }
     taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
-      tsm.markPartitionCompleted(partitionId, taskInfo)
+      tsm.markPartitionCompleted(partitionId, Some(taskInfo))
    }
   }
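Both code paths above touch `stageIdToFinishedPartitions` only while holding the TaskSchedulerImpl lock: `createTaskSetManager` is reached from the synchronized submit path, and the updated scaladoc requires callers of `markPartitionCompletedInAllTaskSets` to hold the same lock. A rough sketch of that contract, using hypothetical names rather than the real scheduler:

```scala
import scala.collection.mutable.{BitSet, HashMap}

// Hypothetical illustration of the locking contract only; the real logic lives in
// TaskSchedulerImpl and is more involved.
class LockingSketch {
  private val stageIdToFinishedPartitions = new HashMap[Int, BitSet]

  // "dag-scheduler-event-loop" side: creating a task set manager happens under the lock.
  def onTaskSetSubmitted(stageId: Int): Unit = synchronized {
    val finished = stageIdToFinishedPartitions.getOrElseUpdate(stageId, new BitSet)
    // ... replay `finished` into the newly created TaskSetManager here ...
  }

  // "task-result-getter" side: recording a completion happens under the same lock,
  // so the replay above and this update are always serialized.
  def onPartitionCompleted(stageId: Int, partitionId: Int): Unit = synchronized {
    stageIdToFinishedPartitions.get(stageId).foreach(_ += partitionId)
  }
}
```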

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 14 additions & 5 deletions
@@ -21,7 +21,7 @@ import java.io.NotSerializableException
 import java.nio.ByteBuffer
 import java.util.concurrent.ConcurrentLinkedQueue
 
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, BitSet, HashMap, HashSet}
 import scala.math.max
 import scala.util.control.NonFatal

@@ -779,7 +779,11 @@ private[spark] class TaskSetManager(
       // Mark successful and stop if all the tasks have succeeded.
       successful(index) = true
       if (tasksSuccessful == numTasks) {
-        isZombie = true
+        // clean up finished partitions for the stage when the active TaskSetManager succeed
+        if (!isZombie) {
+          sched.stageIdToFinishedPartitions -= stageId
+          isZombie = true
+        }
       }
     } else {
       logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
@@ -798,16 +802,21 @@ private[spark] class TaskSetManager(
     maybeFinishTaskSet()
   }
 
-  private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = {
+  private[scheduler] def markPartitionCompleted(
+      partitionId: Int,
+      taskInfo: Option[TaskInfo]): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
         if (speculationEnabled && !isZombie) {
-          successfulTaskDurations.insert(taskInfo.duration)
+          taskInfo.foreach { info => successfulTaskDurations.insert(info.duration) }
         }
         tasksSuccessful += 1
         successful(index) = true
         if (tasksSuccessful == numTasks) {
-          isZombie = true
+          if (!isZombie) {
+            sched.stageIdToFinishedPartitions -= stageId
+            isZombie = true
+          }
         }
         maybeFinishTaskSet()
       }
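The signature change above reflects that completions replayed at TaskSetManager creation time carry no `TaskInfo`, so there is no task duration to feed the speculation statistics. A small sketch of the calling convention, where `DurationTracker` and `FakeTaskInfo` are made-up names used only for illustration:

```scala
import scala.collection.mutable.ArrayBuffer

final case class FakeTaskInfo(duration: Long)

// Made-up tracker mirroring only the Option pattern from the diff above.
class DurationTracker {
  private val successfulTaskDurations = new ArrayBuffer[Long]

  def recordCompletion(taskInfo: Option[FakeTaskInfo]): Unit = {
    // A live completion contributes its duration; a replayed completion (None),
    // coming from stageIdToFinishedPartitions, is counted without timing data.
    taskInfo.foreach(info => successfulTaskDurations += info.duration)
  }

  def recordedDurations: Seq[Long] = successfulTaskDurations.toList
}

object OptionTaskInfoDemo extends App {
  val tracker = new DurationTracker
  tracker.recordCompletion(Some(FakeTaskInfo(duration = 1200L))) // real task finished
  tracker.recordCompletion(None)                                 // replayed at TSM creation
  println(tracker.recordedDurations)                             // List(1200)
}
```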

core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala

Lines changed: 33 additions & 11 deletions
@@ -1102,7 +1102,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     }
   }
 
-  test("Completions in zombie tasksets update status of non-zombie taskset") {
+  test("SPARK-23433/25250 Completions in zombie tasksets update status of non-zombie taskset") {
     val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
     val valueSer = SparkEnv.get.serializer.newInstance()

@@ -1114,9 +1114,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     }
 
     // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
-    // two times, so we have three active task sets for one stage. (For this to really happen,
-    // you'd need the previous stage to also get restarted, and then succeed, in between each
-    // attempt, but that happens outside what we're mocking here.)
+    // two times, so we have three TaskSetManagers(2 zombie, 1 active) for one stage. (For this
+    // to really happen, you'd need the previous stage to also get restarted, and then succeed,
+    // in between each attempt, but that happens outside what we're mocking here.)
     val zombieAttempts = (0 until 2).map { stageAttempt =>
       val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
       taskScheduler.submitTasks(attempt)
@@ -1133,30 +1133,51 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       assert(tsm.runningTasks === 9)
       tsm
     }
+    // we've now got 2 zombie attempts, each with 9 tasks still running. And there's no active
+    // attempt exists in taskScheduler by now.
+
+    // finish partition 1,2 by completing the tasks before a new attempt for the same stage submit.
+    // This is possible since the behaviour of submitting new attempt and handling successful task
+    // is from two different threads, which are "task-result-getter" and "dag-scheduler-event-loop"
+    // separately.
+    (0 until 2).foreach { i =>
+      completeTaskSuccessfully(zombieAttempts(i), i + 1)
+      assert(taskScheduler.stageIdToFinishedPartitions(0).contains(i + 1))
+    }
 
-    // we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for
-    // the stage, but this time with insufficient resources so not all tasks are active.
-
+    // Submit the 3rd attempt still with 10 tasks, this happens due to the race between thread
+    // "task-result-getter" and "dag-scheduler-event-loop", where a TaskSet gets submitted with
+    // already completed tasks. And this time with insufficient resources so not all tasks are
+    // active.
     val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
     taskScheduler.submitTasks(finalAttempt)
     val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
+    // Though finalTSM gets submitted with 10 tasks, the call to taskScheduler.submitTasks should
+    // realize that 2 tasks have already completed, and mark them appropriately, so it won't launch
+    // any duplicate tasks later (SPARK-25250).
+    (0 until 2).map(_ + 1).foreach { partitionId =>
+      val index = finalTsm.partitionToIndex(partitionId)
+      assert(finalTsm.successful(index))
+    }
+
     val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
     val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
       finalAttempt.tasks(task.index).partitionId
     }.toSet
     assert(finalTsm.runningTasks === 5)
     assert(!finalTsm.isZombie)
 
-    // We simulate late completions from our zombie tasksets, corresponding to all the pending
-    // partitions in our final attempt. This means we're only waiting on the tasks we've already
-    // launched.
+    // We continually simulate late completions from our zombie tasksets(but this time, there's one
+    // active attempt exists in taskScheduler), corresponding to all the pending partitions in our
+    // final attempt. This means we're only waiting on the tasks we've already launched.
     val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions)
     finalAttemptPendingPartitions.foreach { partition =>
       completeTaskSuccessfully(zombieAttempts(0), partition)
+      assert(taskScheduler.stageIdToFinishedPartitions(0).contains(partition))
     }
 
     // If there is another resource offer, we shouldn't run anything. Though our final attempt
-    // used to have pending tasks, now those tasks have been completed by zombie attempts.  The
+    // used to have pending tasks, now those tasks have been completed by zombie attempts. The
     // remaining tasks to compute are already active in the non-zombie attempt.
     assert(
       taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)
@@ -1204,6 +1225,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       // perspective, as the failures weren't from a problem w/ the tasks themselves.
       verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), any())
     }
+    assert(taskScheduler.stageIdToFinishedPartitions.isEmpty)
   }
 
   test("don't schedule for a barrier taskSet if available slots are less than pending tasks") {
