diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index e0944bc9f5f86..6db699fea7f10 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -196,17 +196,18 @@ configuration variables:
* Executor memory: `spark.executor.memory`
* Executor cores: `spark.executor.cores`
-* Number of executors: `spark.cores.max`/`spark.executor.cores`
+* Number of executors: min(`spark.cores.max`/`spark.executor.cores`,
+`spark.mem.max`/(`spark.executor.memory`+`spark.mesos.executor.memoryOverhead`))
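+
+For illustration, with `spark.cores.max=24`, `spark.executor.cores=4`,
+`spark.mem.max=32g`, `spark.executor.memory=6g`, and
+`spark.mesos.executor.memoryOverhead=2g` (values chosen only for this
+example), Spark launches min(24/4, 32/(6+2)) = min(6, 4) = 4 executors.
+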
Please see the [Spark Configuration](configuration.html) page for
details and default values.
Executors are brought up eagerly when the application starts, until
-`spark.cores.max` is reached. If you don't set `spark.cores.max`, the
-Spark application will reserve all resources offered to it by Mesos,
-so we of course urge you to set this variable in any sort of
-multi-tenant cluster, including one which runs multiple concurrent
-Spark applications.
+`spark.cores.max` or `spark.mem.max` is reached. If you set neither
+`spark.cores.max` nor `spark.mem.max`, the Spark application will
+consume all resources offered to it by Mesos, so we strongly urge you
+to set these variables in any sort of multi-tenant cluster, including
+one that runs multiple concurrent Spark applications.
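+
+For example, a submission to a shared cluster might cap both dimensions
+(the master URL, application jar, and values below are illustrative):
+
+    ./bin/spark-submit --master mesos://host:5050 \
+      --conf spark.cores.max=24 \
+      --conf spark.mem.max=48g \
+      your-app.jar
+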
The scheduler will start executors round-robin on the offers Mesos
gives it, but there are no spread guarantees, as Mesos does not
@@ -343,6 +344,13 @@ See the [configuration page](configuration.html) for information on Spark config
The value can be a floating point number.
+
+<tr>
+  <td><code>spark.mem.max</code></td>
+  <td>(none)</td>
+  <td>
+    Maximum amount of memory Spark accepts from Mesos when launching
+    executors, given as a memory string such as <code>512m</code> or
+    <code>4g</code>.
+  </td>
+</tr>
+
<tr>
  <td><code>spark.mesos.executor.docker.image</code></td>
  <td>(none)</td>
@@ -613,6 +621,39 @@ See the [configuration page](configuration.html) for information on Spark config
driver disconnects, the master immediately tears down the framework.
+
+<tr>
+  <td><code>spark.mesos.rejectOfferDuration</code></td>
+  <td><code>120s</code></td>
+  <td>
+    Time to consider unused resources refused. Serves as the fallback value
+    for <code>spark.mesos.rejectOfferDurationForUnmetConstraints</code>,
+    <code>spark.mesos.rejectOfferDurationForReachedMaxCores</code>, and
+    <code>spark.mesos.rejectOfferDurationForReachedMaxMem</code>. For example,
+    setting this to <code>300s</code> makes all three default to
+    <code>300s</code> unless they are set explicitly.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.mesos.rejectOfferDurationForUnmetConstraints</code></td>
+  <td><code>spark.mesos.rejectOfferDuration</code></td>
+  <td>
+    Time to consider unused resources refused when an offer does not meet
+    the configured constraints.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.mesos.rejectOfferDurationForReachedMaxCores</code></td>
+  <td><code>spark.mesos.rejectOfferDuration</code></td>
+  <td>
+    Time to consider unused resources refused when the maximum number of
+    cores, <code>spark.cores.max</code>, has been reached.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.mesos.rejectOfferDurationForReachedMaxMem</code></td>
+  <td><code>spark.mesos.rejectOfferDuration</code></td>
+  <td>
+    Time to consider unused resources refused when the maximum amount of
+    memory, <code>spark.mem.max</code>, has been reached.
+  </td>
+</tr>
+
# Troubleshooting and Debugging
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 80c0a041b7322..c7e35e883be2f 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -64,6 +64,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
private val MAX_SLAVE_FAILURES = 2
private val maxCoresOption = conf.getOption("spark.cores.max").map(_.toInt)
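+ // spark.mem.max accepts a memory string (e.g. "4g"); Utils.memoryStringToMb converts it to MB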
+ private val maxMemOption = conf.getOption("spark.mem.max").map(Utils.memoryStringToMb)
private val executorCoresOption = conf.getOption("spark.executor.cores").map(_.toInt)
@@ -76,6 +77,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
cores - (cores % minCoresPerExecutor)
}
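+ // With spark.mem.max unset, memory is effectively unbounded (Int.MaxValue MB)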
+ private val maxMem = maxMemOption.getOrElse(Int.MaxValue)
+
private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)
private val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
@@ -95,8 +98,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
// Cores we have acquired with each Mesos task ID
private val coresByTaskId = new mutable.HashMap[String, Int]
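+ // Memory (MB) we have acquired with each Mesos task ID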
+ private val memByTaskId = new mutable.HashMap[String, Int]
private val gpusByTaskId = new mutable.HashMap[String, Int]
private var totalCoresAcquired = 0
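+ // Total memory (MB) acquired for launched tasks, checked against spark.mem.max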
+ private var totalMemAcquired = 0
private var totalGpusAcquired = 0
// The amount of time to wait for locality scheduling
@@ -149,6 +154,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
private val rejectOfferDurationForReachedMaxCores =
getRejectOfferDurationForReachedMaxCores(sc.conf)
+ // Reject offers when we reached the maximum amount of memory for this framework
+ private val rejectOfferDurationForReachedMaxMem =
+ getRejectOfferDurationForReachedMaxMem(sc.conf)
+
// A client for talking to the external shuffle service
private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
if (shuffleServiceEnabled) {
@@ -398,6 +407,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
offer,
Some("reached spark.cores.max"),
Some(rejectOfferDurationForReachedMaxCores))
+ } else if (totalMemAcquired >= maxMem) {
+ // Reject an offer for a configurable amount of time to avoid starving other frameworks
+ declineOffer(driver,
+ offer,
+ Some("reached spark.mem.max"),
+ Some(rejectOfferDurationForReachedMaxMem))
} else {
declineOffer(
driver,
@@ -462,7 +477,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
tasks(offer.getId) ::= taskBuilder.build()
remainingResources(offerId) = resourcesLeft.asJava
totalCoresAcquired += taskCPUs
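+ // Count this task's memory toward the spark.mem.max budget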
+ totalMemAcquired += taskMemory
coresByTaskId(taskId) = taskCPUs
+ memByTaskId(taskId) = taskMemory
if (taskGPUs > 0) {
totalGpusAcquired += taskGPUs
gpusByTaskId(taskId) = taskGPUs
@@ -511,6 +528,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
cpus <= offerCPUs &&
cpus + totalCoresAcquired <= maxCores &&
mem <= offerMem &&
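+ // keep cumulative acquired memory within spark.mem.max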
+ mem + totalMemAcquired <= maxMem &&
numExecutors < executorLimit &&
slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES &&
meetsPortRequirements &&
@@ -584,6 +602,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
totalCoresAcquired -= cores
coresByTaskId -= taskId
}
+ // Also remove the memory we have remembered for this task, if it's in the hashmap
+ for (mem <- memByTaskId.get(taskId)) {
+ totalMemAcquired -= mem
+ memByTaskId -= taskId
+ }
// Also remove the gpus we have remembered for this task, if it's in the hashmap
for (gpus <- gpusByTaskId.get(taskId)) {
totalGpusAcquired -= gpus
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 6fcb30af8a733..c55e28263e873 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -391,6 +391,12 @@ trait MesosSchedulerUtils extends Logging {
getRejectOfferDurationStr(conf))
}
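+ // Mirrors getRejectOfferDurationForReachedMaxCores; when the specific key is
+ // unset, this falls back to the generic spark.mesos.rejectOfferDuration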
+ protected def getRejectOfferDurationForReachedMaxMem(conf: SparkConf): Long = {
+ conf.getTimeAsSeconds(
+ "spark.mesos.rejectOfferDurationForReachedMaxMem",
+ getRejectOfferDurationStr(conf))
+ }
+
/**
* Checks executor ports if they are within some range of the offered list of ports ranges,
*
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 6c40792112f49..6afb68dd68dfd 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -152,6 +152,22 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(cpus == maxCores)
}
+ test("mesos does not acquire more than spark.mem.max") {
+ setBackend(Map("spark.mem.max" -> "2g",
+ "spark.executor.memory" -> "1g",
+ "spark.executor.cores" -> "1"))
+
+ val executorMemory = backend.executorMemory(sc)
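+ // The offer below holds memory for three executors and plenty of cores, but
+ // spark.mem.max=2g admits only one, because executorMemory already includes
+ // the Mesos memory overhead on top of spark.executor.memory=1g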
+ val offeredCores = 10
+ offerResources(List(Resources(executorMemory * 3, offeredCores)))
+
+ val taskInfos = verifyTaskLaunched(driver, "o1")
+ assert(taskInfos.length == 1)
+
+ val mem = backend.getResource(taskInfos.head.getResourcesList, "mem")
+ assert(mem == executorMemory)
+ }
+
test("mesos does not acquire gpus if not specified") {
setBackend()