Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/running-on-yarn.md
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,14 @@ To use a custom metrics.properties for the application master and executors, upd
</td>
<td>1.6.0</td>
</tr>
<tr>
<td><code>spark.yarn.executor.bindAddress.mode</code></td>
<td><code>HOSTNAME</code></td>
<td>
Configures which network address executors bind their listening sockets to. Possible choices are: <code>HOSTNAME</code>, which binds to the executor's hostname, and <code>ALL_IPS</code>, which binds to 0.0.0.0.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.yarn.executor.nodeLabelExpression</code></td>
<td>(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,8 +462,9 @@ private[spark] class ApplicationMaster(
logInfo {
val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
val executorCores = _sparkConf.get(EXECUTOR_CORES)
val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "<executorId>",
"<hostname>", executorMemory, executorCores, appId, securityMgr, localResources,
val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl,
"<executorId>", "<bindAddress>", "<hostname>",
executorMemory, executorCores, appId, securityMgr, localResources,
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
dummyRunner.launchContextDebugInfo()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ private[yarn] class ExecutorRunnable(
sparkConf: SparkConf,
masterAddress: String,
executorId: String,
bindAddress: String,
hostname: String,
executorMemory: Int,
executorCores: Int,
Expand Down Expand Up @@ -120,7 +121,7 @@ private[yarn] class ExecutorRunnable(
} catch {
case ex: Exception =>
throw new SparkException(s"Exception while starting container ${container.get.getId}" +
s" on host $hostname", ex)
s" on host $hostname ($bindAddress)", ex)
}
}

Expand Down Expand Up @@ -192,6 +193,7 @@ private[yarn] class ExecutorRunnable(
Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
"--driver-url", masterAddress,
"--executor-id", executorId,
"--bind-address", bindAddress,
"--hostname", hostname,
"--cores", executorCores.toString,
"--app-id", appId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ private[yarn] class YarnAllocator(
@GuardedBy("this")
val allocatedContainerToHostMap = new HashMap[ContainerId, String]

// Bind address recorded for each allocated container, keyed by container id. Entries are
// added and removed together with the matching entries in allocatedContainerToHostMap.
@GuardedBy("this")
val allocatedContainerToBindAddressMap = new HashMap[ContainerId, String]

// Containers that we no longer care about. We've either already told the RM to release them or
// will on the next heartbeat. Containers get removed from this map after the RM tells us they've
// completed.
Expand Down Expand Up @@ -173,6 +176,8 @@ private[yarn] class YarnAllocator(

private val minMemoryOverhead = sparkConf.get(EXECUTOR_MIN_MEMORY_OVERHEAD)

// Executor bind-address mode read from EXECUTOR_BIND_ADDRESS_MODE; "ALL_IPS" selects
// 0.0.0.0, any other value falls back to the container's host name.
private val bindAddressMode = sparkConf.get(EXECUTOR_BIND_ADDRESS_MODE)

private val memoryOverheadFactor = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)

private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
Expand Down Expand Up @@ -757,12 +762,17 @@ private[yarn] class YarnAllocator(
val rpId = getResourceProfileIdFromPriority(container.getPriority)
executorIdCounter += 1
val executorHostname = container.getNodeId.getHost
val executorBindAddress = bindAddressMode match {
case "ALL_IPS" => "0.0.0.0"
case _ => executorHostname
}
val containerId = container.getId
val executorId = executorIdCounter.toString
val yarnResourceForRpId = rpIdToYarnResource.get(rpId)
assert(container.getResource.getMemorySize >= yarnResourceForRpId.getMemorySize)
logInfo(log"Launching container ${MDC(LogKeys.CONTAINER_ID, containerId)} " +
log"on host ${MDC(LogKeys.HOST, executorHostname)} for " +
log"on host ${MDC(LogKeys.HOST, executorHostname)} " +
log"with bind-address ${MDC(LogKeys.BIND_ADDRESS, executorBindAddress)} for " +
log"executor with ID ${MDC(LogKeys.EXECUTOR_ID, executorId)} for " +
log"ResourceProfile Id ${MDC(LogKeys.RESOURCE_PROFILE_ID, rpId)}")

Expand All @@ -788,6 +798,7 @@ private[yarn] class YarnAllocator(
sparkConf,
driverUrl,
executorId,
executorBindAddress,
executorHostname,
containerMem,
containerCores,
Expand Down Expand Up @@ -838,6 +849,10 @@ private[yarn] class YarnAllocator(
new HashSet[ContainerId])
containerSet += containerId
allocatedContainerToHostMap.put(containerId, executorHostname)
allocatedContainerToBindAddressMap.put(containerId, bindAddressMode match {
case "ALL_IPS" => "0.0.0.0"
case _ => executorHostname
})
launchingExecutorContainerIds.remove(containerId)
}
getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet()
Expand Down Expand Up @@ -952,6 +967,7 @@ private[yarn] class YarnAllocator(
}

allocatedContainerToHostMap.remove(containerId)
allocatedContainerToBindAddressMap.remove(containerId)
}

containerIdToExecutorIdAndResourceProfileId.remove(containerId).foreach { case (eid, _) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.deploy.yarn

import java.util.Properties
import java.util.{Locale, Properties}
import java.util.concurrent.TimeUnit

import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -339,6 +339,17 @@ package object config extends Logging {
.stringConf
.createOptional

// Selects the address an executor is told to bind to: "HOSTNAME" (the default) or "ALL_IPS".
private[spark] val EXECUTOR_BIND_ADDRESS_MODE =
ConfigBuilder("spark.yarn.executor.bindAddress.mode")
.doc("Configures executor behaviour of which network to listen sockets to. " +
"Possible choices are: HOSTNAME means to bind to hostname, " +
"ALL_IPS means to bind to 0.0.0.0")
.version("4.0.0")
.stringConf
// Upper-case with a fixed locale; presumably so lower/mixed-case input passes the
// checkValues step below — NOTE(review): confirm transform runs before validation.
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(Set("HOSTNAME", "ALL_IPS"))
.createWithDefault("HOSTNAME")

/* Unmanaged AM configuration. */

private[spark] val YARN_UNMANAGED_AM = ConfigBuilder("spark.yarn.unmanagedAM.enabled")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class ExecutorRunnableSuite extends SparkFunSuite {
"yarn",
"exec-1",
"localhost",
"localhost",
1,
1,
"application_123_1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ class YarnAllocatorSuite extends SparkFunSuite
ContainerStatus.newInstance(containerId, containerState, diagnostics, exitStatus)
}


test("single container allocated") {
// request a single container and receive it
val (handler, _) = createAllocator(1)
Expand All @@ -186,6 +185,7 @@ class YarnAllocatorSuite extends SparkFunSuite

handler.getNumExecutorsRunning should be (1)
handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
handler.allocatedContainerToBindAddressMap.get(container.getId).get should be ("host1")
val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(defaultRPId)
hostTocontainer.get("host1").get should contain(container.getId)

Expand Down Expand Up @@ -364,7 +364,7 @@ class YarnAllocatorSuite extends SparkFunSuite
}
}

test("container should not be created if requested number if met") {
test("container should not be created if requested number is met") {
// request a single container and receive it
val (handler, _) = createAllocator(1)
handler.updateResourceRequests()
Expand Down Expand Up @@ -904,4 +904,29 @@ class YarnAllocatorSuite extends SparkFunSuite
handler.getNumExecutorsRunning should be(0)
handler.getNumExecutorsStarting should be(0)
}

test("use 0.0.0.0 when requested bind-address mode is ALL_IPS") {
  // Create an allocator with the bind-address mode explicitly set to ALL_IPS.
  val bindModeConf = Map(EXECUTOR_BIND_ADDRESS_MODE.key -> "ALL_IPS")
  val (allocator, _) = createAllocator(maxExecutors = 1, additionalConfigs = bindModeConf)
  allocator.updateResourceRequests()

  // Hand the allocator a single container located on host1.
  val granted = createContainer("host1")
  allocator.handleAllocatedContainers(Array(granted).toImmutableArraySeq)

  // The host map keeps the real host name ...
  val recordedHost = allocator.allocatedContainerToHostMap.get(granted.getId).get
  recordedHost should be ("host1")
  // ... while the bind-address map records the wildcard address.
  val recordedBindAddress = allocator.allocatedContainerToBindAddressMap.get(granted.getId).get
  recordedBindAddress should be ("0.0.0.0")
}

test("use hostname when requested bind-address mode is HOSTNAME") {
  // Create an allocator with the bind-address mode explicitly set to HOSTNAME.
  val bindModeConf = Map(EXECUTOR_BIND_ADDRESS_MODE.key -> "HOSTNAME")
  val (allocator, _) = createAllocator(maxExecutors = 1, additionalConfigs = bindModeConf)
  allocator.updateResourceRequests()

  // Hand the allocator a single container located on host1.
  val granted = createContainer("host1")
  allocator.handleAllocatedContainers(Array(granted).toImmutableArraySeq)

  // Both the host map and the bind-address map should record the container's host name.
  val recordedHost = allocator.allocatedContainerToHostMap.get(granted.getId).get
  recordedHost should be ("host1")
  val recordedBindAddress = allocator.allocatedContainerToBindAddressMap.get(granted.getId).get
  recordedBindAddress should be ("host1")
}

}