Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ object SparkEnv extends Logging {
// for incoming connections.
if (isDriver) {
conf.set("spark.driver.port", rpcEnv.address.port.toString)
} else if (rpcEnv.address != null) {
} else if (rpcEnv.address.port != -1) {
conf.set("spark.executor.port", rpcEnv.address.port.toString)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,10 @@ private[spark] class CoarseGrainedExecutorBackend(
}

override def receive: PartialFunction[Any, Unit] = {
case RegisteredExecutor(hostname) =>
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
executor = new Executor(executorId, env.rpcEnv.address.host, env,
userClassPath, isLocal = false)

case RegisterExecutorFailed(message) =>
logError("Slave registration failed: " + message)
Expand Down
14 changes: 10 additions & 4 deletions core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,15 @@ private[netty] class NettyRpcEnv(
RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}

@Nullable
override lazy val address: RpcAddress = {
if (server != null) RpcAddress(host, server.getPort()) else null
// When running in client mode (i.e.: not opening a listening port, but connecting to a
// 'server' with an open port), the server value is null.
// But a valid host still has to be provided, as it is needed later on.
// Previously, the server side of the connection was attempting to guess the client
// hostname, using the connection information. But the value generated is wrong when
// the connection is NATed.
// [SPARK-14849]
if (server != null) RpcAddress(host, server.getPort()) else RpcAddress(host, -1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while you are at it, we should probably document when server is null and explain the choices here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-1 is actually confusing. All of executors in the same node will have the same address.

}

override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
Expand Down Expand Up @@ -152,8 +158,8 @@ private[netty] class NettyRpcEnv(
if (receiver.client != null) {
message.sendWith(receiver.client)
} else {
require(receiver.address != null,
"Cannot send message to client endpoint with no listen address.")
require(receiver.address.port != -1,
"Cannot send message to client endpoint with no listen port.")
val targetOutbox = {
val outbox = outboxes.get(receiver.address)
if (outbox == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private[spark] object CoarseGrainedClusterMessages {

sealed trait RegisterExecutorResponse

case class RegisteredExecutor(hostname: String) extends CoarseGrainedClusterMessage
case object RegisteredExecutor extends CoarseGrainedClusterMessage
with RegisterExecutorResponse

case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
context.reply(true)
} else {
// If the executor's rpc env is not listening for incoming connections, `hostPort`
// will be null, and the client connection should be used to contact the executor.
val executorAddress = if (executorRef.address != null) {
executorRef.address
} else {
context.senderAddress
}
val executorAddress = executorRef.address
logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
addressToExecutorId(executorAddress) = executorId
totalCoreCount.addAndGet(cores)
Expand All @@ -178,7 +172,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
}
}
executorRef.send(RegisteredExecutor(executorAddress.host))
executorRef.send(RegisteredExecutor)
// Note: some tests expect the reply to come after we put the executor in the map
context.reply(true)
listenerBus.post(
Expand Down