From 558ab2050ccb2a036753f3f06a3a6833f92539b4 Mon Sep 17 00:00:00 2001 From: gkc2104 Date: Tue, 4 Apr 2017 17:04:25 -0700 Subject: [PATCH 1/5] [SPARK-4899][MESOS] Support for checkpointing on Coarse and Fine grained schedulers --- docs/running-on-mesos.md | 20 ++++++++++ .../MesosCoarseGrainedSchedulerBackend.scala | 4 +- .../MesosFineGrainedSchedulerBackend.scala | 4 +- ...osCoarseGrainedSchedulerBackendSuite.scala | 37 +++++++++++++++++++ ...esosFineGrainedSchedulerBackendSuite.scala | 37 +++++++++++++++++++ 5 files changed, 98 insertions(+), 4 deletions(-) diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 8d5ad12cb85be..3f5439a287b57 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -516,6 +516,26 @@ See the [configuration page](configuration.html) for information on Spark config Fetcher Cache + + spark.mesos.checkpoint + false + + If set, agents running tasks started by this framework will write the framework pid, executor pids and status updates to disk. + If the agent exits (e.g., due to a crash or as part of upgrading Mesos), this checkpointed data allows the restarted agent to + reconnect to executors that were started by the old instance of the agent. Enabling checkpointing improves fault tolerance, + at the cost of a (usually small) increase in disk I/O. + + + + spark.mesos.failoverTimeout + 0.0 + + The amount of time (in seconds) that the master will wait for the scheduler to failover before it tears down the framework + by killing all its tasks/executors. This should be non-zero if a framework expects to reconnect after a failure and not lose + its tasks/executors. + NOTE: To avoid accidental destruction of tasks, productionframeworks typically set this to a large value (e.g., 1 week). 
+ + # Troubleshooting and Debugging diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index f555072c3842a..89b0905300a42 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -158,8 +158,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.appName, sc.conf, sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)), - None, - None, + sc.conf.getOption("spark.mesos.checkpoint").map(_.toBoolean), + sc.conf.getOption("spark.mesos.failoverTimeout").map(_.toDouble), sc.conf.getOption("spark.mesos.driver.frameworkId") ) diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala index 7e561916a71e2..71869664fe01e 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala @@ -78,8 +78,8 @@ private[spark] class MesosFineGrainedSchedulerBackend( sc.appName, sc.conf, sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)), - Option.empty, - Option.empty, + sc.conf.getOption("spark.mesos.checkpoint").map(_.toBoolean), + sc.conf.getOption("spark.mesos.failoverTimeout").map(_.toDouble), sc.conf.getOption("spark.mesos.driver.frameworkId") ) diff --git 
a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index cdb3b68489654..308951769288a 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -139,6 +139,43 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite assert(cpus == offerCores) } + test("mesos supports checkpointing") { + + val checkpoint = true + val failoverTimeout = 10 + setBackend(Map("spark.mesos.checkpoint" -> checkpoint.toString, + "spark.mesos.failoverTimeout" -> failoverTimeout.toString, + "spark.mesos.driver.webui.url" -> "http://webui")) + + val taskScheduler = mock[TaskSchedulerImpl] + when(taskScheduler.sc).thenReturn(sc) + val driver = mock[SchedulerDriver] + when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) + val securityManager = mock[SecurityManager] + + val backend = new MesosCoarseGrainedSchedulerBackend( + taskScheduler, sc, "master", securityManager) { + override protected def createSchedulerDriver( + masterUrl: String, + scheduler: Scheduler, + sparkUser: String, + appName: String, + conf: SparkConf, + webuiUrl: Option[String] = None, + checkpoint: Option[Boolean] = None, + failoverTimeout: Option[Double] = None, + frameworkId: Option[String] = None): SchedulerDriver = { + markRegistered() + assert(checkpoint.contains(true)) + assert(failoverTimeout.contains(10.0)) + driver + } + } + + backend.start() + + } + test("mesos does not acquire more than spark.cores.max") { val maxCores = 10 setBackend(Map("spark.cores.max" -> maxCores.toString)) diff --git 
a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala index 4ee85b91830a9..21343b063b00f 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala @@ -79,6 +79,43 @@ class MesosFineGrainedSchedulerBackendSuite backend.start() } + test("mesos supports checkpointing") { + val conf = new SparkConf + conf.set("spark.mesos.checkpoint", "true") + conf.set("spark.mesos.failoverTimeout", "10") + conf.set("spark.mesos.driver.webui.url", "http://webui") + + val sc = mock[SparkContext] + when(sc.conf).thenReturn(conf) + when(sc.sparkUser).thenReturn("sparkUser1") + when(sc.appName).thenReturn("appName1") + + val taskScheduler = mock[TaskSchedulerImpl] + val driver = mock[SchedulerDriver] + when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) + + val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") { + override protected def createSchedulerDriver( + masterUrl: String, + scheduler: Scheduler, + sparkUser: String, + appName: String, + conf: SparkConf, + webuiUrl: Option[String] = None, + checkpoint: Option[Boolean] = None, + failoverTimeout: Option[Double] = None, + frameworkId: Option[String] = None): SchedulerDriver = { + markRegistered() + assert(checkpoint.contains(true)) + assert(failoverTimeout.contains(10.0)) + driver + } + } + + backend.start() + + } + test("Use configured mesosExecutor.cores for ExecutorInfo") { val mesosExecutorCores = 3 val conf = new SparkConf From f79326e3d6ea4caad41c8923ef113ee358b16b4f Mon Sep 17 00:00:00 2001 From: Kamal Gurala Date: Wed, 24 May 2017 13:21:42 +0530 Subject: [PATCH 2/5] [SPARK-4899][MESOS] --- 
docs/running-on-mesos.md | 10 ----- .../MesosCoarseGrainedSchedulerBackend.scala | 2 +- .../MesosFineGrainedSchedulerBackend.scala | 4 +- ...osCoarseGrainedSchedulerBackendSuite.scala | 3 -- ...esosFineGrainedSchedulerBackendSuite.scala | 37 ------------------- 5 files changed, 3 insertions(+), 53 deletions(-) diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 3f5439a287b57..69e95fb285009 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -526,16 +526,6 @@ See the [configuration page](configuration.html) for information on Spark config at the cost of a (usually small) increase in disk I/O. - - spark.mesos.failoverTimeout - 0.0 - - The amount of time (in seconds) that the master will wait for the scheduler to failover before it tears down the framework - by killing all its tasks/executors. This should be non-zero if a framework expects to reconnect after a failure and not lose - its tasks/executors. - NOTE: To avoid accidental destruction of tasks, productionframeworks typically set this to a large value (e.g., 1 week). 
- - # Troubleshooting and Debugging diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 89b0905300a42..dc9c728bd218a 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -159,7 +159,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf, sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)), sc.conf.getOption("spark.mesos.checkpoint").map(_.toBoolean), - sc.conf.getOption("spark.mesos.failoverTimeout").map(_.toDouble), + None, sc.conf.getOption("spark.mesos.driver.frameworkId") ) diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala index 71869664fe01e..3eadb42b709b5 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala @@ -78,8 +78,8 @@ private[spark] class MesosFineGrainedSchedulerBackend( sc.appName, sc.conf, sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)), - sc.conf.getOption("spark.mesos.checkpoint").map(_.toBoolean), - sc.conf.getOption("spark.mesos.failoverTimeout").map(_.toDouble), + None, + None, sc.conf.getOption("spark.mesos.driver.frameworkId") ) diff --git 
a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index 308951769288a..08195368498d1 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -142,9 +142,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite test("mesos supports checkpointing") { val checkpoint = true - val failoverTimeout = 10 setBackend(Map("spark.mesos.checkpoint" -> checkpoint.toString, - "spark.mesos.failoverTimeout" -> failoverTimeout.toString, "spark.mesos.driver.webui.url" -> "http://webui")) val taskScheduler = mock[TaskSchedulerImpl] @@ -167,7 +165,6 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite frameworkId: Option[String] = None): SchedulerDriver = { markRegistered() assert(checkpoint.contains(true)) - assert(failoverTimeout.contains(10.0)) driver } } diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala index 21343b063b00f..4ee85b91830a9 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala @@ -79,43 +79,6 @@ class MesosFineGrainedSchedulerBackendSuite backend.start() } - test("mesos supports checkpointing") { - val conf = new SparkConf - conf.set("spark.mesos.checkpoint", "true") - 
conf.set("spark.mesos.failoverTimeout", "10") - conf.set("spark.mesos.driver.webui.url", "http://webui") - - val sc = mock[SparkContext] - when(sc.conf).thenReturn(conf) - when(sc.sparkUser).thenReturn("sparkUser1") - when(sc.appName).thenReturn("appName1") - - val taskScheduler = mock[TaskSchedulerImpl] - val driver = mock[SchedulerDriver] - when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) - - val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") { - override protected def createSchedulerDriver( - masterUrl: String, - scheduler: Scheduler, - sparkUser: String, - appName: String, - conf: SparkConf, - webuiUrl: Option[String] = None, - checkpoint: Option[Boolean] = None, - failoverTimeout: Option[Double] = None, - frameworkId: Option[String] = None): SchedulerDriver = { - markRegistered() - assert(checkpoint.contains(true)) - assert(failoverTimeout.contains(10.0)) - driver - } - } - - backend.start() - - } - test("Use configured mesosExecutor.cores for ExecutorInfo") { val mesosExecutorCores = 3 val conf = new SparkConf From 80d2838185dde0dfe45a45241eff70df4b53a247 Mon Sep 17 00:00:00 2001 From: Kamal Gurala Date: Wed, 24 May 2017 23:58:24 +0530 Subject: [PATCH 3/5] [SPARK-4899][MESOS] Updated implementation --- docs/running-on-mesos.md | 2 +- .../main/scala/org/apache/spark/deploy/mesos/config.scala | 5 +++++ .../cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala | 3 ++- .../cluster/mesos/MesosFineGrainedSchedulerBackend.scala | 4 ++-- 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 69e95fb285009..3129de5bf0af9 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -520,7 +520,7 @@ See the [configuration page](configuration.html) for information on Spark config spark.mesos.checkpoint false - If set, agents running tasks started by this framework will write the framework pid, executor pids and status updates to disk. 
+ If set to true, the agents that are running the spark-executors will write framework pids (Spark), executor pids and status updates to disk. If the agent exits (e.g., due to a crash or as part of upgrading Mesos), this checkpointed data allows the restarted agent to reconnect to executors that were started by the old instance of the agent. Enabling checkpointing improves fault tolerance, at the cost of a (usually small) increase in disk I/O. diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala index 19e253394f1b2..e7a6750ac2346 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala @@ -56,4 +56,9 @@ package object config { .stringConf .createOptional + private[spark] val CHECKPOINT = + ConfigBuilder("spark.mesos.checkpoint") + .booleanConf + .createOptional + } diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index dc9c728bd218a..0e1a4ee1a06b2 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -28,6 +28,7 @@ import scala.concurrent.Future import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState} +import org.apache.spark.deploy.mesos.config._ import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient import org.apache.spark.rpc.RpcEndpointAddress 
@@ -158,7 +159,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.appName, sc.conf, sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)), - sc.conf.getOption("spark.mesos.checkpoint").map(_.toBoolean), + sc.conf.get(CHECKPOINT), None, sc.conf.getOption("spark.mesos.driver.frameworkId") ) diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala index 3eadb42b709b5..7e561916a71e2 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala @@ -78,8 +78,8 @@ private[spark] class MesosFineGrainedSchedulerBackend( sc.appName, sc.conf, sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)), - None, - None, + Option.empty, + Option.empty, sc.conf.getOption("spark.mesos.driver.frameworkId") ) From 4622fa82636d6a37a60562648e58de2ddd410bc5 Mon Sep 17 00:00:00 2001 From: Kamal Gurala Date: Thu, 25 May 2017 00:12:33 +0530 Subject: [PATCH 4/5] [SPARK-4899][MESOS] Fixed documentations --- docs/running-on-mesos.md | 2 +- .../main/scala/org/apache/spark/deploy/mesos/config.scala | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 3129de5bf0af9..140a4ed535a28 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -520,7 +520,7 @@ See the [configuration page](configuration.html) for information on Spark config spark.mesos.checkpoint false - If set to true, the agents that are running the spark-executors will write framework pids (Spark), executor pids and status updates to disk. 
+ If set to true, the agents that are running the Spark executors will write the framework pid, executor pids and status updates to disk. If the agent exits (e.g., due to a crash or as part of upgrading Mesos), this checkpointed data allows the restarted agent to reconnect to executors that were started by the old instance of the agent. Enabling checkpointing improves fault tolerance, at the cost of a (usually small) increase in disk I/O. diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala index e7a6750ac2346..6b88f49c820f2 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala @@ -58,6 +58,12 @@ package object config { private[spark] val CHECKPOINT = ConfigBuilder("spark.mesos.checkpoint") + .doc("If set to true, the agents that are running the Spark executors will write " + + "the framework pid, executor pids and status updates to disk. If the agent exits " + + "(e.g., due to a crash or as part of upgrading Mesos), this checkpointed data allows " + + "the restarted agent to reconnect to executors that were started by the old instance " + + "of the agent. 
Enabling checkpointing improves fault tolerance, at the cost of a " + + "(usually small) increase in disk I/O.") .booleanConf .createOptional From b51395edf2e8901c54575af7f2b7d35a7344a41f Mon Sep 17 00:00:00 2001 From: Kamal Gurala Date: Tue, 30 May 2017 11:19:12 +0530 Subject: [PATCH 5/5] [SPARK-4899][MESOS] Edited documentation --- docs/running-on-mesos.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 140a4ed535a28..ce109e18c15fe 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -520,7 +520,7 @@ See the [configuration page](configuration.html) for information on Spark config spark.mesos.checkpoint false - If set to true, the agents that are running the Spark executors will write the framework pid, executor pids and status updates to disk. + If set to true, the Mesos agents that are running the Spark executors will write the framework pid, executor pids and status updates to disk. If the agent exits (e.g., due to a crash or as part of upgrading Mesos), this checkpointed data allows the restarted agent to reconnect to executors that were started by the old instance of the agent. Enabling checkpointing improves fault tolerance, at the cost of a (usually small) increase in disk I/O.