Commit c10f980

fix several issues on allocation
1 parent: 55d9143

File tree

4 files changed: +14 -17 lines changed


core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 4 additions & 3 deletions
@@ -572,7 +572,7 @@ private[master] class Master(
     } else {
       // Pack each app into as few workers as possible until we've assigned all its cores
       for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
-        for (app <- waitingApps if app.coresLeft > app.desc.coresPerTask) {
+        for (app <- waitingApps if app.coresLeft >= app.desc.coresPerTask) {
           allocateWorkerResourceToExecutors(app, app.coresLeft, worker)
         }
       }
@@ -592,9 +592,10 @@
     val cpuPerTask = app.desc.coresPerTask
     val memoryPerExecutor = app.desc.memoryPerExecutorMB
     var coresLeft = coresToAllocate - coresToAllocate % cpuPerTask
+    val maxCoresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresLeft)
     val coresPerExecutor = math.max(
-      app.desc.coresPerExecutor.getOrElse(coresLeft),
-      app.desc.coresPerTask)
+      maxCoresPerExecutor - maxCoresPerExecutor % cpuPerTask,
+      cpuPerTask)
     while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) {
       val exec = app.addExecutor(worker, coresPerExecutor)
       coresLeft -= coresPerExecutor
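
This hunk pair fixes two allocation bugs. The first repairs an
off-by-one in the scheduling filter: an app whose remaining cores
exactly equal spark.task.cpus could never be scheduled, because the
comparison used a strict > rather than >=. The second rounds each
executor's core count down to a multiple of spark.task.cpus, so an
executor is never granted cores that cannot host a whole task. A
minimal sketch of the new sizing arithmetic (standalone Scala; the
input values are illustrative assumptions, not taken from the commit):

    // Executor sizing as performed by allocateWorkerResourceToExecutors.
    val cpuPerTask = 2                    // spark.task.cpus
    val coresToAllocate = 11              // cores offered to this app on the worker
    val requested: Option[Int] = Some(5)  // spark.executor.cores, if set

    val coresLeft = coresToAllocate - coresToAllocate % cpuPerTask  // 10
    val maxCoresPerExecutor = requested.getOrElse(coresLeft)        // 5
    val coresPerExecutor = math.max(
      maxCoresPerExecutor - maxCoresPerExecutor % cpuPerTask,       // 5 rounds down to 4
      cpuPerTask)
    println(coresPerExecutor)  // 4; before this commit it was 5, stranding
                               // one unusable core per executor (5 % 2 == 1)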

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 7 additions & 0 deletions
@@ -75,6 +75,13 @@ private[spark] class TaskSchedulerImpl(
       s"spark.task.cpus must be greater than 0! (was $CPUS_PER_TASK)")
   }
 
+  val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
+
+  if (maxCores < CPUS_PER_TASK) {
+    throw new IllegalArgumentException(
+      s"spark.task.cpus ($CPUS_PER_TASK) should not be larger than spark.cores.max ($maxCores)")
+  }
+
   // TaskSetManagers are not thread safe, so any access to one should be synchronized
   // on this class.
   val activeTaskSets = new HashMap[String, TaskSetManager]
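
With the guard hoisted into TaskSchedulerImpl it runs once for every
cluster manager, instead of being duplicated in each backend (the
copies are deleted in the two files below). A rough standalone sketch
of the check, using a plain Map in place of SparkConf so the snippet
runs outside Spark (an assumption for illustration):

    def validateTaskCpus(conf: Map[String, String]): Unit = {
      val cpusPerTask = conf.getOrElse("spark.task.cpus", "1").toInt
      // Int.MaxValue as the default means "no spark.cores.max configured".
      val maxCores = conf.getOrElse("spark.cores.max", Int.MaxValue.toString).toInt
      if (maxCores < cpusPerTask) {
        throw new IllegalArgumentException(
          s"spark.task.cpus ($cpusPerTask) should not be larger than spark.cores.max ($maxCores)")
      }
    }

    validateTaskCpus(Map("spark.task.cpus" -> "4", "spark.cores.max" -> "2"))
    // throws IllegalArgumentException:
    //   spark.task.cpus (4) should not be larger than spark.cores.max (2)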

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala

Lines changed: 3 additions & 9 deletions
@@ -43,15 +43,9 @@ private[spark] class SparkDeploySchedulerBackend(
   private val registrationBarrier = new Semaphore(0)
 
   val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
-  val coreNumPerTask = scheduler.CPUS_PER_TASK
-
-  if (maxCores.isDefined && maxCores.get < coreNumPerTask) {
-    throw new IllegalArgumentException(
-      s"spark.task.cpus ($coreNumPerTask) should not be larger than spark.cores.max ($maxCores)")
-  }
-
   val totalExpectedCores = maxCores.getOrElse(0)
-
+  val coresPerTask = scheduler.CPUS_PER_TASK
+
   override def start() {
     super.start()
 
@@ -91,7 +85,7 @@
     val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
     val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
     val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
-      appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, coreNumPerTask)
+      appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, coresPerTask)
     client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
     client.start()
     waitForRegistration()
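
Besides dropping the now-redundant guard, this renames coreNumPerTask
to coresPerTask to match the other backends. The deleted check also
carried a cosmetic bug: maxCores here is an Option[Int], so
interpolating it directly would have printed the wrapper instead of the
number, as this one-liner shows (values are illustrative):

    val maxCores: Option[Int] = Some(2)
    println(s"spark.cores.max ($maxCores)")        // spark.cores.max (Some(2))
    println(s"spark.cores.max (${maxCores.get})")  // spark.cores.max (2)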

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala

Lines changed: 0 additions & 5 deletions
@@ -65,11 +65,6 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   val coresPerTask = scheduler.CPUS_PER_TASK
 
-  if (maxCores < coresPerTask) {
-    throw new IllegalArgumentException(
-      s"spark.task.cpus ($coresPerTask) should not be larger than spark.cores.max ($maxCores)")
-  }
-
   // Cores we have acquired with each Mesos task ID
   val coresByTaskId = new HashMap[Int, Int]
   var totalCoresAcquired = 0
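
Same cleanup as in SparkDeploySchedulerBackend: the coarse-grained
Mesos backend drops its local copy of the check now that
TaskSchedulerImpl enforces it for every deployment mode, so valid
configurations behave exactly as before.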
