From 06cc6bca1410a67001c54e2ce457d1a94ea7d1c1 Mon Sep 17 00:00:00 2001
From: Timothy Chen
Date: Tue, 6 Jan 2015 14:25:40 -0800
Subject: [PATCH 1/4] [SPARK-5095] Support capping cores and launching multiple
 executors in coarse-grained mode.

---
 .../mesos/CoarseMesosSchedulerBackend.scala | 97 ++++++------
 .../CoarseMesosSchedulerBackendSuite.scala | 143 ++++++++++++++++++
 2 files changed, 193 insertions(+), 47 deletions(-)
 create mode 100644 core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 7d08eae0b4871..55b2b6ea4fc33 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -64,15 +64,19 @@ private[spark] class CoarseMesosSchedulerBackend(
 // This is for cleaning up shuffle files reliably.
 private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
+ val maxExecutorsPerSlave = conf.getInt("spark.mesos.coarse.executors.max", 1)
+ val maxCpusPerExecutor = conf.getInt("spark.mesos.coarse.cores.max", Int.MaxValue)
+
 // Cores we have acquired with each Mesos task ID
 val coresByTaskId = new HashMap[Int, Int]
 var totalCoresAcquired = 0
- val slaveIdsWithExecutors = new HashSet[String]
-
 // Maping from slave Id to hostname
 private val slaveIdToHost = new HashMap[String, String]
+ // Contains a mapping of slave ids to the number of executors launched.
+ val slaveIdsWithExecutors = new HashMap[String, Int]
+
 val taskIdToSlaveId: HashBiMap[Int, String] = HashBiMap.create[Int, String]
 // How many times tasks on each slave failed
 val failuresBySlaveId: HashMap[String, Int] = new HashMap[String, Int]
@@ -250,52 +254,51 @@ private[spark] class CoarseMesosSchedulerBackend(
 val offerAttributes = toAttributeMap(offer.getAttributesList)
 val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
 val slaveId = offer.getSlaveId.getValue
- val mem = getResource(offer.getResourcesList, "mem")
- val cpus = getResource(offer.getResourcesList, "cpus").toInt
+ var totalMem = getResource(offer.getResourcesList, "mem")
+ var totalCpus = getResource(offer.getResourcesList, "cpus").toInt
 val id = offer.getId.getValue
- if (meetsConstraints) {
- if (taskIdToSlaveId.size < executorLimit &&
- totalCoresAcquired < maxCores &&
- mem >= calculateTotalMemory(sc) &&
- cpus >= 1 &&
- failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
- !slaveIdsWithExecutors.contains(slaveId)) {
- // Launch an executor on the slave
- val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
- totalCoresAcquired += cpusToUse
- val taskId = newMesosTaskId()
- taskIdToSlaveId.put(taskId, slaveId)
- slaveIdsWithExecutors += slaveId
- coresByTaskId(taskId) = cpusToUse
- // Gather cpu resources from the available resources and use them in the task.
- val (remainingResources, cpuResourcesToUse) =
- partitionResources(offer.getResourcesList, "cpus", cpusToUse)
- val (_, memResourcesToUse) =
- partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc))
- val taskBuilder = MesosTaskInfo.newBuilder()
- .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
- .setSlaveId(offer.getSlaveId)
- .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
- .setName("Task " + taskId)
- .addAllResources(cpuResourcesToUse.asJava)
- .addAllResources(memResourcesToUse.asJava)
-
- sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
- MesosSchedulerBackendUtil
- .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder())
- }
-
- // Accept the offer and launch the task
- logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
- slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname
- d.launchTasks(
- Collections.singleton(offer.getId),
- Collections.singleton(taskBuilder.build()), filters)
- } else {
- // Decline the offer
- logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
- d.declineOffer(offer.getId)
+ var executorCount = slaveIdsWithExecutors.getOrElse(slaveId, 0)
+ while (taskIdToSlaveId.size < executorLimit &&
+ totalCoresAcquired < maxCores &&
+ meetsConstraints &&
+ totalMem >= calculateTotalMemory(sc) &&
+ totalCpus >= 1 &&
+ failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
+ executorCount < maxExecutorsPerSlave) {
+ // Launch an executor on the slave
+ val cpusToUse =
+ math.min(maxCpusPerExecutor, math.min(totalCpus, maxCores - totalCoresAcquired))
+ totalCoresAcquired += cpusToUse
+ totalCpus -= cpusToUse
+ totalMem -= calculateTotalMemory(sc)
+ val taskId = newMesosTaskId()
+ taskIdToSlaveId.put(taskId, slaveId)
+ executorCount += 1
+ slaveIdsWithExecutors(slaveId) = executorCount
+ coresByTaskId(taskId) = cpusToUse
+ // Gather cpu resources from the available resources and use them in the task.
+ val (remainingResources, cpuResourcesToUse) =
+ partitionResources(offer.getResourcesList, "cpus", cpusToUse)
+ val (_, memResourcesToUse) =
+ partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc))
+ val taskBuilder = MesosTaskInfo.newBuilder()
+ .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
+ .setSlaveId(offer.getSlaveId)
+ .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
+ .setName("Task " + taskId)
+ .addAllResources(cpuResourcesToUse.asJava)
+ .addAllResources(memResourcesToUse.asJava)
+
+ sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
+ MesosSchedulerBackendUtil
+ .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder())
 }
+
+ // accept the offer and launch the task
+ logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $totalMem cpu: $totalCpus")
+ slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname
+ d.launchTasks(
+ Collections.singleton(offer.getId),
+ Collections.singleton(taskBuilder.build()), filters)
 } else {
 // This offer does not meet constraints. We don't need to see it again.
 // Decline the offer for a long period of time.
@@ -335,7 +338,7 @@ private[spark] class CoarseMesosSchedulerBackend( } if (TaskState.isFinished(TaskState.fromMesos(state))) { - val slaveId = taskIdToSlaveId.get(taskId) + val slaveId = taskIdToSlaveId(taskId) slaveIdsWithExecutors -= slaveId taskIdToSlaveId.remove(taskId) // Remove the cores we have remembered for this task, if it's in the hashmap diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala new file mode 100644 index 0000000000000..2c951f25af769 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.mesos + +import org.scalatest.{Matchers, FunSuite} +import org.apache.spark.{SparkEnv, SparkConf, SparkContext, LocalSparkContext} +import org.apache.spark.scheduler.{TaskDescription, WorkerOffer, TaskSchedulerImpl} +import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MemoryUtils} +import org.apache.mesos.SchedulerDriver +import org.apache.mesos.Protos._ +import org.scalatest.mock.EasyMockSugar +import org.apache.mesos.Protos.Value.Scalar +import org.easymock.{Capture, EasyMock} +import java.nio.ByteBuffer +import java.util.Collections +import java.util +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import akka.actor.ActorSystem + +class CoarseMesosSchedulerBackendSuite extends FunSuite + with Matchers + with LocalSparkContext + with EasyMockSugar { + + test("launch multiple executors") { + def createOffer(id: Int, mem: Int, cpu: Int) = { + val builder = Offer.newBuilder() + builder.addResourcesBuilder() + .setName("mem") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(mem)) + builder.addResourcesBuilder() + .setName("cpus") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(cpu)) + builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()).setFrameworkId(FrameworkID.newBuilder().setValue("f1")) + .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")).setHostname(s"host${id.toString}").build() + } + + val driver = EasyMock.createMock(classOf[SchedulerDriver]) + val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl]) + + val env = EasyMock.createMock(classOf[SparkEnv]) + val actorSystem = EasyMock.createMock(classOf[ActorSystem]) + EasyMock.expect(env.actorSystem).andReturn(actorSystem) + EasyMock.replay(env) + + val sc = EasyMock.createMock(classOf[SparkContext]) + EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes() + EasyMock.expect(sc.getSparkHome()).andReturn(Option("/path")).anyTimes() 
+ EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes() + EasyMock.expect(sc.env).andReturn(env) + val conf = new SparkConf + conf.set("spark.driver.host", "localhost") + conf.set("spark.driver.port", "1234") + conf.set("spark.mesos.coarse.executors.max", "2") + conf.set("spark.mesos.coarse.cores.max", "2") + EasyMock.expect(sc.conf).andReturn(conf).anyTimes() + EasyMock.replay(sc) + + val minMem = MemoryUtils.calculateTotalMemory(sc).toInt + val minCpu = 2 + + val mesosOffers = new java.util.ArrayList[Offer] + mesosOffers.add(createOffer(1, minMem * 2, minCpu * 2)) + + val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](1) + expectedWorkerOffers.append(new WorkerOffer( + mesosOffers.get(0).getSlaveId.getValue, + mesosOffers.get(0).getHostname, + 2 + )) + + val taskDesc = new TaskDescription(1L, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) + val taskDesc2 = new TaskDescription(2L, "s2", "n2", 0, ByteBuffer.wrap(new Array[Byte](0))) + EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(expectedWorkerOffers))) + .andReturn(Seq(Seq(taskDesc, taskDesc2))) + EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes() + EasyMock.expect(taskScheduler.sc).andReturn(sc) + EasyMock.replay(taskScheduler) + + val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master") + + val capture = new Capture[util.Collection[TaskInfo]] + EasyMock.expect( + driver.launchTasks( + EasyMock.eq(Collections.singleton(mesosOffers.get(0).getId)), + EasyMock.capture(capture), + EasyMock.anyObject(classOf[Filters]) + ) + ).andReturn(Status.valueOf(1)).once + EasyMock.replay(driver) + + backend.resourceOffers(driver, mesosOffers) + + EasyMock.verify(driver) + assert(capture.getValue.size() == 2) + val iter = capture.getValue.iterator() + val taskInfo = iter.next() + taskInfo.getName should be("Task 0") + val cpus = taskInfo.getResourcesList.get(0) + cpus.getName should be("cpus") + cpus.getScalar.getValue should be(2.0) + taskInfo.getSlaveId.getValue should be("s1") + + val taskInfo2 = iter.next() + taskInfo2.getName should be("Task 1") + val cpus2 = taskInfo2.getResourcesList.get(0) + cpus2.getName should be("cpus") + cpus2.getScalar.getValue should be(2.0) + taskInfo2.getSlaveId.getValue should be("s1") + + // Already capped the max executors, shouldn't launch a new one. 
+ val mesosOffers2 = new java.util.ArrayList[Offer] + mesosOffers2.add(createOffer(1, minMem, minCpu)) + EasyMock.reset(taskScheduler) + EasyMock.reset(driver) + EasyMock.expect(taskScheduler.resourceOffers(EasyMock.anyObject(classOf[Seq[WorkerOffer]])).andReturn(Seq(Seq()))) + EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes() + EasyMock.replay(taskScheduler) + EasyMock.expect(driver.declineOffer(mesosOffers2.get(0).getId)).andReturn(Status.valueOf(1)).times(1) + EasyMock.replay(driver) + + backend.resourceOffers(driver, mesosOffers2) + EasyMock.verify(driver) + } +} From 855d75952bed193d227e41c7dc03c6904c92c280 Mon Sep 17 00:00:00 2001 From: Timothy Chen Date: Wed, 14 Jan 2015 00:46:26 -0800 Subject: [PATCH 2/4] Add documentation --- docs/running-on-mesos.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index a197d0e373027..6382ef0707657 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -373,6 +373,20 @@ See the [configuration page](configuration.html) for information on Spark config + + spark.mesos.coarse.cores.max + Int.MaxValue + + The maximum amount of cores the coarse grained mesos scheduler will allocate per executor. This only applies for coarse grained mode. + + + + spark.mesos.coarse.executors.max + 1 + + The maximum amount of executors that the coarse grained mesos scheduler will run per slave. This only applies for coarse grained mode. + + # Troubleshooting and Debugging From 7d201480673534b832d39ce128fa99ced1f98da4 Mon Sep 17 00:00:00 2001 From: Gerard Maas Date: Mon, 16 Feb 2015 22:40:49 +0100 Subject: [PATCH 3/4] updates the test to comply to the updated TaskDescription. --- .../scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala index 2c951f25af769..abbae5c2ee38d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala @@ -87,8 +87,8 @@ class CoarseMesosSchedulerBackendSuite extends FunSuite 2 )) - val taskDesc = new TaskDescription(1L, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) - val taskDesc2 = new TaskDescription(2L, "s2", "n2", 0, ByteBuffer.wrap(new Array[Byte](0))) + val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) + val taskDesc2 = new TaskDescription(2L, 0, "s2", "n2", 0, ByteBuffer.wrap(new Array[Byte](0))) EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(expectedWorkerOffers))) .andReturn(Seq(Seq(taskDesc, taskDesc2))) EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes() From 8a7a7357b24ba90098daf279ac8862200d3ef90e Mon Sep 17 00:00:00 2001 From: Timothy Chen Date: Tue, 24 Mar 2015 13:58:32 -0700 Subject: [PATCH 4/4] Fix review comments. 
--- .../mesos/CoarseMesosSchedulerBackend.scala | 147 +++++++++-------- .../CoarseMesosSchedulerBackendSuite.scala | 149 ++++++++++++++---- .../CoarseMesosSchedulerBackendSuite.scala | 143 ----------------- docs/running-on-mesos.md | 6 +- 4 files changed, 204 insertions(+), 241 deletions(-) delete mode 100644 core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 55b2b6ea4fc33..3a52f5d7512d1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -18,23 +18,24 @@ package org.apache.spark.scheduler.cluster.mesos import java.io.File +import java.util import java.util.concurrent.locks.ReentrantLock import java.util.{Collections, List => JList} import scala.collection.JavaConverters._ import scala.collection.mutable.{HashMap, HashSet} -import com.google.common.collect.HashBiMap -import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} -import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver} - -import org.apache.spark.{SecurityManager, SparkContext, SparkEnv, SparkException, TaskState} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient import org.apache.spark.rpc.RpcAddress -import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend +import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl} import org.apache.spark.util.Utils +import org.apache.spark.{SecurityManager, SparkContext, SparkEnv, SparkException, TaskState} + + +import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} +import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver} /** * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds @@ -64,20 +65,31 @@ private[spark] class CoarseMesosSchedulerBackend( // This is for cleaning up shuffle files reliably. private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false) - val maxExecutorsPerSlave = conf.getInt("spark.mesos.coarse.executors.max", 1) - val maxCpusPerExecutor = conf.getInt("spark.mesos.coarse.cores.max", Int.MaxValue) + private val maxExecutorsPerSlave = conf.getInt("spark.mesos.coarse.executors.max", 1) + + private val maxCpusPerExecutor = + conf.getOption("spark.mesos.coarse.executor.cores.max").map { m => m.toInt } + + if (conf.getOption("spark.mesos.coarse.executors.max").isDefined && maxCpusPerExecutor.isEmpty) { + throw new IllegalArgumentException( + "Must configure spark.mesos.coarse.coresPerExecutor.max when " + + "spark.mesos.coarse.executors.max is set") + } // Cores we have acquired with each Mesos task ID - val coresByTaskId = new HashMap[Int, Int] + val coresByTaskId = new HashMap[String, Int] var totalCoresAcquired = 0 // Maping from slave Id to hostname private val slaveIdToHost = new HashMap[String, String] + // Contains the list of slave ids that we have connect shuffle service to + private val existingSlaveShuffleConnections = new HashSet[String] + // Contains a mapping of slave ids to the number of executors launched. 
val slaveIdsWithExecutors = new HashMap[String, Int] - val taskIdToSlaveId: HashBiMap[Int, String] = HashBiMap.create[Int, String] + val taskIdToSlaveId: HashMap[String, String] = new HashMap[String, String] // How many times tasks on each slave failed val failuresBySlaveId: HashMap[String, Int] = new HashMap[String, Int] @@ -93,8 +105,6 @@ private[spark] class CoarseMesosSchedulerBackend( */ private[mesos] def executorLimit: Int = executorLimitOption.getOrElse(Int.MaxValue) - private val pendingRemovedSlaveIds = new HashSet[String] - // private lock object protecting mutable state above. Using the intrinsic lock // may lead to deadlocks since the superclass might also try to lock private val stateLock = new ReentrantLock @@ -126,10 +136,10 @@ private[spark] class CoarseMesosSchedulerBackend( @volatile var appId: String = _ - def newMesosTaskId(): Int = { + def newMesosTaskId(slaveId: String): String = { val id = nextMesosTaskId nextMesosTaskId += 1 - id + slaveId + "/" + id } override def start() { @@ -144,7 +154,7 @@ private[spark] class CoarseMesosSchedulerBackend( startScheduler(driver) } - def createCommand(offer: Offer, numCores: Int, taskId: Int): CommandInfo = { + def createCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo = { val executorSparkHome = conf.getOption("spark.mesos.executor.home") .orElse(sc.getSparkHome()) .getOrElse { @@ -188,7 +198,7 @@ private[spark] class CoarseMesosSchedulerBackend( "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend" .format(prefixEnv, runScript) + s" --driver-url $driverURL" + - s" --executor-id ${offer.getSlaveId.getValue}" + + s" --executor-id $taskId" + s" --hostname ${offer.getHostname}" + s" --cores $numCores" + s" --app-id $appId") @@ -196,12 +206,12 @@ private[spark] class CoarseMesosSchedulerBackend( // Grab everything to the first '.'. We'll use that and '*' to // glob the directory "correctly". val basename = uri.get.split('/').last.split('.').head - val executorId = sparkExecutorId(offer.getSlaveId.getValue, taskId.toString) + command.setValue( s"cd $basename*; $prefixEnv " + "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" + s" --driver-url $driverURL" + - s" --executor-id $executorId" + + s" --executor-id $taskId" + s" --hostname ${offer.getHostname}" + s" --cores $numCores" + s" --app-id $appId") @@ -248,42 +258,47 @@ private[spark] class CoarseMesosSchedulerBackend( * unless we've already launched more than we wanted to. 
*/ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { + val memoryPerExecutor = calculateTotalMemory(sc) stateLock.synchronized { val filters = Filters.newBuilder().setRefuseSeconds(5).build() for (offer <- offers.asScala) { + val mem = getResource(offer.getResourcesList, "mem") + val cpus = getResource(offer.getResourcesList, "cpus").toInt + var remainingMem = mem + var remainingCores = cpus + val tasks = new util.ArrayList[MesosTaskInfo]() val offerAttributes = toAttributeMap(offer.getAttributesList) val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) val slaveId = offer.getSlaveId.getValue - val totalMem = getResource(offer.getResourcesList, "mem") - val totalCpus = getResource(offer.getResourcesList, "cpus").toInt val id = offer.getId.getValue var executorCount = slaveIdsWithExecutors.getOrElse(slaveId, 0) while (taskIdToSlaveId.size < executorLimit && totalCoresAcquired < maxCores && meetsConstraints && - mem >= calculateTotalMemory(sc) && - cpus >= 1 && + remainingMem >= calculateTotalMemory(sc) && + remainingCores >= 1 && failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && executorCount < maxExecutorsPerSlave) { - // Launch an executor on the slave - val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) - totalCoresAcquired += cpusToUse - totalCpus -= cpusToUse - totalMem -= memRequired - val taskId = newMesosTaskId() + val coresToUse = + math.min(maxCpusPerExecutor.getOrElse(Int.MaxValue), + math.min(remainingCores, maxCores - totalCoresAcquired)) + totalCoresAcquired += coresToUse + remainingCores -= coresToUse + remainingMem -= memoryPerExecutor + val taskId = newMesosTaskId(slaveId) taskIdToSlaveId(taskId) = slaveId executorCount += 1 slaveIdsWithExecutors(slaveId) = executorCount - coresByTaskId(taskId) = cpusToUse + coresByTaskId(taskId) = coresToUse // Gather cpu resources from the available resources and use them in the task. val (remainingResources, cpuResourcesToUse) = - partitionResources(offer.getResourcesList, "cpus", cpusToUse) + partitionResources(offer.getResourcesList, "cpus", coresToUse) val (_, memResourcesToUse) = partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc)) val taskBuilder = MesosTaskInfo.newBuilder() .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build()) .setSlaveId(offer.getSlaveId) - .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId)) + .setCommand(createCommand(offer, coresToUse + extraCoresPerSlave, taskId)) .setName("Task " + taskId) .addAllResources(cpuResourcesToUse.asJava) .addAllResources(memResourcesToUse.asJava) @@ -292,13 +307,14 @@ private[spark] class CoarseMesosSchedulerBackend( MesosSchedulerBackendUtil .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder()) } + tasks.add(taskBuilder.build()) + } + if (!tasks.isEmpty) { // accept the offer and launch the task logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus") slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname - d.launchTasks( - Collections.singleton(offer.getId), - Collections.singleton(taskBuilder.build()), filters) + d.launchTasks(Collections.singleton(offer.getId()), tasks, filters) } else { // This offer does not meet constraints. We don't need to see it again. // Decline the offer for a long period of time. 
@@ -313,7 +329,7 @@ private[spark] class CoarseMesosSchedulerBackend( override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { - val taskId = status.getTaskId.getValue.toInt + val taskId = status.getTaskId.getValue val state = status.getState logInfo(s"Mesos task $taskId is now $state") val slaveId: String = status.getSlaveId.getValue @@ -324,7 +340,8 @@ private[spark] class CoarseMesosSchedulerBackend( // this through Mesos, since the shuffle services are set up independently. if (TaskState.fromMesos(state).equals(TaskState.RUNNING) && slaveIdToHost.contains(slaveId) && - shuffleServiceEnabled) { + shuffleServiceEnabled && + !existingSlaveShuffleConnections.contains(slaveId)) { assume(mesosExternalShuffleClient.isDefined, "External shuffle client was not instantiated even though shuffle service is enabled.") // TODO: Remove this and allow the MesosExternalShuffleService to detect @@ -335,12 +352,10 @@ private[spark] class CoarseMesosSchedulerBackend( s"host $hostname, port $externalShufflePort for app ${conf.getAppId}") mesosExternalShuffleClient.get .registerDriverWithShuffleService(hostname, externalShufflePort) - } - - if (TaskState.isFinished(TaskState.fromMesos(state))) { + existingSlaveShuffleConnections += slaveId + } else if (TaskState.isFinished(TaskState.fromMesos(state))) { val slaveId = taskIdToSlaveId(taskId) - slaveIdsWithExecutors -= slaveId - taskIdToSlaveId.remove(taskId) + // Remove the cores we have remembered for this task, if it's in the hashmap for (cores <- coresByTaskId.get(taskId)) { totalCoresAcquired -= cores @@ -354,7 +369,7 @@ private[spark] class CoarseMesosSchedulerBackend( "is Spark installed on it?") } } - executorTerminated(d, slaveId, s"Executor finished with state $state") + executorTerminated(d, taskId, slaveId, s"Executor finished with state $state") // In case we'd rejected everything before but have now lost a node d.reviveOffers() } @@ -381,35 +396,39 @@ private[spark] class CoarseMesosSchedulerBackend( * slave IDs that we might have asked to be killed. It also notifies the driver * that an executor was removed. 
*/ - private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = { + private def executorTerminated( + d: SchedulerDriver, + executorId: String, + slaveId: String, + reason: String): Unit = { stateLock.synchronized { - if (slaveIdsWithExecutors.contains(slaveId)) { - val slaveIdToTaskId = taskIdToSlaveId.inverse() - if (slaveIdToTaskId.containsKey(slaveId)) { - val taskId: Int = slaveIdToTaskId.get(slaveId) - taskIdToSlaveId.remove(taskId) - removeExecutor(sparkExecutorId(slaveId, taskId.toString), SlaveLost(reason)) + if (slaveIdsWithExecutors.contains(slaveId) && taskIdToSlaveId.contains(executorId)) { + taskIdToSlaveId.remove(executorId) + removeExecutor(executorId, new ExecutorLossReason(reason)) + val newCount = slaveIdsWithExecutors(slaveId) - 1 + if (newCount == 0) { + slaveIdsWithExecutors.remove(slaveId) + } else { + slaveIdsWithExecutors(slaveId) = newCount } - // TODO: This assumes one Spark executor per Mesos slave, - // which may no longer be true after SPARK-5095 - pendingRemovedSlaveIds -= slaveId - slaveIdsWithExecutors -= slaveId } } } - private def sparkExecutorId(slaveId: String, taskId: String): String = { - s"$slaveId/$taskId" - } - override def slaveLost(d: SchedulerDriver, slaveId: SlaveID): Unit = { logInfo(s"Mesos slave lost: ${slaveId.getValue}") - executorTerminated(d, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue) + // Terminate all executors in the slave + stateLock.synchronized { + val lostExecutors = taskIdToSlaveId.filter(_._2.equals(slaveId.getValue)).map(_._1) + lostExecutors.foreach { taskId => + executorTerminated(d, taskId, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue) + } + } } override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int): Unit = { - logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue)) - slaveLost(d, s) + logInfo("Executor lost: %s".format(e.getValue)) + executorTerminated(d, e.getValue, s.getValue, "Mesos Executor lost: " + e.getValue) } override def applicationId(): String = @@ -432,13 +451,9 @@ private[spark] class CoarseMesosSchedulerBackend( return false } - val slaveIdToTaskId = taskIdToSlaveId.inverse() for (executorId <- executorIds) { - val slaveId = executorId.split("/")(0) - if (slaveIdToTaskId.containsKey(slaveId)) { - mesosDriver.killTask( - TaskID.newBuilder().setValue(slaveIdToTaskId.get(slaveId).toString).build()) - pendingRemovedSlaveIds += slaveId + if (taskIdToSlaveId.contains(executorId)) { + mesosDriver.killTask(TaskID.newBuilder().setValue(executorId).build()) } else { logWarning("Unable to find executor Id '" + executorId + "' in Mesos scheduler") } diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala index 525ee0d3bdc5a..992305f593f14 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala @@ -17,23 +17,29 @@ package org.apache.spark.scheduler.cluster.mesos +import java.nio.ByteBuffer import java.util import java.util.Collections +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + import org.apache.mesos.Protos.Value.Scalar -import org.apache.mesos.Protos._ -import org.apache.mesos.{Protos, Scheduler, SchedulerDriver} +import 
org.apache.mesos.Protos.{TaskState, _} +import org.apache.mesos.{Protos, SchedulerDriver} import org.mockito.Matchers._ import org.mockito.Mockito._ -import org.mockito.Matchers +import org.mockito.{ArgumentCaptor, Matchers => MMatchers} +import org.scalatest.{BeforeAndAfter, Matchers} import org.scalatest.mock.MockitoSugar -import org.scalatest.BeforeAndAfter -import org.apache.spark.scheduler.TaskSchedulerImpl -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SecurityManager, SparkFunSuite} +import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription, TaskSchedulerImpl, WorkerOffer} +import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkEnv, SparkFunSuite} + class CoarseMesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext + with Matchers with MockitoSugar with BeforeAndAfter { @@ -61,16 +67,10 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite driver: SchedulerDriver): CoarseMesosSchedulerBackend = { val securityManager = mock[SecurityManager] val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master", securityManager) { - override protected def createSchedulerDriver( - masterUrl: String, - scheduler: Scheduler, - sparkUser: String, - appName: String, - conf: SparkConf, - webuiUrl: Option[String] = None, - checkpoint: Option[Boolean] = None, - failoverTimeout: Option[Double] = None, - frameworkId: Option[String] = None): SchedulerDriver = driver + override def start(): Unit = { + mesosDriver = driver + } + override def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = None markRegistered() } backend.start() @@ -78,21 +78,30 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite } var sparkConf: SparkConf = _ + var sparkEnv: SparkEnv = _ + var taskScheduler: TaskSchedulerImpl = _ before { sparkConf = (new SparkConf) .setMaster("local[*]") .setAppName("test-mesos-dynamic-alloc") .setSparkHome("/path") - - sc = new SparkContext(sparkConf) + .set("spark.testing", "1") + sc = mock[SparkContext] + sparkEnv = mock[SparkEnv] + taskScheduler = mock[TaskSchedulerImpl] + when(taskScheduler.sc).thenReturn(sc) + when(taskScheduler.sc).thenReturn(sc) + when(sc.executorMemory).thenReturn(100) + when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String]) + when(sc.getSparkHome()).thenReturn(Option("/path")) + when(sc.env).thenReturn(sparkEnv) + when(sc.conf).thenReturn(sparkConf) } test("mesos supports killing and limiting executors") { val driver = mock[SchedulerDriver] when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) - val taskScheduler = mock[TaskSchedulerImpl] - when(taskScheduler.sc).thenReturn(sc) sparkConf.set("spark.driver.host", "driverHost") sparkConf.set("spark.driver.port", "1234") @@ -104,11 +113,11 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite val mesosOffers = new java.util.ArrayList[Offer] mesosOffers.add(createOffer("o1", "s1", minMem, minCpu)) - val taskID0 = TaskID.newBuilder().setValue("0").build() + val taskID0 = TaskID.newBuilder().setValue("s1/0").build() backend.resourceOffers(driver, mesosOffers) verify(driver, times(1)).launchTasks( - Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)), + MMatchers.eq(Collections.singleton(mesosOffers.get(0).getId)), any[util.Collection[TaskInfo]], any[Filters]) @@ -122,7 +131,9 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite backend.resourceOffers(driver, mesosOffers2) verify(driver, times(1)) - 
.declineOffer(OfferID.newBuilder().setValue("o2").build()) + .declineOffer( + MMatchers.eq(OfferID.newBuilder().setValue("o2").build()), + MMatchers.any(classOf[Filters])) // Verify we didn't launch any new executor assert(backend.slaveIdsWithExecutors.size === 1) @@ -130,7 +141,7 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite backend.doRequestTotalExecutors(2) backend.resourceOffers(driver, mesosOffers2) verify(driver, times(1)).launchTasks( - Matchers.eq(Collections.singleton(mesosOffers2.get(0).getId)), + MMatchers.eq(Collections.singleton(mesosOffers2.get(0).getId)), any[util.Collection[TaskInfo]], any[Filters]) @@ -142,8 +153,6 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite test("mesos supports killing and relaunching tasks with executors") { val driver = mock[SchedulerDriver] when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) - val taskScheduler = mock[TaskSchedulerImpl] - when(taskScheduler.sc).thenReturn(sc) val backend = createSchedulerBackend(taskScheduler, driver) val minMem = backend.calculateTotalMemory(sc) + 1024 @@ -158,13 +167,13 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite backend.resourceOffers(driver, mesosOffers) verify(driver, times(1)).launchTasks( - Matchers.eq(Collections.singleton(offer1.getId)), + MMatchers.eq(Collections.singleton(offer1.getId)), anyObject(), anyObject[Filters]) // Simulate task killed, executor no longer running val status = TaskStatus.newBuilder() - .setTaskId(TaskID.newBuilder().setValue("0").build()) + .setTaskId(TaskID.newBuilder().setValue("s1/0").build()) .setSlaveId(SlaveID.newBuilder().setValue("s1").build()) .setState(TaskState.TASK_KILLED) .build @@ -178,10 +187,92 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite assert(backend.slaveIdsWithExecutors.contains("s1")) verify(driver, times(1)).launchTasks( - Matchers.eq(Collections.singleton(offer2.getId)), + MMatchers.eq(Collections.singleton(offer2.getId)), anyObject(), anyObject[Filters]) verify(driver, times(1)).reviveOffers() } + + test("launch multiple executors") { + val driver = mock[SchedulerDriver] + val taskScheduler = mock[TaskSchedulerImpl] + val env = mock[SparkEnv] + val sc = mock[SparkContext] + when(taskScheduler.sc).thenReturn(sc) + when(sc.executorMemory).thenReturn(100) + when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String]) + when(sc.getSparkHome()).thenReturn(Option("/path")) + when(sc.env).thenReturn(env) + val conf = new SparkConf + conf.set("spark.driver.host", "localhost") + conf.set("spark.driver.port", "1234") + conf.set("spark.mesos.coarse.executors.max", "2") + conf.set("spark.mesos.coarse.executor.cores.max", "2") + conf.set("spark.mesos.coarse.cores.max", "2") + conf.set("spark.testing", "1") + when(sc.conf).thenReturn(conf) + + val securityManager = mock[SecurityManager] + + val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master", securityManager) + + val minMem = backend.calculateTotalMemory(sc) + val minCpu = 2 + + val mesosOffers = new java.util.ArrayList[Offer] + mesosOffers.add(createOffer("1", "s1", minMem * 2, minCpu * 2)) + + val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](1) + expectedWorkerOffers.append(new WorkerOffer( + mesosOffers.get(0).getSlaveId.getValue, + mesosOffers.get(0).getHostname, + 2 + )) + + val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) + val taskDesc2 = new TaskDescription(2L, 0, "s2", "n2", 0, ByteBuffer.wrap(new Array[Byte](0))) + 
when(taskScheduler.resourceOffers(MMatchers.eq(expectedWorkerOffers))) + .thenReturn(Seq(Seq(taskDesc, taskDesc2))) + when(taskScheduler.CPUS_PER_TASK).thenReturn(2) + when(taskScheduler.sc).thenReturn(sc) + + val capture = ArgumentCaptor.forClass(classOf[util.Collection[TaskInfo]]) + when( + driver.launchTasks( + MMatchers.eq(Collections.singleton(mesosOffers.get(0).getId)), + capture.capture(), + MMatchers.any()) + ).thenReturn(Status.valueOf(1)) + + backend.resourceOffers(driver, mesosOffers) + + assert(capture.getValue.size() == 2) + val iter = capture.getValue.iterator() + val taskInfo = iter.next() + taskInfo.getName should be("Task s1/0") + val cpus = taskInfo.getResourcesList.get(0) + cpus.getName should be("cpus") + cpus.getScalar.getValue should be(2.0) + taskInfo.getSlaveId.getValue should be("s1") + + val taskInfo2 = iter.next() + taskInfo2.getName should be("Task s1/1") + val cpus2 = taskInfo2.getResourcesList.get(0) + cpus2.getName should be("cpus") + cpus2.getScalar.getValue should be(2.0) + taskInfo2.getSlaveId.getValue should be("s1") + + // Already capped the max executors, shouldn't launch a new one. + val mesosOffers2 = new java.util.ArrayList[Offer] + mesosOffers2.add(createOffer("1", "s1", minMem, minCpu)) + when(taskScheduler.resourceOffers(MMatchers.any(classOf[Seq[WorkerOffer]]))) + .thenReturn(Seq(Seq())) + when(taskScheduler.CPUS_PER_TASK).thenReturn(2) + when(driver.declineOffer( + MMatchers.eq(mesosOffers2.get(0).getId), + MMatchers.any(classOf[Filters]))) + .thenReturn(Status.valueOf(1)) + backend.resourceOffers(driver, mesosOffers2) + } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala deleted file mode 100644 index abbae5c2ee38d..0000000000000 --- a/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.scheduler.mesos - -import org.scalatest.{Matchers, FunSuite} -import org.apache.spark.{SparkEnv, SparkConf, SparkContext, LocalSparkContext} -import org.apache.spark.scheduler.{TaskDescription, WorkerOffer, TaskSchedulerImpl} -import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MemoryUtils} -import org.apache.mesos.SchedulerDriver -import org.apache.mesos.Protos._ -import org.scalatest.mock.EasyMockSugar -import org.apache.mesos.Protos.Value.Scalar -import org.easymock.{Capture, EasyMock} -import java.nio.ByteBuffer -import java.util.Collections -import java.util -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer -import akka.actor.ActorSystem - -class CoarseMesosSchedulerBackendSuite extends FunSuite - with Matchers - with LocalSparkContext - with EasyMockSugar { - - test("launch multiple executors") { - def createOffer(id: Int, mem: Int, cpu: Int) = { - val builder = Offer.newBuilder() - builder.addResourcesBuilder() - .setName("mem") - .setType(Value.Type.SCALAR) - .setScalar(Scalar.newBuilder().setValue(mem)) - builder.addResourcesBuilder() - .setName("cpus") - .setType(Value.Type.SCALAR) - .setScalar(Scalar.newBuilder().setValue(cpu)) - builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()).setFrameworkId(FrameworkID.newBuilder().setValue("f1")) - .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")).setHostname(s"host${id.toString}").build() - } - - val driver = EasyMock.createMock(classOf[SchedulerDriver]) - val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl]) - - val env = EasyMock.createMock(classOf[SparkEnv]) - val actorSystem = EasyMock.createMock(classOf[ActorSystem]) - EasyMock.expect(env.actorSystem).andReturn(actorSystem) - EasyMock.replay(env) - - val sc = EasyMock.createMock(classOf[SparkContext]) - EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes() - EasyMock.expect(sc.getSparkHome()).andReturn(Option("/path")).anyTimes() - EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes() - EasyMock.expect(sc.env).andReturn(env) - val conf = new SparkConf - conf.set("spark.driver.host", "localhost") - conf.set("spark.driver.port", "1234") - conf.set("spark.mesos.coarse.executors.max", "2") - conf.set("spark.mesos.coarse.cores.max", "2") - EasyMock.expect(sc.conf).andReturn(conf).anyTimes() - EasyMock.replay(sc) - - val minMem = MemoryUtils.calculateTotalMemory(sc).toInt - val minCpu = 2 - - val mesosOffers = new java.util.ArrayList[Offer] - mesosOffers.add(createOffer(1, minMem * 2, minCpu * 2)) - - val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](1) - expectedWorkerOffers.append(new WorkerOffer( - mesosOffers.get(0).getSlaveId.getValue, - mesosOffers.get(0).getHostname, - 2 - )) - - val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) - val taskDesc2 = new TaskDescription(2L, 0, "s2", "n2", 0, ByteBuffer.wrap(new Array[Byte](0))) - EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(expectedWorkerOffers))) - .andReturn(Seq(Seq(taskDesc, taskDesc2))) - EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes() - EasyMock.expect(taskScheduler.sc).andReturn(sc) - EasyMock.replay(taskScheduler) - - val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master") - - val capture = new Capture[util.Collection[TaskInfo]] - EasyMock.expect( - driver.launchTasks( - EasyMock.eq(Collections.singleton(mesosOffers.get(0).getId)), - EasyMock.capture(capture), - 
EasyMock.anyObject(classOf[Filters]) - ) - ).andReturn(Status.valueOf(1)).once - EasyMock.replay(driver) - - backend.resourceOffers(driver, mesosOffers) - - EasyMock.verify(driver) - assert(capture.getValue.size() == 2) - val iter = capture.getValue.iterator() - val taskInfo = iter.next() - taskInfo.getName should be("Task 0") - val cpus = taskInfo.getResourcesList.get(0) - cpus.getName should be("cpus") - cpus.getScalar.getValue should be(2.0) - taskInfo.getSlaveId.getValue should be("s1") - - val taskInfo2 = iter.next() - taskInfo2.getName should be("Task 1") - val cpus2 = taskInfo2.getResourcesList.get(0) - cpus2.getName should be("cpus") - cpus2.getScalar.getValue should be(2.0) - taskInfo2.getSlaveId.getValue should be("s1") - - // Already capped the max executors, shouldn't launch a new one. - val mesosOffers2 = new java.util.ArrayList[Offer] - mesosOffers2.add(createOffer(1, minMem, minCpu)) - EasyMock.reset(taskScheduler) - EasyMock.reset(driver) - EasyMock.expect(taskScheduler.resourceOffers(EasyMock.anyObject(classOf[Seq[WorkerOffer]])).andReturn(Seq(Seq()))) - EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes() - EasyMock.replay(taskScheduler) - EasyMock.expect(driver.declineOffer(mesosOffers2.get(0).getId)).andReturn(Status.valueOf(1)).times(1) - EasyMock.replay(driver) - - backend.resourceOffers(driver, mesosOffers2) - EasyMock.verify(driver) - } -} diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 6382ef0707657..60d5bcc5017c2 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -374,17 +374,17 @@ See the [configuration page](configuration.html) for information on Spark config - spark.mesos.coarse.cores.max + spark.mesos.coarse.coresPerExecutor.max Int.MaxValue - The maximum amount of cores the coarse grained mesos scheduler will allocate per executor. This only applies for coarse grained mode. + The maximum number of cores to use on each Spark executor. Coarse-grained mode only. spark.mesos.coarse.executors.max 1 - The maximum amount of executors that the coarse grained mesos scheduler will run per slave. This only applies for coarse grained mode. + The maximum number of executors that Spark will run on each Mesos slave. Coarse-grained mode only.
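
The two options documented above are the user-facing knobs this patch series adds. As a rough illustration of how an application might opt into them, here is a minimal driver sketch in Scala; it is not part of the patches. It assumes the option names from the documentation in the final patch (the scheduler code in that patch reads the per-executor cap from the key spark.mesos.coarse.executor.cores.max, so the exact spelling may differ), and the Mesos master URL and core counts are placeholder values.

import org.apache.spark.{SparkConf, SparkContext}

object CoarseGrainedMesosExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("mesos://host:5050")                        // placeholder Mesos master URL
      .setAppName("spark-5095-example")
      .set("spark.mesos.coarse", "true")                     // use the coarse-grained backend
      .set("spark.cores.max", "8")                           // total cores across the application
      .set("spark.mesos.coarse.executors.max", "2")          // at most two executors per slave
      .set("spark.mesos.coarse.coresPerExecutor.max", "2")   // cap each executor at two cores

    val sc = new SparkContext(conf)
    // With these settings, a single 8-CPU offer from one slave can be split into
    // two 2-core executors instead of one executor that takes every offered core.
    sc.stop()
  }
}

The same settings can also be supplied at submit time, for example with spark-submit --conf spark.mesos.coarse.executors.max=2 --conf spark.mesos.coarse.coresPerExecutor.max=2.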