diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 1366251d0618f..ac0e8806870b0 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -90,7 +90,8 @@ private[spark] class ExecutorAllocationManager(
// Lower and upper bounds on the number of executors.
private val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
- private val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
+  // The upper bound is dynamic for Spark on YARN (derived from the queue's max resources).
+ private var maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
private val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
// How long there must be backlogged tasks for before an addition is triggered (seconds)
@@ -348,6 +349,15 @@ private[spark] class ExecutorAllocationManager(
* @return the number of additional executors actually requested.
*/
private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
+    // Update the Environment page if spark.dynamicAllocation.maxExecutors has changed.
+ if (maxNumExecutors != conf.get(DYN_ALLOCATION_MAX_EXECUTORS)) {
+ maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
+ val sc = listenerBus.sparkContext
+ val envDetails = SparkEnv.environmentDetails(conf, sc.getSchedulingMode.toString,
+ sc.addedJars.keys.toSeq, sc.addedFiles.keys.toSeq)
+ val event = SparkListenerEnvironmentUpdate(envDetails)
+ listenerBus.postToAll(event)
+ }
// Do not request more executors if it would put our target over the upper bound
if (numExecutorsTarget >= maxNumExecutors) {
logDebug(s"Not adding executors because our current target total " +
diff --git a/docs/configuration.md b/docs/configuration.md
index 7c040330db637..3afc3171797c6 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1541,7 +1541,8 @@ Apart from these, the following properties are also available, and may be useful
  <td><code>spark.dynamicAllocation.maxExecutors</code></td>
-  <td>infinity</td>
+  <td>
+    Depends on the queue's maximum resources for YARN;
+    infinity for standalone and Mesos modes
+  </td>
  <td>
    Upper bound for the number of executors if dynamic allocation is enabled.
  </td>
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index b212b0eaafcdf..d32325c2b9d37 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -47,7 +47,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
import org.apache.hadoop.yarn.util.Records
-import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException}
+import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index cbc6e60e839c1..1460500c8a8f4 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -17,19 +17,25 @@
package org.apache.spark.scheduler.cluster
-import scala.concurrent.{ExecutionContext, Future}
+import scala.collection.JavaConverters._
+import scala.concurrent.Future
import scala.util.{Failure, Success}
import scala.util.control.NonFatal
-import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId, NodeState, QueueInfo}
+import org.apache.hadoop.yarn.client.api.YarnClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.SparkContext
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
import org.apache.spark.rpc._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.ui.JettyUtils
-import org.apache.spark.util.{RpcUtils, ThreadUtils}
+import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils}
/**
* Abstract Yarn scheduler backend that contains common logic
@@ -54,6 +60,8 @@ private[spark] abstract class YarnSchedulerBackend(
private val yarnSchedulerEndpointRef = rpcEnv.setupEndpoint(
YarnSchedulerBackend.ENDPOINT_NAME, yarnSchedulerEndpoint)
+ private val yarnClient = YarnClient.createYarnClient
+
private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
/** Application ID. */
@@ -68,6 +76,8 @@ private[spark] abstract class YarnSchedulerBackend(
// Flag to specify whether this schedulerBackend should be reset.
private var shouldResetOnAmRegister = false
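+  // Whether spark.dynamicAllocation.maxExecutors was explicitly configured (differs from
+  // its default); if so, the queue-derived upper bound is not applied.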
+ private var isUserSetMaxExecutors = false
+
/**
* Bind to YARN. This *must* be done before calling [[start()]].
*
@@ -83,6 +93,12 @@ private[spark] abstract class YarnSchedulerBackend(
require(appId.isDefined, "application ID unset")
val binding = SchedulerExtensionServiceBinding(sc, appId.get, attemptId)
services.start(binding)
+
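+    // Detect a user-configured max and bring up a YarnClient so queue capacities and
+    // node resources can be queried when the upper bound is recomputed.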
+ isUserSetMaxExecutors = DYN_ALLOCATION_MAX_EXECUTORS.defaultValue.get !=
+ conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
+ yarnClient.init(SparkHadoopUtil.get.newConfiguration(conf).asInstanceOf[YarnConfiguration])
+ yarnClient.start()
+
super.start()
}
@@ -91,6 +107,7 @@ private[spark] abstract class YarnSchedulerBackend(
// SPARK-12009: To prevent Yarn allocator from requesting backup for the executors which
// was Stopped by SchedulerBackend.
requestTotalExecutors(0, 0, Map.empty)
+ yarnClient.stop()
super.stop()
} finally {
services.stop()
@@ -135,6 +152,9 @@ private[spark] abstract class YarnSchedulerBackend(
* This includes executors already pending or running.
*/
override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
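+    // Refresh the upper bound from the queue's max resources before asking for executors,
+    // unless spark.dynamicAllocation.maxExecutors was set explicitly.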
+ if (Utils.isDynamicAllocationEnabled(conf) && !isUserSetMaxExecutors) {
+ setMaxNumExecutors()
+ }
yarnSchedulerEndpointRef.ask[Boolean](prepareRequestExecutors(requestedTotal))
}
@@ -149,6 +169,38 @@ private[spark] abstract class YarnSchedulerBackend(
totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
}
+ private def setMaxNumExecutors(): Unit = {
+ val executorCores = conf.get(EXECUTOR_CORES)
+    val runningNodes = yarnClient.getNodeReports().asScala
+      .filter(_.getNodeState == NodeState.RUNNING)
+ val absMaxCapacity = getAbsMaxCapacity(conf.get(QUEUE_NAME))
+
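+    // maxExecutors = floor(total vcores of RUNNING NodeManagers * queue's absolute max
+    // capacity / spark.executor.cores)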
+    val maxNumExecutors = (runningNodes.map(_.getCapability.getVirtualCores).sum *
+      absMaxCapacity / executorCores).toInt
+ conf.set(DYN_ALLOCATION_MAX_EXECUTORS, maxNumExecutors)
+ }
+
+ /**
+ * Get the absolute max capacity for a given queue.
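+   * The absolute value is the product of the maximumCapacity fractions along the path
+   * from the root queue down to the named queue; defaults to 1.0 if the queue is not found.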
+ */
+  private def getAbsMaxCapacity(queueName: String): Float = {
+    var maxCapacity = 1F
+
+    def getQueueInfo(queueInfo: QueueInfo, capacity: Float): Unit = {
+      if (queueInfo.getQueueName.equals(queueName)) {
+        maxCapacity = capacity
+      } else {
+        for (child <- queueInfo.getChildQueues.asScala) {
+          getQueueInfo(child, child.getMaximumCapacity * capacity)
+        }
+      }
+    }
+
+    for (queue <- yarnClient.getRootQueueInfos.asScala) {
+      getQueueInfo(queue, queue.getMaximumCapacity)
+    }
+    maxCapacity
+  }
+
/**
* Add filters to the SparkUI.
*/
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index 9c3b18e4ec5f3..40f68d55252c5 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -28,6 +28,7 @@ import scala.language.postfixOps
import com.google.common.io.Files
import org.apache.commons.lang3.SerializationUtils
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.MiniYARNCluster
import org.scalatest.{BeforeAndAfterAll, Matchers}
@@ -61,10 +62,11 @@ abstract class BaseYarnClusterSuite
private var fakeSparkJar: File = _
protected var hadoopConfDir: File = _
private var logConfDir: File = _
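+  // Number of NodeManagers to start in the MiniYARNCluster; subclasses may override this
+  // before beforeAll() creates the cluster.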
+ protected var numNodeManagers: Int = 1
var oldSystemProperties: Properties = null
- def newYarnConfig(): YarnConfiguration
+ def newYarnConfig(): Configuration
override def beforeAll() {
super.beforeAll()
@@ -84,7 +86,7 @@ abstract class BaseYarnClusterSuite
yarnConf.set("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
"100.0")
- yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1)
+ yarnCluster = new MiniYARNCluster(getClass().getName(), numNodeManagers, 1, 1)
yarnCluster.init(yarnConf)
yarnCluster.start()
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/DynamicSetMaxExecutorsSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/DynamicSetMaxExecutorsSuite.scala
new file mode 100644
index 0000000000000..185f527a9702a
--- /dev/null
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/DynamicSetMaxExecutorsSuite.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.language.postfixOps
+
+import com.google.common.io.Files
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration
+import org.scalatest.Matchers
+
+import org.apache.spark._
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.tags.ExtendedYarnTest
+
+/**
+ * Integration test verifying that spark.dynamicAllocation.maxExecutors is set dynamically
+ * based on the queue's max resources, using a mini YARN cluster.
+ */
+@ExtendedYarnTest
+class DynamicSetMaxExecutorsSuite extends BaseYarnClusterSuite {
+
+ // coresTotal = cpuCores * numNodeManagers = 80
+ numNodeManagers = 10
+ val cpuCores = 8
+
+ val queueNameRA = "ra"
+ val queueNameRB = "rb"
+ val queueNameA1 = "a1"
+ val queueNameA2 = "a2"
+ val ra = CapacitySchedulerConfiguration.ROOT + "." + queueNameRA
+ val rb = CapacitySchedulerConfiguration.ROOT + "." + queueNameRB
+ val a1 = ra + "." + queueNameA1
+ val a2 = ra + "." + queueNameA2
+
+ val aCapacity = 40F
+ val aMaximumCapacity = 60F
+ val bCapacity = 60F
+ val bMaximumCapacity = 100F
+ val a1Capacity = 30F
+ val a1MaximumCapacity = 70F
+ val a2Capacity = 70F
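+  // Absolute maximum capacities exercised below: a1 = 0.6 * 0.7 = 0.42,
+  // a2 = 0.6 * 1.0 = 0.6 (a2's max capacity defaults to 100%), rb = 1.0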
+ val dynamicAllocationEnabled = "spark.dynamicAllocation.enabled=true" +
+ ",spark.shuffle.service.enabled=true"
+ val dynamicAllocationDisabled = "spark.dynamicAllocation.enabled=false" +
+ ",spark.shuffle.service.enabled=false"
+
+ override def newYarnConfig(): CapacitySchedulerConfiguration = {
+
+ val yarnConf = new CapacitySchedulerConfiguration()
+
+ // Define top-level queues
+ yarnConf.setQueues(CapacitySchedulerConfiguration.ROOT, Array(queueNameRA, queueNameRB))
+ yarnConf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT, 100)
+ yarnConf.setCapacity(ra, aCapacity)
+ yarnConf.setMaximumCapacity(ra, aMaximumCapacity)
+ yarnConf.setCapacity(rb, bCapacity)
+ yarnConf.setMaximumCapacity(rb, bMaximumCapacity)
+
+ // Define 2nd-level queues
+ yarnConf.setQueues(ra, Array(queueNameA1, queueNameA2))
+ yarnConf.setCapacity(a1, a1Capacity)
+ yarnConf.setMaximumCapacity(a1, a1MaximumCapacity)
+ yarnConf.setCapacity(a2, a2Capacity)
+ yarnConf.set("yarn.nodemanager.resource.cpu-vcores", cpuCores.toString)
+ yarnConf
+ }
+
+ test(s"run Spark on YARN with dynamicAllocation enabled and ${ queueNameA1 } queue") {
+    // a1's max executors: (80 * 0.6 * 0.7).toInt = 33
+ setMaxExecutors(33, queueNameA1, dynamicAllocationEnabled)
+ }
+
+ test(s"run Spark on YARN with dynamicAllocation enabled and ${ queueNameA2 } queue") {
+ // a2's executors: 80 * 0.6 * 1 = 48
+ setMaxExecutors(48, queueNameA2, dynamicAllocationEnabled)
+ }
+
+ test(s"run Spark on YARN with dynamicAllocation enabled and ${ queueNameRB } queue") {
+ // b's executors: 80 * 1 = 80
+ setMaxExecutors(80, queueNameRB, dynamicAllocationEnabled)
+ }
+
+ test(s"run Spark on YARN with dynamicAllocation enabled and ${ queueNameA1 } queue and " +
+ s"user set maxExecutors") {
+ val executors = 12
+ setMaxExecutors(executors, queueNameA1,
+ s"${ dynamicAllocationEnabled },${DYN_ALLOCATION_MAX_EXECUTORS.key}=${ executors }")
+ }
+
+ test(s"run Spark on YARN with dynamicAllocation disabled and ${ queueNameA1 } queue") {
+ setMaxExecutors(DYN_ALLOCATION_MAX_EXECUTORS.defaultValue.get, queueNameA1,
+ dynamicAllocationDisabled)
+ }
+
+ test(s"run Spark on YARN with dynamicAllocation disabled and ${ queueNameA1 } queue and " +
+ s"user set maxExecutors") {
+ val executors = 12
+ setMaxExecutors(executors, queueNameA1,
+      s"${ dynamicAllocationDisabled },${DYN_ALLOCATION_MAX_EXECUTORS.key}=${ executors }")
+ }
+
+ test(s"run Spark on YARN with dynamicAllocation enabled and ${ queueNameRB } queue and " +
+ s"user set spark.executor.cores") {
+    // b's max executors: (80 * 1.0 / 3).toInt = 26
+ setMaxExecutors(26, queueNameRB, s"${ dynamicAllocationEnabled },${EXECUTOR_CORES.key}=3")
+ }
+
+  private def setMaxExecutors(
+      expectedExecutors: Int,
+      queueName: String,
+      extArgMaps: String): Unit = {
+ val result = File.createTempFile("result", null, tempDir)
+ val finalState = runSpark(true,
+ mainClassName(SetMaxExecutors.getClass),
+ appArgs = Seq(result.getAbsolutePath, queueName, extArgMaps))
+ checkResult(finalState, result, expectedExecutors.toString)
+ }
+
+}
+
+private object SetMaxExecutors extends Logging with Matchers {
+ def main(args: Array[String]): Unit = {
+
+ var result = Int.MaxValue.toString
+ val status = new File(args(0))
+ val queueName = args(1)
+ val extArgMaps = args(2)
+
+ var sc: SparkContext = null
+ try {
+ val conf = new SparkConf()
+ .setAppName(s"DynamicSetMaxExecutors-${ queueName }-${ extArgMaps }")
+ .set(QUEUE_NAME, queueName)
+
+      extArgMaps.split(",").foreach { kv =>
+ val confKVs = kv.split("=")
+ conf.set(confKVs(0), confKVs(1))
+ }
+
+ sc = new SparkContext(conf)
+
+ result = sc.getConf.get(DYN_ALLOCATION_MAX_EXECUTORS).toString
+ } finally {
+ Files.write(result, status, StandardCharsets.UTF_8)
+      if (sc != null) {
+        sc.stop()
+      }
+ }
+ }
+}