diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 61556ea642614..3b2ff7072a6d1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -351,6 +351,11 @@ private[spark] class TaskSchedulerImpl( * that tasks are balanced across the cluster. */ def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized { + + def availableSlots(availableCpus: Array[Int]): Int = { + availableCpus.map( _ / CPUS_PER_TASK).sum + } + // Mark each slave as alive and remember its hostname // Also track if new executor is added var newExecAvail = false @@ -386,7 +391,6 @@ private[spark] class TaskSchedulerImpl( // Build a list of tasks to assign to each worker. val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK)) val availableCpus = shuffledOffers.map(o => o.cores).toArray - val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum val sortedTaskSets = rootPool.getSortedTaskSetQueue for (taskSet <- sortedTaskSets) { logDebug("parentName: %s, name: %s, runningTasks: %s".format( @@ -401,13 +405,13 @@ private[spark] class TaskSchedulerImpl( // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY for (taskSet <- sortedTaskSets) { // Skip the barrier taskSet if the available slots are less than the number of pending tasks. - if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { + if (taskSet.isBarrier && availableSlots(availableCpus) < taskSet.numTasks) { // Skip the launch process. // TODO SPARK-24819 If the job requires more slots than available (both busy and free // slots), fail the job on submit. logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " + - s"number of available slots is $availableSlots.") + s"number of available slots is ${availableSlots(availableCpus)}.") } else { var launchedAnyTask = false // Record all the executor IDs assigned barrier tasks on.