From 97e5eee6aaf2335a2af62e816b767d408c37a59e Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Mon, 6 Feb 2017 16:57:12 +0800
Subject: [PATCH 1/8] Set maxNumExecutors based on the YARN cluster's total
 VCores.

---
 docs/configuration.md                          |  3 ++-
 .../org/apache/spark/deploy/yarn/Client.scala  | 22 +++++++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index 7c040330db637..ec670374a10ea 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1541,7 +1541,8 @@ Apart from these, the following properties are also available, and may be useful
   spark.dynamicAllocation.maxExecutors
-  infinity
+  Depends on the YARN cluster's total VCores for YARN;
+  infinity for standalone and Mesos modes
   Upper bound for the number of executors if dynamic allocation is enabled.

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index b212b0eaafcdf..933ba487959be 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -151,6 +151,8 @@ private[spark] class Client(
     yarnClient.init(yarnConf)
     yarnClient.start()

+    setMaxNumExecutors()
+
     logInfo("Requesting a new application from cluster with %d NodeManagers"
       .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

@@ -1193,6 +1195,26 @@ private[spark] class Client(
     }
   }

+  /**
+   * If dynamic allocation is enabled and the user hasn't set spark.dynamicAllocation.maxExecutors,
+   * set the max number of executors based on the YARN cluster's total VCores.
+   * If dynamic allocation is disabled, don't set it.
+   */
+  def setMaxNumExecutors(): Unit = {
+    if (Utils.isDynamicAllocationEnabled(sparkConf)) {
+
+      val defaultMaxNumExecutors = DYN_ALLOCATION_MAX_EXECUTORS.defaultValue.get
+      if (defaultMaxNumExecutors == sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS)) {
+        val executorCores = sparkConf.getInt("spark.executor.cores", 1)
+        val maxNumExecutors = yarnClient.getNodeReports().asScala.
+          filter(_.getNodeState == NodeState.RUNNING).
+          map(_.getCapability.getVirtualCores / executorCores).sum
+
+        sparkConf.set(DYN_ALLOCATION_MAX_EXECUTORS, maxNumExecutors)
+      }
+    }
+  }
+
 }

 private object Client extends Logging {
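The heart of patch 1 is the VCores arithmetic in setMaxNumExecutors. A self-contained sketch of that computation, with a stand-in case class where the real code reads YARN NodeReports (names and values here are illustrative only):

```scala
// Stand-in for the two fields the patch reads from yarnClient.getNodeReports().
case class NodeInfo(state: String, vcores: Int)

// Mirrors the patch: per RUNNING node, count how many executors of
// `executorCores` cores fit, then sum across nodes.
def maxNumExecutors(nodes: Seq[NodeInfo], executorCores: Int): Int =
  nodes.filter(_.state == "RUNNING")
    .map(_.vcores / executorCores)
    .sum

val nodes = Seq(NodeInfo("RUNNING", 8), NodeInfo("RUNNING", 8), NodeInfo("LOST", 8))
assert(maxNumExecutors(nodes, executorCores = 3) == 4)  // 8/3 + 8/3 = 2 + 2
```

Note that the integer division happens per node before the sum, so capacity that would require an executor to straddle two NodeManagers is not counted; the unit test added next relies on exactly this with its `(8 / 3) * 4 = 8` expectation.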
From 4f81680364c16e5e70b65e785a439c184b1313e3 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Mon, 20 Feb 2017 14:02:24 +0800
Subject: [PATCH 2/8] Add a unit test

---
 .../spark/deploy/yarn/ClientSuite.scala | 71 +++++++++++++++++++
 1 file changed, 71 insertions(+)

diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 7deaf0af94849..bb8db584f1589 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -30,11 +30,13 @@ import org.apache.commons.lang3.SerializationUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.MRJobConfig
+import org.apache.hadoop.service.Service.STATE
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.YarnClientApplication
 import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.server.MiniYARNCluster
 import org.apache.hadoop.yarn.util.Records
 import org.mockito.Matchers.{eq => meq, _}
 import org.mockito.Mockito._
@@ -42,6 +44,7 @@ import org.scalatest.{BeforeAndAfterAll, Matchers}

 import org.apache.spark.{SparkConf, SparkFunSuite, TestUtils}
 import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.config._
 import org.apache.spark.util.{ResetSystemProperties, SparkConfWithEnv, Utils}

 class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
@@ -227,6 +230,74 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     appContext.getMaxAppAttempts should be (42)
   }

+  test("Dynamically set spark.dynamicAllocation.maxExecutors if dynamicAllocation enabled") {
+    val numNodeManagers = 4
+    val conf = new YarnConfiguration
+    val yarnCluster = new MiniYARNCluster(classOf[ClientSuite].getName, numNodeManagers, 1, 1)
+    yarnCluster.init(conf)
+    yarnCluster.start()
+    assert(null != yarnCluster)
+    assert(STATE.STARTED == yarnCluster.getServiceState)
+
+    val args = new ClientArguments(Array())
+    // dynamicAllocation enabled
+    val sparkConfEnabled = new SparkConf()
+      .set("spark.dynamicAllocation.enabled", "true")
+    val clientEnabled = new Client(args, yarnCluster.getConfig, sparkConfEnabled)
+    assert(null != clientEnabled)
+
+    val nodeManagerCores =
+      clientEnabled.hadoopConf.get("yarn.nodemanager.resource.cpu-vcores").toInt
+    assert(8 == nodeManagerCores)
+
+    assert(Int.MaxValue == clientEnabled.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
+    clientEnabled.init()
+    assert(numNodeManagers * nodeManagerCores ==
+      clientEnabled.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
+    clientEnabled.stop()
+
+    // dynamicAllocation disabled
+    val sparkConfDisabled = new SparkConf()
+      .set("spark.dynamicAllocation.enabled", "false")
+    val clientDisabled = new Client(args, yarnCluster.getConfig, sparkConfDisabled)
+    assert(null != clientDisabled)
+
+    assert(Int.MaxValue == clientDisabled.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
+    clientDisabled.init()
+    assert(Int.MaxValue == clientDisabled.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
+    clientDisabled.stop()
+
+    // dynamicAllocation enabled and user set spark.dynamicAllocation.maxExecutors
+    val maxExecutors = 10
+    val sparkConfSetMaxExes = new SparkConf()
+      .set("spark.dynamicAllocation.enabled", "true")
+      .set(DYN_ALLOCATION_MAX_EXECUTORS.key, maxExecutors.toString)
+    val clientEnabledSetMaxExes = new Client(args, yarnCluster.getConfig, sparkConfSetMaxExes)
+    assert(null != clientEnabledSetMaxExes)
+
+    assert(maxExecutors == clientEnabledSetMaxExes.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
+    clientEnabledSetMaxExes.init()
+    assert(maxExecutors == clientEnabledSetMaxExes.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
+    clientEnabledSetMaxExes.stop()
+
+    // dynamicAllocation enabled and user set spark.executor.cores
+    val executorCores = 3
+    val sparkConfSetCores = new SparkConf()
+      .set("spark.dynamicAllocation.enabled", "true")
+      .set("spark.executor.cores", executorCores.toString)
+    val clientEnabledSetCores = new Client(args, yarnCluster.getConfig, sparkConfSetCores)
+    assert(null != clientEnabledSetCores)
+
+    assert(Int.MaxValue == clientEnabledSetCores.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
+    clientEnabledSetCores.init()
+    // (8 / 3) * 4 = 8
+    val expectNumExecutors = (nodeManagerCores / executorCores) * numNodeManagers
+    assert(expectNumExecutors == clientEnabledSetCores.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
+    clientEnabledSetCores.stop()
+
+    yarnCluster.stop()
+  }
+
   test("spark.yarn.jars with multiple paths and globs") {
     val libs = Utils.createTempDir()
     val single = Utils.createTempDir()
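The test drives a real in-process ResourceManager and NodeManagers rather than mocks. A minimal sketch of the MiniYARNCluster lifecycle it depends on (the constructor arguments are test name, number of NodeManagers, local dirs per NM, and log dirs per NM):

```scala
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.MiniYARNCluster

val cluster = new MiniYARNCluster("ExampleSuite", 4, 1, 1)
cluster.init(new YarnConfiguration())
cluster.start()
try {
  // getConfig carries the dynamically chosen RM/NM addresses, so a client
  // built from it talks to this in-process cluster.
  val conf = cluster.getConfig
} finally {
  cluster.stop()
}
```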
From 8e9970107c8e74b57718398d4972af7d4709ec2d Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Mon, 20 Feb 2017 14:34:20 +0800
Subject: [PATCH 3/8] Add an init function to Client.scala

---
 .../org/apache/spark/deploy/yarn/Client.scala | 21 ++++++++++++-------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 933ba487959be..e148531f7a9d4 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -144,14 +144,8 @@ private[spark] class Client(
   def submitApplication(): ApplicationId = {
     var appId: ApplicationId = null
     try {
-      launcherBackend.connect()
-      // Setup the credentials before doing anything else,
-      // so we don't have issues at any point.
-      setupCredentials()
-      yarnClient.init(yarnConf)
-      yarnClient.start()

-      setMaxNumExecutors()
+      init()

       logInfo("Requesting a new application from cluster with %d NodeManagers"
         .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

@@ -1195,12 +1189,23 @@ private[spark] class Client(
     }
   }

+  def init(): Unit = {
+    launcherBackend.connect()
+    // Setup the credentials before doing anything else,
+    // so we don't have issues at any point.
+    setupCredentials()
+    yarnClient.init(yarnConf)
+    yarnClient.start()
+
+    setMaxNumExecutors()
+  }
+
   /**
    * If dynamic allocation is enabled and the user hasn't set spark.dynamicAllocation.maxExecutors,
    * set the max number of executors based on the YARN cluster's total VCores.
    * If dynamic allocation is disabled, don't set it.
   */
-  def setMaxNumExecutors(): Unit = {
+  private def setMaxNumExecutors(): Unit = {
     if (Utils.isDynamicAllocationEnabled(sparkConf)) {

       val defaultMaxNumExecutors = DYN_ALLOCATION_MAX_EXECUTORS.defaultValue.get

From fabe2c572db80bde639984f6e3834239b9590bfc Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Wed, 22 Feb 2017 20:46:58 +0800
Subject: [PATCH 4/8] Take the queue's maxResources into account

---
 .../org/apache/spark/deploy/yarn/Client.scala |  35 +++++-
 .../spark/deploy/yarn/ClientSuite.scala       | 151 +++++++++++++-----
 2 files changed, 139 insertions(+), 47 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index e148531f7a9d4..1189641d012fd 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -47,7 +47,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
 import org.apache.hadoop.yarn.util.Records

-import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException}
+import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager
@@ -1210,14 +1210,37 @@ private[spark] class Client(

       val defaultMaxNumExecutors = DYN_ALLOCATION_MAX_EXECUTORS.defaultValue.get
       if (defaultMaxNumExecutors == sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS)) {
-        val executorCores = sparkConf.getInt("spark.executor.cores", 1)
-        val maxNumExecutors = yarnClient.getNodeReports().asScala.
-          filter(_.getNodeState == NodeState.RUNNING).
-          map(_.getCapability.getVirtualCores / executorCores).sum
+        val executorCores = sparkConf.get(EXECUTOR_CORES)
+        val runningNodes = yarnClient.getNodeReports().asScala.
+          filter(_.getNodeState == NodeState.RUNNING)
+        val absMaxCapacity = getAbsMaxCapacity(yarnClient, sparkConf.get(QUEUE_NAME))
+
+        val maxNumExecutors = runningNodes.map(_.getCapability.getVirtualCores).
+          sum * absMaxCapacity / executorCores
+        sparkConf.set(DYN_ALLOCATION_MAX_EXECUTORS, maxNumExecutors.toInt)
+      }
+    }
+  }

-        sparkConf.set(DYN_ALLOCATION_MAX_EXECUTORS, maxNumExecutors)
+  /**
+   * Get the absolute max capacity for a given queue.
+   */
+  private def getAbsMaxCapacity(yarnClient: YarnClient, queueName: String): Float = {
+    var maxCapacity = 1F
+    for (queue <- yarnClient.getRootQueueInfos.asScala) {
+      getQueueInfo(queue, queue.getMaximumCapacity)
+    }
+
+    def getQueueInfo(queueInfo: QueueInfo, capacity: Float): Unit = {
+      if (queueInfo.getQueueName.equals(queueName)) {
+        maxCapacity = capacity
+      } else {
+        for (child <- queueInfo.getChildQueues.asScala) {
+          getQueueInfo(child, child.getMaximumCapacity * capacity)
+        }
       }
     }
+    maxCapacity
   }
 }

diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index bb8db584f1589..67318e79dbe2a 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -30,13 +30,13 @@ import org.apache.commons.lang3.SerializationUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.MRJobConfig
-import org.apache.hadoop.service.Service.STATE
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.YarnClientApplication
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.server.MiniYARNCluster
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration
 import org.apache.hadoop.yarn.util.Records
 import org.mockito.Matchers.{eq => meq, _}
 import org.mockito.Mockito._
@@ -231,70 +231,139 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
   }

   test("Dynamically set spark.dynamicAllocation.maxExecutors if dynamicAllocation enabled") {
-    val numNodeManagers = 4
-    val conf = new YarnConfiguration
+    val a = CapacitySchedulerConfiguration.ROOT + ".a"
+    val b = CapacitySchedulerConfiguration.ROOT + ".b"
+    val a1 = a + ".a1"
+    val a2 = a + ".a2"
+
+    val aCapacity = 40F
+    val aMaximumCapacity = 60F
+    val bCapacity = 60F
+    val bMaximumCapacity = 100F
+    val a1Capacity = 30F
+    val a1MaximumCapacity = 70F
+    val a2Capacity = 70F
+
+    val cpuCores = 8
+    val numNodeManagers = 10
+    val coresTotal = cpuCores * numNodeManagers
+
+    val conf = new CapacitySchedulerConfiguration()
+
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT, Array("a", "b"))
+    conf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT, 100)
+    conf.setCapacity(a, aCapacity)
+    conf.setMaximumCapacity(a, aMaximumCapacity)
+    conf.setCapacity(b, bCapacity)
+    conf.setMaximumCapacity(b, bMaximumCapacity)
+
+    // Define 2nd-level queues
+    conf.setQueues(a, Array("a1", "a2"))
+    conf.setCapacity(a1, a1Capacity)
+    conf.setMaximumCapacity(a1, a1MaximumCapacity)
+    conf.setCapacity(a2, a2Capacity)
+    conf.set("yarn.nodemanager.resource.cpu-vcores", cpuCores.toString)
+
     val yarnCluster = new MiniYARNCluster(classOf[ClientSuite].getName, numNodeManagers, 1, 1)
     yarnCluster.init(conf)
     yarnCluster.start()
-    assert(null != yarnCluster)
-    assert(STATE.STARTED == yarnCluster.getServiceState)

     val args = new ClientArguments(Array())
+
     // dynamicAllocation enabled
-    val sparkConfEnabled = new SparkConf()
+    // a's cores: 80 * 0.6 = 48
+    val aCoreTotal = (coresTotal * (aMaximumCapacity / 100)).toInt
+    val sparkConfA = new SparkConf()
       .set("spark.dynamicAllocation.enabled", "true")
-    val clientEnabled = new Client(args, yarnCluster.getConfig, sparkConfEnabled)
-    assert(null != clientEnabled)
-
-    val nodeManagerCores =
-      clientEnabled.hadoopConf.get("yarn.nodemanager.resource.cpu-vcores").toInt
-    assert(8 == nodeManagerCores)
-
-    assert(Int.MaxValue == clientEnabled.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
-    clientEnabled.init()
-    assert(numNodeManagers * nodeManagerCores ==
-      clientEnabled.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
-    clientEnabled.stop()
-
-    // dynamicAllocation disabled
-    val sparkConfDisabled = new SparkConf()
-      .set("spark.dynamicAllocation.enabled", "false")
-    val clientDisabled = new Client(args, yarnCluster.getConfig, sparkConfDisabled)
-    assert(null != clientDisabled)
+      .set(QUEUE_NAME, "a")
+    val clientA = new Client(args, yarnCluster.getConfig, sparkConfA)
+
+    assert(Int.MaxValue === clientA.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
+    clientA.init()
+    assert(aCoreTotal === 48)
+    assert(clientA.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS) === 48)
+    clientA.stop()
+
+    // a1's cores: 80 * 0.6 * 0.7 = 33
+    val a1CoreTotal = (coresTotal * (aMaximumCapacity / 100) * (a1MaximumCapacity/ 100)).toInt
+    val sparkConfA1 = new SparkConf()
+      .set("spark.dynamicAllocation.enabled", "true")
+      .set(QUEUE_NAME, "a1")
+    val clientA1 = new Client(args, yarnCluster.getConfig, sparkConfA1)
+
+    assert(Int.MaxValue === clientA1.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
+    clientA1.init()
+    assert(a1CoreTotal === 33)
+    assert(clientA1.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS) === 33)
+    clientA1.stop()
+
+    // a2's cores: 80 * 0.6 * 1 = 48
+    val a2CoreTotal = (coresTotal * (aMaximumCapacity / 100) * 1).toInt
+    val sparkConfA2 = new SparkConf()
+      .set("spark.dynamicAllocation.enabled", "true")
+      .set(QUEUE_NAME, "a2")
+    val clientA2 = new Client(args, yarnCluster.getConfig, sparkConfA2)
+
+    assert(Int.MaxValue === clientA2.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
+    clientA2.init()
+    assert(a2CoreTotal === 48)
+    assert(clientA2.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS) === 48)
+    clientA2.stop()
+
+    // b's cores: 80 * 1 = 80
+    val bCoreTotal = (coresTotal * (bMaximumCapacity / 100)).toInt
+    val sparkConfB = new SparkConf()
+      .set("spark.dynamicAllocation.enabled", "true")
+      .set(QUEUE_NAME, "b")
+    val clientB = new Client(args, yarnCluster.getConfig, sparkConfB)

-    assert(Int.MaxValue == clientDisabled.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
-    clientDisabled.init()
-    assert(Int.MaxValue == clientDisabled.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
-    clientDisabled.stop()
+    assert(Int.MaxValue === clientB.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
+    clientB.init()
+    assert(bCoreTotal === 80)
+    assert(clientB.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS) === 80)
+    clientB.stop()

     // dynamicAllocation enabled and user set spark.dynamicAllocation.maxExecutors
-    val maxExecutors = 10
-    val sparkConfSetMaxExes = new SparkConf()
+    val maxExecutors = 3
+    val sparkConfSetExecutors = new SparkConf()
       .set("spark.dynamicAllocation.enabled", "true")
-      .set(DYN_ALLOCATION_MAX_EXECUTORS.key, maxExecutors.toString)
-    val clientEnabledSetMaxExes = new Client(args, yarnCluster.getConfig, sparkConfSetMaxExes)
-    assert(null != clientEnabledSetMaxExes)
+      .set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString)
+      .set(QUEUE_NAME, "b")
+    val clientSetExecutors = new Client(args, yarnCluster.getConfig, sparkConfSetExecutors)
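The queue walk in getAbsMaxCapacity is the subtle part of this patch: QueueInfo.getMaximumCapacity is relative to the parent queue, so the absolute cap is the product of the maxima along the path from the root. A pure-function sketch of the same traversal over a toy tree (the Queue class here is a stand-in for YARN's QueueInfo):

```scala
// Stand-in for QueueInfo: name, max capacity as a fraction of the parent,
// and child queues.
case class Queue(name: String, maxCapacity: Float, children: Seq[Queue] = Nil)

// Multiply maxCapacity down the tree until the named queue is found,
// defaulting to 1.0 (the whole cluster) if it isn't, as the patch does.
def absMaxCapacity(roots: Seq[Queue], queueName: String): Float = {
  def walk(q: Queue, acc: Float): Option[Float] =
    if (q.name == queueName) Some(acc)
    else q.children.flatMap(c => walk(c, acc * c.maxCapacity)).headOption
  roots.flatMap(r => walk(r, r.maxCapacity)).headOption.getOrElse(1F)
}

val roots = Seq(
  Queue("a", 0.6F, Seq(Queue("a1", 0.7F), Queue("a2", 1.0F))),
  Queue("b", 1.0F))
assert(absMaxCapacity(roots, "a1") == 0.6F * 0.7F)
```

With the suite's 80 total VCores this reproduces the expected values: queue a1 gets 80 × 0.6 × 0.7 = 33.6, truncated to 33 executors.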
-    assert(maxExecutors == clientEnabledSetMaxExes.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
-    clientEnabledSetMaxExes.init()
-    assert(maxExecutors == clientEnabledSetMaxExes.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
-    clientEnabledSetMaxExes.stop()
+    assert(maxExecutors === clientSetExecutors.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
+    clientSetExecutors.init()
+    assert(maxExecutors === clientSetExecutors.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
+    clientSetExecutors.stop()

     // dynamicAllocation enabled and user set spark.executor.cores
+    // b's execores = 80 * 1 / 3 = 26
     val executorCores = 3
+    val bExecutorTotal = (coresTotal * (bMaximumCapacity / 100)).toInt / executorCores
     val sparkConfSetCores = new SparkConf()
       .set("spark.dynamicAllocation.enabled", "true")
       .set("spark.executor.cores", executorCores.toString)
+      .set(QUEUE_NAME, "b")
     val clientEnabledSetCores = new Client(args, yarnCluster.getConfig, sparkConfSetCores)
-    assert(null != clientEnabledSetCores)

     assert(Int.MaxValue == clientEnabledSetCores.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
     clientEnabledSetCores.init()
-    // (8 / 3) * 4 = 8
-    val expectNumExecutors = (nodeManagerCores / executorCores) * numNodeManagers
-    assert(expectNumExecutors == clientEnabledSetCores.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
+    assert(bExecutorTotal === 26)
+    assert(26 ===  clientEnabledSetCores.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
     clientEnabledSetCores.stop()

+    // dynamicAllocation disabled
+    val sparkConfDisabled = new SparkConf()
+      .set("spark.dynamicAllocation.enabled", "false")
+      .set(QUEUE_NAME, "b")
+    val clientDisabled = new Client(args, yarnCluster.getConfig, sparkConfDisabled)
+
+    assert(Int.MaxValue === clientDisabled.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
+    clientDisabled.init()
+    assert(Int.MaxValue === clientDisabled.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
+    clientDisabled.stop()
+
     yarnCluster.stop()
   }

From cd306e2662b9ed60cf21fe8cb234b232ddc9aca2 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Wed, 22 Feb 2017 22:50:17 +0800
Subject: [PATCH 5/8] Fix some typos

---
 docs/configuration.md                                    | 2 +-
 .../scala/org/apache/spark/deploy/yarn/ClientSuite.scala | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index ec670374a10ea..3afc3171797c6 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1541,7 +1541,7 @@ Apart from these, the following properties are also available, and may be useful
   spark.dynamicAllocation.maxExecutors
-  Depends on the YARN cluster's total VCores for YARN;
+  Depends on the queue's max resources for YARN;
   infinity for standalone and Mesos modes
   Upper bound for the number of executors if dynamic allocation is enabled.

diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 67318e79dbe2a..78c9de5b99ec4 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -286,7 +286,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     clientA.stop()

     // a1's cores: 80 * 0.6 * 0.7 = 33
-    val a1CoreTotal = (coresTotal * (aMaximumCapacity / 100) * (a1MaximumCapacity/ 100)).toInt
+    val a1CoreTotal = (coresTotal * (aMaximumCapacity / 100) * (a1MaximumCapacity / 100)).toInt
     val sparkConfA1 = new SparkConf()
       .set("spark.dynamicAllocation.enabled", "true")
       .set(QUEUE_NAME, "a1")
@@ -347,10 +347,10 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
       .set(QUEUE_NAME, "b")
     val clientEnabledSetCores = new Client(args, yarnCluster.getConfig, sparkConfSetCores)

-    assert(Int.MaxValue == clientEnabledSetCores.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
+    assert(Int.MaxValue === clientEnabledSetCores.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
     clientEnabledSetCores.init()
     assert(bExecutorTotal === 26)
-    assert(26 ===  clientEnabledSetCores.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
+    assert(26 === clientEnabledSetCores.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
     clientEnabledSetCores.stop()

     // dynamicAllocation disabled
From a15afd99686e0f644633ff11c856580f4d47d9eb Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Sun, 26 Feb 2017 21:52:14 +0800
Subject: [PATCH 6/8] A more dynamic approach.

---
 .../spark/ExecutorAllocationManager.scala     |   4 +-
 .../org/apache/spark/deploy/yarn/Client.scala |  62 +-----
 .../cluster/YarnSchedulerBackend.scala        |  58 +++++-
 .../deploy/yarn/BaseYarnClusterSuite.scala    |   3 +-
 .../spark/deploy/yarn/ClientSuite.scala       | 140 --------------
 .../yarn/DynamicSetMaxExecutorsSuite.scala    | 178 ++++++++++++++++++
 6 files changed, 244 insertions(+), 201 deletions(-)
 create mode 100644 resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/DynamicSetMaxExecutorsSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 1366251d0618f..58feb8ed8bc49 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -90,7 +90,8 @@ private[spark] class ExecutorAllocationManager(

   // Lower and upper bounds on the number of executors.
   private val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
-  private val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
+  // The upper bound is dynamic for Spark on YARN
+  private var maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
   private val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)

   // How long there must be backlogged tasks for before an addition is triggered (seconds)
@@ -348,6 +349,7 @@ private[spark] class ExecutorAllocationManager(
    * @return the number of additional executors actually requested.
    */
   private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
+    maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
     // Do not request more executors if it would put our target over the upper bound
     if (numExecutorsTarget >= maxNumExecutors) {
       logDebug(s"Not adding executors because our current target total " +

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 1189641d012fd..d32325c2b9d37 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -144,8 +144,12 @@ private[spark] class Client(
   def submitApplication(): ApplicationId = {
     var appId: ApplicationId = null
     try {
-
-      init()
+      launcherBackend.connect()
+      // Setup the credentials before doing anything else,
+      // so we don't have issues at any point.
+      setupCredentials()
+      yarnClient.init(yarnConf)
+      yarnClient.start()

       logInfo("Requesting a new application from cluster with %d NodeManagers"
         .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
@@ -1189,60 +1193,6 @@ private[spark] class Client(
     }
   }

-  def init(): Unit = {
-    launcherBackend.connect()
-    // Setup the credentials before doing anything else,
-    // so we don't have issues at any point.
-    setupCredentials()
-    yarnClient.init(yarnConf)
-    yarnClient.start()
-
-    setMaxNumExecutors()
-  }
-
-  /**
-   * If dynamic allocation is enabled and the user hasn't set spark.dynamicAllocation.maxExecutors,
-   * set the max number of executors based on the YARN cluster's total VCores.
-   * If dynamic allocation is disabled, don't set it.
-   */
-  private def setMaxNumExecutors(): Unit = {
-    if (Utils.isDynamicAllocationEnabled(sparkConf)) {
-
-      val defaultMaxNumExecutors = DYN_ALLOCATION_MAX_EXECUTORS.defaultValue.get
-      if (defaultMaxNumExecutors == sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS)) {
-        val executorCores = sparkConf.get(EXECUTOR_CORES)
-        val runningNodes = yarnClient.getNodeReports().asScala.
-          filter(_.getNodeState == NodeState.RUNNING)
-        val absMaxCapacity = getAbsMaxCapacity(yarnClient, sparkConf.get(QUEUE_NAME))
-
-        val maxNumExecutors = runningNodes.map(_.getCapability.getVirtualCores).
-          sum * absMaxCapacity / executorCores
-        sparkConf.set(DYN_ALLOCATION_MAX_EXECUTORS, maxNumExecutors.toInt)
-      }
-    }
-  }
-
-  /**
-   * Get the absolute max capacity for a given queue.
-   */
-  private def getAbsMaxCapacity(yarnClient: YarnClient, queueName: String): Float = {
-    var maxCapacity = 1F
-    for (queue <- yarnClient.getRootQueueInfos.asScala) {
-      getQueueInfo(queue, queue.getMaximumCapacity)
-    }
-
-    def getQueueInfo(queueInfo: QueueInfo, capacity: Float): Unit = {
-      if (queueInfo.getQueueName.equals(queueName)) {
-        maxCapacity = capacity
-      } else {
-        for (child <- queueInfo.getChildQueues.asScala) {
-          getQueueInfo(child, child.getMaximumCapacity * capacity)
-        }
-      }
-    }
-    maxCapacity
-  }
-
 }

 private object Client extends Logging {

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index cbc6e60e839c1..1460500c8a8f4 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -17,19 +17,25 @@

 package org.apache.spark.scheduler.cluster

-import scala.concurrent.{ExecutionContext, Future}
+import scala.collection.JavaConverters._
+import scala.concurrent.Future
 import scala.util.{Failure, Success}
 import scala.util.control.NonFatal

-import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId, NodeState, QueueInfo}
+import org.apache.hadoop.yarn.client.api.YarnClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration

 import org.apache.spark.SparkContext
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.ui.JettyUtils
-import org.apache.spark.util.{RpcUtils, ThreadUtils}
+import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils}

 /**
  * Abstract Yarn scheduler backend that contains common logic
@@ -54,6 +60,8 @@ private[spark] abstract class YarnSchedulerBackend(
   private val yarnSchedulerEndpointRef = rpcEnv.setupEndpoint(
     YarnSchedulerBackend.ENDPOINT_NAME, yarnSchedulerEndpoint)

+  private val yarnClient = YarnClient.createYarnClient
+
   private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)

   /** Application ID. */
@@ -68,6 +76,8 @@ private[spark] abstract class YarnSchedulerBackend(
   // Flag to specify whether this schedulerBackend should be reset.
   private var shouldResetOnAmRegister = false

+  private var isUserSetMaxExecutors = false
+
   /**
    * Bind to YARN. This *must* be done before calling [[start()]].
    *
@@ -83,6 +93,12 @@ private[spark] abstract class YarnSchedulerBackend(
     require(appId.isDefined, "application ID unset")
     val binding = SchedulerExtensionServiceBinding(sc, appId.get, attemptId)
     services.start(binding)
+
+    isUserSetMaxExecutors = DYN_ALLOCATION_MAX_EXECUTORS.defaultValue.get !=
+      conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
+    yarnClient.init(SparkHadoopUtil.get.newConfiguration(conf).asInstanceOf[YarnConfiguration])
+    yarnClient.start()
+
     super.start()
   }

@@ -91,6 +107,7 @@ private[spark] abstract class YarnSchedulerBackend(
       // SPARK-12009: To prevent Yarn allocator from requesting backup for the executors which
       // was Stopped by SchedulerBackend.
       requestTotalExecutors(0, 0, Map.empty)
+      yarnClient.stop()
       super.stop()
     } finally {
       services.stop()
@@ -135,6 +152,9 @@ private[spark] abstract class YarnSchedulerBackend(
    * This includes executors already pending or running.
    */
   override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
+    if (Utils.isDynamicAllocationEnabled(conf) && !isUserSetMaxExecutors) {
+      setMaxNumExecutors()
+    }
     yarnSchedulerEndpointRef.ask[Boolean](prepareRequestExecutors(requestedTotal))
   }

@@ -149,6 +169,38 @@ private[spark] abstract class YarnSchedulerBackend(
     totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
   }

+  private def setMaxNumExecutors(): Unit = {
+    val executorCores = conf.get(EXECUTOR_CORES)
+    val runningNodes = yarnClient.getNodeReports().asScala.
+      filter(_.getNodeState == NodeState.RUNNING)
+    val absMaxCapacity = getAbsMaxCapacity(conf.get(QUEUE_NAME))
+
+    val maxNumExecutors = (runningNodes.map(_.getCapability.getVirtualCores).
+      sum * absMaxCapacity / executorCores).toInt
+    conf.set(DYN_ALLOCATION_MAX_EXECUTORS, maxNumExecutors)
+  }
+
+  /**
+   * Get the absolute max capacity for a given queue.
+   */
+  private def getAbsMaxCapacity(queueName: String): Float = {
+    var maxCapacity = 1F
+    for (queue <- yarnClient.getRootQueueInfos.asScala) {
+      getQueueInfo(queue, queue.getMaximumCapacity)
+    }
+
+    def getQueueInfo(queueInfo: QueueInfo, capacity: Float): Unit = {
+      if (queueInfo.getQueueName.equals(queueName)) {
+        maxCapacity = capacity
+      } else {
+        for (child <- queueInfo.getChildQueues.asScala) {
+          getQueueInfo(child, child.getMaximumCapacity * capacity)
+        }
+      }
+    }
+    maxCapacity
+  }
+
   /**
    * Add filters to the SparkUI.
    */

diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index 9c3b18e4ec5f3..428c845e85942 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -61,6 +61,7 @@ abstract class BaseYarnClusterSuite
   private var fakeSparkJar: File = _
   protected var hadoopConfDir: File = _
   private var logConfDir: File = _
+  protected var numNodeManagers: Int = 1

   var oldSystemProperties: Properties = null

@@ -84,7 +85,7 @@ abstract class BaseYarnClusterSuite
     yarnConf.set("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
       "100.0")

-    yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1)
+    yarnCluster = new MiniYARNCluster(getClass().getName(), numNodeManagers, 1, 1)
     yarnCluster.init(yarnConf)
     yarnCluster.start()

diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 78c9de5b99ec4..7deaf0af94849 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -35,8 +35,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.YarnClientApplication
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.server.MiniYARNCluster
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration
 import org.apache.hadoop.yarn.util.Records
 import org.mockito.Matchers.{eq => meq, _}
 import org.mockito.Mockito._
@@ -44,7 +42,6 @@ import org.scalatest.{BeforeAndAfterAll, Matchers}

 import org.apache.spark.{SparkConf, SparkFunSuite, TestUtils}
 import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.config._
 import org.apache.spark.util.{ResetSystemProperties, SparkConfWithEnv, Utils}

 class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
@@ -230,143 +227,6 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     appContext.getMaxAppAttempts should be (42)
   }

-  test("Dynamically set spark.dynamicAllocation.maxExecutors if dynamicAllocation enabled") {
-    val a = CapacitySchedulerConfiguration.ROOT + ".a"
-    val b = CapacitySchedulerConfiguration.ROOT + ".b"
-    val a1 = a + ".a1"
-    val a2 = a + ".a2"
-
-    val aCapacity = 40F
-    val aMaximumCapacity = 60F
-    val bCapacity = 60F
-    val bMaximumCapacity = 100F
-    val a1Capacity = 30F
-    val a1MaximumCapacity = 70F
-    val a2Capacity = 70F
-
-    val cpuCores = 8
-    val numNodeManagers = 10
-    val coresTotal = cpuCores * numNodeManagers
-
-    val conf = new CapacitySchedulerConfiguration()
-
-    // Define top-level queues
-    conf.setQueues(CapacitySchedulerConfiguration.ROOT, Array("a", "b"))
-    conf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT, 100)
-    conf.setCapacity(a, aCapacity)
-    conf.setMaximumCapacity(a, aMaximumCapacity)
-    conf.setCapacity(b, bCapacity)
-    conf.setMaximumCapacity(b, bMaximumCapacity)
-
-    // Define 2nd-level queues
-    conf.setQueues(a, Array("a1", "a2"))
-    conf.setCapacity(a1, a1Capacity)
-    conf.setMaximumCapacity(a1, a1MaximumCapacity)
-    conf.setCapacity(a2, a2Capacity)
-    conf.set("yarn.nodemanager.resource.cpu-vcores", cpuCores.toString)
-
-    val yarnCluster = new MiniYARNCluster(classOf[ClientSuite].getName, numNodeManagers, 1, 1)
-    yarnCluster.init(conf)
-    yarnCluster.start()
-
-    val args = new ClientArguments(Array())
-
-    // dynamicAllocation enabled
-    // a's cores: 80 * 0.6 = 48
-    val aCoreTotal = (coresTotal * (aMaximumCapacity / 100)).toInt
-    val sparkConfA = new SparkConf()
-      .set("spark.dynamicAllocation.enabled", "true")
-      .set(QUEUE_NAME, "a")
-    val clientA = new Client(args, yarnCluster.getConfig, sparkConfA)
-
-    assert(Int.MaxValue === clientA.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
-    clientA.init()
-    assert(aCoreTotal === 48)
-    assert(clientA.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS) === 48)
-    clientA.stop()
-
-    // a1's cores: 80 * 0.6 * 0.7 = 33
-    val a1CoreTotal = (coresTotal * (aMaximumCapacity / 100) * (a1MaximumCapacity / 100)).toInt
-    val sparkConfA1 = new SparkConf()
-      .set("spark.dynamicAllocation.enabled", "true")
-      .set(QUEUE_NAME, "a1")
-    val clientA1 = new Client(args, yarnCluster.getConfig, sparkConfA1)
-
-    assert(Int.MaxValue === clientA1.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
-    clientA1.init()
-    assert(a1CoreTotal === 33)
-    assert(clientA1.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS) === 33)
-    clientA1.stop()
-
-    // a2's cores: 80 * 0.6 * 1 = 48
-    val a2CoreTotal = (coresTotal * (aMaximumCapacity / 100) * 1).toInt
-    val sparkConfA2 = new SparkConf()
-      .set("spark.dynamicAllocation.enabled", "true")
-      .set(QUEUE_NAME, "a2")
-    val clientA2 = new Client(args, yarnCluster.getConfig, sparkConfA2)
-
-    assert(Int.MaxValue === clientA2.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
-    clientA2.init()
-    assert(a2CoreTotal === 48)
-    assert(clientA2.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS) === 48)
-    clientA2.stop()
-
-    // b's cores: 80 * 1 = 80
-    val bCoreTotal = (coresTotal * (bMaximumCapacity / 100)).toInt
-    val sparkConfB = new SparkConf()
-      .set("spark.dynamicAllocation.enabled", "true")
-      .set(QUEUE_NAME, "b")
-    val clientB = new Client(args, yarnCluster.getConfig, sparkConfB)
-
-    assert(Int.MaxValue === clientB.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
-    clientB.init()
-    assert(bCoreTotal === 80)
-    assert(clientB.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS) === 80)
-    clientB.stop()
-
-    // dynamicAllocation enabled and user set spark.dynamicAllocation.maxExecutors
-    val maxExecutors = 3
-    val sparkConfSetExecutors = new SparkConf()
-      .set("spark.dynamicAllocation.enabled", "true")
-      .set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString)
-      .set(QUEUE_NAME, "b")
-    val clientSetExecutors = new Client(args, yarnCluster.getConfig, sparkConfSetExecutors)
-
-    assert(maxExecutors === clientSetExecutors.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
-    clientSetExecutors.init()
-    assert(maxExecutors === clientSetExecutors.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
-    clientSetExecutors.stop()
-
-    // dynamicAllocation enabled and user set spark.executor.cores
-    // b's execores = 80 * 1 / 3 = 26
-    val executorCores = 3
-    val bExecutorTotal = (coresTotal * (bMaximumCapacity / 100)).toInt / executorCores
-    val sparkConfSetCores = new SparkConf()
-      .set("spark.dynamicAllocation.enabled", "true")
-      .set("spark.executor.cores", executorCores.toString)
-      .set(QUEUE_NAME, "b")
-    val clientEnabledSetCores = new Client(args, yarnCluster.getConfig, sparkConfSetCores)
-
-    assert(Int.MaxValue === clientEnabledSetCores.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
-    clientEnabledSetCores.init()
-    assert(bExecutorTotal === 26)
-    assert(26 === clientEnabledSetCores.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
-    clientEnabledSetCores.stop()
-
-    // dynamicAllocation disabled
-    val sparkConfDisabled = new SparkConf()
-      .set("spark.dynamicAllocation.enabled", "false")
-      .set(QUEUE_NAME, "b")
-    val clientDisabled = new Client(args, yarnCluster.getConfig, sparkConfDisabled)
-
-    assert(Int.MaxValue === clientDisabled.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
-    clientDisabled.init()
-    assert(Int.MaxValue === clientDisabled.sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS))
-    clientDisabled.stop()
-
-    yarnCluster.stop()
-  }
-
   test("spark.yarn.jars with multiple paths and globs") {
     val libs = Utils.createTempDir()
     val single = Utils.createTempDir()

diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/DynamicSetMaxExecutorsSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/DynamicSetMaxExecutorsSuite.scala
new file mode 100644
index 0000000000000..f664269740304
--- /dev/null
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/DynamicSetMaxExecutorsSuite.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.language.postfixOps
+
+import com.google.common.io.Files
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration
+import org.scalatest.Matchers
+
+import org.apache.spark._
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.tags.ExtendedYarnTest
+
+/**
+ * Integration test that dynamically sets spark.dynamicAllocation.maxExecutors
+ * based on the queue's maxResources, using a mini YARN cluster.
+ */
+@ExtendedYarnTest
+class DynamicSetMaxExecutorsSuite extends BaseYarnClusterSuite {
+
+  numNodeManagers = 10
+  val cpuCores = 8
+  // coresTotal = cpuCores * numNodeManagers = 80
+
+  val queueNameRA = "ra"
+  val queueNameRB = "rb"
+  val queueNameA1 = "a1"
+  val queueNameA2 = "a2"
+  val ra = CapacitySchedulerConfiguration.ROOT + "." + queueNameRA
+  val rb = CapacitySchedulerConfiguration.ROOT + "." + queueNameRB
+  val a1 = ra + "." + queueNameA1
+  val a2 = ra + "." + queueNameA2
+
+  val aCapacity = 40F
+  val aMaximumCapacity = 60F
+  val bCapacity = 60F
+  val bMaximumCapacity = 100F
+  val a1Capacity = 30F
+  val a1MaximumCapacity = 70F
+  val a2Capacity = 70F
+  val isDynamicAllocation = true
+
+  override def newYarnConfig(): CapacitySchedulerConfiguration = {
+
+    val yarnConf = new CapacitySchedulerConfiguration()
+
+    // Define top-level queues
+    yarnConf.setQueues(CapacitySchedulerConfiguration.ROOT, Array(queueNameRA, queueNameRB))
+    yarnConf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT, 100)
+    yarnConf.setCapacity(ra, aCapacity)
+    yarnConf.setMaximumCapacity(ra, aMaximumCapacity)
+    yarnConf.setCapacity(rb, bCapacity)
+    yarnConf.setMaximumCapacity(rb, bMaximumCapacity)
+
+    // Define 2nd-level queues
+    yarnConf.setQueues(ra, Array(queueNameA1, queueNameA2))
+    yarnConf.setCapacity(a1, a1Capacity)
+    yarnConf.setMaximumCapacity(a1, a1MaximumCapacity)
+    yarnConf.setCapacity(a2, a2Capacity)
+    yarnConf.set("yarn.nodemanager.resource.cpu-vcores", cpuCores.toString)
+    yarnConf
+  }
+
+  test(s"run Spark on YARN with dynamicAllocation enabled and ${ queueNameA1 } queue") {
+    // a1's executors: 80 * 0.6 * 0.7 = 33
+    setMaxExecutors(33, queueNameA1,
+      s"spark.dynamicAllocation.enabled=${ isDynamicAllocation }" +
+        s",spark.shuffle.service.enabled=${ isDynamicAllocation }")
+  }
+
+  test(s"run Spark on YARN with dynamicAllocation enabled and ${ queueNameA2 } queue") {
+    // a2's executors: 80 * 0.6 * 1 = 48
+    setMaxExecutors(48, queueNameA2,
+      s"spark.dynamicAllocation.enabled=${ isDynamicAllocation }" +
+        s",spark.shuffle.service.enabled=${ isDynamicAllocation }")
+  }
+
+  test(s"run Spark on YARN with dynamicAllocation enabled and ${ queueNameRB } queue") {
+    // b's executors: 80 * 1 = 80
+    setMaxExecutors(80, queueNameRB,
+      s"spark.dynamicAllocation.enabled=${ isDynamicAllocation }" +
+        s",spark.shuffle.service.enabled=${ isDynamicAllocation }")
+  }
+
+  test(s"run Spark on YARN with dynamicAllocation enabled and ${ queueNameA1 } queue and " +
+    s"user set maxExecutors") {
+    val executors = 12
+    setMaxExecutors(executors, queueNameA1,
+      s"spark.dynamicAllocation.enabled=${ isDynamicAllocation }" +
+        s",spark.shuffle.service.enabled=${ isDynamicAllocation }" +
+        s",${DYN_ALLOCATION_MAX_EXECUTORS.key}=${ executors }")
+  }
+
+  test(s"run Spark on YARN with dynamicAllocation disabled and ${ queueNameA1 } queue") {
+    setMaxExecutors(Int.MaxValue, queueNameA1,
+      s"spark.dynamicAllocation.enabled=${ !isDynamicAllocation }" +
+        s",spark.shuffle.service.enabled=${ !isDynamicAllocation }")
+  }
+
+  test(s"run Spark on YARN with dynamicAllocation disabled and ${ queueNameA1 } queue and " +
+    s"user set maxExecutors") {
+    val executors = 12
+    setMaxExecutors(executors, queueNameA1,
+      s"spark.dynamicAllocation.enabled=${ isDynamicAllocation }" +
+        s",spark.shuffle.service.enabled=${ isDynamicAllocation }" +
+        s",${DYN_ALLOCATION_MAX_EXECUTORS.key}=${ executors }")
+  }
+
+  test(s"run Spark on YARN with dynamicAllocation enabled and ${ queueNameRB } queue and " +
+    s"user set spark.executor.cores") {
+    // b's executors = 80 * 1 / 3 = 26
+    setMaxExecutors(26, queueNameRB,
+      s"spark.dynamicAllocation.enabled=${ isDynamicAllocation }" +
+        s",spark.shuffle.service.enabled=${ isDynamicAllocation }" +
+        s",${EXECUTOR_CORES.key}=3")
+  }
+
+  private def setMaxExecutors(expectedExecutors: Int,
+      queueName: String,
+      extArgMaps: String): Unit = {
+    val result = File.createTempFile("result", null, tempDir)
+    val finalState = runSpark(true,
+      mainClassName(SetMaxExecutors.getClass),
+      appArgs = Seq(result.getAbsolutePath, queueName, extArgMaps))
+    checkResult(finalState, result, expectedExecutors.toString)
+  }
+
+}
+
+private object SetMaxExecutors extends Logging with Matchers {
+  def main(args: Array[String]): Unit = {
+
+    var result = Int.MaxValue.toString
+    val status = new File(args(0))
+    val queueName = args(1)
+    val extArgMaps = args(2)
+
+    var sc: SparkContext = null
+    try {
+      val conf = new SparkConf()
+        .setAppName(s"DynamicSetMaxExecutors-${ queueName }-${ extArgMaps }")
+        .set(QUEUE_NAME, queueName)
+
+      extArgMaps.split(",").foreach { kv =>
+        val confKVs = kv.split("=")
+        conf.set(confKVs(0), confKVs(1))
+      }
+
+      sc = new SparkContext(conf)
+
+      result = sc.getConf.get(DYN_ALLOCATION_MAX_EXECUTORS).toString
+    } finally {
+      Files.write(result, status, StandardCharsets.UTF_8)
+      if (sc != null) sc.stop()
+    }
+  }
+}
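The new integration suite follows the handshake BaseYarnClusterSuite encourages: the driver program under test writes the value it observed to a result file, and the suite asserts on the file's contents once the YARN application finishes. Reduced to its essentials (the helper names below are illustrative; the suite itself goes through runSpark and checkResult):

```scala
import java.io.File
import java.nio.charset.StandardCharsets

import com.google.common.io.Files

// Driver side: record the observed setting, whatever happens.
def recordResult(status: File, observedMaxExecutors: Int): Unit =
  Files.write(observedMaxExecutors.toString, status, StandardCharsets.UTF_8)

// Suite side: the assertion reduces to comparing file contents.
def assertRecordedResult(status: File, expected: String): Unit = {
  val actual = Files.toString(status, StandardCharsets.UTF_8)
  assert(actual == expected, s"expected $expected, got $actual")
}
```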
From 7b467faae53edb13092f16ad74abe0b12b722a34 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Sun, 26 Feb 2017 22:01:09 +0800
Subject: [PATCH 7/8] Change YarnConfiguration to Configuration

---
 .../org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index 428c845e85942..40f68d55252c5 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -28,6 +28,7 @@ import scala.language.postfixOps

 import com.google.common.io.Files
 import org.apache.commons.lang3.SerializationUtils
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.server.MiniYARNCluster
 import org.scalatest.{BeforeAndAfterAll, Matchers}
@@ -65,7 +66,7 @@ abstract class BaseYarnClusterSuite

   var oldSystemProperties: Properties = null

-  def newYarnConfig(): YarnConfiguration
+  def newYarnConfig(): Configuration

   override def beforeAll() {
     super.beforeAll()
From e4b3b0c9dfcb900b0c69329966cbc8030f68c1c3 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Mon, 27 Feb 2017 15:02:18 +0800
Subject: [PATCH 8/8] Update EnvironmentPage if
 spark.dynamicAllocation.maxExecutors changed.

---
 .../spark/ExecutorAllocationManager.scala     | 10 ++++-
 .../yarn/DynamicSetMaxExecutorsSuite.scala    | 37 +++++++------------
 2 files changed, 22 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 58feb8ed8bc49..ac0e8806870b0 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -349,7 +349,15 @@ private[spark] class ExecutorAllocationManager(
    * @return the number of additional executors actually requested.
    */
   private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
-    maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
+    // Update EnvironmentPage if spark.dynamicAllocation.maxExecutors changed.
+    if (maxNumExecutors != conf.get(DYN_ALLOCATION_MAX_EXECUTORS)) {
+      maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
+      val sc = listenerBus.sparkContext
+      val envDetails = SparkEnv.environmentDetails(conf, sc.getSchedulingMode.toString,
+        sc.addedJars.keys.toSeq, sc.addedFiles.keys.toSeq)
+      val event = SparkListenerEnvironmentUpdate(envDetails)
+      listenerBus.postToAll(event)
+    }
     // Do not request more executors if it would put our target over the upper bound
     if (numExecutorsTarget >= maxNumExecutors) {
       logDebug(s"Not adding executors because our current target total " +

diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/DynamicSetMaxExecutorsSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/DynamicSetMaxExecutorsSuite.scala
index f664269740304..185f527a9702a 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/DynamicSetMaxExecutorsSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/DynamicSetMaxExecutorsSuite.scala
@@ -39,9 +39,9 @@ import org.apache.spark.tags.ExtendedYarnTest
 @ExtendedYarnTest
 class DynamicSetMaxExecutorsSuite extends BaseYarnClusterSuite {

+  // coresTotal = cpuCores * numNodeManagers = 80
   numNodeManagers = 10
   val cpuCores = 8
-  // coresTotal = cpuCores * numNodeManagers = 80

   val queueNameRA = "ra"
   val queueNameRB = "rb"
@@ -59,7 +59,10 @@ class DynamicSetMaxExecutorsSuite extends BaseYarnClusterSuite {
   val a1Capacity = 30F
   val a1MaximumCapacity = 70F
   val a2Capacity = 70F
-  val isDynamicAllocation = true
+  val dynamicAllocationEnabled = "spark.dynamicAllocation.enabled=true" +
+    ",spark.shuffle.service.enabled=true"
+  val dynamicAllocationDisabled = "spark.dynamicAllocation.enabled=false" +
+    ",spark.shuffle.service.enabled=false"

   override def newYarnConfig(): CapacitySchedulerConfiguration = {

@@ -84,56 +87,42 @@ class DynamicSetMaxExecutorsSuite extends BaseYarnClusterSuite {

   test(s"run Spark on YARN with dynamicAllocation enabled and ${ queueNameA1 } queue") {
     // a1's executors: 80 * 0.6 * 0.7 = 33
-    setMaxExecutors(33, queueNameA1,
-      s"spark.dynamicAllocation.enabled=${ isDynamicAllocation }" +
-        s",spark.shuffle.service.enabled=${ isDynamicAllocation }")
+    setMaxExecutors(33, queueNameA1, dynamicAllocationEnabled)
   }

   test(s"run Spark on YARN with dynamicAllocation enabled and ${ queueNameA2 } queue") {
     // a2's executors: 80 * 0.6 * 1 = 48
-    setMaxExecutors(48, queueNameA2,
-      s"spark.dynamicAllocation.enabled=${ isDynamicAllocation }" +
-        s",spark.shuffle.service.enabled=${ isDynamicAllocation }")
+    setMaxExecutors(48, queueNameA2, dynamicAllocationEnabled)
   }

   test(s"run Spark on YARN with dynamicAllocation enabled and ${ queueNameRB } queue") {
     // b's executors: 80 * 1 = 80
-    setMaxExecutors(80, queueNameRB,
-      s"spark.dynamicAllocation.enabled=${ isDynamicAllocation }" +
-        s",spark.shuffle.service.enabled=${ isDynamicAllocation }")
+    setMaxExecutors(80, queueNameRB, dynamicAllocationEnabled)
   }

   test(s"run Spark on YARN with dynamicAllocation enabled and ${ queueNameA1 } queue and " +
     s"user set maxExecutors") {
     val executors = 12
     setMaxExecutors(executors, queueNameA1,
-      s"spark.dynamicAllocation.enabled=${ isDynamicAllocation }" +
-        s",spark.shuffle.service.enabled=${ isDynamicAllocation }" +
-        s",${DYN_ALLOCATION_MAX_EXECUTORS.key}=${ executors }")
+      s"${ dynamicAllocationEnabled },${DYN_ALLOCATION_MAX_EXECUTORS.key}=${ executors }")
   }

   test(s"run Spark on YARN with dynamicAllocation disabled and ${ queueNameA1 } queue") {
-    setMaxExecutors(Int.MaxValue, queueNameA1,
-      s"spark.dynamicAllocation.enabled=${ !isDynamicAllocation }" +
-        s",spark.shuffle.service.enabled=${ !isDynamicAllocation }")
+    setMaxExecutors(DYN_ALLOCATION_MAX_EXECUTORS.defaultValue.get, queueNameA1,
+      dynamicAllocationDisabled)
   }

   test(s"run Spark on YARN with dynamicAllocation disabled and ${ queueNameA1 } queue and " +
     s"user set maxExecutors") {
     val executors = 12
     setMaxExecutors(executors, queueNameA1,
-      s"spark.dynamicAllocation.enabled=${ isDynamicAllocation }" +
-        s",spark.shuffle.service.enabled=${ isDynamicAllocation }" +
-        s",${DYN_ALLOCATION_MAX_EXECUTORS.key}=${ executors }")
+      s"${ dynamicAllocationEnabled },${DYN_ALLOCATION_MAX_EXECUTORS.key}=${ executors }")
   }

   test(s"run Spark on YARN with dynamicAllocation enabled and ${ queueNameRB } queue and " +
     s"user set spark.executor.cores") {
     // b's executors = 80 * 1 / 3 = 26
-    setMaxExecutors(26, queueNameRB,
-      s"spark.dynamicAllocation.enabled=${ isDynamicAllocation }" +
-        s",spark.shuffle.service.enabled=${ isDynamicAllocation }" +
-        s",${EXECUTOR_CORES.key}=3")
+    setMaxExecutors(26, queueNameRB, s"${ dynamicAllocationEnabled },${EXECUTOR_CORES.key}=3")
   }

   private def setMaxExecutors(expectedExecutors: Int,
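The mechanism patch 8 leans on is Spark's listener bus: posting a SparkListenerEnvironmentUpdate is what makes the UI's Environment page re-render the changed property. A hedged sketch of consuming such an event from the other side (this listener is illustrative and not part of the series):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEnvironmentUpdate}

// Illustrative listener: report the refreshed cap whenever an environment
// update is posted, much as the EnvironmentPage re-reads it.
class MaxExecutorsLogger extends SparkListener {
  override def onEnvironmentUpdate(update: SparkListenerEnvironmentUpdate): Unit = {
    val sparkProps = update.environmentDetails.getOrElse("Spark Properties", Seq.empty).toMap
    sparkProps.get("spark.dynamicAllocation.maxExecutors")
      .foreach(v => println(s"spark.dynamicAllocation.maxExecutors is now $v"))
  }
}
```

Such a listener would be registered with `sc.addSparkListener(new MaxExecutorsLogger)`.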