16 changes: 15 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -193,14 +193,15 @@ object SparkEnv extends Logging {
   private[spark] def createExecutorEnv(
       conf: SparkConf,
       executorId: String,
+      bindAddress: String,
       hostname: String,
       numCores: Int,
       ioEncryptionKey: Option[Array[Byte]],
       isLocal: Boolean): SparkEnv = {
     val env = create(
       conf,
       executorId,
-      hostname,
+      bindAddress,
       hostname,
       None,
       isLocal,
@@ -211,6 +212,19 @@ object SparkEnv extends Logging {
     env
   }
 
+  /**
+   * Create a SparkEnv for an executor with hostname as executor's bind address.
+   */
+  private[spark] def createExecutorEnv(
+      conf: SparkConf,
+      executorId: String,
+      hostname: String,
+      numCores: Int,
+      ioEncryptionKey: Option[Array[Byte]],
+      isLocal: Boolean): SparkEnv = {
+    createExecutorEnv(conf, executorId, hostname, hostname, numCores, ioEncryptionKey, isLocal)
+  }
+
   /**
    * Helper method to create a SparkEnv for a driver or an executor.
   */
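A note on the second createExecutorEnv overload (my reading; the diff does not spell this out): it keeps the pre-existing signature so current call sites compile unchanged, and simply forwards hostname as the bind address. A toy Scala sketch of the same delegation pattern, with made-up names:

    // Toy analogue (hypothetical names, not Spark code): the old
    // single-address entry point delegates to the new two-address one.
    object AddressDemo {
      def bindAndAdvertise(bindAddress: String, advertiseAddress: String): String =
        s"bound=$bindAddress, advertised=$advertiseAddress"

      // Backwards-compatible overload: one address plays both roles.
      def bindAndAdvertise(address: String): String =
        bindAndAdvertise(address, address)
    }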
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -41,6 +41,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     override val rpcEnv: RpcEnv,
     driverUrl: String,
     executorId: String,
+    bindAddress: String,
     hostname: String,
     cores: Int,
     userClassPath: Seq[URL],
@@ -177,6 +178,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
   private def run(
       driverUrl: String,
       executorId: String,
+      bindAddress: String,
       hostname: String,
       cores: Int,
       appId: String,
@@ -193,10 +195,12 @@
     val executorConf = new SparkConf
     val fetcher = RpcEnv.create(
       "driverPropsFetcher",
-      hostname,
-      -1,
+      bindAddress,
+      advertiseAddress = hostname,
+      port = -1,
       executorConf,
       new SecurityManager(executorConf),
+      numUsableCores = 0,
       clientMode = true)
     val driver = fetcher.setupEndpointRefByURI(driverUrl)
     val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
@@ -219,10 +223,10 @@
     }
 
     val env = SparkEnv.createExecutorEnv(
-      driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
+      driverConf, executorId, bindAddress, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
 
     env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
-      env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
+      env.rpcEnv, driverUrl, executorId, bindAddress, hostname, cores, userClassPath, env))
     workerUrl.foreach { url =>
       env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
     }
@@ -233,6 +237,7 @@
   def main(args: Array[String]) {
     var driverUrl: String = null
     var executorId: String = null
+    var bindAddress: String = null
     var hostname: String = null
     var cores: Int = 0
     var appId: String = null
@@ -248,6 +253,9 @@
       case ("--executor-id") :: value :: tail =>
         executorId = value
         argv = tail
+      case ("--bind-address") :: value :: tail =>
+        bindAddress = value
+        argv = tail
       case ("--hostname") :: value :: tail =>
         hostname = value
         argv = tail
@@ -278,11 +286,15 @@
       log.info(s"Executor hostname is not provided, will use '$hostname' to advertise itself")
     }
 
+    if (bindAddress == null) {
+      bindAddress = hostname
+    }
+
     if (driverUrl == null || executorId == null || cores <= 0 || appId == null) {
       printUsageAndExit()
     }
 
-    run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
+    run(driverUrl, executorId, bindAddress, hostname, cores, appId, workerUrl, userClassPath)
     System.exit(0)
   }
 
@@ -295,6 +307,7 @@
       | Options are:
       |   --driver-url <driverUrl>
       |   --executor-id <executorId>
+      |   --bind-address <bind-address>
       |   --hostname <hostname>
       |   --cores <cores>
       |   --app-id <appid>
core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -98,6 +98,11 @@ package object config {
     .bytesConf(ByteUnit.MiB)
     .createWithDefaultString("1g")
 
+  private[spark] val EXECUTOR_BIND_ADDRESS = ConfigBuilder("spark.executor.bindAddress")
+    .doc("Address where to bind network listen sockets on the executor.")
+    .stringConf
+    .createWithDefault(Utils.localHostName())
+
   private[spark] val EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.executor.memoryOverhead")
     .doc("The amount of off-heap memory to be allocated per executor in cluster mode, " +
       "in MiB unless otherwise specified.")
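For context, a minimal sketch of what this entry means at runtime (a hand-written analogue using the public SparkConf API rather than the internal ConfigEntry machinery; java.net.InetAddress stands in for Utils.localHostName()):

    import org.apache.spark.SparkConf

    // Read spark.executor.bindAddress, defaulting to the local hostname,
    // mirroring the createWithDefault(Utils.localHostName()) above.
    val conf = new SparkConf()
    val bindAddress = conf.get(
      "spark.executor.bindAddress",
      java.net.InetAddress.getLocalHost.getHostName)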
12 changes: 12 additions & 0 deletions docs/configuration.md
@@ -204,6 +204,18 @@ of the most common options to set are:
   This option is currently supported on YARN and Kubernetes.
   </td>
 </tr>
+<tr>
+  <td><code>spark.executor.bindAddress</code></td>
+  <td>(local hostname)</td>
+  <td>
+    Hostname or IP address where to bind listening sockets. This config overrides the SPARK_LOCAL_IP
+    environment variable (see below).
+    <br />It also allows a different address from the local one to be advertised to other
+    executors or external systems. This is useful, for example, when running containers with bridged networking.
+    For this to work properly, the different ports used by the executor (RPC and block manager) need to be
+    forwarded from the container's host.
+  </td>
+</tr>
 <tr>
   <td><code>spark.extraListeners</code></td>
   <td>(none)</td>
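A usage sketch for the container scenario the doc row describes (the wildcard bind address is illustrative; whether 0.0.0.0 suits your setup depends on your network layout):

    import org.apache.spark.SparkConf

    // Executors in bridged-network containers: listen on all container
    // interfaces while YARN continues to advertise the node's hostname.
    val conf = new SparkConf()
      .set("spark.executor.bindAddress", "0.0.0.0")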
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -428,7 +428,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
     val executorCores = _sparkConf.get(EXECUTOR_CORES)
     val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "<executorId>",
-      "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources)
+      "<bind-address>", "<hostname>", executorMemory, executorCores, appId,
+      securityMgr, localResources)
     dummyRunner.launchContextDebugInfo()
   }
 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -47,6 +47,7 @@ private[yarn] class ExecutorRunnable(
     sparkConf: SparkConf,
     masterAddress: String,
     executorId: String,
+    bindAddress: String,
     hostname: String,
     executorMemory: Int,
     executorCores: Int,
@@ -204,6 +205,7 @@ private[yarn] class ExecutorRunnable(
     Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
       "--driver-url", masterAddress,
       "--executor-id", executorId,
+      "--bind-address", bindAddress,
       "--hostname", hostname,
       "--cores", executorCores.toString,
       "--app-id", appId) ++
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -507,6 +507,7 @@ private[yarn] class YarnAllocator(
     for (container <- containersToUse) {
       executorIdCounter += 1
       val executorHostname = container.getNodeId.getHost
+      val executorBindAddress = sparkConf.get(EXECUTOR_BIND_ADDRESS.key, executorHostname)
       val containerId = container.getId
       val executorId = executorIdCounter.toString
       assert(container.getResource.getMemory >= resource.getMemory)
@@ -537,6 +538,7 @@ private[yarn] class YarnAllocator(
           sparkConf,
           driverUrl,
           executorId,
+          executorBindAddress,
           executorHostname,
           executorMemory,
           executorCores,
Expand Down