core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -19,6 +19,8 @@ package org.apache.spark.deploy

import java.net.URI

import org.apache.spark.SparkException

private[spark] case class ApplicationDescription(
name: String,
maxCores: Option[Int],
@@ -32,7 +34,17 @@ private[spark] case class ApplicationDescription(
// number of executors this application wants to start with,
// only used if dynamic allocation is enabled
initialExecutorLimit: Option[Int] = None,
user: String = System.getProperty("user.name", "<unknown>")) {
user: String = System.getProperty("user.name", "<unknown>"),
val coresPerTask: Int = 1) {

// We only check that coresPerExecutor is no less than coresPerTask when the user has
// explicitly configured coresPerExecutor. The Master has a separate mechanism that prevents
// launching executors which cannot execute any task. (SPARK-5337)
if (coresPerExecutor.getOrElse(Int.MaxValue) < coresPerTask) {
throw new SparkException(s"illegal configuration for application $name, " +
s"coresPerExecutor (configured value: ${coresPerExecutor.getOrElse(1)}) cannot be less " +
s"than coresPerTask (configured value: $coresPerTask)")
}

override def toString: String = "ApplicationDescription(" + name + ")"
}
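For reference, a minimal sketch (not part of the patch) of how the new constructor check fires. The positional arguments mirror the makeAppInfo helper in MasterSuite below, including its null command stub:

import org.apache.spark.SparkException
import org.apache.spark.deploy.ApplicationDescription

// coresPerExecutor (1) is less than coresPerTask (2), so construction throws.
try {
  new ApplicationDescription("demo", None, 256, null, "", None, None,
    Some(1), coresPerTask = 2)
} catch {
  case e: SparkException => println(e.getMessage)
}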
21 changes: 15 additions & 6 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -598,11 +598,13 @@ private[deploy] class Master(
val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
val coresPerTask = app.desc.coresPerTask

/** Return whether the specified worker can launch an executor for this app. */
def canLaunchExecutor(pos: Int): Boolean = {
val keepScheduling = coresToAssign >= minCoresPerExecutor
val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
val keepScheduling = coresToAssign >= math.max(minCoresPerExecutor, coresPerTask)
val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >=
math.max(minCoresPerExecutor, coresPerTask)

// If we allow multiple executors per worker, then we can always launch new executors.
// Otherwise, if there is already an executor on this worker, just give it more cores.
@@ -626,17 +628,21 @@ private[deploy] class Master(
freeWorkers.foreach { pos =>
var keepScheduling = true
while (keepScheduling && canLaunchExecutor(pos)) {
coresToAssign -= minCoresPerExecutor
assignedCores(pos) += minCoresPerExecutor

var coresToAllocateInThisRnd = 0
// If we are launching one executor per worker, then every iteration gives that single
// executor coresPerTask more cores. Otherwise, every iteration assigns
// minCoresPerExecutor cores to a new executor.
if (oneExecutorPerWorker) {
assignedExecutors(pos) = 1
coresToAllocateInThisRnd = coresPerTask
} else {
assignedExecutors(pos) += 1
coresToAllocateInThisRnd = minCoresPerExecutor
}

coresToAssign -= coresToAllocateInThisRnd
assignedCores(pos) += coresToAllocateInThisRnd

// Spreading out an application means spreading out its executors across as
// many workers as possible. If we are not spreading out, then we should keep
// scheduling executors on this worker until we use all of its resources.
@@ -659,10 +665,13 @@ private[deploy] class Master(
// in the queue, then the second app, etc.
for (app <- waitingApps if app.coresLeft > 0) {
val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
val coresPerTask = app.desc.coresPerTask
// Filter out workers that don't have enough resources to launch an executor.
// The user may set the requested number of cores per task via spark.task.cpus; we need
// to respect this configuration when allocating cores to the executor (SPARK-5337).
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
worker.coresFree >= coresPerExecutor.getOrElse(1))
worker.coresFree >= math.max(coresPerExecutor.getOrElse(1), coresPerTask))
.sortBy(_.coresFree).reverse
val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
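To make the round-by-round math concrete, here is a hypothetical helper (not Spark code) capturing the allocation rule above:

// One executor per worker: each round tops up that worker's single executor by
// coresPerTask cores; otherwise each round starts a fresh executor with
// minCoresPerExecutor cores.
def coresPerRound(
    oneExecutorPerWorker: Boolean,
    minCoresPerExecutor: Int,
    coresPerTask: Int): Int = {
  if (oneExecutorPerWorker) coresPerTask else minCoresPerExecutor
}

// Example: a 10-core worker with coresPerTask = 3 and no explicit
// coresPerExecutor accumulates 3, 6, then 9 cores and stops, matching the
// Array(9, 9, 9) expectation in MasterSuite below.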

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster

import java.util.concurrent.Semaphore

import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{AppClient, AppClientListener}
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
@@ -97,8 +97,16 @@ private[spark] class SparkDeploySchedulerBackend(
} else {
None
}
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
if (coresPerExecutor.isDefined && (coresPerExecutor.get < scheduler.CPUS_PER_TASK ||
(coresPerExecutor.get % scheduler.CPUS_PER_TASK) != 0)) {
throw new SparkException(s"invalid configuration of " +
s"spark.executor.cores(${coresPerExecutor.get}) and " +
s"spark.task.cpus(${scheduler.CPUS_PER_TASK}}), spark.executor.cores cannot be less " +
"than spark.task.cpus and has to be divisible by it")
}
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor,
initialExecutorLimit, coresPerTask = scheduler.CPUS_PER_TASK)
client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()
launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
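The constraint being enforced reads as a pure predicate; a sketch (hypothetical helper, not part of the patch):

// Valid iff spark.executor.cores is unset, or is at least spark.task.cpus and
// divisible by it.
def validExecutorCores(coresPerExecutor: Option[Int], cpusPerTask: Int): Boolean = {
  coresPerExecutor.forall(c => c >= cpusPerTask && c % cpusPerTask == 0)
}

assert(validExecutorCores(None, 2))     // unset: nothing to validate
assert(validExecutorCores(Some(4), 2))  // 4 >= 2 and 4 % 2 == 0
assert(!validExecutorCores(Some(3), 2)) // not divisible by 2
assert(!validExecutorCores(Some(1), 2)) // fewer cores than one task needs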
103 changes: 101 additions & 2 deletions core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -175,6 +175,38 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with PrivateMethodTester
schedulingWithCoresPerExecutor(spreadOut = false)
}

test("scheduling with cores per task - spread out") {
schedulingWithCoresPerTask(spreadOut = true)
}

test("scheduling with cores per task - no spread out") {
schedulingWithCoresPerTask(spreadOut = false)
}

test("scheduling with cores per task AND max cores - spread out") {
schedulingWithCoresPerTaskAndMaxCores(spreadOut = true)
}

test("scheduling with cores per task AND max cores - no spread out") {
schedulingWithCoresPerTaskAndMaxCores(spreadOut = false)
}

test("scheduling with cores per task AND cores per executor - spread out") {
schedulingWithCoresPerTaskAndCoresPerExecutor(spreadOut = true)
}

test("scheduling with cores per task AND cores per executor - no spread out") {
schedulingWithCoresPerTaskAndCoresPerExecutor(spreadOut = false)
}

test("scheduling with cores per task AND cores per executor AND max cores - spread out") {
schedulingWithCoresPerTaskAndCoresPerExecutorAndMaxCores(spreadOut = true)
}

test("scheduling with cores per task AND cores per executor AND max cores - no spread out") {
schedulingWithCoresPerTaskAndCoresPerExecutorAndMaxCores(spreadOut = false)
}

test("scheduling with cores per executor AND max cores - spread out") {
schedulingWithCoresPerExecutorAndMaxCores(spreadOut = true)
}
@@ -348,6 +380,71 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with PrivateMethodTester
}
}

private def schedulingWithCoresPerTask(spreadOut: Boolean): Unit = {
val master = makeMaster()
val appInfo1 = makeAppInfo(1024, coresPerTask = Some(1))
val appInfo2 = makeAppInfo(256, coresPerTask = Some(2))
val appInfo3 = makeAppInfo(256, coresPerTask = Some(3))
val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut)
val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut)
val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo3, workerInfos, spreadOut)
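// With the suite's three 10-core workers (see the Array(10, 10, 10) results
// below), coresPerTask = 3 can only use 9 cores per worker (three 3-core
// tasks); the leftover core on each worker stays unassigned.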
assert(scheduledCores1 === Array(10, 10, 10))
assert(scheduledCores2 === Array(10, 10, 10))
assert(scheduledCores3 === Array(9, 9, 9))
}

private def schedulingWithCoresPerTaskAndMaxCores(spreadOut: Boolean): Unit = {
val master = makeMaster()
val appInfo1 = makeAppInfo(256, coresPerTask = Some(2), maxCores = Some(4))
val appInfo2 = makeAppInfo(256, coresPerTask = Some(2), maxCores = Some(20))
val appInfo3 = makeAppInfo(256, coresPerTask = Some(3), maxCores = Some(20))
val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut)
val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut)
val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo3, workerInfos, spreadOut)
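// Worked example, assuming the suite's three 10-core workers: with
// coresPerTask = 2 and maxCores = 20, spread-out scheduling hands out 2 cores
// per round-robin visit: (2,2,2), (4,4,4), (6,6,6), then (8,6,6) at the
// 20-core cap. With coresPerTask = 3 it stops at (6,6,6) = 18 cores, since
// the 2 remaining cores cannot fit another 3-core task.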
if (spreadOut) {
assert(scheduledCores1 === Array(2, 2, 0))
assert(scheduledCores2 === Array(8, 6, 6))
assert(scheduledCores3 === Array(6, 6, 6))
} else {
assert(scheduledCores1 === Array(4, 0, 0))
assert(scheduledCores2 === Array(10, 10, 0))
assert(scheduledCores3 === Array(9, 9, 0))
}
}

private def schedulingWithCoresPerTaskAndCoresPerExecutor(spreadOut: Boolean): Unit = {
val master = makeMaster()
// We don't test the cases where coresPerExecutor is smaller than coresPerTask or is not a
// multiple of coresPerTask, because SparkDeploySchedulerBackend rejects those
// configurations up front.
val appInfo1 = makeAppInfo(256, coresPerTask = Some(2), coresPerExecutor = Some(2))
val appInfo2 = makeAppInfo(256, coresPerTask = Some(2), coresPerExecutor = Some(4))
val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut)
val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut)
assert(scheduledCores1 === Array(10, 10, 10))
assert(scheduledCores2 === Array(8, 8, 8))
}

private def schedulingWithCoresPerTaskAndCoresPerExecutorAndMaxCores(spreadOut: Boolean): Unit = {
val master = makeMaster()
// We don't test the cases where coresPerExecutor is smaller than coresPerTask or is not a
// multiple of coresPerTask, because SparkDeploySchedulerBackend rejects those
// configurations up front.
val appInfo1 = makeAppInfo(256, coresPerTask = Some(2), coresPerExecutor = Some(2),
maxCores = Some(4))
val appInfo2 = makeAppInfo(256, coresPerTask = Some(2), coresPerExecutor = Some(4),
maxCores = Some(18))
val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut)
val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut)
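// Spread out, appInfo2 launches 4-core executors round-robin: (4,4,4) = 12
// cores, then one more executor on the first worker makes 16; the 2 cores
// left under maxCores = 18 cannot fit another 4-core executor, hence (8,4,4).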
if (spreadOut) {
assert(scheduledCores1 === Array(2, 2, 0))
assert(scheduledCores2 === Array(8, 4, 4))
} else {
assert(scheduledCores1 === Array(4, 0, 0))
assert(scheduledCores2 === Array(8, 8, 0))
}
}

// ==========================================
// | Utility methods and fields for testing |
// ==========================================
@@ -366,9 +463,11 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with PrivateMethodTester
private def makeAppInfo(
memoryPerExecutorMb: Int,
coresPerExecutor: Option[Int] = None,
maxCores: Option[Int] = None): ApplicationInfo = {
maxCores: Option[Int] = None,
coresPerTask: Option[Int] = None): ApplicationInfo = {
val desc = new ApplicationDescription(
"test", maxCores, memoryPerExecutorMb, null, "", None, None, coresPerExecutor)
"test", maxCores, memoryPerExecutorMb, null, "", None, None, coresPerExecutor,
coresPerTask = coresPerTask.getOrElse(1))
val appId = System.currentTimeMillis.toString
new ApplicationInfo(0, appId, desc, new Date, null, Int.MaxValue)
}