diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 8d5ad12cb85be..3f5439a287b57 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -516,6 +516,26 @@ See the [configuration page](configuration.html) for information on Spark config
Fetcher Cache
+
+ spark.mesos.checkpoint |
+ false |
+
+ If set, agents running tasks started by this framework will write the framework pid, executor pids and status updates to disk.
+ If the agent exits (e.g., due to a crash or as part of upgrading Mesos), this checkpointed data allows the restarted agent to
+ reconnect to executors that were started by the old instance of the agent. Enabling checkpointing improves fault tolerance,
+ at the cost of a (usually small) increase in disk I/O.
+ |
+
+
+ spark.mesos.failoverTimeout |
+ 0.0 |
+
+ The amount of time (in seconds) that the master will wait for the scheduler to failover before it tears down the framework
+ by killing all its tasks/executors. This should be non-zero if a framework expects to reconnect after a failure and not lose
+ its tasks/executors.
+ NOTE: To avoid accidental destruction of tasks, production frameworks typically set this to a large value (e.g., 1 week).
+ |
+
# Troubleshooting and Debugging
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index f555072c3842a..89b0905300a42 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -158,8 +158,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
sc.appName,
sc.conf,
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
- None,
- None,
+ sc.conf.getOption("spark.mesos.checkpoint").map(_.toBoolean),
+ sc.conf.getOption("spark.mesos.failoverTimeout").map(_.toDouble),
sc.conf.getOption("spark.mesos.driver.frameworkId")
)
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index 7e561916a71e2..71869664fe01e 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -78,8 +78,8 @@ private[spark] class MesosFineGrainedSchedulerBackend(
sc.appName,
sc.conf,
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
- Option.empty,
- Option.empty,
+ sc.conf.getOption("spark.mesos.checkpoint").map(_.toBoolean),
+ sc.conf.getOption("spark.mesos.failoverTimeout").map(_.toDouble),
sc.conf.getOption("spark.mesos.driver.frameworkId")
)
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index cdb3b68489654..308951769288a 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -139,6 +139,43 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(cpus == offerCores)
}
+ test("mesos supports checkpointing") {
+
+ val checkpoint = true
+ val failoverTimeout = 10
+ setBackend(Map("spark.mesos.checkpoint" -> checkpoint.toString,
+ "spark.mesos.failoverTimeout" -> failoverTimeout.toString,
+ "spark.mesos.driver.webui.url" -> "http://webui"))
+
+ val taskScheduler = mock[TaskSchedulerImpl]
+ when(taskScheduler.sc).thenReturn(sc)
+ val driver = mock[SchedulerDriver]
+ when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
+ val securityManager = mock[SecurityManager]
+
+ val backend = new MesosCoarseGrainedSchedulerBackend(
+ taskScheduler, sc, "master", securityManager) {
+ override protected def createSchedulerDriver(
+ masterUrl: String,
+ scheduler: Scheduler,
+ sparkUser: String,
+ appName: String,
+ conf: SparkConf,
+ webuiUrl: Option[String] = None,
+ checkpoint: Option[Boolean] = None,
+ failoverTimeout: Option[Double] = None,
+ frameworkId: Option[String] = None): SchedulerDriver = {
+ markRegistered()
+ assert(checkpoint.contains(true))
+ assert(failoverTimeout.contains(10.0))
+ driver
+ }
+ }
+
+ backend.start()
+
+ }
+
test("mesos does not acquire more than spark.cores.max") {
val maxCores = 10
setBackend(Map("spark.cores.max" -> maxCores.toString))
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
index 4ee85b91830a9..21343b063b00f 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
@@ -79,6 +79,43 @@ class MesosFineGrainedSchedulerBackendSuite
backend.start()
}
+ test("mesos supports checkpointing") {
+ val conf = new SparkConf
+ conf.set("spark.mesos.checkpoint", "true")
+ conf.set("spark.mesos.failoverTimeout", "10")
+ conf.set("spark.mesos.driver.webui.url", "http://webui")
+
+ val sc = mock[SparkContext]
+ when(sc.conf).thenReturn(conf)
+ when(sc.sparkUser).thenReturn("sparkUser1")
+ when(sc.appName).thenReturn("appName1")
+
+ val taskScheduler = mock[TaskSchedulerImpl]
+ val driver = mock[SchedulerDriver]
+ when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
+
+ val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") {
+ override protected def createSchedulerDriver(
+ masterUrl: String,
+ scheduler: Scheduler,
+ sparkUser: String,
+ appName: String,
+ conf: SparkConf,
+ webuiUrl: Option[String] = None,
+ checkpoint: Option[Boolean] = None,
+ failoverTimeout: Option[Double] = None,
+ frameworkId: Option[String] = None): SchedulerDriver = {
+ markRegistered()
+ assert(checkpoint.contains(true))
+ assert(failoverTimeout.contains(10.0))
+ driver
+ }
+ }
+
+ backend.start()
+
+ }
+
test("Use configured mesosExecutor.cores for ExecutorInfo") {
val mesosExecutorCores = 3
val conf = new SparkConf