core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -28,7 +28,8 @@ private[spark] class ApplicationDescription(
     val eventLogDir: Option[URI] = None,
     // short name of compression codec used when writing event logs, if any (e.g. lzf)
     val eventLogCodec: Option[String] = None,
-    val coresPerExecutor: Option[Int] = None)
+    val coresPerExecutor: Option[Int] = None,
+    val coresPerTask: Int = 1)
   extends Serializable {
 
   val user = System.getProperty("user.name", "<unknown>")
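
Because the new `coresPerTask` parameter is last and defaulted, every existing constructor call keeps compiling with the old one-core-per-task behavior. A minimal standalone analogue of that compatibility point (a stand-in case class, not the real constructor):

```scala
// Illustrative stand-in for ApplicationDescription, not the actual class.
case class AppDesc(
    name: String,
    coresPerExecutor: Option[Int] = None,
    coresPerTask: Int = 1)

val legacy = AppDesc("app")            // pre-patch call site still compiles; coresPerTask = 1
val tuned = AppDesc("app", Some(4), 2) // new call site asks for 2 CPUs per task
```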
core/src/main/scala/org/apache/spark/deploy/master/Master.scala (21 additions, 10 deletions)
@@ -540,20 +540,27 @@ private[master] class Master(
     // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
     // in the queue, then the second app, etc.
     if (spreadOutApps) {
-      // Try to spread out each app among all the workers, until it has all its cores
-      for (app <- waitingApps if app.coresLeft > 0) {
+      // Try to spread out each app among all the nodes, until it has all its cores
+      for (app <- waitingApps if app.coresLeft >= app.desc.coresPerTask) {
+        val coreNumPerTask = app.desc.coresPerTask
         val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
           .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
-            worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1))
+            worker.coresFree >= math.max(app.desc.coresPerExecutor.getOrElse(1),
+              app.desc.coresPerTask))
           .sortBy(_.coresFree).reverse
         val numUsable = usableWorkers.length
         val assigned = new Array[Int](numUsable) // Number of cores to give on each node
         var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
         var pos = 0
-        while (toAssign > 0) {
-          if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
-            toAssign -= 1
-            assigned(pos) += 1
+        val availableWorkerSet = new HashSet[Int]
+        // Take into account the number of cores the application uses per task
+        // to avoid starving executors or wasting cluster resources (SPARK-5337)
+        while (toAssign >= coreNumPerTask && availableWorkerSet.size < usableWorkers.length) {
+          if (usableWorkers(pos).coresFree - assigned(pos) >= coreNumPerTask) {
+            toAssign -= coreNumPerTask
+            assigned(pos) += coreNumPerTask
+          } else {
+            availableWorkerSet += pos
           }
           pos = (pos + 1) % numUsable
         }
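
The new loop deals out cores in `coresPerTask`-sized chunks and stops once every worker is too full to take another chunk (the PR's `availableWorkerSet`, despite its name, collects these saturated workers). A self-contained sketch of the same logic with illustrative names and a sample run; this is an illustration, not the PR's code:

```scala
import scala.collection.mutable

// Round-robin assignment of cores in multiples of coresPerTask, mirroring the
// loop above. coresFree(i) is the free core count of usable worker i; returns
// the number of cores assigned to each worker.
def assignCores(coresFree: Array[Int], coresLeft: Int, coresPerTask: Int): Array[Int] = {
  val assigned = new Array[Int](coresFree.length)
  var toAssign = math.min(coresLeft, coresFree.sum)
  val saturated = mutable.HashSet.empty[Int] // workers with no room for another chunk
  var pos = 0
  // Stop when fewer than coresPerTask cores remain to hand out, or when no
  // worker can accept a whole chunk; without the saturation check the loop
  // would spin forever once only fractional chunks remain.
  while (toAssign >= coresPerTask && saturated.size < coresFree.length) {
    if (coresFree(pos) - assigned(pos) >= coresPerTask) {
      toAssign -= coresPerTask
      assigned(pos) += coresPerTask
    } else {
      saturated += pos
    }
    pos = (pos + 1) % coresFree.length
  }
  assigned
}

// assignCores(Array(5, 5, 3), coresLeft = 12, coresPerTask = 2)
//   => Array(4, 4, 2): 10 of the 12 cores are placed; no worker can fit
//      another whole 2-core chunk, so the loop exits instead of spinning.
```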
@@ -565,7 +572,7 @@
     } else {
       // Pack each app into as few workers as possible until we've assigned all its cores
       for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
-        for (app <- waitingApps if app.coresLeft > 0) {
+        for (app <- waitingApps if app.coresLeft >= app.desc.coresPerTask) {
           allocateWorkerResourceToExecutors(app, app.coresLeft, worker)
         }
       }
@@ -582,9 +589,13 @@
       app: ApplicationInfo,
       coresToAllocate: Int,
       worker: WorkerInfo): Unit = {
+    val cpuPerTask = app.desc.coresPerTask
     val memoryPerExecutor = app.desc.memoryPerExecutorMB
-    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate)
-    var coresLeft = coresToAllocate
+    var coresLeft = coresToAllocate - coresToAllocate % cpuPerTask
+    val maxCoresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresLeft)
+    val coresPerExecutor = math.max(
+      maxCoresPerExecutor - maxCoresPerExecutor % cpuPerTask,
+      cpuPerTask)
     while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) {
       val exec = app.addExecutor(worker, coresPerExecutor)
       coresLeft -= coresPerExecutor
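
A worked pass through the arithmetic in `allocateWorkerResourceToExecutors`, assuming `spark.task.cpus = 3`, 10 cores to allocate on a worker, and no explicit `spark.executor.cores` (all values illustrative):

```scala
val cpuPerTask = 3        // spark.task.cpus
val coresToAllocate = 10  // cores the scheduler wants to place on this worker

val coresLeft = coresToAllocate - coresToAllocate % cpuPerTask // 9: drop the unusable remainder
val maxCoresPerExecutor = coresLeft                            // spark.executor.cores unset
val coresPerExecutor = math.max(
  maxCoresPerExecutor - maxCoresPerExecutor % cpuPerTask,      // round down to a task multiple
  cpuPerTask)                                                  // but never below one task
// => coresPerExecutor = 9: one executor with 9 cores running 3 concurrent
//    tasks; the worker's 10th core stays free rather than being wasted inside
//    an executor that could never use it.
```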
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -70,6 +70,18 @@ private[spark] class TaskSchedulerImpl(
   // CPUs to request per task
   val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
 
+  if (CPUS_PER_TASK < 1) {
+    throw new IllegalArgumentException(
+      s"spark.task.cpus must be greater than 0! (was $CPUS_PER_TASK)")
+  }
+
+  val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
+
+  if (maxCores < CPUS_PER_TASK) {
+    throw new IllegalArgumentException(
+      s"spark.task.cpus ($CPUS_PER_TASK) should not be larger than spark.cores.max ($maxCores)")
+  }
+
   // TaskSetManagers are not thread safe, so any access to one should be synchronized
   // on this class.
   val activeTaskSets = new HashMap[String, TaskSetManager]
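
These checks make an impossible configuration fail fast at scheduler construction instead of letting the application hang with tasks that can never be scheduled. A hypothetical configuration the second check rejects (master URL and app name are made up):

```scala
import org.apache.spark.SparkConf

// No 4-CPU task can ever run when the whole application is capped at 2 cores.
val conf = new SparkConf()
  .setMaster("spark://master:7077") // illustrative standalone master URL
  .setAppName("cpus-per-task-demo")
  .set("spark.task.cpus", "4")
  .set("spark.cores.max", "2")
// Building the SparkContext (and with it the TaskSchedulerImpl) now throws
// IllegalArgumentException up front.
```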
core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -42,9 +42,10 @@ private[spark] class SparkDeploySchedulerBackend(

   private val registrationBarrier = new Semaphore(0)
 
-  private val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
-  private val totalExpectedCores = maxCores.getOrElse(0)
-
+  val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
+  val totalExpectedCores = maxCores.getOrElse(0)
+  val coresPerTask = scheduler.CPUS_PER_TASK
+
   override def start() {
     super.start()
 
@@ -83,8 +84,8 @@ private[spark] class SparkDeploySchedulerBackend(
       args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
     val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
     val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
-    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
-      command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
+    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
+      appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, coresPerTask)
     client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
     client.start()
     waitForRegistration()
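
Passing `coresPerTask` through the `ApplicationDescription` is what lets the standalone Master see the driver's `spark.task.cpus`. A sketch of a configuration that exercises the whole path (master URL and app name are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// With these settings the backend builds an ApplicationDescription carrying
// coresPerTask = 2, so the Master assigns cores in pairs and each 4-core
// executor runs exactly two concurrent tasks.
val conf = new SparkConf()
  .setMaster("spark://master:7077")
  .setAppName("two-cpus-per-task")
  .set("spark.cores.max", "8")
  .set("spark.executor.cores", "4")
  .set("spark.task.cpus", "2")
val sc = new SparkContext(conf)
```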
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -62,6 +62,8 @@ private[spark] class CoarseMesosSchedulerBackend(

   // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
   val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
+
+  val coresPerTask = scheduler.CPUS_PER_TASK
 
   // Cores we have acquired with each Mesos task ID
   val coresByTaskId = new HashMap[Int, Int]
@@ -213,13 +215,14 @@ private[spark] class CoarseMesosSchedulerBackend(
       val slaveId = offer.getSlaveId.toString
       val mem = getResource(offer.getResourcesList, "mem")
       val cpus = getResource(offer.getResourcesList, "cpus").toInt
+      val demandCores = math.min(cpus, maxCores - totalCoresAcquired)
+      val cpusToUse = demandCores - demandCores % coresPerTask
       if (totalCoresAcquired < maxCores &&
+          cpusToUse >= coresPerTask &&
           mem >= MemoryUtils.calculateTotalMemory(sc) &&
-          cpus >= 1 &&
           failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
           !slaveIdsWithExecutors.contains(slaveId)) {
         // Launch an executor on the slave
-        val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
         totalCoresAcquired += cpusToUse
         val taskId = newMesosTaskId()
         taskIdToSlaveId(taskId) = slaveId
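
The Mesos path applies the same rounding to each resource offer: cores are accepted only in whole-task multiples, and offers too small for even one task are declined. A worked example with illustrative numbers:

```scala
// Offer of 10 cpus; 6 cores still wanted (maxCores - totalCoresAcquired);
// each task needs 4 CPUs.
val cpus = 10
val remaining = 6                                        // maxCores - totalCoresAcquired
val coresPerTask = 4
val demandCores = math.min(cpus, remaining)              // 6
val cpusToUse = demandCores - demandCores % coresPerTask // 4: whole tasks only
// cpusToUse >= coresPerTask, so the offer is accepted with 4 cores. Under the
// old `cpus >= 1` check, all 6 would have been taken and 2 of them could
// never run a task.
```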