@@ -917,4 +917,108 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       taskScheduler.initialize(new FakeSchedulerBackend)
     }
   }
+
+  test("Completions in zombie tasksets update status of non-zombie taskset") {
+    val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+    val valueSer = SparkEnv.get.serializer.newInstance()
+
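+    // Helper: find this taskset's attempt of the given partition and feed a successful result
+    // straight to the TaskSetManager (we drive completions directly, without a real backend).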
+    def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = {
+      val indexInTsm = tsm.partitionToIndex(partition)
+      val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head
+      val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
+      tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
+    }
+
+    // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
+    // two times, so we have three active task sets for one stage. (For this to really happen,
+    // you'd need the previous stage to also get restarted, and then succeed, in between each
+    // attempt, but that happens outside what we're mocking here.)
+    val zombieAttempts = (0 until 2).map { stageAttempt =>
+      val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
+      taskScheduler.submitTasks(attempt)
+      val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
+      val offers = (0 until 10).map { idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
+      taskScheduler.resourceOffers(offers)
+      assert(tsm.runningTasks === 10)
+      // fail attempt
+      tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED,
+        FetchFailed(null, 0, 0, 0, "fetch failed"))
+      // the attempt is a zombie, but the tasks are still running (this could be true even if
+      // we actively killed those tasks, as killing is best-effort)
+      assert(tsm.isZombie)
+      assert(tsm.runningTasks === 9)
+      tsm
+    }
+
+    // we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for
+    // the stage, but this time with insufficient resources so not all tasks are active.
+
+    val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
+    taskScheduler.submitTasks(finalAttempt)
+    val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
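+    // Offer only 5 single-CPU executors this time, so just 5 of the 10 tasks can launch and
+    // the other 5 partitions stay pending.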
+    val offers = (0 until 5).map { idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
+    val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
+      finalAttempt.tasks(task.index).partitionId
+    }.toSet
+    assert(finalTsm.runningTasks === 5)
+    assert(!finalTsm.isZombie)
+
+    // We simulate late completions from our zombie tasksets, corresponding to all the pending
+    // partitions in our final attempt. This means we're only waiting on the tasks we've already
+    // launched.
+    val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions)
+    finalAttemptPendingPartitions.foreach { partition =>
+      completeTaskSuccessfully(zombieAttempts(0), partition)
+    }
+
+    // If there is another resource offer, we shouldn't run anything. Though our final attempt
+    // used to have pending tasks, now those tasks have been completed by zombie attempts. The
+    // remaining tasks to compute are already active in the non-zombie attempt.
+    assert(
+      taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)
+
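+    // By construction, the only incomplete partitions left are the 5 the final attempt launched.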
+    val remainingTasks = finalAttemptLaunchedPartitions.toIndexedSeq.sorted
+
+    // finally, if we finish the remaining partitions from a mix of tasksets, all attempts should be
+    // marked as zombie.
+    // for each of the remaining tasks, find the tasksets with an active copy of the task, and
+    // finish the task.
+    remainingTasks.foreach { partition =>
+      val tsm = if (partition == 0) {
+        // we failed this task on both zombie attempts, this one is only present in the latest
+        // taskset
+        finalTsm
+      } else {
+        // should be active in every taskset. We choose a zombie taskset just to make sure that
+        // we transition the active taskset correctly even if the final completion comes
+        // from a zombie.
+        zombieAttempts(partition % 2)
+      }
+      completeTaskSuccessfully(tsm, partition)
+    }
+
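+    // every partition has now completed in some attempt, so the final taskset should have
+    // transitioned to zombie as well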
+    assert(finalTsm.isZombie)
+
+    // no taskset has completed all of its tasks, so no updates to the blacklist tracker yet
+    verify(blacklist, never).updateBlacklistForSuccessfulTaskSet(anyInt(), anyInt(), anyObject())
+
+    // finally, let's complete all the tasks. We simulate failures in attempt 1, but everything
+    // else succeeds, to make sure we get the right updates to the blacklist in all cases.
+    (zombieAttempts ++ Seq(finalTsm)).foreach { tsm =>
+      val stageAttempt = tsm.taskSet.stageAttemptId
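+      // runningTasksSet holds the ids of this taskset's still-running tasks (the lambda's
+      // `index` parameter below is really a task id, used to look up the TaskInfo)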
+      tsm.runningTasksSet.foreach { index =>
+        if (stageAttempt == 1) {
+          tsm.handleFailedTask(tsm.taskInfos(index).taskId, TaskState.FAILED, TaskResultLost)
+        } else {
+          val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
+          tsm.handleSuccessfulTask(tsm.taskInfos(index).taskId, result)
+        }
+      }
+
+      // we update the blacklist for the stage attempts with all successful tasks. Even though
+      // some tasksets had failures, we still consider them all successful from a blacklisting
+      // perspective, as the failures weren't from a problem w/ the tasks themselves.
+      verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), anyObject())
+    }
+  }
 }
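For context, the behavior this test pins down is that a successful task reported by any attempt of a stage, zombie or not, should mark that partition as completed in every TaskSetManager for the stage, so a live attempt stops waiting on (and re-offering) work a zombie already finished. A minimal, self-contained sketch of that shape, using hypothetical `Manager` / `markPartitionCompletedInAllTaskSets` names for illustration rather than anything taken from this diff:

    import scala.collection.mutable

    // Toy stand-in for a TaskSetManager: tracks completed partitions and flips to
    // zombie once all of them are done (the finalTsm transition asserted above).
    class Manager(numPartitions: Int) {
      private val done = mutable.Set[Int]()
      var isZombie = false
      def markPartitionCompleted(partition: Int): Unit = {
        done += partition
        if (done.size == numPartitions) isZombie = true
      }
    }

    // stageId -> (stageAttemptId -> manager), mirroring the test's three attempts of stage 0.
    val attempts = Map(0 -> (0 to 2).map(a => a -> new Manager(10)).toMap)

    // A completion from any attempt, even a zombie one, fans out to every attempt of the stage.
    def markPartitionCompletedInAllTaskSets(stageId: Int, partition: Int): Unit =
      attempts.getOrElse(stageId, Map.empty[Int, Manager]).values
        .foreach(_.markPartitionCompleted(partition))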