diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 7d08eae0b4871..997aa56199f02 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -27,14 +27,13 @@ import scala.collection.mutable.{HashMap, HashSet}
 import com.google.common.collect.HashBiMap
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
 import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver}
-
-import org.apache.spark.{SecurityManager, SparkContext, SparkEnv, SparkException, TaskState}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
 import org.apache.spark.rpc.RpcAddress
 import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
+import org.apache.spark.{SecurityManager, SparkContext, SparkEnv, SparkException, TaskState}
 
 /**
  * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -60,6 +59,11 @@ private[spark] class CoarseMesosSchedulerBackend(
   // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
   val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
 
+  // Minimum MB of memory per core; the requested core count is reduced to meet this ratio.
+  val minMBPerCore = conf.getDouble("spark.cores.mb.min", 0.0)
+  // Maximum MB of memory per core; the requested memory is truncated to this limit.
+  val maxMBPerCore = conf.getDouble("spark.cores.mb.max", Double.MaxValue)
+
   // If shuffle service is enabled, the Spark driver will register with the shuffle service.
   // This is for cleaning up shuffle files reliably.
   private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
@@ -254,14 +258,18 @@ private[spark] class CoarseMesosSchedulerBackend(
       val cpus = getResource(offer.getResourcesList, "cpus").toInt
       val id = offer.getId.getValue
       if (meetsConstraints) {
+        // calculateDesiredResources returns None if the offer isn't sufficient.
+        val maybeUsableResources = calculateDesiredResources(
+          sc, math.min(cpus, maxCores - totalCoresAcquired), mem.toInt
+        )
         if (taskIdToSlaveId.size < executorLimit &&
             totalCoresAcquired < maxCores &&
-            mem >= calculateTotalMemory(sc) &&
-            cpus >= 1 &&
+            maybeUsableResources.isDefined &&
             failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
             !slaveIdsWithExecutors.contains(slaveId)) {
           // Launch an executor on the slave
-          val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
+          val cpusToUse = maybeUsableResources.get._1
+          val memToUse = maybeUsableResources.get._2
           totalCoresAcquired += cpusToUse
           val taskId = newMesosTaskId()
           taskIdToSlaveId.put(taskId, slaveId)
@@ -271,7 +279,7 @@ private[spark] class CoarseMesosSchedulerBackend(
           val (remainingResources, cpuResourcesToUse) =
             partitionResources(offer.getResourcesList, "cpus", cpusToUse)
           val (_, memResourcesToUse) =
-            partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc))
+            partitionResources(remainingResources.asJava, "mem", memToUse)
           val taskBuilder = MesosTaskInfo.newBuilder()
             .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
             .setSlaveId(offer.getSlaveId)
@@ -399,6 +407,55 @@ private[spark] class CoarseMesosSchedulerBackend(
     s"$slaveId/$taskId"
   }
 
+  /**
+   * Try to fit an offer's resources to the configured per-core memory constraints.
+   * @param sc Spark context
+   * @param availableCpus Number of CPUs available in the offer
+   * @param availableMem Memory (MB) available in the offer
+   * @return Some((cores, memoryMB)) to acquire, or None if the offer is insufficient
+   */
+  def calculateDesiredResources(sc: SparkContext, availableCpus: Int, availableMem: Int)
+    : Option[(Int, Int)] = {
+    if (availableCpus <= 0) {
+      // Guard against dividing by zero below.
+      logTrace("No CPUs in offer")
+      return None
+    }
+    val desiredMemory = super.calculateTotalMemory(sc)
+    val minMB = math.max(desiredMemory, minMBPerCore).toInt
+    val mbCoreRatio = availableMem.toDouble / availableCpus.toDouble
+
+    if (availableMem < minMB) {
+      logTrace(s"Offer of $availableMem MB has insufficient memory; need at least $minMB MB")
+      None
+    } else if (mbCoreRatio > maxMBPerCore) {
+      // Ratio too high: truncate the memory request.
+      val desiredCPUMem = (maxMBPerCore * availableCpus).toInt
+      if (desiredCPUMem >= minMB) {
+        logTrace(s"Too much memory available; truncating request to $desiredCPUMem MB")
+        Option((availableCpus, desiredCPUMem))
+      } else {
+        logTrace(s"Memory allowed by $availableCpus cores is " +
+          s"insufficient to meet the minimum of $minMB MB")
+        None
+      }
+    } else if (mbCoreRatio < minMBPerCore) {
+      // Ratio too low: reduce the CPU count instead.
+      val desiredCpus = (availableMem.toDouble / minMBPerCore).toInt
+      if (desiredCpus <= 0) {
+        // Rounding can push the computed core count to zero when the
+        // memory/cpu ratio is too close to the threshold.
+        logTrace("memory/cpu ratio too close to threshold; rejecting offer")
+        None
+      } else {
+        logTrace(s"Not enough memory per cpu; lowering cpu count to $desiredCpus")
+        Option((desiredCpus, availableMem))
+      }
+    } else {
+      logTrace("CPU and memory within per-core limits; using offer as-is")
+      Option((availableCpus, availableMem))
+    }
+  }
+
   override def slaveLost(d: SchedulerDriver, slaveId: SlaveID): Unit = {
     logInfo(s"Mesos slave lost: ${slaveId.getValue}")
     executorTerminated(d, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue)
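For reference, the two settings introduced in the hunk at line 59 above are ordinary Spark configuration keys. A minimal, hypothetical usage sketch (only the key names come from this patch; the master URL, app name, and values are made up):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical values: require at least 1 GB of memory per core and
// accept at most 8 GB per core from any single Mesos offer.
val conf = new SparkConf()
  .setMaster("mesos://host:5050") // placeholder Mesos master URL
  .setAppName("per-core-memory-demo")
  .set("spark.cores.mb.min", "1024")
  .set("spark.cores.mb.max", "8192")
val sc = new SparkContext(conf)
```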
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
index 525ee0d3bdc5a..5e0e42555ec65 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
@@ -23,14 +23,13 @@ import java.util.Collections
 import org.apache.mesos.Protos.Value.Scalar
 import org.apache.mesos.Protos._
 import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
+import org.mockito.Matchers
 import org.mockito.Matchers._
 import org.mockito.Mockito._
-import org.mockito.Matchers
-import org.scalatest.mock.MockitoSugar
 import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SecurityManager, SparkFunSuite}
+import org.scalatest.mock.MockitoSugar
 
 class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
     with LocalSparkContext
@@ -77,6 +76,18 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
     backend
   }
 
+  private def buildLimitedBackend() = {
+    sc.conf.set("spark.cores.mb.min", "10000")
+    sc.conf.set("spark.cores.mb.max", "100000")
+    val driver = mock[SchedulerDriver]
+    val taskScheduler = mock[TaskSchedulerImpl]
+    when(taskScheduler.sc).thenReturn(sc)
+    val backend = createSchedulerBackend(taskScheduler, driver)
+    assert(backend.maxMBPerCore == 100000.0)
+    assert(backend.minMBPerCore == 10000.0)
+    backend
+  }
+
   var sparkConf: SparkConf = _
 
   before {
@@ -88,6 +99,98 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
     sc = new SparkContext(sparkConf)
   }
 
+  test("coarse mesos backend correctly keeps a sufficient offer") {
+    assert(buildLimitedBackend().calculateDesiredResources(sc, 1, 80000) == Some((1, 80000)))
+  }
+
+  test("coarse mesos backend correctly ignores an insufficient offer") {
+    val backend = buildLimitedBackend()
+    assert(backend.calculateDesiredResources(sc, 1, 800).isEmpty)
+  }
+
+  test("coarse mesos backend correctly truncates CPUs when the offer has too many cores") {
+    assert(buildLimitedBackend().calculateDesiredResources(sc, 10, 80000) == Some((8, 80000)))
+  }
+
+  test("coarse mesos backend correctly truncates memory when the offer has too much") {
+    assert(buildLimitedBackend().calculateDesiredResources(sc, 1, 800000) == Some((1, 100000)))
+  }
+
+  test("coarse mesos backend correctly handles zero cpu") {
+    assert(buildLimitedBackend().calculateDesiredResources(sc, 0, 800000).isEmpty)
+  }
+
+  test("coarse mesos backend correctly handles zero mem") {
+    assert(buildLimitedBackend().calculateDesiredResources(sc, 1, 0).isEmpty)
+  }
+
+  test("coarse mesos backend correctly handles zero everything") {
+    assert(buildLimitedBackend().calculateDesiredResources(sc, 0, 0).isEmpty)
+  }
everything") { + assert(buildLimitedBackend().calculateDesiredResources(sc, 0, 0).isEmpty) + } + + test("coarse mesos backend correctly handles default mem limit") { + sc.conf.set("spark.cores.mb.min", "1") + val driver = mock[SchedulerDriver] + val taskScheduler = mock[TaskSchedulerImpl] + when(taskScheduler.sc).thenReturn(sc) + val backend = createSchedulerBackend(taskScheduler, driver) + assert(backend.maxMBPerCore == Double.MaxValue) + assert(backend.minMBPerCore == 1.0) + val minimumMem = backend.calculateTotalMemory(sc) + val minimumOffer = backend.calculateDesiredResources(sc, 1, minimumMem) + assert(minimumOffer.isDefined) + assert(minimumOffer.get._1 == 1) + assert(minimumOffer.get._2 == minimumMem) + assert( + backend.calculateDesiredResources(sc, 10, minimumMem) + .count(x => x._1 == 10 && x._2 == minimumMem) == 1 + ) + assert( + backend.calculateDesiredResources(sc, minimumMem, minimumMem) + .count(x => x._1 == minimumMem && x._2 == minimumMem) == 1 + ) + assert( + backend.calculateDesiredResources(sc, minimumMem + 1, minimumMem) + .count(x => x._1 == minimumMem && x._2 == minimumMem) == 1 + ) + assert(backend.calculateDesiredResources(sc, 1, minimumMem - 1).isEmpty) + } + + test("coarse mesos backend correctly handles unset mem limit") { + val driver = mock[SchedulerDriver] + val taskScheduler = mock[TaskSchedulerImpl] + when(taskScheduler.sc).thenReturn(sc) + val backend = createSchedulerBackend(taskScheduler, driver) + assert(backend.maxMBPerCore == Double.MaxValue) + assert(backend.minMBPerCore == 0.0) + assert(backend.calculateDesiredResources(sc, 1, 1).isEmpty) + assert( + backend.calculateDesiredResources(sc, 1, 10000) + .count(x => x._1 == 1 && x._2 == 10000) == 1 + ) + assert( + backend.calculateDesiredResources(sc, 1, backend.calculateTotalMemory(sc)) + .count(x => x._1 == 1 && x._2 == backend.calculateTotalMemory(sc)) == 1 + ) + assert( + backend.calculateDesiredResources(sc, 1, Integer.MAX_VALUE) + .count(x => x._1 == 1 && x._2 == Integer.MAX_VALUE) == 1 + ) + assert( + backend.calculateDesiredResources(sc, Integer.MAX_VALUE, 10000) + .count(x => x._1 == Integer.MAX_VALUE && x._2 == 10000) == 1 + ) + } + + test("mesos supports killing and limiting executors") { val driver = mock[SchedulerDriver] when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)