From 69ad2f78556acd4d0f854b68e27697d36381a6dc Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 7 Mar 2016 09:37:37 -0500 Subject: [PATCH 1/3] improve the doc for "spark.memory.offHeap.size" --- docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index 937852ffdecd..864fc4774b33 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -769,7 +769,7 @@ Apart from these, the following properties are also available, and may be useful spark.memory.offHeap.size 0 - The absolute amount of memory in bytes which can be used for off-heap allocation. + The absolute amount of memory (in terms by bytes) which can be used for off-heap allocation. This setting has no impact on heap memory usage, so if your executors' total memory consumption must fit within some hard limit then be sure to shrink your JVM heap size accordingly. This must be set to a positive value when spark.memory.offHeap.enabled=true. From 3e1882d85bbbd41a0a1143a6ea8bc156cd321045 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 7 Mar 2016 09:37:37 -0500 Subject: [PATCH 2/3] restore the changes in SPARK-3411 --- .../apache/spark/deploy/master/Master.scala | 22 +++++++++++++++---- docs/configuration.md | 2 +- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 6b9b1408ee44..94f38e2ecefa 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -727,15 +727,29 @@ private[deploy] class Master( * every time a new app joins or resource availability changes. */ private def schedule(): Unit = { - if (state != RecoveryState.ALIVE) { return } + if (state != RecoveryState.ALIVE) { + return + } // Drivers take strict precedence over executors - val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers - for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { - for (driver <- waitingDrivers) { + val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE)) + val aliveWorkerNum = shuffledAliveWorkers.size + var curPos = 0 + for (driver <- waitingDrivers.toList) { + // iterate over a copy of waitingDrivers + // We assign workers to each waiting driver in a round-robin fashion. For each driver, we + // start from the last worker that was assigned a driver, and continue onwards until we have + // explored all alive workers. + curPos = (curPos + 1) % aliveWorkerNum + val startPos = curPos + var launched = false + while (curPos != startPos && !launched) { + val worker = shuffledAliveWorkers(curPos) if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { launchDriver(worker, driver) waitingDrivers -= driver + launched = true } + curPos = (curPos + 1) % aliveWorkerNum } } startExecutorsOnWorkers() diff --git a/docs/configuration.md b/docs/configuration.md index 864fc4774b33..937852ffdecd 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -769,7 +769,7 @@ Apart from these, the following properties are also available, and may be useful spark.memory.offHeap.size 0 - The absolute amount of memory (in terms by bytes) which can be used for off-heap allocation. + The absolute amount of memory in bytes which can be used for off-heap allocation. This setting has no impact on heap memory usage, so if your executors' total memory consumption must fit within some hard limit then be sure to shrink your JVM heap size accordingly. This must be set to a positive value when spark.memory.offHeap.enabled=true. From 0bb54f615105d03a63517f360b88a0b3422c22f2 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 14 Mar 2016 18:23:13 -0400 Subject: [PATCH 3/3] recover #2436 --- .../org/apache/spark/deploy/master/Master.scala | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 94f38e2ecefa..c97ad4d72350 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -732,24 +732,23 @@ private[deploy] class Master( } // Drivers take strict precedence over executors val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE)) - val aliveWorkerNum = shuffledAliveWorkers.size + val numWorkersAlive = shuffledAliveWorkers.size var curPos = 0 - for (driver <- waitingDrivers.toList) { - // iterate over a copy of waitingDrivers + for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers // We assign workers to each waiting driver in a round-robin fashion. For each driver, we // start from the last worker that was assigned a driver, and continue onwards until we have // explored all alive workers. - curPos = (curPos + 1) % aliveWorkerNum - val startPos = curPos var launched = false - while (curPos != startPos && !launched) { + var numWorkersVisited = 0 + while (numWorkersVisited < numWorkersAlive && !launched) { val worker = shuffledAliveWorkers(curPos) + numWorkersVisited += 1 if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { launchDriver(worker, driver) waitingDrivers -= driver launched = true } - curPos = (curPos + 1) % aliveWorkerNum + curPos = (curPos + 1) % numWorkersAlive } } startExecutorsOnWorkers()