@@ -533,11 +533,14 @@ private[master] class Master(
533533 }
534534
535535 /**
536- * The resource allocator spread out each app among all the workers until it has all its cores in
537- * spreadOut mode otherwise packs each app into as few workers as possible until it has assigned
538- * all its cores. User can define spark.deploy.maxCoresPerExecutor per application to
539- * limit the maximum number of cores to allocate to each executor on each worker; if the parameter
540- * is not defined, then only one executor will be launched on a worker.
536+ * Schedule executors to be launched on the workers. There are two modes of launching executors.
537+ * The first attempts to spread out an application's executors on as many workers as possible,
538+ * while the second does the opposite (i.e. launch them on as few workers as possible). The former
539+ * is usually better for data locality purposes and is the default. The number of cores assigned
540+ * to each executor is configurable. When this is explicitly set, multiple executors from the same
541+ * application may be launched on the same worker if the worker has enough cores and memory.
542+ * Otherwise, each executor grabs all the cores available on the worker by default, in which case
543+ * only one executor may be launched on each worker.
541544 */
542545 private def startExecutorsOnWorkers (): Unit = {
543546 // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
@@ -546,7 +549,9 @@ private[master] class Master(
546549 // Try to spread out each app among all the workers, until it has all its cores
547550 for (app <- waitingApps if app.coresLeft > 0 ) {
548551 val usableWorkers = workers.toArray.filter(_.state == WorkerState .ALIVE )
549- .filter(canUse(app, _)).sortBy(_.coresFree).reverse
552+ .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
553+ worker.coresFree > 0 )
554+ .sortBy(_.coresFree).reverse
550555 val numUsable = usableWorkers.length
551556 val assigned = new Array [Int ](numUsable) // Number of cores to give on each node
552557 var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
@@ -566,15 +571,16 @@ private[master] class Master(
566571 } else {
567572 // Pack each app into as few workers as possible until we've assigned all its cores
568573 for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState .ALIVE ) {
569- for (app <- waitingApps if app.coresLeft > 0 ) {
570- allocateWorkerResourceToExecutors(app, app.coresLeft, worker)
574+ for (app <- waitingApps if app.coresLeft > 0 &&
575+ worker.memoryFree >= app.desc.memoryPerExecutorMB) {
576+ allocateWorkerResourceToExecutors(app, app.coresLeft, worker)
571577 }
572578 }
573579 }
574580 }
575581
576582 /**
577- * allocate resources in a certain worker to one or more executors
583+ * Allocate a worker's resources to one or more executors.
578584 * @param app the info of the application which the executors belong to
579585 * @param coresToAllocate cores on this worker to be allocated to this application
580586 * @param worker the worker info
@@ -583,20 +589,24 @@ private[master] class Master(
583589 app : ApplicationInfo ,
584590 coresToAllocate : Int ,
585591 worker : WorkerInfo ): Unit = {
586- if (canUse(app, worker)) {
587- val memoryPerExecutor = app.desc.memoryPerExecutorMB
588- val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate)
589- var coresLeft = coresToAllocate
590- while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) {
591- val exec = app.addExecutor(worker, coresPerExecutor)
592- coresLeft -= coresPerExecutor
593- launchExecutor(worker, exec)
594- app.state = ApplicationState .RUNNING
595- }
592+ val memoryPerExecutor = app.desc.memoryPerExecutorMB
593+ val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate)
594+ var coresLeft = coresToAllocate
595+ while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) {
596+ val exec = app.addExecutor(worker, coresPerExecutor)
597+ coresLeft -= coresPerExecutor
598+ launchExecutor(worker, exec)
599+ app.state = ApplicationState .RUNNING
596600 }
597601 }
598602
599- private def startDriversOnWorkers (): Unit = {
603+ /**
604+ * Schedule the currently available resources among waiting apps. This method will be called
605+ * every time a new app joins or resource availability changes.
606+ */
607+ private def schedule (): Unit = {
608+ if (state != RecoveryState .ALIVE ) { return }
609+ // start in-cluster drivers, they take strict precedence over applications
600610 val shuffledWorkers = Random .shuffle(workers) // Randomization helps balance drivers
601611 for (worker <- shuffledWorkers if worker.state == WorkerState .ALIVE ) {
602612 for (driver <- waitingDrivers) {
@@ -606,21 +616,11 @@ private[master] class Master(
606616 }
607617 }
608618 }
609- }
610-
611- /**
612- * Schedule the currently available resources among waiting apps. This method will be called
613- * every time a new app joins or resource availability changes.
614- */
615- private def schedule (): Unit = {
616- if (state != RecoveryState .ALIVE ) { return }
617- // start in-cluster drivers, they take strict precedence over applications
618- startDriversOnWorkers()
619619 // start executors
620620 startExecutorsOnWorkers()
621621 }
622622
623- def launchExecutor (worker : WorkerInfo , exec : ExecutorDesc ): Unit = {
623+ def launchExecutor (worker : WorkerInfo , exec : ExecutorDesc ): Unit = {
624624 logInfo(" Launching executor " + exec.fullId + " on worker " + worker.id)
625625 worker.addExecutor(exec)
626626 worker.actor ! LaunchExecutor (masterUrl,
0 commit comments