diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala index 5101ec8352e79..487570d58531f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala @@ -24,12 +24,14 @@ private[spark] object MemoryUtils { val OVERHEAD_FRACTION = 1.07 val OVERHEAD_MINIMUM = 384 - def calculateTotalMemory(sc: SparkContext) = { + def calculateTotalMemory(sc: SparkContext) = calculateExecutorMemory(sc) + calculateTaskMemory(sc) + + def calculateTaskMemory(sc: SparkContext) = sc.executorMemory + + def calculateExecutorMemory(sc: SparkContext) = { math.max( sc.conf.getOption("spark.mesos.executor.memoryOverhead") - .getOrElse(OVERHEAD_MINIMUM.toString) - .toInt + sc.executorMemory, - OVERHEAD_FRACTION * sc.executorMemory - ) + .getOrElse(OVERHEAD_MINIMUM.toString).toInt, + ((OVERHEAD_FRACTION - 1) * sc.executorMemory).toInt) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 79c9051e88691..24b843da81e70 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -68,6 +68,8 @@ private[spark] class MesosSchedulerBackend( // The listener bus to publish executor added/removed events. 
val listenerBus = sc.listenerBus + val executorCores = sc.conf.getInt("spark.mesos.executor.cores", 1) + @volatile var appId: String = _ override def start() { @@ -139,14 +141,14 @@ private[spark] class MesosSchedulerBackend( .setName("cpus") .setType(Value.Type.SCALAR) .setScalar(Value.Scalar.newBuilder() - .setValue(scheduler.CPUS_PER_TASK).build()) + .setValue(executorCores).build()) .build() val memory = Resource.newBuilder() .setName("mem") .setType(Value.Type.SCALAR) .setScalar( Value.Scalar.newBuilder() - .setValue(MemoryUtils.calculateTotalMemory(sc)).build()) + .setValue(MemoryUtils.calculateExecutorMemory(sc)).build()) .build() MesosExecutorInfo.newBuilder() .setExecutorId(ExecutorID.newBuilder().setValue(execId).build()) @@ -220,10 +222,9 @@ private[spark] class MesosSchedulerBackend( val mem = getResource(o.getResourcesList, "mem") val cpus = getResource(o.getResourcesList, "cpus") val slaveId = o.getSlaveId.getValue - // TODO(pwendell): Should below be 1 + scheduler.CPUS_PER_TASK? (mem >= MemoryUtils.calculateTotalMemory(sc) && // need at least 1 for executor, 1 for task - cpus >= 2 * scheduler.CPUS_PER_TASK) || + cpus >= (executorCores + scheduler.CPUS_PER_TASK)) || (slaveIdsWithExecutors.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK) } @@ -233,9 +234,7 @@ private[spark] class MesosSchedulerBackend( getResource(o.getResourcesList, "cpus").toInt } else { // If the executor doesn't exist yet, subtract CPU for executor - // TODO(pwendell): Should below just subtract "1"? 
- getResource(o.getResourcesList, "cpus").toInt - - scheduler.CPUS_PER_TASK + getResource(o.getResourcesList, "cpus").toInt - executorCores } new WorkerOffer( o.getSlaveId.getValue, @@ -302,12 +301,18 @@ private[spark] class MesosSchedulerBackend( .setType(Value.Type.SCALAR) .setScalar(Value.Scalar.newBuilder().setValue(scheduler.CPUS_PER_TASK).build()) .build() + val memResource = Resource.newBuilder() + .setName("mem") + .setType(Value.Type.SCALAR) + .setScalar(Value.Scalar.newBuilder().setValue(MemoryUtils.calculateTaskMemory(sc)).build()) + .build() MesosTaskInfo.newBuilder() .setTaskId(taskId) .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build()) .setExecutor(createExecutorInfo(slaveId)) .setName(task.name) .addResources(cpuResource) + .addResources(memResource) .setData(MesosTaskLaunchData(task.serializedTask, task.attemptNumber).toByteString) .build() } diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala index 073814c127edc..89e33ad01bbfd 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala @@ -88,8 +88,6 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl]) val listenerBus = EasyMock.createMock(classOf[LiveListenerBus]) - listenerBus.post(SparkListenerExecutorAdded("s1", new ExecutorInfo("host1", 2))) - EasyMock.replay(listenerBus) val sc = EasyMock.createMock(classOf[SparkContext]) EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes() @@ -99,26 +97,30 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea EasyMock.expect(sc.listenerBus).andReturn(listenerBus) EasyMock.replay(sc) - val minMem = MemoryUtils.calculateTotalMemory(sc).toInt + val backend = 
new MesosSchedulerBackend(taskScheduler, sc, "master") + + val minMem = MemoryUtils.calculateTotalMemory(sc) val minCpu = 4 + listenerBus.post(SparkListenerExecutorAdded("s1", new ExecutorInfo("host1", minCpu - backend.executorCores))) + EasyMock.replay(listenerBus) + val mesosOffers = new java.util.ArrayList[Offer] mesosOffers.add(createOffer(1, minMem, minCpu)) mesosOffers.add(createOffer(2, minMem - 1, minCpu)) mesosOffers.add(createOffer(3, minMem, minCpu)) - val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2) expectedWorkerOffers.append(new WorkerOffer( mesosOffers.get(0).getSlaveId.getValue, mesosOffers.get(0).getHostname, - 2 + minCpu - backend.executorCores )) expectedWorkerOffers.append(new WorkerOffer( mesosOffers.get(2).getSlaveId.getValue, mesosOffers.get(2).getHostname, - 2 + minCpu - backend.executorCores )) val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(expectedWorkerOffers))).andReturn(Seq(Seq(taskDesc))) diff --git a/docs/configuration.md b/docs/configuration.md index efbab4085317a..bc729086e4d00 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -341,6 +341,13 @@ Apart from these, the following properties are also available, and may be useful `spark.mesos.executor.memoryOverhead` or 7% of `spark.executor.memory`. +
spark.mesos.executor.cores