core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala (68 changes: 35 additions & 33 deletions)

@@ -18,6 +18,7 @@
package org.apache.spark.deploy.client

import java.util.concurrent._
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture}

import scala.util.control.NonFatal
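The new import pulls in the `java.util.concurrent.atomic` wrappers that replace the `@volatile` vars throughout this file. A minimal sketch of the motivation (illustrative names, not part of the patch): `@volatile` only guarantees visibility, so a check-then-act sequence can still race, whereas an `AtomicBoolean` combines the check and the write into one indivisible step:

```scala
import java.util.concurrent.atomic.AtomicBoolean

object OneShotSketch {
  @volatile private var doneVar = false             // visibility only; check-then-act still races
  private val doneAtomic = new AtomicBoolean(false)

  // Racy: two threads can both observe false and both return true.
  def fireOnceRacy(): Boolean =
    if (!doneVar) { doneVar = true; true } else false

  // Atomic: compareAndSet makes the test and the update a single step,
  // so exactly one caller ever returns true.
  def fireOnceAtomic(): Boolean =
    doneAtomic.compareAndSet(false, true)
}
```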
@@ -49,26 +50,27 @@ private[spark] class AppClient(
private val REGISTRATION_TIMEOUT_SECONDS = 20
private val REGISTRATION_RETRIES = 3

-@volatile private var endpoint: RpcEndpointRef = null
-@volatile private var appId: String = null
-@volatile private var registered = false
+private val endpoint = new AtomicReference[RpcEndpointRef]
+private val appId = new AtomicReference[String]
+private val registered = new AtomicBoolean(false)

private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint
with Logging {

private var master: Option[RpcEndpointRef] = None
// To avoid calling listener.disconnected() multiple times
private var alreadyDisconnected = false
-@volatile private var alreadyDead = false // To avoid calling listener.dead() multiple times
-@volatile private var registerMasterFutures: Array[JFuture[_]] = null
-@volatile private var registrationRetryTimer: JScheduledFuture[_] = null
+// To avoid calling listener.dead() multiple times
+private val alreadyDead = new AtomicBoolean(false)
+private val registerMasterFutures = new AtomicReference[Array[JFuture[_]]]
+private val registrationRetryTimer = new AtomicReference[JScheduledFuture[_]]

// A thread pool for registering with masters. Because registering with a master is a blocking
// action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
// time so that we can register with all masters.
private val registerMasterThreadPool = new ThreadPoolExecutor(
0,
-masterRpcAddresses.size, // Make sure we can register with all masters at the same time
+masterRpcAddresses.length, // Make sure we can register with all masters at the same time
60L, TimeUnit.SECONDS,
new SynchronousQueue[Runnable](),
ThreadUtils.namedThreadFactory("appclient-register-master-threadpool"))
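The pool above is deliberately sized so one blocking registration per master can run concurrently: core size 0, max size `masterRpcAddresses.length`, and a `SynchronousQueue` that hands tasks off instead of buffering them. (The `.size` to `.length` change is presumably a micro-cleanup: `masterRpcAddresses` is an `Array`, and `.length` reads the array field directly rather than going through the implicit `ArrayOps` wrapper.) A standalone sketch of that configuration, under those assumptions:

```scala
import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}

object RegistrationPools {
  // Sketch only: a hand-off pool that never queues work. Each submit either
  // reuses an idle worker or spawns a new thread, up to maxMasters threads,
  // so maxMasters blocking registration attempts can run at once. Idle
  // workers are reclaimed after 60 seconds.
  def registrationPool(maxMasters: Int): ThreadPoolExecutor =
    new ThreadPoolExecutor(
      0,                                // corePoolSize: keep no permanent threads
      maxMasters,                       // maximumPoolSize: one thread per master
      60L, TimeUnit.SECONDS,            // idle keep-alive
      new SynchronousQueue[Runnable]()) // direct hand-off, no buffering
}
```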
@@ -100,7 +102,7 @@ private[spark] class AppClient(
for (masterAddress <- masterRpcAddresses) yield {
registerMasterThreadPool.submit(new Runnable {
override def run(): Unit = try {
-if (registered) {
+if (registered.get) {
return
}
logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
@@ -123,22 +125,22 @@
* nthRetry means this is the nth attempt to register with master.
*/
private def registerWithMaster(nthRetry: Int) {
-registerMasterFutures = tryRegisterAllMasters()
-registrationRetryTimer = registrationRetryThread.scheduleAtFixedRate(new Runnable {
+registerMasterFutures.set(tryRegisterAllMasters())
+registrationRetryTimer.set(registrationRetryThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = {
Utils.tryOrExit {
-if (registered) {
-registerMasterFutures.foreach(_.cancel(true))
+if (registered.get) {
+registerMasterFutures.get.foreach(_.cancel(true))
registerMasterThreadPool.shutdownNow()
} else if (nthRetry >= REGISTRATION_RETRIES) {
markDead("All masters are unresponsive! Giving up.")
} else {
-registerMasterFutures.foreach(_.cancel(true))
+registerMasterFutures.get.foreach(_.cancel(true))
registerWithMaster(nthRetry + 1)
}
}
}
-}, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)
+}, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
}

/**
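The extra closing parenthesis on the `scheduleAtFixedRate` line is the visible cost of the change: the whole `ScheduledFuture` is now stored through `registrationRetryTimer.set(...)`. A self-contained sketch of the retry schedule, simplified from the patch (`markDead` replaced by a stderr message):

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

object RetrySketch {
  val REGISTRATION_TIMEOUT_SECONDS = 20
  val REGISTRATION_RETRIES = 3

  private val registered = new AtomicBoolean(false)
  private val timer = new AtomicReference[ScheduledFuture[_]]
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  // Every tick: stop if registered, give up after REGISTRATION_RETRIES
  // rounds, otherwise cancel this round and start the next attempt.
  def registerWithMaster(nthRetry: Int)(attempt: () => Unit): Unit = {
    attempt()
    timer.set(scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = {
        if (registered.get) {
          timer.get.cancel(true)
        } else if (nthRetry >= REGISTRATION_RETRIES) {
          timer.get.cancel(true)
          System.err.println("All masters are unresponsive! Giving up.")
        } else {
          timer.get.cancel(true)
          registerWithMaster(nthRetry + 1)(attempt)
        }
      }
    }, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
  }
}
```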
@@ -163,10 +165,10 @@
// RegisteredApplications due to an unstable network.
// 2. Receive multiple RegisteredApplication from different masters because the master is
// changing.
-appId = appId_
-registered = true
+appId.set(appId_)
+registered.set(true)
master = Some(masterRef)
-listener.connected(appId)
+listener.connected(appId.get)

case ApplicationRemoved(message) =>
markDead("Master removed our application: %s".format(message))
@@ -178,7 +180,7 @@
cores))
// FIXME if changing master and `ExecutorAdded` happen at the same time (the order is not
// guaranteed), `ExecutorStateChanged` may be sent to a dead master.
-sendToMaster(ExecutorStateChanged(appId, id, ExecutorState.RUNNING, None, None))
+sendToMaster(ExecutorStateChanged(appId.get, id, ExecutorState.RUNNING, None, None))
listener.executorAdded(fullId, workerId, hostPort, cores, memory)

case ExecutorUpdated(id, state, message, exitStatus) =>
@@ -193,13 +195,13 @@
logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
master = Some(masterRef)
alreadyDisconnected = false
-masterRef.send(MasterChangeAcknowledged(appId))
+masterRef.send(MasterChangeAcknowledged(appId.get))
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case StopAppClient =>
markDead("Application has been stopped.")
-sendToMaster(UnregisterApplication(appId))
+sendToMaster(UnregisterApplication(appId.get))
context.reply(true)
stop()

@@ -263,18 +265,18 @@
}

def markDead(reason: String) {
-if (!alreadyDead) {
+if (!alreadyDead.get) {
listener.dead(reason)
-alreadyDead = true
+alreadyDead.set(true)
}
}

override def onStop(): Unit = {
-if (registrationRetryTimer != null) {
-registrationRetryTimer.cancel(true)
+if (registrationRetryTimer.get != null) {
+registrationRetryTimer.get.cancel(true)
}
registrationRetryThread.shutdownNow()
-registerMasterFutures.foreach(_.cancel(true))
+registerMasterFutures.get.foreach(_.cancel(true))
registerMasterThreadPool.shutdownNow()
askAndReplyThreadPool.shutdownNow()
}
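`markDead` still uses a read (`alreadyDead.get`) followed by a separate `set`, which keeps the once-only behavior as long as callers are serialized through the endpoint. For what it's worth, a `compareAndSet` would make that guarantee unconditional; an illustrative alternative, not what the patch does:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Illustrative alternative: compareAndSet collapses the check and the
// update, so the dead-callback fires exactly once even if markDead were
// ever invoked from two threads at the same time.
class DeathGuard(onDead: String => Unit) {
  private val alreadyDead = new AtomicBoolean(false)
  def markDead(reason: String): Unit =
    if (alreadyDead.compareAndSet(false, true)) onDead(reason)
}
```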
@@ -283,19 +285,19 @@

def start() {
// Just launch an rpcEndpoint; it will call back into the listener.
-endpoint = rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv))
+endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
}

def stop() {
-if (endpoint != null) {
+if (endpoint.get != null) {
try {
val timeout = RpcUtils.askRpcTimeout(conf)
-timeout.awaitResult(endpoint.ask[Boolean](StopAppClient))
+timeout.awaitResult(endpoint.get.ask[Boolean](StopAppClient))
} catch {
case e: TimeoutException =>
logInfo("Stop request to Master timed out; it may already be shut down.")
}
-endpoint = null
+endpoint.set(null)
}
}
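`stop()` bounds the shutdown handshake with an RPC timeout and downgrades a `TimeoutException` to a log line, on the theory that a silent Master is most likely one that is already gone. The same pattern with standard-library futures standing in for Spark's `RpcEndpointRef.ask` and `RpcTimeout.awaitResult` (a sketch, not the Spark API):

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.FiniteDuration

object StopSketch {
  // Wait for the stop acknowledgement, but treat a timeout as "the peer is
  // probably already down" instead of propagating the failure.
  def stopQuietly(ack: Future[Boolean], timeout: FiniteDuration): Unit =
    try Await.result(ack, timeout)
    catch {
      case _: TimeoutException =>
        println("Stop request timed out; the peer may already be shut down.")
    }
}
```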

@@ -306,8 +308,8 @@
* @return whether the request is acknowledged.
*/
def requestTotalExecutors(requestedTotal: Int): Boolean = {
-if (endpoint != null && appId != null) {
-endpoint.askWithRetry[Boolean](RequestExecutors(appId, requestedTotal))
+if (endpoint.get != null && appId.get != null) {
+endpoint.get.askWithRetry[Boolean](RequestExecutors(appId.get, requestedTotal))
} else {
logWarning("Attempted to request executors before driver fully initialized.")
false
@@ -319,8 +321,8 @@
* @return whether the kill request is acknowledged.
*/
def killExecutors(executorIds: Seq[String]): Boolean = {
-if (endpoint != null && appId != null) {
-endpoint.askWithRetry[Boolean](KillExecutors(appId, executorIds))
+if (endpoint.get != null && appId.get != null) {
+endpoint.get.askWithRetry[Boolean](KillExecutors(appId.get, executorIds))
} else {
logWarning("Attempted to kill executors before driver fully initialized.")
false
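`requestTotalExecutors` and `killExecutors` above share the same guard: only ask once both the endpoint and the application id have been published, otherwise warn and report failure. A hypothetical refactor (not in the patch) that factors that guard out:

```scala
import java.util.concurrent.atomic.AtomicReference

object GuardSketch {
  // Hypothetical helper: run `ask` only when both references are set;
  // otherwise mimic the warn-and-return-false behavior of the patch.
  def withActiveApp[E <: AnyRef](endpoint: AtomicReference[E], appId: AtomicReference[String])
      (ask: (E, String) => Boolean): Boolean = {
    val e = endpoint.get
    val id = appId.get
    if (e != null && id != null) ask(e, id)
    else {
      System.err.println("Attempted to act before driver fully initialized.")
      false
    }
  }
}
```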