Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit aaf85dc

Browse files
author
Michael Gummelt
committed
[SPARK-5095][MESOS] Support launching multiple mesos executors in coarse grained mesos mode.
This is the next iteration of tnachen's previous PR: apache#4027. In that PR, we resolved with andrewor14 and pwendell to implement the Mesos scheduler's support of `spark.executor.cores` to be consistent with YARN and Standalone. This PR implements that resolution. This PR implements two high-level features. These two features are co-dependent, so they're both implemented here: - Mesos support for spark.executor.cores - Multiple executors per slave We at Mesosphere have been working with Typesafe on a Spark/Mesos integration test suite: https://github.com/typesafehub/mesos-spark-integration-tests, which passes for this PR. The contribution is my original work and I license the work to the project under the project's open source license. Author: Michael Gummelt <[email protected]> Closes apache#10993 from mgummelt/executor_sizing.
1 parent 7bf61ed commit aaf85dc

File tree

9 files changed

+524
-276
lines changed

9 files changed

+524
-276
lines changed

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -241,6 +241,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
241241
else {
242242
val executorData = executorDataMap(task.executorId)
243243
executorData.freeCores -= scheduler.CPUS_PER_TASK
244+
245+
logInfo(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
246+
s"${executorData.executorHost}.")
247+
244248
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
245249
}
246250
}
@@ -310,7 +314,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
310314
}
311315

312316
// TODO (prashant) send conf instead of properties
313-
driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
317+
driverEndpoint = createDriverEndpointRef(properties)
318+
}
319+
320+
protected def createDriverEndpointRef(
321+
properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
322+
rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
314323
}
315324

316325
protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala

Lines changed: 230 additions & 143 deletions
Large diffs are not rendered by default.

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -137,7 +137,7 @@ private[spark] class MesosSchedulerBackend(
137137
val (resourcesAfterCpu, usedCpuResources) =
138138
partitionResources(availableResources, "cpus", mesosExecutorCores)
139139
val (resourcesAfterMem, usedMemResources) =
140-
partitionResources(resourcesAfterCpu.asJava, "mem", calculateTotalMemory(sc))
140+
partitionResources(resourcesAfterCpu.asJava, "mem", executorMemory(sc))
141141

142142
builder.addAllResources(usedCpuResources.asJava)
143143
builder.addAllResources(usedMemResources.asJava)
@@ -249,7 +249,7 @@ private[spark] class MesosSchedulerBackend(
249249
// check offers for
250250
// 1. Memory requirements
251251
// 2. CPU requirements - need at least 1 for executor, 1 for task
252-
val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
252+
val meetsMemoryRequirements = mem >= executorMemory(sc)
253253
val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
254254
val meetsRequirements =
255255
(meetsMemoryRequirements && meetsCPURequirements) ||

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -131,15 +131,15 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
131131
}
132132
}
133133

134-
/**
135-
* Signal that the scheduler has registered with Mesos.
136-
*/
137-
protected def getResource(res: JList[Resource], name: String): Double = {
134+
def getResource(res: JList[Resource], name: String): Double = {
138135
// A resource can have multiple values in the offer since it can either be from
139136
// a specific role or wildcard.
140137
res.asScala.filter(_.getName == name).map(_.getScalar.getValue).sum
141138
}
142139

140+
/**
141+
* Signal that the scheduler has registered with Mesos.
142+
*/
143143
protected def markRegistered(): Unit = {
144144
registerLatch.countDown()
145145
}
@@ -324,7 +324,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
324324
* @return memory requirement as (0.1 * <memoryOverhead>) or MEMORY_OVERHEAD_MINIMUM
325325
* (whichever is larger)
326326
*/
327-
def calculateTotalMemory(sc: SparkContext): Int = {
327+
def executorMemory(sc: SparkContext): Int = {
328328
sc.conf.getInt("spark.mesos.executor.memoryOverhead",
329329
math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) +
330330
sc.executorMemory

0 commit comments

Comments (0)