Skip to content

Commit 9b2aeaf

Browse files
committed
Add unit test for locality wait
1 parent 4a5ea82 commit 9b2aeaf

File tree

1 file changed

+38
-0
lines changed

1 file changed

+38
-0
lines changed

core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -608,6 +608,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
608608
// Fail the running task
609609
val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get
610610
taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0))
611+
tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason)
611612
when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
612613
"executor0", failedTask.index)).thenReturn(true)
613614

@@ -622,6 +623,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
622623
val tsm2 = stageToMockTaskSetManager(1)
623624
val failedTask2 = secondTaskAttempts.find(_.executorId == "executor0").get
624625
taskScheduler.statusUpdate(failedTask2.taskId, TaskState.FAILED, ByteBuffer.allocate(0))
626+
tsm2.handleFailedTask(failedTask2.taskId, TaskState.FAILED, UnknownReason)
625627
when(tsm2.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
626628
"executor0", failedTask2.index)).thenReturn(true)
627629

@@ -644,6 +646,42 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
644646
assert(!tsm.isZombie)
645647
}
646648

649+
// this test is to check that we don't abort a taskSet which is not being scheduled on other
650+
// executors as it is waiting on locality timeout and not being aborted because it is still not
651+
// completely blacklisted.
652+
test("SPARK-22148 Ensure we don't abort the taskSet if we haven't been completely blacklisted") {
653+
taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
654+
config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0")
655+
656+
val preferredLocation = Seq(ExecutorCacheTaskLocation("host0", "executor0"))
657+
val taskSet1 = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0,
658+
preferredLocation)
659+
taskScheduler.submitTasks(taskSet1)
660+
661+
val tsm = stageToMockTaskSetManager(0)
662+
663+
// submit an offer with one executor
664+
var taskAttempts = taskScheduler.resourceOffers(IndexedSeq(
665+
WorkerOffer("executor0", "host0", 1)
666+
)).flatten
667+
668+
// Fail the running task
669+
val failedTask = taskAttempts.find(_.executorId == "executor0").get
670+
taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0))
671+
tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason)
672+
when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
673+
"executor0", failedTask.index)).thenReturn(true)
674+
675+
// make an offer but we won't schedule anything yet as scheduler locality is still PROCESS_LOCAL
676+
assert(taskScheduler.resourceOffers(IndexedSeq(
677+
WorkerOffer("executor1", "host0", 1)
678+
)).flatten.isEmpty)
679+
680+
assert(taskScheduler.unschedulableTaskSetToExpiryTime.size == 0)
681+
682+
assert(!tsm.isZombie)
683+
}
684+
647685
/**
648686
* Helper for performance tests. Takes the explicitly blacklisted nodes and executors; verifies
649687
* that the blacklists are used efficiently to ensure scheduling is not O(numPendingTasks).

0 commit comments

Comments
 (0)