Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,22 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(sched.finishedManagers.contains(manager))
}

test("skip unsatisfiable locality levels") {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execC", "host2"))
val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "execB")))
val clock = new FakeClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)

// An executor that is not NODE_LOCAL should be rejected.
assert(manager.resourceOffer("execC", "host2", ANY) === None)

// Because there are no alive PROCESS_LOCAL executors, the base locality level should be
// NODE_LOCAL. So, we should schedule the task on this offered NODE_LOCAL executor before
// any of the locality wait timers expire.
assert(manager.resourceOffer("execA", "host1", ANY).get.index === 0)
}

test("basic delay scheduling") {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
Expand Down