@@ -910,4 +910,108 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       taskScheduler.initialize(new FakeSchedulerBackend)
     }
   }
+
+  test("Completions in zombie tasksets update status of non-zombie taskset") {
+    val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+    val valueSer = SparkEnv.get.serializer.newInstance()
+
+    def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = {
+      val indexInTsm = tsm.partitionToIndex(partition)
+      val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head
+      val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
+      tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
+    }
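+    // Routing the success through handleSuccessfulTask is the code path under test: the
+    // completion should be propagated to every taskset for the stage, not just the attempt
+    // that ran the successful copy of the task.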
+
+    // Submit a task set, have it fail with a fetch failure, and then re-submit the task
+    // attempt, two times, so we have three active task sets for one stage. (For this to really
+    // happen, you'd need the previous stage to also get restarted, and then succeed, in between
+    // each attempt, but that happens outside what we're mocking here.)
+    val zombieAttempts = (0 until 2).map { stageAttempt =>
+      val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
+      taskScheduler.submitTasks(attempt)
+      val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
+      val offers = (0 until 10).map { idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
+      taskScheduler.resourceOffers(offers)
+      assert(tsm.runningTasks === 10)
+      // fail the attempt with a FetchFailed, which turns the whole taskset into a zombie
+      tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED,
+        FetchFailed(null, 0, 0, 0, "fetch failed"))
+      // the attempt is a zombie, but the tasks are still running (this could be true even if
+      // we actively killed those tasks, as killing is best-effort)
+      assert(tsm.isZombie)
+      assert(tsm.runningTasks === 9)
+      tsm
+    }
+
+    // We've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt
+    // for the stage, but this time with insufficient resources so not all tasks are active.
+
+    val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
+    taskScheduler.submitTasks(finalAttempt)
+    val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
+    val offers = (0 until 5).map { idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
+    val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
+      finalAttempt.tasks(task.index).partitionId
+    }.toSet
+    assert(finalTsm.runningTasks === 5)
+    assert(!finalTsm.isZombie)
+
+    // We simulate late completions from our zombie tasksets, corresponding to all the pending
+    // partitions in our final attempt. This means we're only waiting on the tasks we've already
+    // launched.
+    val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions)
+    finalAttemptPendingPartitions.foreach { partition =>
+      completeTaskSuccessfully(zombieAttempts(0), partition)
+    }
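+    // With those zombie-side completions recorded, finalTsm should no longer consider the
+    // corresponding partitions pending; otherwise the offer below would launch duplicate
+    // copies of partitions that have already been computed.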
+
+    // If there is another resource offer, we shouldn't run anything. Though our final attempt
+    // used to have pending tasks, now those tasks have been completed by zombie attempts. The
+    // remaining tasks to compute are already active in the non-zombie attempt.
+    assert(
+      taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)
+
+    val remainingTasks = finalAttemptLaunchedPartitions.toIndexedSeq.sorted
+
+    // Finally, if we finish the remaining partitions from a mix of tasksets, all attempts should
+    // be marked as zombie. For each of the remaining tasks, find a taskset with an active copy
+    // of the task, and finish the task there.
+    remainingTasks.foreach { partition =>
+      val tsm = if (partition == 0) {
+        // we failed this task on both zombie attempts, so an active copy is only present in the
+        // latest taskset
+        finalTsm
+      } else {
+        // should be active in every taskset. We choose a zombie taskset just to make sure that
+        // we transition the active taskset correctly even if the final completion comes
+        // from a zombie.
+        zombieAttempts(partition % 2)
+      }
+      completeTaskSuccessfully(tsm, partition)
+    }
+
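+    // every partition has now completed, so even the final attempt, which never failed a task,
+    // has nothing left to schedule and should have been transitioned to zombie as well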
+    assert(finalTsm.isZombie)
+
+    // no taskset has completed all of its tasks, so no updates to the blacklist tracker yet
+    verify(blacklist, never).updateBlacklistForSuccessfulTaskSet(anyInt(), anyInt(), anyObject())
+
+    // Finally, let's complete all the tasks. We simulate failures in attempt 1, but everything
+    // else succeeds, to make sure we get the right updates to the blacklist in all cases.
+    (zombieAttempts ++ Seq(finalTsm)).foreach { tsm =>
+      val stageAttempt = tsm.taskSet.stageAttemptId
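+      // runningTasksSet holds task IDs (taskInfos is keyed by the same IDs), so the
+      // taskInfos(index).taskId lookup below just round-trips each running task's ID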
+      tsm.runningTasksSet.foreach { index =>
+        if (stageAttempt == 1) {
+          tsm.handleFailedTask(tsm.taskInfos(index).taskId, TaskState.FAILED, TaskResultLost)
+        } else {
+          val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
+          tsm.handleSuccessfulTask(tsm.taskInfos(index).taskId, result)
+        }
+      }
+
+      // We update the blacklist for the stage attempts with all successful tasks. Even though
+      // some tasksets had failures, we still consider them all successful from a blacklisting
+      // perspective, as the failures weren't from a problem with the tasks themselves.
+      verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), anyObject())
+    }
+  }
 }
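
Note: for context, a minimal sketch of the scheduler-side propagation this test relies on, with
assumed names (illustration only, not necessarily the exact shape of the accompanying change):

    // TaskSchedulerImpl side (sketch, assumed names): when any attempt reports a successful
    // task, mark that partition completed in every TaskSetManager registered for the stage,
    // so zombie and non-zombie attempts stay in sync.
    private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int): Unit = {
      taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
        tsm.markPartitionCompleted(partitionId)
      }
    }

    // TaskSetManager side (sketch): record the partition as successful; once every partition
    // is done, the attempt has nothing left to schedule and becomes a zombie.
    private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
      partitionToIndex.get(partitionId).foreach { index =>
        if (!successful(index)) {
          tasksSuccessful += 1
          successful(index) = true
          if (tasksSuccessful == numTasks) {
            isZombie = true
          }
        }
      }
    }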