
Commit 3e1882d

restore the changes in SPARK-3411
1 parent 69ad2f7 commit 3e1882d

File tree

2 files changed: 19 additions & 5 deletions


core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 18 additions & 4 deletions
@@ -727,15 +727,29 @@ private[deploy] class Master(
    * every time a new app joins or resource availability changes.
    */
   private def schedule(): Unit = {
-    if (state != RecoveryState.ALIVE) { return }
+    if (state != RecoveryState.ALIVE) {
+      return
+    }
     // Drivers take strict precedence over executors
-    val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
-    for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
-      for (driver <- waitingDrivers) {
+    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
+    val aliveWorkerNum = shuffledAliveWorkers.size
+    var curPos = 0
+    for (driver <- waitingDrivers.toList) {
+      // iterate over a copy of waitingDrivers
+      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
+      // start from the last worker that was assigned a driver, and continue onwards until we have
+      // explored all alive workers.
+      curPos = (curPos + 1) % aliveWorkerNum
+      val startPos = curPos
+      var launched = false
+      while (curPos != startPos && !launched) {
+        val worker = shuffledAliveWorkers(curPos)
         if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
           launchDriver(worker, driver)
           waitingDrivers -= driver
+          launched = true
         }
+        curPos = (curPos + 1) % aliveWorkerNum
       }
     }
     startExecutorsOnWorkers()
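
For readers skimming the hunk above, the sketch below restates the round-robin driver placement idea in a self-contained form. It is an illustration, not the committed Master.scala code: WorkerStub and DriverStub are hypothetical stand-ins for Spark's WorkerInfo and DriverInfo, and the inner loop here is bounded by a count of visited workers rather than the startPos comparison used in the hunk.

  // Illustrative sketch only -- not the committed Master.scala code.
  import scala.util.Random

  case class DriverStub(id: String, mem: Int, cores: Int)
  class WorkerStub(val id: String, var memoryFree: Int, var coresFree: Int)

  /** Assign each waiting driver to an alive worker in round-robin order. */
  def assignDrivers(aliveWorkers: Seq[WorkerStub], waiting: Seq[DriverStub]): Map[String, String] = {
    val shuffled = Random.shuffle(aliveWorkers)   // randomization spreads the starting point
    val n = shuffled.size
    var curPos = 0
    val placements = scala.collection.mutable.Map.empty[String, String]
    for (driver <- waiting if n > 0) {
      var launched = false
      var visited = 0
      // Continue from where the previous driver's search stopped and visit
      // each alive worker at most once.
      while (visited < n && !launched) {
        val worker = shuffled(curPos)
        visited += 1
        if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
          worker.memoryFree -= driver.mem
          worker.coresFree -= driver.cores
          placements(driver.id) = worker.id
          launched = true
        }
        curPos = (curPos + 1) % n
      }
    }
    placements.toMap
  }

Because curPos carries over from one driver to the next, consecutively submitted drivers land on different workers even when the first worker alone has room for all of them.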

docs/configuration.md

Lines changed: 1 addition & 1 deletion
@@ -769,7 +769,7 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.memory.offHeap.size</code></td>
   <td>0</td>
   <td>
-    The absolute amount of memory (in terms by bytes) which can be used for off-heap allocation.
+    The absolute amount of memory in bytes which can be used for off-heap allocation.
     This setting has no impact on heap memory usage, so if your executors' total memory consumption must fit within some hard limit then be sure to shrink your JVM heap size accordingly.
     This must be set to a positive value when <code>spark.memory.offHeap.enabled=true</code>.
   </td>
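
For illustration only (not part of the commit), the two off-heap settings referenced above are typically set together; the application name below is a placeholder:

  import org.apache.spark.SparkConf

  // Enable off-heap allocation and cap it at 2 GiB, given as an absolute byte count.
  val conf = new SparkConf()
    .setAppName("offheap-demo")                      // placeholder name
    .set("spark.memory.offHeap.enabled", "true")
    .set("spark.memory.offHeap.size", "2147483648")  // 2 GiB in bytes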
