@@ -18,6 +18,7 @@
 package org.apache.spark.deploy.client
 
 import java.util.concurrent._
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture}
 
 import scala.util.control.NonFatal
@@ -49,26 +50,27 @@ private[spark] class AppClient(
   private val REGISTRATION_TIMEOUT_SECONDS = 20
   private val REGISTRATION_RETRIES = 3
 
-  @volatile private var endpoint: RpcEndpointRef = null
-  @volatile private var appId: String = null
-  @volatile private var registered = false
+  private val endpoint = new AtomicReference[RpcEndpointRef]
+  private val appId = new AtomicReference[String]
+  private val registered = new AtomicBoolean(false)
 
   private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint
     with Logging {
 
     private var master: Option[RpcEndpointRef] = None
     // To avoid calling listener.disconnected() multiple times
     private var alreadyDisconnected = false
-    @volatile private var alreadyDead = false // To avoid calling listener.dead() multiple times
-    @volatile private var registerMasterFutures: Array[JFuture[_]] = null
-    @volatile private var registrationRetryTimer: JScheduledFuture[_] = null
+    // To avoid calling listener.dead() multiple times
+    private val alreadyDead = new AtomicBoolean(false)
+    private val registerMasterFutures = new AtomicReference[Array[JFuture[_]]]
+    private val registrationRetryTimer = new AtomicReference[JScheduledFuture[_]]
 
     // A thread pool for registering with masters. Because registering with a master is a blocking
     // action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
     // time so that we can register with all masters.
     private val registerMasterThreadPool = new ThreadPoolExecutor(
       0,
-      masterRpcAddresses.size, // Make sure we can register with all masters at the same time
+      masterRpcAddresses.length, // Make sure we can register with all masters at the same time
       60L, TimeUnit.SECONDS,
       new SynchronousQueue[Runnable](),
       ThreadUtils.namedThreadFactory("appclient-register-master-threadpool"))
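
The `@volatile var` → `java.util.concurrent.atomic` conversion above replaces bare reads and writes with `.get`/`.set`, and also unlocks atomic check-then-act operations that a volatile field cannot express. A minimal standalone sketch of the pattern (the object and method names are illustrative, not Spark's):

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

object AtomicFieldsSketch {
  private val registered = new AtomicBoolean(false)
  private val appId = new AtomicReference[String] // starts as null, like the old var

  def onRegistered(id: String): Unit = {
    appId.set(id)        // was: appId = id
    registered.set(true) // was: registered = true
  }

  def registerOnce(): Boolean = {
    // Unlike a @volatile var, the flag is tested and flipped in one atomic
    // step, so exactly one caller wins if several race here.
    registered.compareAndSet(false, true)
  }
}
```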
@@ -100,7 +102,7 @@ private[spark] class AppClient(
       for (masterAddress <- masterRpcAddresses) yield {
         registerMasterThreadPool.submit(new Runnable {
           override def run(): Unit = try {
-            if (registered) {
+            if (registered.get) {
               return
             }
             logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
@@ -123,22 +125,22 @@ private[spark] class AppClient(
      * nthRetry means this is the nth attempt to register with master.
      */
     private def registerWithMaster(nthRetry: Int) {
-      registerMasterFutures = tryRegisterAllMasters()
-      registrationRetryTimer = registrationRetryThread.scheduleAtFixedRate(new Runnable {
+      registerMasterFutures.set(tryRegisterAllMasters())
+      registrationRetryTimer.set(registrationRetryThread.scheduleAtFixedRate(new Runnable {
         override def run(): Unit = {
           Utils.tryOrExit {
-            if (registered) {
-              registerMasterFutures.foreach(_.cancel(true))
+            if (registered.get) {
+              registerMasterFutures.get.foreach(_.cancel(true))
               registerMasterThreadPool.shutdownNow()
             } else if (nthRetry >= REGISTRATION_RETRIES) {
               markDead("All masters are unresponsive! Giving up.")
             } else {
-              registerMasterFutures.foreach(_.cancel(true))
+              registerMasterFutures.get.foreach(_.cancel(true))
               registerWithMaster(nthRetry + 1)
             }
           }
         }
-      }, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)
+      }, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
     }
 
     /**
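
`registerWithMaster` now wraps the whole `scheduleAtFixedRate(...)` call in `registrationRetryTimer.set(...)`, stashing the returned `ScheduledFuture` so another thread (`onStop` below) can find and cancel it. A self-contained sketch of that handoff, with illustrative names:

```scala
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

object RetryTimerSketch {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  private val timer = new AtomicReference[ScheduledFuture[_]]

  def startRetrying(task: Runnable, periodSeconds: Long): Unit = {
    // scheduleAtFixedRate returns a ScheduledFuture; publish it atomically
    // so a concurrent shutdown can locate and cancel it.
    timer.set(scheduler.scheduleAtFixedRate(task, periodSeconds, periodSeconds, TimeUnit.SECONDS))
  }

  def stopRetrying(): Unit = {
    Option(timer.get).foreach(_.cancel(true)) // null-safe: the timer may never have started
    scheduler.shutdownNow()
  }
}
```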
@@ -163,10 +165,10 @@ private[spark] class AppClient(
         // RegisteredApplications due to an unstable network.
         // 2. Receive multiple RegisteredApplication from different masters because the master is
         // changing.
-        appId = appId_
-        registered = true
+        appId.set(appId_)
+        registered.set(true)
         master = Some(masterRef)
-        listener.connected(appId)
+        listener.connected(appId.get)
 
       case ApplicationRemoved(message) =>
         markDead("Master removed our application: %s".format(message))
@@ -178,7 +180,7 @@ private[spark] class AppClient(
           cores))
         // FIXME if changing master and `ExecutorAdded` happen at the same time (the order is not
         // guaranteed), `ExecutorStateChanged` may be sent to a dead master.
-        sendToMaster(ExecutorStateChanged(appId, id, ExecutorState.RUNNING, None, None))
+        sendToMaster(ExecutorStateChanged(appId.get, id, ExecutorState.RUNNING, None, None))
         listener.executorAdded(fullId, workerId, hostPort, cores, memory)
 
       case ExecutorUpdated(id, state, message, exitStatus) =>
@@ -193,13 +195,13 @@ private[spark] class AppClient(
         logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
         master = Some(masterRef)
         alreadyDisconnected = false
-        masterRef.send(MasterChangeAcknowledged(appId))
+        masterRef.send(MasterChangeAcknowledged(appId.get))
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
       case StopAppClient =>
         markDead("Application has been stopped.")
-        sendToMaster(UnregisterApplication(appId))
+        sendToMaster(UnregisterApplication(appId.get))
         context.reply(true)
         stop()
@@ -263,18 +265,18 @@ private[spark] class AppClient(
     }
 
     def markDead(reason: String) {
-      if (!alreadyDead) {
+      if (!alreadyDead.get) {
         listener.dead(reason)
-        alreadyDead = true
+        alreadyDead.set(true)
       }
     }
 
     override def onStop(): Unit = {
-      if (registrationRetryTimer != null) {
-        registrationRetryTimer.cancel(true)
+      if (registrationRetryTimer.get != null) {
+        registrationRetryTimer.get.cancel(true)
       }
       registrationRetryThread.shutdownNow()
-      registerMasterFutures.foreach(_.cancel(true))
+      registerMasterFutures.get.foreach(_.cancel(true))
       registerMasterThreadPool.shutdownNow()
       askAndReplyThreadPool.shutdownNow()
     }
@@ -283,19 +285,19 @@ private[spark] class AppClient(
 
   def start() {
     // Just launch an rpcEndpoint; it will call back into the listener.
-    endpoint = rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv))
+    endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
   }
 
   def stop() {
-    if (endpoint != null) {
+    if (endpoint.get != null) {
       try {
         val timeout = RpcUtils.askRpcTimeout(conf)
-        timeout.awaitResult(endpoint.ask[Boolean](StopAppClient))
+        timeout.awaitResult(endpoint.get.ask[Boolean](StopAppClient))
       } catch {
         case e: TimeoutException =>
           logInfo("Stop request to Master timed out; it may already be shut down.")
       }
-      endpoint = null
+      endpoint.set(null)
     }
   }
 
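Note that `stop()` keeps a check-then-act window: between `endpoint.get != null` and `endpoint.set(null)`, two concurrent callers could both pass the guard. A hypothetical variant (not what this patch does) would close it with `getAndSet`, reusing the members already visible in this diff:

```scala
// Hypothetical: getAndSet swaps the reference out atomically, so at most
// one caller ever observes a non-null endpoint and sends StopAppClient.
def stop() {
  val ref = endpoint.getAndSet(null)
  if (ref != null) {
    try {
      val timeout = RpcUtils.askRpcTimeout(conf)
      timeout.awaitResult(ref.ask[Boolean](StopAppClient))
    } catch {
      case e: TimeoutException =>
        logInfo("Stop request to Master timed out; it may already be shut down.")
    }
  }
}
```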
@@ -306,8 +308,8 @@ private[spark] class AppClient(
    * @return whether the request is acknowledged.
    */
   def requestTotalExecutors(requestedTotal: Int): Boolean = {
-    if (endpoint != null && appId != null) {
-      endpoint.askWithRetry[Boolean](RequestExecutors(appId, requestedTotal))
+    if (endpoint.get != null && appId.get != null) {
+      endpoint.get.askWithRetry[Boolean](RequestExecutors(appId.get, requestedTotal))
     } else {
       logWarning("Attempted to request executors before driver fully initialized.")
       false
@@ -319,8 +321,8 @@ private[spark] class AppClient(
    * @return whether the kill request is acknowledged.
    */
   def killExecutors(executorIds: Seq[String]): Boolean = {
-    if (endpoint != null && appId != null) {
-      endpoint.askWithRetry[Boolean](KillExecutors(appId, executorIds))
+    if (endpoint.get != null && appId.get != null) {
+      endpoint.get.askWithRetry[Boolean](KillExecutors(appId.get, executorIds))
     } else {
       logWarning("Attempted to kill executors before driver fully initialized.")
       false
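
`requestTotalExecutors` and `killExecutors` repeat the same `endpoint.get != null && appId.get != null` guard. A hypothetical helper (not part of the patch) could factor it out, and also snapshot both values once so they cannot change between the check and the call:

```scala
// Hypothetical helper: pin both references, run `body` only if both are
// initialized, otherwise warn and report failure.
private def ifInitialized(action: String)(body: (RpcEndpointRef, String) => Boolean): Boolean = {
  val ep = endpoint.get
  val id = appId.get
  if (ep != null && id != null) {
    body(ep, id)
  } else {
    logWarning(s"Attempted to $action before driver fully initialized.")
    false
  }
}

// Usage:
// def killExecutors(executorIds: Seq[String]): Boolean =
//   ifInitialized("kill executors") { (ep, id) =>
//     ep.askWithRetry[Boolean](KillExecutors(id, executorIds))
//   }
```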