diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 7d08eae0b4871..f5d9e0884b055 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -254,12 +254,7 @@ private[spark] class CoarseMesosSchedulerBackend(
         val cpus = getResource(offer.getResourcesList, "cpus").toInt
         val id = offer.getId.getValue
         if (meetsConstraints) {
-          if (taskIdToSlaveId.size < executorLimit &&
-              totalCoresAcquired < maxCores &&
-              mem >= calculateTotalMemory(sc) &&
-              cpus >= 1 &&
-              failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
-              !slaveIdsWithExecutors.contains(slaveId)) {
+          if (isOfferSatisfiesRequirements(slaveId, mem, cpus, sc)) {
             // Launch an executor on the slave
             val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
             totalCoresAcquired += cpusToUse
@@ -308,6 +303,35 @@ private[spark] class CoarseMesosSchedulerBackend(
     }
   }
 
+  // TODO: Abstract out each condition and log them.
+  def isOfferSatisfiesRequirements(slaveId: String, mem: Double, cpusOffered: Int,
+      sc: SparkContext): Boolean = {
+    val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
+    val meetsCPURequirements = cpusOffered >= 1
+    val needMoreCores = totalCoresAcquired < maxCores
+    val healthySlave = failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES
+    val taskOnEachSlaveLessThanExecutorLimit = taskIdToSlaveId.size < executorLimit
+    val executorNotRunningOnSlave = !slaveIdsWithExecutors.contains(slaveId)
+
+    executorNotRunningOnSlave &&
+      taskOnEachSlaveLessThanExecutorLimit &&
+      needMoreCores &&
+      meetsMemoryRequirements &&
+      meetsCPURequirements &&
+      healthySlave
+  }
+
+  def isOfferValidForScheduling(meetsConstraints: Boolean,
+      slaveId: String, mem: Double,
+      cpus: Int, sc: SparkContext): Boolean = {
+    taskIdToSlaveId.size < executorLimit &&
+      totalCoresAcquired < maxCores &&
+      meetsConstraints &&
+      mem >= calculateTotalMemory(sc) &&
+      cpus >= 1 &&
+      failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
+      !slaveIdsWithExecutors.contains(slaveId)
+  }
 
   override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
     val taskId = status.getTaskId.getValue.toInt
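Note on the coarse-grained change above: the six inline conditions now live in isOfferSatisfiesRequirements as named booleans, and the TODO asks for each condition to be abstracted out and logged. A minimal, self-contained sketch of one way to do that follows. It is not part of the patch: OfferRequirementsSketch, the Requirement case class, and all sample values are hypothetical stand-ins for the backend's fields.

object OfferRequirementsSketch {
  // Pair each condition with a name so a declined offer can be explained.
  final case class Requirement(name: String, satisfied: Boolean)

  // Log every failed condition, then report whether all of them held.
  def satisfiesAll(requirements: Seq[Requirement]): Boolean = {
    val failed = requirements.filterNot(_.satisfied)
    failed.foreach(r => println(s"Declining offer: '${r.name}' not met"))
    failed.isEmpty
  }

  def main(args: Array[String]): Unit = {
    // Sample numbers standing in for totalCoresAcquired, maxCores,
    // calculateTotalMemory(sc), and the offer's resources.
    val totalCoresAcquired = 8
    val maxCores = 10
    val requiredMem = 1408.0
    val offeredMem = 10000.0
    val offeredCpus = 5

    val accepted = satisfiesAll(Seq(
      Requirement("need more cores", totalCoresAcquired < maxCores),
      Requirement("enough memory", offeredMem >= requiredMem),
      Requirement("enough cpus", offeredCpus >= 1)))
    println(s"offer accepted: $accepted") // prints: offer accepted: true
  }
}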
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 281965a5981bb..9b398749eb48e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -246,14 +246,13 @@ private[spark] class MesosSchedulerBackend(
       val slaveId = o.getSlaveId.getValue
       val offerAttributes = toAttributeMap(o.getAttributesList)
 
-      // check offers for
-      // 1. Memory requirements
-      // 2. CPU requirements - need at least 1 for executor, 1 for task
-      val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
-      val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
+      // check if the attribute constraints are satisfied
+      val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+
       val meetsRequirements =
-        (meetsMemoryRequirements && meetsCPURequirements) ||
-        (slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
+        isOfferSatisfiesRequirements(cpus, mem, slaveId, sc)
+
+      // log the decision for debugging
       val debugstr = if (meetsRequirements) "Accepting" else "Declining"
       logDebug(s"$debugstr offer: ${o.getId.getValue} with attributes: " +
         s"$offerAttributes mem: $mem cpu: $cpus")
@@ -328,6 +327,18 @@ private[spark] class MesosSchedulerBackend(
     }
   }
 
+  // check if the offer satisfies the resource requirements:
+  // 1. Memory requirements
+  // 2. CPU requirements - need at least 1 for the executor, 1 for a task
+  def isOfferSatisfiesRequirements(cpusOffered: Double, memory: Double,
+      slaveId: String, sc: SparkContext): Boolean = {
+    val meetsMemoryRequirements = memory >= calculateTotalMemory(sc)
+    val meetsCPURequirements = cpusOffered >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
+
+    (meetsMemoryRequirements && meetsCPURequirements) ||
+      (slaveIdToExecutorInfo.contains(slaveId) && cpusOffered >= scheduler.CPUS_PER_TASK)
+  }
+
   /** Turn a Spark TaskDescription into a Mesos task and also resources unused by the task */
   def createMesosTask(
       task: TaskDescription,
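The extracted fine-grained predicate accepts an offer along two paths: a slave without an executor must fit both the executor and one task, while a slave that already hosts an executor only needs CPUS_PER_TASK cpus. The self-contained sketch below walks both paths; FineGrainedOfferSketch and its constants (mesosExecutorCores = 1.0, cpusPerTask = 2) are assumed sample values, not values taken from the patch.

object FineGrainedOfferSketch {
  val mesosExecutorCores = 1.0 // assumed executor cpu share
  val cpusPerTask = 2          // assumed scheduler.CPUS_PER_TASK

  def accepts(cpusOffered: Double, memOffered: Double, requiredMem: Double,
      executorOnSlave: Boolean): Boolean = {
    val meetsMemoryRequirements = memOffered >= requiredMem
    val meetsCPURequirements = cpusOffered >= (mesosExecutorCores + cpusPerTask)
    (meetsMemoryRequirements && meetsCPURequirements) ||
      (executorOnSlave && cpusOffered >= cpusPerTask)
  }

  def main(args: Array[String]): Unit = {
    // Fresh slave: needs executor + task cpus (1.0 + 2 = 3.0), so 5 passes.
    println(accepts(5, 10000, 1408, executorOnSlave = false)) // true
    // Slave with a running executor: only task cpus are needed, so 2 passes.
    println(accepts(2, 10000, 1408, executorOnSlave = true)) // true
    // Fresh slave with only 2 cpus cannot also fit the executor.
    println(accepts(2, 10000, 1408, executorOnSlave = false)) // false
  }
}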
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
index 525ee0d3bdc5a..9a4f9b7352fd4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
@@ -58,7 +58,7 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
 
   private def createSchedulerBackend(
       taskScheduler: TaskSchedulerImpl,
-      driver: SchedulerDriver): CoarseMesosSchedulerBackend = {
+      driver: SchedulerDriver, sc: SparkContext): CoarseMesosSchedulerBackend = {
     val securityManager = mock[SecurityManager]
     val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master", securityManager) {
       override protected def createSchedulerDriver(
@@ -77,6 +77,14 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
     backend
   }
 
+  private def createSchedulerBackendForGivenSparkConf(sc: SparkContext) = {
+    val driver = mock[SchedulerDriver]
+    when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
+    val taskScheduler = mock[TaskSchedulerImpl]
+    when(taskScheduler.sc).thenReturn(sc)
+    createSchedulerBackend(taskScheduler, driver, sc)
+  }
+
   var sparkConf: SparkConf = _
 
   before {
@@ -84,9 +92,10 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
       .setMaster("local[*]")
      .setAppName("test-mesos-dynamic-alloc")
       .setSparkHome("/path")
+      .set("spark.cores.max", "10")
 
     sc = new SparkContext(sparkConf)
-  }
+  }
 
   test("mesos supports killing and limiting executors") {
     val driver = mock[SchedulerDriver]
@@ -97,7 +106,7 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
     sparkConf.set("spark.driver.host", "driverHost")
     sparkConf.set("spark.driver.port", "1234")
 
-    val backend = createSchedulerBackend(taskScheduler, driver)
+    val backend = createSchedulerBackend(taskScheduler, driver, sc)
 
     val minMem = backend.calculateTotalMemory(sc)
     val minCpu = 4
@@ -145,7 +154,7 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
     val taskScheduler = mock[TaskSchedulerImpl]
     when(taskScheduler.sc).thenReturn(sc)
 
-    val backend = createSchedulerBackend(taskScheduler, driver)
+    val backend = createSchedulerBackend(taskScheduler, driver, sc)
 
     val minMem = backend.calculateTotalMemory(sc) + 1024
     val minCpu = 4
@@ -153,7 +162,7 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
     val offer1 = createOffer("o1", "s1", minMem, minCpu)
     mesosOffers.add(offer1)
 
-    val offer2 = createOffer("o2", "s1", minMem, 1);
+    val offer2 = createOffer("o2", "s1", minMem, 1)
 
     backend.resourceOffers(driver, mesosOffers)
 
@@ -184,4 +193,47 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
 
     verify(driver, times(1)).reviveOffers()
   }
+
+  test("isOfferSatisfiesRequirements returns true when there is a valid offer") {
+    val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
+
+    assert(schedulerBackend.isOfferSatisfiesRequirements("Slave1", 10000, 5, sc))
+  }
+
+  test("isOfferSatisfiesRequirements returns false when memory in the offer is less" +
+    " than the required memory") {
+    val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
+
+    assert(schedulerBackend.isOfferSatisfiesRequirements("Slave1", 1, 5, sc) === false)
+  }
+
+  test("isOfferSatisfiesRequirements returns false when cpus in the offer are less" +
+    " than the required cpus") {
+    val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
+
+    assert(schedulerBackend.isOfferSatisfiesRequirements("Slave1", 10000, 0, sc) === false)
+  }
+
+  test("isOfferSatisfiesRequirements returns false when the offer is from a slave already" +
+    " running an executor") {
+    val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
+    schedulerBackend.slaveIdsWithExecutors += "Slave2"
+
+    assert(schedulerBackend.isOfferSatisfiesRequirements("Slave2", 10000, 5, sc) === false)
+  }
+
+  test("isOfferSatisfiesRequirements returns false when tasks have failed " +
+    "MAX_SLAVE_FAILURES or more times on the given slave") {
+    val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
+    schedulerBackend.failuresBySlaveId("Slave3") = 2
+
+    assert(schedulerBackend.isOfferSatisfiesRequirements("Slave3", 10000, 5, sc) === false)
+  }
+
+  test("isOfferSatisfiesRequirements returns false when the maximum number of cores" +
+    " is already acquired") {
+    val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
+    schedulerBackend.totalCoresAcquired = 10
+
+    assert(schedulerBackend.isOfferSatisfiesRequirements("Slave1", 10000, 5, sc) === false)
+  }
 }
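The new coarse-grained tests cover each failure path once; none of them pins down the inclusive boundaries (mem >= calculateTotalMemory(sc), cpus >= 1). If that coverage were wanted, a hypothetical extra case in the same style as the tests above (not part of the patch) could look like:

  test("isOfferSatisfiesRequirements returns true at the exact memory and cpu boundary") {
    val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
    // Both checks are inclusive, so the exact minimums should be accepted.
    val exactMem = schedulerBackend.calculateTotalMemory(sc)

    assert(schedulerBackend.isOfferSatisfiesRequirements("Slave1", exactMem, 1, sc))
  }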
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
index c4dc560031207..9e013a19c6b18 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
@@ -28,7 +28,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.mesos.Protos.Value.Scalar
 import org.apache.mesos.Protos._
-import org.apache.mesos.SchedulerDriver
+import org.apache.mesos.{Protos, SchedulerDriver}
 import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.mockito.{ArgumentCaptor, Matchers}
@@ -344,4 +344,66 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar
       r.getName.equals("cpus") && r.getScalar.getValue.equals(1.0) && r.getRole.equals("prod")
     })
   }
+
+  private def createSchedulerBackendForGivenSparkConf(sc: SparkContext): MesosSchedulerBackend = {
+    val conf = new SparkConf
+
+    val listenerBus = mock[LiveListenerBus]
+    listenerBus.post(
+      SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
+
+    when(sc.getSparkHome()).thenReturn(Option("/spark-home"))
+
+    when(sc.conf).thenReturn(conf)
+    when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
+    when(sc.executorMemory).thenReturn(100)
+    when(sc.listenerBus).thenReturn(listenerBus)
+
+    val taskScheduler = mock[TaskSchedulerImpl]
+    when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
+
+    new MesosSchedulerBackend(taskScheduler, sc, "master")
+  }
+
+  test("isOfferSatisfiesRequirements returns true when the offer meets the cpu and" +
+    " memory requirements") {
+    val sc = mock[SparkContext]
+    val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
+
+    assert(schedulerBackend.isOfferSatisfiesRequirements(5, 10000, "Slave1", sc))
+  }
+
+  test("isOfferSatisfiesRequirements returns false when memory in the offer is less " +
+    "than the required memory") {
+    val sc = mock[SparkContext]
+    val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
+
+    assert(schedulerBackend.isOfferSatisfiesRequirements(5, 10, "Slave1", sc) === false)
+  }
+
+  test("isOfferSatisfiesRequirements returns false when cpus in the offer are less" +
+    " than the required cpus") {
+    val sc = mock[SparkContext]
+    val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
+
+    assert(schedulerBackend.isOfferSatisfiesRequirements(0, 10000, "Slave1", sc) === false)
+  }
+
+  test("isOfferSatisfiesRequirements returns true when the offer is from a slave already" +
+    " running an executor and the offered cpus are at least CPUS_PER_TASK") {
+    val sc = mock[SparkContext]
+    val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
+    schedulerBackend.slaveIdToExecutorInfo("Slave2") = null
+
+    assert(schedulerBackend.isOfferSatisfiesRequirements(2, 10000, "Slave2", sc) === true)
+  }
+
+  test("isOfferSatisfiesRequirements returns false when the offer is from a slave already" +
+    " running an executor but the offered cpus are less than CPUS_PER_TASK") {
+    val sc = mock[SparkContext]
+    val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
+    schedulerBackend.slaveIdToExecutorInfo("Slave2") = null
+
+    assert(schedulerBackend.isOfferSatisfiesRequirements(1, 10000, "Slave2", sc) === false)
+  }
 }
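A hypothetical companion case for the fine-grained suite (again not part of the patch) could pin the cpu boundary on a fresh slave. With CPUS_PER_TASK stubbed to 2 and the default of one executor cpu assumed, three offered cpus should be the minimum that passes:

  test("isOfferSatisfiesRequirements returns true at the exact cpu boundary") {
    val sc = mock[SparkContext]
    val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)

    // 3 = assumed mesosExecutorCores (1) + stubbed CPUS_PER_TASK (2).
    assert(schedulerBackend.isOfferSatisfiesRequirements(3, 10000, "Slave1", sc))
  }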