Skip to content

Commit a1948a0

Browse files
committed
[SPARK-15395][CORE] Use getHostString to create RpcAddress
## What changes were proposed in this pull request? Right now the netty RPC uses `InetSocketAddress.getHostName` to create `RpcAddress` for network events. If we use an IP address to connect, then the RpcAddress's host will be a host name (if the reverse lookup successes) instead of the IP address. However, some places need to compare the original IP address and the RpcAddress in `onDisconnect` (e.g., CoarseGrainedExecutorBackend), and this behavior will make the check incorrect. This PR uses `getHostString` to resolve the issue. ## How was this patch tested? Jenkins unit tests. Author: Shixiong Zhu <[email protected]> Closes #13185 from zsxwing/host-string. (cherry picked from commit 5c9117a) Signed-off-by: Shixiong Zhu <[email protected]>
1 parent 595ed8d commit a1948a0

File tree

1 file changed

+4
-4
lines changed

1 file changed

+4
-4
lines changed

core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -574,7 +574,7 @@ private[netty] class NettyRpcHandler(
574574
private def internalReceive(client: TransportClient, message: ByteBuffer): RequestMessage = {
575575
val addr = client.getChannel().remoteAddress().asInstanceOf[InetSocketAddress]
576576
assert(addr != null)
577-
val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
577+
val clientAddr = RpcAddress(addr.getHostString, addr.getPort)
578578
val requestMessage = nettyEnv.deserialize[RequestMessage](client, message)
579579
if (requestMessage.senderAddress == null) {
580580
// Create a new message with the socket address of the client as the sender.
@@ -595,7 +595,7 @@ private[netty] class NettyRpcHandler(
595595
override def exceptionCaught(cause: Throwable, client: TransportClient): Unit = {
596596
val addr = client.getChannel.remoteAddress().asInstanceOf[InetSocketAddress]
597597
if (addr != null) {
598-
val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
598+
val clientAddr = RpcAddress(addr.getHostString, addr.getPort)
599599
dispatcher.postToAll(RemoteProcessConnectionError(cause, clientAddr))
600600
// If the remove RpcEnv listens to some address, we should also fire a
601601
// RemoteProcessConnectionError for the remote RpcEnv listening address
@@ -614,14 +614,14 @@ private[netty] class NettyRpcHandler(
614614
override def channelActive(client: TransportClient): Unit = {
615615
val addr = client.getChannel().remoteAddress().asInstanceOf[InetSocketAddress]
616616
assert(addr != null)
617-
val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
617+
val clientAddr = RpcAddress(addr.getHostString, addr.getPort)
618618
dispatcher.postToAll(RemoteProcessConnected(clientAddr))
619619
}
620620

621621
override def channelInactive(client: TransportClient): Unit = {
622622
val addr = client.getChannel.remoteAddress().asInstanceOf[InetSocketAddress]
623623
if (addr != null) {
624-
val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
624+
val clientAddr = RpcAddress(addr.getHostString, addr.getPort)
625625
nettyEnv.removeOutbox(clientAddr)
626626
dispatcher.postToAll(RemoteProcessDisconnected(clientAddr))
627627
val remoteEnvAddress = remoteAddresses.remove(clientAddr)

0 commit comments

Comments
 (0)