diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 8d5ad12cb85be..ce109e18c15fe 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -516,6 +516,16 @@ See the [configuration page](configuration.html) for information on Spark config Fetcher Cache + + spark.mesos.checkpoint + false + + If set to true, the mesos agents that are running the Spark executors will write the framework pid, executor pids and status updates to disk. + If the agent exits (e.g., due to a crash or as part of upgrading Mesos), this checkpointed data allows the restarted agent to + reconnect to executors that were started by the old instance of the agent. Enabling checkpointing improves fault tolerance, + at the cost of a (usually small) increase in disk I/O. + + # Troubleshooting and Debugging diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala index 19e253394f1b2..6b88f49c820f2 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala @@ -56,4 +56,15 @@ package object config { .stringConf .createOptional + private[spark] val CHECKPOINT = + ConfigBuilder("spark.mesos.checkpoint") + .doc("If set to true, the agents that are running the Spark executors will write " + + "the framework pid, executor pids and status updates to disk. If the agent exits " + + "(e.g., due to a crash or as part of upgrading Mesos), this checkpointed data allows " + + "the restarted agent to reconnect to executors that were started by the old instance " + + "of the agent. Enabling checkpointing improves fault tolerance, at the cost of a " + + "(usually small) increase in disk I/O.") + .booleanConf + .createOptional + } diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index f555072c3842a..0e1a4ee1a06b2 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -28,6 +28,7 @@ import scala.concurrent.Future import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState} +import org.apache.spark.deploy.mesos.config._ import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient import org.apache.spark.rpc.RpcEndpointAddress @@ -158,7 +159,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.appName, sc.conf, sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)), - None, + sc.conf.get(CHECKPOINT), None, sc.conf.getOption("spark.mesos.driver.frameworkId") ) diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index cdb3b68489654..08195368498d1 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -139,6 +139,40 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite assert(cpus == offerCores) } + test("mesos supports checkpointing") { + + val checkpoint = true + setBackend(Map("spark.mesos.checkpoint" -> checkpoint.toString, + "spark.mesos.driver.webui.url" -> "http://webui")) + + val taskScheduler = mock[TaskSchedulerImpl] + when(taskScheduler.sc).thenReturn(sc) + val driver = mock[SchedulerDriver] + when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) + val securityManager = mock[SecurityManager] + + val backend = new MesosCoarseGrainedSchedulerBackend( + taskScheduler, sc, "master", securityManager) { + override protected def createSchedulerDriver( + masterUrl: String, + scheduler: Scheduler, + sparkUser: String, + appName: String, + conf: SparkConf, + webuiUrl: Option[String] = None, + checkpoint: Option[Boolean] = None, + failoverTimeout: Option[Double] = None, + frameworkId: Option[String] = None): SchedulerDriver = { + markRegistered() + assert(checkpoint.contains(true)) + driver + } + } + + backend.start() + + } + test("mesos does not acquire more than spark.cores.max") { val maxCores = 10 setBackend(Map("spark.cores.max" -> maxCores.toString))