diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index 86305d2ea8a09..1440a7fcfa2a7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -20,7 +20,7 @@ package org.apache.spark.deploy
 private[spark] class ApplicationDescription(
     val name: String,
     val maxCores: Option[Int],
-    val memoryPerSlave: Int,
+    val memoryPerExecutor: Int,  // in MB
     val command: Command,
     val sparkHome: Option[String],
     var appUiUrl: String,
@@ -28,6 +28,7 @@ private[spark] class ApplicationDescription(
   extends Serializable {
 
   val user = System.getProperty("user.name", "<unknown>")
-
+  // only valid when spark.executor.multiPerWorker is set to true
+  var maxCorePerExecutor = maxCores
 
   override def toString: String = "ApplicationDescription(" + name + ")"
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
index c4f5e294a393e..bc7ce3184a807 100644
--- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -45,7 +45,7 @@ private[spark] object JsonProtocol {
     ("name" -> obj.desc.name) ~
     ("cores" -> obj.desc.maxCores) ~
     ("user" -> obj.desc.user) ~
-    ("memoryperslave" -> obj.desc.memoryPerSlave) ~
+    ("memoryperslave" -> obj.desc.memoryPerExecutor) ~
     ("submitdate" -> obj.submitDate.toString) ~
     ("state" -> obj.state.toString) ~
     ("duration" -> obj.duration)
@@ -54,7 +54,7 @@ private[spark] object JsonProtocol {
   def writeApplicationDescription(obj: ApplicationDescription) = {
     ("name" -> obj.name) ~
     ("cores" -> obj.maxCores) ~
-    ("memoryperslave" -> obj.memoryPerSlave) ~
+    ("memoryperslave" -> obj.memoryPerExecutor) ~
     ("user" -> obj.user) ~
     ("sparkhome" -> obj.sparkHome) ~
     ("command" -> obj.command.toString)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index 46b9f4dc7d3ba..71b5c26f8c838 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -66,7 +66,7 @@ private[spark] class ApplicationInfo(
   }
 
   def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): ExecutorInfo = {
-    val exec = new ExecutorInfo(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave)
+    val exec = new ExecutorInfo(newExecutorId(useID), this, worker, cores, desc.memoryPerExecutor)
     executors(exec.id) = exec
     coresGranted += cores
     exec
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index fdb633bd33608..26ea2de1906da 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -20,7 +20,7 @@ package org.apache.spark.deploy.master
 import java.text.SimpleDateFormat
 import java.util.Date
 
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer}
 import scala.concurrent.Await
 import scala.concurrent.duration._
 import scala.language.postfixOps
@@ -457,35 +457,17 @@ private[spark] class Master(
    * launched an executor for the app on it (right now the standalone backend doesn't like having
    * two executors on the same worker).
    */
-  def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
-    worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
+  private def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
+    worker.memoryFree >= app.desc.memoryPerExecutor && !worker.hasExecutor(app) &&
+      worker.coresFree > 0
   }
 
-  /**
-   * Schedule the currently available resources among waiting apps. This method will be called
-   * every time a new app joins or resource availability changes.
-   */
-  def schedule() {
-    if (state != RecoveryState.ALIVE) { return }
-
-    // First schedule drivers, they take strict precedence over applications
-    val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
-    for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
-      for (driver <- waitingDrivers) {
-        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
-          launchDriver(worker, driver)
-          waitingDrivers -= driver
-        }
-      }
-    }
-
-    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
-    // in the queue, then the second app, etc.
+  private def startSingleExecutorPerWorker() {
     if (spreadOutApps) {
       // Try to spread out each app among all the nodes, until it has all its cores
       for (app <- waitingApps if app.coresLeft > 0) {
         val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
-          .filter(canUse(app, _)).sortBy(_.coresFree).reverse
+            .filter(canUse(app, _)).sortBy(_.coresFree).reverse
         val numUsable = usableWorkers.length
         val assigned = new Array[Int](numUsable) // Number of cores to give on each node
         var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
@@ -523,6 +505,109 @@ private[spark] class Master(
     }
   }
 
+  private def startMultiExecutorsPerWorker() {
+    // Allow an application to run multiple executors on the same worker
+    // (each executor is still launched by the worker as its own process)
+    if (spreadOutApps) {
+      for (app <- waitingApps if app.coresLeft > 0) {
+        var usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
+          .filter(worker => worker.coresFree > 0 && worker.memoryFree >=
+            app.desc.memoryPerExecutor).sortBy(_.coresFree).reverse
+        val maxCoreNumPerExecutor = app.desc.maxCorePerExecutor.get
+        var mostFreeCoreWorkerPos = 0
+        val maxAvailableSlots = usableWorkers.map(worker => math.min(worker.coresFree,
+          worker.memoryFree / app.desc.memoryPerExecutor)).sum
+        var leftCoreToAssign = math.min(app.coresLeft, maxAvailableSlots)
+        val numUsable = usableWorkers.length
+        // Number of cores of each executor assigned to each worker
+        val assigned = Array.fill[ListBuffer[Int]](numUsable)(new ListBuffer[Int])
+        val assignedSum = Array.fill[Int](numUsable)(0)
+        var pos = 0
+        val noEnoughMemoryWorkers = new HashSet[Int]
+        var maxPossibleCore = usableWorkers(mostFreeCoreWorkerPos).coresFree
+        while (leftCoreToAssign > 0 && noEnoughMemoryWorkers.size < numUsable) {
+          if ((usableWorkers(mostFreeCoreWorkerPos).coresFree - assignedSum(mostFreeCoreWorkerPos) <
+              usableWorkers(pos).coresFree - assignedSum(pos) ||
+              noEnoughMemoryWorkers.contains(mostFreeCoreWorkerPos)) &&
+              usableWorkers(pos).coresFree - assignedSum(pos) > 0 &&
+              usableWorkers(pos).memoryFree >=
+                app.desc.memoryPerExecutor * (assigned(pos).length + 1)) {
+            mostFreeCoreWorkerPos = pos
+          }
+          maxPossibleCore = usableWorkers(mostFreeCoreWorkerPos).coresFree -
+            assignedSum(mostFreeCoreWorkerPos)
+          val coreToAssign = math.min(math.min(maxCoreNumPerExecutor, maxPossibleCore),
+            leftCoreToAssign)
+          if (usableWorkers(pos).coresFree - assignedSum(pos) >= coreToAssign) {
+            if (usableWorkers(pos).memoryFree >=
+                app.desc.memoryPerExecutor * (assigned(pos).length + 1)) {
+              leftCoreToAssign -= coreToAssign
+              assigned(pos) += coreToAssign
+              assignedSum(pos) += coreToAssign
+            }
+          }
+          if (usableWorkers(pos).memoryFree < app.desc.memoryPerExecutor *
+              (assigned(pos).length + 1)) {
+            noEnoughMemoryWorkers += pos
+          }
+          pos = (pos + 1) % numUsable
+        }
+
+        // Now that we've decided how many executors and the core number for each to
+        // give on each node, let's actually give them
+        for (pos <- 0 until numUsable) {
+          for (execIdx <- 0 until assigned(pos).length) {
+            val exec = app.addExecutor(usableWorkers(pos), assigned(pos)(execIdx))
+            launchExecutor(usableWorkers(pos), exec)
+            app.state = ApplicationState.RUNNING
+          }
+        }
+      }
+    } else {
+      // Pack each app into as few nodes as possible until we've assigned all its cores
+      for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
+        for (app <- waitingApps if app.coresLeft > 0 &&
+            app.desc.memoryPerExecutor <= worker.memoryFree) {
+          val coreNumPerExecutor = app.desc.maxCorePerExecutor.get
+          var coresLeft = math.min(worker.coresFree, app.coresLeft)
+          while (coresLeft > 0 && app.desc.memoryPerExecutor <= worker.memoryFree) {
+            val coreToAssign = math.min(coreNumPerExecutor, coresLeft)
+            val exec = app.addExecutor(worker, coreToAssign)
+            launchExecutor(worker, exec)
+            coresLeft -= coreToAssign
+            app.state = ApplicationState.RUNNING
+          }
+        }
+      }
+    }
+  }
+
+
+  /**
+   * Schedule the currently available resources among waiting apps. This method will be called
+   * every time a new app joins or resource availability changes.
+   */
+  def schedule() {
+    if (state != RecoveryState.ALIVE) { return }
+
+    // First schedule drivers, they take strict precedence over applications
+    val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
+    for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
+      for (driver <- waitingDrivers) {
+        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
+          launchDriver(worker, driver)
+          waitingDrivers -= driver
+        }
+      }
+    }
+
+    if (conf.getBoolean("spark.executor.multiPerWorker", false)) {
+      startMultiExecutorsPerWorker()
+    } else {
+      startSingleExecutorPerWorker()
+    }
+  }
+
   def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
     logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
     worker.addExecutor(exec)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index b5cd4d2ea963f..d435f0ff22b3b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -79,7 +79,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app") {
             </li>
             <li><strong>Executor Memory:</strong>
-              {Utils.megabytesToString(app.desc.memoryPerSlave)}
+              {Utils.megabytesToString(app.desc.memoryPerExecutor)}
             </li>
             <li><strong>Submit Date:</strong> {app.submitDate}</li>
             <li><strong>State:</strong> {app.state}</li>
             <li><strong><a href={app.desc.appUiUrl}>Application Detail UI</a></strong></li>
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index 7ca3b08a28728..a63c416c0db10 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -165,8 +165,8 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
       <td>
         {app.coresGranted}
       </td>
-      <td sorttable_customkey={app.desc.memoryPerSlave.toString}>
-        {Utils.megabytesToString(app.desc.memoryPerSlave)}
+      <td sorttable_customkey={app.desc.memoryPerExecutor.toString}>
+        {Utils.megabytesToString(app.desc.memoryPerExecutor)}
       </td>
       <td>{UIUtils.formatDate(app.submitDate)}</td>
       <td>{app.desc.user}</td>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index cefa41729964a..f1e16accfb52d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -59,7 +59,9 @@ private[spark] class SparkDeploySchedulerBackend(
     val sparkHome = sc.getSparkHome()
     val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
       sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
-
+    if (conf.getBoolean("spark.executor.multiPerWorker", false)) {
+      appDesc.maxCorePerExecutor = Some(conf.getInt("spark.executor.maxCoreNumPerExecutor", 1))
+    }
     client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
     client.start()
   }
diff --git a/docs/configuration.md b/docs/configuration.md
index 5b034e3cb3d47..91307c0055242 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -674,6 +674,21 @@ Apart from these, the following properties are also available, and may be useful
     with <code>spark.executor.memory</code>.
   </td>
 </tr>
+<tr>
+  <td><code>spark.executor.multiPerWorker</code></td>
+  <td>false</td>
+  <td>
+    Whether to allow an application to run multiple executors on the same worker.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.executor.maxCoreNumPerExecutor</code></td>
+  <td>1</td>
+  <td>
+    The maximum number of cores assigned to each executor. This setting only takes effect
+    when <code>spark.executor.multiPerWorker</code> is set to true.
+  </td>
+</tr>
 <tr>
   <td><code>spark.executor.extraClassPath</code></td>
   <td>(none)</td>
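
A note for reviewers, not part of the patch: the heart of `startMultiExecutorsPerWorker` is the cap it computes before handing out cores. Each usable worker is counted for at most `min(coresFree, memoryFree / memoryPerExecutor)` assignable cores, since every executor it hosts needs `memoryPerExecutor` MB, and the app-wide total is further capped by `app.coresLeft`. A self-contained sketch of that arithmetic, with made-up worker sizes:

```scala
// Illustrative sketch only (not part of the patch): mirrors the
// maxAvailableSlots computation in startMultiExecutorsPerWorker.
object MaxSlotsExample {
  // Hypothetical view of a worker's free resources.
  case class FreeWorker(name: String, coresFree: Int, memoryFreeMb: Int)

  def main(args: Array[String]): Unit = {
    val memoryPerExecutorMb = 1024 // executor memory requested by the app, in MB

    val workers = Seq(
      FreeWorker("worker-1", coresFree = 8, memoryFreeMb = 4096),
      FreeWorker("worker-2", coresFree = 4, memoryFreeMb = 1024))

    // Memory can cap the core count: each executor needs memoryPerExecutorMb,
    // so worker-1 contributes min(8, 4) = 4 and worker-2 contributes min(4, 1) = 1.
    val maxAvailableSlots = workers.map { w =>
      math.min(w.coresFree, w.memoryFreeMb / memoryPerExecutorMb)
    }.sum

    println(s"At most $maxAvailableSlots cores can be handed out") // prints 5
  }
}
```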
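And a hedged usage sketch for the two properties documented above; the master URL, memory size, and core counts are placeholders, and the property names and defaults are the ones this patch introduces:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch of opting in from an application. Everything below other than the two
// new property names is a placeholder chosen for illustration.
object MultiExecutorPerWorkerApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("spark://master:7077")
      .setAppName("multi-executor-per-worker-example")
      .set("spark.executor.memory", "1g")               // memory per executor
      .set("spark.cores.max", "8")                      // total cores for the app
      .set("spark.executor.multiPerWorker", "true")     // opt in to the new scheduler path
      .set("spark.executor.maxCoreNumPerExecutor", "2") // at most 2 cores per executor

    val sc = new SparkContext(conf)
    try {
      println(sc.parallelize(1 to 1000).sum())
    } finally {
      sc.stop()
    }
  }
}
```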