Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions docs/running-on-mesos.md
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,26 @@ See the [configuration page](configuration.html) for information on Spark config
Fetcher Cache</a>
</td>
</tr>
<tr>
<td><code>spark.mesos.checkpoint</code></td>
<td>false</td>
<td>
If set, agents running tasks started by this framework will write the framework pid, executor pids and status updates to disk.
If the agent exits (e.g., due to a crash or as part of upgrading Mesos), this checkpointed data allows the restarted agent to
reconnect to executors that were started by the old instance of the agent. Enabling checkpointing improves fault tolerance,
at the cost of a (usually small) increase in disk I/O.
</td>
</tr>
<tr>
<td><code>spark.mesos.failoverTimeout</code></td>
<td>0.0</td>
<td>
The amount of time (in seconds) that the master will wait for the scheduler to fail over before it tears down the framework
by killing all its tasks/executors. This should be non-zero if a framework expects to reconnect after a failure and not lose
its tasks/executors.
NOTE: To avoid accidental destruction of tasks, production frameworks typically set this to a large value (e.g., 1 week).
</td>
</tr>
</table>

# Troubleshooting and Debugging
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
sc.appName,
sc.conf,
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
None,
None,
sc.conf.getOption("spark.mesos.checkpoint").map(_.toBoolean),
sc.conf.getOption("spark.mesos.failoverTimeout").map(_.toDouble),
sc.conf.getOption("spark.mesos.driver.frameworkId")
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ private[spark] class MesosFineGrainedSchedulerBackend(
sc.appName,
sc.conf,
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
Option.empty,
Option.empty,
sc.conf.getOption("spark.mesos.checkpoint").map(_.toBoolean),
sc.conf.getOption("spark.mesos.failoverTimeout").map(_.toDouble),
sc.conf.getOption("spark.mesos.driver.frameworkId")
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,43 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(cpus == offerCores)
}

test("mesos supports checkpointing") {
  // Enable agent checkpointing with a 10-second failover timeout, then
  // verify both settings are propagated into createSchedulerDriver.
  setBackend(Map(
    "spark.mesos.checkpoint" -> "true",
    "spark.mesos.failoverTimeout" -> "10",
    "spark.mesos.driver.webui.url" -> "http://webui"))

  val taskScheduler = mock[TaskSchedulerImpl]
  when(taskScheduler.sc).thenReturn(sc)

  val driver = mock[SchedulerDriver]
  when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)

  val securityManager = mock[SecurityManager]

  // Intercept driver creation so we can assert on the options the backend
  // derives from the Spark conf; the real Mesos driver is never started.
  val backend = new MesosCoarseGrainedSchedulerBackend(
      taskScheduler, sc, "master", securityManager) {
    override protected def createSchedulerDriver(
        masterUrl: String,
        scheduler: Scheduler,
        sparkUser: String,
        appName: String,
        conf: SparkConf,
        webuiUrl: Option[String] = None,
        checkpoint: Option[Boolean] = None,
        failoverTimeout: Option[Double] = None,
        frameworkId: Option[String] = None): SchedulerDriver = {
      markRegistered()
      assert(checkpoint.contains(true))
      assert(failoverTimeout.contains(10.0))
      driver
    }
  }

  backend.start()
}

test("mesos does not acquire more than spark.cores.max") {
val maxCores = 10
setBackend(Map("spark.cores.max" -> maxCores.toString))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,43 @@ class MesosFineGrainedSchedulerBackendSuite
backend.start()
}

test("mesos supports checkpointing") {
  // Build a conf that enables checkpointing and a 10-second failover
  // timeout, and check the backend forwards both to createSchedulerDriver.
  val conf = new SparkConf
    .set("spark.mesos.checkpoint", "true")
    .set("spark.mesos.failoverTimeout", "10")
    .set("spark.mesos.driver.webui.url", "http://webui")

  val sc = mock[SparkContext]
  when(sc.conf).thenReturn(conf)
  when(sc.sparkUser).thenReturn("sparkUser1")
  when(sc.appName).thenReturn("appName1")

  val taskScheduler = mock[TaskSchedulerImpl]

  val driver = mock[SchedulerDriver]
  when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)

  // Override driver creation to capture the derived options instead of
  // connecting to a real Mesos master.
  val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") {
    override protected def createSchedulerDriver(
        masterUrl: String,
        scheduler: Scheduler,
        sparkUser: String,
        appName: String,
        conf: SparkConf,
        webuiUrl: Option[String] = None,
        checkpoint: Option[Boolean] = None,
        failoverTimeout: Option[Double] = None,
        frameworkId: Option[String] = None): SchedulerDriver = {
      markRegistered()
      assert(checkpoint.contains(true))
      assert(failoverTimeout.contains(10.0))
      driver
    }
  }

  backend.start()
}

test("Use configured mesosExecutor.cores for ExecutorInfo") {
val mesosExecutorCores = 3
val conf = new SparkConf
Expand Down