docs/running-on-mesos.md (10 additions, 0 deletions)

@@ -516,6 +516,16 @@ See the [configuration page](configuration.html) for information on Spark config
    Fetcher Cache</a>
  </td>
</tr>
<tr>
  <td><code>spark.mesos.checkpoint</code></td>
  <td>false</td>
  <td>
    If set to true, the Mesos agents running the Spark executors will write the framework PID, executor PIDs,
    and status updates to disk. If an agent exits (e.g., due to a crash or as part of upgrading Mesos), this
    checkpointed data allows the restarted agent to reconnect to executors that were started by the old
    instance of the agent. Enabling checkpointing improves fault tolerance, at the cost of a (usually small)
    increase in disk I/O.
  </td>
</tr>
</table>
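
As an illustrative sketch (not part of this patch — the master URL and app name below are placeholders), the option can also be set programmatically on a `SparkConf`:

```scala
import org.apache.spark.SparkConf

// Enable Mesos agent checkpointing for this application.
val conf = new SparkConf()
  .setMaster("mesos://host:5050")          // placeholder master URL
  .setAppName("CheckpointedApp")           // placeholder app name
  .set("spark.mesos.checkpoint", "true")
```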

# Troubleshooting and Debugging
@@ -56,4 +56,15 @@ package object config {
      .stringConf
      .createOptional

  private[spark] val CHECKPOINT =
    ConfigBuilder("spark.mesos.checkpoint")
      // Inline review comment: Please add .doc like the others.
.doc("If set to true, the agents that are running the Spark executors will write " +
"the framework pid, executor pids and status updates to disk. If the agent exits " +
"(e.g., due to a crash or as part of upgrading Mesos), this checkpointed data allows " +
"the restarted agent to reconnect to executors that were started by the old instance " +
"of the agent. Enabling checkpointing improves fault tolerance, at the cost of a " +
"(usually small) increase in disk I/O.")
.booleanConf
.createOptional

}
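
As a sketch of how this optional entry behaves (assuming Spark's internal `ConfigEntry` API, where `createOptional` yields an `OptionalConfigEntry`): reading it returns an `Option[Boolean]`, so leaving `spark.mesos.checkpoint` unset produces `None` rather than a default value.

```scala
// Illustrative only, not part of the patch; `conf` is assumed to be the driver's SparkConf.
val checkpoint: Option[Boolean] = conf.get(CHECKPOINT) // None when unset, Some(true) when enabled
```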
@@ -28,6 +28,7 @@ import scala.concurrent.Future
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}

import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.RpcEndpointAddress
@@ -158,7 +159,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
      sc.appName,
      sc.conf,
      sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
-     None,
+     sc.conf.get(CHECKPOINT),
      None,
      sc.conf.getOption("spark.mesos.driver.frameworkId")
    )
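
A hedged sketch of what `createSchedulerDriver` presumably does with this argument (the helper below is illustrative, not the verbatim Spark source; `FrameworkInfo.Builder.setCheckpoint` is the standard Mesos protobuf setter):

```scala
import org.apache.mesos.Protos.FrameworkInfo

// Illustrative helper: copy the optional flag onto the framework registration,
// leaving the Mesos-side default in place when the config is unset.
def withCheckpoint(
    builder: FrameworkInfo.Builder,
    checkpoint: Option[Boolean]): FrameworkInfo.Builder = {
  checkpoint.foreach(builder.setCheckpoint)
  builder
}
```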
@@ -139,6 +139,40 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
    assert(cpus == offerCores)
  }

test("mesos supports checkpointing") {

val checkpoint = true
setBackend(Map("spark.mesos.checkpoint" -> checkpoint.toString,
"spark.mesos.driver.webui.url" -> "http://webui"))

val taskScheduler = mock[TaskSchedulerImpl]
when(taskScheduler.sc).thenReturn(sc)
val driver = mock[SchedulerDriver]
when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
val securityManager = mock[SecurityManager]

val backend = new MesosCoarseGrainedSchedulerBackend(
taskScheduler, sc, "master", securityManager) {
override protected def createSchedulerDriver(
masterUrl: String,
scheduler: Scheduler,
sparkUser: String,
appName: String,
conf: SparkConf,
webuiUrl: Option[String] = None,
checkpoint: Option[Boolean] = None,
failoverTimeout: Option[Double] = None,
frameworkId: Option[String] = None): SchedulerDriver = {
markRegistered()
assert(checkpoint.contains(true))
driver
}
}

backend.start()

}
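
Overriding `createSchedulerDriver` in the anonymous subclass intercepts driver construction before any real Mesos driver exists, so the assertion on `checkpoint` runs inside `backend.start()` without contacting a Mesos master.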

test("mesos does not acquire more than spark.cores.max") {
val maxCores = 10
setBackend(Map("spark.cores.max" -> maxCores.toString))