From 5942765c27f908767035d2b15dcf8f7482e4a9db Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Wed, 24 Dec 2014 13:35:00 +0900
Subject: [PATCH 1/5] Enclosed shutdownCallback in SparkDeploySchedulerBackend by synchronized block

---
 .../scala/org/apache/spark/SparkContext.scala |  2 +-
 .../cluster/SparkDeploySchedulerBackend.scala | 30 ++++++++++++-------
 2 files changed, 21 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 57bc3d4e4ae36..90944c413221b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1902,7 +1902,7 @@ object SparkContext extends Logging {
         val masterUrls = localCluster.start()
         val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
         scheduler.initialize(backend)
-        backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
+        backend.setShutdownCallback { (backend: SparkDeploySchedulerBackend) =>
           localCluster.stop()
         }
         (backend, scheduler)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 8c7de75600b5f..c2e82c516f86b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -31,16 +31,17 @@ private[spark] class SparkDeploySchedulerBackend(
   with AppClientListener
   with Logging {
 
-  var client: AppClient = null
-  var stopping = false
-  var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
-  @volatile var appId: String = _
+  private var client: AppClient = null
+  private var stopping = false
+  private val shutdownCallbackLock = new Object()
+  private var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
+  @volatile private var appId: String = _
 
-  val registrationLock = new Object()
-  var registrationDone = false
+  private val registrationLock = new Object()
+  private var registrationDone = false
 
-  val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
-  val totalExpectedCores = maxCores.getOrElse(0)
+  private val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
+  private val totalExpectedCores = maxCores.getOrElse(0)
 
   override def start() {
     super.start()
@@ -82,8 +83,11 @@ private[spark] class SparkDeploySchedulerBackend(
     stopping = true
     super.stop()
     client.stop()
-    if (shutdownCallback != null) {
-      shutdownCallback(this)
+
+    shutdownCallbackLock.synchronized {
+      if (shutdownCallback != null) {
+        shutdownCallback(this)
+      }
     }
   }
 
@@ -135,6 +139,12 @@ private[spark] class SparkDeploySchedulerBackend(
     super.applicationId
   }
 
+  def setShutdownCallback(f: SparkDeploySchedulerBackend => Unit) {
+    shutdownCallbackLock.synchronized {
+      shutdownCallback = f
+    }
+  }
+
   private def waitForRegistration() = {
     registrationLock.synchronized {
       while (!registrationDone) {

From 1b60fd19bd0ef79e72fe568cf03d6976c7c32f97 Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Wed, 24 Dec 2014 18:39:06 +0900
Subject: [PATCH 2/5] Improved the locking logics

---
 .../cluster/SparkDeploySchedulerBackend.scala | 32 +++++++------------
 1 file changed, 12 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index c2e82c516f86b..c6407c3025f71 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.scheduler.cluster
 
+import java.util.concurrent.Semaphore
+import java.util.concurrent.atomic.AtomicReference
+
 import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.deploy.{ApplicationDescription, Command}
 import org.apache.spark.deploy.client.{AppClient, AppClientListener}
@@ -33,12 +36,11 @@ private[spark] class SparkDeploySchedulerBackend(
 
   private var client: AppClient = null
   private var stopping = false
-  private val shutdownCallbackLock = new Object()
-  private var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
+  private val shutdownCallback: AtomicReference[(SparkDeploySchedulerBackend) => Unit] =
+    new AtomicReference
   @volatile private var appId: String = _
 
-  private val registrationLock = new Object()
-  private var registrationDone = false
+  private val registrationBarrier = new Semaphore(0)
 
   private val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
   private val totalExpectedCores = maxCores.getOrElse(0)
@@ -84,10 +86,9 @@ private[spark] class SparkDeploySchedulerBackend(
     super.stop()
     client.stop()
 
-    shutdownCallbackLock.synchronized {
-      if (shutdownCallback != null) {
-        shutdownCallback(this)
-      }
+    val callback = shutdownCallback.get
+    if (callback != null) {
+      callback(this)
     }
   }
 
@@ -140,24 +141,15 @@ private[spark] class SparkDeploySchedulerBackend(
   }
 
   def setShutdownCallback(f: SparkDeploySchedulerBackend => Unit) {
-    shutdownCallbackLock.synchronized {
-      shutdownCallback = f
-    }
+    shutdownCallback.set(f)
   }
 
   private def waitForRegistration() = {
-    registrationLock.synchronized {
-      while (!registrationDone) {
-        registrationLock.wait()
-      }
-    }
+    registrationBarrier.acquire()
   }
 
   private def notifyContext() = {
-    registrationLock.synchronized {
-      registrationDone = true
-      registrationLock.notifyAll()
-    }
+    registrationBarrier.release()
   }
 
 }

From 552df7c8b97d9de097763476b0b8026a7a416e4c Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Wed, 18 Feb 2015 10:39:39 +0900
Subject: [PATCH 3/5] Changed the declaration of the variable "shutdownCallback" as a volatile reference instead of AtomicReference

---
 .../scheduler/cluster/SparkDeploySchedulerBackend.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 0d7841c72a621..3f7951fd318ec 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -36,7 +36,7 @@ private[spark] class SparkDeploySchedulerBackend(
 
   private var client: AppClient = null
   private var stopping = false
-  private val shutdownCallback: AtomicReference[(SparkDeploySchedulerBackend) => Unit] =
+  private var shutdownCallback: SparkDeploySchedulerBackend => Unit = _
     new AtomicReference
   @volatile private var appId: String = _
 
@@ -99,7 +99,7 @@ private[spark] class SparkDeploySchedulerBackend(
     super.stop()
     client.stop()
 
-    val callback = shutdownCallback.get
+    val callback = shutdownCallback
     if (callback != null) {
       callback(this)
     }
@@ -154,7 +154,7 @@ private[spark] class SparkDeploySchedulerBackend(
   }
 
   def setShutdownCallback(f: SparkDeploySchedulerBackend => Unit) {
-    shutdownCallback.set(f)
+    shutdownCallback = f
   }
 
   private def waitForRegistration() = {

From 42ca528d5684ff5b55b14fd3266ac09dceeb3d6b Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Wed, 18 Feb 2015 10:43:01 +0900
Subject: [PATCH 4/5] Changed the declaration of the variable "shutdownCallback" as a volatile reference instead of AtomicReference

---
 .../scheduler/cluster/SparkDeploySchedulerBackend.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 3f7951fd318ec..f161711433ed2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.scheduler.cluster
 
 import java.util.concurrent.Semaphore
-import java.util.concurrent.atomic.AtomicReference
 
 import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.deploy.{ApplicationDescription, Command}
@@ -36,8 +35,8 @@ private[spark] class SparkDeploySchedulerBackend(
 
   private var client: AppClient = null
   private var stopping = false
-  private var shutdownCallback: SparkDeploySchedulerBackend => Unit = _
-    new AtomicReference
+
+  @volatile private var shutdownCallback: SparkDeploySchedulerBackend => Unit = _
   @volatile private var appId: String = _
 
   private val registrationBarrier = new Semaphore(0)

From c146c93b3df500881f716b5007304315a70fb641 Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Wed, 18 Feb 2015 19:58:59 +0900
Subject: [PATCH 5/5] Removed "setShutdownCallback" method

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +-
 .../scheduler/cluster/SparkDeploySchedulerBackend.scala | 6 +-----
 2 files changed, 2 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index eff274f9be0b8..d59b466830fdc 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2145,7 +2145,7 @@ object SparkContext extends Logging {
         val masterUrls = localCluster.start()
         val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
         scheduler.initialize(backend)
-        backend.setShutdownCallback { (backend: SparkDeploySchedulerBackend) =>
+        backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
          localCluster.stop()
        }
        (backend, scheduler)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index f161711433ed2..a0aa555f6244f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -36,7 +36,7 @@ private[spark] class SparkDeploySchedulerBackend(
   private var client: AppClient = null
   private var stopping = false
 
-  @volatile private var shutdownCallback: SparkDeploySchedulerBackend => Unit = _
+  @volatile var shutdownCallback: SparkDeploySchedulerBackend => Unit = _
   @volatile private var appId: String = _
 
   private val registrationBarrier = new Semaphore(0)
@@ -152,10 +152,6 @@ private[spark] class SparkDeploySchedulerBackend(
     super.applicationId
   }
 
-  def setShutdownCallback(f: SparkDeploySchedulerBackend => Unit) {
-    shutdownCallback = f
-  }
-
   private def waitForRegistration() = {
     registrationBarrier.acquire()
   }
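
Two concurrency patterns in this series are worth spelling out. For the shutdown callback, the patches move from a dedicated lock object with synchronized blocks (patch 1), to an AtomicReference (patch 2), and finally to a plain @volatile var (patches 3-5). The sketch below is a minimal, self-contained illustration of that final shape, not Spark code: the Backend class, the println callback, and the main method are invented for the example. The load-bearing detail is the read-once idiom in stop(): copying the volatile field into a local val means the null check and the invocation are guaranteed to operate on the same function value, even if another thread reassigns the field in between.

// Standalone sketch of the pattern the series converges on (not Spark code).
class Backend {
  // @volatile makes a write by one thread (the one installing the callback)
  // visible to the thread that later calls stop().
  @volatile var shutdownCallback: Backend => Unit = _

  def stop(): Unit = {
    // Read the volatile field exactly once; `callback` cannot change
    // between the null check and the call, even if the field does.
    val callback = shutdownCallback
    if (callback != null) {
      callback(this)
    }
  }
}

object VolatileCallbackSketch {
  def main(args: Array[String]): Unit = {
    val backend = new Backend
    backend.shutdownCallback = (_: Backend) => println("local cluster stopped")
    backend.stop()  // prints: local cluster stopped
  }
}

An AtomicReference would give the same visibility guarantee, but the field only ever needs plain reads and writes, never compare-and-set, so the volatile var is the lighter equivalent.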
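
Patch 2 also replaces the registrationLock/registrationDone wait()/notifyAll() pair with a Semaphore(0) used as a one-shot barrier. Below is a sketch of that pattern in isolation; the worker thread, the sleep, and the printed messages are invented for the example.

import java.util.concurrent.Semaphore

object RegistrationBarrierSketch {
  // Zero initial permits, so acquire() blocks until someone calls release().
  private val registrationBarrier = new Semaphore(0)

  private def waitForRegistration(): Unit = registrationBarrier.acquire()

  private def notifyContext(): Unit = registrationBarrier.release()

  def main(args: Array[String]): Unit = {
    val registrar = new Thread(new Runnable {
      def run(): Unit = {
        Thread.sleep(100)   // stand-in for the master acknowledging the app
        println("registered")
        notifyContext()     // hand a permit to the waiting thread
      }
    })
    registrar.start()
    waitForRegistration()   // blocks here until release() above
    println("registration done")
    registrar.join()
  }
}

Unlike a bare wait() loop, the semaphore cannot lose a notification: a release() that happens before acquire() leaves a permit banked, and the later acquire() returns immediately instead of blocking forever.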