Skip to content

Commit ae3e5bd

Browse files
committed
Made the driver failover_timeout configurable as spark.mesos.driver.failoverTimeout. Added a unit test.
1 parent d03aebb commit ae3e5bd

File tree

4 files changed

+55
-2
lines changed

4 files changed

+55
-2
lines changed

docs/running-on-mesos.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,16 @@ See the [configuration page](configuration.html) for information on Spark config
545545
Fetcher Cache</a>
546546
</td>
547547
</tr>
548+
<tr>
549+
<td><code>spark.mesos.driver.failoverTimeout</code></td>
550+
<td><code>0.0</code></td>
551+
<td>
552+
The amount of time (in seconds) that the master will wait for the
553+
driver to reconnect, after being temporarily disconnected, before
554+
it tears down the driver framework by killing all its
555+
executors.
556+
</td>
557+
</tr>
548558
</table>
549559

550560
# Troubleshooting and Debugging

resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,16 @@ package object config {
5858

5959
private [spark] val DRIVER_LABELS =
6060
ConfigBuilder("spark.mesos.driver.labels")
61-
.doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value" +
61+
.doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value " +
6262
"pairs should be separated by a colon, and commas used to list more than one." +
6363
"Ex. key:value,key2:value2")
6464
.stringConf
6565
.createOptional
66+
67+
private [spark] val DRIVER_FAILOVER_TIMEOUT =
68+
ConfigBuilder("spark.mesos.driver.failoverTimeout")
69+
.doc("Amount of time in seconds that the master will wait to hear from the driver, " +
70+
"during a temporary disconnection, before tearing down all the executors.")
71+
.doubleConf
72+
.createWithDefault(0.0)
6673
}

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
2929
import org.apache.mesos.SchedulerDriver
3030

3131
import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
32+
import org.apache.spark.deploy.mesos.config._
3233
import org.apache.spark.internal.config
3334
import org.apache.spark.network.netty.SparkTransportConf
3435
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
@@ -177,7 +178,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
177178
sc.conf,
178179
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
179180
None,
180-
None,
181+
Some(sc.conf.get(DRIVER_FAILOVER_TIMEOUT)),
181182
sc.conf.getOption("spark.mesos.driver.frameworkId")
182183
)
183184

resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,41 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
369369
backend.start()
370370
}
371371

372+
test("failover timeout is set in created scheduler driver") {
373+
val failoverTimeoutIn = 3600.0
374+
initializeSparkConf(Map("spark.mesos.driver.failoverTimeout" -> failoverTimeoutIn.toString))
375+
sc = new SparkContext(sparkConf)
376+
377+
val taskScheduler = mock[TaskSchedulerImpl]
378+
when(taskScheduler.sc).thenReturn(sc)
379+
380+
val driver = mock[SchedulerDriver]
381+
when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
382+
383+
val securityManager = mock[SecurityManager]
384+
385+
val backend = new MesosCoarseGrainedSchedulerBackend(
386+
taskScheduler, sc, "master", securityManager) {
387+
override protected def createSchedulerDriver(
388+
masterUrl: String,
389+
scheduler: Scheduler,
390+
sparkUser: String,
391+
appName: String,
392+
conf: SparkConf,
393+
webuiUrl: Option[String] = None,
394+
checkpoint: Option[Boolean] = None,
395+
failoverTimeout: Option[Double] = None,
396+
frameworkId: Option[String] = None): SchedulerDriver = {
397+
markRegistered()
398+
assert(failoverTimeout.isDefined)
399+
assert(failoverTimeout.get.equals(failoverTimeoutIn))
400+
driver
401+
}
402+
}
403+
404+
backend.start()
405+
}
406+
372407
test("honors unset spark.mesos.containerizer") {
373408
setBackend(Map("spark.mesos.executor.docker.image" -> "test"))
374409

0 commit comments

Comments
 (0)