diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index ae99432f5ce86..281725bb7327e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -28,7 +28,8 @@ private[spark] class ApplicationDescription(
     val eventLogDir: Option[URI] = None,
     // short name of compression codec used when writing event logs, if any (e.g. lzf)
     val eventLogCodec: Option[String] = None,
-    val coresPerExecutor: Option[Int] = None)
+    val coresPerExecutor: Option[Int] = None,
+    val coresPerTask: Int = 1)
   extends Serializable {
 
   val user = System.getProperty("user.name", "<unknown>")
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index c5a6b1beac9be..dd637b2044317 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -540,20 +540,27 @@ private[master] class Master(
     // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
     // in the queue, then the second app, etc.
     if (spreadOutApps) {
-      // Try to spread out each app among all the workers, until it has all its cores
-      for (app <- waitingApps if app.coresLeft > 0) {
+      // Try to spread out each app among all the nodes, until it has all its cores
+      for (app <- waitingApps if app.coresLeft >= app.desc.coresPerTask) {
+        val coreNumPerTask = app.desc.coresPerTask
         val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
           .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
-            worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1))
+            worker.coresFree >= math.max(app.desc.coresPerExecutor.getOrElse(1),
+              app.desc.coresPerTask))
           .sortBy(_.coresFree).reverse
         val numUsable = usableWorkers.length
         val assigned = new Array[Int](numUsable) // Number of cores to give on each node
         var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
         var pos = 0
-        while (toAssign > 0) {
-          if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
-            toAssign -= 1
-            assigned(pos) += 1
+        val availableWorkerSet = new HashSet[Int]
+        // Take into account the number of cores the application uses per task
+        // to avoid starving executors or wasting cluster resources (SPARK-5337)
+        while (toAssign >= coreNumPerTask && availableWorkerSet.size < usableWorkers.length) {
+          if (usableWorkers(pos).coresFree - assigned(pos) >= coreNumPerTask) {
+            toAssign -= coreNumPerTask
+            assigned(pos) += coreNumPerTask
+          } else {
+            availableWorkerSet += pos
           }
           pos = (pos + 1) % numUsable
         }
@@ -565,7 +572,7 @@ private[master] class Master(
     } else {
       // Pack each app into as few workers as possible until we've assigned all its cores
       for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
-        for (app <- waitingApps if app.coresLeft > 0) {
+        for (app <- waitingApps if app.coresLeft >= app.desc.coresPerTask) {
           allocateWorkerResourceToExecutors(app, app.coresLeft, worker)
         }
       }
@@ -582,9 +589,13 @@ private[master] class Master(
       app: ApplicationInfo,
      coresToAllocate: Int,
       worker: WorkerInfo): Unit = {
+    val cpuPerTask = app.desc.coresPerTask
     val memoryPerExecutor = app.desc.memoryPerExecutorMB
-    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate)
-    var coresLeft = coresToAllocate
+    var coresLeft = coresToAllocate - coresToAllocate % cpuPerTask
+    val maxCoresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresLeft)
+    val coresPerExecutor = math.max(
+      maxCoresPerExecutor - maxCoresPerExecutor % cpuPerTask,
+      cpuPerTask)
     while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) {
       val exec = app.addExecutor(worker, coresPerExecutor)
       coresLeft -= coresPerExecutor
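// A minimal, self-contained sketch (not part of the patch) of the spread-out
// allocation loop in Master.scala above. Workers are reduced to their free
// core counts and the object/method names are illustrative; the point is that
// cores are handed out in multiples of coresPerTask, so no worker is left
// holding a remainder that can never run a task (SPARK-5337).
object SpreadOutAllocationSketch {
  def assignCores(coresFreePerWorker: Array[Int], coresLeft: Int, coresPerTask: Int): Array[Int] = {
    val assigned = new Array[Int](coresFreePerWorker.length)
    // Workers that can no longer fit a whole task's worth of cores.
    val saturated = scala.collection.mutable.HashSet[Int]()
    var toAssign = math.min(coresLeft, coresFreePerWorker.sum)
    var pos = 0
    while (toAssign >= coresPerTask && saturated.size < coresFreePerWorker.length) {
      if (coresFreePerWorker(pos) - assigned(pos) >= coresPerTask) {
        toAssign -= coresPerTask
        assigned(pos) += coresPerTask
      } else {
        saturated += pos
      }
      pos = (pos + 1) % coresFreePerWorker.length
    }
    assigned
  }

  def main(args: Array[String]): Unit = {
    // Three workers with 5, 4 and 3 free cores, 10 cores requested, 2 cores per task
    // => 4, 4, 2: every grant is a multiple of 2 and exactly 10 cores are assigned.
    println(assignCores(Array(5, 4, 3), coresLeft = 10, coresPerTask = 2).mkString(", "))
  }
}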
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 13a52d836f32f..3980fed4a7600 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -70,6 +70,18 @@ private[spark] class TaskSchedulerImpl(
   // CPUs to request per task
   val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
 
+  if (CPUS_PER_TASK < 1) {
+    throw new IllegalArgumentException(
+      s"spark.task.cpus must be greater than 0! (was $CPUS_PER_TASK)")
+  }
+
+  val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
+
+  if (maxCores < CPUS_PER_TASK) {
+    throw new IllegalArgumentException(
+      s"spark.task.cpus ($CPUS_PER_TASK) should not be larger than spark.cores.max ($maxCores)")
+  }
+
   // TaskSetManagers are not thread safe, so any access to one should be synchronized
   // on this class.
   val activeTaskSets = new HashMap[String, TaskSetManager]
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index ccf1dc5af6120..525b09d2cdb87 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -42,9 +42,10 @@ private[spark] class SparkDeploySchedulerBackend(
 
   private val registrationBarrier = new Semaphore(0)
 
-  private val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
-  private val totalExpectedCores = maxCores.getOrElse(0)
-
+  val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
+  val totalExpectedCores = maxCores.getOrElse(0)
+  val coresPerTask = scheduler.CPUS_PER_TASK
+
   override def start() {
     super.start()
 
@@ -83,8 +84,8 @@ private[spark] class SparkDeploySchedulerBackend(
       args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
     val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
     val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
-    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
-      command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
+    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
+      appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, coresPerTask)
     client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
     client.start()
     waitForRegistration()
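// A short usage sketch (not part of the patch), built only on the public
// SparkConf API; object and method names are illustrative. It mirrors the
// fail-fast checks added to TaskSchedulerImpl above, so a spark.task.cpus
// value that can never be satisfied under spark.cores.max is rejected up
// front instead of leaving the application waiting for tasks that can never
// be scheduled.
import org.apache.spark.SparkConf

object TaskCpusValidationSketch {
  def validate(conf: SparkConf): Unit = {
    val cpusPerTask = conf.getInt("spark.task.cpus", 1)
    if (cpusPerTask < 1) {
      throw new IllegalArgumentException(
        s"spark.task.cpus must be greater than 0! (was $cpusPerTask)")
    }
    val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
    if (maxCores < cpusPerTask) {
      throw new IllegalArgumentException(
        s"spark.task.cpus ($cpusPerTask) should not be larger than spark.cores.max ($maxCores)")
    }
  }

  def main(args: Array[String]): Unit = {
    // Passes: each 2-core task fits within the 8-core cap.
    validate(new SparkConf(false).set("spark.task.cpus", "2").set("spark.cores.max", "8"))
    // Fails fast: a 4-core task can never run when the app may only hold 2 cores.
    try {
      validate(new SparkConf(false).set("spark.task.cpus", "4").set("spark.cores.max", "2"))
    } catch {
      case e: IllegalArgumentException => println(e.getMessage)
    }
  }
}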
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index b037a4966ced0..825f7b88282a1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -62,6 +62,8 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
   val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
+
+  val coresPerTask = scheduler.CPUS_PER_TASK
 
   // Cores we have acquired with each Mesos task ID
   val coresByTaskId = new HashMap[Int, Int]
@@ -213,13 +215,14 @@ private[spark] class CoarseMesosSchedulerBackend(
       val slaveId = offer.getSlaveId.toString
       val mem = getResource(offer.getResourcesList, "mem")
       val cpus = getResource(offer.getResourcesList, "cpus").toInt
+      val demandCores = math.min(cpus, maxCores - totalCoresAcquired)
+      val cpusToUse = demandCores - demandCores % coresPerTask
       if (totalCoresAcquired < maxCores &&
+          cpusToUse >= coresPerTask &&
           mem >= MemoryUtils.calculateTotalMemory(sc) &&
-          cpus >= 1 &&
           failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
           !slaveIdsWithExecutors.contains(slaveId)) {
         // Launch an executor on the slave
-        val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
         totalCoresAcquired += cpusToUse
         val taskId = newMesosTaskId()
         taskIdToSlaveId(taskId) = slaveId
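// A small arithmetic sketch (not part of the patch, names illustrative) of the
// offer-rounding logic in CoarseMesosSchedulerBackend above: the cores taken
// from a Mesos offer are rounded down to a multiple of spark.task.cpus, and an
// offer that cannot fit even one task is declined rather than acquiring cores
// that could never run a task.
object MesosOfferRoundingSketch {
  // Returns the number of cores to accept from an offer, or 0 to decline it.
  def cpusToUse(offeredCpus: Int, maxCores: Int, totalCoresAcquired: Int, coresPerTask: Int): Int = {
    val demandCores = math.min(offeredCpus, maxCores - totalCoresAcquired)
    val rounded = demandCores - demandCores % coresPerTask
    if (rounded >= coresPerTask) rounded else 0
  }

  def main(args: Array[String]): Unit = {
    // 7 cores offered, room for 10 more, 3 cores per task => accept 6, leave 1 on the offer.
    println(cpusToUse(offeredCpus = 7, maxCores = 10, totalCoresAcquired = 0, coresPerTask = 3))
    // Only 2 cores offered with 3 cores per task => decline (0) instead of wasting 2 cores.
    println(cpusToUse(offeredCpus = 2, maxCores = 10, totalCoresAcquired = 0, coresPerTask = 3))
  }
}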