From b49170a5a7189ba5e9e15b47b0a2c4144db61038 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Tue, 20 Jan 2015 20:41:45 -0500 Subject: [PATCH 01/11] respect spark.task.cpus when scheduling Applications --- .../apache/spark/deploy/ApplicationDescription.scala | 2 ++ .../org/apache/spark/deploy/master/Master.scala | 12 ++++++------ .../cluster/SparkDeploySchedulerBackend.scala | 5 +++++ 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index ae99432f5ce86..4e9ee99a1c044 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -33,6 +33,8 @@ private[spark] class ApplicationDescription( val user = System.getProperty("user.name", "") + var coreNumPerTask = 1 + def copy( name: String = name, maxCores: Option[Int] = maxCores, diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index c5a6b1beac9be..818368e60d507 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -540,8 +540,8 @@ private[master] class Master( // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app // in the queue, then the second app, etc. if (spreadOutApps) { - // Try to spread out each app among all the workers, until it has all its cores - for (app <- waitingApps if app.coresLeft > 0) { + // Try to spread out each app among all the nodes, until it has all its cores + for (app <- waitingApps if app.coresLeft >= app.desc.coreNumPerTask) { val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1)) @@ -550,10 +550,10 @@ private[master] class Master( val assigned = new Array[Int](numUsable) // Number of cores to give on each node var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) var pos = 0 - while (toAssign > 0) { - if (usableWorkers(pos).coresFree - assigned(pos) > 0) { - toAssign -= 1 - assigned(pos) += 1 + while (toAssign >= app.desc.coreNumPerTask) { + if (usableWorkers(pos).coresFree - assigned(pos) >= app.desc.coreNumPerTask) { + toAssign -= app.desc.coreNumPerTask + assigned(pos) += app.desc.coreNumPerTask } pos = (pos + 1) % numUsable } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index ccf1dc5af6120..bebdf52631a5d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -85,6 +85,11 @@ private[spark] class SparkDeploySchedulerBackend( val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt) val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor) + appDesc.coreNumPerTask = scheduler.CPUS_PER_TASK + if (appDesc.coreNumPerTask < 1) { + throw new IllegalArgumentException("spark.task.cpus is set to an invalid value " + + s"${appDesc.coreNumPerTask}") + } client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() waitForRegistration() From 328bedc76551191c3aa854ca3668a6e9d649da36 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Tue, 20 Jan 2015 20:50:08 -0500 Subject: [PATCH 02/11] fix style error --- .../spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index bebdf52631a5d..c9f6529e35473 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -87,8 +87,8 @@ private[spark] class SparkDeploySchedulerBackend( command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor) appDesc.coreNumPerTask = scheduler.CPUS_PER_TASK if (appDesc.coreNumPerTask < 1) { - throw new IllegalArgumentException("spark.task.cpus is set to an invalid value " + - s"${appDesc.coreNumPerTask}") + throw new IllegalArgumentException( + s"spark.task.cpus is set to an invalid value ${appDesc.coreNumPerTask}") } client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() From 7f1bb918cd6390d25cfd6319fa2e4f225a60fa8f Mon Sep 17 00:00:00 2001 From: CodingCat Date: Wed, 21 Jan 2015 06:57:39 -0500 Subject: [PATCH 03/11] respect spark.tasks.cpu in Mesos --- .../mesos/CoarseMesosSchedulerBackend.scala | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index b037a4966ced0..4d027ca50755e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -62,7 +62,9 @@ private[spark] class CoarseMesosSchedulerBackend( // Maximum number of cores to acquire (TODO: we'll need more flexible controls here) val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt - + + val coreNumPerTask = conf.getInt("spark.task.cpus", 1) + // Cores we have acquired with each Mesos task ID val coresByTaskId = new HashMap[Int, Int] var totalCoresAcquired = 0 @@ -87,7 +89,12 @@ private[spark] class CoarseMesosSchedulerBackend( override def start() { super.start() - + + if (coreNumPerTask < 1) { + throw new IllegalArgumentException( + s"spark.task.cpus is set to an invalid value $coreNumPerTask") + } + synchronized { new Thread("CoarseMesosSchedulerBackend driver") { setDaemon(true) @@ -213,13 +220,13 @@ private[spark] class CoarseMesosSchedulerBackend( val slaveId = offer.getSlaveId.toString val mem = getResource(offer.getResourcesList, "mem") val cpus = getResource(offer.getResourcesList, "cpus").toInt + val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) if (totalCoresAcquired < maxCores && + cpusToUse >= coreNumPerTask && mem >= MemoryUtils.calculateTotalMemory(sc) && - cpus >= 1 && failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && !slaveIdsWithExecutors.contains(slaveId)) { // Launch an executor on the slave - val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) totalCoresAcquired += cpusToUse val taskId = newMesosTaskId() taskIdToSlaveId(taskId) = slaveId From 4125f2321b233bc3b1e289d7c72651281418387b Mon Sep 17 00:00:00 2001 From: CodingCat Date: Wed, 21 Jan 2015 12:36:12 -0500 Subject: [PATCH 04/11] address the comments --- .../org/apache/spark/deploy/master/Master.scala | 9 +++++---- .../cluster/SparkDeploySchedulerBackend.scala | 16 ++++++++-------- .../mesos/CoarseMesosSchedulerBackend.scala | 16 +++++++++------- 3 files changed, 22 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 818368e60d507..b3231e45d2479 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -542,6 +542,7 @@ private[master] class Master( if (spreadOutApps) { // Try to spread out each app among all the nodes, until it has all its cores for (app <- waitingApps if app.coresLeft >= app.desc.coreNumPerTask) { + val coreNumPerTask = app.desc.coreNumPerTask val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1)) @@ -550,10 +551,10 @@ private[master] class Master( val assigned = new Array[Int](numUsable) // Number of cores to give on each node var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) var pos = 0 - while (toAssign >= app.desc.coreNumPerTask) { - if (usableWorkers(pos).coresFree - assigned(pos) >= app.desc.coreNumPerTask) { - toAssign -= app.desc.coreNumPerTask - assigned(pos) += app.desc.coreNumPerTask + while (toAssign >= coreNumPerTask) { + if (usableWorkers(pos).coresFree - assigned(pos) >= coreNumPerTask) { + toAssign -= coreNumPerTask + assigned(pos) += coreNumPerTask } pos = (pos + 1) % numUsable } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index c9f6529e35473..aa5d5a0541966 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -42,8 +42,9 @@ private[spark] class SparkDeploySchedulerBackend( private val registrationBarrier = new Semaphore(0) - private val maxCores = conf.getOption("spark.cores.max").map(_.toInt) - private val totalExpectedCores = maxCores.getOrElse(0) + val maxCores = conf.getOption("spark.cores.max").map(_.toInt) + val coreNumPerTask = scheduler.CPUS_PER_TASK + val totalExpectedCores = maxCores.getOrElse(0) override def start() { super.start() @@ -82,14 +83,13 @@ private[spark] class SparkDeploySchedulerBackend( val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts) val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") - val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt) - val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, - command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor) - appDesc.coreNumPerTask = scheduler.CPUS_PER_TASK - if (appDesc.coreNumPerTask < 1) { + if (coreNumPerTask < 1) { throw new IllegalArgumentException( - s"spark.task.cpus is set to an invalid value ${appDesc.coreNumPerTask}") + s"spark.task.cpus is set to an invalid value $coreNumPerTask") } + val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, + appUIAddress, sc.eventLogDir) + appDesc.coreNumPerTask = coreNumPerTask client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() waitForRegistration() diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 4d027ca50755e..ed099367d27c5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -63,8 +63,15 @@ private[spark] class CoarseMesosSchedulerBackend( // Maximum number of cores to acquire (TODO: we'll need more flexible controls here) val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt - val coreNumPerTask = conf.getInt("spark.task.cpus", 1) - + val coreNumPerTask = { + val corePerTask = conf.getInt("spark.task.cpus", 1) + if (corePerTask < 1) { + throw new IllegalArgumentException( + s"spark.task.cpus is set to an invalid value $corePerTask") + } + corePerTask + } + // Cores we have acquired with each Mesos task ID val coresByTaskId = new HashMap[Int, Int] var totalCoresAcquired = 0 @@ -90,11 +97,6 @@ private[spark] class CoarseMesosSchedulerBackend( override def start() { super.start() - if (coreNumPerTask < 1) { - throw new IllegalArgumentException( - s"spark.task.cpus is set to an invalid value $coreNumPerTask") - } - synchronized { new Thread("CoarseMesosSchedulerBackend driver") { setDaemon(true) From a21e6081ad7476f6dffff5bdd42a452932094cb3 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Wed, 21 Jan 2015 12:39:11 -0500 Subject: [PATCH 05/11] more fix --- .../cluster/SparkDeploySchedulerBackend.scala | 13 ++++++++----- .../cluster/mesos/CoarseMesosSchedulerBackend.scala | 2 +- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index aa5d5a0541966..751dfa0b46cf4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -43,7 +43,14 @@ private[spark] class SparkDeploySchedulerBackend( private val registrationBarrier = new Semaphore(0) val maxCores = conf.getOption("spark.cores.max").map(_.toInt) - val coreNumPerTask = scheduler.CPUS_PER_TASK + val coreNumPerTask = { + val corePerTask = scheduler.CPUS_PER_TASK + if (corePerTask < 1) { + throw new IllegalArgumentException( + s"spark.task.cpus is set to an invalid value $corePerTask") + } + corePerTask + } val totalExpectedCores = maxCores.getOrElse(0) override def start() { @@ -83,10 +90,6 @@ private[spark] class SparkDeploySchedulerBackend( val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts) val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") - if (coreNumPerTask < 1) { - throw new IllegalArgumentException( - s"spark.task.cpus is set to an invalid value $coreNumPerTask") - } val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, appUIAddress, sc.eventLogDir) appDesc.coreNumPerTask = coreNumPerTask diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index ed099367d27c5..f663735ed40c7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -96,7 +96,7 @@ private[spark] class CoarseMesosSchedulerBackend( override def start() { super.start() - + synchronized { new Thread("CoarseMesosSchedulerBackend driver") { setDaemon(true) From f1615cbd7986fb6e1710a1a25df3214a053b02a7 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 22 Jan 2015 16:51:51 -0500 Subject: [PATCH 06/11] another way --- .../cluster/SparkDeploySchedulerBackend.scala | 13 ++++++------- .../cluster/mesos/CoarseMesosSchedulerBackend.scala | 12 +++++------- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 751dfa0b46cf4..cbe989a70fc6f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -43,14 +43,13 @@ private[spark] class SparkDeploySchedulerBackend( private val registrationBarrier = new Semaphore(0) val maxCores = conf.getOption("spark.cores.max").map(_.toInt) - val coreNumPerTask = { - val corePerTask = scheduler.CPUS_PER_TASK - if (corePerTask < 1) { - throw new IllegalArgumentException( - s"spark.task.cpus is set to an invalid value $corePerTask") - } - corePerTask + val coreNumPerTask = scheduler.CPUS_PER_TASK + + if (coreNumPerTask < 1) { + throw new IllegalArgumentException( + s"spark.task.cpus is set to an invalid value $coreNumPerTask") } + val totalExpectedCores = maxCores.getOrElse(0) override def start() { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index f663735ed40c7..ce691c063ae0a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -63,13 +63,11 @@ private[spark] class CoarseMesosSchedulerBackend( // Maximum number of cores to acquire (TODO: we'll need more flexible controls here) val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt - val coreNumPerTask = { - val corePerTask = conf.getInt("spark.task.cpus", 1) - if (corePerTask < 1) { - throw new IllegalArgumentException( - s"spark.task.cpus is set to an invalid value $corePerTask") - } - corePerTask + val coreNumPerTask = conf.getInt("spark.task.cpus", 1) + + if (coreNumPerTask < 1) { + throw new IllegalArgumentException( + s"spark.task.cpus is set to an invalid value $coreNumPerTask") } // Cores we have acquired with each Mesos task ID From e771377727c30d68580d0a24a65ab9bf70b31e36 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 22 Jan 2015 16:57:32 -0500 Subject: [PATCH 07/11] check maxCores and coreNumPerTask --- .../scheduler/cluster/SparkDeploySchedulerBackend.scala | 5 +++++ .../cluster/mesos/CoarseMesosSchedulerBackend.scala | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index cbe989a70fc6f..6d9223ed1a630 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -49,6 +49,11 @@ private[spark] class SparkDeploySchedulerBackend( throw new IllegalArgumentException( s"spark.task.cpus is set to an invalid value $coreNumPerTask") } + + if (maxCores.isDefined && maxCores.get < coreNumPerTask) { + throw new IllegalArgumentException( + s"spark.task.cpus ($coreNumPerTask) should not be larger than spark.cores.max ($maxCores)") + } val totalExpectedCores = maxCores.getOrElse(0) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index ce691c063ae0a..83f6f655151b1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -70,6 +70,11 @@ private[spark] class CoarseMesosSchedulerBackend( s"spark.task.cpus is set to an invalid value $coreNumPerTask") } + if (maxCores < coreNumPerTask) { + throw new IllegalArgumentException( + s"spark.task.cpus ($coreNumPerTask) should not be larger than spark.cores.max ($maxCores)") + } + // Cores we have acquired with each Mesos task ID val coresByTaskId = new HashMap[Int, Int] var totalCoresAcquired = 0 From 0aec15936cdd61ba864399897a16d553fff0b389 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Fri, 13 Mar 2015 06:53:56 -0400 Subject: [PATCH 08/11] fix rebase mistake --- .../spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 6d9223ed1a630..ebb99ec87af73 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -95,7 +95,7 @@ private[spark] class SparkDeploySchedulerBackend( args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts) val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, - appUIAddress, sc.eventLogDir) + appUIAddress, sc.eventLogDir, sc.eventLogCodec) appDesc.coreNumPerTask = coreNumPerTask client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() From da8d4468e6789c785b0665655c8e0e3851826b75 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 20 Apr 2015 15:33:44 -0400 Subject: [PATCH 09/11] address the comments --- .../spark/deploy/ApplicationDescription.scala | 5 ++-- .../apache/spark/deploy/master/Master.scala | 23 +++++++++++++------ .../spark/scheduler/TaskSchedulerImpl.scala | 5 ++++ .../cluster/SparkDeploySchedulerBackend.scala | 9 ++------ .../mesos/CoarseMesosSchedulerBackend.scala | 16 +++++-------- 5 files changed, 31 insertions(+), 27 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index 4e9ee99a1c044..281725bb7327e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -28,13 +28,12 @@ private[spark] class ApplicationDescription( val eventLogDir: Option[URI] = None, // short name of compression codec used when writing event logs, if any (e.g. lzf) val eventLogCodec: Option[String] = None, - val coresPerExecutor: Option[Int] = None) + val coresPerExecutor: Option[Int] = None, + val coresPerTask: Int = 1) extends Serializable { val user = System.getProperty("user.name", "") - var coreNumPerTask = 1 - def copy( name: String = name, maxCores: Option[Int] = maxCores, diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index b3231e45d2479..6e62eabc4a1f1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -541,20 +541,26 @@ private[master] class Master( // in the queue, then the second app, etc. if (spreadOutApps) { // Try to spread out each app among all the nodes, until it has all its cores - for (app <- waitingApps if app.coresLeft >= app.desc.coreNumPerTask) { - val coreNumPerTask = app.desc.coreNumPerTask + for (app <- waitingApps if app.coresLeft >= app.desc.coresPerTask) { + val coreNumPerTask = app.desc.coresPerTask val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && - worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1)) + worker.coresFree >= math.max(app.desc.coresPerExecutor.getOrElse(1), + app.desc.coresPerTask)) .sortBy(_.coresFree).reverse val numUsable = usableWorkers.length val assigned = new Array[Int](numUsable) // Number of cores to give on each node var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) var pos = 0 - while (toAssign >= coreNumPerTask) { + val availableWorkerSet = new HashSet[Int] + // Take into account the number of cores the application uses per task + // to avoid starving executors or wasting cluster resources (SPARK-5337) + while (toAssign >= coreNumPerTask && availableWorkerSet.size < usableWorkers.length) { if (usableWorkers(pos).coresFree - assigned(pos) >= coreNumPerTask) { toAssign -= coreNumPerTask assigned(pos) += coreNumPerTask + } else { + availableWorkerSet += pos } pos = (pos + 1) % numUsable } @@ -566,7 +572,7 @@ private[master] class Master( } else { // Pack each app into as few workers as possible until we've assigned all its cores for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) { - for (app <- waitingApps if app.coresLeft > 0) { + for (app <- waitingApps if app.coresLeft > app.desc.coresPerTask) { allocateWorkerResourceToExecutors(app, app.coresLeft, worker) } } @@ -583,9 +589,12 @@ private[master] class Master( app: ApplicationInfo, coresToAllocate: Int, worker: WorkerInfo): Unit = { + val cpuPerTask = app.desc.coresPerTask val memoryPerExecutor = app.desc.memoryPerExecutorMB - val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate) - var coresLeft = coresToAllocate + var coresLeft = coresToAllocate - coresToAllocate % cpuPerTask + val coresPerExecutor = math.max( + app.desc.coresPerExecutor.getOrElse(coresLeft), + app.desc.coresPerTask) while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) { val exec = app.addExecutor(worker, coresPerExecutor) coresLeft -= coresPerExecutor diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 13a52d836f32f..4a285e3913670 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -70,6 +70,11 @@ private[spark] class TaskSchedulerImpl( // CPUs to request per task val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1) + if (CPUS_PER_TASK < 1) { + throw new IllegalArgumentException( + s"\"spark.task.cpus\" must be greater than 0! (was $CPUS_PER_TASK)") + } + // TaskSetManagers are not thread safe, so any access to one should be synchronized // on this class. val activeTaskSets = new HashMap[String, TaskSetManager] diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index ebb99ec87af73..ec008503b9ecc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -45,11 +45,6 @@ private[spark] class SparkDeploySchedulerBackend( val maxCores = conf.getOption("spark.cores.max").map(_.toInt) val coreNumPerTask = scheduler.CPUS_PER_TASK - if (coreNumPerTask < 1) { - throw new IllegalArgumentException( - s"spark.task.cpus is set to an invalid value $coreNumPerTask") - } - if (maxCores.isDefined && maxCores.get < coreNumPerTask) { throw new IllegalArgumentException( s"spark.task.cpus ($coreNumPerTask) should not be larger than spark.cores.max ($maxCores)") @@ -94,9 +89,9 @@ private[spark] class SparkDeploySchedulerBackend( val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts) val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") + val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt) val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, - appUIAddress, sc.eventLogDir, sc.eventLogCodec) - appDesc.coreNumPerTask = coreNumPerTask + appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, coreNumPerTask) client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() waitForRegistration() diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 83f6f655151b1..0a77052bbe95d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -63,16 +63,11 @@ private[spark] class CoarseMesosSchedulerBackend( // Maximum number of cores to acquire (TODO: we'll need more flexible controls here) val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt - val coreNumPerTask = conf.getInt("spark.task.cpus", 1) - - if (coreNumPerTask < 1) { - throw new IllegalArgumentException( - s"spark.task.cpus is set to an invalid value $coreNumPerTask") - } + val coresPerTask = scheduler.CPUS_PER_TASK - if (maxCores < coreNumPerTask) { + if (maxCores < coresPerTask) { throw new IllegalArgumentException( - s"spark.task.cpus ($coreNumPerTask) should not be larger than spark.cores.max ($maxCores)") + s"spark.task.cpus ($coresPerTask) should not be larger than spark.cores.max ($maxCores)") } // Cores we have acquired with each Mesos task ID @@ -225,9 +220,10 @@ private[spark] class CoarseMesosSchedulerBackend( val slaveId = offer.getSlaveId.toString val mem = getResource(offer.getResourcesList, "mem") val cpus = getResource(offer.getResourcesList, "cpus").toInt - val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) + val demandCores = math.min(cpus, maxCores - totalCoresAcquired) + val cpusToUse = demandCores - demandCores % coresPerTask if (totalCoresAcquired < maxCores && - cpusToUse >= coreNumPerTask && + cpusToUse >= coresPerTask && mem >= MemoryUtils.calculateTotalMemory(sc) && failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && !slaveIdsWithExecutors.contains(slaveId)) { From 55d9143629cc9cb74ae1c1997aba7695a8e7204d Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 20 Apr 2015 15:48:06 -0400 Subject: [PATCH 10/11] work around scalastyle checker --- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 4a285e3913670..67644eeede109 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -72,7 +72,7 @@ private[spark] class TaskSchedulerImpl( if (CPUS_PER_TASK < 1) { throw new IllegalArgumentException( - s"\"spark.task.cpus\" must be greater than 0! (was $CPUS_PER_TASK)") + s"spark.task.cpus must be greater than 0! (was $CPUS_PER_TASK)") } // TaskSetManagers are not thread safe, so any access to one should be synchronized From c10f980f437d326cab031b6927927c848cdec4cc Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 20 Apr 2015 16:03:08 -0400 Subject: [PATCH 11/11] fix several issues on allocation --- .../org/apache/spark/deploy/master/Master.scala | 7 ++++--- .../apache/spark/scheduler/TaskSchedulerImpl.scala | 7 +++++++ .../cluster/SparkDeploySchedulerBackend.scala | 12 +++--------- .../cluster/mesos/CoarseMesosSchedulerBackend.scala | 5 ----- 4 files changed, 14 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 6e62eabc4a1f1..dd637b2044317 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -572,7 +572,7 @@ private[master] class Master( } else { // Pack each app into as few workers as possible until we've assigned all its cores for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) { - for (app <- waitingApps if app.coresLeft > app.desc.coresPerTask) { + for (app <- waitingApps if app.coresLeft >= app.desc.coresPerTask) { allocateWorkerResourceToExecutors(app, app.coresLeft, worker) } } @@ -592,9 +592,10 @@ private[master] class Master( val cpuPerTask = app.desc.coresPerTask val memoryPerExecutor = app.desc.memoryPerExecutorMB var coresLeft = coresToAllocate - coresToAllocate % cpuPerTask + val maxCoresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresLeft) val coresPerExecutor = math.max( - app.desc.coresPerExecutor.getOrElse(coresLeft), - app.desc.coresPerTask) + maxCoresPerExecutor - maxCoresPerExecutor % cpuPerTask, + cpuPerTask) while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) { val exec = app.addExecutor(worker, coresPerExecutor) coresLeft -= coresPerExecutor diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 67644eeede109..3980fed4a7600 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -75,6 +75,13 @@ private[spark] class TaskSchedulerImpl( s"spark.task.cpus must be greater than 0! (was $CPUS_PER_TASK)") } + val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt + + if (maxCores < CPUS_PER_TASK) { + throw new IllegalArgumentException( + s"spark.task.cpus ($CPUS_PER_TASK) should not be larger than spark.cores.max ($maxCores)") + } + // TaskSetManagers are not thread safe, so any access to one should be synchronized // on this class. val activeTaskSets = new HashMap[String, TaskSetManager] diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index ec008503b9ecc..525b09d2cdb87 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -43,15 +43,9 @@ private[spark] class SparkDeploySchedulerBackend( private val registrationBarrier = new Semaphore(0) val maxCores = conf.getOption("spark.cores.max").map(_.toInt) - val coreNumPerTask = scheduler.CPUS_PER_TASK - - if (maxCores.isDefined && maxCores.get < coreNumPerTask) { - throw new IllegalArgumentException( - s"spark.task.cpus ($coreNumPerTask) should not be larger than spark.cores.max ($maxCores)") - } - val totalExpectedCores = maxCores.getOrElse(0) - + val coresPerTask = scheduler.CPUS_PER_TASK + override def start() { super.start() @@ -91,7 +85,7 @@ private[spark] class SparkDeploySchedulerBackend( val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt) val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, - appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, coreNumPerTask) + appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, coresPerTask) client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() waitForRegistration() diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 0a77052bbe95d..825f7b88282a1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -65,11 +65,6 @@ private[spark] class CoarseMesosSchedulerBackend( val coresPerTask = scheduler.CPUS_PER_TASK - if (maxCores < coresPerTask) { - throw new IllegalArgumentException( - s"spark.task.cpus ($coresPerTask) should not be larger than spark.cores.max ($maxCores)") - } - // Cores we have acquired with each Mesos task ID val coresByTaskId = new HashMap[Int, Int] var totalCoresAcquired = 0