From 71e84b62bb6dcf57669dfccffb1de45123029f0d Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Thu, 18 Sep 2014 05:06:18 +0900 Subject: [PATCH 1/4] Modified Master to enable schedule normally --- .../main/scala/org/apache/spark/deploy/master/Master.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 2a3bd6ba0b9dc..0d26641289566 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -491,14 +491,15 @@ private[spark] class Master( val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE)) val aliveWorkerNum = shuffledAliveWorkers.size var curPos = 0 + var stopPos = -1 for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers // We assign workers to each waiting driver in a round-robin fashion. For each driver, we // start from the last worker that was assigned a driver, and continue onwards until we have // explored all alive workers. - curPos = (curPos + 1) % aliveWorkerNum - val startPos = curPos + curPos = (stopPos + 1) % aliveWorkerNum + stopPos = curPos + aliveWorkerNum var launched = false - while (curPos != startPos && !launched) { + while (curPos != stopPos && !launched) { val worker = shuffledAliveWorkers(curPos) if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { launchDriver(worker, driver) From 4817ecd958f3a7eff1674358bd3fe08be6d02f6a Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Thu, 18 Sep 2014 05:13:30 +0900 Subject: [PATCH 2/4] Brushed up previous change --- .../main/scala/org/apache/spark/deploy/master/Master.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 0d26641289566..1fccf0f7663b5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -491,13 +491,11 @@ private[spark] class Master( val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE)) val aliveWorkerNum = shuffledAliveWorkers.size var curPos = 0 - var stopPos = -1 + var stopPos = aliveWorkerNum for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers // We assign workers to each waiting driver in a round-robin fashion. For each driver, we // start from the last worker that was assigned a driver, and continue onwards until we have // explored all alive workers. - curPos = (stopPos + 1) % aliveWorkerNum - stopPos = curPos + aliveWorkerNum var launched = false while (curPos != stopPos && !launched) { val worker = shuffledAliveWorkers(curPos) @@ -508,6 +506,8 @@ private[spark] class Master( } curPos = (curPos + 1) % aliveWorkerNum } + curPos = (stopPos + 1) % aliveWorkerNum + stopPos = curPos + aliveWorkerNum } // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app From 4e51e3578fa7b351ea652c2371eecc27ff92aa01 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Thu, 18 Sep 2014 05:46:53 +0900 Subject: [PATCH 3/4] Modified Master to prevent from 0 divide --- .../apache/spark/deploy/master/Master.scala | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 1fccf0f7663b5..d52d720c00780 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -490,24 +490,28 @@ private[spark] class Master( // Randomization helps balance drivers val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE)) val aliveWorkerNum = shuffledAliveWorkers.size - var curPos = 0 - var stopPos = aliveWorkerNum - for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers - // We assign workers to each waiting driver in a round-robin fashion. For each driver, we - // start from the last worker that was assigned a driver, and continue onwards until we have - // explored all alive workers. - var launched = false - while (curPos != stopPos && !launched) { - val worker = shuffledAliveWorkers(curPos) - if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { - launchDriver(worker, driver) - waitingDrivers -= driver - launched = true + + if (aliveWorkerNum > 0) { + var curPos = 0 + var stopPos = aliveWorkerNum + for (driver <- waitingDrivers.toList) { + // iterate over a copy of waitingDrivers + // We assign workers to each waiting driver in a round-robin fashion. For each driver, we + // start from the last worker that was assigned a driver, and continue onwards until we have + // explored all alive workers. + var launched = false + while (curPos != stopPos && !launched) { + val worker = shuffledAliveWorkers(curPos) + if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { + launchDriver(worker, driver) + waitingDrivers -= driver + launched = true + } + curPos = (curPos + 1) % aliveWorkerNum } - curPos = (curPos + 1) % aliveWorkerNum + curPos = (stopPos + 1) % aliveWorkerNum + stopPos = curPos + aliveWorkerNum } - curPos = (stopPos + 1) % aliveWorkerNum - stopPos = curPos + aliveWorkerNum } // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app From 7a4deea8552664cc53db1e20876b42631d36fae3 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Thu, 18 Sep 2014 06:54:11 +0900 Subject: [PATCH 4/4] Modified Master.scala to use numWorkersVisited and numWorkersAlive instead of stopPos --- .../apache/spark/deploy/master/Master.scala | 38 +++++++++---------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index d52d720c00780..432b552c58cd8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -489,28 +489,24 @@ private[spark] class Master( // First schedule drivers, they take strict precedence over applications // Randomization helps balance drivers val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE)) - val aliveWorkerNum = shuffledAliveWorkers.size - - if (aliveWorkerNum > 0) { - var curPos = 0 - var stopPos = aliveWorkerNum - for (driver <- waitingDrivers.toList) { - // iterate over a copy of waitingDrivers - // We assign workers to each waiting driver in a round-robin fashion. For each driver, we - // start from the last worker that was assigned a driver, and continue onwards until we have - // explored all alive workers. - var launched = false - while (curPos != stopPos && !launched) { - val worker = shuffledAliveWorkers(curPos) - if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { - launchDriver(worker, driver) - waitingDrivers -= driver - launched = true - } - curPos = (curPos + 1) % aliveWorkerNum + val numWorkersAlive = shuffledAliveWorkers.size + var curPos = 0 + + for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers + // We assign workers to each waiting driver in a round-robin fashion. For each driver, we + // start from the last worker that was assigned a driver, and continue onwards until we have + // explored all alive workers. + var launched = false + var numWorkersVisited = 0 + while (numWorkersVisited < numWorkersAlive && !launched) { + val worker = shuffledAliveWorkers(curPos) + numWorkersVisited += 1 + if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { + launchDriver(worker, driver) + waitingDrivers -= driver + launched = true } - curPos = (stopPos + 1) % aliveWorkerNum - stopPos = curPos + aliveWorkerNum + curPos = (curPos + 1) % numWorkersAlive } }