From 2cea799ec783852db88ad514ede59d0150c3bcae Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sat, 5 Sep 2015 02:01:05 -0400 Subject: [PATCH 01/12] respect spark.task.cpus when scheduling Applications --- .../spark/deploy/ApplicationDescription.scala | 3 +- .../apache/spark/deploy/master/Master.scala | 11 +++-- .../cluster/SparkDeploySchedulerBackend.scala | 14 ++++-- .../mesos/CoarseMesosSchedulerBackend.scala | 46 +++++++++---------- .../CoarseMesosSchedulerBackendSuite.scala | 10 ++-- 5 files changed, 48 insertions(+), 36 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index c5c5c60923f4e..449f3e649f7b1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -32,7 +32,8 @@ private[spark] case class ApplicationDescription( // number of executors this application wants to start with, // only used if dynamic allocation is enabled initialExecutorLimit: Option[Int] = None, - user: String = System.getProperty("user.name", "")) { + user: String = System.getProperty("user.name", ""), + val coresPerTask: Int = 1) { override def toString: String = "ApplicationDescription(" + name + ")" } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 0f11f680b3914..5301e7ca4a292 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -589,9 +589,10 @@ private[deploy] class Master( private def scheduleExecutorsOnWorkers( app: ApplicationInfo, usableWorkers: Array[WorkerInfo], - spreadOutApps: Boolean): Array[Int] = { + spreadOutApps: Boolean, + coresPerTask: Int): Array[Int] = { val coresPerExecutor = app.desc.coresPerExecutor - val minCoresPerExecutor = coresPerExecutor.getOrElse(1) + val minCoresPerExecutor = math.max(coresPerExecutor.getOrElse(1), coresPerTask) val oneExecutorPerWorker = coresPerExecutor.isEmpty val memoryPerExecutor = app.desc.memoryPerExecutorMB val numUsable = usableWorkers.length @@ -659,12 +660,14 @@ private[deploy] class Master( // in the queue, then the second app, etc. 
for (app <- waitingApps if app.coresLeft > 0) { val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor + val coreNumPerTask = app.desc.coresPerTask // Filter out workers that don't have enough resources to launch an executor val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && - worker.coresFree >= coresPerExecutor.getOrElse(1)) + worker.coresFree >= math.max(coresPerExecutor.getOrElse(1), coreNumPerTask)) .sortBy(_.coresFree).reverse - val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps) + val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps, + coreNumPerTask) // Now that we've decided how many cores to allocate on each worker, let's allocate them for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index d209645610c12..dd21755308b94 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster import java.util.concurrent.Semaphore -import org.apache.spark.{Logging, SparkConf, SparkContext} +import org.apache.spark._ import org.apache.spark.deploy.{ApplicationDescription, Command} import org.apache.spark.deploy.client.{AppClient, AppClientListener} import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle} @@ -97,8 +97,16 @@ private[spark] class SparkDeploySchedulerBackend( } else { None } - val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, - appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit) + if (coresPerExecutor.isDefined && (coresPerExecutor.get < scheduler.CPUS_PER_TASK || + (coresPerExecutor.get % scheduler.CPUS_PER_TASK) != 0)) { + throw new SparkException(s"invalid configuration of " + + s"spark.executor.cores(${coresPerExecutor.get}) and " + + s"spark.task.cpus(${scheduler.CPUS_PER_TASK}}), spark.executor.cores has to be no less " + + "than and folds of spark.task.cpus") + } + val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, + command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, + coresPerTask = scheduler.CPUS_PER_TASK) client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf) client.start() launcherBackend.setState(SparkAppHandle.State.SUBMITTED) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index b82196c86b637..c7b1d23cfa27a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -54,8 +54,8 @@ private[spark] class CoarseMesosSchedulerBackend( master: String, securityManager: SecurityManager) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) - with MScheduler - with MesosSchedulerUtils { + with MScheduler + with MesosSchedulerUtils { val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures @@ -192,11 +192,11 @@ private[spark] class 
CoarseMesosSchedulerBackend( command.setValue( "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend" .format(prefixEnv, runScript) + - s" --driver-url $driverURL" + - s" --executor-id ${offer.getSlaveId.getValue}" + - s" --hostname ${offer.getHostname}" + - s" --cores $numCores" + - s" --app-id $appId") + s" --driver-url $driverURL" + + s" --executor-id ${offer.getSlaveId.getValue}" + + s" --hostname ${offer.getHostname}" + + s" --cores $numCores" + + s" --app-id $appId") } else { // Grab everything to the first '.'. We'll use that and '*' to // glob the directory "correctly". @@ -204,12 +204,12 @@ private[spark] class CoarseMesosSchedulerBackend( val executorId = sparkExecutorId(offer.getSlaveId.getValue, taskId.toString) command.setValue( s"cd $basename*; $prefixEnv " + - "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" + - s" --driver-url $driverURL" + - s" --executor-id $executorId" + - s" --hostname ${offer.getHostname}" + - s" --cores $numCores" + - s" --app-id $appId") + "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" + + s" --driver-url $driverURL" + + s" --executor-id $executorId" + + s" --hostname ${offer.getHostname}" + + s" --cores $numCores" + + s" --app-id $appId") command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get)) } @@ -271,11 +271,11 @@ private[spark] class CoarseMesosSchedulerBackend( val id = offer.getId.getValue if (meetsConstraints) { if (taskIdToSlaveId.size < executorLimit && - totalCoresAcquired < maxCores && - mem >= calculateTotalMemory(sc) && - cpus >= 1 && - failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && - !slaveIdsWithExecutors.contains(slaveId)) { + totalCoresAcquired < maxCores && + mem >= calculateTotalMemory(sc) && + cpus >= 1 && + failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && + !slaveIdsWithExecutors.contains(slaveId)) { // Launch an executor on the slave val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) totalCoresAcquired += cpusToUse @@ -316,7 +316,7 @@ private[spark] class CoarseMesosSchedulerBackend( // This offer does not meet constraints. We don't need to see it again. // Decline the offer for a long period of time. logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus" - + s" for $rejectOfferDurationForUnmetConstraints seconds") + + s" for $rejectOfferDurationForUnmetConstraints seconds") d.declineOffer(offer.getId, Filters.newBuilder() .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()) } @@ -336,8 +336,8 @@ private[spark] class CoarseMesosSchedulerBackend( // this application when the driver exits. There is currently not a great way to detect // this through Mesos, since the shuffle services are set up independently. 
if (TaskState.fromMesos(state).equals(TaskState.RUNNING) && - slaveIdToHost.contains(slaveId) && - shuffleServiceEnabled) { + slaveIdToHost.contains(slaveId) && + shuffleServiceEnabled) { assume(mesosExternalShuffleClient.isDefined, "External shuffle client was not instantiated even though shuffle service is enabled.") // TODO: Remove this and allow the MesosExternalShuffleService to detect @@ -345,7 +345,7 @@ private[spark] class CoarseMesosSchedulerBackend( val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337) val hostname = slaveIdToHost.remove(slaveId).get logDebug(s"Connecting to shuffle service on slave $slaveId, " + - s"host $hostname, port $externalShufflePort for app ${conf.getAppId}") + s"host $hostname, port $externalShufflePort for app ${conf.getAppId}") mesosExternalShuffleClient.get .registerDriverWithShuffleService(hostname, externalShufflePort) } @@ -364,7 +364,7 @@ private[spark] class CoarseMesosSchedulerBackend( failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1 if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) { logInfo(s"Blacklisting Mesos slave $slaveId due to too many failures; " + - "is Spark installed on it?") + "is Spark installed on it?") } } executorTerminated(d, slaveId, s"Executor finished with state $state") diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala index a4110d2d462de..7f9dd79a4e38e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala @@ -33,9 +33,9 @@ import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkCon import org.apache.spark.scheduler.TaskSchedulerImpl class CoarseMesosSchedulerBackendSuite extends SparkFunSuite - with LocalSparkContext - with MockitoSugar - with BeforeAndAfter { + with LocalSparkContext + with MockitoSugar + with BeforeAndAfter { private def createOffer(offerId: String, slaveId: String, mem: Int, cpu: Int): Offer = { val builder = Offer.newBuilder() @@ -57,8 +57,8 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite } private def createSchedulerBackend( - taskScheduler: TaskSchedulerImpl, - driver: SchedulerDriver): CoarseMesosSchedulerBackend = { + taskScheduler: TaskSchedulerImpl, + driver: SchedulerDriver): CoarseMesosSchedulerBackend = { val securityManager = mock[SecurityManager] val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master", securityManager) { override protected def createSchedulerDriver( From 65024bb50b329d78597f26c9563e2e4bb77f9fcb Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sat, 5 Sep 2015 12:19:15 -0400 Subject: [PATCH 02/12] fix compilation error --- .../main/scala/org/apache/spark/deploy/master/Master.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 5301e7ca4a292..2aa6c58b3d86f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -589,9 +589,9 @@ private[deploy] class Master( private def scheduleExecutorsOnWorkers( app: ApplicationInfo, usableWorkers: Array[WorkerInfo], - spreadOutApps: Boolean, - coresPerTask: Int): 
Array[Int] = { + spreadOutApps: Boolean): Array[Int] = { val coresPerExecutor = app.desc.coresPerExecutor + val coresPerTask = app.desc.coresPerTask val minCoresPerExecutor = math.max(coresPerExecutor.getOrElse(1), coresPerTask) val oneExecutorPerWorker = coresPerExecutor.isEmpty val memoryPerExecutor = app.desc.memoryPerExecutorMB @@ -666,8 +666,7 @@ private[deploy] class Master( .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && worker.coresFree >= math.max(coresPerExecutor.getOrElse(1), coreNumPerTask)) .sortBy(_.coresFree).reverse - val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps, - coreNumPerTask) + val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps) // Now that we've decided how many cores to allocate on each worker, let's allocate them for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) { From 47b3ee777788f9a1f8a9b76a7242fbf394c6b1c3 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sat, 5 Sep 2015 15:59:08 -0400 Subject: [PATCH 03/12] fix Mesos issue by assigning CPUS_PER_TASK a valid value --- .../cluster/mesos/CoarseMesosSchedulerBackendSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala index 7f9dd79a4e38e..d8ad0d768e6e1 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala @@ -93,6 +93,7 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) val taskScheduler = mock[TaskSchedulerImpl] when(taskScheduler.sc).thenReturn(sc) + when(taskScheduler.CPUS_PER_TASK).thenReturn(sparkConf.getInt("spark.task.cpus", 1)) sparkConf.set("spark.driver.host", "driverHost") sparkConf.set("spark.driver.port", "1234") @@ -144,6 +145,7 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) val taskScheduler = mock[TaskSchedulerImpl] when(taskScheduler.sc).thenReturn(sc) + when(taskScheduler.CPUS_PER_TASK).thenReturn(sparkConf.getInt("spark.task.cpus", 1)) val backend = createSchedulerBackend(taskScheduler, driver) val minMem = backend.calculateTotalMemory(sc) + 1024 From 01796456c7a91e0651062b6d0a7cbf3d8ba4db9b Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 7 Sep 2015 09:05:50 -0400 Subject: [PATCH 04/12] add test cases --- .../spark/deploy/master/MasterSuite.scala | 103 +++++++++++++++++- 1 file changed, 101 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index ce00807ea46b9..cb83dcd31e81e 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -175,6 +175,38 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva schedulingWithCoresPerExecutor(spreadOut = false) } + test("scheduling with cores per task - spread out") { + schedulingWithCoresPerTask(true) + } + + test("scheduling with cores per task - no spread out") { + schedulingWithCoresPerTask(false) + } + + test("scheduling with cores per task AND max cores - spread out") 
{ + schedulingWithCoresPerTaskAndMaxCores(true) + } + + test("scheduling with cores per task AND max cores - no spread out") { + schedulingWithCoresPerTaskAndMaxCores(false) + } + + test("scheduling with cores per task AND cores per executor - spread out") { + schedulingWithCoresPerTaskAndCoresPerExecutor(true) + } + + test("scheduling with cores per task AND cores per executor - no spread out") { + schedulingWithCoresPerTaskAndCoresPerExecutor(false) + } + + test("scheduling with cores per task AND cores per executor AND max cores - spread out") { + schedulingWithCoresPerTaskAndCoresPerExecutorAndMaxCores(true) + } + + test("scheduling with cores per task AND cores per executor AND max cores - no spread out") { + schedulingWithCoresPerTaskAndCoresPerExecutorAndMaxCores(false) + } + test("scheduling with cores per executor AND max cores - spread out") { schedulingWithCoresPerExecutorAndMaxCores(spreadOut = true) } @@ -348,6 +380,71 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva } } + private def schedulingWithCoresPerTask(spreadOut: Boolean): Unit = { + val master = makeMaster() + val appInfo1 = makeAppInfo(1024, coresPerTask = Some(1)) + val appInfo2 = makeAppInfo(256, coresPerTask = Some(2)) + val appInfo3 = makeAppInfo(256, coresPerTask = Some(3)) + val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut) + val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut) + val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo3, workerInfos, spreadOut) + assert(scheduledCores1 === Array(10, 10, 10)) + assert(scheduledCores2 === Array(10, 10, 10)) + assert(scheduledCores3 === Array(9, 9, 9)) + } + + private def schedulingWithCoresPerTaskAndMaxCores(spreadOut: Boolean): Unit = { + val master = makeMaster() + val appInfo1 = makeAppInfo(256, coresPerTask = Some(2), maxCores = Some(4)) + val appInfo2 = makeAppInfo(256, coresPerTask = Some(2), maxCores = Some(20)) + val appInfo3 = makeAppInfo(256, coresPerTask = Some(3), maxCores = Some(20)) + val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut) + val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut) + val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo3, workerInfos, spreadOut) + if (spreadOut) { + assert(scheduledCores1 === Array(2, 2, 0)) + assert(scheduledCores2 === Array(8, 6, 6)) + assert(scheduledCores3 === Array(6, 6, 6)) + } else { + assert(scheduledCores1 === Array(4, 0, 0)) + assert(scheduledCores2 === Array(10, 10, 0)) + assert(scheduledCores3 === Array(9, 9, 0)) + } + } + + private def schedulingWithCoresPerTaskAndCoresPerExecutor(spreadOut: Boolean): Unit = { + val master = makeMaster() + // we don't test the case where coresPerExecutor is larger than coresPerTask or coresPerExecutor + // is not multiple folds of coresPerTask; because we prohibit this configuration in + // SparkDeployScheduleBackend + val appInfo1 = makeAppInfo(256, coresPerTask = Some(2), coresPerExecutor = Some(2)) + val appInfo2 = makeAppInfo(256, coresPerTask = Some(2), coresPerExecutor = Some(4)) + val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut) + val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut) + assert(scheduledCores1 === Array(10, 10, 10)) + assert(scheduledCores2 === Array(8, 8, 8)) + } + + private def schedulingWithCoresPerTaskAndCoresPerExecutorAndMaxCores(spreadOut: Boolean): 
Unit = { + val master = makeMaster() + // we don't test the case where coresPerExecutor is larger than coresPerTask or coresPerExecutor + // is not multiple folds of coresPerTask; because we prohibit this configuration in + // SparkDeployScheduleBackend + val appInfo1 = makeAppInfo(256, coresPerTask = Some(2), coresPerExecutor = Some(2), + maxCores = Some(4)) + val appInfo2 = makeAppInfo(256, coresPerTask = Some(2), coresPerExecutor = Some(4), + maxCores = Some(18)) + val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut) + val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut) + if (spreadOut) { + assert(scheduledCores1 === Array(2, 2, 0)) + assert(scheduledCores2 === Array(8, 4, 4)) + } else { + assert(scheduledCores1 === Array(4, 0, 0)) + assert(scheduledCores2 === Array(8, 8, 0)) + } + } + // ========================================== // | Utility methods and fields for testing | // ========================================== @@ -366,9 +463,11 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva private def makeAppInfo( memoryPerExecutorMb: Int, coresPerExecutor: Option[Int] = None, - maxCores: Option[Int] = None): ApplicationInfo = { + maxCores: Option[Int] = None, + coresPerTask: Option[Int] = None): ApplicationInfo = { val desc = new ApplicationDescription( - "test", maxCores, memoryPerExecutorMb, null, "", None, None, coresPerExecutor) + "test", maxCores, memoryPerExecutorMb, null, "", None, None, coresPerExecutor, + coresPerTask.getOrElse(1)) val appId = System.currentTimeMillis.toString new ApplicationInfo(0, appId, desc, new Date, null, Int.MaxValue) } From 486c9b335e32a5d7d7a112786a39ea6ff6a77b86 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Wed, 9 Sep 2015 14:12:25 -0400 Subject: [PATCH 05/12] clearer comments --- .../spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index dd21755308b94..50787babfce0c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -101,8 +101,8 @@ private[spark] class SparkDeploySchedulerBackend( (coresPerExecutor.get % scheduler.CPUS_PER_TASK) != 0)) { throw new SparkException(s"invalid configuration of " + s"spark.executor.cores(${coresPerExecutor.get}) and " + - s"spark.task.cpus(${scheduler.CPUS_PER_TASK}}), spark.executor.cores has to be no less " + - "than and folds of spark.task.cpus") + s"spark.task.cpus(${scheduler.CPUS_PER_TASK}}), spark.executor.cores cannot be less " + + "than spark.task.cpus and has to be divisible by it") } val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, From 1a307e35643af65075dfa6a5198eb088e838c237 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 23 Nov 2015 10:32:07 -0500 Subject: [PATCH 06/12] rename var --- .../main/scala/org/apache/spark/deploy/master/Master.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 2aa6c58b3d86f..0c820e3a6ae6a 100644 --- 
a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -660,11 +660,11 @@ private[deploy] class Master( // in the queue, then the second app, etc. for (app <- waitingApps if app.coresLeft > 0) { val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor - val coreNumPerTask = app.desc.coresPerTask + val coresPerTask = app.desc.coresPerTask // Filter out workers that don't have enough resources to launch an executor val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && - worker.coresFree >= math.max(coresPerExecutor.getOrElse(1), coreNumPerTask)) + worker.coresFree >= math.max(coresPerExecutor.getOrElse(1), coresPerTask)) .sortBy(_.coresFree).reverse val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps) From 7c4a0ab8398d9f0e873d867909a14ec2083cff87 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 26 Nov 2015 09:07:28 -0500 Subject: [PATCH 07/12] add comments about Spark-5337 --- core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 2 ++ .../scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala | 2 ++ 2 files changed, 4 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 0c820e3a6ae6a..a42d4cd1ffded 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -662,6 +662,8 @@ private[deploy] class Master( val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor val coresPerTask = app.desc.coresPerTask // Filter out workers that don't have enough resources to launch an executor + // the user might set the requested number of cores per task via spark.task.cpus + // we need to respect this configuratio when allocating cores to the executor (SPARK-5337) val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && worker.coresFree >= math.max(coresPerExecutor.getOrElse(1), coresPerTask)) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index c7b1d23cfa27a..e69f6cc9561f9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -269,6 +269,8 @@ private[spark] class CoarseMesosSchedulerBackend( val mem = getResource(offer.getResourcesList, "mem") val cpus = getResource(offer.getResourcesList, "cpus").toInt val id = offer.getId.getValue + // the user might set the requested number of cores per task via spark.task.cpus + // we need to respect this configuratio when allocating cores to the executor (SPARK-5337) if (meetsConstraints) { if (taskIdToSlaveId.size < executorLimit && totalCoresAcquired < maxCores && From b391a965f6bc50eb2cbda19e8521226535f8e2e5 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 4 Feb 2016 08:47:25 +0800 Subject: [PATCH 08/12] stylistic fix --- .../mesos/CoarseMesosSchedulerBackend.scala | 46 +++++++++---------- .../CoarseMesosSchedulerBackendSuite.scala | 12 ++--- 2 files changed, 27 insertions(+), 31 deletions(-) diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index e69f6cc9561f9..54fe29c68be04 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -54,8 +54,8 @@ private[spark] class CoarseMesosSchedulerBackend( master: String, securityManager: SecurityManager) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) - with MScheduler - with MesosSchedulerUtils { + with MScheduler + with MesosSchedulerUtils { val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures @@ -192,11 +192,11 @@ private[spark] class CoarseMesosSchedulerBackend( command.setValue( "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend" .format(prefixEnv, runScript) + - s" --driver-url $driverURL" + - s" --executor-id ${offer.getSlaveId.getValue}" + - s" --hostname ${offer.getHostname}" + - s" --cores $numCores" + - s" --app-id $appId") + s" --driver-url $driverURL" + + s" --executor-id ${offer.getSlaveId.getValue}" + + s" --hostname ${offer.getHostname}" + + s" --cores $numCores" + + s" --app-id $appId") } else { // Grab everything to the first '.'. We'll use that and '*' to // glob the directory "correctly". @@ -204,12 +204,12 @@ private[spark] class CoarseMesosSchedulerBackend( val executorId = sparkExecutorId(offer.getSlaveId.getValue, taskId.toString) command.setValue( s"cd $basename*; $prefixEnv " + - "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" + - s" --driver-url $driverURL" + - s" --executor-id $executorId" + - s" --hostname ${offer.getHostname}" + - s" --cores $numCores" + - s" --app-id $appId") + "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" + + s" --driver-url $driverURL" + + s" --executor-id $executorId" + + s" --hostname ${offer.getHostname}" + + s" --cores $numCores" + + s" --app-id $appId") command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get)) } @@ -269,15 +269,13 @@ private[spark] class CoarseMesosSchedulerBackend( val mem = getResource(offer.getResourcesList, "mem") val cpus = getResource(offer.getResourcesList, "cpus").toInt val id = offer.getId.getValue - // the user might set the requested number of cores per task via spark.task.cpus - // we need to respect this configuratio when allocating cores to the executor (SPARK-5337) if (meetsConstraints) { if (taskIdToSlaveId.size < executorLimit && - totalCoresAcquired < maxCores && - mem >= calculateTotalMemory(sc) && - cpus >= 1 && - failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && - !slaveIdsWithExecutors.contains(slaveId)) { + totalCoresAcquired < maxCores && + mem >= calculateTotalMemory(sc) && + cpus >= 1 && + failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && + !slaveIdsWithExecutors.contains(slaveId)) { // Launch an executor on the slave val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) totalCoresAcquired += cpusToUse @@ -318,7 +316,7 @@ private[spark] class CoarseMesosSchedulerBackend( // This offer does not meet constraints. We don't need to see it again. // Decline the offer for a long period of time. 
logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus" - + s" for $rejectOfferDurationForUnmetConstraints seconds") + + s" for $rejectOfferDurationForUnmetConstraints seconds") d.declineOffer(offer.getId, Filters.newBuilder() .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()) } @@ -338,8 +336,8 @@ private[spark] class CoarseMesosSchedulerBackend( // this application when the driver exits. There is currently not a great way to detect // this through Mesos, since the shuffle services are set up independently. if (TaskState.fromMesos(state).equals(TaskState.RUNNING) && - slaveIdToHost.contains(slaveId) && - shuffleServiceEnabled) { + slaveIdToHost.contains(slaveId) && + shuffleServiceEnabled) { assume(mesosExternalShuffleClient.isDefined, "External shuffle client was not instantiated even though shuffle service is enabled.") // TODO: Remove this and allow the MesosExternalShuffleService to detect @@ -366,7 +364,7 @@ private[spark] class CoarseMesosSchedulerBackend( failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1 if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) { logInfo(s"Blacklisting Mesos slave $slaveId due to too many failures; " + - "is Spark installed on it?") + "is Spark installed on it?") } } executorTerminated(d, slaveId, s"Executor finished with state $state") diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala index d8ad0d768e6e1..a4110d2d462de 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala @@ -33,9 +33,9 @@ import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkCon import org.apache.spark.scheduler.TaskSchedulerImpl class CoarseMesosSchedulerBackendSuite extends SparkFunSuite - with LocalSparkContext - with MockitoSugar - with BeforeAndAfter { + with LocalSparkContext + with MockitoSugar + with BeforeAndAfter { private def createOffer(offerId: String, slaveId: String, mem: Int, cpu: Int): Offer = { val builder = Offer.newBuilder() @@ -57,8 +57,8 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite } private def createSchedulerBackend( - taskScheduler: TaskSchedulerImpl, - driver: SchedulerDriver): CoarseMesosSchedulerBackend = { + taskScheduler: TaskSchedulerImpl, + driver: SchedulerDriver): CoarseMesosSchedulerBackend = { val securityManager = mock[SecurityManager] val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master", securityManager) { override protected def createSchedulerDriver( @@ -93,7 +93,6 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) val taskScheduler = mock[TaskSchedulerImpl] when(taskScheduler.sc).thenReturn(sc) - when(taskScheduler.CPUS_PER_TASK).thenReturn(sparkConf.getInt("spark.task.cpus", 1)) sparkConf.set("spark.driver.host", "driverHost") sparkConf.set("spark.driver.port", "1234") @@ -145,7 +144,6 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) val taskScheduler = mock[TaskSchedulerImpl] when(taskScheduler.sc).thenReturn(sc) - when(taskScheduler.CPUS_PER_TASK).thenReturn(sparkConf.getInt("spark.task.cpus", 1)) val backend = 
createSchedulerBackend(taskScheduler, driver) val minMem = backend.calculateTotalMemory(sc) + 1024 From 55c2802c23ef52410c656b60b98081572e112470 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 4 Feb 2016 09:13:24 +0800 Subject: [PATCH 09/12] do not start executors with changed perCore Configuration --- .../apache/spark/deploy/ApplicationDescription.scala | 11 +++++++++++ .../scala/org/apache/spark/deploy/master/Master.scala | 3 +-- .../cluster/mesos/CoarseMesosSchedulerBackend.scala | 2 +- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index 449f3e649f7b1..8081f01de2a59 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -19,6 +19,8 @@ package org.apache.spark.deploy import java.net.URI +import org.apache.spark.SparkException + private[spark] case class ApplicationDescription( name: String, maxCores: Option[Int], @@ -35,5 +37,14 @@ private[spark] case class ApplicationDescription( user: String = System.getProperty("user.name", ""), val coresPerTask: Int = 1) { + // We only check whether coresPerExecutor is no less than coresPerTask when the user has the + // explicit configuration on this. We have the explicit mechanism to prevent to start executors + // which cannot execute any task in Master. (SPARK-5337) + if (coresPerExecutor.getOrElse(Int.MaxValue) < coresPerTask) { + throw new SparkException(s"illegal configuration for application $name, " + + s"coresPerExecutor (configured value: ${coresPerExecutor.getOrElse(1)}) cannot be less " + + s"than coresPerTask (configured value: $coresPerTask)") + } + override def toString: String = "ApplicationDescription(" + name + ")" } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index a42d4cd1ffded..628a9e69d578f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -591,8 +591,7 @@ private[deploy] class Master( usableWorkers: Array[WorkerInfo], spreadOutApps: Boolean): Array[Int] = { val coresPerExecutor = app.desc.coresPerExecutor - val coresPerTask = app.desc.coresPerTask - val minCoresPerExecutor = math.max(coresPerExecutor.getOrElse(1), coresPerTask) + val minCoresPerExecutor = coresPerExecutor.getOrElse(1) val oneExecutorPerWorker = coresPerExecutor.isEmpty val memoryPerExecutor = app.desc.memoryPerExecutorMB val numUsable = usableWorkers.length diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 54fe29c68be04..b82196c86b637 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -345,7 +345,7 @@ private[spark] class CoarseMesosSchedulerBackend( val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337) val hostname = slaveIdToHost.remove(slaveId).get logDebug(s"Connecting to shuffle service on slave $slaveId, " + - s"host $hostname, port $externalShufflePort for app ${conf.getAppId}") + s"host $hostname, port $externalShufflePort for app 
${conf.getAppId}") mesosExternalShuffleClient.get .registerDriverWithShuffleService(hostname, externalShufflePort) } From 90207375b12c4103aedc2ae4c89284ba38f93566 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 4 Feb 2016 09:34:44 +0800 Subject: [PATCH 10/12] compilation fix --- .../test/scala/org/apache/spark/deploy/master/MasterSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index cb83dcd31e81e..ff26bab6c2bd6 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -467,7 +467,7 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva coresPerTask: Option[Int] = None): ApplicationInfo = { val desc = new ApplicationDescription( "test", maxCores, memoryPerExecutorMb, null, "", None, None, coresPerExecutor, - coresPerTask.getOrElse(1)) + coresPerTask = coresPerTask.getOrElse(1)) val appId = System.currentTimeMillis.toString new ApplicationInfo(0, appId, desc, new Date, null, Int.MaxValue) } From 8286e972fa43135c2d7dad4f103b77d55b72e163 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Fri, 5 Feb 2016 20:47:36 +0800 Subject: [PATCH 11/12] distinguish for oneExecutorPerWorker and multiExecutorsPerWorker --- .../org/apache/spark/deploy/master/Master.scala | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 628a9e69d578f..c0976e08d2e18 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -598,11 +598,13 @@ private[deploy] class Master( val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) + val coresPerTask = app.desc.coresPerTask /** Return whether the specified worker can launch an executor for this app. */ def canLaunchExecutor(pos: Int): Boolean = { - val keepScheduling = coresToAssign >= minCoresPerExecutor - val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor + val keepScheduling = coresToAssign >= math.max(minCoresPerExecutor, coresPerTask) + val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= + math.max(minCoresPerExecutor, coresPerTask) // If we allow multiple executors per worker, then we can always launch new executors. // Otherwise, if there is already an executor on this worker, just give it more cores. @@ -626,17 +628,21 @@ private[deploy] class Master( freeWorkers.foreach { pos => var keepScheduling = true while (keepScheduling && canLaunchExecutor(pos)) { - coresToAssign -= minCoresPerExecutor - assignedCores(pos) += minCoresPerExecutor - + var coresToAllocateInThisRnd = 0 // If we are launching one executor per worker, then every iteration assigns 1 core // to the executor. Otherwise, every iteration assigns cores to a new executor. 
if (oneExecutorPerWorker) { assignedExecutors(pos) = 1 + coresToAllocateInThisRnd = coresPerTask } else { assignedExecutors(pos) += 1 + coresToAllocateInThisRnd = minCoresPerExecutor } + coresToAssign -= coresToAllocateInThisRnd + assignedCores(pos) += coresToAllocateInThisRnd + + // Spreading out an application means spreading out its executors across as // many workers as possible. If we are not spreading out, then we should keep // scheduling executors on this worker until we use all of its resources. From d644654d3af68c2c9f53978ed81f19da737460c5 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sat, 6 Feb 2016 06:45:31 +0800 Subject: [PATCH 12/12] recover initialExecutorLimit --- .../spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 50787babfce0c..54e3127e47eae 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -106,7 +106,7 @@ private[spark] class SparkDeploySchedulerBackend( } val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, - coresPerTask = scheduler.CPUS_PER_TASK) + initialExecutorLimit, coresPerTask = scheduler.CPUS_PER_TASK) client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf) client.start() launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
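
Taken together, the twelve patches make the standalone master and the deploy backend honor spark.task.cpus (SPARK-5337): the backend rejects a spark.executor.cores value that is smaller than, or not divisible by, spark.task.cpus, and the master hands out cores in steps a task can actually use, so no executor is launched with stranded cores. What follows is a minimal, self-contained Scala sketch of those two rules, written for illustration only; the object and method names (CoresPerTaskSketch, validate, assignCores) and the simplified single-worker loop are ours and do not appear in the patches.

// Illustrative sketch only; not the Spark code changed above.
object CoresPerTaskSketch {

  // Rule 1: the configuration check done before the ApplicationDescription is built.
  // spark.executor.cores, if set, must be at least spark.task.cpus and a multiple of it.
  def validate(coresPerExecutor: Option[Int], cpusPerTask: Int): Unit = {
    coresPerExecutor.foreach { c =>
      require(c >= cpusPerTask && c % cpusPerTask == 0,
        s"spark.executor.cores ($c) must be no less than spark.task.cpus ($cpusPerTask) " +
          "and divisible by it")
    }
  }

  // Rule 2: a simplified per-worker core assignment. Each step allocates either a whole
  // executor's worth of cores or, in the one-executor-per-worker case, one task's worth,
  // so the assigned total is always usable by whole tasks.
  def assignCores(
      workerFreeCores: Int,
      coresLeftForApp: Int,
      coresPerExecutor: Option[Int],
      cpusPerTask: Int): Int = {
    val step = math.max(coresPerExecutor.getOrElse(cpusPerTask), cpusPerTask)
    var assigned = 0
    while (coresLeftForApp - assigned >= step && workerFreeCores - assigned >= step) {
      assigned += step
    }
    assigned
  }

  def main(args: Array[String]): Unit = {
    validate(coresPerExecutor = Some(4), cpusPerTask = 2) // accepted
    // validate(Some(3), 2) would throw: 3 is not a multiple of 2
    println(assignCores(workerFreeCores = 10, coresLeftForApp = 20,
      coresPerExecutor = None, cpusPerTask = 3)) // prints 9, not 10
  }
}

Assuming 10-core workers, which is what the expected Array(9, 9, 9) in the new "scheduling with cores per task" MasterSuite case implies, the sketch leaves one core idle per worker when spark.task.cpus is 3, matching the behavior the added tests assert for the patched Master.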