
Commit da8d446

Commit message: address the comments
Parent: 0aec159

File tree

5 files changed: +31 additions, -27 deletions


core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala

Lines changed: 2 additions & 3 deletions
@@ -28,13 +28,12 @@ private[spark] class ApplicationDescription(
     val eventLogDir: Option[URI] = None,
     // short name of compression codec used when writing event logs, if any (e.g. lzf)
     val eventLogCodec: Option[String] = None,
-    val coresPerExecutor: Option[Int] = None)
+    val coresPerExecutor: Option[Int] = None,
+    val coresPerTask: Int = 1)
   extends Serializable {
 
   val user = System.getProperty("user.name", "<unknown>")
 
-  var coreNumPerTask = 1
-
   def copy(
       name: String = name,
       maxCores: Option[Int] = maxCores,
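
The effect of this change is that coresPerTask becomes an immutable constructor parameter with a default of 1, rather than a mutable var assigned after construction. A minimal sketch of the pattern, using a simplified stand-in class (AppDesc is hypothetical, not the real ApplicationDescription):

// Sketch: coresPerTask is fixed at construction and carried through copy(),
// so no caller can observe a half-initialized description.
class AppDesc(
    val name: String,
    val coresPerExecutor: Option[Int] = None,
    val coresPerTask: Int = 1)
  extends Serializable {

  // copy() returns a new instance instead of mutating this one; each
  // default value refers to the corresponding field of this instance.
  def copy(
      name: String = name,
      coresPerExecutor: Option[Int] = coresPerExecutor,
      coresPerTask: Int = coresPerTask): AppDesc =
    new AppDesc(name, coresPerExecutor, coresPerTask)
}

Callers then construct the description fully up front, e.g. new AppDesc("app", Some(4), coresPerTask = 2), instead of assigning a var afterwards.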

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 16 additions & 7 deletions
@@ -541,20 +541,26 @@ private[master] class Master(
     // in the queue, then the second app, etc.
     if (spreadOutApps) {
       // Try to spread out each app among all the nodes, until it has all its cores
-      for (app <- waitingApps if app.coresLeft >= app.desc.coreNumPerTask) {
-        val coreNumPerTask = app.desc.coreNumPerTask
+      for (app <- waitingApps if app.coresLeft >= app.desc.coresPerTask) {
+        val coreNumPerTask = app.desc.coresPerTask
         val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
           .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
-            worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1))
+            worker.coresFree >= math.max(app.desc.coresPerExecutor.getOrElse(1),
+              app.desc.coresPerTask))
           .sortBy(_.coresFree).reverse
         val numUsable = usableWorkers.length
         val assigned = new Array[Int](numUsable) // Number of cores to give on each node
         var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
         var pos = 0
-        while (toAssign >= coreNumPerTask) {
+        val availableWorkerSet = new HashSet[Int]
+        // Take into account the number of cores the application uses per task
+        // to avoid starving executors or wasting cluster resources (SPARK-5337)
+        while (toAssign >= coreNumPerTask && availableWorkerSet.size < usableWorkers.length) {
           if (usableWorkers(pos).coresFree - assigned(pos) >= coreNumPerTask) {
             toAssign -= coreNumPerTask
             assigned(pos) += coreNumPerTask
+          } else {
+            availableWorkerSet += pos
           }
           pos = (pos + 1) % numUsable
         }
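
The new loop condition is the heart of the fix: cores are handed out round-robin in chunks of coreNumPerTask, and a worker that can no longer fit a whole chunk is recorded in availableWorkerSet (despite the name, it collects the workers that are full) so the loop terminates once every worker is exhausted. A self-contained sketch of the same logic, with hypothetical names:

import scala.collection.mutable.HashSet

object SpreadOutSketch {
  // Distribute up to coresWanted cores across workers, round-robin, in
  // whole chunks of coresPerTask; returns the cores assigned per worker.
  def assign(coresFree: Array[Int], coresWanted: Int, coresPerTask: Int): Array[Int] = {
    val numUsable = coresFree.length
    val assigned = new Array[Int](numUsable)
    var toAssign = math.min(coresWanted, coresFree.sum)
    var pos = 0
    val exhausted = new HashSet[Int]  // workers that cannot take another chunk
    while (toAssign >= coresPerTask && exhausted.size < numUsable) {
      if (coresFree(pos) - assigned(pos) >= coresPerTask) {
        toAssign -= coresPerTask
        assigned(pos) += coresPerTask
      } else {
        exhausted += pos  // full; without this the old loop could spin forever
      }
      pos = (pos + 1) % numUsable
    }
    assigned
  }
}

For example, with workers offering 5, 3 and 2 free cores, 10 cores wanted and 2 cores per task, assign(Array(5, 3, 2), 10, 2) yields Array(4, 2, 2): 8 cores are placed and the remaining 2 stay unassigned because no worker can host another whole task.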
@@ -566,7 +572,7 @@
     } else {
       // Pack each app into as few workers as possible until we've assigned all its cores
       for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
-        for (app <- waitingApps if app.coresLeft > 0) {
+        for (app <- waitingApps if app.coresLeft > app.desc.coresPerTask) {
           allocateWorkerResourceToExecutors(app, app.coresLeft, worker)
         }
       }
@@ -583,9 +589,12 @@
       app: ApplicationInfo,
       coresToAllocate: Int,
       worker: WorkerInfo): Unit = {
+    val cpuPerTask = app.desc.coresPerTask
     val memoryPerExecutor = app.desc.memoryPerExecutorMB
-    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate)
-    var coresLeft = coresToAllocate
+    var coresLeft = coresToAllocate - coresToAllocate % cpuPerTask
+    val coresPerExecutor = math.max(
+      app.desc.coresPerExecutor.getOrElse(coresLeft),
+      app.desc.coresPerTask)
     while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) {
       val exec = app.addExecutor(worker, coresPerExecutor)
       coresLeft -= coresPerExecutor
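
Rounding coresToAllocate down to a multiple of cpuPerTask guarantees that no executor is granted cores that can never host a whole task. A worked check of the arithmetic (a sketch, not the real method):

object RoundDownSketch {
  // Trim a core grant down to a whole number of tasks.
  def usableCores(coresToAllocate: Int, cpuPerTask: Int): Int =
    coresToAllocate - coresToAllocate % cpuPerTask

  assert(usableCores(10, 3) == 9)  // three 3-core tasks fit; 1 core is withheld
  assert(usableCores(8, 4) == 8)   // already a multiple, nothing trimmed
}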

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 5 additions & 0 deletions
@@ -70,6 +70,11 @@ private[spark] class TaskSchedulerImpl(
   // CPUs to request per task
   val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
 
+  if (CPUS_PER_TASK < 1) {
+    throw new IllegalArgumentException(
+      s"\"spark.task.cpus\" must be greater than 0! (was $CPUS_PER_TASK)")
+  }
+
   // TaskSetManagers are not thread safe, so any access to one should be synchronized
   // on this class.
   val activeTaskSets = new HashMap[String, TaskSetManager]
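
Validating spark.task.cpus once here, where CPUS_PER_TASK is defined, lets every backend read an already-checked value; that is why the per-backend checks are deleted in the files below. A minimal sketch of the fail-fast pattern, with a plain Map standing in for SparkConf:

class SchedulerSketch(conf: Map[String, String]) {
  val CPUS_PER_TASK: Int = conf.getOrElse("spark.task.cpus", "1").toInt
  // Fail at construction time rather than letting an invalid value
  // propagate into scheduling decisions.
  if (CPUS_PER_TASK < 1) {
    throw new IllegalArgumentException(
      s""""spark.task.cpus" must be greater than 0! (was $CPUS_PER_TASK)""")
  }
}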

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala

Lines changed: 2 additions & 7 deletions
@@ -45,11 +45,6 @@ private[spark] class SparkDeploySchedulerBackend(
   val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
   val coreNumPerTask = scheduler.CPUS_PER_TASK
 
-  if (coreNumPerTask < 1) {
-    throw new IllegalArgumentException(
-      s"spark.task.cpus is set to an invalid value $coreNumPerTask")
-  }
-
   if (maxCores.isDefined && maxCores.get < coreNumPerTask) {
     throw new IllegalArgumentException(
       s"spark.task.cpus ($coreNumPerTask) should not be larger than spark.cores.max ($maxCores)")
@@ -94,9 +89,9 @@
     val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
       args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
     val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
+    val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
     val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
-      appUIAddress, sc.eventLogDir, sc.eventLogCodec)
-    appDesc.coreNumPerTask = coreNumPerTask
+      appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, coreNumPerTask)
     client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
     client.start()
     waitForRegistration()
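
Both settings now travel through the ApplicationDescription constructor instead of being patched onto the object afterwards. A sketch of the two reads, again with a plain Map standing in for SparkConf:

object BackendConfigSketch {
  val conf = Map("spark.executor.cores" -> "4", "spark.task.cpus" -> "2")

  // None means no explicit executor size was requested; the master then
  // sizes executors from whatever cores it grants.
  val coresPerExecutor: Option[Int] = conf.get("spark.executor.cores").map(_.toInt)
  val coresPerTask: Int = conf.getOrElse("spark.task.cpus", "1").toInt
}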

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala

Lines changed: 6 additions & 10 deletions
@@ -63,16 +63,11 @@ private[spark] class CoarseMesosSchedulerBackend(
   // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
   val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
 
-  val coreNumPerTask = conf.getInt("spark.task.cpus", 1)
-
-  if (coreNumPerTask < 1) {
-    throw new IllegalArgumentException(
-      s"spark.task.cpus is set to an invalid value $coreNumPerTask")
-  }
+  val coresPerTask = scheduler.CPUS_PER_TASK
 
-  if (maxCores < coreNumPerTask) {
+  if (maxCores < coresPerTask) {
     throw new IllegalArgumentException(
-      s"spark.task.cpus ($coreNumPerTask) should not be larger than spark.cores.max ($maxCores)")
+      s"spark.task.cpus ($coresPerTask) should not be larger than spark.cores.max ($maxCores)")
   }
 
   // Cores we have acquired with each Mesos task ID
@@ -225,9 +220,10 @@
       val slaveId = offer.getSlaveId.toString
       val mem = getResource(offer.getResourcesList, "mem")
       val cpus = getResource(offer.getResourcesList, "cpus").toInt
-      val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
+      val demandCores = math.min(cpus, maxCores - totalCoresAcquired)
+      val cpusToUse = demandCores - demandCores % coresPerTask
      if (totalCoresAcquired < maxCores &&
-        cpusToUse >= coreNumPerTask &&
+        cpusToUse >= coresPerTask &&
         mem >= MemoryUtils.calculateTotalMemory(sc) &&
         failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
         !slaveIdsWithExecutors.contains(slaveId)) {
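
The offer handling now trims the accepted CPU count down to a multiple of coresPerTask before the comparison, so a slave is only taken when it can run at least one whole task. A worked sketch of the rounding:

object OfferRoundingSketch {
  def cpusToUse(offeredCpus: Int, coresPerTask: Int,
      maxCores: Int, totalCoresAcquired: Int): Int = {
    val demandCores = math.min(offeredCpus, maxCores - totalCoresAcquired)
    demandCores - demandCores % coresPerTask  // never accept a partial task's worth
  }

  assert(cpusToUse(7, 2, Int.MaxValue, 0) == 6)  // 7 CPUs offered, 6 accepted
  assert(cpusToUse(1, 2, Int.MaxValue, 0) == 0)  // too small: offer is declined
}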
