diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 7401b63e022c..cf257c06c951 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -545,6 +545,17 @@ See the [configuration page](configuration.html) for information on Spark config
Fetcher Cache
+<tr>
+  <td><code>spark.mesos.driver.failoverTimeout</code></td>
+  <td><code>0.0</code></td>
+  <td>
+    The amount of time (in seconds) that the master will wait for the
+    driver to reconnect, after being temporarily disconnected, before
+    it tears down the driver framework by killing all its
+    executors. The default value is zero, meaning no timeout: if the
+    driver disconnects, the master immediately tears down the framework.
+  </td>
+</tr>
# Troubleshooting and Debugging
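
As a quick usage sketch (not part of the patch), the new property is set like any other Spark configuration; the master URL and application name below are placeholders:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Allow the Mesos master to wait up to one hour for a temporarily
// disconnected driver to reconnect before it tears down the framework
// and kills its executors.
val conf = new SparkConf()
  .setMaster("mesos://mesos-master.example.com:5050") // placeholder master URL
  .setAppName("FailoverTimeoutExample")               // placeholder app name
  .set("spark.mesos.driver.failoverTimeout", "3600.0")

val sc = new SparkContext(conf)
```

The same value could equally be passed on the command line, e.g. `--conf spark.mesos.driver.failoverTimeout=3600.0` with `spark-submit`.
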
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
index 56d697f35961..6c8619e3c3c1 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
@@ -58,9 +58,16 @@ package object config {
private [spark] val DRIVER_LABELS =
ConfigBuilder("spark.mesos.driver.labels")
- .doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value" +
+ .doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value " +
"pairs should be separated by a colon, and commas used to list more than one." +
"Ex. key:value,key2:value2")
.stringConf
.createOptional
+
+ private [spark] val DRIVER_FAILOVER_TIMEOUT =
+ ConfigBuilder("spark.mesos.driver.failoverTimeout")
+ .doc("Amount of time in seconds that the master will wait to hear from the driver, " +
+ "during a temporary disconnection, before tearing down all the executors.")
+ .doubleConf
+ .createWithDefault(0.0)
}
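
For context, a typed entry created with `ConfigBuilder` is read back through `SparkConf.get(entry)`, an accessor that is package-private to `org.apache.spark`, with the declared default applied when the key is unset. A minimal sketch, assuming code living inside that package:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.mesos.config._

// Sketch: reading the typed entry applies the declared default (0.0)
// when the user has not set the key, and the parsed Double otherwise.
val conf = new SparkConf()
val defaultTimeout = conf.get(DRIVER_FAILOVER_TIMEOUT)    // 0.0

conf.set(DRIVER_FAILOVER_TIMEOUT.key, "3600.0")
val configuredTimeout = conf.get(DRIVER_FAILOVER_TIMEOUT) // 3600.0
```
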
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 7dd42c41aa7c..6e7f41dad34b 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -29,6 +29,7 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.SchedulerDriver
import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
+import org.apache.spark.deploy.mesos.config._
import org.apache.spark.internal.config
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
@@ -177,7 +178,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
sc.conf,
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
None,
- None,
+ Some(sc.conf.get(DRIVER_FAILOVER_TIMEOUT)),
sc.conf.getOption("spark.mesos.driver.frameworkId")
)
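
Downstream, `createSchedulerDriver` (defined in `MesosSchedulerUtils`, not shown in this diff) is where the optional timeout would presumably be copied onto the Mesos `FrameworkInfo`, which is what the master consults when the scheduler disconnects. A hedged sketch of that wiring, under that assumption:

```scala
import org.apache.mesos.Protos.FrameworkInfo

// Sketch (assumption, not taken from this diff): apply an optional failover
// timeout to the FrameworkInfo builder only when it is defined, leaving the
// Mesos default in place otherwise.
def applyFailoverTimeout(
    builder: FrameworkInfo.Builder,
    failoverTimeout: Option[Double]): FrameworkInfo.Builder = {
  failoverTimeout.foreach(builder.setFailoverTimeout) // value is in seconds
  builder
}
```
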
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 7cca5fedb31e..d9ff4a403ea3 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -33,6 +33,7 @@ import org.scalatest.mock.MockitoSugar
import org.scalatest.BeforeAndAfter
import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.deploy.mesos.config._
import org.apache.spark.internal.config._
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
@@ -369,6 +370,41 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
backend.start()
}
+ test("failover timeout is set in created scheduler driver") {
+ val failoverTimeoutIn = 3600.0
+ initializeSparkConf(Map(DRIVER_FAILOVER_TIMEOUT.key -> failoverTimeoutIn.toString))
+ sc = new SparkContext(sparkConf)
+
+ val taskScheduler = mock[TaskSchedulerImpl]
+ when(taskScheduler.sc).thenReturn(sc)
+
+ val driver = mock[SchedulerDriver]
+ when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
+
+ val securityManager = mock[SecurityManager]
+
+ val backend = new MesosCoarseGrainedSchedulerBackend(
+ taskScheduler, sc, "master", securityManager) {
+ override protected def createSchedulerDriver(
+ masterUrl: String,
+ scheduler: Scheduler,
+ sparkUser: String,
+ appName: String,
+ conf: SparkConf,
+ webuiUrl: Option[String] = None,
+ checkpoint: Option[Boolean] = None,
+ failoverTimeout: Option[Double] = None,
+ frameworkId: Option[String] = None): SchedulerDriver = {
+ markRegistered()
+ assert(failoverTimeout.isDefined)
+ assert(failoverTimeout.get.equals(failoverTimeoutIn))
+ driver
+ }
+ }
+
+ backend.start()
+ }
+
test("honors unset spark.mesos.containerizer") {
setBackend(Map("spark.mesos.executor.docker.image" -> "test"))