Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -382,59 +382,97 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
for (offer <- offers) {
val slaveId = offer.getSlaveId.getValue
val offerId = offer.getId.getValue
val resources = remainingResources(offerId)

if (canLaunchTask(slaveId, resources)) {
// Create a task
launchTasks = true
val taskId = newMesosTaskId()
val offerCPUs = getResource(resources, "cpus").toInt

val taskCPUs = executorCores(offerCPUs)
val taskMemory = executorMemory(sc)

slaves.getOrElseUpdate(slaveId, new Slave(offer.getHostname)).taskIDs.add(taskId)

val (afterCPUResources, cpuResourcesToUse) =
partitionResources(resources, "cpus", taskCPUs)
val (resourcesLeft, memResourcesToUse) =
partitionResources(afterCPUResources.asJava, "mem", taskMemory)

val taskBuilder = MesosTaskInfo.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
.setSlaveId(offer.getSlaveId)
.setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId))
.setName("Task " + taskId)
.addAllResources(cpuResourcesToUse.asJava)
.addAllResources(memResourcesToUse.asJava)

sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil
.setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder)
val availableResources = remainingResources(offerId)
val offerMem = getResource(availableResources, "mem")
val offerCpu = getResource(availableResources, "cpus")

// Catch offer limits
calculateUsableResources(
sc,
offerCpu.toInt,
offerMem.toInt
).flatMap(
{
// Catch "global" limits
case (taskCPUs: Int, taskMemory: Int) =>
if (numExecutors() >= executorLimit) {
logTrace(s"${numExecutors()} exceeds limit of $executorLimit")
None
} else if (
slaves.get(slaveId).map(_.taskFailures).getOrElse(0) >= MAX_SLAVE_FAILURES
) {
logTrace(s"Slave $slaveId exceeded limit of $MAX_SLAVE_FAILURES failures")
None
} else {
Some((taskCPUs, taskMemory))
}
}

tasks(offer.getId) ::= taskBuilder.build()
remainingResources(offerId) = resourcesLeft.asJava
totalCoresAcquired += taskCPUs
coresByTaskId(taskId) = taskCPUs
) match {
case Some((taskCPUs: Int, taskMemory: Int)) =>
// Create a task
launchTasks = true
val taskId = newMesosTaskId()

slaves.getOrElseUpdate(slaveId, new Slave(offer.getHostname)).taskIDs.add(taskId)

val (afterCPUResources, cpuResourcesToUse) =
partitionResources(availableResources, "cpus", taskCPUs)
val (resourcesLeft, memResourcesToUse) =
partitionResources(afterCPUResources.asJava, "mem", taskMemory)

val taskBuilder = MesosTaskInfo.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
.setSlaveId(offer.getSlaveId)
.setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId))
.setName("Task " + taskId)
.addAllResources(cpuResourcesToUse.asJava)
.addAllResources(memResourcesToUse.asJava)

sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil
.setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder)
}

tasks(offer.getId) ::= taskBuilder.build()
remainingResources(offerId) = resourcesLeft.asJava
totalCoresAcquired += taskCPUs
coresByTaskId(taskId) = taskCPUs
case None => logDebug(s"Insufficient offer CPU: $offerCpu MEM: $offerMem")
}
}
}
tasks.toMap
}

private def canLaunchTask(slaveId: String, resources: JList[Resource]): Boolean = {
val offerMem = getResource(resources, "mem")
val offerCPUs = getResource(resources, "cpus").toInt
val cpus = executorCores(offerCPUs)
val mem = executorMemory(sc)

cpus > 0 &&
cpus <= offerCPUs &&
cpus + totalCoresAcquired <= maxCores &&
mem <= offerMem &&
numExecutors() < executorLimit &&
slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES
/**
 * Try to fit the resources available in an offer to this framework's executor
 * resource requirements.
 *
 * Checks, in order: that the computed executor core count is at least 1, that the
 * offer has enough memory, that the offer has enough CPUs, and that accepting the
 * CPUs would not exceed the framework-wide `maxCores` cap. Each rejection is logged
 * at trace level.
 *
 * @param sc Spark context, used to look up the configured executor memory
 * @param availableCpus the number of CPUs available in the offer
 * @param availableMem the amount of memory (MB) available in the offer
 * @return Some((cpus, memoryMB)) to allocate for the task, or None if the offer
 *         cannot satisfy the requirements
 */
private[mesos]
def calculateUsableResources(sc: SparkContext, availableCpus: Int, availableMem: Int):
  Option[(Int, Int)] = {
  val desiredMemory = executorMemory(sc)
  // executorCores may derive the core count from the offer size, so it takes
  // the available CPUs as input.
  val desiredCpu = executorCores(availableCpus)
  if (desiredCpu < 1) {
    logTrace(s"Executor cores too low at $desiredCpu")
    None
  } else if (desiredMemory > availableMem) {
    logTrace(s"Offer memory $availableMem is less than needed memory $desiredMemory")
    None
  } else if (desiredCpu > availableCpus) {
    // NOTE: fixed typo in log message ("les" -> "less")
    logTrace(s"Offer cpu $availableCpus is less than needed cpu $desiredCpu")
    None
  } else if (desiredCpu + totalCoresAcquired > maxCores) {
    logTrace(s"Adding $desiredCpu more cpus to $totalCoresAcquired"
      + s" would go over $maxCores limit")
    None
  } else {
    Some((desiredCpu, desiredMemory))
  }
}

private def executorCores(offerCPUs: Int): Int = {
Expand Down