From 5f3b35b844a06f08b5e63a64e488a1208acd1243 Mon Sep 17 00:00:00 2001
From: "Li, YanKit | Wilson | RIT"
Date: Mon, 23 Oct 2017 06:55:24 +0000
Subject: [PATCH 1/4] [SPARK-22133][DOCS] Documentation for Mesos Reject Offer Configurations

---
 docs/running-on-mesos.md | 26 +++++++++++++++++++++++++-
 1 file changed, 25 insertions(+), 1 deletion(-)

diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index e0944bc9f5f8..0c8eac4d9a58 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -203,7 +203,7 @@ details and default values.
 
 Executors are brought up eagerly when the application starts, until
 `spark.cores.max` is reached. If you don't set `spark.cores.max`, the
-Spark application will reserve all resources offered to it by Mesos,
+Spark application will consume all resources offered to it by Mesos,
 so we of course urge you to set this variable in any sort of
 multi-tenant cluster, including one which runs multiple concurrent
 Spark applications.
@@ -613,6 +613,30 @@ See the [configuration page](configuration.html) for information on Spark config
     driver disconnects, the master immediately tears down the framework.
   </td>
 </tr>
+<tr>
+  <td><code>spark.mesos.rejectOfferDuration</code></td>
+  <td><code>120s</code></td>
+  <td>
+    Time to consider unused resources refused; serves as a fallback for
+    `spark.mesos.rejectOfferDurationForUnmetConstraints`,
+    `spark.mesos.rejectOfferDurationForReachedMaxCores`
+  </td>
+</tr>
+<tr>
+  <td><code>spark.mesos.rejectOfferDurationForUnmetConstraints</code></td>
+  <td><code>spark.mesos.rejectOfferDuration</code></td>
+  <td>
+    Time to consider unused resources refused when the offered resources do not meet the configured constraints
+  </td>
+</tr>
+<tr>
+  <td><code>spark.mesos.rejectOfferDurationForReachedMaxCores</code></td>
+  <td><code>spark.mesos.rejectOfferDuration</code></td>
+  <td>
+    Time to consider unused resources refused when the maximum number of
+    cores <code>spark.cores.max</code> is reached
+  </td>
+</tr>
 </table>
 
 # Troubleshooting and Debugging
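For illustration, the reject-offer settings documented in this patch could be tuned per application roughly as follows; the master URL and all durations below are placeholder values, not recommendations:

```scala
import org.apache.spark.SparkConf

// Placeholder values for illustration only.
val conf = new SparkConf()
  .setMaster("mesos://zk://zk1:2181/mesos")   // hypothetical Mesos master URL
  .setAppName("reject-offer-tuning-example")
  .set("spark.cores.max", "8")
  // Fallback refusal window, used when the more specific settings below are unset.
  .set("spark.mesos.rejectOfferDuration", "180s")
  .set("spark.mesos.rejectOfferDurationForUnmetConstraints", "300s")
  .set("spark.mesos.rejectOfferDurationForReachedMaxCores", "240s")
```

Longer refusal windows reduce scheduler churn in a busy cluster at the cost of slower reaction when capacity frees up.
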
From 97146946f35806c6694c4f78a815a303d33483f4 Mon Sep 17 00:00:00 2001
From: "Li, YanKit | Wilson | RIT"
Date: Tue, 17 Oct 2017 05:54:06 +0000
Subject: [PATCH 2/4] [SPARK-22292][Mesos] Added spark.mem.max support for Mesos

---
 .../MesosCoarseGrainedSchedulerBackend.scala  | 23 +++++++++++++++++++
 .../cluster/mesos/MesosSchedulerUtils.scala   |  6 +++++
 ...osCoarseGrainedSchedulerBackendSuite.scala | 17 ++++++++++++++
 3 files changed, 46 insertions(+)

diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 80c0a041b732..c7e35e883be2 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -64,6 +64,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private val MAX_SLAVE_FAILURES = 2
 
   private val maxCoresOption = conf.getOption("spark.cores.max").map(_.toInt)
+  private val maxMemOption = conf.getOption("spark.mem.max").map(Utils.memoryStringToMb)
 
   private val executorCoresOption = conf.getOption("spark.executor.cores").map(_.toInt)
 
@@ -76,6 +77,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     cores - (cores % minCoresPerExecutor)
   }
 
+  private val maxMem = maxMemOption.getOrElse(Int.MaxValue)
+
   private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)
 
   private val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
@@ -95,8 +98,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 
   // Cores we have acquired with each Mesos task ID
   private val coresByTaskId = new mutable.HashMap[String, Int]
+  private val memByTaskId = new mutable.HashMap[String, Int]
   private val gpusByTaskId = new mutable.HashMap[String, Int]
   private var totalCoresAcquired = 0
+  private var totalMemAcquired = 0
   private var totalGpusAcquired = 0
 
   // The amount of time to wait for locality scheduling
@@ -149,6 +154,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private val rejectOfferDurationForReachedMaxCores =
     getRejectOfferDurationForReachedMaxCores(sc.conf)
 
+  // Reject offers when we reached the maximum amount of memory for this framework
+  private val rejectOfferDurationForReachedMaxMem =
+    getRejectOfferDurationForReachedMaxMem(sc.conf)
+
   // A client for talking to the external shuffle service
   private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
     if (shuffleServiceEnabled) {
@@ -398,6 +407,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           offer,
           Some("reached spark.cores.max"),
           Some(rejectOfferDurationForReachedMaxCores))
+      } else if (totalMemAcquired >= maxMem) {
+        // Reject an offer for a configurable amount of time to avoid starving other frameworks
+        declineOffer(driver,
+          offer,
+          Some("reached spark.mem.max"),
+          Some(rejectOfferDurationForReachedMaxMem))
       } else {
         declineOffer(
           driver,
@@ -462,7 +477,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         tasks(offer.getId) ::= taskBuilder.build()
         remainingResources(offerId) = resourcesLeft.asJava
         totalCoresAcquired += taskCPUs
+        totalMemAcquired += taskMemory
         coresByTaskId(taskId) = taskCPUs
+        memByTaskId(taskId) = taskMemory
         if (taskGPUs > 0) {
           totalGpusAcquired += taskGPUs
           gpusByTaskId(taskId) = taskGPUs
@@ -511,6 +528,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       cpus <= offerCPUs &&
       cpus + totalCoresAcquired <= maxCores &&
       mem <= offerMem &&
+      mem + totalMemAcquired <= maxMem &&
       numExecutors < executorLimit &&
       slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES &&
       meetsPortRequirements &&
@@ -584,6 +602,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           totalCoresAcquired -= cores
           coresByTaskId -= taskId
         }
+        // Also remove the memory we have remembered for this task, if it's in the hashmap
+        for (mem <- memByTaskId.get(taskId)) {
+          totalMemAcquired -= mem
+          memByTaskId -= taskId
+        }
         // Also remove the gpus we have remembered for this task, if it's in the hashmap
         for (gpus <- gpusByTaskId.get(taskId)) {
          totalGpusAcquired -= gpus
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 6fcb30af8a73..c55e28263e87 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -391,6 +391,12 @@ trait MesosSchedulerUtils extends Logging {
       getRejectOfferDurationStr(conf))
   }
 
+  protected def getRejectOfferDurationForReachedMaxMem(conf: SparkConf): Long = {
+    conf.getTimeAsSeconds(
+      "spark.mesos.rejectOfferDurationForReachedMaxMem",
+      getRejectOfferDurationStr(conf))
+  }
+
   /**
    * Checks executor ports if they are within some range of the offered list of ports ranges,
   *
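The core of the backend change above is one extra conjunct in the offer-acceptance check. A simplified, self-contained sketch of that predicate is shown below; the names and the signature are local to this sketch, and the real method in `MesosCoarseGrainedSchedulerBackend` also checks ports, the executor limit, and slave failures:

```scala
// Simplified sketch of the offer-acceptance logic this patch extends.
// Memory values are in MB, matching the units the backend works with.
def canLaunchTask(
    offerCpus: Int,
    offerMem: Int,
    taskCpus: Int,
    taskMem: Int,
    totalCoresAcquired: Int,
    totalMemAcquired: Int,
    maxCores: Int,
    maxMem: Int): Boolean = {
  taskCpus <= offerCpus &&
    taskCpus + totalCoresAcquired <= maxCores &&
    taskMem <= offerMem &&
    taskMem + totalMemAcquired <= maxMem   // new: stop accepting once spark.mem.max is reached
}
```

When `spark.mem.max` is unset, `maxMem` defaults to `Int.MaxValue`, so the new conjunct never rejects an offer and the previous behaviour is preserved.
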
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 6c40792112f4..32f3da28973c 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -152,6 +152,23 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(cpus == maxCores)
   }
 
+  test("mesos does not acquire more than spark.mem.max") {
+    setBackend(Map("spark.mem.max" -> "2g",
+      "spark.executor.memory" -> "1g",
+      "spark.executor.cores" -> "1"))
+
+    val executorMemory = backend.executorMemory(sc)
+
+    val maxCores = 10
+    offerResources(List(Resources(executorMemory * 3, maxCores)))
+
+    val taskInfos = verifyTaskLaunched(driver, "o1")
+    assert(taskInfos.length == 1)
+
+    val mem = backend.getResource(taskInfos.head.getResourcesList, "mem")
+    assert(mem == executorMemory)
+  }
+
   test("mesos does not acquire gpus if not specified") {
     setBackend()
 

From dc4f44107a9f768b932f784b2645839f4fba3ed8 Mon Sep 17 00:00:00 2001
From: "Li, YanKit | Wilson | RIT"
Date: Fri, 20 Oct 2017 07:01:57 +0000
Subject: [PATCH 3/4] Remove Space, to be squashed

---
 .../cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 32f3da28973c..6afb68dd68df 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -158,7 +158,6 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
       "spark.executor.cores" -> "1"))
 
     val executorMemory = backend.executorMemory(sc)
-
     val maxCores = 10
     offerResources(List(Resources(executorMemory * 3, maxCores)))
 
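Patch 4 below documents the resulting executor count as the minimum of a cores bound and a memory bound. A worked example with hypothetical settings (all values are illustrative, not defaults):

```scala
// Hypothetical settings, for illustration only:
//   spark.cores.max = 12,  spark.executor.cores = 4
//   spark.mem.max = 16g,   spark.executor.memory = 3g
//   spark.mesos.executor.memoryOverhead = 1g
val boundByCores = 12 / 4                               // 3 executors, limited by cores
val boundByMem   = 16384 / (3072 + 1024)                // 4 executors, limited by memory (MB)
val numExecutors = math.min(boundByCores, boundByMem)   // => 3
```
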
From e82f828d9f108d148cc1cef82b7c8349fcceaf30 Mon Sep 17 00:00:00 2001
From: "Li, YanKit | Wilson | RIT"
Date: Mon, 23 Oct 2017 06:59:15 +0000
Subject: [PATCH 4/4] Added documentation about `spark.mem.max`

---
 docs/running-on-mesos.md | 31 ++++++++++++++++++++++++-------
 1 file changed, 24 insertions(+), 7 deletions(-)

diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 0c8eac4d9a58..6db699fea7f1 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -196,17 +196,18 @@ configuration variables:
 
 * Executor memory: `spark.executor.memory`
 * Executor cores: `spark.executor.cores`
-* Number of executors: `spark.cores.max`/`spark.executor.cores`
+* Number of executors: min(`spark.cores.max`/`spark.executor.cores`,
+`spark.mem.max`/(`spark.executor.memory`+`spark.mesos.executor.memoryOverhead`))
 
 Please see the [Spark Configuration](configuration.html) page for
 details and default values.
 
 Executors are brought up eagerly when the application starts, until
-`spark.cores.max` is reached. If you don't set `spark.cores.max`, the
-Spark application will consume all resources offered to it by Mesos,
-so we of course urge you to set this variable in any sort of
-multi-tenant cluster, including one which runs multiple concurrent
-Spark applications.
+`spark.cores.max` or `spark.mem.max` is reached. If you set neither
+`spark.cores.max` nor `spark.mem.max`, the Spark application will
+consume all resources offered to it by Mesos, so we of course urge
+you to set at least one of these variables in any sort of multi-tenant
+cluster, including one which runs multiple concurrent Spark applications.
 
 The scheduler will start executors round-robin on the offers Mesos
 gives it, but there are no spread guarantees, as Mesos does not
@@ -343,6 +344,13 @@ See the [configuration page](configuration.html) for information on Spark config
     The value can be a floating point number.
   </td>
 </tr>
+<tr>
+  <td><code>spark.mem.max</code></td>
+  <td>(none)</td>
+  <td>
+    Maximum amount of memory Spark accepts from Mesos to launch executors.
+  </td>
+</tr>
 <tr>
   <td><code>spark.mesos.executor.docker.image</code></td>
   <td>(none)</td>
@@ -619,7 +627,8 @@ See the [configuration page](configuration.html) for information on Spark config
   <td>
     Time to consider unused resources refused; serves as a fallback for
     `spark.mesos.rejectOfferDurationForUnmetConstraints`,
-    `spark.mesos.rejectOfferDurationForReachedMaxCores`
+    `spark.mesos.rejectOfferDurationForReachedMaxCores`,
+    `spark.mesos.rejectOfferDurationForReachedMaxMem`
   </td>
 </tr>
 <tr>
@@ -637,6 +646,14 @@ See the [configuration page](configuration.html) for information on Spark config
     cores <code>spark.cores.max</code> is reached
   </td>
 </tr>
+<tr>
+  <td><code>spark.mesos.rejectOfferDurationForReachedMaxMem</code></td>
+  <td><code>spark.mesos.rejectOfferDuration</code></td>
+  <td>
+    Time to consider unused resources refused when the maximum amount of
+    memory <code>spark.mem.max</code> is reached
+  </td>
+</tr>
 </table>
 
 # Troubleshooting and Debugging
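To tie the series together, here is a sketch of an application capping both dimensions that these patches document; the master URL and every value below are placeholders rather than recommendations:

```scala
import org.apache.spark.SparkConf

// Placeholder values; spark.mem.max caps the total memory the framework
// will accept from Mesos, just as spark.cores.max caps the total cores.
val conf = new SparkConf()
  .setMaster("mesos://zk://zk1:2181/mesos")   // hypothetical Mesos master URL
  .setAppName("mem-max-example")
  .set("spark.executor.memory", "3g")
  .set("spark.executor.cores", "2")
  .set("spark.cores.max", "12")
  .set("spark.mem.max", "16g")
  .set("spark.mesos.rejectOfferDurationForReachedMaxMem", "240s")
```

With settings along these lines, the backend declines further offers for the configured duration once the acquired executor memory reaches the cap.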