From 3d4b69c4230065b22f95e757b8be18ab0268ec51 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 10 Nov 2015 16:43:45 -0800 Subject: [PATCH 1/3] [SPARK-10827] replace volatile with Atomic* in AppClient.scala. --- .../spark/deploy/client/AppClient.scala | 58 ++++++++++--------- 1 file changed, 30 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 3f29da663b798..14e2ec0593971 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy.client import java.util.concurrent._ +import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture} import scala.util.control.NonFatal @@ -49,9 +50,9 @@ private[spark] class AppClient( private val REGISTRATION_TIMEOUT_SECONDS = 20 private val REGISTRATION_RETRIES = 3 - @volatile private var endpoint: RpcEndpointRef = null - @volatile private var appId: String = null - @volatile private var registered = false + private var endpoint = new AtomicReference[RpcEndpointRef] + private val appId = new AtomicReference[String] + private val registered = new AtomicBoolean(false) private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { @@ -59,16 +60,17 @@ private[spark] class AppClient( private var master: Option[RpcEndpointRef] = None // To avoid calling listener.disconnected() multiple times private var alreadyDisconnected = false - @volatile private var alreadyDead = false // To avoid calling listener.dead() multiple times - @volatile private var registerMasterFutures: Array[JFuture[_]] = null - @volatile private var registrationRetryTimer: JScheduledFuture[_] = null + // To avoid calling listener.dead() multiple times + private val alreadyDead = new AtomicBoolean(false) + private val registerMasterFutures = new AtomicReference[Array[JFuture[_]]] + private val registrationRetryTimer = new AtomicReference[JScheduledFuture[_]] // A thread pool for registering with masters. Because registering with a master is a blocking // action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same // time so that we can register with all masters. private val registerMasterThreadPool = new ThreadPoolExecutor( 0, - masterRpcAddresses.size, // Make sure we can register with all masters at the same time + masterRpcAddresses.length, // Make sure we can register with all masters at the same time 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable](), ThreadUtils.namedThreadFactory("appclient-register-master-threadpool")) @@ -100,7 +102,7 @@ private[spark] class AppClient( for (masterAddress <- masterRpcAddresses) yield { registerMasterThreadPool.submit(new Runnable { override def run(): Unit = try { - if (registered) { + if (registered.get()) { return } logInfo("Connecting to master " + masterAddress.toSparkURL + "...") @@ -123,22 +125,22 @@ private[spark] class AppClient( * nthRetry means this is the nth attempt to register with master. */ private def registerWithMaster(nthRetry: Int) { - registerMasterFutures = tryRegisterAllMasters() - registrationRetryTimer = registrationRetryThread.scheduleAtFixedRate(new Runnable { + registerMasterFutures.set(tryRegisterAllMasters()) + registrationRetryTimer.set(registrationRetryThread.scheduleAtFixedRate(new Runnable { override def run(): Unit = { Utils.tryOrExit { - if (registered) { - registerMasterFutures.foreach(_.cancel(true)) + if (registered.get()) { + registerMasterFutures.get.foreach(_.cancel(true)) registerMasterThreadPool.shutdownNow() } else if (nthRetry >= REGISTRATION_RETRIES) { markDead("All masters are unresponsive! Giving up.") } else { - registerMasterFutures.foreach(_.cancel(true)) + registerMasterFutures.get.foreach(_.cancel(true)) registerWithMaster(nthRetry + 1) } } } - }, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS) + }, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)) } /** @@ -163,10 +165,10 @@ private[spark] class AppClient( // RegisteredApplications due to an unstable network. // 2. Receive multiple RegisteredApplication from different masters because the master is // changing. - appId = appId_ - registered = true + appId.set(appId_) + registered.set(true) master = Some(masterRef) - listener.connected(appId) + listener.connected(appId.get) case ApplicationRemoved(message) => markDead("Master removed our application: %s".format(message)) @@ -178,7 +180,7 @@ private[spark] class AppClient( cores)) // FIXME if changing master and `ExecutorAdded` happen at the same time (the order is not // guaranteed), `ExecutorStateChanged` may be sent to a dead master. - sendToMaster(ExecutorStateChanged(appId, id, ExecutorState.RUNNING, None, None)) + sendToMaster(ExecutorStateChanged(appId.get, id, ExecutorState.RUNNING, None, None)) listener.executorAdded(fullId, workerId, hostPort, cores, memory) case ExecutorUpdated(id, state, message, exitStatus) => @@ -193,13 +195,13 @@ private[spark] class AppClient( logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL) master = Some(masterRef) alreadyDisconnected = false - masterRef.send(MasterChangeAcknowledged(appId)) + masterRef.send(MasterChangeAcknowledged(appId.get)) } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case StopAppClient => markDead("Application has been stopped.") - sendToMaster(UnregisterApplication(appId)) + sendToMaster(UnregisterApplication(appId.get)) context.reply(true) stop() @@ -263,18 +265,18 @@ private[spark] class AppClient( } def markDead(reason: String) { - if (!alreadyDead) { + if (!alreadyDead.get) { listener.dead(reason) - alreadyDead = true + alreadyDead.set(true) } } override def onStop(): Unit = { if (registrationRetryTimer != null) { - registrationRetryTimer.cancel(true) + registrationRetryTimer.get.cancel(true) } registrationRetryThread.shutdownNow() - registerMasterFutures.foreach(_.cancel(true)) + registerMasterFutures.get.foreach(_.cancel(true)) registerMasterThreadPool.shutdownNow() askAndReplyThreadPool.shutdownNow() } @@ -283,14 +285,14 @@ private[spark] class AppClient( def start() { // Just launch an rpcEndpoint; it will call back into the listener. - endpoint = rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)) + endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv))) } def stop() { if (endpoint != null) { try { val timeout = RpcUtils.askRpcTimeout(conf) - timeout.awaitResult(endpoint.ask[Boolean](StopAppClient)) + timeout.awaitResult(endpoint.get.ask[Boolean](StopAppClient)) } catch { case e: TimeoutException => logInfo("Stop request to Master timed out; it may already be shut down.") @@ -307,7 +309,7 @@ private[spark] class AppClient( */ def requestTotalExecutors(requestedTotal: Int): Boolean = { if (endpoint != null && appId != null) { - endpoint.askWithRetry[Boolean](RequestExecutors(appId, requestedTotal)) + endpoint.get.askWithRetry[Boolean](RequestExecutors(appId.get, requestedTotal)) } else { logWarning("Attempted to request executors before driver fully initialized.") false @@ -320,7 +322,7 @@ private[spark] class AppClient( */ def killExecutors(executorIds: Seq[String]): Boolean = { if (endpoint != null && appId != null) { - endpoint.askWithRetry[Boolean](KillExecutors(appId, executorIds)) + endpoint.get.askWithRetry[Boolean](KillExecutors(appId.get, executorIds)) } else { logWarning("Attempted to kill executors before driver fully initialized.") false From 5507aea02e6d8ea2bba539193f10f8ab55918614 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 10 Nov 2015 16:46:58 -0800 Subject: [PATCH 2/3] bo --- .../scala/org/apache/spark/deploy/client/AppClient.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 14e2ec0593971..84679ede6322d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -50,7 +50,7 @@ private[spark] class AppClient( private val REGISTRATION_TIMEOUT_SECONDS = 20 private val REGISTRATION_RETRIES = 3 - private var endpoint = new AtomicReference[RpcEndpointRef] + private val endpoint = new AtomicReference[RpcEndpointRef] private val appId = new AtomicReference[String] private val registered = new AtomicBoolean(false) @@ -102,7 +102,7 @@ private[spark] class AppClient( for (masterAddress <- masterRpcAddresses) yield { registerMasterThreadPool.submit(new Runnable { override def run(): Unit = try { - if (registered.get()) { + if (registered.get) { return } logInfo("Connecting to master " + masterAddress.toSparkURL + "...") @@ -129,7 +129,7 @@ private[spark] class AppClient( registrationRetryTimer.set(registrationRetryThread.scheduleAtFixedRate(new Runnable { override def run(): Unit = { Utils.tryOrExit { - if (registered.get()) { + if (registered.get) { registerMasterFutures.get.foreach(_.cancel(true)) registerMasterThreadPool.shutdownNow() } else if (nthRetry >= REGISTRATION_RETRIES) { @@ -297,7 +297,7 @@ private[spark] class AppClient( case e: TimeoutException => logInfo("Stop request to Master timed out; it may already be shut down.") } - endpoint = null + endpoint.set(null) } } From b0833098acc9662ec2ca182fbb271977742cc400 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 10 Nov 2015 16:52:37 -0800 Subject: [PATCH 3/3] null checks --- .../scala/org/apache/spark/deploy/client/AppClient.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 84679ede6322d..afab362e213b5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -272,7 +272,7 @@ private[spark] class AppClient( } override def onStop(): Unit = { - if (registrationRetryTimer != null) { + if (registrationRetryTimer.get != null) { registrationRetryTimer.get.cancel(true) } registrationRetryThread.shutdownNow() @@ -289,7 +289,7 @@ private[spark] class AppClient( } def stop() { - if (endpoint != null) { + if (endpoint.get != null) { try { val timeout = RpcUtils.askRpcTimeout(conf) timeout.awaitResult(endpoint.get.ask[Boolean](StopAppClient)) @@ -308,7 +308,7 @@ private[spark] class AppClient( * @return whether the request is acknowledged. */ def requestTotalExecutors(requestedTotal: Int): Boolean = { - if (endpoint != null && appId != null) { + if (endpoint.get != null && appId.get != null) { endpoint.get.askWithRetry[Boolean](RequestExecutors(appId.get, requestedTotal)) } else { logWarning("Attempted to request executors before driver fully initialized.") @@ -321,7 +321,7 @@ private[spark] class AppClient( * @return whether the kill request is acknowledged. */ def killExecutors(executorIds: Seq[String]): Boolean = { - if (endpoint != null && appId != null) { + if (endpoint.get != null && appId.get != null) { endpoint.get.askWithRetry[Boolean](KillExecutors(appId.get, executorIds)) } else { logWarning("Attempted to kill executors before driver fully initialized.")