From e6d0841ba73d5f6da2d82dcf8eff8bc95e1bbb5e Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Tue, 11 Apr 2017 11:46:59 -0700
Subject: [PATCH 1/3] Use a separate lock for stop

---
 .../scheduler/cluster/StandaloneSchedulerBackend.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 7befdb0c1f64d..6055e219e5f31 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -55,6 +55,9 @@ private[spark] class StandaloneSchedulerBackend(
   private val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
   private val totalExpectedCores = maxCores.getOrElse(0)
 
+  /** Lock for `stop(finalState: SparkAppHandle.State)` method. */
+  private val stopLock = new Object
+
   override def start() {
     super.start()
     launcherBackend.connect()
@@ -112,7 +115,7 @@ private[spark] class StandaloneSchedulerBackend(
     launcherBackend.setState(SparkAppHandle.State.RUNNING)
   }
 
-  override def stop(): Unit = synchronized {
+  override def stop(): Unit = {
     stop(SparkAppHandle.State.FINISHED)
   }
 
@@ -206,7 +209,7 @@ private[spark] class StandaloneSchedulerBackend(
     registrationBarrier.release()
   }
 
-  private def stop(finalState: SparkAppHandle.State): Unit = synchronized {
+  private def stop(finalState: SparkAppHandle.State): Unit = stopLock.synchronized {
     try {
       stopping = true
 

From 0d79882206314c5de469cc9c75363ff02f9d6672 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Wed, 12 Apr 2017 17:15:58 -0700
Subject: [PATCH 2/3] Change stopping to AtomicBoolean

---
 .../cluster/StandaloneSchedulerBackend.scala | 31 ++++++++++---------
 1 file changed, 16 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 6055e219e5f31..f1dafb1b5bde6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler.cluster
 
 import java.util.concurrent.Semaphore
+import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.concurrent.Future
 
@@ -42,7 +43,7 @@ private[spark] class StandaloneSchedulerBackend(
   with Logging {
 
   private var client: StandaloneAppClient = null
-  private var stopping = false
+  private val stopping = new AtomicBoolean(false)
   private val launcherBackend = new LauncherBackend() {
     override protected def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED)
   }
@@ -128,14 +129,14 @@
 
   override def disconnected() {
     notifyContext()
-    if (!stopping) {
+    if (!stopping.get) {
       logWarning("Disconnected from Spark cluster! Waiting for reconnection...")
     }
   }
 
   override def dead(reason: String) {
     notifyContext()
-    if (!stopping) {
+    if (!stopping.get) {
       launcherBackend.setState(SparkAppHandle.State.KILLED)
       logError("Application has been killed. Reason: " + reason)
       try {
@@ -209,20 +210,20 @@
     registrationBarrier.release()
   }
 
-  private def stop(finalState: SparkAppHandle.State): Unit = stopLock.synchronized {
-    try {
-      stopping = true
-
-      super.stop()
-      client.stop()
+  private def stop(finalState: SparkAppHandle.State): Unit = {
+    if (stopping.compareAndSet(false, true)) {
+      try {
+        super.stop()
+        client.stop()
 
-      val callback = shutdownCallback
-      if (callback != null) {
-        callback(this)
+        val callback = shutdownCallback
+        if (callback != null) {
+          callback(this)
+        }
+      } finally {
+        launcherBackend.setState(finalState)
+        launcherBackend.close()
       }
-    } finally {
-      launcherBackend.setState(finalState)
-      launcherBackend.close()
     }
   }
 

From 929061c19de66208d606ceb80015b95f0de93cc0 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Wed, 12 Apr 2017 17:19:27 -0700
Subject: [PATCH 3/3] Remove lock

---
 .../spark/scheduler/cluster/StandaloneSchedulerBackend.scala | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index f1dafb1b5bde6..0529fe9eed4da 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -56,9 +56,6 @@ private[spark] class StandaloneSchedulerBackend(
   private val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
   private val totalExpectedCores = maxCores.getOrElse(0)
 
-  /** Lock for `stop(finalState: SparkAppHandle.State)` method. */
-  private val stopLock = new Object
-
   override def start() {
     super.start()
     launcherBackend.connect()
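
Taken together, the series ends up with a lock-free guard for `stop`: an `AtomicBoolean` whose `compareAndSet(false, true)` lets exactly one caller run the shutdown sequence, while callbacks such as `disconnected()` and `dead()` only need `stopping.get`. The sketch below is a minimal, self-contained illustration of that stop-once pattern; `FakeClient`, `Backend`, the `String` final state, and the println output are illustrative stand-ins, not the Spark classes touched by these patches.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Illustrative stand-in for the external resource a backend shuts down.
class FakeClient {
  def stop(): Unit = println("client stopped")
}

class Backend {
  private val stopping = new AtomicBoolean(false)
  private val client = new FakeClient

  // Only the caller that wins the compareAndSet performs the shutdown;
  // concurrent or repeated calls observe `true` and return immediately.
  def stop(finalState: String): Unit = {
    if (stopping.compareAndSet(false, true)) {
      try {
        client.stop()
      } finally {
        println(s"final state reported: $finalState")
      }
    }
  }

  // Readers only need `get`, so they never contend for a monitor with `stop`.
  def dead(reason: String): Unit = {
    if (!stopping.get) {
      println(s"application killed: $reason")
    }
  }
}

object StopOnceDemo extends App {
  val backend = new Backend
  // Simulate stop() racing from two sources, e.g. a kill request and normal shutdown.
  val threads = Seq("KILLED", "FINISHED").map { state =>
    new Thread(() => backend.stop(state))
  }
  threads.foreach(_.start())
  threads.foreach(_.join()) // the shutdown body runs exactly once, for whichever state won
}
```

Compared with the dedicated `stopLock` introduced in patch 1, the CAS flag means `stop` never holds a monitor while stopping its resources and cannot block threads that merely check the flag, which is presumably why patch 3 can drop the lock object entirely.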