@@ -284,7 +284,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
284284
285285 private [this ] val maxRetries = conf.getInt(" spark.akka.num.retries" , 3 )
286286 private [this ] val retryWaitMs = conf.getLong(" spark.akka.retry.wait" , 3000 )
287- private [this ] val defaultTimeout = conf.getLong(" spark.akka.lookupTimeout " , 30 ) seconds
287+ private [this ] val defaultAskTimeout = conf.getLong(" spark.akka.askTimeout " , 30 ) seconds
288288
289289 /**
290290 * return the address for the [[RpcEndpointRef ]]
@@ -304,7 +304,8 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
304304 *
305305 * This method only sends the message once and never retries.
306306 */
307- def sendWithReply [T : ClassTag ](message : Any ): Future [T ] = sendWithReply(message, defaultTimeout)
307+ def sendWithReply [T : ClassTag ](message : Any ): Future [T ] =
308+ sendWithReply(message, defaultAskTimeout)
308309
309310 /**
310311 * Send a message to the corresponding [[RpcEndpoint.receiveAndReply) ]] and return a `Future` to
@@ -327,7 +328,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
327328 * @tparam T type of the reply message
328329 * @return the reply message from the corresponding [[RpcEndpoint ]]
329330 */
330- def askWithReply [T : ClassTag ](message : Any ): T = askWithReply(message, defaultTimeout )
331+ def askWithReply [T : ClassTag ](message : Any ): T = askWithReply(message, defaultAskTimeout )
331332
332333 /**
333334 * Send a message to the corresponding [[RpcEndpoint.receive ]] and get its result within a
0 commit comments