Skip to content

Commit 22662b2

Browse files
committed
[SPARK-12614][CORE] Don't throw non fatal exception from ask
Right now RpcEndpointRef.ask may throw exception in some corner cases, such as calling ask after stopping RpcEnv. It's better to avoid throwing exception from RpcEndpointRef.ask. We can send the exception to the future for `ask`. Author: Shixiong Zhu <[email protected]> Closes #10568 from zsxwing/send-ask-fail.
1 parent eb91729 commit 22662b2

File tree

1 file changed

+29
-25
lines changed

1 file changed

+29
-25
lines changed

core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -211,33 +211,37 @@ private[netty] class NettyRpcEnv(
211211
}
212212
}
213213

214-
if (remoteAddr == address) {
215-
val p = Promise[Any]()
216-
p.future.onComplete {
217-
case Success(response) => onSuccess(response)
218-
case Failure(e) => onFailure(e)
219-
}(ThreadUtils.sameThread)
220-
dispatcher.postLocalMessage(message, p)
221-
} else {
222-
val rpcMessage = RpcOutboxMessage(serialize(message),
223-
onFailure,
224-
(client, response) => onSuccess(deserialize[Any](client, response)))
225-
postToOutbox(message.receiver, rpcMessage)
226-
promise.future.onFailure {
227-
case _: TimeoutException => rpcMessage.onTimeout()
228-
case _ =>
214+
try {
215+
if (remoteAddr == address) {
216+
val p = Promise[Any]()
217+
p.future.onComplete {
218+
case Success(response) => onSuccess(response)
219+
case Failure(e) => onFailure(e)
220+
}(ThreadUtils.sameThread)
221+
dispatcher.postLocalMessage(message, p)
222+
} else {
223+
val rpcMessage = RpcOutboxMessage(serialize(message),
224+
onFailure,
225+
(client, response) => onSuccess(deserialize[Any](client, response)))
226+
postToOutbox(message.receiver, rpcMessage)
227+
promise.future.onFailure {
228+
case _: TimeoutException => rpcMessage.onTimeout()
229+
case _ =>
230+
}(ThreadUtils.sameThread)
231+
}
232+
233+
val timeoutCancelable = timeoutScheduler.schedule(new Runnable {
234+
override def run(): Unit = {
235+
onFailure(new TimeoutException(s"Cannot receive any reply in ${timeout.duration}"))
236+
}
237+
}, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
238+
promise.future.onComplete { v =>
239+
timeoutCancelable.cancel(true)
229240
}(ThreadUtils.sameThread)
241+
} catch {
242+
case NonFatal(e) =>
243+
onFailure(e)
230244
}
231-
232-
val timeoutCancelable = timeoutScheduler.schedule(new Runnable {
233-
override def run(): Unit = {
234-
promise.tryFailure(
235-
new TimeoutException(s"Cannot receive any reply in ${timeout.duration}"))
236-
}
237-
}, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
238-
promise.future.onComplete { v =>
239-
timeoutCancelable.cancel(true)
240-
}(ThreadUtils.sameThread)
241245
promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
242246
}
243247

0 commit comments

Comments
 (0)