From ae3e5bd0e0ab139f9d871d667c7fad7f2682285d Mon Sep 17 00:00:00 2001 From: "Susan X. Huynh" Date: Tue, 18 Jul 2017 11:07:53 -0700 Subject: [PATCH 1/2] Made the driver failover_timeout configurable as spark.mesos.driver.failoverTimeout. Added a unit test. --- docs/running-on-mesos.md | 10 ++++++ .../apache/spark/deploy/mesos/config.scala | 9 ++++- .../MesosCoarseGrainedSchedulerBackend.scala | 3 +- ...osCoarseGrainedSchedulerBackendSuite.scala | 35 +++++++++++++++++++ 4 files changed, 55 insertions(+), 2 deletions(-) diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 7401b63e022c..3514e0901971 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -545,6 +545,16 @@ See the [configuration page](configuration.html) for information on Spark config Fetcher Cache + + spark.mesos.driver.failoverTimeout + 0.0 + + The amount of time (in seconds) that the master will wait for the + driver to reconnect, after being temporarily disconnected, before + it tears down the driver framework by killing all its + executors. + + # Troubleshooting and Debugging diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala index 56d697f35961..6c8619e3c3c1 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala @@ -58,9 +58,16 @@ package object config { private [spark] val DRIVER_LABELS = ConfigBuilder("spark.mesos.driver.labels") - .doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value" + + .doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value " + "pairs should be separated by a colon, and commas used to list more than one." + "Ex. key:value,key2:value2") .stringConf .createOptional + + private [spark] val DRIVER_FAILOVER_TIMEOUT = + ConfigBuilder("spark.mesos.driver.failoverTimeout") + .doc("Amount of time in seconds that the master will wait to hear from the driver, " + + "during a temporary disconnection, before tearing down all the executors.") + .doubleConf + .createWithDefault(0.0) } diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 7dd42c41aa7c..6e7f41dad34b 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -29,6 +29,7 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} import org.apache.mesos.SchedulerDriver import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState} +import org.apache.spark.deploy.mesos.config._ import org.apache.spark.internal.config import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient @@ -177,7 +178,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf, sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)), None, - None, + Some(sc.conf.get(DRIVER_FAILOVER_TIMEOUT)), sc.conf.getOption("spark.mesos.driver.frameworkId") ) diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index 7cca5fedb31e..0e8882282d37 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -369,6 +369,41 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite backend.start() } + test("failover timeout is set in created scheduler driver") { + val failoverTimeoutIn = 3600.0 + initializeSparkConf(Map("spark.mesos.driver.failoverTimeout" -> failoverTimeoutIn.toString)) + sc = new SparkContext(sparkConf) + + val taskScheduler = mock[TaskSchedulerImpl] + when(taskScheduler.sc).thenReturn(sc) + + val driver = mock[SchedulerDriver] + when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) + + val securityManager = mock[SecurityManager] + + val backend = new MesosCoarseGrainedSchedulerBackend( + taskScheduler, sc, "master", securityManager) { + override protected def createSchedulerDriver( + masterUrl: String, + scheduler: Scheduler, + sparkUser: String, + appName: String, + conf: SparkConf, + webuiUrl: Option[String] = None, + checkpoint: Option[Boolean] = None, + failoverTimeout: Option[Double] = None, + frameworkId: Option[String] = None): SchedulerDriver = { + markRegistered() + assert(failoverTimeout.isDefined) + assert(failoverTimeout.get.equals(failoverTimeoutIn)) + driver + } + } + + backend.start() + } + test("honors unset spark.mesos.containerizer") { setBackend(Map("spark.mesos.executor.docker.image" -> "test")) From f4a001faa612655c6c2aa7a7da85248be862241a Mon Sep 17 00:00:00 2001 From: "Susan X. Huynh" Date: Wed, 19 Jul 2017 14:40:39 -0700 Subject: [PATCH 2/2] Addressed review comments: (1) explain default value of zero, (2) use the config key. --- docs/running-on-mesos.md | 5 +++-- .../mesos/MesosCoarseGrainedSchedulerBackendSuite.scala | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 3514e0901971..cf257c06c951 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -549,10 +549,11 @@ See the [configuration page](configuration.html) for information on Spark config spark.mesos.driver.failoverTimeout 0.0 - The amount of time (in seconds) that the master will wait for the + The amount of time (in seconds) that the master will wait for the driver to reconnect, after being temporarily disconnected, before it tears down the driver framework by killing all its - executors. + executors. The default value is zero, meaning no timeout: if the + driver disconnects, the master immediately tears down the framework. diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index 0e8882282d37..d9ff4a403ea3 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -33,6 +33,7 @@ import org.scalatest.mock.MockitoSugar import org.scalatest.BeforeAndAfter import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.deploy.mesos.config._ import org.apache.spark.internal.config._ import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef} @@ -371,7 +372,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite test("failover timeout is set in created scheduler driver") { val failoverTimeoutIn = 3600.0 - initializeSparkConf(Map("spark.mesos.driver.failoverTimeout" -> failoverTimeoutIn.toString)) + initializeSparkConf(Map(DRIVER_FAILOVER_TIMEOUT.key -> failoverTimeoutIn.toString)) sc = new SparkContext(sparkConf) val taskScheduler = mock[TaskSchedulerImpl]