diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 27497e21b829d..2244303bbf420 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -235,7 +235,7 @@ object SparkEnv extends Logging {
     // for incoming connections.
     if (isDriver) {
       conf.set("spark.driver.port", rpcEnv.address.port.toString)
-    } else if (rpcEnv.address != null) {
+    } else if (rpcEnv.address.port != -1) {
       conf.set("spark.executor.port", rpcEnv.address.port.toString)
     }
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index e08729510926b..e337739640e25 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -75,9 +75,10 @@ private[spark] class CoarseGrainedExecutorBackend(
   }
 
   override def receive: PartialFunction[Any, Unit] = {
-    case RegisteredExecutor(hostname) =>
+    case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
-      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
+      executor = new Executor(executorId, env.rpcEnv.address.host, env,
+        userClassPath, isLocal = false)
 
     case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)
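The executor-backend change above inverts who supplies the hostname: instead of trusting a driver-guessed value carried in the `RegisteredExecutor` message, the executor reads `env.rpcEnv.address.host`, which it knows locally. Below is a minimal, self-contained sketch of that pattern; the names `RegistrationSketch`, `handle`, and `selfHost` are illustrative, not Spark code:

```scala
object RegistrationSketch {
  // Mirrors the message shapes in CoarseGrainedClusterMessages after this change:
  // the ack carries no payload, so it can be a case object.
  sealed trait RegisterExecutorResponse
  case object RegisteredExecutor extends RegisterExecutorResponse
  case class RegisterExecutorFailed(message: String) extends RegisterExecutorResponse

  // Hypothetical stand-in for CoarseGrainedExecutorBackend.receive: the host comes
  // from the executor's own RpcEnv (env.rpcEnv.address.host in the real code), not
  // from a driver-supplied value that may be wrong behind NAT.
  def handle(response: RegisterExecutorResponse, selfHost: String): Unit = response match {
    case RegisteredExecutor =>
      println(s"Successfully registered; starting executor on $selfHost")
    case RegisterExecutorFailed(message) =>
      println(s"Slave registration failed: $message")
  }

  def main(args: Array[String]): Unit =
    handle(RegisteredExecutor, selfHost = "executor-host.example.com")
}
```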
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 7f2192e1f5a70..d1ff4f92bc2d3 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -120,9 +120,15 @@ private[netty] class NettyRpcEnv(
       RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
   }
 
-  @Nullable
   override lazy val address: RpcAddress = {
-    if (server != null) RpcAddress(host, server.getPort()) else null
+    // When running in client mode (i.e.: not opening a listening port, but connecting to a
+    // 'server' with an open port), the server value is null.
+    // But a valid host still has to be provided, as it is needed later on.
+    // Previously, the server side of the connection was attempting to guess the client
+    // hostname, using the connection information. But the value generated is wrong when
+    // the connection is NATed.
+    // [SPARK-14849]
+    if (server != null) RpcAddress(host, server.getPort()) else RpcAddress(host, -1)
   }
 
   override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
@@ -152,8 +158,8 @@ private[netty] class NettyRpcEnv(
     if (receiver.client != null) {
       message.sendWith(receiver.client)
     } else {
-      require(receiver.address != null,
-        "Cannot send message to client endpoint with no listen address.")
+      require(receiver.address.port != -1,
+        "Cannot send message to client endpoint with no listen port.")
       val targetOutbox = {
         val outbox = outboxes.get(receiver.address)
         if (outbox == null) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 46a829114ec86..70341cc8272be 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -40,7 +40,7 @@ private[spark] object CoarseGrainedClusterMessages {
 
   sealed trait RegisterExecutorResponse
 
-  case class RegisteredExecutor(hostname: String) extends CoarseGrainedClusterMessage
+  case object RegisteredExecutor extends CoarseGrainedClusterMessage
     with RegisterExecutorResponse
 
   case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 8896391f9775f..dceb39c66fc0f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -153,13 +153,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
         context.reply(true)
       } else {
-        // If the executor's rpc env is not listening for incoming connections, `hostPort`
-        // will be null, and the client connection should be used to contact the executor.
-        val executorAddress = if (executorRef.address != null) {
-            executorRef.address
-          } else {
-            context.senderAddress
-          }
+        val executorAddress = executorRef.address
         logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
         addressToExecutorId(executorAddress) = executorId
         totalCoreCount.addAndGet(cores)
@@ -178,7 +172,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
         }
       }
-      executorRef.send(RegisteredExecutor(executorAddress.host))
+      executorRef.send(RegisteredExecutor)
       // Note: some tests expect the reply to come after we put the executor in the map
       context.reply(true)
       listenerBus.post(
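Taken together, the `NettyRpcEnv` and scheduler-backend changes replace the old `null` sentinel with `RpcAddress(host, -1)`: the host is always valid (it is needed later, e.g. by the executor above), and callers test the port instead of null-checking the address. A runnable sketch of that idea under simplified, hypothetical names (`AddressSketch`, `Env`, and `hasListenPort` are not Spark's API):

```scala
object AddressSketch {
  // Same shape as Spark's RpcAddress; hasListenPort is a hypothetical helper that
  // encodes the `port != -1` check used in SparkEnv and NettyRpcEnv above.
  case class RpcAddress(host: String, port: Int) {
    def hasListenPort: Boolean = port != -1
  }

  // Stand-in for NettyRpcEnv: in client mode no server socket is opened
  // (serverPort = None), but the env still advertises a valid host.
  class Env(advertisedHost: String, serverPort: Option[Int]) {
    lazy val address: RpcAddress = RpcAddress(advertisedHost, serverPort.getOrElse(-1))
  }

  def main(args: Array[String]): Unit = {
    val clientMode = new Env("executor-host.example.com", serverPort = None)
    // The host is always readable (no null checks needed downstream)...
    assert(clientMode.address.host == "executor-host.example.com")
    // ...and "is this env listening?" becomes a port comparison.
    assert(!clientMode.address.hasListenPort)

    val serverMode = new Env("driver-host.example.com", serverPort = Some(7077))
    assert(serverMode.address.hasListenPort)
  }
}
```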