From 5f8ccd5789137363e035d1dfb9a05d3b9bf3ce6b Mon Sep 17 00:00:00 2001 From: Ji Yan Date: Thu, 9 Mar 2017 21:30:11 -0800 Subject: [PATCH 1/6] respect both gpu and maxgpu --- .../mesos/MesosCoarseGrainedSchedulerBackend.scala | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index f555072c3842a..ecd499100311b 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -62,6 +62,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false) val maxGpus = conf.getInt("spark.mesos.gpus.max", 0) + val gpuCores = conf.getInt("spark.mesos.gpus", 0) private[this] val shutdownTimeoutMS = conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s") @@ -401,9 +402,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( launchTasks = true val taskId = newMesosTaskId() val offerCPUs = getResource(resources, "cpus").toInt - val taskGPUs = Math.min( - Math.max(0, maxGpus - totalGpusAcquired), getResource(resources, "gpus").toInt) - + var taskGPUs = Math.min(Math.max(0, maxGpus - totalGpusAcquired), + getResource(resources, "gpus").toInt) + if (gpuCores > 0) { + taskGPUs = gpuCores + } val taskCPUs = executorCores(offerCPUs) val taskMemory = executorMemory(sc) @@ -462,6 +465,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( private def canLaunchTask(slaveId: String, resources: JList[Resource]): Boolean = { val offerMem = getResource(resources, "mem") val offerCPUs = getResource(resources, "cpus").toInt + val offerGPUs = getResource(resources, "gpus").toInt val cpus = executorCores(offerCPUs) val mem = executorMemory(sc) val ports = getRangeResource(resources, "ports") @@ -471,6 +475,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( cpus <= offerCPUs && cpus + totalCoresAcquired <= maxCores && mem <= offerMem && + gpuCores <= offerGPUs && numExecutors() < executorLimit && slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES && meetsPortRequirements From c2c1c5b66436a439e1d7342b7a2c58c502e26d6b Mon Sep 17 00:00:00 2001 From: Ji Yan Date: Thu, 9 Mar 2017 21:30:11 -0800 Subject: [PATCH 2/6] respect both gpu and maxgpu --- .../mesos/MesosCoarseGrainedSchedulerBackend.scala | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 8f5b97ccb1f85..05f4d475a5fc5 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -75,6 +75,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( private val maxGpus = conf.getInt("spark.mesos.gpus.max", 0) + private val gpuCores = conf.getInt("spark.mesos.gpus", 0) + private val taskLabels = conf.get("spark.mesos.task.labels", "") private[this] val shutdownTimeoutMS = @@ -398,9 +400,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( launchTasks = true val taskId = newMesosTaskId() val offerCPUs = getResource(resources, "cpus").toInt - val taskGPUs = Math.min( - Math.max(0, maxGpus - totalGpusAcquired), getResource(resources, "gpus").toInt) - + var taskGPUs = Math.min(Math.max(0, maxGpus - totalGpusAcquired), + getResource(resources, "gpus").toInt) + if (gpuCores > 0) { + taskGPUs = gpuCores + } val taskCPUs = executorCores(offerCPUs) val taskMemory = executorMemory(sc) @@ -482,6 +486,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( private def canLaunchTask(slaveId: String, resources: JList[Resource]): Boolean = { val offerMem = getResource(resources, "mem") val offerCPUs = getResource(resources, "cpus").toInt + val offerGPUs = getResource(resources, "gpus").toInt val cpus = executorCores(offerCPUs) val mem = executorMemory(sc) val ports = getRangeResource(resources, "ports") @@ -491,6 +496,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( cpus <= offerCPUs && cpus + totalCoresAcquired <= maxCores && mem <= offerMem && + gpuCores <= offerGPUs && numExecutors() < executorLimit && slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES && meetsPortRequirements From ba87b35817a7288b9b6aa41f4ac2244e235f2efd Mon Sep 17 00:00:00 2001 From: Ji Yan Date: Sat, 13 May 2017 09:53:59 -0700 Subject: [PATCH 3/6] fix syntax --- .../cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 5f01a71cd9e6d..ed850b6ba395d 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -75,8 +75,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( private val gpuCores = conf.getInt("spark.mesos.gpus", 0) private val maxGpus = conf.getInt("spark.mesos.gpus.max", 0) - private val gpuCores = conf.getInt("spark.mesos.gpus", 0) - private val taskLabels = conf.get("spark.mesos.task.labels", "") private[this] val shutdownTimeoutMS = From 5ef2881a2b1e1180b73d532988bab72c5fdab64c Mon Sep 17 00:00:00 2001 From: Ji Yan Date: Sun, 14 May 2017 13:02:16 -0700 Subject: [PATCH 4/6] fix gpu offer --- .../MesosCoarseGrainedSchedulerBackend.scala | 18 ++++++------ ...osCoarseGrainedSchedulerBackendSuite.scala | 29 +++++++++++++++---- 2 files changed, 33 insertions(+), 14 deletions(-) diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index ed850b6ba395d..0282ff0c5df9e 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -72,7 +72,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( } private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false) - private val gpuCores = conf.getInt("spark.mesos.gpus", 0) + private val executorGpus = conf.getInt("spark.mesos.executor.gpus", 0) private val maxGpus = conf.getInt("spark.mesos.gpus.max", 0) private val taskLabels = conf.get("spark.mesos.task.labels", "") @@ -328,6 +328,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( val offerAttributes = toAttributeMap(offer.getAttributesList) val offerMem = getResource(offer.getResourcesList, "mem") val offerCpus = getResource(offer.getResourcesList, "cpus") + val offerGpus = getResource(offer.getResourcesList, "gpus") val offerPorts = getRangeResource(offer.getResourcesList, "ports") val id = offer.getId.getValue @@ -335,17 +336,18 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( val offerTasks = tasks(offer.getId) logDebug(s"Accepting offer: $id with attributes: $offerAttributes " + - s"mem: $offerMem cpu: $offerCpus ports: $offerPorts." + + s"mem: $offerMem cpu: $offerCpus gpu: $offerGpus ports: $offerPorts." + s" Launching ${offerTasks.size} Mesos tasks.") for (task <- offerTasks) { val taskId = task.getTaskId val mem = getResource(task.getResourcesList, "mem") val cpus = getResource(task.getResourcesList, "cpus") + val gpus = getResource(task.getResourcesList, "gpus") val ports = getRangeResource(task.getResourcesList, "ports").mkString(",") logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus" + - s" ports: $ports") + s" gpu: $gpus ports: $ports") } driver.launchTasks( @@ -398,11 +400,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( launchTasks = true val taskId = newMesosTaskId() val offerCPUs = getResource(resources, "cpus").toInt - var taskGPUs = Math.min(Math.max(0, maxGpus - totalGpusAcquired), - getResource(resources, "gpus").toInt) - if (gpuCores > 0) { - taskGPUs = gpuCores - } + val offerGPUs = getResource(resources, "gpus").toInt + var taskGPUs = executorGpus val taskCPUs = executorCores(offerCPUs) val taskMemory = executorMemory(sc) @@ -494,7 +493,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( cpus <= offerCPUs && cpus + totalCoresAcquired <= maxCores && mem <= offerMem && - gpuCores <= offerGPUs && + executorGpus <= offerGPUs && + executorGpus + totalGpusAcquired <= maxGpus && numExecutors() < executorLimit && slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES && meetsPortRequirements diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index 0418bfbaa5ed8..e3b86a19a1e5d 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -165,18 +165,37 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite } - test("mesos does not acquire more than spark.mesos.gpus.max") { - val maxGpus = 5 - setBackend(Map("spark.mesos.gpus.max" -> maxGpus.toString)) + test("mesos acquires spark.mesos.executor.gpus number of gpus per executor") { + setBackend(Map("spark.mesos.gpus.max" -> "5")) + setBackend(Map("spark.mesos.executor.gpus" -> "2")) val executorMemory = backend.executorMemory(sc) - offerResources(List(Resources(executorMemory, 1, maxGpus + 1))) + offerResources(List(Resources(executorMemory, 1, 5))) val taskInfos = verifyTaskLaunched(driver, "o1") assert(taskInfos.length == 1) val gpus = backend.getResource(taskInfos.head.getResourcesList, "gpus") - assert(gpus == maxGpus) + assert(gpus == 2) + } + + + test("mesos declines offers that exceed spark.mesos.gpus.max") { + setBackend(Map("spark.mesos.gpus.max" -> "5")) + setBackend(Map("spark.mesos.executor.gpus" -> "2")) + + val executorMemory = backend.executorMemory(sc) + offerResources(List(Resources(executorMemory, 1, 2), + Resources(executorMemory, 1, 2), + Resources(executorMemory, 1, 2))) + + val taskInfos = verifyTaskLaunched(driver, "o1") + assert(backend.getResource(taskInfos.head.getResourcesList, "gpus") == executorGpus) + + val taskInfos = verifyTaskLaunched(driver, "o2") + assert(backend.getResource(taskInfos.head.getResourcesList, "gpus") == executorGpus) + + verifyDeclinedOffer(driver, createOfferId("o3"), true) } From c301f3d1e05cc7359142a6cfb8222ad65cbdd9eb Mon Sep 17 00:00:00 2001 From: Ji Yan Date: Sun, 14 May 2017 13:15:55 -0700 Subject: [PATCH 5/6] syntax fix --- .../mesos/MesosCoarseGrainedSchedulerBackendSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index e3b86a19a1e5d..1d291d1344f5f 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -190,10 +190,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite Resources(executorMemory, 1, 2))) val taskInfos = verifyTaskLaunched(driver, "o1") - assert(backend.getResource(taskInfos.head.getResourcesList, "gpus") == executorGpus) + assert(backend.getResource(taskInfos.head.getResourcesList, "gpus") == 2) - val taskInfos = verifyTaskLaunched(driver, "o2") - assert(backend.getResource(taskInfos.head.getResourcesList, "gpus") == executorGpus) + taskInfos = verifyTaskLaunched(driver, "o2") + assert(backend.getResource(taskInfos.head.getResourcesList, "gpus") == 2) verifyDeclinedOffer(driver, createOfferId("o3"), true) } From 7a07742f4e004e0e88aa2b3bc5143adab3689644 Mon Sep 17 00:00:00 2001 From: Ji Yan Date: Sun, 14 May 2017 17:30:50 -0700 Subject: [PATCH 6/6] pass all tests --- ...osCoarseGrainedSchedulerBackendSuite.scala | 28 +++++++++++++------ 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index 1d291d1344f5f..1a515d64aa4b4 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -166,8 +166,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite test("mesos acquires spark.mesos.executor.gpus number of gpus per executor") { - setBackend(Map("spark.mesos.gpus.max" -> "5")) - setBackend(Map("spark.mesos.executor.gpus" -> "2")) + setBackend(Map("spark.mesos.gpus.max" -> "5", + "spark.mesos.executor.gpus" -> "2")) val executorMemory = backend.executorMemory(sc) offerResources(List(Resources(executorMemory, 1, 5))) @@ -180,22 +180,32 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite } + test("mesos declines offers that cannot satisfy spark.mesos.executor.gpus") { + setBackend(Map("spark.mesos.gpus.max" -> "5", + "spark.mesos.executor.gpus" -> "2")) + + val executorMemory = backend.executorMemory(sc) + offerResources(List(Resources(executorMemory, 1, 1))) + verifyDeclinedOffer(driver, createOfferId("o1")) + } + + test("mesos declines offers that exceed spark.mesos.gpus.max") { - setBackend(Map("spark.mesos.gpus.max" -> "5")) - setBackend(Map("spark.mesos.executor.gpus" -> "2")) + setBackend(Map("spark.mesos.gpus.max" -> "5", + "spark.mesos.executor.gpus" -> "2")) val executorMemory = backend.executorMemory(sc) offerResources(List(Resources(executorMemory, 1, 2), Resources(executorMemory, 1, 2), Resources(executorMemory, 1, 2))) - val taskInfos = verifyTaskLaunched(driver, "o1") - assert(backend.getResource(taskInfos.head.getResourcesList, "gpus") == 2) + val taskInfos1 = verifyTaskLaunched(driver, "o1") + assert(backend.getResource(taskInfos1.head.getResourcesList, "gpus") == 2) - taskInfos = verifyTaskLaunched(driver, "o2") - assert(backend.getResource(taskInfos.head.getResourcesList, "gpus") == 2) + val taskInfos2 = verifyTaskLaunched(driver, "o2") + assert(backend.getResource(taskInfos2.head.getResourcesList, "gpus") == 2) - verifyDeclinedOffer(driver, createOfferId("o3"), true) + verifyDeclinedOffer(driver, createOfferId("o3")) }