Skip to content

Commit dcad398

Browse files
author
hendra.saputra
committed
[SPARK-45278][YARN] Allow limited configuration option to either HOSTNAME or ALL_IPS
1 parent bbf8d7f commit dcad398

File tree

4 files changed

+35
-15
lines changed

4 files changed

+35
-15
lines changed

docs/running-on-yarn.md

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -515,12 +515,10 @@ To use a custom metrics.properties for the application master and executors, upd
515515
<td>1.6.0</td>
516516
</tr>
517517
<tr>
518-
<td><code>spark.yarn.executor.bindAddress</code></td>
519-
<td><code>(executor hostname)</code></td>
518+
<td><code>spark.yarn.executor.bindAddress.mode</code></td>
519+
<td><code>HOSTNAME</code></td>
520520
<td>
521-
Hostname or IP address where to bind listening sockets in YARN cluster and client mode.
522-
<br />It also allows a different address from the local one to be advertised to other
523-
executors or external systems.
521+
Configures which network address the executor binds its listening sockets to. Possible choices are: <code>HOSTNAME</code>, which binds to the executor's hostname, and <code>ALL_IPS</code>, which binds to 0.0.0.0.
524522
</td>
525523
<td>4.0.0</td>
526524
</tr>

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ private[yarn] class YarnAllocator(
176176

177177
private val minMemoryOverhead = sparkConf.get(EXECUTOR_MIN_MEMORY_OVERHEAD)
178178

179-
private val bindAddress = sparkConf.get(EXECUTOR_BIND_ADDRESS)
179+
private val bindAddressMode = sparkConf.get(EXECUTOR_BIND_ADDRESS_MODE)
180180

181181
private val memoryOverheadFactor = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)
182182

@@ -762,7 +762,10 @@ private[yarn] class YarnAllocator(
762762
val rpId = getResourceProfileIdFromPriority(container.getPriority)
763763
executorIdCounter += 1
764764
val executorHostname = container.getNodeId.getHost
765-
val executorBindAddress = bindAddress.getOrElse(executorHostname)
765+
val executorBindAddress = bindAddressMode match {
766+
case "ALL_IPS" => "0.0.0.0"
767+
case _ => executorHostname
768+
}
766769
val containerId = container.getId
767770
val executorId = executorIdCounter.toString
768771
val yarnResourceForRpId = rpIdToYarnResource.get(rpId)
@@ -846,7 +849,10 @@ private[yarn] class YarnAllocator(
846849
new HashSet[ContainerId])
847850
containerSet += containerId
848851
allocatedContainerToHostMap.put(containerId, executorHostname)
849-
allocatedContainerToBindAddressMap.put(containerId, bindAddress.getOrElse(executorHostname))
852+
allocatedContainerToBindAddressMap.put(containerId, bindAddressMode match {
853+
case "ALL_IPS" => "0.0.0.0"
854+
case _ => executorHostname
855+
})
850856
launchingExecutorContainerIds.remove(containerId)
851857
}
852858
getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet()

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.deploy.yarn
1919

20-
import java.util.Properties
20+
import java.util.{Locale, Properties}
2121
import java.util.concurrent.TimeUnit
2222

2323
import org.apache.spark.internal.Logging
@@ -339,12 +339,16 @@ package object config extends Logging {
339339
.stringConf
340340
.createOptional
341341

342-
private[spark] val EXECUTOR_BIND_ADDRESS =
343-
ConfigBuilder("spark.yarn.executor.bindAddress")
344-
.doc("Address where to bind network listen sockets on the executor.")
342+
private[spark] val EXECUTOR_BIND_ADDRESS_MODE =
343+
ConfigBuilder("spark.yarn.executor.bindAddress.mode")
344+
.doc("Configures which network address the executor binds its listening sockets to. " +
345+
"Possible choices are: HOSTNAME, which binds to the executor's hostname, " +
346+
"and ALL_IPS, which binds to 0.0.0.0.")
345347
.version("4.0.0")
346348
.stringConf
347-
.createOptional
349+
.transform(_.toUpperCase(Locale.ROOT))
350+
.checkValues(Set("HOSTNAME", "ALL_IPS"))
351+
.createWithDefault("HOSTNAME")
348352

349353
/* Unmanaged AM configuration. */
350354

resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -905,9 +905,9 @@ class YarnAllocatorSuite extends SparkFunSuite
905905
handler.getNumExecutorsStarting should be(0)
906906
}
907907

908-
test("use requested bind-address") {
908+
test("use 0.0.0.0 when requested bind-address mode is ALL_IPS") {
909909
val (handler, _) = createAllocator(maxExecutors = 1,
910-
additionalConfigs = Map(EXECUTOR_BIND_ADDRESS.key -> "0.0.0.0"))
910+
additionalConfigs = Map(EXECUTOR_BIND_ADDRESS_MODE.key -> "ALL_IPS"))
911911
handler.updateResourceRequests()
912912

913913
val container = createContainer("host1")
@@ -917,4 +917,16 @@ class YarnAllocatorSuite extends SparkFunSuite
917917
handler.allocatedContainerToBindAddressMap.get(container.getId).get should be ("0.0.0.0")
918918
}
919919

920+
test("use hostname when requested bind-address mode is HOSTNAME") {
921+
val (handler, _) = createAllocator(maxExecutors = 1,
922+
additionalConfigs = Map(EXECUTOR_BIND_ADDRESS_MODE.key -> "HOSTNAME"))
923+
handler.updateResourceRequests()
924+
925+
val container = createContainer("host1")
926+
handler.handleAllocatedContainers(Array(container).toImmutableArraySeq)
927+
928+
handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
929+
handler.allocatedContainerToBindAddressMap.get(container.getId).get should be ("host1")
930+
}
931+
920932
}

0 commit comments

Comments
 (0)