From 71703c808027b54970367af7d26f0d151b6a8003 Mon Sep 17 00:00:00 2001 From: Jongyoul Lee Date: Fri, 23 Jan 2015 14:46:25 +0900 Subject: [PATCH 1/5] [SPARK-5376][Mesos] MesosExecutor should have correct resources - Divided task and executor resources - Added `spark.mesos.executor.cpus` and fixed docs --- .../scheduler/cluster/mesos/MemoryUtils.scala | 12 +++++++----- .../cluster/mesos/MesosSchedulerBackend.scala | 16 +++++++++++----- .../mesos/MesosSchedulerBackendSuite.scala | 2 +- docs/configuration.md | 7 +++++++ docs/running-on-mesos.md | 7 +++++++ 5 files changed, 33 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala index 5101ec8352e7..487570d58531 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala @@ -24,12 +24,14 @@ private[spark] object MemoryUtils { val OVERHEAD_FRACTION = 1.07 val OVERHEAD_MINIMUM = 384 - def calculateTotalMemory(sc: SparkContext) = { + def calculateTotalMemory(sc: SparkContext) = calculateExecutorMemory(sc) + calculateTaskMemory(sc) + + def calculateTaskMemory(sc: SparkContext) = sc.executorMemory + + def calculateExecutorMemory(sc: SparkContext) = { math.max( sc.conf.getOption("spark.mesos.executor.memoryOverhead") - .getOrElse(OVERHEAD_MINIMUM.toString) - .toInt + sc.executorMemory, - OVERHEAD_FRACTION * sc.executorMemory - ) + .getOrElse(OVERHEAD_MINIMUM.toString).toInt, + ((OVERHEAD_FRACTION - 1) * sc.executorMemory).toInt) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 79c9051e8869..d5ead810370d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -68,6 +68,8 @@ private[spark] class MesosSchedulerBackend( // The listener bus to publish executor added/removed events. val listenerBus = sc.listenerBus + val executorCpus = sc.conf.getInt("spark.mesos.executor.cpus", 1) + @volatile var appId: String = _ override def start() { @@ -139,14 +141,14 @@ private[spark] class MesosSchedulerBackend( .setName("cpus") .setType(Value.Type.SCALAR) .setScalar(Value.Scalar.newBuilder() - .setValue(scheduler.CPUS_PER_TASK).build()) + .setValue(executorCpus).build()) .build() val memory = Resource.newBuilder() .setName("mem") .setType(Value.Type.SCALAR) .setScalar( Value.Scalar.newBuilder() - .setValue(MemoryUtils.calculateTotalMemory(sc)).build()) + .setValue(MemoryUtils.calculateExecutorMemory(sc)).build()) .build() MesosExecutorInfo.newBuilder() .setExecutorId(ExecutorID.newBuilder().setValue(execId).build()) @@ -223,7 +225,7 @@ private[spark] class MesosSchedulerBackend( // TODO(pwendell): Should below be 1 + scheduler.CPUS_PER_TASK? (mem >= MemoryUtils.calculateTotalMemory(sc) && // need at least 1 for executor, 1 for task - cpus >= 2 * scheduler.CPUS_PER_TASK) || + cpus >= (executorCpus + scheduler.CPUS_PER_TASK)) || (slaveIdsWithExecutors.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK) } @@ -234,8 +236,7 @@ private[spark] class MesosSchedulerBackend( } else { // If the executor doesn't exist yet, subtract CPU for executor // TODO(pwendell): Should below just subtract "1"? 
- getResource(o.getResourcesList, "cpus").toInt - - scheduler.CPUS_PER_TASK + getResource(o.getResourcesList, "cpus").toInt - executorCpus } new WorkerOffer( o.getSlaveId.getValue, @@ -302,6 +303,11 @@ private[spark] class MesosSchedulerBackend( .setType(Value.Type.SCALAR) .setScalar(Value.Scalar.newBuilder().setValue(scheduler.CPUS_PER_TASK).build()) .build() + val memResource = Resource.newBuilder() + .setName("mem") + .setType(Value.Type.SCALAR) + .setScalar(Value.Scalar.newBuilder().setValue(MemoryUtils.calculateTaskMemory(sc)).build()) + .build() MesosTaskInfo.newBuilder() .setTaskId(taskId) .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build()) diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala index 073814c127ed..c160863be96c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala @@ -99,7 +99,7 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea EasyMock.expect(sc.listenerBus).andReturn(listenerBus) EasyMock.replay(sc) - val minMem = MemoryUtils.calculateTotalMemory(sc).toInt + val minMem = MemoryUtils.calculateTotalMemory(sc) val minCpu = 4 val mesosOffers = new java.util.ArrayList[Offer] diff --git a/docs/configuration.md b/docs/configuration.md index efbab4085317..6b3c00c4761b 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -341,6 +341,13 @@ Apart from these, the following properties are also available, and may be useful `spark.mesos.executor.memoryOverhead` or 7% of `spark.executor.memory`. + + spark.mesos.executor.cpus + 1 + + This value is the amount of cores so that executor is running itself. 
+ + #### Shuffle Behavior diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 78358499fd01..99e1a0903cfc 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -226,6 +226,13 @@ See the [configuration page](configuration.html) for information on Spark config The final total amount of memory allocated is the maximum value between executor memory plus memoryOverhead, and overhead fraction (1.07) plus the executor memory. + + spark.mesos.executor.cpus + 1 + + The amount of cores that Mesos executor will request additionally for running executor itself. + + # Troubleshooting and Debugging From f655eee2bd3d871e6b50fbd9dd3a826662abca30 Mon Sep 17 00:00:00 2001 From: Jongyoul Lee Date: Fri, 23 Jan 2015 14:53:41 +0900 Subject: [PATCH 2/5] [SPARK-5376][Mesos] MesosExecutor should have correct resources - Removed `TODO` comments --- .../spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index d5ead810370d..7fe72d83d874 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -222,7 +222,6 @@ private[spark] class MesosSchedulerBackend( val mem = getResource(o.getResourcesList, "mem") val cpus = getResource(o.getResourcesList, "cpus") val slaveId = o.getSlaveId.getValue - // TODO(pwendell): Should below be 1 + scheduler.CPUS_PER_TASK? 
(mem >= MemoryUtils.calculateTotalMemory(sc) && // need at least 1 for executor, 1 for task cpus >= (executorCpus + scheduler.CPUS_PER_TASK)) || @@ -235,7 +234,6 @@ private[spark] class MesosSchedulerBackend( getResource(o.getResourcesList, "cpus").toInt } else { // If the executor doesn't exist yet, subtract CPU for executor - // TODO(pwendell): Should below just subtract "1"? getResource(o.getResourcesList, "cpus").toInt - executorCpus } new WorkerOffer( From 90545351ac53e054fc64c8229f97c19d82cb60aa Mon Sep 17 00:00:00 2001 From: Jongyoul Lee Date: Fri, 23 Jan 2015 15:33:06 +0900 Subject: [PATCH 3/5] [SPARK-5376][Mesos] MesosExecutor should have correct resources - changed term from `cpus` to `cores` - Reworded docs --- .../spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala | 2 +- docs/configuration.md | 4 ++-- docs/running-on-mesos.md | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 7fe72d83d874..62fbafeb36cd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -68,7 +68,7 @@ private[spark] class MesosSchedulerBackend( // The listener bus to publish executor added/removed events. 
val listenerBus = sc.listenerBus - val executorCpus = sc.conf.getInt("spark.mesos.executor.cpus", 1) + val executorCpus = sc.conf.getInt("spark.mesos.executor.cores", 1) @volatile var appId: String = _ diff --git a/docs/configuration.md b/docs/configuration.md index 6b3c00c4761b..bc729086e4d0 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -342,10 +342,10 @@ Apart from these, the following properties are also available, and may be useful - spark.mesos.executor.cpus + spark.mesos.executor.cores 1 - This value is the amount of cores so that executor is running itself. + The number of cores to request for running the Mesos executor. diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 99e1a0903cfc..6f596d4ebb7b 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -227,10 +227,10 @@ See the [configuration page](configuration.html) for information on Spark config - spark.mesos.executor.cpus + spark.mesos.executor.cores 1 - The amount of cores that Mesos executor will request additionally for running executor itself. + The number of cores to request for running the Mesos executor. From a28b66684236bfb31fce3b9fcaaf8e33e166b5b5 Mon Sep 17 00:00:00 2001 From: Jongyoul Lee Date: Fri, 23 Jan 2015 16:00:10 +0900 Subject: [PATCH 4/5] [SPARK-5376][Mesos] MesosExecutor should have correct resources - changed variable name from `executorCpus` to `executorCores` - Fixed failed test case.
--- .../cluster/mesos/MesosSchedulerBackend.scala | 8 ++++---- .../scheduler/mesos/MesosSchedulerBackendSuite.scala | 12 +++++++----- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 62fbafeb36cd..3463cffc1477 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -68,7 +68,7 @@ private[spark] class MesosSchedulerBackend( // The listener bus to publish executor added/removed events. val listenerBus = sc.listenerBus - val executorCpus = sc.conf.getInt("spark.mesos.executor.cores", 1) + val executorCores = sc.conf.getInt("spark.mesos.executor.cores", 1) @volatile var appId: String = _ @@ -141,7 +141,7 @@ private[spark] class MesosSchedulerBackend( .setName("cpus") .setType(Value.Type.SCALAR) .setScalar(Value.Scalar.newBuilder() - .setValue(executorCpus).build()) + .setValue(executorCores).build()) .build() val memory = Resource.newBuilder() .setName("mem") @@ -224,7 +224,7 @@ private[spark] class MesosSchedulerBackend( val slaveId = o.getSlaveId.getValue (mem >= MemoryUtils.calculateTotalMemory(sc) && // need at least 1 for executor, 1 for task - cpus >= (executorCpus + scheduler.CPUS_PER_TASK)) || + cpus >= (executorCores + scheduler.CPUS_PER_TASK)) || (slaveIdsWithExecutors.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK) } @@ -234,7 +234,7 @@ private[spark] class MesosSchedulerBackend( getResource(o.getResourcesList, "cpus").toInt } else { // If the executor doesn't exist yet, subtract CPU for executor - getResource(o.getResourcesList, "cpus").toInt - executorCpus + getResource(o.getResourcesList, "cpus").toInt - executorCores } new WorkerOffer( o.getSlaveId.getValue, diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala index c160863be96c..89e33ad01bbf 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala @@ -88,8 +88,6 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl]) val listenerBus = EasyMock.createMock(classOf[LiveListenerBus]) - listenerBus.post(SparkListenerExecutorAdded("s1", new ExecutorInfo("host1", 2))) - EasyMock.replay(listenerBus) val sc = EasyMock.createMock(classOf[SparkContext]) EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes() @@ -99,26 +97,30 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea EasyMock.expect(sc.listenerBus).andReturn(listenerBus) EasyMock.replay(sc) + val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") + val minMem = MemoryUtils.calculateTotalMemory(sc) val minCpu = 4 + listenerBus.post(SparkListenerExecutorAdded("s1", new ExecutorInfo("host1", minCpu - backend.executorCores))) + EasyMock.replay(listenerBus) + val mesosOffers = new java.util.ArrayList[Offer] mesosOffers.add(createOffer(1, minMem, minCpu)) mesosOffers.add(createOffer(2, minMem - 1, minCpu)) mesosOffers.add(createOffer(3, minMem, minCpu)) - val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2) expectedWorkerOffers.append(new WorkerOffer( mesosOffers.get(0).getSlaveId.getValue, mesosOffers.get(0).getHostname, - 2 + minCpu - backend.executorCores )) expectedWorkerOffers.append(new WorkerOffer( mesosOffers.get(2).getSlaveId.getValue, mesosOffers.get(2).getHostname, - 2 + minCpu - backend.executorCores )) val taskDesc = new TaskDescription(1L, 0, 
"s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(expectedWorkerOffers))).andReturn(Seq(Seq(taskDesc))) From d714e8bb6b699e5ec2a315df65cee0f4cf7765e5 Mon Sep 17 00:00:00 2001 From: Jongyoul Lee Date: Fri, 23 Jan 2015 18:12:31 +0900 Subject: [PATCH 5/5] [SPARK-5376][Mesos] MesosExecutor should have correct resources - Added memResource to MesosTaskInfo --- .../spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 3463cffc1477..24b843da81e7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -312,6 +312,7 @@ private[spark] class MesosSchedulerBackend( .setExecutor(createExecutorInfo(slaveId)) .setName(task.name) .addResources(cpuResource) + .addResources(memResource) .setData(MesosTaskLaunchData(task.serializedTask, task.attemptNumber).toByteString) .build() }