diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 7d08eae0b4871..3a52f5d7512d1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -18,23 +18,24 @@ package org.apache.spark.scheduler.cluster.mesos import java.io.File +import java.util import java.util.concurrent.locks.ReentrantLock import java.util.{Collections, List => JList} import scala.collection.JavaConverters._ import scala.collection.mutable.{HashMap, HashSet} -import com.google.common.collect.HashBiMap -import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} -import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver} - -import org.apache.spark.{SecurityManager, SparkContext, SparkEnv, SparkException, TaskState} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient import org.apache.spark.rpc.RpcAddress -import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend +import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl} import org.apache.spark.util.Utils +import org.apache.spark.{SecurityManager, SparkContext, SparkEnv, SparkException, TaskState} + + +import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} +import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver} /** * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds @@ -64,16 +65,31 @@ private[spark] class CoarseMesosSchedulerBackend( // This is for cleaning up shuffle files reliably. private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false) + private val maxExecutorsPerSlave = conf.getInt("spark.mesos.coarse.executors.max", 1) + + private val maxCpusPerExecutor = + conf.getOption("spark.mesos.coarse.executor.cores.max").map { m => m.toInt } + + if (conf.getOption("spark.mesos.coarse.executors.max").isDefined && maxCpusPerExecutor.isEmpty) { + throw new IllegalArgumentException( + "Must configure spark.mesos.coarse.executor.cores.max when " + + "spark.mesos.coarse.executors.max is set") + } + // Cores we have acquired with each Mesos task ID - val coresByTaskId = new HashMap[Int, Int] + val coresByTaskId = new HashMap[String, Int] var totalCoresAcquired = 0 - val slaveIdsWithExecutors = new HashSet[String] - // Maping from slave Id to hostname private val slaveIdToHost = new HashMap[String, String] - val taskIdToSlaveId: HashBiMap[Int, String] = HashBiMap.create[Int, String] + // Contains the set of slave ids that we have connected the shuffle service to + private val existingSlaveShuffleConnections = new HashSet[String] + + // Contains a mapping of slave ids to the number of executors launched.
+ val slaveIdsWithExecutors = new HashMap[String, Int] + + val taskIdToSlaveId: HashMap[String, String] = new HashMap[String, String] // How many times tasks on each slave failed val failuresBySlaveId: HashMap[String, Int] = new HashMap[String, Int] @@ -89,8 +105,6 @@ private[spark] class CoarseMesosSchedulerBackend( */ private[mesos] def executorLimit: Int = executorLimitOption.getOrElse(Int.MaxValue) - private val pendingRemovedSlaveIds = new HashSet[String] - // private lock object protecting mutable state above. Using the intrinsic lock // may lead to deadlocks since the superclass might also try to lock private val stateLock = new ReentrantLock @@ -122,10 +136,10 @@ private[spark] class CoarseMesosSchedulerBackend( @volatile var appId: String = _ - def newMesosTaskId(): Int = { + def newMesosTaskId(slaveId: String): String = { val id = nextMesosTaskId nextMesosTaskId += 1 - id + slaveId + "/" + id } override def start() { @@ -140,7 +154,7 @@ private[spark] class CoarseMesosSchedulerBackend( startScheduler(driver) } - def createCommand(offer: Offer, numCores: Int, taskId: Int): CommandInfo = { + def createCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo = { val executorSparkHome = conf.getOption("spark.mesos.executor.home") .orElse(sc.getSparkHome()) .getOrElse { @@ -184,7 +198,7 @@ private[spark] class CoarseMesosSchedulerBackend( "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend" .format(prefixEnv, runScript) + s" --driver-url $driverURL" + - s" --executor-id ${offer.getSlaveId.getValue}" + + s" --executor-id $taskId" + s" --hostname ${offer.getHostname}" + s" --cores $numCores" + s" --app-id $appId") @@ -192,12 +206,12 @@ private[spark] class CoarseMesosSchedulerBackend( // Grab everything to the first '.'. We'll use that and '*' to // glob the directory "correctly". val basename = uri.get.split('/').last.split('.').head - val executorId = sparkExecutorId(offer.getSlaveId.getValue, taskId.toString) + command.setValue( s"cd $basename*; $prefixEnv " + "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" + s" --driver-url $driverURL" + - s" --executor-id $executorId" + + s" --executor-id $taskId" + s" --hostname ${offer.getHostname}" + s" --cores $numCores" + s" --app-id $appId") @@ -244,58 +258,63 @@ private[spark] class CoarseMesosSchedulerBackend( * unless we've already launched more than we wanted to. 
*/ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { + val memoryPerExecutor = calculateTotalMemory(sc) stateLock.synchronized { val filters = Filters.newBuilder().setRefuseSeconds(5).build() for (offer <- offers.asScala) { + val mem = getResource(offer.getResourcesList, "mem") + val cpus = getResource(offer.getResourcesList, "cpus").toInt + var remainingMem = mem + var remainingCores = cpus + val tasks = new util.ArrayList[MesosTaskInfo]() val offerAttributes = toAttributeMap(offer.getAttributesList) val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) val slaveId = offer.getSlaveId.getValue - val mem = getResource(offer.getResourcesList, "mem") - val cpus = getResource(offer.getResourcesList, "cpus").toInt val id = offer.getId.getValue - if (meetsConstraints) { - if (taskIdToSlaveId.size < executorLimit && - totalCoresAcquired < maxCores && - mem >= calculateTotalMemory(sc) && - cpus >= 1 && - failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && - !slaveIdsWithExecutors.contains(slaveId)) { - // Launch an executor on the slave - val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) - totalCoresAcquired += cpusToUse - val taskId = newMesosTaskId() - taskIdToSlaveId.put(taskId, slaveId) - slaveIdsWithExecutors += slaveId - coresByTaskId(taskId) = cpusToUse - // Gather cpu resources from the available resources and use them in the task. - val (remainingResources, cpuResourcesToUse) = - partitionResources(offer.getResourcesList, "cpus", cpusToUse) - val (_, memResourcesToUse) = - partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc)) - val taskBuilder = MesosTaskInfo.newBuilder() - .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build()) - .setSlaveId(offer.getSlaveId) - .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId)) - .setName("Task " + taskId) - .addAllResources(cpuResourcesToUse.asJava) - .addAllResources(memResourcesToUse.asJava) - - sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => - MesosSchedulerBackendUtil - .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder()) - } - - // Accept the offer and launch the task - logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus") - slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname - d.launchTasks( - Collections.singleton(offer.getId), - Collections.singleton(taskBuilder.build()), filters) - } else { - // Decline the offer - logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus") - d.declineOffer(offer.getId) + var executorCount = slaveIdsWithExecutors.getOrElse(slaveId, 0) + while (taskIdToSlaveId.size < executorLimit && + totalCoresAcquired < maxCores && + meetsConstraints && + remainingMem >= calculateTotalMemory(sc) && + remainingCores >= 1 && + failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && + executorCount < maxExecutorsPerSlave) { + val coresToUse = + math.min(maxCpusPerExecutor.getOrElse(Int.MaxValue), + math.min(remainingCores, maxCores - totalCoresAcquired)) + totalCoresAcquired += coresToUse + remainingCores -= coresToUse + remainingMem -= memoryPerExecutor + val taskId = newMesosTaskId(slaveId) + taskIdToSlaveId(taskId) = slaveId + executorCount += 1 + slaveIdsWithExecutors(slaveId) = executorCount + coresByTaskId(taskId) = coresToUse + // Gather cpu resources from the available resources and use them in the task. 
+ val (remainingResources, cpuResourcesToUse) = + partitionResources(offer.getResourcesList, "cpus", coresToUse) + val (_, memResourcesToUse) = + partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc)) + val taskBuilder = MesosTaskInfo.newBuilder() + .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build()) + .setSlaveId(offer.getSlaveId) + .setCommand(createCommand(offer, coresToUse + extraCoresPerSlave, taskId)) + .setName("Task " + taskId) + .addAllResources(cpuResourcesToUse.asJava) + .addAllResources(memResourcesToUse.asJava) + + sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => + MesosSchedulerBackendUtil + .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder()) } + tasks.add(taskBuilder.build()) + } + + if (!tasks.isEmpty) { + // accept the offer and launch the task + logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus") + slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname + d.launchTasks(Collections.singleton(offer.getId()), tasks, filters) } else { // This offer does not meet constraints. We don't need to see it again. // Decline the offer for a long period of time. @@ -310,7 +329,7 @@ private[spark] class CoarseMesosSchedulerBackend( override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { - val taskId = status.getTaskId.getValue.toInt + val taskId = status.getTaskId.getValue val state = status.getState logInfo(s"Mesos task $taskId is now $state") val slaveId: String = status.getSlaveId.getValue @@ -321,7 +340,8 @@ private[spark] class CoarseMesosSchedulerBackend( // this through Mesos, since the shuffle services are set up independently. if (TaskState.fromMesos(state).equals(TaskState.RUNNING) && slaveIdToHost.contains(slaveId) && - shuffleServiceEnabled) { + shuffleServiceEnabled && + !existingSlaveShuffleConnections.contains(slaveId)) { assume(mesosExternalShuffleClient.isDefined, "External shuffle client was not instantiated even though shuffle service is enabled.") // TODO: Remove this and allow the MesosExternalShuffleService to detect @@ -332,12 +352,10 @@ private[spark] class CoarseMesosSchedulerBackend( s"host $hostname, port $externalShufflePort for app ${conf.getAppId}") mesosExternalShuffleClient.get .registerDriverWithShuffleService(hostname, externalShufflePort) - } + existingSlaveShuffleConnections += slaveId + } else if (TaskState.isFinished(TaskState.fromMesos(state))) { + val slaveId = taskIdToSlaveId(taskId) - if (TaskState.isFinished(TaskState.fromMesos(state))) { - val slaveId = taskIdToSlaveId.get(taskId) - slaveIdsWithExecutors -= slaveId - taskIdToSlaveId.remove(taskId) // Remove the cores we have remembered for this task, if it's in the hashmap for (cores <- coresByTaskId.get(taskId)) { totalCoresAcquired -= cores @@ -351,7 +369,7 @@ private[spark] class CoarseMesosSchedulerBackend( "is Spark installed on it?") } } - executorTerminated(d, slaveId, s"Executor finished with state $state") + executorTerminated(d, taskId, slaveId, s"Executor finished with state $state") // In case we'd rejected everything before but have now lost a node d.reviveOffers() } @@ -378,35 +396,39 @@ private[spark] class CoarseMesosSchedulerBackend( * slave IDs that we might have asked to be killed. It also notifies the driver * that an executor was removed. 
*/ - private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = { + private def executorTerminated( + d: SchedulerDriver, + executorId: String, + slaveId: String, + reason: String): Unit = { stateLock.synchronized { - if (slaveIdsWithExecutors.contains(slaveId)) { - val slaveIdToTaskId = taskIdToSlaveId.inverse() - if (slaveIdToTaskId.containsKey(slaveId)) { - val taskId: Int = slaveIdToTaskId.get(slaveId) - taskIdToSlaveId.remove(taskId) - removeExecutor(sparkExecutorId(slaveId, taskId.toString), SlaveLost(reason)) + if (slaveIdsWithExecutors.contains(slaveId) && taskIdToSlaveId.contains(executorId)) { + taskIdToSlaveId.remove(executorId) + removeExecutor(executorId, new ExecutorLossReason(reason)) + val newCount = slaveIdsWithExecutors(slaveId) - 1 + if (newCount == 0) { + slaveIdsWithExecutors.remove(slaveId) + } else { + slaveIdsWithExecutors(slaveId) = newCount } - // TODO: This assumes one Spark executor per Mesos slave, - // which may no longer be true after SPARK-5095 - pendingRemovedSlaveIds -= slaveId - slaveIdsWithExecutors -= slaveId } } } - private def sparkExecutorId(slaveId: String, taskId: String): String = { - s"$slaveId/$taskId" - } - override def slaveLost(d: SchedulerDriver, slaveId: SlaveID): Unit = { logInfo(s"Mesos slave lost: ${slaveId.getValue}") - executorTerminated(d, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue) + // Terminate all executors in the slave + stateLock.synchronized { + val lostExecutors = taskIdToSlaveId.filter(_._2.equals(slaveId.getValue)).map(_._1) + lostExecutors.foreach { taskId => + executorTerminated(d, taskId, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue) + } + } } override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int): Unit = { - logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue)) - slaveLost(d, s) + logInfo("Executor lost: %s".format(e.getValue)) + executorTerminated(d, e.getValue, s.getValue, "Mesos Executor lost: " + e.getValue) } override def applicationId(): String = @@ -429,13 +451,9 @@ private[spark] class CoarseMesosSchedulerBackend( return false } - val slaveIdToTaskId = taskIdToSlaveId.inverse() for (executorId <- executorIds) { - val slaveId = executorId.split("/")(0) - if (slaveIdToTaskId.containsKey(slaveId)) { - mesosDriver.killTask( - TaskID.newBuilder().setValue(slaveIdToTaskId.get(slaveId).toString).build()) - pendingRemovedSlaveIds += slaveId + if (taskIdToSlaveId.contains(executorId)) { + mesosDriver.killTask(TaskID.newBuilder().setValue(executorId).build()) } else { logWarning("Unable to find executor Id '" + executorId + "' in Mesos scheduler") } diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala index 525ee0d3bdc5a..992305f593f14 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala @@ -17,23 +17,29 @@ package org.apache.spark.scheduler.cluster.mesos +import java.nio.ByteBuffer import java.util import java.util.Collections +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + import org.apache.mesos.Protos.Value.Scalar -import org.apache.mesos.Protos._ -import org.apache.mesos.{Protos, Scheduler, SchedulerDriver} +import 
org.apache.mesos.Protos.{TaskState, _} +import org.apache.mesos.{Protos, SchedulerDriver} import org.mockito.Matchers._ import org.mockito.Mockito._ -import org.mockito.Matchers +import org.mockito.{ArgumentCaptor, Matchers => MMatchers} +import org.scalatest.{BeforeAndAfter, Matchers} import org.scalatest.mock.MockitoSugar -import org.scalatest.BeforeAndAfter -import org.apache.spark.scheduler.TaskSchedulerImpl -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SecurityManager, SparkFunSuite} +import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription, TaskSchedulerImpl, WorkerOffer} +import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkEnv, SparkFunSuite} + class CoarseMesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext + with Matchers with MockitoSugar with BeforeAndAfter { @@ -61,16 +67,10 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite driver: SchedulerDriver): CoarseMesosSchedulerBackend = { val securityManager = mock[SecurityManager] val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master", securityManager) { - override protected def createSchedulerDriver( - masterUrl: String, - scheduler: Scheduler, - sparkUser: String, - appName: String, - conf: SparkConf, - webuiUrl: Option[String] = None, - checkpoint: Option[Boolean] = None, - failoverTimeout: Option[Double] = None, - frameworkId: Option[String] = None): SchedulerDriver = driver + override def start(): Unit = { + mesosDriver = driver + } + override def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = None markRegistered() } backend.start() @@ -78,21 +78,30 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite } var sparkConf: SparkConf = _ + var sparkEnv: SparkEnv = _ + var taskScheduler: TaskSchedulerImpl = _ before { sparkConf = (new SparkConf) .setMaster("local[*]") .setAppName("test-mesos-dynamic-alloc") .setSparkHome("/path") - - sc = new SparkContext(sparkConf) + .set("spark.testing", "1") + sc = mock[SparkContext] + sparkEnv = mock[SparkEnv] + taskScheduler = mock[TaskSchedulerImpl] + when(taskScheduler.sc).thenReturn(sc) + when(taskScheduler.sc).thenReturn(sc) + when(sc.executorMemory).thenReturn(100) + when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String]) + when(sc.getSparkHome()).thenReturn(Option("/path")) + when(sc.env).thenReturn(sparkEnv) + when(sc.conf).thenReturn(sparkConf) } test("mesos supports killing and limiting executors") { val driver = mock[SchedulerDriver] when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) - val taskScheduler = mock[TaskSchedulerImpl] - when(taskScheduler.sc).thenReturn(sc) sparkConf.set("spark.driver.host", "driverHost") sparkConf.set("spark.driver.port", "1234") @@ -104,11 +113,11 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite val mesosOffers = new java.util.ArrayList[Offer] mesosOffers.add(createOffer("o1", "s1", minMem, minCpu)) - val taskID0 = TaskID.newBuilder().setValue("0").build() + val taskID0 = TaskID.newBuilder().setValue("s1/0").build() backend.resourceOffers(driver, mesosOffers) verify(driver, times(1)).launchTasks( - Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)), + MMatchers.eq(Collections.singleton(mesosOffers.get(0).getId)), any[util.Collection[TaskInfo]], any[Filters]) @@ -122,7 +131,9 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite backend.resourceOffers(driver, mesosOffers2) verify(driver, times(1)) - 
.declineOffer(OfferID.newBuilder().setValue("o2").build()) + .declineOffer( + MMatchers.eq(OfferID.newBuilder().setValue("o2").build()), + MMatchers.any(classOf[Filters])) // Verify we didn't launch any new executor assert(backend.slaveIdsWithExecutors.size === 1) @@ -130,7 +141,7 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite backend.doRequestTotalExecutors(2) backend.resourceOffers(driver, mesosOffers2) verify(driver, times(1)).launchTasks( - Matchers.eq(Collections.singleton(mesosOffers2.get(0).getId)), + MMatchers.eq(Collections.singleton(mesosOffers2.get(0).getId)), any[util.Collection[TaskInfo]], any[Filters]) @@ -142,8 +153,6 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite test("mesos supports killing and relaunching tasks with executors") { val driver = mock[SchedulerDriver] when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) - val taskScheduler = mock[TaskSchedulerImpl] - when(taskScheduler.sc).thenReturn(sc) val backend = createSchedulerBackend(taskScheduler, driver) val minMem = backend.calculateTotalMemory(sc) + 1024 @@ -158,13 +167,13 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite backend.resourceOffers(driver, mesosOffers) verify(driver, times(1)).launchTasks( - Matchers.eq(Collections.singleton(offer1.getId)), + MMatchers.eq(Collections.singleton(offer1.getId)), anyObject(), anyObject[Filters]) // Simulate task killed, executor no longer running val status = TaskStatus.newBuilder() - .setTaskId(TaskID.newBuilder().setValue("0").build()) + .setTaskId(TaskID.newBuilder().setValue("s1/0").build()) .setSlaveId(SlaveID.newBuilder().setValue("s1").build()) .setState(TaskState.TASK_KILLED) .build @@ -178,10 +187,92 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite assert(backend.slaveIdsWithExecutors.contains("s1")) verify(driver, times(1)).launchTasks( - Matchers.eq(Collections.singleton(offer2.getId)), + MMatchers.eq(Collections.singleton(offer2.getId)), anyObject(), anyObject[Filters]) verify(driver, times(1)).reviveOffers() } + + test("launch multiple executors") { + val driver = mock[SchedulerDriver] + val taskScheduler = mock[TaskSchedulerImpl] + val env = mock[SparkEnv] + val sc = mock[SparkContext] + when(taskScheduler.sc).thenReturn(sc) + when(sc.executorMemory).thenReturn(100) + when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String]) + when(sc.getSparkHome()).thenReturn(Option("/path")) + when(sc.env).thenReturn(env) + val conf = new SparkConf + conf.set("spark.driver.host", "localhost") + conf.set("spark.driver.port", "1234") + conf.set("spark.mesos.coarse.executors.max", "2") + conf.set("spark.mesos.coarse.executor.cores.max", "2") + conf.set("spark.mesos.coarse.cores.max", "2") + conf.set("spark.testing", "1") + when(sc.conf).thenReturn(conf) + + val securityManager = mock[SecurityManager] + + val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master", securityManager) + + val minMem = backend.calculateTotalMemory(sc) + val minCpu = 2 + + val mesosOffers = new java.util.ArrayList[Offer] + mesosOffers.add(createOffer("1", "s1", minMem * 2, minCpu * 2)) + + val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](1) + expectedWorkerOffers.append(new WorkerOffer( + mesosOffers.get(0).getSlaveId.getValue, + mesosOffers.get(0).getHostname, + 2 + )) + + val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) + val taskDesc2 = new TaskDescription(2L, 0, "s2", "n2", 0, ByteBuffer.wrap(new Array[Byte](0))) + 
when(taskScheduler.resourceOffers(MMatchers.eq(expectedWorkerOffers))) + .thenReturn(Seq(Seq(taskDesc, taskDesc2))) + when(taskScheduler.CPUS_PER_TASK).thenReturn(2) + when(taskScheduler.sc).thenReturn(sc) + + val capture = ArgumentCaptor.forClass(classOf[util.Collection[TaskInfo]]) + when( + driver.launchTasks( + MMatchers.eq(Collections.singleton(mesosOffers.get(0).getId)), + capture.capture(), + MMatchers.any()) + ).thenReturn(Status.valueOf(1)) + + backend.resourceOffers(driver, mesosOffers) + + assert(capture.getValue.size() == 2) + val iter = capture.getValue.iterator() + val taskInfo = iter.next() + taskInfo.getName should be("Task s1/0") + val cpus = taskInfo.getResourcesList.get(0) + cpus.getName should be("cpus") + cpus.getScalar.getValue should be(2.0) + taskInfo.getSlaveId.getValue should be("s1") + + val taskInfo2 = iter.next() + taskInfo2.getName should be("Task s1/1") + val cpus2 = taskInfo2.getResourcesList.get(0) + cpus2.getName should be("cpus") + cpus2.getScalar.getValue should be(2.0) + taskInfo2.getSlaveId.getValue should be("s1") + + // Already capped the max executors, shouldn't launch a new one. + val mesosOffers2 = new java.util.ArrayList[Offer] + mesosOffers2.add(createOffer("1", "s1", minMem, minCpu)) + when(taskScheduler.resourceOffers(MMatchers.any(classOf[Seq[WorkerOffer]]))) + .thenReturn(Seq(Seq())) + when(taskScheduler.CPUS_PER_TASK).thenReturn(2) + when(driver.declineOffer( + MMatchers.eq(mesosOffers2.get(0).getId), + MMatchers.any(classOf[Filters]))) + .thenReturn(Status.valueOf(1)) + backend.resourceOffers(driver, mesosOffers2) + } } diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index a197d0e373027..60d5bcc5017c2 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -373,6 +373,20 @@ See the [configuration page](configuration.html) for information on Spark config +
+<tr>
+  <td><code>spark.mesos.coarse.executor.cores.max</code></td>
+  <td>(none)</td>
+  <td>
+    Maximum number of cores to give each coarse-grained executor; required when <code>spark.mesos.coarse.executors.max</code> is set.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.mesos.coarse.executors.max</code></td>
+  <td>1</td>
+  <td>
+    Maximum number of coarse-grained executors to launch on each Mesos slave.
+  </td>
+</tr>
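Taken together, these changes let one Mesos slave host several coarse-grained executors, with Mesos task IDs of the form `slaveId/number` doubling as Spark executor IDs. The sketch below is a minimal, hypothetical driver-side configuration exercising the feature; the Mesos master URL, application name, and numeric values are placeholders, and only `spark.mesos.coarse.executors.max` and `spark.mesos.coarse.executor.cores.max` come from this patch.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object MultiExecutorExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("mesos://zk://zk1:2181/mesos")          // placeholder Mesos master URL
      .setAppName("coarse-grained-multi-executor-demo")
      // Allow up to two executors per slave (the patch defaults this to 1).
      .set("spark.mesos.coarse.executors.max", "2")
      // Required whenever executors.max is set; caps the cores given to each executor.
      .set("spark.mesos.coarse.executor.cores.max", "4")
      // Overall core cap for the application, as in any coarse-grained Mesos deployment.
      .set("spark.cores.max", "16")

    val sc = new SparkContext(conf)
    try {
      // Trivial job just to exercise whatever executors were launched.
      println(sc.parallelize(1 to 1000, 8).sum())
    } finally {
      sc.stop()
    }
  }
}
```

With a sufficiently large 8-core offer, the rewritten `resourceOffers` loop above would then pack two 4-core executors onto the same slave instead of dedicating the whole offer to a single executor.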