@@ -62,6 +62,9 @@ private[spark] class TaskSchedulerImpl(
// Threshold above which we warn user initial TaskSet may be starved
val STARVATION_TIMEOUT = conf.getLong("spark.starvation.timeout", 15000)

// CPUs to request per task
val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
Contributor: Is this intentionally left as an undocumented parameter?

Contributor Author: Hmm - we could document this in docs/configuration.md -- but I don't think this is a commonly used flag, and it has been around for a while I guess.

Contributor Author: I guess documenting it is better -- added a commit for that.

Contributor: Ah, I see that it wasn't part of your change. Anyway, it might make sense to document it.


// TaskSetManagers are not thread safe, so any access to one should be synchronized
// on this class.
val activeTaskSets = new HashMap[String, TaskSetManager]
@@ -228,16 +231,18 @@ private[spark] class TaskSchedulerImpl(
for (i <- 0 until shuffledOffers.size) {
val execId = shuffledOffers(i).executorId
val host = shuffledOffers(i).host
for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {
tasks(i) += task
val tid = task.taskId
taskIdToTaskSetId(tid) = taskSet.taskSet.id
taskIdToExecutorId(tid) = execId
activeExecutorIds += execId
executorsByHost(host) += execId
availableCpus(i) -= taskSet.CPUS_PER_TASK
assert (availableCpus(i) >= 0)
launchedTask = true
if (availableCpus(i) >= CPUS_PER_TASK) {
for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
tasks(i) += task
val tid = task.taskId
taskIdToTaskSetId(tid) = taskSet.taskSet.id
taskIdToExecutorId(tid) = execId
activeExecutorIds += execId
executorsByHost(host) += execId
availableCpus(i) -= CPUS_PER_TASK
assert (availableCpus(i) >= 0)
launchedTask = true
}
}
}
} while (launchedTask)
@@ -56,9 +56,6 @@ private[spark] class TaskSetManager(
{
val conf = sched.sc.conf

// CPUs to request per task
val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)

/*
* Sometimes if an executor is dead or in an otherwise invalid state, the driver
* does not realize right away leading to repeated task failures. If enabled,
@@ -384,11 +381,10 @@ private[spark] class TaskSetManager(
def resourceOffer(
execId: String,
host: String,
availableCpus: Int,
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
if (!isZombie && availableCpus >= CPUS_PER_TASK) {
if (!isZombie) {
val curTime = clock.getTime()

var allowedLocality = getAllowedLocalityLevel(curTime)
@@ -89,7 +89,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
scheduler.statusUpdate(taskId, state, data.value)
if (TaskState.isFinished(state)) {
if (executorActor.contains(executorId)) {
freeCores(executorId) += 1
freeCores(executorId) += scheduler.CPUS_PER_TASK
makeOffers(executorId)
} else {
// Ignoring the update since we don't know about the executor.
@@ -140,7 +140,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
// Launch tasks returned by a set of resource offers
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
freeCores(task.executorId) -= 1
freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
executorActor(task.executorId) ! LaunchTask(task)
}
}
@@ -246,7 +246,7 @@ private[spark] class MesosSchedulerBackend(
val cpuResource = Resource.newBuilder()
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(1).build())
.setScalar(Value.Scalar.newBuilder().setValue(scheduler.CPUS_PER_TASK).build())
.build()
MesosTaskInfo.newBuilder()
.setTaskId(taskId)
@@ -80,7 +80,6 @@ class FakeTaskSetManager(
override def resourceOffer(
execId: String,
host: String,
availableCpus: Int,
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
@@ -125,7 +124,7 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
}
for (taskSet <- taskSetQueue) {
taskSet.resourceOffer("execId_1", "hostname_1", 1, TaskLocality.ANY) match {
taskSet.resourceOffer("execId_1", "hostname_1", TaskLocality.ANY) match {
case Some(task) =>
return taskSet.stageId
case None => {}
@@ -293,4 +292,43 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
assert(count > 0)
assert(count < numTrials)
}

test("Scheduler correctly accounts for multiple CPUs per task") {
sc = new SparkContext("local", "TaskSchedulerImplSuite")
val taskCpus = 2

sc.conf.set("spark.task.cpus", taskCpus.toString)
val taskScheduler = new TaskSchedulerImpl(sc)
taskScheduler.initialize(new FakeSchedulerBackend)
// Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
val dagScheduler = new DAGScheduler(sc, taskScheduler) {
override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
override def executorAdded(execId: String, host: String) {}
}

// Give zero core offers. Should not generate any tasks
val zeroCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", 0),
new WorkerOffer("executor1", "host1", 0))
val taskSet = FakeTask.createTaskSet(1)
taskScheduler.submitTasks(taskSet)
var taskDescriptions = taskScheduler.resourceOffers(zeroCoreWorkerOffers).flatten
assert(0 === taskDescriptions.length)

// No tasks should run as we only have 1 core free.
val numFreeCores = 1
val singleCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores),
new WorkerOffer("executor1", "host1", numFreeCores))
taskScheduler.submitTasks(taskSet)
taskDescriptions = taskScheduler.resourceOffers(singleCoreWorkerOffers).flatten
assert(0 === taskDescriptions.length)

// Now change the offers to have 2 cores in one executor and verify if it
// is chosen.
val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus),
new WorkerOffer("executor1", "host1", numFreeCores))
taskScheduler.submitTasks(taskSet)
taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
assert(1 === taskDescriptions.length)
assert("executor0" === taskDescriptions(0).executorId)
}
}
@@ -93,19 +93,16 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val taskSet = FakeTask.createTaskSet(1)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)

// Offer a host with no CPUs
assert(manager.resourceOffer("exec1", "host1", 0, ANY) === None)

// Offer a host with process-local as the constraint; this should work because the TaskSet
// above won't have any locality preferences
val taskOption = manager.resourceOffer("exec1", "host1", 2, TaskLocality.PROCESS_LOCAL)
val taskOption = manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL)
assert(taskOption.isDefined)
val task = taskOption.get
assert(task.executorId === "exec1")
assert(sched.startedTasks.contains(0))

// Re-offer the host -- now we should get no more tasks
assert(manager.resourceOffer("exec1", "host1", 2, PROCESS_LOCAL) === None)
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None)

// Tell it the task has finished
manager.handleSuccessfulTask(0, createTaskResult(0))
@@ -121,15 +118,15 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {

// First three offers should all find tasks
for (i <- 0 until 3) {
val taskOption = manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL)
val taskOption = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)
assert(taskOption.isDefined)
val task = taskOption.get
assert(task.executorId === "exec1")
}
assert(sched.startedTasks.toSet === Set(0, 1, 2))

// Re-offer the host -- now we should get no more tasks
assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None)

// Finish the first two tasks
manager.handleSuccessfulTask(0, createTaskResult(0))
@@ -157,35 +154,35 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)

// First offer host1, exec1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)

// Offer host1, exec1 again: the last task, which has no prefs, should be chosen
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 3)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 3)

// Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None)

clock.advance(LOCALITY_WAIT)

// Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None)

// Offer host1, exec1 again, at NODE_LOCAL level: we should choose task 2
assert(manager.resourceOffer("exec1", "host1", 1, NODE_LOCAL).get.index == 2)
assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).get.index == 2)

// Offer host1, exec1 again, at NODE_LOCAL level: nothing should get chosen
assert(manager.resourceOffer("exec1", "host1", 1, NODE_LOCAL) === None)
assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL) === None)

// Offer host1, exec1 again, at ANY level: nothing should get chosen
assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
assert(manager.resourceOffer("exec1", "host1", ANY) === None)

clock.advance(LOCALITY_WAIT)

// Offer host1, exec1 again, at ANY level: task 1 should get chosen
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)

// Offer host1, exec1 again, at ANY level: nothing should be chosen as we've launched all tasks
assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
}

test("delay scheduling with fallback") {
@@ -203,29 +200,29 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)

// First offer host1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)

// Offer host1 again: nothing should get chosen
assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
assert(manager.resourceOffer("exec1", "host1", ANY) === None)

clock.advance(LOCALITY_WAIT)

// Offer host1 again: second task (on host2) should get chosen
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)

// Offer host1 again: third task (on host2) should get chosen
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 2)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 2)

// Offer host2: fifth task (also on host2) should get chosen
assert(manager.resourceOffer("exec2", "host2", 1, ANY).get.index === 4)
assert(manager.resourceOffer("exec2", "host2", ANY).get.index === 4)

// Now that we've launched a local task, we should no longer launch the task for host3
assert(manager.resourceOffer("exec2", "host2", 1, ANY) === None)
assert(manager.resourceOffer("exec2", "host2", ANY) === None)

clock.advance(LOCALITY_WAIT)

// After another delay, we can go ahead and launch that task non-locally
assert(manager.resourceOffer("exec2", "host2", 1, ANY).get.index === 3)
assert(manager.resourceOffer("exec2", "host2", ANY).get.index === 3)
}

test("delay scheduling with failed hosts") {
@@ -240,24 +237,24 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)

// First offer host1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)

// Offer host1 again: third task should be chosen immediately because host3 is not up
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 2)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 2)

// After this, nothing should get chosen
assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
assert(manager.resourceOffer("exec1", "host1", ANY) === None)

// Now mark host2 as dead
sched.removeExecutor("exec2")
manager.executorLost("exec2", "host2")

// Task 1 should immediately be launched on host1 because its original host is gone
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)

// Now that all tasks have launched, nothing new should be launched anywhere else
assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
assert(manager.resourceOffer("exec2", "host2", 1, ANY) === None)
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
assert(manager.resourceOffer("exec2", "host2", ANY) === None)
}

test("task result lost") {
@@ -267,14 +264,14 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val clock = new FakeClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)

assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)

// Tell it the task has finished but the result was lost.
manager.handleFailedTask(0, TaskState.FINISHED, TaskResultLost)
assert(sched.endedTasks(0) === TaskResultLost)

// Re-offer the host -- now we should get task 0 again.
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
}

test("repeated failures lead to task set abortion") {
@@ -287,7 +284,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
// after the last failure.
(1 to manager.maxTaskFailures).foreach { index =>
val offerResult = manager.resourceOffer("exec1", "host1", 1, ANY)
val offerResult = manager.resourceOffer("exec1", "host1", ANY)
assert(offerResult.isDefined,
"Expect resource offer on iteration %s to return a task".format(index))
assert(offerResult.get.index === 0)
@@ -317,7 +314,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val manager = new TaskSetManager(sched, taskSet, 4, clock)

{
val offerResult = manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL)
val offerResult = manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL)
assert(offerResult.isDefined, "Expect resource offer to return a task")

assert(offerResult.get.index === 0)
@@ -328,15 +325,15 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(!sched.taskSetsFailed.contains(taskSet.id))

// Ensure scheduling on exec1 fails after failure 1 due to blacklist
assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.NODE_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.RACK_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.ANY).isEmpty)
assert(manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1", "host1", TaskLocality.NODE_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1", "host1", TaskLocality.RACK_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1", "host1", TaskLocality.ANY).isEmpty)
}

// Run the task on exec1.1 - should work, and then fail it on exec1.1
{
val offerResult = manager.resourceOffer("exec1.1", "host1", 1, TaskLocality.NODE_LOCAL)
val offerResult = manager.resourceOffer("exec1.1", "host1", TaskLocality.NODE_LOCAL)
assert(offerResult.isDefined,
"Expect resource offer to return a task for exec1.1, offerResult = " + offerResult)

@@ -348,12 +345,12 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(!sched.taskSetsFailed.contains(taskSet.id))

// Ensure scheduling on exec1.1 fails after failure 2 due to blacklist
assert(manager.resourceOffer("exec1.1", "host1", 1, TaskLocality.NODE_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1.1", "host1", TaskLocality.NODE_LOCAL).isEmpty)
}

// Run the task on exec2 - should work, and then fail it on exec2
{
val offerResult = manager.resourceOffer("exec2", "host2", 1, TaskLocality.ANY)
val offerResult = manager.resourceOffer("exec2", "host2", TaskLocality.ANY)
assert(offerResult.isDefined, "Expect resource offer to return a task")

assert(offerResult.get.index === 0)
@@ -364,20 +361,20 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(!sched.taskSetsFailed.contains(taskSet.id))

// Ensure scheduling on exec2 fails after failure 3 due to blacklist
assert(manager.resourceOffer("exec2", "host2", 1, TaskLocality.ANY).isEmpty)
assert(manager.resourceOffer("exec2", "host2", TaskLocality.ANY).isEmpty)
}

// After reschedule delay, scheduling on exec1 should be possible.
clock.advance(rescheduleDelay)

{
val offerResult = manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL)
val offerResult = manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL)
assert(offerResult.isDefined, "Expect resource offer to return a task")

assert(offerResult.get.index === 0)
assert(offerResult.get.executorId === "exec1")

assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL).isEmpty)

// Cause exec1 to fail : failure 4
manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)