File tree Expand file tree Collapse file tree 2 files changed +10
-6
lines changed
core/src/main/scala/org/apache/spark Expand file tree Collapse file tree 2 files changed +10
-6
lines changed Original file line number Diff line number Diff line change @@ -265,6 +265,8 @@ object SparkEnv extends Logging {
265265 }
266266
267267 // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
268+ // In the non-driver case, the RPC env's address may be null since it may not be listening
269+ // for incoming connections.
268270 if (isDriver) {
269271 conf.set(" spark.driver.port" , rpcEnv.address.port.toString)
270272 } else if (rpcEnv.address != null ) {
Original file line number Diff line number Diff line change @@ -21,6 +21,7 @@ import java.lang.{Boolean => JBoolean}
2121import java .net .{InetSocketAddress , URI }
2222import java .nio .ByteBuffer
2323import java .util .concurrent ._
24+ import javax .annotation .Nullable ;
2425import javax .annotation .concurrent .GuardedBy
2526
2627import scala .collection .mutable
@@ -89,6 +90,7 @@ private[netty] class NettyRpcEnv(
8990 RpcEndpointVerifier .NAME , new RpcEndpointVerifier (this , dispatcher))
9091 }
9192
93+ @ Nullable
9294 override lazy val address : RpcAddress = {
9395 if (server != null ) RpcAddress (host, server.getPort()) else null
9496 }
@@ -186,12 +188,12 @@ private[netty] class NettyRpcEnv(
186188 clientConnectionExecutor.execute(new Runnable {
187189 override def run (): Unit = Utils .tryLogNonFatalError {
188190 val client = try {
189- getTransportClient(message.receiver)
190- } catch {
191- case NonFatal (e) =>
192- promise.tryFailure(e)
193- throw e
194- }
191+ getTransportClient(message.receiver)
192+ } catch {
193+ case NonFatal (e) =>
194+ promise.tryFailure(e)
195+ throw e
196+ }
195197 client.sendRpc(serialize(message), new RpcResponseCallback {
196198
197199 override def onFailure (e : Throwable ): Unit = {
You can’t perform that action at this time.
0 commit comments