From 6b6fcebb0e057fdcbec82e21c514c3993d8c39ed Mon Sep 17 00:00:00 2001
From: Charles Allen
Date: Thu, 16 Jun 2016 11:12:46 -0700
Subject: [PATCH 1/2] Refactor MesosCoarseGrainedSchedulerBackend offer consideration

---
 .../MesosCoarseGrainedSchedulerBackend.scala | 130 +++++++++++-------
 1 file changed, 84 insertions(+), 46 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index e88e4ad4750d..63f296f234f0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -382,59 +382,97 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     for (offer <- offers) {
       val slaveId = offer.getSlaveId.getValue
       val offerId = offer.getId.getValue
-      val resources = remainingResources(offerId)
-
-      if (canLaunchTask(slaveId, resources)) {
-        // Create a task
-        launchTasks = true
-        val taskId = newMesosTaskId()
-        val offerCPUs = getResource(resources, "cpus").toInt
-
-        val taskCPUs = executorCores(offerCPUs)
-        val taskMemory = executorMemory(sc)
-
-        slaves.getOrElseUpdate(slaveId, new Slave(offer.getHostname)).taskIDs.add(taskId)
-
-        val (afterCPUResources, cpuResourcesToUse) =
-          partitionResources(resources, "cpus", taskCPUs)
-        val (resourcesLeft, memResourcesToUse) =
-          partitionResources(afterCPUResources.asJava, "mem", taskMemory)
-
-        val taskBuilder = MesosTaskInfo.newBuilder()
-          .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
-          .setSlaveId(offer.getSlaveId)
-          .setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId))
-          .setName("Task " + taskId)
-          .addAllResources(cpuResourcesToUse.asJava)
-          .addAllResources(memResourcesToUse.asJava)
-
-        sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
-          MesosSchedulerBackendUtil
-            .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder)
+      val availableResources = remainingResources(offerId)
+      val offerMem = getResource(availableResources, "mem")
+      val offerCpu = getResource(availableResources, "cpus")
+
+      // Catch offer limits
+      calculateUsableResources(
+        sc,
+        offerMem.toInt,
+        offerCpu.toInt
+      ).flatMap(
+        {
+          // Catch "global" limits
+          case (taskCPUs: Int, taskMemory: Int) =>
+            if (numExecutors() >= executorLimit) {
+              logTrace(s"${numExecutors()} exceeds limit of $executorLimit")
+              None
+            } else if (
+              slaves.get(slaveId).map(_.taskFailures).getOrElse(0) >= MAX_SLAVE_FAILURES
+            ) {
+              logTrace(s"Slave $slaveId exceeded limit of $MAX_SLAVE_FAILURES failures")
+              None
+            } else {
+              Some((taskCPUs, taskMemory))
+            }
         }
-
-        tasks(offer.getId) ::= taskBuilder.build()
-        remainingResources(offerId) = resourcesLeft.asJava
-        totalCoresAcquired += taskCPUs
-        coresByTaskId(taskId) = taskCPUs
+      ) match {
+        case Some((taskCPUs: Int, taskMemory: Int)) =>
+          // Create a task
+          launchTasks = true
+          val taskId = newMesosTaskId()
+
+          slaves.getOrElseUpdate(slaveId, new Slave(offer.getHostname)).taskIDs.add(taskId)
+
+          val (afterCPUResources, cpuResourcesToUse) =
+            partitionResources(availableResources, "cpus", taskCPUs)
+          val (resourcesLeft, memResourcesToUse) =
+            partitionResources(afterCPUResources.asJava, "mem", taskMemory)
+
+          val taskBuilder = MesosTaskInfo.newBuilder()
+            .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
+            .setSlaveId(offer.getSlaveId)
+            .setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId))
+            .setName("Task " + taskId)
+            .addAllResources(cpuResourcesToUse.asJava)
+            .addAllResources(memResourcesToUse.asJava)
+
+          sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
+            MesosSchedulerBackendUtil
+              .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder)
+          }
+
+          tasks(offer.getId) ::= taskBuilder.build()
+          remainingResources(offerId) = resourcesLeft.asJava
+          totalCoresAcquired += taskCPUs
+          coresByTaskId(taskId) = taskCPUs
+        case None => logDebug(s"Insufficient offer CPU: $offerCpu MEM: $offerMem")
       }
     }
     tasks.toMap
   }
 
-  private def canLaunchTask(slaveId: String, resources: JList[Resource]): Boolean = {
-    val offerMem = getResource(resources, "mem")
-    val offerCPUs = getResource(resources, "cpus").toInt
-    val cpus = executorCores(offerCPUs)
-    val mem = executorMemory(sc)
-
-    cpus > 0 &&
-      cpus <= offerCPUs &&
-      cpus + totalCoresAcquired <= maxCores &&
-      mem <= offerMem &&
-      numExecutors() < executorLimit &&
-      slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES
+  /**
+   * Try to fit the offered resources to the executor's constraints. Returns None if they do not fit.
+   *
+   * @param sc Spark context
+   * @param availableCpus The available CPUs
+   * @param availableMem The available memory (MB)
+   * @return Tuple of the desired CPU (integer cores) and memory (integer MB)
+   */
+  private[mesos]
+  def calculateUsableResources(sc: SparkContext, availableCpus: Int, availableMem: Int):
+  Option[(Int, Int)] = {
+    val desiredMemory = executorMemory(sc)
+    val desiredCpu = executorCores(availableCpus)
+    if (desiredCpu < 1) {
+      logTrace(s"Executor cores too low at $desiredCpu")
+      None
+    } else if (desiredMemory < availableMem) {
+      logTrace(s"Offer memory $availableMem is less than needed memory $desiredMemory")
+      None
+    } else if (desiredCpu < availableCpus) {
+      logTrace(s"Offer cpu $availableCpus is less than needed cpu $desiredCpu")
+      None
+    } else if (desiredCpu + totalCoresAcquired > maxCores) {
+      logTrace(s"Adding $desiredCpu more cpus to $totalCoresAcquired" +
+        s" would go over $maxCores limit")
+      None
+    } else {
+      Some((desiredCpu, desiredMemory))
+    }
   }
 
   private def executorCores(offerCPUs: Int): Int = {

From 9e0aedf12816c317db0a65e21adc921258608a4b Mon Sep 17 00:00:00 2001
From: Charles Allen
Date: Thu, 16 Jun 2016 17:11:01 -0700
Subject: [PATCH 2/2] Don't switch around variables incorrectly

---
 .../mesos/MesosCoarseGrainedSchedulerBackend.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 63f296f234f0..6ce89a465d89 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -389,8 +389,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       // Catch offer limits
       calculateUsableResources(
         sc,
-        offerMem.toInt,
-        offerCpu.toInt
+        offerCpu.toInt,
+        offerMem.toInt
       ).flatMap(
         {
           // Catch "global" limits
@@ -460,10 +460,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     if (desiredCpu < 1) {
       logTrace(s"Executor cores too low at $desiredCpu")
       None
-    } else if (desiredMemory < availableMem) {
+    } else if (desiredMemory > availableMem) {
logTrace(s"Offer memory $availableMem is less than needed memory $desiredMemory") None - } else if (desiredCpu < availableCpus) { + } else if (desiredCpu > availableCpus) { logTrace(s"Offer cpu $availableCpus is les than needed cpu $desiredCpu") None } else if (desiredCpu + totalCoresAcquired > maxCores) {