Skip to content

Commit 6e97ec1

Browse files
Sergiusz Urbaniak authored and Michael Gummelt committed
enable history server links in dispatcher UI
1 parent 2182e43 commit 6e97ec1

File tree

7 files changed

+77
-10
lines changed

7 files changed

+77
-10
lines changed

core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,17 @@ import org.apache.spark.scheduler.cluster.mesos.MesosClusterSubmissionState
2828
import org.apache.spark.ui.{UIUtils, WebUIPage}
2929

3030
private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage("") {
31+
private val historyServerURL = parent.conf.getOption("spark.mesos.dispatcher.historyServer.url")
32+
3133
def render(request: HttpServletRequest): Seq[Node] = {
3234
val state = parent.scheduler.getSchedulerState()
33-
val queuedHeaders = Seq("Driver ID", "Submit Date", "Main Class", "Driver Resources")
34-
val driverHeaders = queuedHeaders ++
35+
36+
val driverHeader = Seq("Driver ID")
37+
val historyHeader = if (historyServerURL.isDefined) Seq("History") else Nil
38+
val submissionHeader = Seq("Submit Date", "Main Class", "Driver Resources")
39+
40+
val queuedHeaders = driverHeader ++ submissionHeader
41+
val driverHeaders = driverHeader ++ historyHeader ++ submissionHeader ++
3542
Seq("Start Date", "Mesos Slave ID", "State")
3643
val retryHeaders = Seq("Driver ID", "Submit Date", "Description") ++
3744
Seq("Last Failed Status", "Next Retry Time", "Attempt Count")
@@ -68,8 +75,18 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(
6875

6976
private def driverRow(state: MesosClusterSubmissionState): Seq[Node] = {
7077
val id = state.driverDescription.submissionId
78+
79+
val historyCol = if (historyServerURL.isDefined) {
80+
<td>
81+
<a href={s"${historyServerURL.get}/history/${state.frameworkId}"}>
82+
{state.frameworkId}
83+
</a>
84+
</td>
85+
} else Nil
86+
7187
<tr>
7288
<td><a href={s"driver?id=$id"}>{id}</a></td>
89+
{historyCol}
7390
<td>{state.driverDescription.submissionDate}</td>
7491
<td>{state.driverDescription.command.mainClass}</td>
7592
<td>cpus: {state.driverDescription.cores}, mem: {state.driverDescription.mem}</td>

core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.apache.spark.ui.JettyUtils._
2828
private[spark] class MesosClusterUI(
2929
securityManager: SecurityManager,
3030
port: Int,
31-
conf: SparkConf,
31+
val conf: SparkConf,
3232
dispatcherPublicAddress: String,
3333
val scheduler: MesosClusterScheduler)
3434
extends WebUI(securityManager, securityManager.getSSLOptions("mesos"), port, conf) {

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,32 +37,36 @@ import org.apache.spark.util.Utils
3737

3838
/**
3939
* Tracks the current state of a Mesos Task that runs a Spark driver.
40+
*
4041
* @param driverDescription Submitted driver description from
4142
* [[org.apache.spark.deploy.rest.mesos.MesosRestServer]]
4243
* @param taskId Mesos TaskID generated for the task
4344
* @param slaveId Slave ID that the task is assigned to
4445
* @param mesosTaskStatus The last known task status update.
4546
* @param startDate The date the task was launched
47+
* @param frameworkId Mesos framework ID the task registers with
4648
*/
4749
private[spark] class MesosClusterSubmissionState(
4850
val driverDescription: MesosDriverDescription,
4951
val taskId: TaskID,
5052
val slaveId: SlaveID,
5153
var mesosTaskStatus: Option[TaskStatus],
5254
var startDate: Date,
53-
var finishDate: Option[Date])
55+
var finishDate: Option[Date],
56+
val frameworkId: String)
5457
extends Serializable {
5558

5659
def copy(): MesosClusterSubmissionState = {
5760
new MesosClusterSubmissionState(
58-
driverDescription, taskId, slaveId, mesosTaskStatus, startDate, finishDate)
61+
driverDescription, taskId, slaveId, mesosTaskStatus, startDate, finishDate, frameworkId)
5962
}
6063
}
6164

6265
/**
6366
* Tracks the retry state of a driver, which includes the next time it should be scheduled
6467
* and necessary information to do exponential backoff.
6568
* This class is not thread-safe, and we expect the caller to handle synchronizing state.
69+
*
6670
* @param lastFailureStatus Last Task status when it failed.
6771
* @param retries Number of times it has been retried.
6872
* @param nextRetry Time at which it should be retried next
@@ -80,6 +84,7 @@ private[spark] class MesosClusterRetryState(
8084
/**
8185
* The full state of the cluster scheduler, currently being used for displaying
8286
* information on the UI.
87+
*
8388
* @param frameworkId Mesos Framework id for the cluster scheduler.
8489
* @param masterUrl The Mesos master url
8590
* @param queuedDrivers All drivers queued to be launched
@@ -355,7 +360,15 @@ private[spark] class MesosClusterScheduler(
355360

356361
private def getDriverExecutorURI(desc: MesosDriverDescription): Option[String] = {
357362
desc.conf.getOption("spark.executor.uri")
358-
.orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
363+
.orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
364+
}
365+
366+
private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
367+
m.updated(k, f(m.getOrElse(k, default)))
368+
}
369+
370+
private def getDriverFrameworkID(desc: MesosDriverDescription): String = {
371+
s"${frameworkId}-${desc.submissionId}"
359372
}
360373

361374
private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
@@ -364,7 +377,12 @@ private[spark] class MesosClusterScheduler(
364377
val executorEnv = Map("SPARK_EXECUTOR_OPTS" -> executorOpts)
365378
val driverEnv = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.")
366379

367-
driverEnv ++ executorEnv ++ desc.command.environment
380+
var commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
381+
v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}"
382+
)
383+
384+
385+
driverEnv ++ executorEnv ++ commandEnv
368386
}
369387

370388
val envBuilder = Environment.newBuilder()
@@ -552,7 +570,7 @@ private[spark] class MesosClusterScheduler(
552570
logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
553571
submission.submissionId)
554572
val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId,
555-
None, new Date(), None)
573+
None, new Date(), None, getDriverFrameworkID(submission))
556574
launchedDrivers(submission.submissionId) = newState
557575
launchedDriversState.persist(submission.submissionId, newState)
558576
afterLaunchCallback(submission.submissionId)

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
152152
sc.sparkUser,
153153
sc.appName,
154154
sc.conf,
155-
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress))
155+
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress)),
156+
Option.empty,
157+
Option.empty,
158+
sc.conf.getOption("spark.mesos.driver.frameworkId")
156159
)
160+
161+
unsetFrameworkID(sc)
157162
startScheduler(driver)
158163
}
159164

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,13 @@ private[spark] class MesosFineGrainedSchedulerBackend(
7777
sc.sparkUser,
7878
sc.appName,
7979
sc.conf,
80-
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress))
80+
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress)),
81+
Option.empty,
82+
Option.empty,
83+
sc.conf.getOption("spark.mesos.driver.frameworkId")
8184
)
85+
86+
unsetFrameworkID(sc)
8287
startScheduler(driver)
8388
}
8489

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,4 +357,15 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
357357
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s")
358358
}
359359

360+
/**
361+
* spark.mesos.driver.frameworkId is set by the cluster dispatcher to correlate driver
362+
* submissions with frameworkIDs. However, this causes issues when a driver process launches
363+
* more than one framework (more than one SparkContext), because they all try to register with
364+
* the same frameworkID. To enforce that only the first driver registers with the configured
365+
* framework ID, the driver calls this method after the first registration.
366+
*/
367+
def unsetFrameworkID(sc: SparkContext) {
368+
sc.conf.remove("spark.mesos.driver.frameworkId")
369+
System.clearProperty("spark.mesos.driver.frameworkId")
370+
}
360371
}

docs/running-on-mesos.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,17 @@ See the [configuration page](configuration.html) for information on Spark config
468468
If unset it will point to Spark's internal web UI.
469469
</td>
470470
</tr>
471+
<tr>
472+
<td><code>spark.mesos.dispatcher.historyServer.url</code></td>
473+
<td><code>(none)</code></td>
474+
<td>
475+
Set the URL of the <a
476+
href="http://spark.apache.org/docs/latest/monitoring.html#viewing-after-the-fact">history
477+
server</a>. The dispatcher will then link each driver to its entry
478+
in the history server.
479+
</td>
480+
</tr>
481+
471482
</table>
472483

473484
# Troubleshooting and Debugging

0 commit comments

Comments
 (0)