From 5a51833f69bfedc6585eb448f952339884e32902 Mon Sep 17 00:00:00 2001 From: Hendra Saputra Date: Thu, 7 Sep 2023 23:08:55 +0100 Subject: [PATCH 1/3] [SPARK-45278][YARN] Support executor bind address in Yarn executors --- docs/running-on-yarn.md | 19 +++++++++++++++++++ .../spark/deploy/yarn/ApplicationMaster.scala | 5 +++-- .../spark/deploy/yarn/ExecutorRunnable.scala | 4 +++- .../spark/deploy/yarn/YarnAllocator.scala | 12 +++++++++++- .../org/apache/spark/deploy/yarn/config.scala | 7 +++++++ .../deploy/yarn/ExecutorRunnableSuite.scala | 1 + .../deploy/yarn/YarnAllocatorSuite.scala | 17 +++++++++++++++-- 7 files changed, 59 insertions(+), 6 deletions(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index b8e22b12e3c9..beb20ada101f 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -514,6 +514,25 @@ To use a custom metrics.properties for the application master and executors, upd 1.6.0 + + spark.yarn.executor.bindAddress + (executor hostname) + + Hostname or IP address where to bind listening sockets in YARN cluster and client mode. +
It also allows a different address from the local one to be advertised to other + executors or external systems. + + 4.0.0 + + + spark.yarn.executor.failuresValidityInterval + (none) + + Defines the validity interval for executor failure tracking. + Executor failures which are older than the validity interval will be ignored. + + 2.0.0 + spark.yarn.executor.nodeLabelExpression (none) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 11d22a3225d8..ea7bb30dbe98 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -462,8 +462,9 @@ private[spark] class ApplicationMaster( logInfo { val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt val executorCores = _sparkConf.get(EXECUTOR_CORES) - val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "", - "", executorMemory, executorCores, appId, securityMgr, localResources, + val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, + "", "", "", + executorMemory, executorCores, appId, securityMgr, localResources, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) dummyRunner.launchContextDebugInfo() } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 983ab5b4341b..0fa871951a99 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -50,6 +50,7 @@ private[yarn] class ExecutorRunnable( sparkConf: SparkConf, masterAddress: String, executorId: String, + bindAddress: String, hostname: String, executorMemory: Int, executorCores: Int, @@ -120,7 +121,7 @@ private[yarn] class ExecutorRunnable( } catch { case ex: Exception => throw new SparkException(s"Exception while starting container ${container.get.getId}" + - s" on host $hostname", ex) + s" on host $hostname ($bindAddress)", ex) } } @@ -192,6 +193,7 @@ private[yarn] class ExecutorRunnable( Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend", "--driver-url", masterAddress, "--executor-id", executorId, + "--bind-address", bindAddress, "--hostname", hostname, "--cores", executorCores.toString, "--app-id", appId, diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index c86195d0ef31..ce3394b69729 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -87,6 +87,9 @@ private[yarn] class YarnAllocator( @GuardedBy("this") val allocatedContainerToHostMap = new HashMap[ContainerId, String] + @GuardedBy("this") + val allocatedContainerToBindAddressMap = new HashMap[ContainerId, String] + // Containers that we no longer care about. We've either already told the RM to release them or // will on the next heartbeat. Containers get removed from this map after the RM tells us they've // completed. @@ -173,6 +176,8 @@ private[yarn] class YarnAllocator( private val minMemoryOverhead = sparkConf.get(EXECUTOR_MIN_MEMORY_OVERHEAD) + private val bindAddress = sparkConf.get(EXECUTOR_BIND_ADDRESS) + private val memoryOverheadFactor = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR) private val launcherPool = ThreadUtils.newDaemonCachedThreadPool( @@ -757,12 +762,14 @@ private[yarn] class YarnAllocator( val rpId = getResourceProfileIdFromPriority(container.getPriority) executorIdCounter += 1 val executorHostname = container.getNodeId.getHost + val executorBindAddress = bindAddress.getOrElse(executorHostname) val containerId = container.getId val executorId = executorIdCounter.toString val yarnResourceForRpId = rpIdToYarnResource.get(rpId) assert(container.getResource.getMemorySize >= yarnResourceForRpId.getMemorySize) logInfo(log"Launching container ${MDC(LogKeys.CONTAINER_ID, containerId)} " + - log"on host ${MDC(LogKeys.HOST, executorHostname)} for " + + log"on host ${MDC(LogKeys.HOST, executorHostname)} " + + log"with bind-address ${MDC(LogKeys.BIND_ADDRESS, executorBindAddress)} for " + log"executor with ID ${MDC(LogKeys.EXECUTOR_ID, executorId)} for " + log"ResourceProfile Id ${MDC(LogKeys.RESOURCE_PROFILE_ID, rpId)}") @@ -788,6 +795,7 @@ private[yarn] class YarnAllocator( sparkConf, driverUrl, executorId, + executorBindAddress, executorHostname, containerMem, containerCores, @@ -838,6 +846,7 @@ private[yarn] class YarnAllocator( new HashSet[ContainerId]) containerSet += containerId allocatedContainerToHostMap.put(containerId, executorHostname) + allocatedContainerToBindAddressMap.put(containerId, bindAddress.getOrElse(executorHostname)) launchingExecutorContainerIds.remove(containerId) } getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet() @@ -952,6 +961,7 @@ private[yarn] class YarnAllocator( } allocatedContainerToHostMap.remove(containerId) + allocatedContainerToBindAddressMap.remove(containerId) } containerIdToExecutorIdAndResourceProfileId.remove(containerId).foreach { case (eid, _) => diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index 51e5e0bfb908..5bf825665521 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -339,6 +339,13 @@ package object config extends Logging { .stringConf .createOptional + private[spark] val EXECUTOR_BIND_ADDRESS = + ConfigBuilder("spark.yarn.executor.bindAddress") + .doc("Address where to bind network listen sockets on the executor.") + .version("4.0.0") + .stringConf + .createOptional + /* Unmanaged AM configuration. */ private[spark] val YARN_UNMANAGED_AM = ConfigBuilder("spark.yarn.unmanagedAM.enabled") diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala index 1ef3c9c410af..70c6c2474b60 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala @@ -42,6 +42,7 @@ class ExecutorRunnableSuite extends SparkFunSuite { "yarn", "exec-1", "localhost", + "localhost", 1, 1, "application_123_1", diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 09e35a308728..68c7ceb83296 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -173,7 +173,6 @@ class YarnAllocatorSuite extends SparkFunSuite ContainerStatus.newInstance(containerId, containerState, diagnostics, exitStatus) } - test("single container allocated") { // request a single container and receive it val (handler, _) = createAllocator(1) @@ -186,6 +185,7 @@ class YarnAllocatorSuite extends SparkFunSuite handler.getNumExecutorsRunning should be (1) handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1") + handler.allocatedContainerToBindAddressMap.get(container.getId).get should be ("host1") val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(defaultRPId) hostTocontainer.get("host1").get should contain(container.getId) @@ -364,7 +364,7 @@ class YarnAllocatorSuite extends SparkFunSuite } } - test("container should not be created if requested number if met") { + test("container should not be created if requested number is met") { // request a single container and receive it val (handler, _) = createAllocator(1) handler.updateResourceRequests() @@ -904,4 +904,17 @@ class YarnAllocatorSuite extends SparkFunSuite handler.getNumExecutorsRunning should be(0) handler.getNumExecutorsStarting should be(0) } + + test("use requested bind-address") { + val (handler, _) = createAllocator(maxExecutors = 1, + additionalConfigs = Map(EXECUTOR_BIND_ADDRESS.key -> "0.0.0.0")) + handler.updateResourceRequests() + + val container = createContainer("host1") + handler.handleAllocatedContainers(Array(container).toImmutableArraySeq) + + handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1") + handler.allocatedContainerToBindAddressMap.get(container.getId).get should be ("0.0.0.0") + } + } From bbf8d7fd1e499e65d079227a3f0f1dd5e17393ea Mon Sep 17 00:00:00 2001 From: "hendra.saputra" Date: Tue, 27 Aug 2024 15:50:55 +0100 Subject: [PATCH 2/3] [SPARK-45278][YARN] Remove unrelated changes for doc --- docs/running-on-yarn.md | 9 --------- 1 file changed, 9 deletions(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index beb20ada101f..5e5ca72c87e7 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -524,15 +524,6 @@ To use a custom metrics.properties for the application master and executors, upd 4.0.0 - - spark.yarn.executor.failuresValidityInterval - (none) - - Defines the validity interval for executor failure tracking. - Executor failures which are older than the validity interval will be ignored. - - 2.0.0 - spark.yarn.executor.nodeLabelExpression (none) From dcad398986fa89081ec9599d8f563c195546020b Mon Sep 17 00:00:00 2001 From: "hendra.saputra" Date: Fri, 8 Nov 2024 09:43:17 +0000 Subject: [PATCH 3/3] [SPARK-45278][YARN] Allow limited configuration option to either HOSTNAME or ALL_IPS --- docs/running-on-yarn.md | 8 +++----- .../apache/spark/deploy/yarn/YarnAllocator.scala | 12 +++++++++--- .../org/apache/spark/deploy/yarn/config.scala | 14 +++++++++----- .../spark/deploy/yarn/YarnAllocatorSuite.scala | 16 ++++++++++++++-- 4 files changed, 35 insertions(+), 15 deletions(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 5e5ca72c87e7..5b957ce0f9b6 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -515,12 +515,10 @@ To use a custom metrics.properties for the application master and executors, upd 1.6.0 - spark.yarn.executor.bindAddress - (executor hostname) + spark.yarn.executor.bindAddress.mode + HOSTNAME - Hostname or IP address where to bind listening sockets in YARN cluster and client mode. -
It also allows a different address from the local one to be advertised to other - executors or external systems. + Configures executor behaviour of which network to listen sockets to. Possible choices are: HOSTNAME means to bind to hostname, ALL_IPS means to bind to 0.0.0.0 4.0.0 diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index ce3394b69729..7b52e7de8b96 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -176,7 +176,7 @@ private[yarn] class YarnAllocator( private val minMemoryOverhead = sparkConf.get(EXECUTOR_MIN_MEMORY_OVERHEAD) - private val bindAddress = sparkConf.get(EXECUTOR_BIND_ADDRESS) + private val bindAddressMode = sparkConf.get(EXECUTOR_BIND_ADDRESS_MODE) private val memoryOverheadFactor = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR) @@ -762,7 +762,10 @@ private[yarn] class YarnAllocator( val rpId = getResourceProfileIdFromPriority(container.getPriority) executorIdCounter += 1 val executorHostname = container.getNodeId.getHost - val executorBindAddress = bindAddress.getOrElse(executorHostname) + val executorBindAddress = bindAddressMode match { + case "ALL_IPS" => "0.0.0.0" + case _ => executorHostname + } val containerId = container.getId val executorId = executorIdCounter.toString val yarnResourceForRpId = rpIdToYarnResource.get(rpId) @@ -846,7 +849,10 @@ private[yarn] class YarnAllocator( new HashSet[ContainerId]) containerSet += containerId allocatedContainerToHostMap.put(containerId, executorHostname) - allocatedContainerToBindAddressMap.put(containerId, bindAddress.getOrElse(executorHostname)) + allocatedContainerToBindAddressMap.put(containerId, bindAddressMode match { + case "ALL_IPS" => "0.0.0.0" + case _ => executorHostname + }) launchingExecutorContainerIds.remove(containerId) } getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet() diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index 5bf825665521..ec75ad897be3 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.yarn -import java.util.Properties +import java.util.{Locale, Properties} import java.util.concurrent.TimeUnit import org.apache.spark.internal.Logging @@ -339,12 +339,16 @@ package object config extends Logging { .stringConf .createOptional - private[spark] val EXECUTOR_BIND_ADDRESS = - ConfigBuilder("spark.yarn.executor.bindAddress") - .doc("Address where to bind network listen sockets on the executor.") + private[spark] val EXECUTOR_BIND_ADDRESS_MODE = + ConfigBuilder("spark.yarn.executor.bindAddress.mode") + .doc("Configures executor behaviour of which network to listen sockets to. " + + "Possible choices are: HOSTNAME means to bind to hostname, " + + "ALL_IPS means to bind to 0.0.0.0") .version("4.0.0") .stringConf - .createOptional + .transform(_.toUpperCase(Locale.ROOT)) + .checkValues(Set("HOSTNAME", "ALL_IPS")) + .createWithDefault("HOSTNAME") /* Unmanaged AM configuration. */ diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 68c7ceb83296..8b172baffb21 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -905,9 +905,9 @@ class YarnAllocatorSuite extends SparkFunSuite handler.getNumExecutorsStarting should be(0) } - test("use requested bind-address") { + test("use 0.0.0.0 when requested bind-address mode is ALL_IPS") { val (handler, _) = createAllocator(maxExecutors = 1, - additionalConfigs = Map(EXECUTOR_BIND_ADDRESS.key -> "0.0.0.0")) + additionalConfigs = Map(EXECUTOR_BIND_ADDRESS_MODE.key -> "ALL_IPS")) handler.updateResourceRequests() val container = createContainer("host1") @@ -917,4 +917,16 @@ class YarnAllocatorSuite extends SparkFunSuite handler.allocatedContainerToBindAddressMap.get(container.getId).get should be ("0.0.0.0") } + test("use hostname when requested bind-address mode is HOSTNAME") { + val (handler, _) = createAllocator(maxExecutors = 1, + additionalConfigs = Map(EXECUTOR_BIND_ADDRESS_MODE.key -> "HOSTNAME")) + handler.updateResourceRequests() + + val container = createContainer("host1") + handler.handleAllocatedContainers(Array(container).toImmutableArraySeq) + + handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1") + handler.allocatedContainerToBindAddressMap.get(container.getId).get should be ("host1") + } + }