From c741fb995c79fd6c740c439f0cf3cfd504a0358e Mon Sep 17 00:00:00 2001
From: gkc2104
Date: Tue, 4 Apr 2017 17:04:25 -0700
Subject: [PATCH 1/2] [SPARK-4899][MESOS] Support for checkpointing on Coarse and Fine grained schedulers

---
 docs/running-on-mesos.md                       | 20 ++++++++++
 .../MesosCoarseGrainedSchedulerBackend.scala   |  6 +--
 .../MesosFineGrainedSchedulerBackend.scala     |  6 +--
 ...osCoarseGrainedSchedulerBackendSuite.scala  | 37 +++++++++++++++++++
 ...esosFineGrainedSchedulerBackendSuite.scala  | 37 +++++++++++++++++++
 5 files changed, 100 insertions(+), 6 deletions(-)

diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 923d8dbebf3d2..0c6db1fe02970 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -513,6 +513,26 @@ See the [configuration page](configuration.html) for information on Spark config
     If set to `true`, all URIs (example: `spark.executor.uri`, `spark.mesos.uris`) will be cached by the [Mesos fetcher cache](http://mesos.apache.org/documentation/latest/fetcher/)
   </td>
 </tr>
+<tr>
+  <td><code>spark.mesos.checkpoint</code></td>
+  <td>false</td>
+  <td>
+    If set, agents running tasks started by this framework will write the framework pid, executor pids and status updates to disk.
+    If the agent exits (e.g., due to a crash or as part of upgrading Mesos), this checkpointed data allows the restarted agent to
+    reconnect to executors that were started by the old instance of the agent. Enabling checkpointing improves fault tolerance,
+    at the cost of a (usually small) increase in disk I/O.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.mesos.failoverTimeout</code></td>
+  <td>0.0</td>
+  <td>
+    The amount of time (in seconds) that the master will wait for the scheduler to fail over before it tears down the framework
+    by killing all its tasks/executors. This should be non-zero if a framework expects to reconnect after a failure and not lose
+    its tasks/executors.
+    NOTE: To avoid accidental destruction of tasks, production frameworks typically set this to a large value (e.g., 1 week).
+  </td>
+</tr>
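A quick illustration of the two properties documented above (not part of the diff): they can be set on the driver's `SparkConf` like any other Spark configuration. The master URL and application name below are placeholders.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: enable agent checkpointing and give the framework a one-week failover window.
val conf = new SparkConf()
  .setMaster("mesos://zk://host:2181/mesos")    // placeholder Mesos master URL
  .setAppName("checkpointed-app")               // placeholder application name
  .set("spark.mesos.checkpoint", "true")        // agents persist framework/executor state
  .set("spark.mesos.failoverTimeout", "604800") // 7 days, in seconds

val sc = new SparkContext(conf)
```

The same values can equally be passed with `--conf` on `spark-submit`; either way they are read by the scheduler backends changed below.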
 </table>
 
 # Troubleshooting and Debugging
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 5063c1fe988bc..d5b5c028133af 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -158,9 +158,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       sc.sparkUser,
       sc.appName,
       sc.conf,
-      sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress)),
-      None,
-      None,
+      sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
+      sc.conf.getOption("spark.mesos.checkpoint").map(_.toBoolean),
+      sc.conf.getOption("spark.mesos.failoverTimeout").map(_.toDouble),
       sc.conf.getOption("spark.mesos.driver.frameworkId")
     )
 
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index 09a252f3c74ac..196f99529efd2 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -77,9 +77,9 @@ private[spark] class MesosFineGrainedSchedulerBackend(
       sc.sparkUser,
       sc.appName,
       sc.conf,
-      sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress)),
-      Option.empty,
-      Option.empty,
+      sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
+      sc.conf.getOption("spark.mesos.checkpoint").map(_.toBoolean),
+      sc.conf.getOption("spark.mesos.failoverTimeout").map(_.toDouble),
       sc.conf.getOption("spark.mesos.driver.frameworkId")
     )
 
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index f73638fda6232..c29f2afd66c17 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -139,6 +139,43 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(cpus == offerCores)
   }
 
+  test("mesos supports checkpointing") {
+
+    val checkpoint = true
+    val failoverTimeout = 10
+    setBackend(Map("spark.mesos.checkpoint" -> checkpoint.toString,
+      "spark.mesos.failoverTimeout" -> failoverTimeout.toString,
+      "spark.mesos.driver.webui.url" -> "http://webui"))
+
+    val taskScheduler = mock[TaskSchedulerImpl]
+    when(taskScheduler.sc).thenReturn(sc)
+    val driver = mock[SchedulerDriver]
+    when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
+    val securityManager = mock[SecurityManager]
+
+    val backend = new MesosCoarseGrainedSchedulerBackend(
+        taskScheduler, sc, "master", securityManager) {
+      override protected def createSchedulerDriver(
+          masterUrl: String,
+          scheduler: Scheduler,
+          sparkUser: String,
+          appName: String,
+          conf: SparkConf,
+          webuiUrl: Option[String] = None,
+          checkpoint: Option[Boolean] = None,
+          failoverTimeout: Option[Double] = None,
+          frameworkId: Option[String] = None): SchedulerDriver = {
+        markRegistered()
+        assert(checkpoint.contains(true))
+        assert(failoverTimeout.contains(10.0))
+        driver
+      }
+    }
+
+    backend.start()
+
+  }
+
   test("mesos does not acquire more than spark.cores.max") {
     val maxCores = 10
     setBackend(Map("spark.cores.max" -> maxCores.toString))
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
index 1d7a86f4b0904..aec4dc6dd4399 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
@@ -78,6 +78,43 @@ class MesosFineGrainedSchedulerBackendSuite
     backend.start()
   }
 
+  test("mesos supports checkpointing") {
+    val conf = new SparkConf
+    conf.set("spark.mesos.checkpoint", "true")
+    conf.set("spark.mesos.failoverTimeout", "10")
+    conf.set("spark.mesos.driver.webui.url", "http://webui")
+
+    val sc = mock[SparkContext]
+    when(sc.conf).thenReturn(conf)
+    when(sc.sparkUser).thenReturn("sparkUser1")
+    when(sc.appName).thenReturn("appName1")
+
+    val taskScheduler = mock[TaskSchedulerImpl]
+    val driver = mock[SchedulerDriver]
+    when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
+
+    val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") {
+      override protected def createSchedulerDriver(
+          masterUrl: String,
+          scheduler: Scheduler,
+          sparkUser: String,
+          appName: String,
+          conf: SparkConf,
+          webuiUrl: Option[String] = None,
+          checkpoint: Option[Boolean] = None,
+          failoverTimeout: Option[Double] = None,
+          frameworkId: Option[String] = None): SchedulerDriver = {
+        markRegistered()
+        assert(checkpoint.contains(true))
+        assert(failoverTimeout.contains(10.0))
+        driver
+      }
+    }
+
+    backend.start()
+
+  }
+
   test("Use configured mesosExecutor.cores for ExecutorInfo") {
     val mesosExecutorCores = 3
     val conf = new SparkConf

From b1710c1bd856e6e9175f323623eea2b91d06654c Mon Sep 17 00:00:00 2001
From: Kamal Gurala
Date: Mon, 24 Apr 2017 13:19:14 -0700
Subject: [PATCH 2/2] Fixed bug

---
 .../cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala | 2 +-
 .../cluster/mesos/MesosFineGrainedSchedulerBackend.scala   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index d5b5c028133af..e50cd18dbf64c 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -158,7 +158,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       sc.sparkUser,
       sc.appName,
       sc.conf,
-      sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
+      sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress)),
       sc.conf.getOption("spark.mesos.checkpoint").map(_.toBoolean),
       sc.conf.getOption("spark.mesos.failoverTimeout").map(_.toDouble),
       sc.conf.getOption("spark.mesos.driver.frameworkId")
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index 196f99529efd2..8b78ce0f475e0 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -77,7 +77,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
       sc.sparkUser,
       sc.appName,
       sc.conf,
-      sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
+      sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress)),
       sc.conf.getOption("spark.mesos.checkpoint").map(_.toBoolean),
       sc.conf.getOption("spark.mesos.failoverTimeout").map(_.toDouble),
       sc.conf.getOption("spark.mesos.driver.frameworkId")
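Both backends simply forward these options to `createSchedulerDriver`, which lives in `MesosSchedulerUtils` and is not touched by this patch. As a rough, hedged sketch of where the values end up (an assumption about that helper using the standard Mesos protobuf builders, not its actual code):

```scala
import org.apache.mesos.Protos.FrameworkInfo

// Sketch: apply the optional checkpoint/failoverTimeout settings to the
// FrameworkInfo used for framework registration. Name and user are placeholders.
def buildFrameworkInfo(
    appName: String,
    sparkUser: String,
    checkpoint: Option[Boolean],
    failoverTimeout: Option[Double]): FrameworkInfo = {
  val builder = FrameworkInfo.newBuilder()
    .setName(appName)
    .setUser(sparkUser)
  checkpoint.foreach(c => builder.setCheckpoint(c))           // agent-side checkpointing
  failoverTimeout.foreach(t => builder.setFailoverTimeout(t)) // seconds before the master tears the framework down
  builder.build()
}
```

When both options are left unset, the `getOption(...).map(...)` calls above yield `None`, so the framework registers with Mesos defaults, matching the previous behaviour of passing `None`/`Option.empty` explicitly.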