From d8fc88d19fd68b3cb9b34f34a5a624fe26b31c12 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 11 Dec 2015 14:18:53 +0800 Subject: [PATCH] Prevent RejectedExecutionException by checking if ThreadPoolExecutor is shutdown and its capacity. --- .../spark/deploy/client/AppClient.scala | 35 +++++++++++-------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 1e2f469214b8..303b0526a8a4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -96,22 +96,27 @@ private[spark] class AppClient( /** * Register with all masters asynchronously and returns an array `Future`s for cancellation. */ - private def tryRegisterAllMasters(): Array[JFuture[_]] = { + private def tryRegisterAllMasters(): Array[Option[JFuture[_]]] = { for (masterAddress <- masterRpcAddresses) yield { - registerMasterThreadPool.submit(new Runnable { - override def run(): Unit = try { - if (registered.get) { - return + if (!registerMasterThreadPool.isShutdown() && + registerMasterThreadPool.getPoolSize() < registerMasterThreadPool.getMaximumPoolSize()) { + Some(registerMasterThreadPool.submit(new Runnable { + override def run(): Unit = try { + if (registered.get) { + return + } + logInfo("Connecting to master " + masterAddress.toSparkURL + "...") + val masterRef = + rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME) + masterRef.send(RegisterApplication(appDescription, self)) + } catch { + case ie: InterruptedException => // Cancelled + case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e) } - logInfo("Connecting to master " + masterAddress.toSparkURL + "...") - val masterRef = - rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME) - masterRef.send(RegisterApplication(appDescription, self)) - } catch { - case ie: InterruptedException => // Cancelled - case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e) - } - }) + })) + } else { + None + } } } @@ -123,7 +128,7 @@ private[spark] class AppClient( * nthRetry means this is the nth attempt to register with master. */ private def registerWithMaster(nthRetry: Int) { - registerMasterFutures.set(tryRegisterAllMasters()) + registerMasterFutures.set(tryRegisterAllMasters().flatMap(x => x)) registrationRetryTimer.set(registrationRetryThread.scheduleAtFixedRate(new Runnable { override def run(): Unit = { Utils.tryOrExit {