Skip to content

Commit 0bb54f6

Browse files
committed
recover apache#2436
1 parent 3e1882d commit 0bb54f6

File tree

1 file changed

+6
-7
lines changed
  • core/src/main/scala/org/apache/spark/deploy/master

1 file changed

+6
-7
lines changed

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -732,24 +732,23 @@ private[deploy] class Master(
732732
}
733733
// Drivers take strict precedence over executors
734734
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
735-
val aliveWorkerNum = shuffledAliveWorkers.size
735+
val numWorkersAlive = shuffledAliveWorkers.size
736736
var curPos = 0
737-
for (driver <- waitingDrivers.toList) {
738-
// iterate over a copy of waitingDrivers
737+
for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
739738
// We assign workers to each waiting driver in a round-robin fashion. For each driver, we
740739
// start from the last worker that was assigned a driver, and continue onwards until we have
741740
// explored all alive workers.
742-
curPos = (curPos + 1) % aliveWorkerNum
743-
val startPos = curPos
744741
var launched = false
745-
while (curPos != startPos && !launched) {
742+
var numWorkersVisited = 0
743+
while (numWorkersVisited < numWorkersAlive && !launched) {
746744
val worker = shuffledAliveWorkers(curPos)
745+
numWorkersVisited += 1
747746
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
748747
launchDriver(worker, driver)
749748
waitingDrivers -= driver
750749
launched = true
751750
}
752-
curPos = (curPos + 1) % aliveWorkerNum
751+
curPos = (curPos + 1) % numWorkersAlive
753752
}
754753
}
755754
startExecutorsOnWorkers()

0 commit comments

Comments
 (0)