53 changes: 47 additions & 6 deletions docs/running-on-mesos.md
@@ -196,17 +196,18 @@ configuration variables:

* Executor memory: `spark.executor.memory`
* Executor cores: `spark.executor.cores`
- * Number of executors: `spark.cores.max`/`spark.executor.cores`
+ * Number of executors: min(`spark.cores.max`/`spark.executor.cores`,
+   `spark.mem.max`/(`spark.executor.memory`+`spark.mesos.executor.memoryOverhead`))

Please see the [Spark Configuration](configuration.html) page for
details and default values.
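
For example (all values purely illustrative), with `spark.cores.max=24`,
`spark.executor.cores=4`, `spark.mem.max=16g`, `spark.executor.memory=4g` and
`spark.mesos.executor.memoryOverhead=400`, the application would run
min(24/4, 16384/(4096+400)) = min(6, 3) = 3 executors.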

Executors are brought up eagerly when the application starts, until
- `spark.cores.max` is reached. If you don't set `spark.cores.max`, the
- Spark application will reserve all resources offered to it by Mesos,
- so we of course urge you to set this variable in any sort of
- multi-tenant cluster, including one which runs multiple concurrent
- Spark applications.
+ `spark.cores.max` or `spark.mem.max` is reached. If you set neither
+ `spark.cores.max` nor `spark.mem.max`, the Spark application will
+ consume all resources offered to it by Mesos, so we of course urge
+ you to set these variables in any sort of multi-tenant cluster,
+ including one which runs multiple concurrent Spark applications.
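
As a minimal sketch (the master URL, application name, and limit values below are
made-up examples), both caps can be set on the `SparkConf` before the context is created:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical limits: never hold more than 24 cores or 48 GiB of memory cluster-wide.
val conf = new SparkConf()
  .setMaster("mesos://zk://host1:2181,host2:2181/mesos") // placeholder ZooKeeper URL
  .setAppName("capped-mesos-app")
  .set("spark.cores.max", "24")
  .set("spark.mem.max", "48g")

val sc = new SparkContext(conf)
```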

The scheduler will start executors round-robin on the offers Mesos
gives it, but there are no spread guarantees, as Mesos does not
@@ -343,6 +344,13 @@ See the [configuration page](configuration.html) for information on Spark config
The value can be a floating point number.
</td>
</tr>
<tr>
<td><code>spark.mem.max</code></td>
<td>(none)</td>
<td>
Maximum amount of memory Spark accepts from Mesos to launch executors, totalled
across the cluster (not per machine), in the usual memory string format
(e.g. <code>512m</code>, <code>48g</code>). If this property is not set, no memory
limit is applied.
</td>
</tr>
<tr>
<td><code>spark.mesos.executor.docker.image</code></td>
<td>(none)</td>
@@ -613,6 +621,39 @@ See the [configuration page](configuration.html) for information on Spark config
driver disconnects, the master immediately tears down the framework.
</td>
</tr>
<tr>
<td><code>spark.mesos.rejectOfferDuration</code></td>
<td><code>120s</code></td>
<td>
Time to consider unused resources refused; serves as the fallback value for
<code>spark.mesos.rejectOfferDurationForUnmetConstraints</code>,
<code>spark.mesos.rejectOfferDurationForReachedMaxCores</code> and
<code>spark.mesos.rejectOfferDurationForReachedMaxMem</code>.
</tr>
<tr>
<td><code>spark.mesos.rejectOfferDurationForUnmetConstraints</code></td>
<td><code>spark.mesos.rejectOfferDuration</code></td>
<td>
Time to consider unused resources refused when an offer does not satisfy the configured constraints.
</td>
</tr>
<tr>
<td><code>spark.mesos.rejectOfferDurationForReachedMaxCores</code></td>
<td><code>spark.mesos.rejectOfferDuration</code></td>
<td>
Time to consider unused resources refused when the maximum number of cores,
<code>spark.cores.max</code>, has been reached.
</td>
</tr>
<tr>
<td><code>spark.mesos.rejectOfferDurationForReachedMaxMem</code></td>
<td><code>spark.mesos.rejectOfferDuration</code></td>
<td>
Time to consider unused resources refused when the maximum amount of memory,
<code>spark.mem.max</code>, has been reached.
</td>
</tr>
</table>
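
If a specific reject duration is not set, the fallback applies: for instance
(illustrative values), setting only `spark.mesos.rejectOfferDuration=300s` causes
declined offers in all of the situations above to be refused for 300 seconds, while
additionally setting `spark.mesos.rejectOfferDurationForReachedMaxMem=30s` overrides
just the reached-maximum-memory case.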

# Troubleshooting and Debugging
MesosCoarseGrainedSchedulerBackend.scala

@@ -64,6 +64,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
private val MAX_SLAVE_FAILURES = 2

private val maxCoresOption = conf.getOption("spark.cores.max").map(_.toInt)
private val maxMemOption = conf.getOption("spark.mem.max").map(Utils.memoryStringToMb)
skonto (Contributor), Oct 18, 2017:
Can we defend against minimum values and fail fast? For example, the default executor
memory is 1.4GB (including the Mesos memory overhead). We could calculate the value
returned by MesosSchedulerUtils.executorMemory. I don't think these values calculated
in canLaunchTask ever change. The same applies for cpus.

Author (Contributor):
@skonto For cpus, I think we can compare with minCoresPerExecutor; for mem, we can call
MesosSchedulerUtils.executorMemory to get the minimum requirement. Then, here, we would
parse the option, check the minimum, and if it is too small, throw an exception?

skonto (Contributor), Oct 20, 2017:
An exception I think would be ok; the idea is that if something is never going to work,
we let the user know, especially the novice user. In general, falling below the minimum
would warrant a warning if we don't want an exception thrown.

Author (Contributor):
Should I add the check in this PR or in a separate one?

Review comment:
Do we need to have a check similar to

but for memory, so that we know we'll "land" on the maximum?
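
A rough sketch of the fail-fast check discussed above (editorial illustration, not part
of this diff; the method name, exact messages, and the assumption that SparkException is
in scope are all hypothetical):

```scala
// Hypothetical eager validation at backend construction time, per the review discussion.
// executorMemory(sc) comes from MesosSchedulerUtils and already includes the memory overhead.
private def validateMaxResources(): Unit = {
  val minMemPerExecutor = executorMemory(sc)
  maxMemOption.foreach { maxMem =>
    if (maxMem < minMemPerExecutor) {
      throw new SparkException(
        s"spark.mem.max ($maxMem MB) is smaller than the $minMemPerExecutor MB needed " +
        "by a single executor, so no executor could ever be launched.")
    }
  }
  maxCoresOption.foreach { maxCores =>
    if (maxCores < minCoresPerExecutor) {
      throw new SparkException(
        s"spark.cores.max ($maxCores) is smaller than the $minCoresPerExecutor cores " +
        "required per executor, so no executor could ever be launched.")
    }
  }
}
```

Called once while the backend is being constructed, such a check would surface an
impossible configuration before any offers are processed.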


private val executorCoresOption = conf.getOption("spark.executor.cores").map(_.toInt)

@@ -76,6 +77,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
cores - (cores % minCoresPerExecutor)
}

// When spark.mem.max is not set, default to Int.MaxValue (in MB), i.e. effectively no memory cap.
private val maxMem = maxMemOption.getOrElse(Int.MaxValue)

private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)

private val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
@@ -95,8 +98,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(

// Cores we have acquired with each Mesos task ID
private val coresByTaskId = new mutable.HashMap[String, Int]
private val memByTaskId = new mutable.HashMap[String, Int]
private val gpusByTaskId = new mutable.HashMap[String, Int]
private var totalCoresAcquired = 0
private var totalMemAcquired = 0
private var totalGpusAcquired = 0

// The amount of time to wait for locality scheduling
@@ -149,6 +154,10 @@
private val rejectOfferDurationForReachedMaxCores =
getRejectOfferDurationForReachedMaxCores(sc.conf)

// Reject offers when we reached the maximum amount of memory for this framework
private val rejectOfferDurationForReachedMaxMem =
getRejectOfferDurationForReachedMaxMem(sc.conf)

// A client for talking to the external shuffle service
private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
if (shuffleServiceEnabled) {
@@ -398,6 +407,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
offer,
Some("reached spark.cores.max"),
Some(rejectOfferDurationForReachedMaxCores))
} else if (totalMemAcquired >= maxMem) {
// Reject an offer for a configurable amount of time to avoid starving other frameworks
declineOffer(driver,
offer,
Some("reached spark.mem.max"),
Some(rejectOfferDurationForReachedMaxMem))
} else {
declineOffer(
driver,
@@ -462,7 +477,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
tasks(offer.getId) ::= taskBuilder.build()
remainingResources(offerId) = resourcesLeft.asJava
totalCoresAcquired += taskCPUs
totalMemAcquired += taskMemory
coresByTaskId(taskId) = taskCPUs
memByTaskId(taskId) = taskMemory
if (taskGPUs > 0) {
totalGpusAcquired += taskGPUs
gpusByTaskId(taskId) = taskGPUs
@@ -511,6 +528,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
cpus <= offerCPUs &&
cpus + totalCoresAcquired <= maxCores &&
mem <= offerMem &&
mem + totalMemAcquired <= maxMem &&
numExecutors < executorLimit &&
slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES &&
meetsPortRequirements &&
@@ -584,6 +602,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
totalCoresAcquired -= cores
coresByTaskId -= taskId
}
// Also remove the memory we have remembered for this task, if it's in the hashmap
for (mem <- memByTaskId.get(taskId)) {
totalMemAcquired -= mem
memByTaskId -= taskId
}
// Also remove the gpus we have remembered for this task, if it's in the hashmap
for (gpus <- gpusByTaskId.get(taskId)) {
totalGpusAcquired -= gpus
MesosSchedulerUtils.scala

@@ -391,6 +391,12 @@ trait MesosSchedulerUtils extends Logging {
getRejectOfferDurationStr(conf))
}

// How long declined offers stay refused once spark.mem.max has been reached; falls back
// to the generic spark.mesos.rejectOfferDuration value when this property is not set explicitly.
protected def getRejectOfferDurationForReachedMaxMem(conf: SparkConf): Long = {
conf.getTimeAsSeconds(
"spark.mesos.rejectOfferDurationForReachedMaxMem",
getRejectOfferDurationStr(conf))
}

/**
* Checks executor ports if they are within some range of the offered list of ports ranges,
*
MesosCoarseGrainedSchedulerBackendSuite.scala

@@ -152,6 +152,22 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(cpus == maxCores)
}

test("mesos does not acquire more than spark.mem.max") {
setBackend(Map("spark.mem.max" -> "2g",
"spark.executor.memory" -> "1g",
"spark.executor.cores" -> "1"))

val executorMemory = backend.executorMemory(sc)
val maxCores = 10
offerResources(List(Resources(executorMemory * 3, maxCores)))
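// The offer has room for three executors, but spark.mem.max (2g) only covers one
// executor of executorMemory MB (1g plus memory overhead), so exactly one task should launch.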

val taskInfos = verifyTaskLaunched(driver, "o1")
assert(taskInfos.length == 1)

val mem = backend.getResource(taskInfos.head.getResourcesList, "mem")
assert(mem == executorMemory)
}

test("mesos does not acquire gpus if not specified") {
setBackend()
