This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit ecad77a

Author: Michael Gummelt (committed)
Support multiple executors per node on Mesos.
Support spark.executor.cores on Mesos.
1 parent 33212cb commit ecad77a
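
To make concrete what this commit enables, here is a hypothetical driver-side configuration (not part of this diff; the master URL and values are made up) that caps each executor at a fixed number of cores, so the coarse-grained Mesos backend can launch several executors on the same node instead of one executor per node:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example, not code from this commit. With spark.executor.cores set,
// a single large Mesos offer can be split into several fixed-size executors
// rather than one executor that consumes the whole node.
val conf = new SparkConf()
  .setMaster("mesos://zk://zk1:2181/mesos")   // example Mesos master URL
  .setAppName("multi-executor-example")
  .set("spark.executor.cores", "2")           // cores per executor
  .set("spark.cores.max", "8")                // total cores across all executors
  .set("spark.executor.memory", "2g")
val sc = new SparkContext(conf)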

File tree: 9 files changed, +521 -275 lines


core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 10 additions & 1 deletion
@@ -240,6 +240,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       else {
         val executorData = executorDataMap(task.executorId)
         executorData.freeCores -= scheduler.CPUS_PER_TASK
+
+        logInfo(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
+          s"${executorData.executorHost}.")
+
         executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
       }
     }
@@ -309,7 +313,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }

     // TODO (prashant) send conf instead of properties
-    driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
+    driverEndpoint = createDriverEndpointRef(properties)
+  }
+
+  protected def createDriverEndpointRef(
+      properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
+    rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
   }

   protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
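
The point of factoring the setupEndpoint call into a protected createDriverEndpointRef is that subclasses can now intercept driver-endpoint registration. The class below is purely illustrative (it does not appear in this commit) and only sketches the shape of such an override:

package org.apache.spark.scheduler.cluster  // same package as the backends, so private[spark] types resolve

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.TaskSchedulerImpl

// Hypothetical subclass, for illustration only: it registers the standard
// DriverEndpoint via the superclass but logs when it does so.
private[spark] class LoggingSchedulerBackend(scheduler: TaskSchedulerImpl, rpcEnv: RpcEnv)
  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {

  override protected def createDriverEndpointRef(
      properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
    logInfo("Registering driver endpoint from a custom backend")
    super.createDriverEndpointRef(properties)
  }
}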

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala

Lines changed: 230 additions & 145 deletions
Large diffs are not rendered by default.
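
Since the largest diff of the commit is not rendered here, the sketch below gives a rough idea, under stated assumptions, of the scheduling logic it introduces: with spark.executor.cores set, an offer is carved into as many fixed-size executors as its CPUs and memory allow, instead of one executor taking the whole offer. The function and parameter names are hypothetical, not the actual code in CoarseMesosSchedulerBackend:

// Hypothetical sketch only. The real backend also honors spark.cores.max,
// tracks per-agent state, and handles failure blacklisting.
def executorsFittingOffer(
    offerCpus: Double,      // CPUs in the Mesos offer
    offerMemMb: Double,     // memory in the offer, MB
    executorCores: Int,     // spark.executor.cores
    executorMemMb: Int      // executorMemory(sc), MB
  ): Int = {
  val byCpu = (offerCpus / executorCores).toInt
  val byMem = (offerMemMb / executorMemMb).toInt
  math.min(byCpu, byMem)    // number of executors to launch from this one offer
}

// Example: a 16-CPU, 32768 MB offer with 2 cores and 4096 MB per executor
// fits min(8, 8) = 8 executors on the same node.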

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala

Lines changed: 2 additions & 2 deletions
@@ -138,7 +138,7 @@ private[spark] class MesosSchedulerBackend(
     val (resourcesAfterCpu, usedCpuResources) =
       partitionResources(availableResources, "cpus", mesosExecutorCores)
     val (resourcesAfterMem, usedMemResources) =
-      partitionResources(resourcesAfterCpu.asJava, "mem", calculateTotalMemory(sc))
+      partitionResources(resourcesAfterCpu.asJava, "mem", executorMemory(sc))

     builder.addAllResources(usedCpuResources.asJava)
     builder.addAllResources(usedMemResources.asJava)
@@ -250,7 +250,7 @@ private[spark] class MesosSchedulerBackend(
       // check offers for
       //  1. Memory requirements
       //  2. CPU requirements - need at least 1 for executor, 1 for task
-      val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
+      val meetsMemoryRequirements = mem >= executorMemory(sc)
       val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
       val meetsRequirements =
         (meetsMemoryRequirements && meetsCPURequirements) ||
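
For the fine-grained backend above, the renamed check reads the same as before. As a rough worked example (the numbers are invented, not taken from the commit), with the default spark.task.cpus = 1 and one CPU reserved for the Mesos executor itself, an offer must carry at least two CPUs plus the executor memory:

// Illustrative numbers only. mesosExecutorCores and CPUS_PER_TASK default to 1 in a
// stock configuration; executorMemory(sc) is computed as in MesosSchedulerUtils below.
val mesosExecutorCores = 1.0        // CPUs reserved for the executor process
val cpusPerTask = 1                 // scheduler.CPUS_PER_TASK (spark.task.cpus)
val requiredMemMb = 4505            // executorMemory(sc) for spark.executor.memory = 4g

val offerCpus = 4.0
val offerMemMb = 8192.0
val usable =
  offerMemMb >= requiredMemMb && offerCpus >= (mesosExecutorCores + cpusPerTask)
// usable == true: the offer can host the executor plus at least one task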

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala

Lines changed: 5 additions & 5 deletions
@@ -140,15 +140,15 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
     }
   }

-  /**
-   * Signal that the scheduler has registered with Mesos.
-   */
-  protected def getResource(res: JList[Resource], name: String): Double = {
+  def getResource(res: JList[Resource], name: String): Double = {
     // A resource can have multiple values in the offer since it can either be from
     // a specific role or wildcard.
     res.asScala.filter(_.getName == name).map(_.getScalar.getValue).sum
   }

+  /**
+   * Signal that the scheduler has registered with Mesos.
+   */
   protected def markRegistered(): Unit = {
     registerLatch.countDown()
   }
@@ -337,7 +337,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
    * @return memory requirement as (0.1 * <memoryOverhead>) or MEMORY_OVERHEAD_MINIMUM
    *         (whichever is larger)
    */
-  def calculateTotalMemory(sc: SparkContext): Int = {
+  def executorMemory(sc: SparkContext): Int = {
     sc.conf.getInt("spark.mesos.executor.memoryOverhead",
       math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) +
       sc.executorMemory
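
A worked example of the renamed executorMemory helper, with assumed values for the trait's constants (MEMORY_OVERHEAD_FRACTION = 0.10, MEMORY_OVERHEAD_MINIMUM = 384 MB) and no explicit spark.mesos.executor.memoryOverhead override:

// Illustrative values only; the constants are assumed, not shown in this diff.
val executorMemoryMb = 4096                                    // spark.executor.memory = 4g
val overheadMb = math.max(0.10 * executorMemoryMb, 384).toInt  // 409 MB
val totalMb = executorMemoryMb + overheadMb                    // 4505 MB requested per executor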

0 commit comments
