Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.scheduler.cluster

import java.util.concurrent.Semaphore

import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{AppClient, AppClientListener}
Expand All @@ -31,16 +33,16 @@ private[spark] class SparkDeploySchedulerBackend(
with AppClientListener
with Logging {

var client: AppClient = null
var stopping = false
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
@volatile var appId: String = _
private var client: AppClient = null
private var stopping = false

@volatile var shutdownCallback: SparkDeploySchedulerBackend => Unit = _
@volatile private var appId: String = _

val registrationLock = new Object()
var registrationDone = false
private val registrationBarrier = new Semaphore(0)

val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
val totalExpectedCores = maxCores.getOrElse(0)
private val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
private val totalExpectedCores = maxCores.getOrElse(0)

override def start() {
super.start()
Expand Down Expand Up @@ -95,8 +97,10 @@ private[spark] class SparkDeploySchedulerBackend(
stopping = true
super.stop()
client.stop()
if (shutdownCallback != null) {
shutdownCallback(this)

val callback = shutdownCallback
if (callback != null) {
callback(this)
}
}

Expand Down Expand Up @@ -149,18 +153,11 @@ private[spark] class SparkDeploySchedulerBackend(
}

private def waitForRegistration() = {
registrationLock.synchronized {
while (!registrationDone) {
registrationLock.wait()
}
}
registrationBarrier.acquire()
}

private def notifyContext() = {
registrationLock.synchronized {
registrationDone = true
registrationLock.notifyAll()
}
registrationBarrier.release()
}

}