Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/running-on-mesos.md
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,17 @@ See the [configuration page](configuration.html) for information on Spark config
Fetcher Cache</a>
</td>
</tr>
<tr>
<td><code>spark.mesos.driver.failoverTimeout</code></td>
<td><code>0.0</code></td>
<td>
The amount of time (in seconds) that the master will wait for the
driver to reconnect, after being temporarily disconnected, before
it tears down the driver framework by killing all its
executors. The default value is zero, meaning no timeout: if the
driver disconnects, the master immediately tears down the framework.
</td>
</tr>
</table>

# Troubleshooting and Debugging
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,16 @@ package object config {

private [spark] val DRIVER_LABELS =
ConfigBuilder("spark.mesos.driver.labels")
.doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value" +
.doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value " +
"pairs should be separated by a colon, and commas used to list more than one." +
"Ex. key:value,key2:value2")
.stringConf
.createOptional

private [spark] val DRIVER_FAILOVER_TIMEOUT =
ConfigBuilder("spark.mesos.driver.failoverTimeout")
.doc("Amount of time in seconds that the master will wait to hear from the driver, " +
"during a temporary disconnection, before tearing down all the executors.")
.doubleConf
.createWithDefault(0.0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Zero means infinite time?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, zero means 0ns. If the driver is disconnected, the Mesos master will wait 0ns for the driver to reconnect (it will teardown the framework immediately). This is the current default value.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain that in the .md file? 0 is generally used for "infinity" so it may be confusing.

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.SchedulerDriver

import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.internal.config
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
Expand Down Expand Up @@ -177,7 +178,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
sc.conf,
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
None,
None,
Some(sc.conf.get(DRIVER_FAILOVER_TIMEOUT)),
sc.conf.getOption("spark.mesos.driver.frameworkId")
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.scalatest.mock.MockitoSugar
import org.scalatest.BeforeAndAfter

import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.internal.config._
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
Expand Down Expand Up @@ -369,6 +370,41 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
backend.start()
}

test("failover timeout is set in created scheduler driver") {
val failoverTimeoutIn = 3600.0
initializeSparkConf(Map(DRIVER_FAILOVER_TIMEOUT.key -> failoverTimeoutIn.toString))
sc = new SparkContext(sparkConf)

val taskScheduler = mock[TaskSchedulerImpl]
when(taskScheduler.sc).thenReturn(sc)

val driver = mock[SchedulerDriver]
when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)

val securityManager = mock[SecurityManager]

val backend = new MesosCoarseGrainedSchedulerBackend(
taskScheduler, sc, "master", securityManager) {
override protected def createSchedulerDriver(
masterUrl: String,
scheduler: Scheduler,
sparkUser: String,
appName: String,
conf: SparkConf,
webuiUrl: Option[String] = None,
checkpoint: Option[Boolean] = None,
failoverTimeout: Option[Double] = None,
frameworkId: Option[String] = None): SchedulerDriver = {
markRegistered()
assert(failoverTimeout.isDefined)
assert(failoverTimeout.get.equals(failoverTimeoutIn))
driver
}
}

backend.start()
}

test("honors unset spark.mesos.containerizer") {
setBackend(Map("spark.mesos.executor.docker.image" -> "test"))

Expand Down