@@ -90,7 +90,8 @@ private[spark] class ExecutorAllocationManager(

// Lower and upper bounds on the number of executors.
private val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
private val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
// Upper bound is dynamic for Spark on YARN
private var maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
private val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)

// How long there must be backlogged tasks for before an addition is triggered (seconds)
@@ -348,6 +349,15 @@ private[spark] class ExecutorAllocationManager(
* @return the number of additional executors actually requested.
*/
private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
// Update the EnvironmentPage if spark.dynamicAllocation.maxExecutors has changed.
if (maxNumExecutors != conf.get(DYN_ALLOCATION_MAX_EXECUTORS)) {
maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
val sc = listenerBus.sparkContext
val envDetails = SparkEnv.environmentDetails(conf, sc.getSchedulingMode.toString,
sc.addedJars.keys.toSeq, sc.addedFiles.keys.toSeq)
val event = SparkListenerEnvironmentUpdate(envDetails)
listenerBus.postToAll(event)
}
// Do not request more executors if it would put our target over the upper bound
if (numExecutorsTarget >= maxNumExecutors) {
logDebug(s"Not adding executors because our current target total " +
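The added block above re-posts the driver's environment whenever the effective spark.dynamicAllocation.maxExecutors changes, so the EnvironmentPage stays current. As a minimal sketch (not part of this diff; the listener name and logging are illustrative only), a SparkListener could observe those re-published values like this:

import org.apache.spark.scheduler.{SparkListener, SparkListenerEnvironmentUpdate}

// Illustrative listener: reacts to the SparkListenerEnvironmentUpdate events posted
// by addExecutors() above and prints the refreshed maxExecutors value.
class MaxExecutorsLogger extends SparkListener {
  override def onEnvironmentUpdate(update: SparkListenerEnvironmentUpdate): Unit = {
    // environmentDetails groups properties; "Spark Properties" holds Seq[(String, String)]
    val sparkProps = update.environmentDetails.getOrElse("Spark Properties", Seq.empty)
    sparkProps.find(_._1 == "spark.dynamicAllocation.maxExecutors").foreach {
      case (_, value) => println(s"spark.dynamicAllocation.maxExecutors is now $value")
    }
  }
}

Registered with sc.addSparkListener(new MaxExecutorsLogger), this would log each time the bound is recomputed.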
docs/configuration.md: 3 changes (2 additions, 1 deletion)
@@ -1541,7 +1541,8 @@ Apart from these, the following properties are also available, and may be useful
</tr>
<tr>
<td><code>spark.dynamicAllocation.maxExecutors</code></td>
<td>infinity</td>
<td>Depends on the queue's maximum resources for YARN;
infinity for standalone and Mesos modes</td>
<td>
Upper bound for the number of executors if dynamic allocation is enabled.
</td>
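In other words, with this change the YARN default is computed at executor-request time as (total vcores of RUNNING NodeManagers * the queue's absolute maximum capacity) / spark.executor.cores, truncated to an integer; the absolute maximum capacity is the product of the maximum-capacity fractions along the queue's path from the root (see getAbsMaxCapacity below). Worked example using the numbers from the new test suite: 10 NodeManagers * 8 vcores = 80 cores, queue a1's absolute max capacity is 0.6 * 0.7 = 0.42, and with 1 core per executor the default becomes (80 * 0.42).toInt = 33.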
@@ -47,7 +47,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
import org.apache.hadoop.yarn.util.Records

import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException}
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager
@@ -17,19 +17,25 @@

package org.apache.spark.scheduler.cluster

import scala.concurrent.{ExecutionContext, Future}
import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.util.{Failure, Success}
import scala.util.control.NonFatal

import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId, NodeState, QueueInfo}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

import org.apache.spark.SparkContext
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.rpc._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.ui.JettyUtils
import org.apache.spark.util.{RpcUtils, ThreadUtils}
import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils}

/**
* Abstract Yarn scheduler backend that contains common logic
@@ -54,6 +60,8 @@ private[spark] abstract class YarnSchedulerBackend(
private val yarnSchedulerEndpointRef = rpcEnv.setupEndpoint(
YarnSchedulerBackend.ENDPOINT_NAME, yarnSchedulerEndpoint)

private val yarnClient = YarnClient.createYarnClient

private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)

/** Application ID. */
@@ -68,6 +76,8 @@
// Flag to specify whether this schedulerBackend should be reset.
private var shouldResetOnAmRegister = false

private var isUserSetMaxExecutors = false

/**
* Bind to YARN. This *must* be done before calling [[start()]].
*
@@ -83,6 +93,12 @@
require(appId.isDefined, "application ID unset")
val binding = SchedulerExtensionServiceBinding(sc, appId.get, attemptId)
services.start(binding)

isUserSetMaxExecutors = DYN_ALLOCATION_MAX_EXECUTORS.defaultValue.get !=
conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
yarnClient.init(SparkHadoopUtil.get.newConfiguration(conf).asInstanceOf[YarnConfiguration])
yarnClient.start()

super.start()
}

@@ -91,6 +107,7 @@
// SPARK-12009: To prevent Yarn allocator from requesting backup for the executors which
// was Stopped by SchedulerBackend.
requestTotalExecutors(0, 0, Map.empty)
yarnClient.stop()
super.stop()
} finally {
services.stop()
@@ -135,6 +152,9 @@
* This includes executors already pending or running.
*/
override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
if (Utils.isDynamicAllocationEnabled(conf) && !isUserSetMaxExecutors) {
setMaxNumExecutors()
}
yarnSchedulerEndpointRef.ask[Boolean](prepareRequestExecutors(requestedTotal))
}

@@ -149,6 +169,38 @@
totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
}

private def setMaxNumExecutors(): Unit = {
val executorCores = conf.get(EXECUTOR_CORES)
val runningNodes = yarnClient.getNodeReports().asScala.
filter(_.getNodeState == NodeState.RUNNING)
val absMaxCapacity = getAbsMaxCapacity(conf.get(QUEUE_NAME))

val maxNumExecutors = (runningNodes.map(_.getCapability.getVirtualCores).
sum * absMaxCapacity / executorCores).toInt
conf.set(DYN_ALLOCATION_MAX_EXECUTORS, maxNumExecutors)
}

/**
* Get the absolute max capacity for a given queue.
*/
private def getAbsMaxCapacity(queueName: String): Float = {
var maxCapacity = 1F
for (queue <- yarnClient.getRootQueueInfos.asScala) {
getQueueInfo(queue, queue.getMaximumCapacity)
}

def getQueueInfo(queueInfo: QueueInfo, capacity: Float): Unit = {
if (queueInfo.getQueueName.equals(queueName)) {
maxCapacity = capacity
} else {
for (child <- queueInfo.getChildQueues.asScala) {
getQueueInfo(child, child.getMaximumCapacity * capacity)
}
}
}
maxCapacity
}

/**
* Add filters to the SparkUI.
*/
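getAbsMaxCapacity above walks the queue tree returned by yarnClient.getRootQueueInfos and multiplies the maximum-capacity fractions along the path to the target queue, defaulting to 1.0 if the queue is not found. A standalone sketch of the same traversal, with a hypothetical Queue case class standing in for YARN's QueueInfo (capacities are fractions in [0, 1]):

// Hypothetical stand-in for org.apache.hadoop.yarn.api.records.QueueInfo.
case class Queue(name: String, maxCapacity: Float, children: Seq[Queue] = Nil)

// Product of maxCapacity fractions from the root down to the named queue;
// 1.0 if the queue does not exist, mirroring the default in getAbsMaxCapacity.
def absMaxCapacity(roots: Seq[Queue], queueName: String): Float = {
  def walk(q: Queue, acc: Float): Option[Float] =
    if (q.name == queueName) Some(acc)
    else q.children.flatMap(c => walk(c, acc * c.maxCapacity)).headOption
  roots.flatMap(r => walk(r, r.maxCapacity)).headOption.getOrElse(1F)
}

// Queue layout from DynamicSetMaxExecutorsSuite below: root.ra (max 60%) with
// children a1 (max 70%) and a2 (max 100%), plus root.rb (max 100%).
val roots = Seq(
  Queue("ra", 0.6F, Seq(Queue("a1", 0.7F), Queue("a2", 1.0F))),
  Queue("rb", 1.0F))

absMaxCapacity(roots, "a1")  // 0.42, so (80 * 0.42 / 1).toInt = 33 executors
absMaxCapacity(roots, "rb")  // 1.0,  so 80 executors with 1 core each

setMaxNumExecutors then multiplies this fraction by the running NodeManagers' vcores and divides by spark.executor.cores, as shown above.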
@@ -28,6 +28,7 @@ import scala.language.postfixOps

import com.google.common.io.Files
import org.apache.commons.lang3.SerializationUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.MiniYARNCluster
import org.scalatest.{BeforeAndAfterAll, Matchers}
@@ -61,10 +62,11 @@ abstract class BaseYarnClusterSuite
private var fakeSparkJar: File = _
protected var hadoopConfDir: File = _
private var logConfDir: File = _
protected var numNodeManagers: Int = 1

var oldSystemProperties: Properties = null

def newYarnConfig(): YarnConfiguration
def newYarnConfig(): Configuration

override def beforeAll() {
super.beforeAll()
@@ -84,7 +86,7 @@ abstract class BaseYarnClusterSuite
yarnConf.set("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
"100.0")

yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1)
yarnCluster = new MiniYARNCluster(getClass().getName(), numNodeManagers, 1, 1)
yarnCluster.init(yarnConf)
yarnCluster.start()

@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.yarn

import java.io.File
import java.nio.charset.StandardCharsets

import scala.language.postfixOps

import com.google.common.io.Files
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration
import org.scalatest.Matchers

import org.apache.spark._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.tags.ExtendedYarnTest

/**
* Integration test, run against a mini YARN cluster, verifying that
* spark.dynamicAllocation.maxExecutors is set dynamically from the queue's maxResources.
*/
@ExtendedYarnTest
class DynamicSetMaxExecutorsSuite extends BaseYarnClusterSuite {

// coresTotal = cpuCores * numNodeManagers = 80
numNodeManagers = 10
val cpuCores = 8

val queueNameRA = "ra"
val queueNameRB = "rb"
val queueNameA1 = "a1"
val queueNameA2 = "a2"
val ra = CapacitySchedulerConfiguration.ROOT + "." + queueNameRA
val rb = CapacitySchedulerConfiguration.ROOT + "." + queueNameRB
val a1 = ra + "." + queueNameA1
val a2 = ra + "." + queueNameA2

val aCapacity = 40F
val aMaximumCapacity = 60F
val bCapacity = 60F
val bMaximumCapacity = 100F
val a1Capacity = 30F
val a1MaximumCapacity = 70F
val a2Capacity = 70F
val dynamicAllocationEnabled = "spark.dynamicAllocation.enabled=true" +
",spark.shuffle.service.enabled=true"
val dynamicAllocationDisabled = "spark.dynamicAllocation.enabled=false" +
",spark.shuffle.service.enabled=false"

override def newYarnConfig(): CapacitySchedulerConfiguration = {

val yarnConf = new CapacitySchedulerConfiguration()

// Define top-level queues
yarnConf.setQueues(CapacitySchedulerConfiguration.ROOT, Array(queueNameRA, queueNameRB))
yarnConf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT, 100)
yarnConf.setCapacity(ra, aCapacity)
yarnConf.setMaximumCapacity(ra, aMaximumCapacity)
yarnConf.setCapacity(rb, bCapacity)
yarnConf.setMaximumCapacity(rb, bMaximumCapacity)

// Define 2nd-level queues
yarnConf.setQueues(ra, Array(queueNameA1, queueNameA2))
yarnConf.setCapacity(a1, a1Capacity)
yarnConf.setMaximumCapacity(a1, a1MaximumCapacity)
yarnConf.setCapacity(a2, a2Capacity)
yarnConf.set("yarn.nodemanager.resource.cpu-vcores", cpuCores.toString)
yarnConf
}

test(s"run Spark on YARN with dynamicAllocation enabled and ${ queueNameA1 } queue") {
// a1's max executors: (80 * 0.6 * 0.7).toInt = 33
setMaxExecutors(33, queueNameA1, dynamicAllocationEnabled)
}

test(s"run Spark on YARN with dynamicAllocation enabled and ${ queueNameA2 } queue") {
// a2's executors: 80 * 0.6 * 1 = 48
setMaxExecutors(48, queueNameA2, dynamicAllocationEnabled)
}

test(s"run Spark on YARN with dynamicAllocation enabled and ${ queueNameRB } queue") {
// b's executors: 80 * 1 = 80
setMaxExecutors(80, queueNameRB, dynamicAllocationEnabled)
}

test(s"run Spark on YARN with dynamicAllocation enabled and ${ queueNameA1 } queue and " +
s"user set maxExecutors") {
val executors = 12
setMaxExecutors(executors, queueNameA1,
s"${ dynamicAllocationEnabled },${DYN_ALLOCATION_MAX_EXECUTORS.key}=${ executors }")
}

test(s"run Spark on YARN with dynamicAllocation disabled and ${ queueNameA1 } queue") {
setMaxExecutors(DYN_ALLOCATION_MAX_EXECUTORS.defaultValue.get, queueNameA1,
dynamicAllocationDisabled)
}

test(s"run Spark on YARN with dynamicAllocation disabled and ${ queueNameA1 } queue and " +
s"user set maxExecutors") {
val executors = 12
setMaxExecutors(executors, queueNameA1,
s"${ dynamicAllocationEnabled },${DYN_ALLOCATION_MAX_EXECUTORS.key}=${ executors }")
}

test(s"run Spark on YARN with dynamicAllocation enabled and ${ queueNameRB } queue and " +
s"user set spark.executor.cores") {
// b's max executors: (80 * 1 / 3).toInt = 26
setMaxExecutors(26, queueNameRB, s"${ dynamicAllocationEnabled },${EXECUTOR_CORES.key}=3")
}

private def setMaxExecutors(expectedExecutors: Int,
queueName: String,
extArgMaps: String): Unit = {
val result = File.createTempFile("result", null, tempDir)
val finalState = runSpark(true,
mainClassName(SetMaxExecutors.getClass),
appArgs = Seq(result.getAbsolutePath, queueName, extArgMaps))
checkResult(finalState, result, expectedExecutors.toString)
}

}
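The expected counts in the tests above follow directly from the formula: the cluster exposes 10 NodeManagers * 8 vcores = 80 cores, so a1 gets (80 * 0.6 * 0.7).toInt = 33, a2 gets (80 * 0.6 * 1.0).toInt = 48, rb gets 80, and with spark.executor.cores=3 rb drops to (80 / 3).toInt = 26. In the two "user set maxExecutors" tests, isUserSetMaxExecutors is true, so the explicitly configured value of 12 is reported unchanged, and with dynamic allocation disabled the default remains in place.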

private object SetMaxExecutors extends Logging with Matchers {
def main(args: Array[String]): Unit = {

var result = Int.MaxValue.toString
val status = new File(args(0))
val queueName = args(1)
val extArgMaps = args(2)

var sc: SparkContext = null
try {
val conf = new SparkConf()
.setAppName(s"DynamicSetMaxExecutors-${ queueName }-${ extArgMaps }")
.set(QUEUE_NAME, queueName)

extArgMaps.split(",").foreach{ kv =>
val confKVs = kv.split("=")
conf.set(confKVs(0), confKVs(1))
}

sc = new SparkContext(conf)

result = sc.getConf.get(DYN_ALLOCATION_MAX_EXECUTORS).toString
} finally {
Files.write(result, status, StandardCharsets.UTF_8)
if (sc != null) sc.stop()
}
}
}