@@ -24,12 +24,14 @@ private[spark] object MemoryUtils {
   val OVERHEAD_FRACTION = 1.07
   val OVERHEAD_MINIMUM = 384
 
-  def calculateTotalMemory(sc: SparkContext) = {
+  def calculateTotalMemory(sc: SparkContext) = calculateExecutorMemory(sc) + calculateTaskMemory(sc)
+
+  def calculateTaskMemory(sc: SparkContext) = sc.executorMemory
+
+  def calculateExecutorMemory(sc: SparkContext) = {
     math.max(
       sc.conf.getOption("spark.mesos.executor.memoryOverhead")
-        .getOrElse(OVERHEAD_MINIMUM.toString)
-        .toInt + sc.executorMemory,
-      OVERHEAD_FRACTION * sc.executorMemory
-    )
+        .getOrElse(OVERHEAD_MINIMUM.toString).toInt,
+      ((OVERHEAD_FRACTION - 1) * sc.executorMemory).toInt)
   }
 }
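For reference, here is a minimal standalone sketch (not the patched file itself) of how the new split behaves, assuming the defaults above and no spark.mesos.executor.memoryOverhead override; the helper names and the 4096 MB figure are illustrative only:

// Sketch of the new memory split with the defaults; helper names are illustrative.
val OVERHEAD_FRACTION = 1.07
val OVERHEAD_MINIMUM = 384

def executorMemory(execMem: Int): Int =
  math.max(OVERHEAD_MINIMUM, ((OVERHEAD_FRACTION - 1) * execMem).toInt) // overhead share only

def taskMemory(execMem: Int): Int = execMem // the executor heap, now charged per task

def totalMemory(execMem: Int): Int = executorMemory(execMem) + taskMemory(execMem)

// With spark.executor.memory = 4096 MB:
//   executorMemory(4096) == 384   (7% of 4096 is ~286, so the 384 MB floor wins)
//   taskMemory(4096)     == 4096
//   totalMemory(4096)    == 4480  // same total as the old max(4096 + 384, 1.07 * 4096)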
@@ -68,6 +68,8 @@ private[spark] class MesosSchedulerBackend(
   // The listener bus to publish executor added/removed events.
   val listenerBus = sc.listenerBus
 
+  val executorCores = sc.conf.getInt("spark.mesos.executor.cores", 1)
+
   @volatile var appId: String = _
 
   override def start() {
@@ -139,14 +141,14 @@ private[spark] class MesosSchedulerBackend(
       .setName("cpus")
       .setType(Value.Type.SCALAR)
       .setScalar(Value.Scalar.newBuilder()
-        .setValue(scheduler.CPUS_PER_TASK).build())
+        .setValue(executorCores).build())
       .build()
     val memory = Resource.newBuilder()
       .setName("mem")
       .setType(Value.Type.SCALAR)
       .setScalar(
         Value.Scalar.newBuilder()
-          .setValue(MemoryUtils.calculateTotalMemory(sc)).build())
+          .setValue(MemoryUtils.calculateExecutorMemory(sc)).build())
       .build()
     MesosExecutorInfo.newBuilder()
       .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
@@ -220,10 +222,9 @@ private[spark] class MesosSchedulerBackend(
       val mem = getResource(o.getResourcesList, "mem")
       val cpus = getResource(o.getResourcesList, "cpus")
       val slaveId = o.getSlaveId.getValue
-      // TODO(pwendell): Should below be 1 + scheduler.CPUS_PER_TASK?
       (mem >= MemoryUtils.calculateTotalMemory(sc) &&
         // need at least 1 for executor, 1 for task
-        cpus >= 2 * scheduler.CPUS_PER_TASK) ||
+        cpus >= (executorCores + scheduler.CPUS_PER_TASK)) ||
         (slaveIdsWithExecutors.contains(slaveId) &&
           cpus >= scheduler.CPUS_PER_TASK)
     }
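The hard-coded 2 * scheduler.CPUS_PER_TASK floor is replaced by an explicit executor share. A simplified sketch of just the CPU side of this check (the memory condition is unchanged; the function name and defaults are illustrative):

// Simplified CPU check after this change; executorCores and cpusPerTask
// default to 1 (spark.mesos.executor.cores and spark.task.cpus).
def hasEnoughCpus(offerCpus: Double, executorAlreadyRunning: Boolean,
                  executorCores: Int = 1, cpusPerTask: Int = 1): Boolean =
  if (executorAlreadyRunning) offerCpus >= cpusPerTask // executor CPUs are already reserved
  else offerCpus >= executorCores + cpusPerTask        // reserve the executor share plus one task

// hasEnoughCpus(2.0, executorAlreadyRunning = false)  // true: 1 executor core + 1 task core
// hasEnoughCpus(1.0, executorAlreadyRunning = false)  // false: no room left for a task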
@@ -233,9 +234,7 @@
         getResource(o.getResourcesList, "cpus").toInt
       } else {
         // If the executor doesn't exist yet, subtract CPU for executor
-        // TODO(pwendell): Should below just subtract "1"?
-        getResource(o.getResourcesList, "cpus").toInt -
-          scheduler.CPUS_PER_TASK
+        getResource(o.getResourcesList, "cpus").toInt - executorCores
       }
       new WorkerOffer(
         o.getSlaveId.getValue,
@@ -302,12 +301,18 @@ private[spark] class MesosSchedulerBackend(
       .setType(Value.Type.SCALAR)
       .setScalar(Value.Scalar.newBuilder().setValue(scheduler.CPUS_PER_TASK).build())
       .build()
+    val memResource = Resource.newBuilder()
+      .setName("mem")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Value.Scalar.newBuilder().setValue(MemoryUtils.calculateTaskMemory(sc)).build())
+      .build()
     MesosTaskInfo.newBuilder()
       .setTaskId(taskId)
       .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
       .setExecutor(createExecutorInfo(slaveId))
       .setName(task.name)
       .addResources(cpuResource)
+      .addResources(memResource)
       .setData(MesosTaskLaunchData(task.serializedTask, task.attemptNumber).toByteString)
       .build()
   }
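Net effect of the two resource changes above: the Mesos executor itself now reserves only executorCores CPUs and the overhead share from MemoryUtils.calculateExecutorMemory(sc), while every launched task requests its CPUS_PER_TASK cpus plus its own mem resource of MemoryUtils.calculateTaskMemory(sc), i.e. spark.executor.memory.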
@@ -88,8 +88,6 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
     val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])
 
     val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
-    listenerBus.post(SparkListenerExecutorAdded("s1", new ExecutorInfo("host1", 2)))
-    EasyMock.replay(listenerBus)
 
     val sc = EasyMock.createMock(classOf[SparkContext])
     EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes()
@@ -99,26 +97,30 @@
     EasyMock.expect(sc.listenerBus).andReturn(listenerBus)
     EasyMock.replay(sc)
 
-    val minMem = MemoryUtils.calculateTotalMemory(sc).toInt
+    val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
+
+    val minMem = MemoryUtils.calculateTotalMemory(sc)
     val minCpu = 4
 
+    listenerBus.post(SparkListenerExecutorAdded("s1", new ExecutorInfo("host1", minCpu - backend.executorCores)))
+    EasyMock.replay(listenerBus)
+
     val mesosOffers = new java.util.ArrayList[Offer]
     mesosOffers.add(createOffer(1, minMem, minCpu))
     mesosOffers.add(createOffer(2, minMem - 1, minCpu))
     mesosOffers.add(createOffer(3, minMem, minCpu))
 
-    val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
-
     val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2)
     expectedWorkerOffers.append(new WorkerOffer(
       mesosOffers.get(0).getSlaveId.getValue,
       mesosOffers.get(0).getHostname,
-      2
+      minCpu - backend.executorCores
     ))
     expectedWorkerOffers.append(new WorkerOffer(
       mesosOffers.get(2).getSlaveId.getValue,
       mesosOffers.get(2).getHostname,
-      2
+      minCpu - backend.executorCores
     ))
     val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
     EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(expectedWorkerOffers))).andReturn(Seq(Seq(taskDesc)))
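The expected WorkerOffer sizes follow directly from the new accounting; a tiny sketch with the suite's minCpu = 4 and the default spark.mesos.executor.cores = 1 assumed here:

// CPUs surfaced to the TaskScheduler for a slave that has no executor yet.
val minCpu = 4
val executorCores = 1                       // spark.mesos.executor.cores default
val coresForTasks = minCpu - executorCores  // == 3, the value expected in both WorkerOffers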
docs/configuration.md: 7 additions & 0 deletions
@@ -341,6 +341,13 @@ Apart from these, the following properties are also available, and may be useful
     `spark.mesos.executor.memoryOverhead` or 7% of `spark.executor.memory`.
   </td>
 </tr>
+<tr>
+  <td><code>spark.mesos.executor.cores</code></td>
+  <td>1</td>
+  <td>
+    The number of cores to request for running the Mesos executor.
+  </td>
+</tr>
 </table>
 
 #### Shuffle Behavior
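As a usage sketch for the new property, a hypothetical driver-side configuration; the application name, Mesos master URL, and sizes are placeholders, not part of this change:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical configuration; adjust the master URL and sizes to your cluster.
val conf = new SparkConf()
  .setAppName("example")
  .setMaster("mesos://zk://zk1:2181/mesos")  // placeholder Mesos master
  .set("spark.mesos.executor.cores", "2")    // CPUs reserved for each Mesos executor
  .set("spark.executor.memory", "4g")
val sc = new SparkContext(conf)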
docs/running-on-mesos.md: 7 additions & 0 deletions
@@ -226,6 +226,13 @@ See the [configuration page](configuration.html) for information on Spark config
     The final total amount of memory allocated is the maximum value between executor memory plus memoryOverhead, and the overhead fraction (1.07) times the executor memory.
   </td>
 </tr>
+<tr>
+  <td><code>spark.mesos.executor.cores</code></td>
+  <td>1</td>
+  <td>
+    The number of cores to request for running the Mesos executor.
+  </td>
+</tr>
 </table>
 
 # Troubleshooting and Debugging
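As a worked example of the memoryOverhead rule quoted above (values illustrative): with spark.executor.memory = 4g and no override set, the total requested is max(4096 + 384, 1.07 * 4096) = 4480 MB; the 384 MB floor dominates until the executor heap exceeds roughly 5.4 GB.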