diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 281965a5981b..052d8b4dc1c8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -54,6 +54,12 @@ private[spark] class MesosSchedulerBackend( var classLoader: ClassLoader = null + /** The number of cores we currently use in the cluster. */ + private[spark] var totalCoresAcquired: Double = 0 + + /** The maximum number of cores we can use at any one time. */ + private val maxCores: Double = sc.conf.getDouble("spark.cores.max", Double.MaxValue) + // The listener bus to publish executor added/removed events. val listenerBus = sc.listenerBus @@ -93,8 +99,8 @@ private[spark] class MesosSchedulerBackend( val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home") .orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility .getOrElse { - throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!") - } + throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!") + } val environment = Environment.newBuilder() sc.conf.getOption("spark.executor.extraClassPath").foreach { cp => environment.addVariables( @@ -210,12 +216,18 @@ private[spark] class MesosSchedulerBackend( } /** - * Method called by Mesos to offer resources on slaves. We respond by asking our active task sets - * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that - * tasks are balanced across the cluster. + * Return the usable Mesos offers and corresponding WorkerOffers. + * + * This method declines Mesos offers that don't meet minimum cpu, memory or attribute + * requirements. + * + * @param d Mesos SchedulerDriver to decline offers + * @param offers Mesos offers to be considered + * @return a pair of Mesos offers and corresponding WorkerOffer that can be used by the + * fine-grained scheduler. */ - override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { - inClassLoader() { + private[spark] def usableWorkerOffers(d: SchedulerDriver, + offers: JList[Offer]): (Seq[Protos.Offer], Seq[WorkerOffer]) = { // Fail first on offers with unmet constraints val (offersMatchingConstraints, offersNotMatchingConstraints) = offers.asScala.partition { o => @@ -225,8 +237,7 @@ private[spark] class MesosSchedulerBackend( // add some debug messaging if (!meetsConstraints) { - val id = o.getId.getValue - logDebug(s"Declining offer: $id with attributes: $offerAttributes") + logDebug(s"Declining offer: ${o.getId.getValue} with attributes: $offerAttributes") } meetsConstraints @@ -239,44 +250,69 @@ private[spark] class MesosSchedulerBackend( .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()) } - // Of the matching constraints, see which ones give us enough memory and cores - val (usableOffers, unUsableOffers) = offersMatchingConstraints.partition { o => - val mem = getResource(o.getResourcesList, "mem") - val cpus = getResource(o.getResourcesList, "cpus") - val slaveId = o.getSlaveId.getValue - val offerAttributes = toAttributeMap(o.getAttributesList) - - // check offers for - // 1. Memory requirements - // 2. 
CPU requirements - need at least 1 for executor, 1 for task - val meetsMemoryRequirements = mem >= calculateTotalMemory(sc) - val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK) - val meetsRequirements = - (meetsMemoryRequirements && meetsCPURequirements) || + // Of the matching constraints, see which ones give us enough memory and cores + val (usableOffers, unUsableOffers) = offersMatchingConstraints.partition { o => + val mem = getResource(o.getResourcesList, "mem") + val cpus = getResource(o.getResourcesList, "cpus") + val slaveId = o.getSlaveId.getValue + val offerAttributes = toAttributeMap(o.getAttributesList) + + // check offers for + // 1. Memory requirements + // 2. CPU requirements - need at least 1 for executor, 1 for task + val meetsMemoryRequirements = mem >= calculateTotalMemory(sc) + val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK) + val meetsRequirements = + (meetsMemoryRequirements && meetsCPURequirements) || (slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK) - val debugstr = if (meetsRequirements) "Accepting" else "Declining" - logDebug(s"$debugstr offer: ${o.getId.getValue} with attributes: " - + s"$offerAttributes mem: $mem cpu: $cpus") + val debugstr = if (meetsRequirements) "Accepting" else "Declining" + logDebug(s"$debugstr offer: ${o.getId.getValue} with attributes: " + + s"$offerAttributes mem: $mem cpu: $cpus") + + meetsRequirements + } + + // Decline offers we ruled out immediately + unUsableOffers.foreach(o => d.declineOffer(o.getId)) + + var availableCores = Math.max(0, maxCores - totalCoresAcquired) - meetsRequirements + val workerOffers = (for (o <- usableOffers) yield { + val coresInOffer = getResource(o.getResourcesList, "cpus").toInt + val extraCores = if (slaveIdToExecutorInfo.contains(o.getSlaveId.getValue)) { + 0D + } else { + // If the Mesos executor has not been started on this slave yet, set aside a few + // cores for the Mesos executor by offering fewer cores to the Spark executor + mesosExecutorCores } - // Decline offers we ruled out immediately - unUsableOffers.foreach(o => d.declineOffer(o.getId)) + // the cores we can offer for tasks on workers should not exceed neither availableCores + // nor cores in the current offer, after accounting for non-task cores + val taskCores = Math.min(availableCores - extraCores, coresInOffer - extraCores) - val workerOffers = usableOffers.map { o => - val cpus = if (slaveIdToExecutorInfo.contains(o.getSlaveId.getValue)) { - getResource(o.getResourcesList, "cpus").toInt - } else { - // If the Mesos executor has not been started on this slave yet, set aside a few - // cores for the Mesos executor by offering fewer cores to the Spark executor - (getResource(o.getResourcesList, "cpus") - mesosExecutorCores).toInt - } - new WorkerOffer( + if (taskCores > 0) { + availableCores -= taskCores + extraCores + Option(new WorkerOffer( o.getSlaveId.getValue, o.getHostname, - cpus) + taskCores.toInt)) + } else { + None } + }).flatten + + (usableOffers, workerOffers) + } + + /** + * Method called by Mesos to offer resources on slaves. We respond by asking our active task sets + * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that + * tasks are balanced across the cluster. 
+ */ + override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { + inClassLoader() { + val (usableOffers, workerOffers) = usableWorkerOffers(d, offers) val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap val slaveIdToWorkerOffer = workerOffers.map(o => o.executorId -> o).toMap @@ -304,20 +340,25 @@ private[spark] class MesosSchedulerBackend( mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo]) .add(mesosTask) slaveIdToResources(slaveId) = remainingResources + + totalCoresAcquired += getResource(mesosTask.getResourcesList, "cpus") } } // Reply to the offers val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout? - mesosTasks.foreach { case (slaveId, tasks) => - slaveIdToWorkerOffer.get(slaveId).foreach(o => - listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId, - // TODO: Add support for log urls for Mesos - new ExecutorInfo(o.host, o.cores, Map.empty))) - ) - logTrace(s"Launching Mesos tasks on slave '$slaveId', tasks:\n${getTasksSummary(tasks)}") - d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters) + mesosTasks.foreach { + case (slaveId, tasks) => + // add the cores reserved for each Mesos executor (one per slave) + totalCoresAcquired += getResource(tasks.get(0).getExecutor.getResourcesList, "cpus") + + slaveIdToWorkerOffer.get(slaveId).foreach(o => + listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId, + // TODO: Add support for log urls for Mesos + new ExecutorInfo(o.host, o.cores, Map.empty)))) + logTrace(s"Launching Mesos tasks on slave '$slaveId', tasks:\n${getTasksSummary(tasks)}") + d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters) } // Decline offers that weren't used @@ -365,6 +406,8 @@ private[spark] class MesosSchedulerBackend( } if (TaskState.isFinished(state)) { taskIdToSlaveId.remove(tid) + // here we assume that any Mesos task was allocated CPUS_PER_TASK + totalCoresAcquired -= scheduler.CPUS_PER_TASK } } scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer) @@ -396,7 +439,12 @@ private[spark] class MesosSchedulerBackend( private def removeExecutor(slaveId: String, reason: String) = { synchronized { listenerBus.post(SparkListenerExecutorRemoved(System.currentTimeMillis(), slaveId, reason)) - slaveIdToExecutorInfo -= slaveId + val executorInfo = slaveIdToExecutorInfo.remove(slaveId) + // in case we had an executor there, we need to update the total cores + executorInfo.map { info => + // we could use mesosExecutorCores but this way we are sure we're counting correctly + totalCoresAcquired -= getResource(info.getResourcesList, "cpus") + } } } @@ -412,8 +460,10 @@ private[spark] class MesosSchedulerBackend( recordSlaveLost(d, slaveId, SlaveLost()) } - override def executorLost(d: SchedulerDriver, executorId: ExecutorID, - slaveId: SlaveID, status: Int) { + override def executorLost( + d: SchedulerDriver, + executorId: ExecutorID, + slaveId: SlaveID, status: Int) { logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue, slaveId.getValue)) recordSlaveLost(d, slaveId, ExecutorExited(status, exitCausedByApp = true)) @@ -422,8 +472,7 @@ private[spark] class MesosSchedulerBackend( override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = { mesosDriver.killTask( TaskID.newBuilder() - .setValue(taskId.toString).build() - ) + .setValue(taskId.toString).build()) } // TODO: query Mesos for number of cores diff 
--git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index 721861fbbc51..17247b37e07b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -31,7 +31,6 @@ import org.apache.mesos.protobuf.{ByteString, GeneratedMessage} import org.apache.spark.{SparkException, SparkConf, Logging, SparkContext} import org.apache.spark.util.Utils - /** * Shared trait for implementing a Mesos Scheduler. This holds common state and helper * methods and Mesos scheduler will use. @@ -158,6 +157,12 @@ private[mesos] trait MesosSchedulerUtils extends Logging { /** * Partition the existing set of resources into two groups, those remaining to be * scheduled and those requested to be used for a new task. + * + * @note This method assumes there are enough resources to fulfill the request. In case + * there aren't it will return partial results. For instance, if amountToUse is + * 2 cpus, but only 1 is available, it will return a used `Resource` for + * 1 cpu. + * * @param resources The full list of available resources * @param resourceName The name of the resource to take from the available resources * @param amountToUse The amount of resources to take from the available resources @@ -197,7 +202,6 @@ private[mesos] trait MesosSchedulerUtils extends Logging { (attr.getName, attr.getText.getValue.split(',').toSet) } - /** Build a Mesos resource protobuf object */ protected def createResource(resourceName: String, quantity: Double): Protos.Resource = { Resource.newBuilder() @@ -225,7 +229,6 @@ private[mesos] trait MesosSchedulerUtils extends Logging { }).toMap } - /** * Match the requirements (if any) to the offer attributes. 
* if attribute requirements are not specified - return true diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala index c4dc56003120..e77b57604154 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala @@ -41,12 +41,25 @@ import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerExecutorAdded, import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar { + private def createOffer(offerId: String, slaveId: String, mem: Int, cpu: Int): Offer = { + val builder = Offer.newBuilder() + builder.addResourcesBuilder() + .setName("mem") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(mem)) + builder.addResourcesBuilder() + .setName("cpus") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(cpu)) + builder.setId(OfferID.newBuilder() + .setValue(offerId).build()) + .setFrameworkId(FrameworkID.newBuilder().setValue("f1")) + .setSlaveId(SlaveID.newBuilder().setValue(slaveId)) + .setHostname(s"host${slaveId}") + .build() + } - test("Use configured mesosExecutor.cores for ExecutorInfo") { - val mesosExecutorCores = 3 - val conf = new SparkConf - conf.set("spark.mesos.mesosExecutor.cores", mesosExecutorCores.toString) - + private def setupSparkContext(): SparkContext = { val listenerBus = mock[LiveListenerBus] listenerBus.post( SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) @@ -54,10 +67,19 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi val sc = mock[SparkContext] when(sc.getSparkHome()).thenReturn(Option("/spark-home")) - when(sc.conf).thenReturn(conf) when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String]) when(sc.executorMemory).thenReturn(100) when(sc.listenerBus).thenReturn(listenerBus) + sc + } + + test("Use configured mesosExecutor.cores for ExecutorInfo") { + val mesosExecutorCores = 3 + val conf = new SparkConf + conf.set("spark.mesos.mesosExecutor.cores", mesosExecutorCores.toString) + + val sc = setupSparkContext() + when(sc.conf).thenReturn(conf) val taskScheduler = mock[TaskSchedulerImpl] when(taskScheduler.CPUS_PER_TASK).thenReturn(2) @@ -76,19 +98,10 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi test("check spark-class location correctly") { val conf = new SparkConf - conf.set("spark.mesos.executor.home" , "/mesos-home") - - val listenerBus = mock[LiveListenerBus] - listenerBus.post( - SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) - - val sc = mock[SparkContext] - when(sc.getSparkHome()).thenReturn(Option("/spark-home")) - + conf.set("spark.mesos.executor.home", "/mesos-home") + val sc = setupSparkContext() when(sc.conf).thenReturn(conf) - when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String]) - when(sc.executorMemory).thenReturn(100) - when(sc.listenerBus).thenReturn(listenerBus) + val taskScheduler = mock[TaskSchedulerImpl] when(taskScheduler.CPUS_PER_TASK).thenReturn(2) @@ -117,16 +130,8 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi .set("spark.mesos.executor.docker.volumes", "/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro") 
.set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp") - val listenerBus = mock[LiveListenerBus] - listenerBus.post( - SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) - - val sc = mock[SparkContext] - when(sc.executorMemory).thenReturn(100) - when(sc.getSparkHome()).thenReturn(Option("/spark-home")) - when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String]) + val sc = setupSparkContext() when(sc.conf).thenReturn(conf) - when(sc.listenerBus).thenReturn(listenerBus) val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") @@ -157,35 +162,11 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi } test("mesos resource offers result in launching tasks") { - def createOffer(id: Int, mem: Int, cpu: Int): Offer = { - val builder = Offer.newBuilder() - builder.addResourcesBuilder() - .setName("mem") - .setType(Value.Type.SCALAR) - .setScalar(Scalar.newBuilder().setValue(mem)) - builder.addResourcesBuilder() - .setName("cpus") - .setType(Value.Type.SCALAR) - .setScalar(Scalar.newBuilder().setValue(cpu)) - builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()) - .setFrameworkId(FrameworkID.newBuilder().setValue("f1")) - .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")) - .setHostname(s"host${id.toString}").build() - } - val driver = mock[SchedulerDriver] val taskScheduler = mock[TaskSchedulerImpl] - val listenerBus = mock[LiveListenerBus] - listenerBus.post( - SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) - - val sc = mock[SparkContext] - when(sc.executorMemory).thenReturn(100) - when(sc.getSparkHome()).thenReturn(Option("/path")) - when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String]) + val sc = setupSparkContext() when(sc.conf).thenReturn(new SparkConf) - when(sc.listenerBus).thenReturn(listenerBus) val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") @@ -193,21 +174,19 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi val minCpu = 4 val mesosOffers = new java.util.ArrayList[Offer] - mesosOffers.add(createOffer(1, minMem, minCpu)) - mesosOffers.add(createOffer(2, minMem - 1, minCpu)) - mesosOffers.add(createOffer(3, minMem, minCpu)) + mesosOffers.add(createOffer("o1", "s1", minMem, minCpu)) + mesosOffers.add(createOffer("o2", "s2", minMem - 1, minCpu)) + mesosOffers.add(createOffer("o3", "s3", minMem, minCpu)) val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2) expectedWorkerOffers.append(new WorkerOffer( mesosOffers.get(0).getSlaveId.getValue, mesosOffers.get(0).getHostname, - (minCpu - backend.mesosExecutorCores).toInt - )) + (minCpu - backend.mesosExecutorCores).toInt)) expectedWorkerOffers.append(new WorkerOffer( mesosOffers.get(2).getSlaveId.getValue, mesosOffers.get(2).getHostname, - (minCpu - backend.mesosExecutorCores).toInt - )) + (minCpu - backend.mesosExecutorCores).toInt)) val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc))) when(taskScheduler.CPUS_PER_TASK).thenReturn(2) @@ -217,9 +196,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi driver.launchTasks( Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)), capture.capture(), - any(classOf[Filters]) - ) - ).thenReturn(Status.valueOf(1)) + any(classOf[Filters]))).thenReturn(Status.valueOf(1)) 
when(driver.declineOffer(mesosOffers.get(1).getId)).thenReturn(Status.valueOf(1)) when(driver.declineOffer(mesosOffers.get(2).getId)).thenReturn(Status.valueOf(1)) @@ -228,8 +205,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi verify(driver, times(1)).launchTasks( Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)), capture.capture(), - any(classOf[Filters]) - ) + any(classOf[Filters])) verify(driver, times(1)).declineOffer(mesosOffers.get(1).getId) verify(driver, times(1)).declineOffer(mesosOffers.get(2).getId) assert(capture.getValue.size() === 1) @@ -242,7 +218,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi // Unwanted resources offered on an existing node. Make sure they are declined val mesosOffers2 = new java.util.ArrayList[Offer] - mesosOffers2.add(createOffer(1, minMem, minCpu)) + mesosOffers2.add(createOffer("o1", "s1", minMem, minCpu)) reset(taskScheduler) reset(driver) when(taskScheduler.resourceOffers(any(classOf[Seq[WorkerOffer]]))).thenReturn(Seq(Seq())) @@ -257,16 +233,8 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi val driver = mock[SchedulerDriver] val taskScheduler = mock[TaskSchedulerImpl] - val listenerBus = mock[LiveListenerBus] - listenerBus.post( - SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) - - val sc = mock[SparkContext] - when(sc.executorMemory).thenReturn(100) - when(sc.getSparkHome()).thenReturn(Option("/path")) - when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String]) + val sc = setupSparkContext() when(sc.conf).thenReturn(new SparkConf) - when(sc.listenerBus).thenReturn(listenerBus) val id = 1 val builder = Offer.newBuilder() @@ -305,7 +273,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi mesosOffers.get(0).getSlaveId.getValue, mesosOffers.get(0).getHostname, 2 // Deducting 1 for executor - )) + )) val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc))) @@ -316,17 +284,14 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi driver.launchTasks( Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)), capture.capture(), - any(classOf[Filters]) - ) - ).thenReturn(Status.valueOf(1)) + any(classOf[Filters]))).thenReturn(Status.valueOf(1)) backend.resourceOffers(driver, mesosOffers) verify(driver, times(1)).launchTasks( Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)), capture.capture(), - any(classOf[Filters]) - ) + any(classOf[Filters])) assert(capture.getValue.size() === 1) val taskInfo = capture.getValue.iterator().next() @@ -344,4 +309,157 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi r.getName.equals("cpus") && r.getScalar.getValue.equals(1.0) && r.getRole.equals("prod") }) } + + test("does not pass offers to TaskScheduler above spark.cores.max") { + val driver = mock[SchedulerDriver] + val taskScheduler = mock[TaskSchedulerImpl] + when(taskScheduler.CPUS_PER_TASK).thenReturn(2) + + val conf = new SparkConf + conf.set("spark.cores.max", "5") + + val sc = setupSparkContext() + when(sc.conf).thenReturn(conf) + + val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") + + val minMem = backend.calculateTotalMemory(sc) + val offers = List(createOffer("o1", "s1", minMem, 5), createOffer("o2", "s2", minMem, 10)) + + val (usableOffers, 
workerOffers) = backend.usableWorkerOffers(driver, offers.asJava) + + assert(usableOffers === offers, "All offers are usable") + // 1 core is set aside for the executor, so only 4 cores are available in the worker offer + // the second Mesos offer is not translated to a worker offer since all available cores are + // already consumed + assert(workerOffers === List(new WorkerOffer("s1", "hosts1", 4))) + } + + test("correctly accounts for mesossExecutor.cores when calculating total acquired cores") { + val driver = mock[SchedulerDriver] + val taskScheduler = mock[TaskSchedulerImpl] + when(taskScheduler.CPUS_PER_TASK).thenReturn(2) + + val conf = new SparkConf + conf.set("spark.cores.max", "6") + conf.set("spark.mesos.mesosExecutor.cores", "3") + + val sc = setupSparkContext() + when(sc.conf).thenReturn(conf) + + val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") + + val minMem = backend.calculateTotalMemory(sc) + val offers = List(createOffer("o1", "s1", minMem, 8), createOffer("o2", "s2", minMem, 10)) + + val (usableOffers, workerOffers) = backend.usableWorkerOffers(driver, offers.asJava) + + // 3 cores are set aside for the executor, so only 3 cores are available without going + // above `spark.cores.max`. + // the second Mesos offer is not translated to a worker offer since all available cores are + // already consumed + assert(workerOffers === List(new WorkerOffer("s1", "hosts1", 3))) + } + + test("does not launch tasks above spark.cores.max") { + val driver = mock[SchedulerDriver] + val taskScheduler = mock[TaskSchedulerImpl] + when(taskScheduler.CPUS_PER_TASK).thenReturn(2) + + val conf = new SparkConf + conf.set("spark.cores.max", "5") + + val sc = setupSparkContext() + when(sc.conf).thenReturn(conf) + + val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") + + val minMem = backend.calculateTotalMemory(sc) + val offers = List(createOffer("o1", "s1", minMem, 5), createOffer("o2", "s2", minMem, 10)) + + val (usableOffers, workerOffers) = backend.usableWorkerOffers(driver, offers.asJava) + + // there are two tasks that fit in the first offer + val expectedTaskDescriptions = Seq( + new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))), + new TaskDescription(2L, 0, "s1", "n2", 1, ByteBuffer.wrap(new Array[Byte](0)))) + + when(taskScheduler.resourceOffers(workerOffers)).thenReturn(Seq(expectedTaskDescriptions)) + + val captor = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]]) + + backend.resourceOffers(driver, offers.asJava) + + verify(driver, times(1)).launchTasks( + Matchers.eq(Collections.singleton(offers.head.getId)), + captor.capture(), + any(classOf[Filters])) + + val launchedTaskInfos = captor.getValue.asScala + assert(launchedTaskInfos.size == 2) + assert(launchedTaskInfos.head.getSlaveId.getValue == "s1") + + val launchedCores = + for (taskInfo <- launchedTaskInfos) yield + getResource(taskInfo.getResourcesList.asScala, "cpus") + + assert(launchedCores === Seq(2.0, 2.0)) + + assert(backend.totalCoresAcquired == 5) + } + + test("cores are released when tasks are done") { + val driver = mock[SchedulerDriver] + val taskScheduler = mock[TaskSchedulerImpl] + when(taskScheduler.CPUS_PER_TASK).thenReturn(2) + + val conf = new SparkConf + conf.set("spark.cores.max", "5") + conf.set("spark.mesos.mesosExecutor.cores", "1") + + val sc = setupSparkContext() + when(sc.conf).thenReturn(conf) + + val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") + + val minMem = backend.calculateTotalMemory(sc) + val offers = 
List(createOffer("o1", "s1", minMem, 5), createOffer("o2", "s2", minMem, 10)) + + val (usableOffers, workerOffers) = backend.usableWorkerOffers(driver, offers.asJava) + + // there are two tasks that fit in the first offer + val expectedTaskDescriptions = Seq( + new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))), + new TaskDescription(2L, 0, "s1", "n2", 1, ByteBuffer.wrap(new Array[Byte](0)))) + + when(taskScheduler.resourceOffers(workerOffers)).thenReturn(Seq(expectedTaskDescriptions)) + + val captor = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]]) + + backend.resourceOffers(driver, offers.asJava) + + // we should have used all cores we are allowed to use + assert(backend.totalCoresAcquired == 5) + + val executorId = ExecutorID.newBuilder().setValue("hosts1").build() + val statusBuilder = TaskStatus.newBuilder() + statusBuilder.setTaskId(TaskID.newBuilder().setValue("1").build()) + statusBuilder.setState(TaskState.TASK_FINISHED) + statusBuilder.setExecutorId(executorId) + backend.statusUpdate(driver, statusBuilder.build()) + + statusBuilder.setTaskId(TaskID.newBuilder().setValue("2").build()) + statusBuilder.setState(TaskState.TASK_FINISHED) + statusBuilder.setExecutorId(executorId) + backend.statusUpdate(driver, statusBuilder.build()) + + // this is called by Mesos when an executor exits + backend.executorLost(driver, executorId, SlaveID.newBuilder().setValue("s1").build(), 0) + + assert(backend.totalCoresAcquired == 0) + } + + private def getResource(resources: Iterable[Resource], name: String) = { + resources.filter(_.getName == name).map(_.getScalar.getValue).sum + } } diff --git a/docs/configuration.md b/docs/configuration.md index c276e8e90dec..ae6d61fca5fc 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1095,11 +1095,10 @@ Apart from these, the following properties are also available, and may be useful (not set) When running on a standalone deploy cluster or a - Mesos cluster in "coarse-grained" - sharing mode, the maximum amount of CPU cores to request for the application from - across the cluster (not from each machine). If not set, the default will be - spark.deploy.defaultCores on Spark's standalone cluster manager, or - infinite (all available cores) on Mesos. + Mesos cluster, the maximum + amount of CPU cores to request for the application from across the cluster (not from each + machine). If not set, the default will be spark.deploy.defaultCores on + Spark's standalone cluster manager, or infinite (all available cores) on Mesos. diff --git a/docs/job-scheduling.md b/docs/job-scheduling.md index a3c34cb6796f..c78923c29f15 100644 --- a/docs/job-scheduling.md +++ b/docs/job-scheduling.md @@ -35,9 +35,8 @@ Resource allocation can be configured as follows, based on the cluster type: or change the default for applications that don't set this setting through `spark.deploy.defaultCores`. Finally, in addition to controlling cores, each application's `spark.executor.memory` setting controls its memory use. -* **Mesos:** To use static partitioning on Mesos, set the `spark.mesos.coarse` configuration property to `true`, - and optionally set `spark.cores.max` to limit each application's resource share as in the standalone mode. - You should also set `spark.executor.memory` to control the executor memory. +* **Mesos:** To use static partitioning on Mesos, set `spark.cores.max` to limit each application's + resource share as in the standalone mode. You should also set `spark.executor.memory` to control the executor memory. 
* **YARN:** The `--num-executors` option to the Spark YARN client controls how many executors it
  will allocate on the cluster, while `--executor-memory` and `--executor-cores` control the
  resources per executor.
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index ec5a44d79212..4b937836bf6d 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -181,7 +181,7 @@ To run in coarse-grained mode, set the `spark.mesos.coarse` property in your
conf.set("spark.mesos.coarse", "true")
{% endhighlight %}

-In addition, for coarse-grained mode, you can control the maximum number of resources Spark will
+In addition, for both fine-grained and coarse-grained modes, you can control the maximum number of resources Spark will
acquire. By default, it will acquire *all* cores in the cluster (that get offered by Mesos), which
only makes sense if you run just one application at a time. You can cap the maximum number of cores
using `conf.set("spark.cores.max", "10")` (for example).
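
As an illustrative sketch (not part of the patch), the fine-grained behaviour documented above can be exercised with a driver-side configuration along these lines; the master URL and application name are placeholders, and `spark.mesos.mesosExecutor.cores` is only shown because its value counts toward the `spark.cores.max` budget tracked by this change:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Fine-grained Mesos mode is the default (spark.mesos.coarse = false).
val conf = new SparkConf()
  .setAppName("cores-max-example")            // placeholder application name
  .setMaster("mesos://zk://host:2181/mesos")  // placeholder Mesos master URL
  // With this patch, fine-grained mode also honours spark.cores.max: the backend
  // stops turning Mesos offers into WorkerOffers once 10 cores are in use.
  .set("spark.cores.max", "10")
  // Cores reserved for each per-slave Mesos executor (default 1); these are
  // counted against spark.cores.max in addition to the task cores.
  .set("spark.mesos.mesosExecutor.cores", "1")

val sc = new SparkContext(conf)
```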