From 41543e0f67ef7e8a6799857e5f94509f12626e16 Mon Sep 17 00:00:00 2001
From: Charles Allen
Date: Tue, 15 Dec 2015 12:27:45 -0800
Subject: [PATCH 1/7] [SPARK-12330] Make CoarseMesosSchedulerBackend wait for
 executors to cleanup during shutdown

* Adds `spark.mesos.coarse.shutdown.ms` to tune shutdown wait period
---
 .../CoarseGrainedExecutorBackend.scala        |  8 ++++-
 .../mesos/CoarseMesosSchedulerBackend.scala   | 33 ++++++++++++++++++-
 docs/running-on-mesos.md                      |  7 ++++
 3 files changed, 46 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 136cf4a84d38..3b5cb18da1b2 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -19,6 +19,7 @@ package org.apache.spark.executor
 
 import java.net.URL
 import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.mutable
 import scala.util.{Failure, Success}
@@ -42,6 +43,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     env: SparkEnv)
   extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {
 
+  private[this] val stopping = new AtomicBoolean(false)
   var executor: Executor = null
   @volatile var driver: Option[RpcEndpointRef] = None
 
@@ -102,19 +104,23 @@ private[spark] class CoarseGrainedExecutorBackend(
       }
 
     case StopExecutor =>
+      stopping.set(true)
       logInfo("Driver commanded a shutdown")
       // Cannot shutdown here because an ack may need to be sent back to the caller. So send
       // a message to self to actually do the shutdown.
       self.send(Shutdown)
 
     case Shutdown =>
+      stopping.set(true)
       executor.stop()
       stop()
       rpcEnv.shutdown()
   }
 
   override def onDisconnected(remoteAddress: RpcAddress): Unit = {
-    if (driver.exists(_.address == remoteAddress)) {
+    if (stopping.get()) {
+      logInfo(s"Driver from $remoteAddress disconnected during shutdown")
+    } else if (driver.exists(_.address == remoteAddress)) {
       logError(s"Driver $remoteAddress disassociated! Shutting down.")
       System.exit(1)
     } else {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 58c30e7d9788..5a43e5b9fe7a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -18,12 +18,14 @@ package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
+import java.util.concurrent.TimeUnit
 import java.util.{Collections, List => JList}
 import java.util.concurrent.locks.ReentrantLock
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap, HashSet}
 
+import com.google.common.base.Stopwatch
 import com.google.common.collect.HashBiMap
 import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver}
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
@@ -60,6 +62,12 @@ private[spark] class CoarseMesosSchedulerBackend(
   // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
   val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
 
+  private[this] val shutdownTimeoutMS = conf.getInt("spark.mesos.coarse.shutdown.ms", 10000)
+    .ensuring(_ >= 0, "spark.mesos.coarse.shutdown.ms must be >= 0")
+
+  // Synchronization protected by stateLock
+  private[this] var stopCalled: Boolean = false
+
   // If shuffle service is enabled, the Spark driver will register with the shuffle service.
   // This is for cleaning up shuffle files reliably.
   private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
@@ -245,6 +253,13 @@ private[spark] class CoarseMesosSchedulerBackend(
    */
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     stateLock.synchronized {
+      if (stopCalled) {
+        logDebug("Ignoring offers during shutdown")
+        // Driver should simply return a stopped status on race
+        // condition between this.stop() and completing here
+        offers.asScala.map(_.getId).foreach(d.declineOffer)
+        return
+      }
       val filters = Filters.newBuilder().setRefuseSeconds(5).build()
       for (offer <- offers.asScala) {
         val offerAttributes = toAttributeMap(offer.getAttributesList)
@@ -364,7 +379,23 @@ private[spark] class CoarseMesosSchedulerBackend(
   }
 
   override def stop() {
-    super.stop()
+    // Make sure we're not launching tasks during shutdown
+    stateLock.synchronized {
+      if (stopCalled) {
+        logWarning("Stop called multiple times, ignoring")
+        return
+      }
+      stopCalled = true
+      super.stop()
+    }
+    // Wait for finish
+    val stopwatch = new Stopwatch()
+    stopwatch.start()
+    // slaveIdsWithExecutors has no memory barrier, so this is eventually consistent
+    while (slaveIdsWithExecutors.nonEmpty &&
+      stopwatch.elapsed(TimeUnit.MILLISECONDS) < shutdownTimeoutMS) {
+      Thread.sleep(100)
+    }
     if (mesosDriver != null) {
       mesosDriver.stop()
     }
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index ed720f1039f9..4eb821653a6b 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -387,6 +387,13 @@ See the [configuration page](configuration.html) for information on Spark config
   </td>
 </tr>
+<tr>
+  <td><code>spark.mesos.coarse.shutdown.ms</code></td>
+  <td>10000 (10 seconds)</td>
+  <td>
+    Time (in ms) to wait for executors to report that they have exited. Setting this too low has the risk of shutting down the Mesos driver (and thereby killing the spark executors) while the executor is still in the process of exiting cleanly.
+  </td>
+</tr>
 </table>
 
 # Troubleshooting and Debugging
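
The patch has two halves. On the executor side, a `stopping` flag records that a
shutdown was commanded, so the driver disconnect that naturally follows no longer
triggers `System.exit(1)`. On the driver side, `stop()` polls `slaveIdsWithExecutors`
until the executors deregister or the timeout lapses. A minimal runnable sketch of the
executor-side guard, with the RpcEndpoint machinery replaced by plain method calls
(object and method names here are illustrative, not Spark's):

import java.util.concurrent.atomic.AtomicBoolean

object StoppingFlagSketch {
  private val stopping = new AtomicBoolean(false)

  // Stands in for the StopExecutor/Shutdown message handlers.
  def onStopExecutor(): Unit = stopping.set(true)

  // Stands in for onDisconnected: benign during shutdown, fatal otherwise.
  def onDisconnected(remoteAddress: String): Unit = {
    if (stopping.get()) {
      println(s"INFO: Driver from $remoteAddress disconnected during shutdown")
    } else {
      println(s"ERROR: Driver $remoteAddress disassociated! Shutting down.")
      sys.exit(1)
    }
  }

  def main(args: Array[String]): Unit = {
    onStopExecutor()              // the driver commanded a shutdown first...
    onDisconnected("driver-host") // ...so this disconnect does not exit(1)
  }
}

The real handlers set the flag in both the StopExecutor and Shutdown cases, since
either message may be the first one the executor observes.
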
From 0a2453085971584cc84e30760d935858c81d7607 Mon Sep 17 00:00:00 2001
From: Charles Allen
Date: Fri, 29 Jan 2016 09:16:35 -0800
Subject: [PATCH 2/7] Add logging regarding leftover executors in
 CoarseMesosSchedulerBackend

---
 .../scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 5a43e5b9fe7a..1ad596c94018 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -396,6 +396,10 @@ private[spark] class CoarseMesosSchedulerBackend(
       stopwatch.elapsed(TimeUnit.MILLISECONDS) < shutdownTimeoutMS) {
       Thread.sleep(100)
     }
+    if(slaveIdsWithExecutors.nonEmpty) {
+      logWarning(s"${slaveIdsWithExecutors.size} executors still running. "
+        + "Proceeding with mesos driver stop.")
+    }
     if (mesosDriver != null) {
       mesosDriver.stop()
     }

From b9e1c7753c180147bf8fa628eb63c8516f7ad180 Mon Sep 17 00:00:00 2001
From: Charles Allen
Date: Fri, 29 Jan 2016 10:35:10 -0800
Subject: [PATCH 3/7] fix scala style

---
 .../scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 1ad596c94018..62620009c089 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -18,8 +18,8 @@ package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
-import java.util.concurrent.TimeUnit
 import java.util.{Collections, List => JList}
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap, HashSet}
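
Patch 2's warning fires only after the polling loop in `stop()` gives up. A runnable
sketch of that wait-then-warn flow, assuming `System.nanoTime` in place of Guava's
Stopwatch and `println` in place of Spark's logging (all names illustrative):

import java.util.concurrent.{ConcurrentHashMap, TimeUnit}

object ShutdownWaitSketch {
  // Stands in for slaveIdsWithExecutors: executors deregister as they exit.
  private val pending = ConcurrentHashMap.newKeySet[String]()

  def awaitAndWarn(shutdownTimeoutMS: Long): Unit = {
    val start = System.nanoTime()
    def elapsedMs: Long = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
    while (!pending.isEmpty && elapsedMs < shutdownTimeoutMS) {
      Thread.sleep(100) // same 100 ms poll interval as the patch
    }
    if (!pending.isEmpty) {
      println(s"WARN: ${pending.size} executors still running. "
        + "Proceeding with mesos driver stop.")
    }
  }

  def main(args: Array[String]): Unit = {
    pending.add("slave-1")
    pending.add("slave-2")
    // One executor deregisters in time; the other never does, so the warning fires.
    new Thread(new Runnable {
      override def run(): Unit = { Thread.sleep(200); pending.remove("slave-1") }
    }).start()
    awaitAndWarn(1000L)
  }
}

The polling is deliberately crude: as the patch's own comment notes,
slaveIdsWithExecutors has no memory barrier, so the loop is only eventually
consistent, and the timeout bounds how long that imprecision can block shutdown.
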
From 9ae106c80a5793019d664d1d53ff3f7114c329c3 Mon Sep 17 00:00:00 2001
From: Charles Allen
Date: Tue, 2 Feb 2016 13:23:18 -0800
Subject: [PATCH 4/7] Fix formatting problem in CoarseMesosSchedulerBackend

---
 .../scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 62620009c089..f32bce8b5411 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -396,7 +396,7 @@ private[spark] class CoarseMesosSchedulerBackend(
       stopwatch.elapsed(TimeUnit.MILLISECONDS) < shutdownTimeoutMS) {
       Thread.sleep(100)
     }
-    if(slaveIdsWithExecutors.nonEmpty) {
+    if (slaveIdsWithExecutors.nonEmpty) {
       logWarning(s"${slaveIdsWithExecutors.size} executors still running. "
         + "Proceeding with mesos driver stop.")
     }

From f9c15c7ff0c2ee8313be5d3771260aeefa7abca6 Mon Sep 17 00:00:00 2001
From: Charles Allen
Date: Tue, 2 Feb 2016 13:31:18 -0800
Subject: [PATCH 5/7] Address review comments on logging

---
 .../cluster/mesos/CoarseMesosSchedulerBackend.scala | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index f32bce8b5411..b86b5eb85bf0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -62,7 +62,7 @@ private[spark] class CoarseMesosSchedulerBackend(
   // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
   val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
 
-  private[this] val shutdownTimeoutMS = conf.getInt("spark.mesos.coarse.shutdown.ms", 10000)
+  private[this] val shutdownTimeoutMS = conf.getTimeAsMs("spark.mesos.coarse.shutdown.ms", "10s")
     .ensuring(_ >= 0, "spark.mesos.coarse.shutdown.ms must be >= 0")
 
   // Synchronization protected by stateLock
@@ -388,7 +388,8 @@ private[spark] class CoarseMesosSchedulerBackend(
       stopCalled = true
       super.stop()
     }
-    // Wait for finish
+    // Wait for executors to report done, or else mesosDriver.stop() will forcefully kill them.
+    // See SPARK-12330
     val stopwatch = new Stopwatch()
     stopwatch.start()
     // slaveIdsWithExecutors has no memory barrier, so this is eventually consistent
@@ -397,8 +398,9 @@ private[spark] class CoarseMesosSchedulerBackend(
       Thread.sleep(100)
     }
     if (slaveIdsWithExecutors.nonEmpty) {
-      logWarning(s"${slaveIdsWithExecutors.size} executors still running. "
-        + "Proceeding with mesos driver stop.")
+      logWarning(s"Timed out waiting for ${slaveIdsWithExecutors.size} remaining executors "
+        + s"to terminate within $shutdownTimeoutMS ms. This may leave temporary files "
+        + "on the mesos nodes.")
     }
     if (mesosDriver != null) {
       mesosDriver.stop()
     }

From 75887c4879bbfc7ec251a22f3bccc934fce63a4c Mon Sep 17 00:00:00 2001
From: Charles Allen
Date: Tue, 2 Feb 2016 13:34:10 -0800
Subject: [PATCH 6/7] Update mesos docs for shutdown timeout

---
 docs/running-on-mesos.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 4eb821653a6b..7667c18ca4b3 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -389,9 +389,9 @@ See the [configuration page](configuration.html) for information on Spark config
 <tr>
   <td><code>spark.mesos.coarse.shutdown.ms</code></td>
-  <td>10000 (10 seconds)</td>
+  <td>10s</td>
   <td>
-    Time (in ms) to wait for executors to report that they have exited. Setting this too low has the risk of shutting down the Mesos driver (and thereby killing the spark executors) while the executor is still in the process of exiting cleanly.
+    Time to wait for executors to report that they have exited. Setting this too low has the risk of shutting down the Mesos driver (and thereby killing the spark executors) while the executor is still in the process of exiting cleanly.
   </td>
 </tr>
 </table>
 
From 1181a05a8597f3f8fae7a431b1d661b141d9474c Mon Sep 17 00:00:00 2001
From: Charles Allen
Date: Thu, 4 Feb 2016 09:59:25 -0800
Subject: [PATCH 7/7] Remove docs

---
 docs/running-on-mesos.md | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 7667c18ca4b3..ed720f1039f9 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -387,13 +387,6 @@ See the [configuration page](configuration.html) for information on Spark config
   </td>
 </tr>
-<tr>
-  <td><code>spark.mesos.coarse.shutdown.ms</code></td>
-  <td>10s</td>
-  <td>
-    Time to wait for executors to report that they have exited. Setting this too low has the risk of shutting down the Mesos driver (and thereby killing the spark executors) while the executor is still in the process of exiting cleanly.
-  </td>
-</tr>
 </table>
 
 # Troubleshooting and Debugging
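
Patch 7 only removes the documentation row; the config key itself survives as an
undocumented escape hatch read by the scheduler backend. One last piece of patch 1
worth isolating is the offer-handling guard the shutdown path relies on: once
`stopCalled` is set under `stateLock`, `resourceOffers` declines everything rather
than launching tasks into a stopping framework. A runnable sketch with the Mesos
driver and offer types reduced to strings (names illustrative, not Spark's):

object DeclineOffersSketch {
  private val stateLock = new Object
  private var stopCalled = false // guarded by stateLock, as in the patch

  def stop(): Unit = stateLock.synchronized { stopCalled = true }

  def resourceOffers(offers: Seq[String]): Unit = stateLock.synchronized {
    if (stopCalled) {
      println("DEBUG: Ignoring offers during shutdown")
      offers.foreach(id => println(s"declined offer $id"))
    } else {
      offers.foreach(id => println(s"evaluating offer $id")) // launch path elided
    }
  }

  def main(args: Array[String]): Unit = {
    stop()
    resourceOffers(Seq("offer-1", "offer-2")) // a race with stop() is now benign
  }
}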