diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 923d8dbebf3d2..0c6db1fe02970 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -513,6 +513,26 @@ See the [configuration page](configuration.html) for information on Spark config
     If set to `true`, all URIs (example: `spark.executor.uri`, `spark.mesos.uris`) will be cached by the [Mesos fetcher cache](http://mesos.apache.org/documentation/latest/fetcher/)
   </td>
 </tr>
+<tr>
+  <td><code>spark.mesos.checkpoint</code></td>
+  <td>false</td>
+  <td>
+    If set to `true`, agents running tasks started by this framework will write the framework pid, executor pids, and status updates to disk.
+    If the agent exits (e.g., due to a crash or as part of upgrading Mesos), this checkpointed data allows the restarted agent to
+    reconnect to executors that were started by the old instance of the agent. Enabling checkpointing improves fault tolerance,
+    at the cost of a (usually small) increase in disk I/O.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.mesos.failoverTimeout</code></td>
+  <td>0.0</td>
+  <td>
+    The amount of time (in seconds) that the master will wait for the scheduler to fail over before it tears down the framework
+    by killing all its tasks/executors. This should be non-zero if a framework expects to reconnect after a failure and not lose
+    its tasks/executors.
+    NOTE: To avoid accidental destruction of tasks, production frameworks typically set this to a large value (e.g., 1 week).
+  </td>
+</tr>
 </table>
 
 # Troubleshooting and Debugging
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 5063c1fe988bc..e50cd18dbf64c 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -159,8 +159,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       sc.appName,
       sc.conf,
       sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress)),
-      None,
-      None,
+      sc.conf.getOption("spark.mesos.checkpoint").map(_.toBoolean),
+      sc.conf.getOption("spark.mesos.failoverTimeout").map(_.toDouble),
       sc.conf.getOption("spark.mesos.driver.frameworkId")
     )
 
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index 09a252f3c74ac..8b78ce0f475e0 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -78,8 +78,8 @@ private[spark] class MesosFineGrainedSchedulerBackend(
       sc.appName,
       sc.conf,
       sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress)),
-      Option.empty,
-      Option.empty,
+      sc.conf.getOption("spark.mesos.checkpoint").map(_.toBoolean),
+      sc.conf.getOption("spark.mesos.failoverTimeout").map(_.toDouble),
       sc.conf.getOption("spark.mesos.driver.frameworkId")
     )
 
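For reference, a minimal driver-side sketch of opting in to the two new settings once this change is in place. The master URL, app name, and timeout value below are illustrative, not taken from the patch:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical application config: enables agent checkpointing and gives the
// framework one week (in seconds) to re-register before Mesos kills its tasks.
val conf = new SparkConf()
  .setMaster("mesos://mesos-master:5050") // illustrative master URL
  .setAppName("checkpointed-app")         // illustrative app name
  .set("spark.mesos.checkpoint", "true")
  .set("spark.mesos.failoverTimeout", "604800.0") // 7 * 24 * 3600 seconds

val sc = new SparkContext(conf)
```

Both values are read via `sc.conf.getOption(...)` in the backend changes above, so leaving them unset preserves the previous behavior: `getOption` yields `None`, exactly what was passed before this patch.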
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index f73638fda6232..c29f2afd66c17 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -139,6 +139,43 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(cpus == offerCores)
   }
 
+  test("mesos supports checkpointing") {
+
+    val checkpoint = true
+    val failoverTimeout = 10
+    setBackend(Map("spark.mesos.checkpoint" -> checkpoint.toString,
+      "spark.mesos.failoverTimeout" -> failoverTimeout.toString,
+      "spark.mesos.driver.webui.url" -> "http://webui"))
+
+    val taskScheduler = mock[TaskSchedulerImpl]
+    when(taskScheduler.sc).thenReturn(sc)
+    val driver = mock[SchedulerDriver]
+    when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
+    val securityManager = mock[SecurityManager]
+
+    val backend = new MesosCoarseGrainedSchedulerBackend(
+        taskScheduler, sc, "master", securityManager) {
+      override protected def createSchedulerDriver(
+          masterUrl: String,
+          scheduler: Scheduler,
+          sparkUser: String,
+          appName: String,
+          conf: SparkConf,
+          webuiUrl: Option[String] = None,
+          checkpoint: Option[Boolean] = None,
+          failoverTimeout: Option[Double] = None,
+          frameworkId: Option[String] = None): SchedulerDriver = {
+        markRegistered()
+        assert(checkpoint.contains(true))
+        assert(failoverTimeout.contains(10.0))
+        driver
+      }
+    }
+
+    backend.start()
+
+  }
+
   test("mesos does not acquire more than spark.cores.max") {
     val maxCores = 10
     setBackend(Map("spark.cores.max" -> maxCores.toString))
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
index 1d7a86f4b0904..aec4dc6dd4399 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
@@ -78,6 +78,43 @@ class MesosFineGrainedSchedulerBackendSuite
     backend.start()
   }
 
+  test("mesos supports checkpointing") {
+    val conf = new SparkConf
+    conf.set("spark.mesos.checkpoint", "true")
+    conf.set("spark.mesos.failoverTimeout", "10")
+    conf.set("spark.mesos.driver.webui.url", "http://webui")
+
+    val sc = mock[SparkContext]
+    when(sc.conf).thenReturn(conf)
+    when(sc.sparkUser).thenReturn("sparkUser1")
+    when(sc.appName).thenReturn("appName1")
+
+    val taskScheduler = mock[TaskSchedulerImpl]
+    val driver = mock[SchedulerDriver]
+    when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
+
+    val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") {
+      override protected def createSchedulerDriver(
+          masterUrl: String,
+          scheduler: Scheduler,
+          sparkUser: String,
+          appName: String,
+          conf: SparkConf,
+          webuiUrl: Option[String] = None,
+          checkpoint: Option[Boolean] = None,
+          failoverTimeout: Option[Double] = None,
+          frameworkId: Option[String] = None): SchedulerDriver = {
+        markRegistered()
+        assert(checkpoint.contains(true))
+        assert(failoverTimeout.contains(10.0))
+        driver
+      }
+    }
+
+    backend.start()
+
+  }
+
   test("Use configured mesosExecutor.cores for ExecutorInfo") {
     val mesosExecutorCores = 3
     val conf = new SparkConf
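For reviewers, a hedged sketch of where these options presumably end up. The `createSchedulerDriver` method overridden in the tests above is defined in `MesosSchedulerUtils`, whose body is not part of this diff; roughly, it would forward the two `Option`s to the Mesos `FrameworkInfo` protobuf builder, whose `checkpoint` and `failover_timeout` fields the agent and master consult:

```scala
import org.apache.mesos.Protos.FrameworkInfo

// Sketch only: assumed plumbing, not code from this patch. The setters come
// from the Mesos FrameworkInfo protobuf (checkpoint: bool,
// failover_timeout: double, in seconds).
def applyFailoverSettings(
    builder: FrameworkInfo.Builder,
    checkpoint: Option[Boolean],
    failoverTimeout: Option[Double]): FrameworkInfo.Builder = {
  checkpoint.foreach(c => builder.setCheckpoint(c))
  failoverTimeout.foreach(t => builder.setFailoverTimeout(t))
  builder // unset options keep the protobuf defaults (false / 0.0)
}
```

Note that the protobuf defaults match the defaults documented in the new table entries above (`false` and `0.0`), so omitting both settings leaves the framework's registration unchanged.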