From 90e909025a9a8522432debf8f3f04ce3246f683e Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Fri, 4 Nov 2016 15:13:31 -0700
Subject: [PATCH 1/2] Fix potential deadlock in `StandaloneSchedulerBackend.dead`

---
 .../scheduler/cluster/StandaloneSchedulerBackend.scala | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 368cd30a2e11..cc091091b9be 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -139,7 +139,13 @@ private[spark] class StandaloneSchedulerBackend(
       scheduler.error(reason)
     } finally {
       // Ensure the application terminates, as we can no longer run jobs.
-      sc.stop()
+      new Thread("stop-spark-context") {
+        setDaemon(true)
+
+        override def run(): Unit = {
+          sc.stop()
+        }
+      }.start()
     }
   }
 }

From d9c56269833eaa62dbd48e2d9520cb996d617730 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Mon, 7 Nov 2016 10:18:17 -0800
Subject: [PATCH 2/2] Add a thread local flag for RPC threads and run 'stop' in a new thread automatically

---
 .../scala/org/apache/spark/SparkContext.scala | 22 ++++++++++++++++++--
 .../scala/org/apache/spark/rpc/RpcEnv.scala   |  4 ++++
 .../apache/spark/rpc/netty/Dispatcher.scala   |  1 +
 .../apache/spark/rpc/netty/NettyRpcEnv.scala  |  3 +++
 .../cluster/StandaloneSchedulerBackend.scala  |  8 +------
 .../org/apache/spark/rpc/RpcEnvSuite.scala    | 13 +++++++++++
 6 files changed, 42 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 9f0f6074229d..25a3d609a6b0 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1757,8 +1757,26 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   def listJars(): Seq[String] = addedJars.keySet.toSeq
 
-  // Shut down the SparkContext.
-  def stop() {
+  /**
+   * Shut down the SparkContext.
+   */
+  def stop(): Unit = {
+    if (env.rpcEnv.isInRPCThread) {
+      // `stop` will block until all RPC threads exit, so we cannot call stop inside an RPC thread.
+      // We should launch a new thread to call `stop` to avoid deadlock.
+      new Thread("stop-spark-context") {
+        setDaemon(true)
+
+        override def run(): Unit = {
+          _stop()
+        }
+      }.start()
+    } else {
+      _stop()
+    }
+  }
+
+  private def _stop() {
     if (LiveListenerBus.withinListenerThread.value) {
       throw new SparkException(
         s"Cannot stop SparkContext within listener thread of ${LiveListenerBus.name}")
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index 579122868afc..bbc416381490 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -147,6 +147,10 @@ private[spark] abstract class RpcEnv(conf: SparkConf) {
    */
   def openChannel(uri: String): ReadableByteChannel
 
+  /**
+   * Return whether the current thread is an RPC thread.
+   */
+  def isInRPCThread: Boolean
 }
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
index a02cf30a5d83..67baabd2cbff 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
@@ -201,6 +201,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
   /** Message loop used for dispatching messages. */
   private class MessageLoop extends Runnable {
     override def run(): Unit = {
+      NettyRpcEnv.rpcThreadFlag.value = true
       try {
         while (true) {
           try {
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index e51649a1ecce..0b8cd144a216 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -408,10 +408,13 @@ private[netty] class NettyRpcEnv(
   }
 
+  override def isInRPCThread: Boolean = NettyRpcEnv.rpcThreadFlag.value
 }
 
 private[netty] object NettyRpcEnv extends Logging {
 
+  private[netty] val rpcThreadFlag = new DynamicVariable[Boolean](false)
+
   /**
    * When deserializing the [[NettyRpcEndpointRef]], it needs a reference to [[NettyRpcEnv]].
    * Use `currentEnv` to wrap the deserialization codes. E.g.,
    *
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index cc091091b9be..368cd30a2e11 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -139,13 +139,7 @@ private[spark] class StandaloneSchedulerBackend(
       scheduler.error(reason)
     } finally {
       // Ensure the application terminates, as we can no longer run jobs.
-      new Thread("stop-spark-context") {
-        setDaemon(true)
-
-        override def run(): Unit = {
-          sc.stop()
-        }
-      }.start()
+      sc.stop()
     }
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index acdf21df9a16..aa0705987d83 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -870,6 +870,19 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     verify(endpoint, never()).onDisconnected(any())
     verify(endpoint, never()).onNetworkError(any(), any())
   }
+
+  test("isInRPCThread") {
+    val rpcEndpointRef = env.setupEndpoint("isInRPCThread", new RpcEndpoint {
+      override val rpcEnv = env
+
+      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+        case m => context.reply(rpcEnv.isInRPCThread)
+      }
+    })
+    assert(rpcEndpointRef.askWithRetry[Boolean]("hello") === true)
+    assert(env.isInRPCThread === false)
+    env.stop(rpcEndpointRef)
+  }
 }
 
 class UnserializableClass
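
For reference, the thread-local pattern the second patch relies on can be sketched in isolation. `scala.util.DynamicVariable` gives every thread its own view of a flag: each message loop sets it to true at the top of its run method, and a blocking `stop` consults it to decide whether to hand the shutdown off to a fresh daemon thread. The sketch below is illustrative only and is not Spark code; the names `WorkerEnv`, `isInWorkerThread`, `startWorker`, and `shutdown` are invented and do not appear in the patches.

// Standalone sketch of the DynamicVariable-based thread flag (illustrative only).
import scala.util.DynamicVariable

object WorkerEnv {
  // Defaults to false; each worker thread flips it to true at the top of its
  // run loop (the role NettyRpcEnv.rpcThreadFlag plays for Dispatcher.MessageLoop).
  private val workerThreadFlag = new DynamicVariable[Boolean](false)

  def isInWorkerThread: Boolean = workerThreadFlag.value

  def startWorker(body: () => Unit): Thread = {
    val t = new Thread("worker") {
      override def run(): Unit = {
        workerThreadFlag.value = true  // mark this thread as a worker thread
        body()
      }
    }
    t.setDaemon(true)
    t.start()
    t
  }

  // The role SparkContext.stop plays: `doStop` blocks until all worker threads
  // exit, so calling it directly from a worker thread would self-deadlock.
  def shutdown(doStop: () => Unit): Unit = {
    if (isInWorkerThread) {
      val stopper = new Thread("stop-in-new-thread") {
        override def run(): Unit = doStop()
      }
      stopper.setDaemon(true)
      stopper.start()
    } else {
      doStop()
    }
  }
}

Two details carry over to the patch itself. `DynamicVariable` is backed by an `InheritableThreadLocal`, so a thread spawned from a flagged thread inherits the flag; the patch avoids any recursion from this by having the spawned "stop-spark-context" thread call the private `_stop()` directly rather than re-entering `stop()`. And the stop thread is a daemon, so a hung shutdown cannot by itself keep the JVM alive.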