From deeaa897421058dbb26d4b23c8dfefaa8652ac38 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 4 May 2014 14:13:41 -0400 Subject: [PATCH 01/10] make master support multiple executors per worker --- .../spark/deploy/ApplicationDescription.scala | 2 +- .../apache/spark/deploy/JsonProtocol.scala | 4 +- .../spark/deploy/master/ApplicationInfo.scala | 2 +- .../apache/spark/deploy/master/Master.scala | 104 ++++++++++++++---- .../deploy/master/ui/ApplicationPage.scala | 2 +- .../spark/deploy/master/ui/MasterPage.scala | 4 +- 6 files changed, 87 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index 86305d2ea8a09..30e7c33d1d3c4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -20,7 +20,7 @@ package org.apache.spark.deploy private[spark] class ApplicationDescription( val name: String, val maxCores: Option[Int], - val memoryPerSlave: Int, + val memoryPerExecutor: Int, val command: Command, val sparkHome: Option[String], var appUiUrl: String, diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala index c4f5e294a393e..bc7ce3184a807 100644 --- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala @@ -45,7 +45,7 @@ private[spark] object JsonProtocol { ("name" -> obj.desc.name) ~ ("cores" -> obj.desc.maxCores) ~ ("user" -> obj.desc.user) ~ - ("memoryperslave" -> obj.desc.memoryPerSlave) ~ + ("memoryperslave" -> obj.desc.memoryPerExecutor) ~ ("submitdate" -> obj.submitDate.toString) ~ ("state" -> obj.state.toString) ~ ("duration" -> obj.duration) @@ -54,7 +54,7 @@ private[spark] object JsonProtocol { def writeApplicationDescription(obj: ApplicationDescription) = { ("name" -> obj.name) ~ ("cores" -> obj.maxCores) ~ - ("memoryperslave" -> obj.memoryPerSlave) ~ + ("memoryperslave" -> obj.memoryPerExecutor) ~ ("user" -> obj.user) ~ ("sparkhome" -> obj.sparkHome) ~ ("command" -> obj.command.toString) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index 46b9f4dc7d3ba..71b5c26f8c838 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -66,7 +66,7 @@ private[spark] class ApplicationInfo( } def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): ExecutorInfo = { - val exec = new ExecutorInfo(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave) + val exec = new ExecutorInfo(newExecutorId(useID), this, worker, cores, desc.memoryPerExecutor) executors(exec.id) = exec coresGranted += cores exec diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index fdb633bd33608..7d72891dfadb2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -20,7 +20,7 @@ package org.apache.spark.deploy.master import java.text.SimpleDateFormat import java.util.Date -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.collection.mutable.{ListBuffer, 
ArrayBuffer, HashMap, HashSet} import scala.concurrent.Await import scala.concurrent.duration._ import scala.language.postfixOps @@ -457,35 +457,16 @@ private[spark] class Master( * launched an executor for the app on it (right now the standalone backend doesn't like having * two executors on the same worker). */ - def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = { - worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app) + private def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = { + worker.memoryFree >= app.desc.memoryPerExecutor && !worker.hasExecutor(app) } - /** - * Schedule the currently available resources among waiting apps. This method will be called - * every time a new app joins or resource availability changes. - */ - def schedule() { - if (state != RecoveryState.ALIVE) { return } - - // First schedule drivers, they take strict precedence over applications - val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers - for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { - for (driver <- waitingDrivers) { - if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { - launchDriver(worker, driver) - waitingDrivers -= driver - } - } - } - - // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app - // in the queue, then the second app, etc. + private def startSingleExecutorPerWorker() { if (spreadOutApps) { // Try to spread out each app among all the nodes, until it has all its cores for (app <- waitingApps if app.coresLeft > 0) { val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) - .filter(canUse(app, _)).sortBy(_.coresFree).reverse + .filter(canUse(app, _)).sortBy(_.coresFree).reverse val numUsable = usableWorkers.length val assigned = new Array[Int](numUsable) // Number of cores to give on each node var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) @@ -523,6 +504,81 @@ private[spark] class Master( } } + private def startMultiExecutorsPerWorker() { + val coreNumPerExecutor = conf.getInt("spark.executor.coreNumPerExecutor", 1) + // allow user to run multiple executors in the same worker + // (within the same worker JVM process) + if (spreadOutApps) { + for (app <- waitingApps if app.coresLeft > 0) { + val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) + .filter(app.desc.memoryPerExecutor < _.memoryFree).sortBy(_.coresFree).reverse + var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) + val numUsable = usableWorkers.length + // Number of cores of each executor assigned to each worker + val assigned = new Array[ListBuffer[Int]](numUsable) + var pos = 0 + while (toAssign > 0) { + val assignedCore = math.min(coreNumPerExecutor, toAssign) + if (usableWorkers(pos).coresFree - assigned(pos).sum > 0) { + toAssign -= assignedCore + assigned(pos) += assignedCore + } + pos = (pos + 1) % numUsable + } + // Now that we've decided how many executors and the core number for each to give on each node, + // let's actually give them + for (pos <- 0 until numUsable) { + for (execIdx <- 0 until assigned(pos).length) { + val exec = app.addExecutor(usableWorkers(pos), assigned(pos)(execIdx)) + launchExecutor(usableWorkers(pos), exec) + app.state = ApplicationState.RUNNING + } + } + } + } else { + // Pack each app into as few nodes as possible until we've assigned all its cores + for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) { + 
for (app <- waitingApps if app.coresLeft > 0 && + app.desc.memoryPerExecutor <= worker.memoryFree) { + var coresLeft = math.min(worker.coresFree, app.coresLeft) + while (coresLeft > 0) { + val assignedCore = math.min(coreNumPerExecutor, coresLeft) + val exec = app.addExecutor(worker, assignedCore) + launchExecutor(worker, exec) + coresLeft -= assignedCore + app.state = ApplicationState.RUNNING + } + } + } + } + } + + + /** + * Schedule the currently available resources among waiting apps. This method will be called + * every time a new app joins or resource availability changes. + */ + def schedule() { + if (state != RecoveryState.ALIVE) { return } + + // First schedule drivers, they take strict precedence over applications + val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers + for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { + for (driver <- waitingDrivers) { + if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { + launchDriver(worker, driver) + waitingDrivers -= driver + } + } + } + + if (conf.getBoolean("spark.multiExecutorsPerWorker.enable", false)) { + startSingleExecutorPerWorker() + } else { + startMultiExecutorsPerWorker() + } + } + def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) { logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) worker.addExecutor(exec) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index b5cd4d2ea963f..d435f0ff22b3b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -79,7 +79,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app
               <li><strong>Executor Memory:</strong>
-              {Utils.megabytesToString(app.desc.memoryPerSlave)}
+              {Utils.megabytesToString(app.desc.memoryPerExecutor)}
               </li>
               <li><strong>Submit Date:</strong> {app.submitDate}</li>
               <li><strong>State:</strong> {app.state}</li>
  • diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala index 7ca3b08a28728..a63c416c0db10 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala @@ -165,8 +165,8 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { {app.coresGranted} - - {Utils.megabytesToString(app.desc.memoryPerSlave)} + + {Utils.megabytesToString(app.desc.memoryPerExecutor)} {UIUtils.formatDate(app.submitDate)} {app.desc.user} From 1d17949e946f8a4cd591e88f9792647fc9e7f5a6 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 4 May 2014 14:17:12 -0400 Subject: [PATCH 02/10] doc update --- .../apache/spark/deploy/master/Master.scala | 40 +++++++++++-------- docs/configuration.md | 15 +++++++ 2 files changed, 39 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 7d72891dfadb2..725dc32bf26b4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -510,23 +510,31 @@ private[spark] class Master( // (within the same worker JVM process) if (spreadOutApps) { for (app <- waitingApps if app.coresLeft > 0) { - val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) - .filter(app.desc.memoryPerExecutor < _.memoryFree).sortBy(_.coresFree).reverse - var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) + var usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) + .filter(_.coresFree > 0).sortBy(_.coresFree).reverse + var leftCoreToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) val numUsable = usableWorkers.length // Number of cores of each executor assigned to each worker - val assigned = new Array[ListBuffer[Int]](numUsable) + val assigned = Array.fill[ListBuffer[Int]](numUsable)(new ListBuffer[Int]) var pos = 0 - while (toAssign > 0) { - val assignedCore = math.min(coreNumPerExecutor, toAssign) - if (usableWorkers(pos).coresFree - assigned(pos).sum > 0) { - toAssign -= assignedCore - assigned(pos) += assignedCore + val memoryNotEnoughFlags = new Array[Boolean](numUsable) + while (leftCoreToAssign > 0 && memoryNotEnoughFlags.contains(false)) { + val coreToAssign = math.min(coreNumPerExecutor, leftCoreToAssign) + if (usableWorkers(pos).coresFree - assigned(pos).sum > coreToAssign && + !memoryNotEnoughFlags(pos)) { + if (usableWorkers(pos).memoryFree > + app.desc.memoryPerExecutor * (assigned(pos).length + 1)) { + leftCoreToAssign -= coreToAssign + assigned(pos) += coreToAssign + } else { + memoryNotEnoughFlags(pos) = true + } } pos = (pos + 1) % numUsable } - // Now that we've decided how many executors and the core number for each to give on each node, - // let's actually give them + + // Now that we've decided how many executors and the core number for each to + // give on each node, let's actually give them for (pos <- 0 until numUsable) { for (execIdx <- 0 until assigned(pos).length) { val exec = app.addExecutor(usableWorkers(pos), assigned(pos)(execIdx)) @@ -541,11 +549,11 @@ private[spark] class Master( for (app <- waitingApps if app.coresLeft > 0 && app.desc.memoryPerExecutor <= worker.memoryFree) { var coresLeft = math.min(worker.coresFree, app.coresLeft) - while (coresLeft > 0) { - val assignedCore = 
math.min(coreNumPerExecutor, coresLeft) - val exec = app.addExecutor(worker, assignedCore) + while (coresLeft > 0 && app.desc.memoryPerExecutor <= worker.memoryFree) { + val coreToAssign = math.min(coreNumPerExecutor, coresLeft) + val exec = app.addExecutor(worker, coreToAssign) launchExecutor(worker, exec) - coresLeft -= assignedCore + coresLeft -= coreToAssign app.state = ApplicationState.RUNNING } } @@ -572,7 +580,7 @@ private[spark] class Master( } } - if (conf.getBoolean("spark.multiExecutorsPerWorker.enable", false)) { + if (!conf.getBoolean("spark.executor.multiPerWorker", false)) { startSingleExecutorPerWorker() } else { startMultiExecutorsPerWorker() diff --git a/docs/configuration.md b/docs/configuration.md index 5b034e3cb3d47..8077ca70a4aa4 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -674,6 +674,21 @@ Apart from these, the following properties are also available, and may be useful with spark.executor.memory. + + spark.executor.multiPerWorker + false + + enable user to run multiple executors in the same worker. + + + + spark.executor.coreNumPerExecutor + 1 + + set the number of cores assigned to each executor; this property is only valid when + spark.executor.multiPerWorker is set to true. + + spark.executor.extraClassPath (none) From 830ae112247e7b89a0c67ca20cf37a9f533ad36e Mon Sep 17 00:00:00 2001 From: Nan Zhu Date: Tue, 6 May 2014 09:53:26 -0400 Subject: [PATCH 03/10] bug fix --- .../main/scala/org/apache/spark/deploy/master/Master.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 725dc32bf26b4..4b24ca8862a53 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -520,9 +520,9 @@ private[spark] class Master( val memoryNotEnoughFlags = new Array[Boolean](numUsable) while (leftCoreToAssign > 0 && memoryNotEnoughFlags.contains(false)) { val coreToAssign = math.min(coreNumPerExecutor, leftCoreToAssign) - if (usableWorkers(pos).coresFree - assigned(pos).sum > coreToAssign && + if (usableWorkers(pos).coresFree - assigned(pos).sum >= coreToAssign && !memoryNotEnoughFlags(pos)) { - if (usableWorkers(pos).memoryFree > + if (usableWorkers(pos).memoryFree >= app.desc.memoryPerExecutor * (assigned(pos).length + 1)) { leftCoreToAssign -= coreToAssign assigned(pos) += coreToAssign From 874ec7a20502807d759fad4598068aa920887669 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Wed, 7 May 2014 09:00:58 -0400 Subject: [PATCH 04/10] application specific corePerExecutor --- .../spark/deploy/ApplicationDescription.scala | 5 ++-- .../apache/spark/deploy/master/Master.scala | 30 ++++++++++++------- .../cluster/SparkDeploySchedulerBackend.scala | 4 ++- docs/configuration.md | 4 +-- 4 files changed, 28 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index 30e7c33d1d3c4..1440a7fcfa2a7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -20,7 +20,7 @@ package org.apache.spark.deploy private[spark] class ApplicationDescription( val name: String, val maxCores: Option[Int], - val memoryPerExecutor: Int, + val memoryPerExecutor: Int, // in Mb val command: Command, val sparkHome: 
Option[String], var appUiUrl: String, @@ -28,6 +28,7 @@ private[spark] class ApplicationDescription( extends Serializable { val user = System.getProperty("user.name", "") - + // only valid when spark.executor.multiPerWorker is set to true + var maxCorePerExecutor = maxCores override def toString: String = "ApplicationDescription(" + name + ")" } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 4b24ca8862a53..f6dab8a0a6b37 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -458,7 +458,8 @@ private[spark] class Master( * two executors on the same worker). */ private def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = { - worker.memoryFree >= app.desc.memoryPerExecutor && !worker.hasExecutor(app) + worker.memoryFree >= app.desc.memoryPerExecutor && !worker.hasExecutor(app) && + worker.coresFree > 0 } private def startSingleExecutorPerWorker() { @@ -505,29 +506,37 @@ private[spark] class Master( } private def startMultiExecutorsPerWorker() { - val coreNumPerExecutor = conf.getInt("spark.executor.coreNumPerExecutor", 1) // allow user to run multiple executors in the same worker // (within the same worker JVM process) if (spreadOutApps) { + var usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) + .filter(_.coresFree > 0).sortBy(_.coresFree).reverse for (app <- waitingApps if app.coresLeft > 0) { - var usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) - .filter(_.coresFree > 0).sortBy(_.coresFree).reverse + val maxCoreNumPerExecutor = app.desc.maxCorePerExecutor.get + var mostFreeCoreWorkerPos = 0 var leftCoreToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) val numUsable = usableWorkers.length // Number of cores of each executor assigned to each worker val assigned = Array.fill[ListBuffer[Int]](numUsable)(new ListBuffer[Int]) + val assignedSum = Array.fill[Int](numUsable)(0) var pos = 0 - val memoryNotEnoughFlags = new Array[Boolean](numUsable) - while (leftCoreToAssign > 0 && memoryNotEnoughFlags.contains(false)) { - val coreToAssign = math.min(coreNumPerExecutor, leftCoreToAssign) - if (usableWorkers(pos).coresFree - assigned(pos).sum >= coreToAssign && - !memoryNotEnoughFlags(pos)) { + var noEnoughMemoryWorkerNum = 0 + var maxPossibleCore = usableWorkers(mostFreeCoreWorkerPos).coresFree + while (leftCoreToAssign > 0 && noEnoughMemoryWorkerNum < numUsable) { + if (usableWorkers(mostFreeCoreWorkerPos).coresFree < usableWorkers(pos).coresFree) { + mostFreeCoreWorkerPos = pos + maxPossibleCore = usableWorkers(mostFreeCoreWorkerPos).coresFree + } + val coreToAssign = math.min(math.min(maxCoreNumPerExecutor, maxPossibleCore), + leftCoreToAssign) + if (usableWorkers(pos).coresFree - assignedSum(pos) >= coreToAssign) { if (usableWorkers(pos).memoryFree >= app.desc.memoryPerExecutor * (assigned(pos).length + 1)) { leftCoreToAssign -= coreToAssign assigned(pos) += coreToAssign + assignedSum(pos) += coreToAssign } else { - memoryNotEnoughFlags(pos) = true + noEnoughMemoryWorkerNum += 1 } } pos = (pos + 1) % numUsable @@ -548,6 +557,7 @@ private[spark] class Master( for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) { for (app <- waitingApps if app.coresLeft > 0 && app.desc.memoryPerExecutor <= worker.memoryFree) { + val coreNumPerExecutor = app.desc.maxCorePerExecutor.get var coresLeft = 
math.min(worker.coresFree, app.coresLeft) while (coresLeft > 0 && app.desc.memoryPerExecutor <= worker.memoryFree) { val coreToAssign = math.min(coreNumPerExecutor, coresLeft) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index cefa41729964a..906ed4f2d200a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -59,7 +59,9 @@ private[spark] class SparkDeploySchedulerBackend( val sparkHome = sc.getSparkHome() val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir)) - + if (conf.getBoolean("spark.executor.multiPerWorker", false)) { + appDesc.maxCorePerExecutor = Some(conf.getInt("spark.executor.maxCoreNumPerExecutor", 1)) + } client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() } diff --git a/docs/configuration.md b/docs/configuration.md index 8077ca70a4aa4..91307c0055242 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -682,10 +682,10 @@ Apart from these, the following properties are also available, and may be useful - spark.executor.coreNumPerExecutor + spark.executor.maxCoreNumPerExecutor 1 - set the number of cores assigned to each executor; this property is only valid when + set the max number of cores assigned to each executor; this property is only valid when spark.executor.multiPerWorker is set to true. From f0852cc04c4533696b4b817922a475361efe6fea Mon Sep 17 00:00:00 2001 From: CodingCat Date: Wed, 7 May 2014 23:05:59 -0400 Subject: [PATCH 05/10] bug fix --- .../apache/spark/deploy/master/Master.scala | 22 ++++++++++++------- .../cluster/SparkDeploySchedulerBackend.scala | 2 +- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index f6dab8a0a6b37..b645442aa709d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -509,9 +509,10 @@ private[spark] class Master( // allow user to run multiple executors in the same worker // (within the same worker JVM process) if (spreadOutApps) { - var usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) - .filter(_.coresFree > 0).sortBy(_.coresFree).reverse for (app <- waitingApps if app.coresLeft > 0) { + var usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) + .filter(worker => worker.coresFree > 0 && worker.memoryFree >= app.desc.memoryPerExecutor). 
+ sortBy(_.coresFree).reverse val maxCoreNumPerExecutor = app.desc.maxCorePerExecutor.get var mostFreeCoreWorkerPos = 0 var leftCoreToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) @@ -520,12 +521,17 @@ private[spark] class Master( val assigned = Array.fill[ListBuffer[Int]](numUsable)(new ListBuffer[Int]) val assignedSum = Array.fill[Int](numUsable)(0) var pos = 0 - var noEnoughMemoryWorkerNum = 0 + val noEnoughMemoryWorkers = new HashSet[Int] var maxPossibleCore = usableWorkers(mostFreeCoreWorkerPos).coresFree - while (leftCoreToAssign > 0 && noEnoughMemoryWorkerNum < numUsable) { - if (usableWorkers(mostFreeCoreWorkerPos).coresFree < usableWorkers(pos).coresFree) { + while (leftCoreToAssign > 0 && noEnoughMemoryWorkers.size < numUsable) { + if ((usableWorkers(mostFreeCoreWorkerPos).coresFree - assignedSum(mostFreeCoreWorkerPos) < + usableWorkers(pos).coresFree - assignedSum(pos) || + noEnoughMemoryWorkers.contains(mostFreeCoreWorkerPos)) && + usableWorkers(pos).coresFree - assignedSum(pos) > 0 && + usableWorkers(pos).memoryFree >= + app.desc.memoryPerExecutor * (assigned(pos).length + 1)) { mostFreeCoreWorkerPos = pos - maxPossibleCore = usableWorkers(mostFreeCoreWorkerPos).coresFree + maxPossibleCore = usableWorkers(pos).coresFree - assignedSum(pos) } val coreToAssign = math.min(math.min(maxCoreNumPerExecutor, maxPossibleCore), leftCoreToAssign) @@ -536,7 +542,7 @@ private[spark] class Master( assigned(pos) += coreToAssign assignedSum(pos) += coreToAssign } else { - noEnoughMemoryWorkerNum += 1 + noEnoughMemoryWorkers += pos } } pos = (pos + 1) % numUsable @@ -590,7 +596,7 @@ private[spark] class Master( } } - if (!conf.getBoolean("spark.executor.multiPerWorker", false)) { + if (!conf.getBoolean("spark.executor.multiPerWorker", true)) { startSingleExecutorPerWorker() } else { startMultiExecutorsPerWorker() diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 906ed4f2d200a..f1e16accfb52d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -59,7 +59,7 @@ private[spark] class SparkDeploySchedulerBackend( val sparkHome = sc.getSparkHome() val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir)) - if (conf.getBoolean("spark.executor.multiPerWorker", false)) { + if (conf.getBoolean("spark.executor.multiPerWorker", true)) { appDesc.maxCorePerExecutor = Some(conf.getInt("spark.executor.maxCoreNumPerExecutor", 1)) } client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) From 1be60dabe4feb420d15d1380af0a993b87e200b8 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 8 May 2014 15:56:53 -0400 Subject: [PATCH 06/10] style fix --- .../main/scala/org/apache/spark/deploy/master/Master.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index b645442aa709d..5befa71faeaac 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -511,8 +511,8 @@ private[spark] class Master( if (spreadOutApps) { for (app <- waitingApps if app.coresLeft > 0) 
{ var usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) - .filter(worker => worker.coresFree > 0 && worker.memoryFree >= app.desc.memoryPerExecutor). - sortBy(_.coresFree).reverse + .filter(worker => worker.coresFree > 0 && worker.memoryFree >= + app.desc.memoryPerExecutor).sortBy(_.coresFree).reverse val maxCoreNumPerExecutor = app.desc.maxCorePerExecutor.get var mostFreeCoreWorkerPos = 0 var leftCoreToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) From 072dd6108f842e6f603498432a26537866dba2f4 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 8 May 2014 23:43:11 -0400 Subject: [PATCH 07/10] bug fix --- .../src/main/scala/org/apache/spark/deploy/master/Master.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 5befa71faeaac..2b80ee28aca07 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -531,8 +531,9 @@ private[spark] class Master( usableWorkers(pos).memoryFree >= app.desc.memoryPerExecutor * (assigned(pos).length + 1)) { mostFreeCoreWorkerPos = pos - maxPossibleCore = usableWorkers(pos).coresFree - assignedSum(pos) } + maxPossibleCore = usableWorkers(mostFreeCoreWorkerPos).coresFree - + assignedSum(mostFreeCoreWorkerPos) val coreToAssign = math.min(math.min(maxCoreNumPerExecutor, maxPossibleCore), leftCoreToAssign) if (usableWorkers(pos).coresFree - assignedSum(pos) >= coreToAssign) { From f936a4202be42ac6dc8a971b00366db8c2399c85 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Fri, 9 May 2014 07:03:49 -0400 Subject: [PATCH 08/10] leftCoreToAssign should be limited by mem also --- .../main/scala/org/apache/spark/deploy/master/Master.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 2b80ee28aca07..5199f0582b225 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -515,7 +515,9 @@ private[spark] class Master( app.desc.memoryPerExecutor).sortBy(_.coresFree).reverse val maxCoreNumPerExecutor = app.desc.maxCorePerExecutor.get var mostFreeCoreWorkerPos = 0 - var leftCoreToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) + val maxAvailableSlots = usableWorkers.map(worker => math.min(worker.coresFree, + worker.memoryFree / app.desc.memoryPerExecutor)).sum + var leftCoreToAssign = math.min(app.coresLeft, maxAvailableSlots) val numUsable = usableWorkers.length // Number of cores of each executor assigned to each worker val assigned = Array.fill[ListBuffer[Int]](numUsable)(new ListBuffer[Int]) From 5b21f87d461a871bfb05b6e1c1e7efaf26e11d68 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Fri, 9 May 2014 07:41:51 -0400 Subject: [PATCH 09/10] capture noEnoughMemWorker every iteration --- .../scala/org/apache/spark/deploy/master/Master.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 5199f0582b225..83a846887c76f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -515,9 
+515,7 @@ private[spark] class Master( app.desc.memoryPerExecutor).sortBy(_.coresFree).reverse val maxCoreNumPerExecutor = app.desc.maxCorePerExecutor.get var mostFreeCoreWorkerPos = 0 - val maxAvailableSlots = usableWorkers.map(worker => math.min(worker.coresFree, - worker.memoryFree / app.desc.memoryPerExecutor)).sum - var leftCoreToAssign = math.min(app.coresLeft, maxAvailableSlots) + var leftCoreToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) val numUsable = usableWorkers.length // Number of cores of each executor assigned to each worker val assigned = Array.fill[ListBuffer[Int]](numUsable)(new ListBuffer[Int]) @@ -544,10 +542,12 @@ private[spark] class Master( leftCoreToAssign -= coreToAssign assigned(pos) += coreToAssign assignedSum(pos) += coreToAssign - } else { - noEnoughMemoryWorkers += pos } } + if (usableWorkers(pos).memoryFree < app.desc.memoryPerExecutor * + (assigned(pos).length + 1)) { + noEnoughMemoryWorkers += pos + } pos = (pos + 1) % numUsable } From ca70401840fcd5098f105e446bd7a2792fd0f026 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Fri, 9 May 2014 09:01:19 -0400 Subject: [PATCH 10/10] limit leftCoreToAssign with Mem --- .../main/scala/org/apache/spark/deploy/master/Master.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 83a846887c76f..26ea2de1906da 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -515,7 +515,9 @@ private[spark] class Master( app.desc.memoryPerExecutor).sortBy(_.coresFree).reverse val maxCoreNumPerExecutor = app.desc.maxCorePerExecutor.get var mostFreeCoreWorkerPos = 0 - var leftCoreToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) + val maxAvailableSlots = usableWorkers.map(worker => math.min(worker.coresFree, + worker.memoryFree / app.desc.memoryPerExecutor)).sum + var leftCoreToAssign = math.min(app.coresLeft, maxAvailableSlots) val numUsable = usableWorkers.length // Number of cores of each executor assigned to each worker val assigned = Array.fill[ListBuffer[Int]](numUsable)(new ListBuffer[Int])