@@ -541,6 +541,7 @@ private[master] class Master(
 
   /**
    * Schedule executors to be launched on the workers.
+   * Returns an array containing number of cores assigned to each worker.
    *
    * There are two modes of launching executors. The first attempts to spread out an application's
    * executors on as many workers as possible, while the second does the opposite (i.e. launch them
@@ -551,59 +552,100 @@ private[master] class Master(
    * multiple executors from the same application may be launched on the same worker if the worker
    * has enough cores and memory. Otherwise, each executor grabs all the cores available on the
    * worker by default, in which case only one executor may be launched on each worker.
+   *
+   * It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core
+   * at a time). Consider the following example: cluster has 4 workers with 16 cores each.
+   * User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is
+   * allocated at a time, 12 cores from each worker would be assigned to each executor.
+   * Since 12 < 16, no executors would launch [SPARK-8881].
    */
-  private def startExecutorsOnWorkers(): Unit = {
-    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
-    // in the queue, then the second app, etc.
-    if (spreadOutApps) {
-      // Try to spread out each app among all the workers, until it has all its cores
-      for (app <- waitingApps if app.coresLeft > 0) {
-        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
-          .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
-            worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1))
-          .sortBy(_.coresFree).reverse
-        val numUsable = usableWorkers.length
-        val assigned = new Array[Int](numUsable) // Number of cores to give on each node
-        var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
-        var pos = 0
-        while (toAssign > 0) {
-          if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
-            toAssign -= 1
-            assigned(pos) += 1
+  private def scheduleExecutorsOnWorkers(
+      app: ApplicationInfo,
+      usableWorkers: Array[WorkerInfo],
+      spreadOutApps: Boolean): Array[Int] = {
+    // If the number of cores per executor is not specified, then we can just schedule
+    // 1 core at a time since we expect a single executor to be launched on each worker
+    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
+    val memoryPerExecutor = app.desc.memoryPerExecutorMB
+    val numUsable = usableWorkers.length
+    val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
+    val assignedMemory = new Array[Int](numUsable) // Amount of memory to give to each worker
+    var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
+    var freeWorkers = (0 until numUsable).toIndexedSeq
+
+    def canLaunchExecutor(pos: Int): Boolean = {
+      usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
+      usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor
+    }
+
+    while (coresToAssign >= coresPerExecutor && freeWorkers.nonEmpty) {
+      freeWorkers = freeWorkers.filter(canLaunchExecutor)
+      freeWorkers.foreach { pos =>
+        var keepScheduling = true
+        while (keepScheduling && canLaunchExecutor(pos) && coresToAssign >= coresPerExecutor) {
+          coresToAssign -= coresPerExecutor
+          assignedCores(pos) += coresPerExecutor
+          // If cores per executor is not set, we are assigning 1 core at a time
+          // without actually meaning to launch 1 executor for each core assigned
+          if (app.desc.coresPerExecutor.isDefined) {
+            assignedMemory(pos) += memoryPerExecutor
+          }
+
+          // Spreading out an application means spreading out its executors across as
+          // many workers as possible. If we are not spreading out, then we should keep
+          // scheduling executors on this worker until we use all of its resources.
+          // Otherwise, just move on to the next worker.
+          if (spreadOutApps) {
+            keepScheduling = false
           }
-          pos = (pos + 1) % numUsable
-        }
-        // Now that we've decided how many cores to give on each node, let's actually give them
-        for (pos <- 0 until numUsable if assigned(pos) > 0) {
-          allocateWorkerResourceToExecutors(app, assigned(pos), usableWorkers(pos))
         }
       }
-    } else {
-      // Pack each app into as few workers as possible until we've assigned all its cores
-      for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
-        for (app <- waitingApps if app.coresLeft > 0) {
-          allocateWorkerResourceToExecutors(app, app.coresLeft, worker)
-        }
+    }
+    assignedCores
+  }
+
+  /**
+   * Schedule and launch executors on workers
+   */
+  private def startExecutorsOnWorkers(): Unit = {
+    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
+    // in the queue, then the second app, etc.
+    for (app <- waitingApps if app.coresLeft > 0) {
+      val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
+      // Filter out workers that don't have enough resources to launch an executor
+      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
+        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
+          worker.coresFree >= coresPerExecutor.getOrElse(1))
+        .sortBy(_.coresFree).reverse
+      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
+
+      // Now that we've decided how many cores to allocate on each worker, let's allocate them
+      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
+        allocateWorkerResourceToExecutors(
+          app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
       }
     }
   }
 
   /**
    * Allocate a worker's resources to one or more executors.
    * @param app the info of the application which the executors belong to
-   * @param coresToAllocate cores on this worker to be allocated to this application
+   * @param assignedCores number of cores on this worker for this application
+   * @param coresPerExecutor number of cores per executor
    * @param worker the worker info
    */
   private def allocateWorkerResourceToExecutors(
       app: ApplicationInfo,
-      coresToAllocate: Int,
+      assignedCores: Int,
+      coresPerExecutor: Option[Int],
       worker: WorkerInfo): Unit = {
-    val memoryPerExecutor = app.desc.memoryPerExecutorMB
-    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate)
-    var coresLeft = coresToAllocate
-    while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) {
-      val exec = app.addExecutor(worker, coresPerExecutor)
-      coresLeft -= coresPerExecutor
+    // If the number of cores per executor is specified, we divide the cores assigned
+    // to this worker evenly among the executors with no remainder.
+    // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
+    val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
+    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
+    for (i <- 1 to numExecutors) {
+      val exec = app.addExecutor(worker, coresToAssign)
       launchExecutor(worker, exec)
       app.state = ApplicationState.RUNNING
     }
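
To make the scenario in the new scaladoc concrete, here is a small standalone Scala sketch (the `Spark8881Sketch` object and `assign` helper are illustrative only, not part of Master.scala) that simulates the round-robin assignment with a configurable step size. With 4 workers of 16 free cores each, spark.cores.max = 48 and spark.executor.cores = 16, a 1-core step leaves every worker with 12 assigned cores, too few for a single 16-core executor, while a 16-core step fills three workers exactly.

```scala
object Spark8881Sketch {
  // Round-robin `coresMax` cores across workers, `step` cores at a time.
  def assign(workerCores: Array[Int], coresMax: Int, step: Int): Array[Int] = {
    val assigned = Array.fill(workerCores.length)(0)
    var toAssign = math.min(coresMax, workerCores.sum)
    var pos = 0
    while (toAssign >= step &&
        assigned.zip(workerCores).exists { case (a, c) => c - a >= step }) {
      if (workerCores(pos) - assigned(pos) >= step) {
        assigned(pos) += step
        toAssign -= step
      }
      pos = (pos + 1) % workerCores.length
    }
    assigned
  }

  def main(args: Array[String]): Unit = {
    val workers = Array(16, 16, 16, 16) // 4 ALIVE workers, 16 free cores each
    // spark.cores.max = 48, spark.executor.cores = 16
    // 1 core at a time: 12 cores end up on each worker, and 12 < 16 means
    // no worker's share is large enough to launch a 16-core executor.
    println(assign(workers, coresMax = 48, step = 1).mkString(", "))  // 12, 12, 12, 12
    // coresPerExecutor at a time: three workers each get a full executor's worth.
    println(assign(workers, coresMax = 48, step = 16).mkString(", ")) // 16, 16, 16, 0
  }
}
```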
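For the allocation half of the change, the sketch below mirrors the arithmetic in the rewritten allocateWorkerResourceToExecutors, assuming the Option semantics shown in the diff; the `AllocationSketch` object, `executorsFor` helper, and the sample values are hypothetical and only illustrate the two branches.

```scala
object AllocationSketch {
  // How many executors launch on one worker, and with how many cores each.
  def executorsFor(assignedCores: Int, coresPerExecutor: Option[Int]): Seq[Int] = {
    // With spark.executor.cores set, the assigned cores divide evenly into executors
    // (scheduleExecutorsOnWorkers only hands out multiples of coresPerExecutor).
    // Without it, a single executor grabs every core assigned on this worker.
    val numExecutors = coresPerExecutor.map(assignedCores / _).getOrElse(1)
    val coresPerLaunch = coresPerExecutor.getOrElse(assignedCores)
    Seq.fill(numExecutors)(coresPerLaunch)
  }

  def main(args: Array[String]): Unit = {
    println(executorsFor(32, Some(16))) // List(16, 16): two 16-core executors
    println(executorsFor(32, None))     // List(32): one executor takes all 32 cores
  }
}
```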