From b34ec0ccc750a0f39beee9d8f45c3acb858f79f9 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 4 May 2014 14:13:41 -0400 Subject: [PATCH 01/27] make master support multiple executors per worker --- .../spark/deploy/ApplicationDescription.scala | 7 +- .../apache/spark/deploy/JsonProtocol.scala | 4 +- .../spark/deploy/master/ApplicationInfo.scala | 5 +- .../apache/spark/deploy/master/Master.scala | 136 +++++++++++++----- .../deploy/master/ui/ApplicationPage.scala | 6 +- .../spark/deploy/master/ui/MasterPage.scala | 4 +- .../cluster/SparkDeploySchedulerBackend.scala | 3 + docs/configuration.md | 19 +++ 8 files changed, 136 insertions(+), 48 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index b7ae9c1fc0a23..fa5d443ba33cb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -22,7 +22,7 @@ import java.net.URI private[spark] class ApplicationDescription( val name: String, val maxCores: Option[Int], - val memoryPerSlave: Int, + val memoryPerExecutorMB: Int, val command: Command, var appUiUrl: String, val eventLogDir: Option[URI] = None, @@ -35,7 +35,7 @@ private[spark] class ApplicationDescription( def copy( name: String = name, maxCores: Option[Int] = maxCores, - memoryPerSlave: Int = memoryPerSlave, + memoryPerSlave: Int = memoryPerExecutorMB, command: Command = command, appUiUrl: String = appUiUrl, eventLogDir: Option[URI] = eventLogDir, @@ -43,5 +43,8 @@ private[spark] class ApplicationDescription( new ApplicationDescription( name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec) + // only valid when spark.executor.multiPerWorker is set to true + var maxCorePerExecutor = maxCores + override def toString: String = "ApplicationDescription(" + name + ")" } diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala index dfc5b97e6a6c8..2954f932b4f41 100644 --- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala @@ -46,7 +46,7 @@ private[deploy] object JsonProtocol { ("name" -> obj.desc.name) ~ ("cores" -> obj.desc.maxCores) ~ ("user" -> obj.desc.user) ~ - ("memoryperslave" -> obj.desc.memoryPerSlave) ~ + ("memoryperslave" -> obj.desc.memoryPerExecutorMB) ~ ("submitdate" -> obj.submitDate.toString) ~ ("state" -> obj.state.toString) ~ ("duration" -> obj.duration) @@ -55,7 +55,7 @@ private[deploy] object JsonProtocol { def writeApplicationDescription(obj: ApplicationDescription): JObject = { ("name" -> obj.name) ~ ("cores" -> obj.maxCores) ~ - ("memoryperslave" -> obj.memoryPerSlave) ~ + ("memoryperslave" -> obj.memoryPerExecutorMB) ~ ("user" -> obj.user) ~ ("command" -> obj.command.toString) } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index bc5b293379f2b..3439eae086b34 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -75,9 +75,8 @@ private[deploy] class ApplicationInfo( } } - private[master] def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): - ExecutorDesc = { - val exec = new ExecutorDesc(newExecutorId(useID), 
this, worker, cores, desc.memoryPerSlave) + def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): ExecutorDesc = { + val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerExecutorMB) executors(exec.id) = exec coresGranted += cores exec diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 9a5d5877da86d..5b35a184ca96b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -22,7 +22,7 @@ import java.net.URLEncoder import java.text.SimpleDateFormat import java.util.Date -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.collection.mutable.{ListBuffer, ArrayBuffer, HashMap, HashSet} import scala.concurrent.Await import scala.concurrent.duration._ import scala.language.postfixOps @@ -529,42 +529,13 @@ private[master] class Master( * two executors on the same worker). */ private def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = { - worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app) + worker.memoryFree >= app.desc.memoryPerExecutorMB && !worker.hasExecutor(app) && + worker.coresFree > 0 } - /** - * Schedule the currently available resources among waiting apps. This method will be called - * every time a new app joins or resource availability changes. - */ - private def schedule() { - if (state != RecoveryState.ALIVE) { return } - - // First schedule drivers, they take strict precedence over applications - // Randomization helps balance drivers - val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE)) - val numWorkersAlive = shuffledAliveWorkers.size - var curPos = 0 - - for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers - // We assign workers to each waiting driver in a round-robin fashion. For each driver, we - // start from the last worker that was assigned a driver, and continue onwards until we have - // explored all alive workers. - var launched = false - var numWorkersVisited = 0 - while (numWorkersVisited < numWorkersAlive && !launched) { - val worker = shuffledAliveWorkers(curPos) - numWorkersVisited += 1 - if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { - launchDriver(worker, driver) - waitingDrivers -= driver - launched = true - } - curPos = (curPos + 1) % numWorkersAlive - } - } - - // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app - // in the queue, then the second app, etc. + // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app + // in the queue, then the second app, etc. + private def startSingleExecutorPerWorker() { if (spreadOutApps) { // Try to spread out each app among all the nodes, until it has all its cores for (app <- waitingApps if app.coresLeft > 0) { @@ -607,7 +578,100 @@ private[master] class Master( } } - private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) { + private def startMultiExecutorsPerWorker() { + // allow user to run multiple executors in the same worker + // (within the same worker JVM process) + if (spreadOutApps) { + for (app <- waitingApps if app.coresLeft > 0) { + val memoryPerExecutor = app.desc.memoryPerExecutorMB + val usableWorkers = workers.filter(_.state == WorkerState.ALIVE). 
+ filter(worker => worker.coresFree > 0 && worker.memoryFree >= memoryPerExecutor).toArray. + sortBy(_.memoryFree / memoryPerExecutor).reverse + val maxCoreNumPerExecutor = app.desc.maxCorePerExecutor.get + // get the maximum total number of executors we can assign + var maxLeftExecutorsToAssign = usableWorkers.map(_.memoryFree / memoryPerExecutor).sum + var maxCoresLeft = app.coresLeft + val numUsable = usableWorkers.length + // Number of cores of each executor assigned to each worker + val assigned = Array.fill[ListBuffer[Int]](numUsable)(new ListBuffer[Int]) + val assignedSum = Array.fill[Int](numUsable)(0) + var pos = 0 + val noEnoughMemoryWorkers = new HashSet[Int] + while (maxLeftExecutorsToAssign > 0 && noEnoughMemoryWorkers.size < numUsable) { + if (usableWorkers(pos).coresFree - assignedSum(pos) > 0) { + val coreToAssign = math.min(math.min(usableWorkers(pos).coresFree - assignedSum(pos), + maxCoreNumPerExecutor), maxCoresLeft) + if (usableWorkers(pos).memoryFree >= + app.desc.memoryPerExecutorMB * (assigned(pos).length + 1)) { + maxLeftExecutorsToAssign -= coreToAssign + assigned(pos) += coreToAssign + assignedSum(pos) += coreToAssign + maxCoresLeft -= coreToAssign + } + } + if (usableWorkers(pos).memoryFree < app.desc.memoryPerExecutorMB * + (assigned(pos).length + 1)) { + noEnoughMemoryWorkers += pos + } + pos = (pos + 1) % numUsable + } + + // Now that we've decided how many executors and the core number for each to + // give on each node, let's actually give them + for (pos <- 0 until numUsable) { + for (execIdx <- 0 until assigned(pos).length) { + val exec = app.addExecutor(usableWorkers(pos), assigned(pos)(execIdx)) + launchExecutor(usableWorkers(pos), exec) + app.state = ApplicationState.RUNNING + } + } + } + } else { + // Pack each app into as few nodes as possible until we've assigned all its cores + for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) { + for (app <- waitingApps if app.coresLeft > 0 && + app.desc.memoryPerExecutorMB <= worker.memoryFree) { + val coreNumPerExecutor = app.desc.maxCorePerExecutor.get + var coresLeft = math.min(worker.coresFree, app.coresLeft) + while (coresLeft > 0 && app.desc.memoryPerExecutorMB <= worker.memoryFree) { + val coreToAssign = math.min(coreNumPerExecutor, coresLeft) + val exec = app.addExecutor(worker, coreToAssign) + launchExecutor(worker, exec) + coresLeft -= coreToAssign + app.state = ApplicationState.RUNNING + } + } + } + } + } + + + /** + * Schedule the currently available resources among waiting apps. This method will be called + * every time a new app joins or resource availability changes. + */ + def schedule() { + if (state != RecoveryState.ALIVE) { return } + + // First schedule drivers, they take strict precedence over applications + val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers + for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { + for (driver <- waitingDrivers) { + if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { + launchDriver(worker, driver) + waitingDrivers -= driver + } + } + } + + if (!conf.getBoolean("spark.executor.multiPerWorker", false)) { + startSingleExecutorPerWorker() + } else { + startMultiExecutorsPerWorker() + } + } + + def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) { logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) worker.addExecutor(exec) worker.actor ! 
LaunchExecutor(masterUrl, diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index 761aa8f7b1ef6..79dae4d26ac7f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -83,18 +83,18 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
  • Name: {app.desc.name}
  • User: {app.desc.user}
  • Cores:
-  {
+  {
     if (app.desc.maxCores.isEmpty) {
       "Unlimited (%s granted)".format(app.coresGranted)
     } else {
       "%s (%s granted, %s left)".format(
         app.desc.maxCores.get, app.coresGranted, app.coresLeft)
     }
-  }
+  }
  • Executor Memory:
-  {Utils.megabytesToString(app.desc.memoryPerSlave)}
+  {Utils.megabytesToString(app.desc.memoryPerExecutorMB)}
  • Submit Date: {app.submitDate}
  • State: {app.state}
  • diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala index 45412a35e9a7d..399f07399a0aa 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala @@ -208,8 +208,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { {app.coresGranted} - - {Utils.megabytesToString(app.desc.memoryPerSlave)} + + {Utils.megabytesToString(app.desc.memoryPerExecutorMB)} {UIUtils.formatDate(app.submitDate)} {app.desc.user} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 7eb3fdc19b5b8..2f66a57f1ad93 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -85,6 +85,9 @@ private[spark] class SparkDeploySchedulerBackend( val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, appUIAddress, sc.eventLogDir, sc.eventLogCodec) + if (conf.getBoolean("spark.executor.multiPerWorker", true)) { + appDesc.maxCorePerExecutor = Some(conf.getInt("spark.executor.maxCoreNumPerExecutor", 1)) + } client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() diff --git a/docs/configuration.md b/docs/configuration.md index 7fe11475212b3..0206fae2edd27 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1255,6 +1255,25 @@ Apart from these, the following properties are also available, and may be useful spark.ui.view.acls Empty + + + spark.executor.multiPerWorker + false + + enable user to run multiple executors in the same worker. + + + + spark.executor.maxCoreNumPerExecutor + 1 + + set the max number of cores assigned to each executor; this property is only valid when + spark.executor.multiPerWorker is set to true. + + + + spark.executor.extraClassPath + (none) Comma separated list of users that have view access to the Spark web ui. By default only the user that started the Spark job has view access. From a5d629a8d9403a1859a6c608035d8d447d020b63 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Tue, 27 Jan 2015 08:17:47 -0500 Subject: [PATCH 02/27] java doc --- .../apache/spark/deploy/master/Master.scala | 32 +++++++++++-------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 5b35a184ca96b..7d82cafe1204a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -578,9 +578,19 @@ private[master] class Master( } } + /** + * This functions starts multiple executors on each worker. + * It first calculates the maximum number of executors we can allocate to the application in this + * scheduling moment according to the free memory space on each worker, then tries to allocate executors + * on each worker according to the user-specified spark.executor.maxCoreNumPerExecutor. + * + * It traverses the available worker list. 
In spreadOutApps mode, it allocates at most + * spark.executor.maxCoreNumPerExecutor cores (can be less than it when the worker does not have enough + * cores or the demand is less than it) and app.desc.memoryPerExecutorMB megabytes memory and tracks the + * resource allocation in a 2d array for each visit; Otherwise, it uses up all available resources of a worker + * for each visit. + */ private def startMultiExecutorsPerWorker() { - // allow user to run multiple executors in the same worker - // (within the same worker JVM process) if (spreadOutApps) { for (app <- waitingApps if app.coresLeft > 0) { val memoryPerExecutor = app.desc.memoryPerExecutorMB @@ -588,31 +598,25 @@ private[master] class Master( filter(worker => worker.coresFree > 0 && worker.memoryFree >= memoryPerExecutor).toArray. sortBy(_.memoryFree / memoryPerExecutor).reverse val maxCoreNumPerExecutor = app.desc.maxCorePerExecutor.get - // get the maximum total number of executors we can assign - var maxLeftExecutorsToAssign = usableWorkers.map(_.memoryFree / memoryPerExecutor).sum + // get the maximum number of executors we can assign + var leftExecutorNumToAssign = usableWorkers.map(_.memoryFree / memoryPerExecutor).sum var maxCoresLeft = app.coresLeft val numUsable = usableWorkers.length - // Number of cores of each executor assigned to each worker + // 2D array to track the number of cores of each executor assigned to each worker val assigned = Array.fill[ListBuffer[Int]](numUsable)(new ListBuffer[Int]) val assignedSum = Array.fill[Int](numUsable)(0) var pos = 0 - val noEnoughMemoryWorkers = new HashSet[Int] - while (maxLeftExecutorsToAssign > 0 && noEnoughMemoryWorkers.size < numUsable) { + while (leftExecutorNumToAssign > 0) { if (usableWorkers(pos).coresFree - assignedSum(pos) > 0) { val coreToAssign = math.min(math.min(usableWorkers(pos).coresFree - assignedSum(pos), maxCoreNumPerExecutor), maxCoresLeft) - if (usableWorkers(pos).memoryFree >= - app.desc.memoryPerExecutorMB * (assigned(pos).length + 1)) { - maxLeftExecutorsToAssign -= coreToAssign + if (usableWorkers(pos).memoryFree >= memoryPerExecutor * (assigned(pos).length + 1)) { + leftExecutorNumToAssign -= 1 assigned(pos) += coreToAssign assignedSum(pos) += coreToAssign maxCoresLeft -= coreToAssign } } - if (usableWorkers(pos).memoryFree < app.desc.memoryPerExecutorMB * - (assigned(pos).length + 1)) { - noEnoughMemoryWorkers += pos - } pos = (pos + 1) % numUsable } From a26096d547cb0acc00269016ad31adcf71041d21 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Tue, 27 Jan 2015 08:25:01 -0500 Subject: [PATCH 03/27] stylistic fix --- .../org/apache/spark/deploy/master/Master.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 7d82cafe1204a..14e73d7be9353 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -581,14 +581,14 @@ private[master] class Master( /** * This functions starts multiple executors on each worker. * It first calculates the maximum number of executors we can allocate to the application in this - * scheduling moment according to the free memory space on each worker, then tries to allocate executors - * on each worker according to the user-specified spark.executor.maxCoreNumPerExecutor. 
+ * scheduling moment according to the free memory space on each worker, then tries to allocate + * executors on each worker according to the user-specified spark.executor.maxCoreNumPerExecutor. * * It traverses the available worker list. In spreadOutApps mode, it allocates at most - * spark.executor.maxCoreNumPerExecutor cores (can be less than it when the worker does not have enough - * cores or the demand is less than it) and app.desc.memoryPerExecutorMB megabytes memory and tracks the - * resource allocation in a 2d array for each visit; Otherwise, it uses up all available resources of a worker - * for each visit. + * spark.executor.maxCoreNumPerExecutor cores (can be less than it when the worker does not have + * enough cores or the demand is less than it) and app.desc.memoryPerExecutorMB megabytes memory + * and tracks the resource allocation in a 2d array for each visit; Otherwise, it uses up all + * available resources of a worker for each visit. */ private def startMultiExecutorsPerWorker() { if (spreadOutApps) { From e5efabb580780c435a262b53d66dab376140251e Mon Sep 17 00:00:00 2001 From: CodingCat Date: Tue, 27 Jan 2015 09:05:46 -0500 Subject: [PATCH 04/27] more java docs and consolidate canUse function --- .../apache/spark/deploy/master/Master.scala | 25 +++++++++++++------ 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 14e73d7be9353..82d611f15ebf5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -529,12 +529,20 @@ private[master] class Master( * two executors on the same worker). */ private def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = { - worker.memoryFree >= app.desc.memoryPerExecutorMB && !worker.hasExecutor(app) && - worker.coresFree > 0 + val enoughResources = worker.memoryFree >= app.desc.memoryPerExecutorMB && worker.coresFree > 0 + val allowMultipleExecutors = app.desc.maxCorePerExecutor.isDefined || !worker.hasExecutor(app) + allowMultipleExecutors && enoughResources } - // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app - // in the queue, then the second app, etc. + /** + * This functions starts only one executor on each worker. + * + * It travers the available worker list. In spreadOutApps mode, it allocates at most + * 1 core and app.desc.memoryPerExecutorMB megabytes memory and tracks the resource allocation + * in a 1-d array for each visit; Otherwise, it allocates 1 core and app.desc.memoryPerExecutorMB + * megabytes to each executor but starts as many executors as possible (limited by the worker + * resources) for each visit. + */ private def startSingleExecutorPerWorker() { if (spreadOutApps) { // Try to spread out each app among all the nodes, until it has all its cores @@ -587,16 +595,17 @@ private[master] class Master( * It traverses the available worker list. In spreadOutApps mode, it allocates at most * spark.executor.maxCoreNumPerExecutor cores (can be less than it when the worker does not have * enough cores or the demand is less than it) and app.desc.memoryPerExecutorMB megabytes memory - * and tracks the resource allocation in a 2d array for each visit; Otherwise, it uses up all - * available resources of a worker for each visit. 
+ * and tracks the resource allocation in a 2d array for each visit; Otherwise, it allocates at + * most spark.executor.maxCoreNumPerExecutor cores and app.desc.memoryPerExecutorMB megabytes + * to each executor but starts as many executors as possible (limited by the worker resources) for + * each visit. */ private def startMultiExecutorsPerWorker() { if (spreadOutApps) { for (app <- waitingApps if app.coresLeft > 0) { val memoryPerExecutor = app.desc.memoryPerExecutorMB val usableWorkers = workers.filter(_.state == WorkerState.ALIVE). - filter(worker => worker.coresFree > 0 && worker.memoryFree >= memoryPerExecutor).toArray. - sortBy(_.memoryFree / memoryPerExecutor).reverse + filter(canUse(app, _)).toArray.sortBy(_.memoryFree / memoryPerExecutor).reverse val maxCoreNumPerExecutor = app.desc.maxCorePerExecutor.get // get the maximum number of executors we can assign var leftExecutorNumToAssign = usableWorkers.map(_.memoryFree / memoryPerExecutor).sum From ec7d421ad7a6f1ab4beaa9594b7a9e051f6e2216 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Tue, 27 Jan 2015 09:08:20 -0500 Subject: [PATCH 05/27] test commit --- core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 82d611f15ebf5..0630aeb191cef 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -677,7 +677,7 @@ private[master] class Master( } } - if (!conf.getBoolean("spark.executor.multiPerWorker", false)) { + if (!conf.getBoolean("spark.executor.multiPerWorker", true)) { startSingleExecutorPerWorker() } else { startMultiExecutorsPerWorker() From 5b814664fed62f32d920d030f0a78c14ac0fbb18 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Tue, 27 Jan 2015 10:32:52 -0500 Subject: [PATCH 06/27] remove outdated comments --- .../main/scala/org/apache/spark/deploy/master/Master.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 0630aeb191cef..1583544756363 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -524,9 +524,7 @@ private[master] class Master( } /** - * Can an app use the given worker? True if the worker has enough memory and we haven't already - * launched an executor for the app on it (right now the standalone backend doesn't like having - * two executors on the same worker). + * Can an app use the given worker? 
*/ private def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = { val enoughResources = worker.memoryFree >= app.desc.memoryPerExecutorMB && worker.coresFree > 0 From 19d3da7163784722685c603cb35caeabc1a95c32 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 22 Feb 2015 11:30:31 -0500 Subject: [PATCH 07/27] address the comments --- .../spark/deploy/ApplicationDescription.scala | 2 +- .../apache/spark/deploy/master/Master.scala | 4 ++- .../deploy/master/ui/ApplicationPage.scala | 4 +-- docs/configuration.md | 28 ++++++------------- 4 files changed, 15 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index fa5d443ba33cb..bc7252d355a0e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -44,7 +44,7 @@ private[spark] class ApplicationDescription( name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec) // only valid when spark.executor.multiPerWorker is set to true - var maxCorePerExecutor = maxCores + var maxCorePerExecutor = None override def toString: String = "ApplicationDescription(" + name + ")" } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 1583544756363..a39e52266e954 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -609,7 +609,9 @@ private[master] class Master( var leftExecutorNumToAssign = usableWorkers.map(_.memoryFree / memoryPerExecutor).sum var maxCoresLeft = app.coresLeft val numUsable = usableWorkers.length - // 2D array to track the number of cores of each executor assigned to each worker + // A 2D array that tracks the number of cores used by each executor launched on + // each worker. The first index refers to the usable worker, and the second index + // refers to the executor launched on that worker. val assigned = Array.fill[ListBuffer[Int]](numUsable)(new ListBuffer[Int]) val assignedSum = Array.fill[Int](numUsable)(0) var pos = 0 diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index 79dae4d26ac7f..273f077bd8f57 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -83,14 +83,14 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
  • Name: {app.desc.name}
  • User: {app.desc.user}
  • Cores:
-  {
+  {
     if (app.desc.maxCores.isEmpty) {
       "Unlimited (%s granted)".format(app.coresGranted)
     } else {
       "%s (%s granted, %s left)".format(
         app.desc.maxCores.get, app.coresGranted, app.coresLeft)
     }
-  }
+  }
  • Executor Memory: diff --git a/docs/configuration.md b/docs/configuration.md index 0206fae2edd27..9c7837b9683b6 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -103,6 +103,7 @@ of the most common options to set are: Number of cores to use for the driver process, only in cluster mode. + spark.driver.maxResultSize 1g @@ -127,6 +128,14 @@ of the most common options to set are: or in your default properties file. + + spark.executor.maxCoreNumPerExecutor + 1 + + set the max number of cores assigned to each executor; this property is only valid when + spark.executor.multiPerWorker is set to true. + + spark.executor.memory 512m @@ -1255,25 +1264,6 @@ Apart from these, the following properties are also available, and may be useful spark.ui.view.acls Empty - - - spark.executor.multiPerWorker - false - - enable user to run multiple executors in the same worker. - - - - spark.executor.maxCoreNumPerExecutor - 1 - - set the max number of cores assigned to each executor; this property is only valid when - spark.executor.multiPerWorker is set to true. - - - - spark.executor.extraClassPath - (none) Comma separated list of users that have view access to the Spark web ui. By default only the user that started the Spark job has view access. From 0b64fea7017e9ab845912ed2b0f4b52f102c297e Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 22 Feb 2015 11:52:10 -0500 Subject: [PATCH 08/27] fix compilation issue --- .../scala/org/apache/spark/deploy/ApplicationDescription.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index bc7252d355a0e..db744aac77e20 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -44,7 +44,7 @@ private[spark] class ApplicationDescription( name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec) // only valid when spark.executor.multiPerWorker is set to true - var maxCorePerExecutor = None + var maxCorePerExecutor: Option[Int] = None override def toString: String = "ApplicationDescription(" + name + ")" } From 35c462c2226a07b9573f76afeb474ca88eaf143b Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 22 Feb 2015 17:34:41 -0500 Subject: [PATCH 09/27] address Andrew's comments --- .../org/apache/spark/deploy/SparkSubmit.scala | 2 + .../spark/deploy/SparkSubmitArguments.scala | 11 +- .../apache/spark/deploy/master/Master.scala | 121 ++++++------------ .../cluster/SparkDeploySchedulerBackend.scala | 6 +- docs/configuration.md | 10 +- 5 files changed, 53 insertions(+), 97 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 60bc243ebf40a..28941385e52b2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -383,6 +383,8 @@ object SparkSubmit { OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"), OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, sysProp = "spark.driver.memory"), OptionAssigner(args.driverCores, STANDALONE, CLUSTER, sysProp = "spark.driver.cores"), + OptionAssigner(args.executorCores, STANDALONE, ALL_DEPLOY_MODES, + sysProp = "spark.executor.cores"), OptionAssigner(args.supervise.toString, STANDALONE, CLUSTER, sysProp = 
"spark.driver.supervise"), diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 03ecf3fd99ec5..06a286bc068d1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -481,11 +481,18 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | | Spark standalone and Mesos only: | --total-executor-cores NUM Total cores for all executors. - | + | + | Spark standalone and YARN only: + | --executor-cores NUM Number of cores per executor. Default value: 1 (YARN), 0 ( + | Standalone). In Standalone mode, Spark will try to run more + | than 1 executors on each worker in standalone mode; + | otherwise, only one executor on each executor is allowed + | (the executor will take all available cores of the worker at + | the moment. + | | YARN-only: | --driver-cores NUM Number of cores used by the driver, only in cluster mode | (Default: 1). - | --executor-cores NUM Number of cores per executor (Default: 1). | --queue QUEUE_NAME The YARN queue to submit to (Default: "default"). | --num-executors NUM Number of executors to launch (Default: 2). | --archives ARCHIVES Comma separated list of archives to be extracted into the diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index a39e52266e954..08c68aee9d858 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -528,102 +528,57 @@ private[master] class Master( */ private def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = { val enoughResources = worker.memoryFree >= app.desc.memoryPerExecutorMB && worker.coresFree > 0 - val allowMultipleExecutors = app.desc.maxCorePerExecutor.isDefined || !worker.hasExecutor(app) - allowMultipleExecutors && enoughResources + val allowToExecute = app.desc.maxCorePerExecutor.isDefined || !worker.hasExecutor(app) + allowToExecute && enoughResources } /** - * This functions starts only one executor on each worker. - * - * It travers the available worker list. In spreadOutApps mode, it allocates at most - * 1 core and app.desc.memoryPerExecutorMB megabytes memory and tracks the resource allocation - * in a 1-d array for each visit; Otherwise, it allocates 1 core and app.desc.memoryPerExecutorMB - * megabytes to each executor but starts as many executors as possible (limited by the worker - * resources) for each visit. 
- */ - private def startSingleExecutorPerWorker() { - if (spreadOutApps) { - // Try to spread out each app among all the nodes, until it has all its cores - for (app <- waitingApps if app.coresLeft > 0) { - val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) - .filter(canUse(app, _)).sortBy(_.coresFree).reverse - val numUsable = usableWorkers.length - val assigned = new Array[Int](numUsable) // Number of cores to give on each node - var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) - var pos = 0 - while (toAssign > 0) { - if (usableWorkers(pos).coresFree - assigned(pos) > 0) { - toAssign -= 1 - assigned(pos) += 1 - } - pos = (pos + 1) % numUsable - } - // Now that we've decided how many cores to give on each node, let's actually give them - for (pos <- 0 until numUsable) { - if (assigned(pos) > 0) { - val exec = app.addExecutor(usableWorkers(pos), assigned(pos)) - launchExecutor(usableWorkers(pos), exec) - app.state = ApplicationState.RUNNING - } - } - } - } else { - // Pack each app into as few nodes as possible until we've assigned all its cores - for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) { - for (app <- waitingApps if app.coresLeft > 0) { - if (canUse(app, worker)) { - val coresToUse = math.min(worker.coresFree, app.coresLeft) - if (coresToUse > 0) { - val exec = app.addExecutor(worker, coresToUse) - launchExecutor(worker, exec) - app.state = ApplicationState.RUNNING - } - } - } - } - } - } - - /** - * This functions starts multiple executors on each worker. - * It first calculates the maximum number of executors we can allocate to the application in this - * scheduling moment according to the free memory space on each worker, then tries to allocate - * executors on each worker according to the user-specified spark.executor.maxCoreNumPerExecutor. + * This functions starts one or more executors on each worker. * * It traverses the available worker list. In spreadOutApps mode, it allocates at most - * spark.executor.maxCoreNumPerExecutor cores (can be less than it when the worker does not have - * enough cores or the demand is less than it) and app.desc.memoryPerExecutorMB megabytes memory - * and tracks the resource allocation in a 2d array for each visit; Otherwise, it allocates at - * most spark.executor.maxCoreNumPerExecutor cores and app.desc.memoryPerExecutorMB megabytes - * to each executor but starts as many executors as possible (limited by the worker resources) for - * each visit. + * spark.executor.cores (multiple executors per worker) or 1 core(s) (one executor per worker) + * for each visit of the worker (can be less than it when the worker does not have enough cores + * or the demand is less than it) and app.desc.memoryPerExecutorMB megabytes memory and tracks + * the resource allocation in a 2d array for each visit; Otherwise, it allocates at most + * spark.executor.cores (multiple executors per worker) or worker.freeCores (one executor per + * worker) cores and app.desc.memoryPerExecutorMB megabytes to each executor. */ - private def startMultiExecutorsPerWorker() { + private def startExecutorsOnWorker() { if (spreadOutApps) { for (app <- waitingApps if app.coresLeft > 0) { val memoryPerExecutor = app.desc.memoryPerExecutorMB val usableWorkers = workers.filter(_.state == WorkerState.ALIVE). 
filter(canUse(app, _)).toArray.sortBy(_.memoryFree / memoryPerExecutor).reverse - val maxCoreNumPerExecutor = app.desc.maxCorePerExecutor.get - // get the maximum number of executors we can assign - var leftExecutorNumToAssign = usableWorkers.map(_.memoryFree / memoryPerExecutor).sum - var maxCoresLeft = app.coresLeft + // the maximum number of cores allocated on each executor per visit on the worker list + val maxCoreAllocationPerRound = app.desc.maxCorePerExecutor.getOrElse(1) + var maxCoresLeft = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) val numUsable = usableWorkers.length + val maxExecutorPerWorker = { + if (app.desc.maxCorePerExecutor.isDefined) { + usableWorkers(0).memoryFree / memoryPerExecutor + } else { + 1 + } + } // A 2D array that tracks the number of cores used by each executor launched on // each worker. The first index refers to the usable worker, and the second index // refers to the executor launched on that worker. - val assigned = Array.fill[ListBuffer[Int]](numUsable)(new ListBuffer[Int]) + val assigned = Array.fill[Array[Int]](numUsable)(Array.fill[Int](maxExecutorPerWorker)(0)) + val workerPointer = Array.fill[Int](numUsable)(0) val assignedSum = Array.fill[Int](numUsable)(0) var pos = 0 - while (leftExecutorNumToAssign > 0) { - if (usableWorkers(pos).coresFree - assignedSum(pos) > 0) { + while (maxCoresLeft > 0) { + if ((usableWorkers(pos).coresFree - assignedSum(pos) > 0) && + (usableWorkers(pos).memoryFree >= memoryPerExecutor * (workerPointer(pos) + 1))) { val coreToAssign = math.min(math.min(usableWorkers(pos).coresFree - assignedSum(pos), - maxCoreNumPerExecutor), maxCoresLeft) - if (usableWorkers(pos).memoryFree >= memoryPerExecutor * (assigned(pos).length + 1)) { - leftExecutorNumToAssign -= 1 - assigned(pos) += coreToAssign - assignedSum(pos) += coreToAssign - maxCoresLeft -= coreToAssign + maxCoreAllocationPerRound), maxCoresLeft) + val workerAllocationArray = assigned(pos) + workerAllocationArray(workerPointer(pos)) += coreToAssign + assignedSum(pos) += coreToAssign + maxCoresLeft -= coreToAssign + if (app.desc.maxCorePerExecutor.isDefined) { + // if starting multiple executors on the worker, we move to the next executor + workerPointer(pos) += 1 } } pos = (pos + 1) % numUsable @@ -632,7 +587,7 @@ private[master] class Master( // Now that we've decided how many executors and the core number for each to // give on each node, let's actually give them for (pos <- 0 until numUsable) { - for (execIdx <- 0 until assigned(pos).length) { + for (execIdx <- 0 until workerPointer(pos)) { val exec = app.addExecutor(usableWorkers(pos), assigned(pos)(execIdx)) launchExecutor(usableWorkers(pos), exec) app.state = ApplicationState.RUNNING @@ -644,7 +599,7 @@ private[master] class Master( for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) { for (app <- waitingApps if app.coresLeft > 0 && app.desc.memoryPerExecutorMB <= worker.memoryFree) { - val coreNumPerExecutor = app.desc.maxCorePerExecutor.get + val coreNumPerExecutor = app.desc.maxCorePerExecutor.getOrElse(worker.coresFree) var coresLeft = math.min(worker.coresFree, app.coresLeft) while (coresLeft > 0 && app.desc.memoryPerExecutorMB <= worker.memoryFree) { val coreToAssign = math.min(coreNumPerExecutor, coresLeft) @@ -677,11 +632,7 @@ private[master] class Master( } } - if (!conf.getBoolean("spark.executor.multiPerWorker", true)) { - startSingleExecutorPerWorker() - } else { - startMultiExecutorsPerWorker() - } + startExecutorsOnWorker() } def launchExecutor(worker: 
WorkerInfo, exec: ExecutorDesc) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 2f66a57f1ad93..3f5e93cf0efc9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -84,13 +84,9 @@ private[spark] class SparkDeploySchedulerBackend( val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, appUIAddress, sc.eventLogDir, sc.eventLogCodec) - - if (conf.getBoolean("spark.executor.multiPerWorker", true)) { - appDesc.maxCorePerExecutor = Some(conf.getInt("spark.executor.maxCoreNumPerExecutor", 1)) - } + appDesc.maxCorePerExecutor = conf.getOption("spark.executor.cores").map(_.toInt) client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() - waitForRegistration() } diff --git a/docs/configuration.md b/docs/configuration.md index 9c7837b9683b6..88ffe8f7ab83f 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -103,7 +103,6 @@ of the most common options to set are: Number of cores to use for the driver process, only in cluster mode. - spark.driver.maxResultSize 1g @@ -129,11 +128,12 @@ of the most common options to set are: - spark.executor.maxCoreNumPerExecutor - 1 + spark.executor.cores + None (Standalone), 1 (YARN) - set the max number of cores assigned to each executor; this property is only valid when - spark.executor.multiPerWorker is set to true. + Number of cores per executor. In Standalone mode, Spark will try to run more + than 1 executors on each worker in standalone mode; otherwise, only one executor on each executor + is allowed (the executor will take all available cores of the worker at the moment. From 387f4ecfcda4671b4ac2461fcf6d543c697074c9 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 22 Feb 2015 20:12:47 -0500 Subject: [PATCH 10/27] bug fix --- .../apache/spark/deploy/master/Master.scala | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 08c68aee9d858..70f87d0d463ff 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -564,21 +564,28 @@ private[master] class Master( // each worker. The first index refers to the usable worker, and the second index // refers to the executor launched on that worker. 
val assigned = Array.fill[Array[Int]](numUsable)(Array.fill[Int](maxExecutorPerWorker)(0)) - val workerPointer = Array.fill[Int](numUsable)(0) + val executorNumberOnWorker = Array.fill[Int](numUsable)(0) val assignedSum = Array.fill[Int](numUsable)(0) var pos = 0 while (maxCoresLeft > 0) { - if ((usableWorkers(pos).coresFree - assignedSum(pos) > 0) && - (usableWorkers(pos).memoryFree >= memoryPerExecutor * (workerPointer(pos) + 1))) { + val memoryDemand = { + if (app.desc.maxCorePerExecutor.isDefined) { + executorNumberOnWorker(pos) + 1 + } else { + memoryPerExecutor + } + } + if ((usableWorkers(pos).coresFree - assignedSum(pos) > 0) && + (usableWorkers(pos).memoryFree >= memoryDemand)) { val coreToAssign = math.min(math.min(usableWorkers(pos).coresFree - assignedSum(pos), maxCoreAllocationPerRound), maxCoresLeft) val workerAllocationArray = assigned(pos) - workerAllocationArray(workerPointer(pos)) += coreToAssign + workerAllocationArray(executorNumberOnWorker(pos)) += coreToAssign assignedSum(pos) += coreToAssign maxCoresLeft -= coreToAssign - if (app.desc.maxCorePerExecutor.isDefined) { + if (app.desc.maxCorePerExecutor.isDefined || executorNumberOnWorker(pos) == 0) { // if starting multiple executors on the worker, we move to the next executor - workerPointer(pos) += 1 + executorNumberOnWorker(pos) += 1 } } pos = (pos + 1) % numUsable @@ -587,7 +594,7 @@ private[master] class Master( // Now that we've decided how many executors and the core number for each to // give on each node, let's actually give them for (pos <- 0 until numUsable) { - for (execIdx <- 0 until workerPointer(pos)) { + for (execIdx <- 0 until executorNumberOnWorker(pos)) { val exec = app.addExecutor(usableWorkers(pos), assigned(pos)(execIdx)) launchExecutor(usableWorkers(pos), exec) app.state = ApplicationState.RUNNING From f64a28d34df2ae81436896ed2dd739de533a7347 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 22 Feb 2015 20:19:25 -0500 Subject: [PATCH 11/27] typo fix --- core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 70f87d0d463ff..aadf6f5bd17e5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -570,7 +570,7 @@ private[master] class Master( while (maxCoresLeft > 0) { val memoryDemand = { if (app.desc.maxCorePerExecutor.isDefined) { - executorNumberOnWorker(pos) + 1 + (executorNumberOnWorker(pos) + 1) * memoryPerExecutor } else { memoryPerExecutor } From 878402cf7416d47575dd75b31b07b7614e58a0ab Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 22 Feb 2015 22:05:34 -0500 Subject: [PATCH 12/27] change the launching executor code --- .../org/apache/spark/deploy/master/Master.scala | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index aadf6f5bd17e5..64008ec6de873 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -579,11 +579,10 @@ private[master] class Master( (usableWorkers(pos).memoryFree >= memoryDemand)) { val coreToAssign = math.min(math.min(usableWorkers(pos).coresFree - assignedSum(pos), maxCoreAllocationPerRound), maxCoresLeft) - val 
workerAllocationArray = assigned(pos) - workerAllocationArray(executorNumberOnWorker(pos)) += coreToAssign + assigned(pos)(executorNumberOnWorker(pos)) += coreToAssign assignedSum(pos) += coreToAssign maxCoresLeft -= coreToAssign - if (app.desc.maxCorePerExecutor.isDefined || executorNumberOnWorker(pos) == 0) { + if (app.desc.maxCorePerExecutor.isDefined) { // if starting multiple executors on the worker, we move to the next executor executorNumberOnWorker(pos) += 1 } @@ -593,12 +592,11 @@ private[master] class Master( // Now that we've decided how many executors and the core number for each to // give on each node, let's actually give them - for (pos <- 0 until numUsable) { - for (execIdx <- 0 until executorNumberOnWorker(pos)) { - val exec = app.addExecutor(usableWorkers(pos), assigned(pos)(execIdx)) - launchExecutor(usableWorkers(pos), exec) - app.state = ApplicationState.RUNNING - } + for (pos <- 0 until numUsable; + execIdx <- 0 until math.max(executorNumberOnWorker(pos), 1)) { + val exec = app.addExecutor(usableWorkers(pos), assigned(pos)(execIdx)) + launchExecutor(usableWorkers(pos), exec) + app.state = ApplicationState.RUNNING } } } else { From 497ec2c525497f821167a9bd29dcb42e28d4b2eb Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 26 Mar 2015 21:34:04 -0400 Subject: [PATCH 13/27] address andrew's comments --- .../apache/spark/deploy/ApplicationDescription.scala | 6 ++---- .../scala/org/apache/spark/deploy/SparkSubmit.scala | 4 ++-- .../org/apache/spark/deploy/SparkSubmitArguments.scala | 10 ++++------ .../scala/org/apache/spark/deploy/master/Master.scala | 2 +- .../cluster/SparkDeploySchedulerBackend.scala | 4 ++-- 5 files changed, 11 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index db744aac77e20..df1dd60f6a3eb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -27,7 +27,8 @@ private[spark] class ApplicationDescription( var appUiUrl: String, val eventLogDir: Option[URI] = None, // short name of compression codec used when writing event logs, if any (e.g. 
lzf) - val eventLogCodec: Option[String] = None) + val eventLogCodec: Option[String] = None, + val maxCorePerExecutor: Option[Int] = None) extends Serializable { val user = System.getProperty("user.name", "") @@ -43,8 +44,5 @@ private[spark] class ApplicationDescription( new ApplicationDescription( name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec) - // only valid when spark.executor.multiPerWorker is set to true - var maxCorePerExecutor: Option[Int] = None - override def toString: String = "ApplicationDescription(" + name + ")" } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 28941385e52b2..d9822e093a3b0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -383,8 +383,6 @@ object SparkSubmit { OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"), OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, sysProp = "spark.driver.memory"), OptionAssigner(args.driverCores, STANDALONE, CLUSTER, sysProp = "spark.driver.cores"), - OptionAssigner(args.executorCores, STANDALONE, ALL_DEPLOY_MODES, - sysProp = "spark.executor.cores"), OptionAssigner(args.supervise.toString, STANDALONE, CLUSTER, sysProp = "spark.driver.supervise"), @@ -408,6 +406,8 @@ object SparkSubmit { OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"), // Other options + OptionAssigner(args.executorCores, STANDALONE, ALL_DEPLOY_MODES, + sysProp = "spark.deploy.maxCoresPerExecutor"), OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES, sysProp = "spark.executor.memory"), OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES, diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 06a286bc068d1..2aa3febf56886 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -483,12 +483,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | --total-executor-cores NUM Total cores for all executors. | | Spark standalone and YARN only: - | --executor-cores NUM Number of cores per executor. Default value: 1 (YARN), 0 ( - | Standalone). In Standalone mode, Spark will try to run more - | than 1 executors on each worker in standalone mode; - | otherwise, only one executor on each executor is allowed - | (the executor will take all available cores of the worker at - | the moment. + | --executor-cores NUM Number of cores to use on each executor. In standalone mode, + | the executor will use all available cores on the worker if + | this is not specified. (Default: 1 in YARN mode, all + | available cores in standalone mode). | | YARN-only: | --driver-cores NUM Number of cores used by the driver, only in cluster mode diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 64008ec6de873..09c32f14ab581 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -623,7 +623,7 @@ private[master] class Master( * Schedule the currently available resources among waiting apps. 
This method will be called * every time a new app joins or resource availability changes. */ - def schedule() { + private def schedule(): Unit = { if (state != RecoveryState.ALIVE) { return } // First schedule drivers, they take strict precedence over applications diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 3f5e93cf0efc9..1999ef7bd42f3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -83,8 +83,8 @@ private[spark] class SparkDeploySchedulerBackend( args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts) val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, - appUIAddress, sc.eventLogDir, sc.eventLogCodec) - appDesc.maxCorePerExecutor = conf.getOption("spark.executor.cores").map(_.toInt) + appUIAddress, sc.eventLogDir, sc.eventLogCodec, + conf.getOption("spark.deploy.maxCoresPerExecutor").map(_.toInt)) client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() waitForRegistration() From 2c2bcc5c7a98f366e0f85da0959ba7688ba96a7c Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 26 Mar 2015 22:01:05 -0400 Subject: [PATCH 14/27] fix wrong usage info --- .../apache/spark/deploy/SparkSubmitArguments.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 2aa3febf56886..32d5d883cf46a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -481,13 +481,14 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | | Spark standalone and Mesos only: | --total-executor-cores NUM Total cores for all executors. - | + | | Spark standalone and YARN only: - | --executor-cores NUM Number of cores to use on each executor. In standalone mode, - | the executor will use all available cores on the worker if - | this is not specified. (Default: 1 in YARN mode, all - | available cores in standalone mode). - | + | --executor-cores NUM Number of cores to use on each executor. Default: + | 1 in YARN mode; in standalone mode, all available cores + | (non-spreadApp mode) or ranging from 1 to + | total-executor-cores / availableNumOfWorkers at the moment + | of scheduling. + | | YARN-only: | --driver-cores NUM Number of cores used by the driver, only in cluster mode | (Default: 1). 
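As a minimal illustration of the option described above — not part of the patch series — a standalone-mode driver could request the per-executor core cap roughly like this. The master URL and application name are placeholders, and the property name follows the revision at this point in the series (spark.deploy.maxCoresPerExecutor, which spark-submit sets from --executor-cores):

import org.apache.spark.{SparkConf, SparkContext}

object PerExecutorCoresExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("spark://master-host:7077")       // placeholder standalone master URL
      .setAppName("per-executor-cores-example")    // placeholder application name
      .set("spark.executor.memory", "1g")          // memoryPerExecutorMB for each executor
      .set("spark.cores.max", "8")                 // maxCores: total cores for the whole app
      // Per-executor cap read by SparkDeploySchedulerBackend in this revision;
      // spark-submit --executor-cores 2 sets the same property from the command line.
      .set("spark.deploy.maxCoresPerExecutor", "2")
    val sc = new SparkContext(conf)
    println(sc.parallelize(1 to 1000).map(_ * 2).sum())  // trivial job to occupy the executors
    sc.stop()
  }
}

On an 8-core worker with enough free memory, the master can then launch up to four 2-core executors for this application instead of a single executor taking all eight cores.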
From ff011e28b6a7ac58b8c566ea70f02a3dad0950e6 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 5 Apr 2015 08:31:02 -0400 Subject: [PATCH 15/27] start multiple executors on the worker by rewriting startExeuctor logic --- .../apache/spark/deploy/master/Master.scala | 112 +++++++----------- 1 file changed, 45 insertions(+), 67 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 09c32f14ab581..e67619bce2ad2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -543,90 +543,58 @@ private[master] class Master( * spark.executor.cores (multiple executors per worker) or worker.freeCores (one executor per * worker) cores and app.desc.memoryPerExecutorMB megabytes to each executor. */ - private def startExecutorsOnWorker() { + private def startExecutorsOnWorkers() { + // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app + // in the queue, then the second app, etc. if (spreadOutApps) { + // Try to spread out each app among all the nodes, until it has all its cores for (app <- waitingApps if app.coresLeft > 0) { - val memoryPerExecutor = app.desc.memoryPerExecutorMB - val usableWorkers = workers.filter(_.state == WorkerState.ALIVE). - filter(canUse(app, _)).toArray.sortBy(_.memoryFree / memoryPerExecutor).reverse - // the maximum number of cores allocated on each executor per visit on the worker list - val maxCoreAllocationPerRound = app.desc.maxCorePerExecutor.getOrElse(1) - var maxCoresLeft = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) + val maxCoresPerExecutor = app.desc.maxCorePerExecutor.getOrElse(Int.MaxValue) + val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) + .filter(canUse(app, _)).sortBy(_.coresFree).reverse val numUsable = usableWorkers.length - val maxExecutorPerWorker = { - if (app.desc.maxCorePerExecutor.isDefined) { - usableWorkers(0).memoryFree / memoryPerExecutor - } else { - 1 - } - } - // A 2D array that tracks the number of cores used by each executor launched on - // each worker. The first index refers to the usable worker, and the second index - // refers to the executor launched on that worker. 
- val assigned = Array.fill[Array[Int]](numUsable)(Array.fill[Int](maxExecutorPerWorker)(0)) - val executorNumberOnWorker = Array.fill[Int](numUsable)(0) - val assignedSum = Array.fill[Int](numUsable)(0) + val assigned = new Array[Int](numUsable) // Number of cores to give on each node + var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) var pos = 0 - while (maxCoresLeft > 0) { - val memoryDemand = { - if (app.desc.maxCorePerExecutor.isDefined) { - (executorNumberOnWorker(pos) + 1) * memoryPerExecutor - } else { - memoryPerExecutor - } - } - if ((usableWorkers(pos).coresFree - assignedSum(pos) > 0) && - (usableWorkers(pos).memoryFree >= memoryDemand)) { - val coreToAssign = math.min(math.min(usableWorkers(pos).coresFree - assignedSum(pos), - maxCoreAllocationPerRound), maxCoresLeft) - assigned(pos)(executorNumberOnWorker(pos)) += coreToAssign - assignedSum(pos) += coreToAssign - maxCoresLeft -= coreToAssign - if (app.desc.maxCorePerExecutor.isDefined) { - // if starting multiple executors on the worker, we move to the next executor - executorNumberOnWorker(pos) += 1 - } + while (toAssign > 0) { + if (usableWorkers(pos).coresFree - assigned(pos) > 0) { + toAssign -= 1 + assigned(pos) += 1 } pos = (pos + 1) % numUsable } - - // Now that we've decided how many executors and the core number for each to - // give on each node, let's actually give them - for (pos <- 0 until numUsable; - execIdx <- 0 until math.max(executorNumberOnWorker(pos), 1)) { - val exec = app.addExecutor(usableWorkers(pos), assigned(pos)(execIdx)) - launchExecutor(usableWorkers(pos), exec) - app.state = ApplicationState.RUNNING + // Now that we've decided how many cores to give on each node, let's actually give them + for (pos <- 0 until numUsable) { + while (assigned(pos) > 0) { + val coresForThisExecutor = math.min(maxCoresPerExecutor, assigned(pos)) + val exec = app.addExecutor(usableWorkers(pos), coresForThisExecutor) + assigned(pos) -= coresForThisExecutor + launchExecutor(usableWorkers(pos), exec) + app.state = ApplicationState.RUNNING + } } } } else { // Pack each app into as few nodes as possible until we've assigned all its cores for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) { - for (app <- waitingApps if app.coresLeft > 0 && - app.desc.memoryPerExecutorMB <= worker.memoryFree) { - val coreNumPerExecutor = app.desc.maxCorePerExecutor.getOrElse(worker.coresFree) - var coresLeft = math.min(worker.coresFree, app.coresLeft) - while (coresLeft > 0 && app.desc.memoryPerExecutorMB <= worker.memoryFree) { - val coreToAssign = math.min(coreNumPerExecutor, coresLeft) - val exec = app.addExecutor(worker, coreToAssign) - launchExecutor(worker, exec) - coresLeft -= coreToAssign - app.state = ApplicationState.RUNNING + for (app <- waitingApps if app.coresLeft > 0) { + val maxCoresPerExecutor = app.desc.maxCorePerExecutor.getOrElse(Int.MaxValue) + if (canUse(app, worker)) { + var coresToAssign = math.min(worker.coresFree, app.coresLeft) + while (coresToAssign > 0) { + val coresForThisExecutor = math.min(maxCoresPerExecutor, coresToAssign) + val exec = app.addExecutor(worker, coresForThisExecutor) + coresToAssign -= coresForThisExecutor + launchExecutor(worker, exec) + app.state = ApplicationState.RUNNING + } } } } } } - - /** - * Schedule the currently available resources among waiting apps. This method will be called - * every time a new app joins or resource availability changes. 
- */ - private def schedule(): Unit = { - if (state != RecoveryState.ALIVE) { return } - - // First schedule drivers, they take strict precedence over applications + private def startDriversOnWorkers(): Unit = { val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { for (driver <- waitingDrivers) { @@ -636,8 +604,18 @@ private[master] class Master( } } } + } - startExecutorsOnWorker() + /** + * Schedule the currently available resources among waiting apps. This method will be called + * every time a new app joins or resource availability changes. + */ + private def schedule(): Unit = { + if (state != RecoveryState.ALIVE) { return } + //start in-cluster drivers, they take strict precedence over applications + startDriversOnWorkers() + //start executors + startExecutorsOnWorkers() } def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) { From 4cf61f1a00ecd1dc5864f3108d5b3c9f39b05ae8 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 5 Apr 2015 09:43:35 -0400 Subject: [PATCH 16/27] improve the code and docs --- .../apache/spark/deploy/master/Master.scala | 62 ++++++++++--------- docs/configuration.md | 18 +++--- 2 files changed, 42 insertions(+), 38 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index e67619bce2ad2..bdd379499ea4f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -533,21 +533,17 @@ private[master] class Master( } /** - * This functions starts one or more executors on each worker. - * - * It traverses the available worker list. In spreadOutApps mode, it allocates at most - * spark.executor.cores (multiple executors per worker) or 1 core(s) (one executor per worker) - * for each visit of the worker (can be less than it when the worker does not have enough cores - * or the demand is less than it) and app.desc.memoryPerExecutorMB megabytes memory and tracks - * the resource allocation in a 2d array for each visit; Otherwise, it allocates at most - * spark.executor.cores (multiple executors per worker) or worker.freeCores (one executor per - * worker) cores and app.desc.memoryPerExecutorMB megabytes to each executor. + * The resource allocator spread out each app among all the workers until it has all its cores in + * spreadOut mode otherwise packs each app into as few workers as possible until it has assigned + * all its cores. User can define spark.deploy.maxCoresPerExecutor per application to + * limit the maximum number of cores to allocate to each executor on each worker; if the parameter + * is not defined, then only one executor will be launched on a worker. */ private def startExecutorsOnWorkers() { // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app // in the queue, then the second app, etc. 
if (spreadOutApps) { - // Try to spread out each app among all the nodes, until it has all its cores + // Try to spread out each app among all the workers, until it has all its cores for (app <- waitingApps if app.coresLeft > 0) { val maxCoresPerExecutor = app.desc.maxCorePerExecutor.getOrElse(Int.MaxValue) val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) @@ -565,35 +561,43 @@ private[master] class Master( } // Now that we've decided how many cores to give on each node, let's actually give them for (pos <- 0 until numUsable) { - while (assigned(pos) > 0) { - val coresForThisExecutor = math.min(maxCoresPerExecutor, assigned(pos)) - val exec = app.addExecutor(usableWorkers(pos), coresForThisExecutor) - assigned(pos) -= coresForThisExecutor - launchExecutor(usableWorkers(pos), exec) - app.state = ApplicationState.RUNNING - } + allocateWorkerResourceToExecutors(app, assigned(pos), usableWorkers(pos)) } } } else { - // Pack each app into as few nodes as possible until we've assigned all its cores + // Pack each app into as few workers as possible until we've assigned all its cores for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) { for (app <- waitingApps if app.coresLeft > 0) { - val maxCoresPerExecutor = app.desc.maxCorePerExecutor.getOrElse(Int.MaxValue) - if (canUse(app, worker)) { - var coresToAssign = math.min(worker.coresFree, app.coresLeft) - while (coresToAssign > 0) { - val coresForThisExecutor = math.min(maxCoresPerExecutor, coresToAssign) - val exec = app.addExecutor(worker, coresForThisExecutor) - coresToAssign -= coresForThisExecutor - launchExecutor(worker, exec) - app.state = ApplicationState.RUNNING - } - } + allocateWorkerResourceToExecutors(app, app.coresLeft, worker) } } } } + /** + * allocate resources in a certain worker to one or more executors + * @param app the info of the application which the executors belong to + * @param coresDemand the total number of cores to be allocated to this application + * @param worker the worker info + */ + private def allocateWorkerResourceToExecutors( + app: ApplicationInfo, + coresDemand: Int, + worker: WorkerInfo): Unit = { + if (canUse(app, worker)) { + val memoryPerExecutor = app.desc.memoryPerExecutorMB + val maxCoresPerExecutor = app.desc.maxCorePerExecutor.getOrElse(Int.MaxValue) + var coresToAssign = coresDemand + while (coresToAssign > 0 && worker.memoryFree >= memoryPerExecutor) { + val coresForThisExecutor = math.min(maxCoresPerExecutor, coresToAssign) + val exec = app.addExecutor(worker, coresForThisExecutor) + coresToAssign -= coresForThisExecutor + launchExecutor(worker, exec) + app.state = ApplicationState.RUNNING + } + } + } + private def startDriversOnWorkers(): Unit = { val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { diff --git a/docs/configuration.md b/docs/configuration.md index 88ffe8f7ab83f..1158c80f90521 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -127,15 +127,6 @@ of the most common options to set are: or in your default properties file. - - spark.executor.cores - None (Standalone), 1 (YARN) - - Number of cores per executor. In Standalone mode, Spark will try to run more - than 1 executors on each worker in standalone mode; otherwise, only one executor on each executor - is allowed (the executor will take all available cores of the worker at the moment. 
- - spark.executor.memory 512m @@ -722,6 +713,15 @@ Apart from these, the following properties are also available, and may be useful this duration will be cleared as well. + + spark.deploy.maxCoresPerExecutor + (infinite) + + The maximum number of cores given to the executor. When this parameter is set, Spark will try to + run more than 1 executors on each worker in standalone mode; otherwise, only one executor is + launched on each worker. + + spark.default.parallelism From 63b3df997bdde0b991e39686ac9d17915af54eb0 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 5 Apr 2015 09:56:13 -0400 Subject: [PATCH 17/27] change the description of the parameter in the submit script --- .../org/apache/spark/deploy/SparkSubmitArguments.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 32d5d883cf46a..082bd9ca4db48 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -484,10 +484,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | | Spark standalone and YARN only: | --executor-cores NUM Number of cores to use on each executor. Default: - | 1 in YARN mode; in standalone mode, all available cores - | (non-spreadApp mode) or ranging from 1 to - | total-executor-cores / availableNumOfWorkers at the moment - | of scheduling. + | 1 in YARN mode; in standalone mode, all cores in a worker + | allocated to an application will be assigned to a single + | executor. | | YARN-only: | --driver-cores NUM Number of cores used by the driver, only in cluster mode From f595bd6094b37009be94802ad708cb793144de54 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 5 Apr 2015 09:58:33 -0400 Subject: [PATCH 18/27] recover some unintentional changes --- .../scala/org/apache/spark/deploy/master/ApplicationInfo.scala | 3 ++- .../src/main/scala/org/apache/spark/deploy/master/Master.scala | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index 3439eae086b34..26e5534c9830e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -75,7 +75,8 @@ private[deploy] class ApplicationInfo( } } - def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): ExecutorDesc = { + private[master] def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): + ExecutorDesc = { val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerExecutorMB) executors(exec.id) = exec coresGranted += cores diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index bdd379499ea4f..e30e4bb31f841 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -22,7 +22,7 @@ import java.net.URLEncoder import java.text.SimpleDateFormat import java.util.Date -import scala.collection.mutable.{ListBuffer, ArrayBuffer, HashMap, HashSet} +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.concurrent.Await import 
scala.concurrent.duration._ import scala.language.postfixOps From d9c16855909c3b7ad828329d56ede661b90c9043 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 5 Apr 2015 10:00:16 -0400 Subject: [PATCH 19/27] remove unused var --- core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index e30e4bb31f841..89e227898a3c8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -545,7 +545,6 @@ private[master] class Master( if (spreadOutApps) { // Try to spread out each app among all the workers, until it has all its cores for (app <- waitingApps if app.coresLeft > 0) { - val maxCoresPerExecutor = app.desc.maxCorePerExecutor.getOrElse(Int.MaxValue) val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) .filter(canUse(app, _)).sortBy(_.coresFree).reverse val numUsable = usableWorkers.length From f0354238cd656eb530f3b18cba3722d2f7e6b781 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 5 Apr 2015 10:01:37 -0400 Subject: [PATCH 20/27] stylistic fix --- .../main/scala/org/apache/spark/deploy/master/Master.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 89e227898a3c8..b7a28c9decf98 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -615,9 +615,9 @@ private[master] class Master( */ private def schedule(): Unit = { if (state != RecoveryState.ALIVE) { return } - //start in-cluster drivers, they take strict precedence over applications + // start in-cluster drivers, they take strict precedence over applications startDriversOnWorkers() - //start executors + // start executors startExecutorsOnWorkers() } From 12a1b320ea337cb5d93e54fc0368051b22be1333 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 9 Apr 2015 17:28:44 -0400 Subject: [PATCH 21/27] change the semantic of coresPerExecutor to exact core number --- .../spark/deploy/ApplicationDescription.scala | 6 ++-- .../org/apache/spark/deploy/SparkSubmit.scala | 2 +- .../spark/deploy/SparkSubmitArguments.scala | 6 ++-- .../apache/spark/deploy/master/Master.scala | 31 +++++++++---------- .../cluster/SparkDeploySchedulerBackend.scala | 6 ++-- docs/configuration.md | 12 ++++--- 6 files changed, 32 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index df1dd60f6a3eb..ae99432f5ce86 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -28,7 +28,7 @@ private[spark] class ApplicationDescription( val eventLogDir: Option[URI] = None, // short name of compression codec used when writing event logs, if any (e.g. 
lzf) val eventLogCodec: Option[String] = None, - val maxCorePerExecutor: Option[Int] = None) + val coresPerExecutor: Option[Int] = None) extends Serializable { val user = System.getProperty("user.name", "") @@ -36,13 +36,13 @@ private[spark] class ApplicationDescription( def copy( name: String = name, maxCores: Option[Int] = maxCores, - memoryPerSlave: Int = memoryPerExecutorMB, + memoryPerExecutorMB: Int = memoryPerExecutorMB, command: Command = command, appUiUrl: String = appUiUrl, eventLogDir: Option[URI] = eventLogDir, eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription = new ApplicationDescription( - name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec) + name, maxCores, memoryPerExecutorMB, command, appUiUrl, eventLogDir, eventLogCodec) override def toString: String = "ApplicationDescription(" + name + ")" } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index d9822e093a3b0..296a0764b8baf 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -407,7 +407,7 @@ object SparkSubmit { // Other options OptionAssigner(args.executorCores, STANDALONE, ALL_DEPLOY_MODES, - sysProp = "spark.deploy.maxCoresPerExecutor"), + sysProp = "spark.executor.cores"), OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES, sysProp = "spark.executor.memory"), OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES, diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 082bd9ca4db48..faa8780288ea3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -483,10 +483,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | --total-executor-cores NUM Total cores for all executors. | | Spark standalone and YARN only: - | --executor-cores NUM Number of cores to use on each executor. Default: - | 1 in YARN mode; in standalone mode, all cores in a worker - | allocated to an application will be assigned to a single - | executor. + | --executor-cores NUM Number of cores per executor. 
(Default: 1 in YARN mode, + | or all available cores on the worker in standalone mode) | | YARN-only: | --driver-cores NUM Number of cores used by the driver, only in cluster mode diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index b7a28c9decf98..f60756d65820a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -528,7 +528,7 @@ private[master] class Master( */ private def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = { val enoughResources = worker.memoryFree >= app.desc.memoryPerExecutorMB && worker.coresFree > 0 - val allowToExecute = app.desc.maxCorePerExecutor.isDefined || !worker.hasExecutor(app) + val allowToExecute = app.desc.coresPerExecutor.isDefined || !worker.hasExecutor(app) allowToExecute && enoughResources } @@ -539,7 +539,7 @@ private[master] class Master( * limit the maximum number of cores to allocate to each executor on each worker; if the parameter * is not defined, then only one executor will be launched on a worker. */ - private def startExecutorsOnWorkers() { + private def startExecutorsOnWorkers(): Unit = { // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app // in the queue, then the second app, etc. if (spreadOutApps) { @@ -576,25 +576,24 @@ private[master] class Master( /** * allocate resources in a certain worker to one or more executors * @param app the info of the application which the executors belong to - * @param coresDemand the total number of cores to be allocated to this application + * @param coresToAllocate cores on this worker to be allocated to this application * @param worker the worker info */ private def allocateWorkerResourceToExecutors( app: ApplicationInfo, - coresDemand: Int, + coresToAllocate: Int, worker: WorkerInfo): Unit = { - if (canUse(app, worker)) { - val memoryPerExecutor = app.desc.memoryPerExecutorMB - val maxCoresPerExecutor = app.desc.maxCorePerExecutor.getOrElse(Int.MaxValue) - var coresToAssign = coresDemand - while (coresToAssign > 0 && worker.memoryFree >= memoryPerExecutor) { - val coresForThisExecutor = math.min(maxCoresPerExecutor, coresToAssign) - val exec = app.addExecutor(worker, coresForThisExecutor) - coresToAssign -= coresForThisExecutor - launchExecutor(worker, exec) - app.state = ApplicationState.RUNNING - } + if (canUse(app, worker)) { + val memoryPerExecutor = app.desc.memoryPerExecutorMB + val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate) + var coresLeft = coresToAllocate + while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) { + val exec = app.addExecutor(worker, coresPerExecutor) + coresLeft -= coresPerExecutor + launchExecutor(worker, exec) + app.state = ApplicationState.RUNNING } + } } private def startDriversOnWorkers(): Unit = { @@ -621,7 +620,7 @@ private[master] class Master( startExecutorsOnWorkers() } - def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) { + def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = { logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) worker.addExecutor(exec) worker.actor ! 
LaunchExecutor(masterUrl, diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 1999ef7bd42f3..adfed6a6f4f8d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -82,9 +82,9 @@ private[spark] class SparkDeploySchedulerBackend( val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts) val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") - val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, - appUIAddress, sc.eventLogDir, sc.eventLogCodec, - conf.getOption("spark.deploy.maxCoresPerExecutor").map(_.toInt)) + val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, + command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, + conf.getOption("spark.executor.cores").map(_.toInt)) client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() waitForRegistration() diff --git a/docs/configuration.md b/docs/configuration.md index 1158c80f90521..4b6f250b15e90 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -714,12 +714,16 @@ Apart from these, the following properties are also available, and may be useful - spark.deploy.maxCoresPerExecutor + spark.executor.cores (infinite) - The maximum number of cores given to the executor. When this parameter is set, Spark will try to - run more than 1 executors on each worker in standalone mode; otherwise, only one executor is - launched on each worker. + Default: 1 in YARN mode, all the available cores on the worker in standalone mode. + + The number of cores to use on each executor. For YARN and standalone mode only. + + In standalone mode, setting this parameter allows an application to run multiple executors on + the same worker, provided that there are enough cores on that worker. Otherwise, only one + executor per application will run on each worker. 
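With the semantics settled in the patch above, spark.executor.cores is an exact per-executor core count rather than an upper bound, and allocateWorkerResourceToExecutors keeps carving fixed-size executors out of a worker's granted cores while free memory lasts. The sketch below models that carving outside the Master for illustration; PerWorkerAllocationSketch and executorsOnWorker are made-up names, and free memory is simply decremented per launched executor to stand in for the reservation done through launchExecutor and worker.addExecutor.

// Rough model (hypothetical names) of how one worker's granted cores are split into executors.
import scala.collection.mutable.ArrayBuffer

object PerWorkerAllocationSketch {
  /** Core counts of the executors that would be launched on a single worker. */
  def executorsOnWorker(
      coresGranted: Int,
      coresPerExecutorConf: Option[Int],   // models spark.executor.cores
      memoryFreeMB: Int,
      memoryPerExecutorMB: Int): List[Int] = {
    // Without spark.executor.cores, one executor takes every granted core.
    val coresPerExecutor = coresPerExecutorConf.getOrElse(coresGranted)
    var coresLeft = coresGranted
    var memoryLeft = memoryFreeMB
    val launched = ArrayBuffer.empty[Int]
    while (coresLeft >= coresPerExecutor && memoryLeft >= memoryPerExecutorMB) {
      launched += coresPerExecutor
      coresLeft -= coresPerExecutor
      memoryLeft -= memoryPerExecutorMB
    }
    launched.toList
  }

  def main(args: Array[String]): Unit = {
    // 8 cores granted, spark.executor.cores=2, 4 GB free, 1 GB per executor: four 2-core executors.
    println(executorsOnWorker(8, Some(2), 4096, 1024))   // List(2, 2, 2, 2)
    // Same worker without spark.executor.cores: a single 8-core executor.
    println(executorsOnWorker(8, None, 4096, 1024))      // List(8)
  }
}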
From 2eeff77c066bc14bd13dbf14f75c1b62fe55db07 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 9 Apr 2015 20:56:07 -0400 Subject: [PATCH 22/27] stylistic fixes --- .../spark/deploy/master/ApplicationInfo.scala | 6 +- .../apache/spark/deploy/master/Master.scala | 62 +++++++++---------- .../cluster/SparkDeploySchedulerBackend.scala | 4 +- docs/configuration.md | 4 +- 4 files changed, 38 insertions(+), 38 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index 26e5534c9830e..f59d550d4f3b3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -75,8 +75,10 @@ private[deploy] class ApplicationInfo( } } - private[master] def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): - ExecutorDesc = { + private[master] def addExecutor( + worker: WorkerInfo, + cores: Int, + useID: Option[Int] = None): ExecutorDesc = { val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerExecutorMB) executors(exec.id) = exec coresGranted += cores diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index f60756d65820a..00597c1731562 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -533,11 +533,14 @@ private[master] class Master( } /** - * The resource allocator spread out each app among all the workers until it has all its cores in - * spreadOut mode otherwise packs each app into as few workers as possible until it has assigned - * all its cores. User can define spark.deploy.maxCoresPerExecutor per application to - * limit the maximum number of cores to allocate to each executor on each worker; if the parameter - * is not defined, then only one executor will be launched on a worker. + * Schedule executors to be launched on the workers.There are two modes of launching executors. + * The first attempts to spread out an application's executors on as many workers as possible, + * while the second does the opposite (i.e. launch them on as few workers as possible). The former + * is usually better for data locality purposes and is the default. The number of cores assigned + * to each executor is configurable. When this is explicitly set, multiple executors from the same + * application may be launched on the same worker if the worker has enough cores and memory. + * Otherwise, each executor grabs all the cores available on the worker by default, in which case + * only one executor may be launched on each worker. */ private def startExecutorsOnWorkers(): Unit = { // Right now this is a very simple FIFO scheduler. 
We keep trying to fit in the first app @@ -546,7 +549,9 @@ private[master] class Master( // Try to spread out each app among all the workers, until it has all its cores for (app <- waitingApps if app.coresLeft > 0) { val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) - .filter(canUse(app, _)).sortBy(_.coresFree).reverse + .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && + worker.coresFree > 0) + .sortBy(_.coresFree).reverse val numUsable = usableWorkers.length val assigned = new Array[Int](numUsable) // Number of cores to give on each node var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) @@ -566,15 +571,16 @@ private[master] class Master( } else { // Pack each app into as few workers as possible until we've assigned all its cores for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) { - for (app <- waitingApps if app.coresLeft > 0) { - allocateWorkerResourceToExecutors(app, app.coresLeft, worker) + for (app <- waitingApps if app.coresLeft > 0 && + worker.memoryFree >= app.desc.memoryPerExecutorMB) { + allocateWorkerResourceToExecutors(app, app.coresLeft, worker) } } } } /** - * allocate resources in a certain worker to one or more executors + * Allocate a worker's resources to one or more executors. * @param app the info of the application which the executors belong to * @param coresToAllocate cores on this worker to be allocated to this application * @param worker the worker info @@ -583,20 +589,24 @@ private[master] class Master( app: ApplicationInfo, coresToAllocate: Int, worker: WorkerInfo): Unit = { - if (canUse(app, worker)) { - val memoryPerExecutor = app.desc.memoryPerExecutorMB - val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate) - var coresLeft = coresToAllocate - while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) { - val exec = app.addExecutor(worker, coresPerExecutor) - coresLeft -= coresPerExecutor - launchExecutor(worker, exec) - app.state = ApplicationState.RUNNING - } + val memoryPerExecutor = app.desc.memoryPerExecutorMB + val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate) + var coresLeft = coresToAllocate + while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) { + val exec = app.addExecutor(worker, coresPerExecutor) + coresLeft -= coresPerExecutor + launchExecutor(worker, exec) + app.state = ApplicationState.RUNNING } } - private def startDriversOnWorkers(): Unit = { + /** + * Schedule the currently available resources among waiting apps. This method will be called + * every time a new app joins or resource availability changes. + */ + private def schedule(): Unit = { + if (state != RecoveryState.ALIVE) { return } + // start in-cluster drivers, they take strict precedence over applications val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { for (driver <- waitingDrivers) { @@ -606,21 +616,11 @@ private[master] class Master( } } } - } - - /** - * Schedule the currently available resources among waiting apps. This method will be called - * every time a new app joins or resource availability changes. 
- */ - private def schedule(): Unit = { - if (state != RecoveryState.ALIVE) { return } - // start in-cluster drivers, they take strict precedence over applications - startDriversOnWorkers() // start executors startExecutorsOnWorkers() } - def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = { + def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = { logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) worker.addExecutor(exec) worker.actor ! LaunchExecutor(masterUrl, diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index adfed6a6f4f8d..ed5b7c1088196 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -82,9 +82,9 @@ private[spark] class SparkDeploySchedulerBackend( val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts) val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") + val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt) val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, - command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, - conf.getOption("spark.executor.cores").map(_.toInt)) + command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor) client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() waitForRegistration() diff --git a/docs/configuration.md b/docs/configuration.md index 4b6f250b15e90..0515199f60af6 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -715,10 +715,8 @@ Apart from these, the following properties are also available, and may be useful spark.executor.cores - (infinite) + 1 in YARN mode, all the available cores on the worker in standalone mode. - Default: 1 in YARN mode, all the available cores on the worker in standalone mode. - The number of cores to use on each executor. For YARN and standalone mode only. In standalone mode, setting this parameter allows an application to run multiple executors on From 45967b4500e0d7332ca82c3511505e1cf586d64c Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 9 Apr 2015 20:58:47 -0400 Subject: [PATCH 23/27] remove unused method --- .../scala/org/apache/spark/deploy/master/Master.scala | 9 --------- 1 file changed, 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 00597c1731562..b1b864127e48f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -523,15 +523,6 @@ private[master] class Master( logInfo("Recovery complete - resuming operations!") } - /** - * Can an app use the given worker? - */ - private def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = { - val enoughResources = worker.memoryFree >= app.desc.memoryPerExecutorMB && worker.coresFree > 0 - val allowToExecute = app.desc.coresPerExecutor.isDefined || !worker.hasExecutor(app) - allowToExecute && enoughResources - } - /** * Schedule executors to be launched on the workers.There are two modes of launching executors. 
* The first attempts to spread out an application's executors on as many workers as possible, From b8ca561d48107713f93993729ec4e9aa72375aad Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 9 Apr 2015 21:00:44 -0400 Subject: [PATCH 24/27] revert a change --- core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index b1b864127e48f..ea301c70f7aa3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -611,7 +611,7 @@ private[master] class Master( startExecutorsOnWorkers() } - def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = { + private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = { logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) worker.addExecutor(exec) worker.actor ! LaunchExecutor(masterUrl, From 940cb4276c7d06f92a7077f32cb2ea748c59a873 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Fri, 10 Apr 2015 02:29:58 -0400 Subject: [PATCH 25/27] avoid unnecessary allocation --- core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index ea301c70f7aa3..952cae52477c4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -555,7 +555,7 @@ private[master] class Master( pos = (pos + 1) % numUsable } // Now that we've decided how many cores to give on each node, let's actually give them - for (pos <- 0 until numUsable) { + for (pos <- 0 until numUsable if assigned(pos) > 0) { allocateWorkerResourceToExecutors(app, assigned(pos), usableWorkers(pos)) } } From fbeb7e5001d54cfeb4e5070ec4b6df64f72b7e38 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Tue, 14 Apr 2015 16:07:02 -0400 Subject: [PATCH 26/27] address the comments --- .../apache/spark/deploy/master/Master.scala | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 952cae52477c4..5abd3dc821710 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -524,14 +524,17 @@ private[master] class Master( } /** - * Schedule executors to be launched on the workers.There are two modes of launching executors. - * The first attempts to spread out an application's executors on as many workers as possible, - * while the second does the opposite (i.e. launch them on as few workers as possible). The former - * is usually better for data locality purposes and is the default. The number of cores assigned - * to each executor is configurable. When this is explicitly set, multiple executors from the same - * application may be launched on the same worker if the worker has enough cores and memory. - * Otherwise, each executor grabs all the cores available on the worker by default, in which case - * only one executor may be launched on each worker. + * Schedule executors to be launched on the workers. + * + * There are two modes of launching executors. 
The first attempts to spread out an application's + * executors on as many workers as possible, while the second does the opposite (i.e. launch them + * on as few workers as possible). The former is usually better for data locality purposes and is + * the default. + * + * The number of cores assigned to each executor is configurable. When this is explicitly set, + * multiple executors from the same application may be launched on the same worker if the worker + * has enough cores and memory. Otherwise, each executor grabs all the cores available on the + * worker by default, in which case only one executor may be launched on each worker. */ private def startExecutorsOnWorkers(): Unit = { // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app @@ -541,7 +544,7 @@ private[master] class Master( for (app <- waitingApps if app.coresLeft > 0) { val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && - worker.coresFree > 0) + worker.coresFree >= app.desc.coresPerExecutor.getOrElse(0)) .sortBy(_.coresFree).reverse val numUsable = usableWorkers.length val assigned = new Array[Int](numUsable) // Number of cores to give on each node @@ -562,9 +565,8 @@ private[master] class Master( } else { // Pack each app into as few workers as possible until we've assigned all its cores for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) { - for (app <- waitingApps if app.coresLeft > 0 && - worker.memoryFree >= app.desc.memoryPerExecutorMB) { - allocateWorkerResourceToExecutors(app, app.coresLeft, worker) + for (app <- waitingApps if app.coresLeft > 0) { + allocateWorkerResourceToExecutors(app, app.coresLeft, worker) } } } @@ -597,7 +599,7 @@ private[master] class Master( */ private def schedule(): Unit = { if (state != RecoveryState.ALIVE) { return } - // start in-cluster drivers, they take strict precedence over applications + // Drivers take strict precedence over executors val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { for (driver <- waitingDrivers) { @@ -607,7 +609,6 @@ private[master] class Master( } } } - // start executors startExecutorsOnWorkers() } From 6dee808d4da7da43805e4ed16fc553f7bc18f494 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Tue, 14 Apr 2015 16:13:21 -0400 Subject: [PATCH 27/27] change filter predicate --- core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 5abd3dc821710..c5a6b1beac9be 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -544,7 +544,7 @@ private[master] class Master( for (app <- waitingApps if app.coresLeft > 0) { val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && - worker.coresFree >= app.desc.coresPerExecutor.getOrElse(0)) + worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1)) .sortBy(_.coresFree).reverse val numUsable = usableWorkers.length val assigned = new Array[Int](numUsable) // Number of cores to give on each node
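For a sense of how an application would opt into the new behaviour once this series lands, a small standalone-mode configuration sketch follows; the master URL is a placeholder, the job is trivial, and the comments paraphrase the documentation added to docs/configuration.md by these patches.

// Hypothetical driver program: with spark.executor.cores set, a worker that is granted
// 8 cores and has memory for four 1 GB executors can host four 2-core executors of this
// application instead of one 8-core executor.
import org.apache.spark.{SparkConf, SparkContext}

object MultiExecutorPerWorkerDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("multi-executor-per-worker-demo")
      .setMaster("spark://master-host:7077")  // placeholder standalone master URL
      .set("spark.executor.cores", "2")       // exact number of cores per executor
      .set("spark.executor.memory", "1g")     // memory reserved on the worker per executor
      .set("spark.cores.max", "8")            // cap on the cores granted to this application
    val sc = new SparkContext(conf)
    println(sc.parallelize(1 to 1000).map(_ * 2).sum())  // trivial job to exercise the executors
    sc.stop()
  }
}

The same settings can be passed on the command line with --executor-cores, --executor-memory and --total-executor-cores, as described in the SparkSubmitArguments changes above.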