Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
}

private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)

private val executorGpus = conf.getInt("spark.mesos.executor.gpus", 0)
private val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)

private val taskLabels = conf.get("spark.mesos.task.labels", "")
Expand Down Expand Up @@ -328,24 +328,26 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
val offerAttributes = toAttributeMap(offer.getAttributesList)
val offerMem = getResource(offer.getResourcesList, "mem")
val offerCpus = getResource(offer.getResourcesList, "cpus")
val offerGpus = getResource(offer.getResourcesList, "gpus")
val offerPorts = getRangeResource(offer.getResourcesList, "ports")
val id = offer.getId.getValue

if (tasks.contains(offer.getId)) { // accept
val offerTasks = tasks(offer.getId)

logDebug(s"Accepting offer: $id with attributes: $offerAttributes " +
s"mem: $offerMem cpu: $offerCpus ports: $offerPorts." +
s"mem: $offerMem cpu: $offerCpus gpu: $offerGpus ports: $offerPorts." +
s" Launching ${offerTasks.size} Mesos tasks.")

for (task <- offerTasks) {
val taskId = task.getTaskId
val mem = getResource(task.getResourcesList, "mem")
val cpus = getResource(task.getResourcesList, "cpus")
val gpus = getResource(task.getResourcesList, "gpus")
val ports = getRangeResource(task.getResourcesList, "ports").mkString(",")

logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus" +
s" ports: $ports")
s" gpu: $gpus ports: $ports")
}

driver.launchTasks(
Expand Down Expand Up @@ -398,9 +400,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
launchTasks = true
val taskId = newMesosTaskId()
val offerCPUs = getResource(resources, "cpus").toInt
val taskGPUs = Math.min(
Math.max(0, maxGpus - totalGpusAcquired), getResource(resources, "gpus").toInt)

val offerGPUs = getResource(resources, "gpus").toInt
var taskGPUs = executorGpus
val taskCPUs = executorCores(offerCPUs)
val taskMemory = executorMemory(sc)

Expand Down Expand Up @@ -482,6 +483,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
private def canLaunchTask(slaveId: String, resources: JList[Resource]): Boolean = {
val offerMem = getResource(resources, "mem")
val offerCPUs = getResource(resources, "cpus").toInt
val offerGPUs = getResource(resources, "gpus").toInt
val cpus = executorCores(offerCPUs)
val mem = executorMemory(sc)
val ports = getRangeResource(resources, "ports")
Expand All @@ -491,6 +493,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
cpus <= offerCPUs &&
cpus + totalCoresAcquired <= maxCores &&
mem <= offerMem &&
executorGpus <= offerGPUs &&
executorGpus + totalGpusAcquired <= maxGpus &&
numExecutors() < executorLimit &&
slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES &&
meetsPortRequirements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,18 +165,47 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
}


test("mesos does not acquire more than spark.mesos.gpus.max") {
val maxGpus = 5
setBackend(Map("spark.mesos.gpus.max" -> maxGpus.toString))
test("mesos acquires spark.mesos.executor.gpus number of gpus per executor") {
setBackend(Map("spark.mesos.gpus.max" -> "5",
"spark.mesos.executor.gpus" -> "2"))

val executorMemory = backend.executorMemory(sc)
offerResources(List(Resources(executorMemory, 1, maxGpus + 1)))
offerResources(List(Resources(executorMemory, 1, 5)))

val taskInfos = verifyTaskLaunched(driver, "o1")
assert(taskInfos.length == 1)

val gpus = backend.getResource(taskInfos.head.getResourcesList, "gpus")
assert(gpus == maxGpus)
assert(gpus == 2)
}


test("mesos declines offers that cannot satisfy spark.mesos.executor.gpus") {
setBackend(Map("spark.mesos.gpus.max" -> "5",
"spark.mesos.executor.gpus" -> "2"))

val executorMemory = backend.executorMemory(sc)
offerResources(List(Resources(executorMemory, 1, 1)))
verifyDeclinedOffer(driver, createOfferId("o1"))
}


test("mesos declines offers that exceed spark.mesos.gpus.max") {
setBackend(Map("spark.mesos.gpus.max" -> "5",
"spark.mesos.executor.gpus" -> "2"))

val executorMemory = backend.executorMemory(sc)
offerResources(List(Resources(executorMemory, 1, 2),
Resources(executorMemory, 1, 2),
Resources(executorMemory, 1, 2)))

val taskInfos1 = verifyTaskLaunched(driver, "o1")
assert(backend.getResource(taskInfos1.head.getResourcesList, "gpus") == 2)

val taskInfos2 = verifyTaskLaunched(driver, "o2")
assert(backend.getResource(taskInfos2.head.getResourcesList, "gpus") == 2)

verifyDeclinedOffer(driver, createOfferId("o3"))
}


Expand Down