Skip to content

Commit 2d35cc0

Browse files
JoshRosenpwendell
authored andcommitted
[SPARK-4145] Web UI job pages
This PR adds two new pages to the Spark Web UI: - A jobs overview page, which shows details on running / completed / failed jobs. - A job details page, which displays information on an individual job's stages. The jobs overview page is now the default UI homepage; the old homepage is still accessible at `/stages`. ### Screenshots #### New UI homepage ![image](https://cloud.githubusercontent.com/assets/50748/5119035/fd0a69e6-701f-11e4-89cb-db7e9705714f.png) #### Job details page (This is effectively a per-job version of the stages page that can be extended later with other things, such as DAG visualizations) ![image](https://cloud.githubusercontent.com/assets/50748/5134910/50b340d4-70c7-11e4-88e1-6b73237ea7c8.png) ### Key changes in this PR - Rename `JobProgressPage` to `AllStagesPage` - Expose `StageInfo` objects in the ``SparkListenerJobStart` event; add backwards-compatibility tests to JsonProtocol. - Add additional data structures to `JobProgressListener` to map from stages to jobs. - Add several fields to `JobUIData`. I also added ~150 lines of Selenium tests as I uncovered UI issues while developing this patch. ### Limitations If a job contains stages that aren't run, then its overall job progress bar may be an underestimate of the total job progress; in other words, a completed job may appear to have a progress bar that's not at 100%. If stages or tasks fail, then the progress bar will not go backwards to reflect the true amount of remaining work. Author: Josh Rosen <[email protected]> Closes #3009 from JoshRosen/job-page and squashes the following commits: eb05e90 [Josh Rosen] Disable kill button in completed stages tables. f00c851 [Josh Rosen] Fix JsonProtocol compatibility b89c258 [Josh Rosen] More JSON protocol backwards-compatibility fixes. ff804cd [Josh Rosen] Don't write "Stage Ids" field in JobStartEvent JSON. 6f17f3f [Josh Rosen] Only store StageInfos in SparkListenerJobStart event. 2bbf41a [Josh Rosen] Update job progress bar to reflect skipped tasks/stages. 61c265a [Josh Rosen] Add “skipped stages” table; only display non-empty tables. 1f45d44 [Josh Rosen] Incorporate a bunch of minor review feedback. 0b77e3e [Josh Rosen] More bug fixes for phantom stages. 034aa8d [Josh Rosen] Use `.max()` to find result stage for job. eebdc2c [Josh Rosen] Don’t display pending stages for completed jobs. 67080ba [Josh Rosen] Ensure that "phantom stages" don't cause memory leaks. 7d10b97 [Josh Rosen] Merge remote-tracking branch 'apache/master' into job-page d69c775 [Josh Rosen] Fix table sorting on all jobs page. 5eb39dc [Josh Rosen] Add pending stages table to job page. f2a15da [Josh Rosen] Add status field to job details page. 171b53c [Josh Rosen] Move `startTime` to the start of SparkContext. e2f2c43 [Josh Rosen] Fix sorting of stages in job details page. 8955f4c [Josh Rosen] Display information for pending stages on jobs page. 8ab6c28 [Josh Rosen] Compute numTasks from job start stage infos. 5884f91 [Josh Rosen] Add StageInfos to SparkListenerJobStart event. 79793cd [Josh Rosen] Track indices of completed stage to avoid overcounting when failures occur. d62ea7b [Josh Rosen] Add failing Selenium test for stage overcounting issue. 1145c60 [Josh Rosen] Display text instead of progress bar for stages. 3d0a007 [Josh Rosen] Merge remote-tracking branch 'origin/master' into job-page 8a2351b [Josh Rosen] Add help tooltip to Spark Jobs page. b7bf30e [Josh Rosen] Add stages progress bar; fix bug where active stages show as completed. 4846ce4 [Josh Rosen] Hide "(Job Group") if no jobs were submitted in job groups. 4d58e55 [Josh Rosen] Change label to "Tasks (for all stages)" 85e9c85 [Josh Rosen] Extract startTime into separate variable. 1cf4987 [Josh Rosen] Fix broken kill links; add Selenium test to avoid future regressions. 56701fa [Josh Rosen] Move last stage name / description logic out of markup. a475ea1 [Josh Rosen] Add progress bars to jobs page. 45343b8 [Josh Rosen] More comments 4b206fb [Josh Rosen] Merge remote-tracking branch 'origin/master' into job-page bfce2b9 [Josh Rosen] Address review comments, except for progress bar. 4487dcb [Josh Rosen] [SPARK-4145] Web UI job pages 2568a6c [Josh Rosen] Rename JobProgressPage to AllStagesPage: (cherry picked from commit 4a90276) Signed-off-by: Patrick Wendell <[email protected]>
1 parent 97b7eb4 commit 2d35cc0

21 files changed

+1054
-75
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ class SparkContext(config: SparkConf) extends Logging {
8383
// contains a map from hostname to a list of input format splits on the host.
8484
private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()
8585

86+
val startTime = System.currentTimeMillis()
87+
8688
/**
8789
* Create a SparkContext that loads settings from system properties (for instance, when
8890
* launching with ./bin/spark-submit).
@@ -269,8 +271,6 @@ class SparkContext(config: SparkConf) extends Logging {
269271
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
270272
val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
271273

272-
val startTime = System.currentTimeMillis()
273-
274274
// Add each JAR given through the constructor
275275
if (jars != null) {
276276
jars.foreach(addJar)

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -751,14 +751,15 @@ class DAGScheduler(
751751
localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
752752
if (shouldRunLocally) {
753753
// Compute very short actions like first() or take() with no parent stages locally.
754-
listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
754+
listenerBus.post(SparkListenerJobStart(job.jobId, Seq.empty, properties))
755755
runLocally(job)
756756
} else {
757757
jobIdToActiveJob(jobId) = job
758758
activeJobs += job
759759
finalStage.resultOfJob = Some(job)
760-
listenerBus.post(SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray,
761-
properties))
760+
val stageIds = jobIdToStageIds(jobId).toArray
761+
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
762+
listenerBus.post(SparkListenerJobStart(job.jobId, stageInfos, properties))
762763
submitStage(finalStage)
763764
}
764765
}

core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,15 @@ case class SparkListenerTaskEnd(
5656
extends SparkListenerEvent
5757

5858
@DeveloperApi
59-
case class SparkListenerJobStart(jobId: Int, stageIds: Seq[Int], properties: Properties = null)
60-
extends SparkListenerEvent
59+
case class SparkListenerJobStart(
60+
jobId: Int,
61+
stageInfos: Seq[StageInfo],
62+
properties: Properties = null)
63+
extends SparkListenerEvent {
64+
// Note: this is here for backwards-compatibility with older versions of this event which
65+
// only stored stageIds and not StageInfos:
66+
val stageIds: Seq[Int] = stageInfos.map(_.stageId)
67+
}
6168

6269
@DeveloperApi
6370
case class SparkListenerJobEnd(jobId: Int, jobResult: JobResult) extends SparkListenerEvent

core/src/main/scala/org/apache/spark/ui/SparkUI.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import org.apache.spark.storage.StorageStatusListener
2323
import org.apache.spark.ui.JettyUtils._
2424
import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab}
2525
import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab}
26-
import org.apache.spark.ui.jobs.{JobProgressListener, JobProgressTab}
26+
import org.apache.spark.ui.jobs.{JobsTab, JobProgressListener, StagesTab}
2727
import org.apache.spark.ui.storage.{StorageListener, StorageTab}
2828

2929
/**
@@ -43,17 +43,20 @@ private[spark] class SparkUI private (
4343
extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI")
4444
with Logging {
4545

46+
val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)
47+
4648
/** Initialize all components of the server. */
4749
def initialize() {
48-
val jobProgressTab = new JobProgressTab(this)
49-
attachTab(jobProgressTab)
50+
attachTab(new JobsTab(this))
51+
val stagesTab = new StagesTab(this)
52+
attachTab(stagesTab)
5053
attachTab(new StorageTab(this))
5154
attachTab(new EnvironmentTab(this))
5255
attachTab(new ExecutorsTab(this))
5356
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
54-
attachHandler(createRedirectHandler("/", "/stages", basePath = basePath))
57+
attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath))
5558
attachHandler(
56-
createRedirectHandler("/stages/stage/kill", "/stages", jobProgressTab.handleKillRequest))
59+
createRedirectHandler("/stages/stage/kill", "/stages", stagesTab.handleKillRequest))
5760
// If the UI is live, then serve
5861
sc.foreach { _.env.metricsSystem.getServletHandlers.foreach(attachHandler) }
5962
}

core/src/main/scala/org/apache/spark/ui/UIUtils.scala

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,8 @@ private[spark] object UIUtils extends Logging {
169169
title: String,
170170
content: => Seq[Node],
171171
activeTab: SparkUITab,
172-
refreshInterval: Option[Int] = None): Seq[Node] = {
172+
refreshInterval: Option[Int] = None,
173+
helpText: Option[String] = None): Seq[Node] = {
173174

174175
val appName = activeTab.appName
175176
val shortAppName = if (appName.length < 36) appName else appName.take(32) + "..."
@@ -178,6 +179,9 @@ private[spark] object UIUtils extends Logging {
178179
<a href={prependBaseUri(activeTab.basePath, "/" + tab.prefix + "/")}>{tab.name}</a>
179180
</li>
180181
}
182+
val helpButton: Seq[Node] = helpText.map { helpText =>
183+
<a data-toggle="tooltip" data-placement="bottom" title={helpText}>(?)</a>
184+
}.getOrElse(Seq.empty)
181185

182186
<html>
183187
<head>
@@ -201,6 +205,7 @@ private[spark] object UIUtils extends Logging {
201205
<div class="span12">
202206
<h3 style="vertical-align: bottom; display: inline-block;">
203207
{title}
208+
{helpButton}
204209
</h3>
205210
</div>
206211
</div>
@@ -283,4 +288,24 @@ private[spark] object UIUtils extends Logging {
283288
</tbody>
284289
</table>
285290
}
291+
292+
def makeProgressBar(
293+
started: Int,
294+
completed: Int,
295+
failed: Int,
296+
skipped:Int,
297+
total: Int): Seq[Node] = {
298+
val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
299+
val startWidth = "width: %s%%".format((started.toDouble/total)*100)
300+
301+
<div class="progress">
302+
<span style="text-align:center; position:absolute; width:100%; left:0;">
303+
{completed}/{total}
304+
{ if (failed > 0) s"($failed failed)" }
305+
{ if (skipped > 0) s"($skipped skipped)" }
306+
</span>
307+
<div class="bar bar-completed" style={completeWidth}></div>
308+
<div class="bar bar-running" style={startWidth}></div>
309+
</div>
310+
}
286311
}
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.ui.jobs
19+
20+
import scala.xml.{Node, NodeSeq}
21+
22+
import javax.servlet.http.HttpServletRequest
23+
24+
import org.apache.spark.JobExecutionStatus
25+
import org.apache.spark.ui.{WebUIPage, UIUtils}
26+
import org.apache.spark.ui.jobs.UIData.JobUIData
27+
28+
/** Page showing list of all ongoing and recently finished jobs */
29+
private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
30+
private val startTime: Option[Long] = parent.sc.map(_.startTime)
31+
private val listener = parent.listener
32+
33+
private def jobsTable(jobs: Seq[JobUIData]): Seq[Node] = {
34+
val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined)
35+
36+
val columns: Seq[Node] = {
37+
<th>{if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id"}</th>
38+
<th>Description</th>
39+
<th>Submitted</th>
40+
<th>Duration</th>
41+
<th class="sorttable_nosort">Stages: Succeeded/Total</th>
42+
<th class="sorttable_nosort">Tasks (for all stages): Succeeded/Total</th>
43+
}
44+
45+
def makeRow(job: JobUIData): Seq[Node] = {
46+
val lastStageInfo = listener.stageIdToInfo.get(job.stageIds.max)
47+
val lastStageData = lastStageInfo.flatMap { s =>
48+
listener.stageIdToData.get((s.stageId, s.attemptId))
49+
}
50+
val isComplete = job.status == JobExecutionStatus.SUCCEEDED
51+
val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
52+
val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
53+
val duration: Option[Long] = {
54+
job.startTime.map { start =>
55+
val end = job.endTime.getOrElse(System.currentTimeMillis())
56+
end - start
57+
}
58+
}
59+
val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
60+
val formattedSubmissionTime = job.startTime.map(UIUtils.formatDate).getOrElse("Unknown")
61+
val detailUrl =
62+
"%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), job.jobId)
63+
<tr>
64+
<td sorttable_customkey={job.jobId.toString}>
65+
{job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
66+
</td>
67+
<td>
68+
<div><em>{lastStageDescription}</em></div>
69+
<a href={detailUrl}>{lastStageName}</a>
70+
</td>
71+
<td sorttable_customkey={job.startTime.getOrElse(-1).toString}>
72+
{formattedSubmissionTime}
73+
</td>
74+
<td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
75+
<td class="stage-progress-cell">
76+
{job.completedStageIndices.size}/{job.stageIds.size - job.numSkippedStages}
77+
{if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
78+
{if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
79+
</td>
80+
<td class="progress-cell">
81+
{UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,
82+
failed = job.numFailedTasks, skipped = job.numSkippedTasks,
83+
total = job.numTasks - job.numSkippedTasks)}
84+
</td>
85+
</tr>
86+
}
87+
88+
<table class="table table-bordered table-striped table-condensed sortable">
89+
<thead>{columns}</thead>
90+
<tbody>
91+
{jobs.map(makeRow)}
92+
</tbody>
93+
</table>
94+
}
95+
96+
def render(request: HttpServletRequest): Seq[Node] = {
97+
listener.synchronized {
98+
val activeJobs = listener.activeJobs.values.toSeq
99+
val completedJobs = listener.completedJobs.reverse.toSeq
100+
val failedJobs = listener.failedJobs.reverse.toSeq
101+
val now = System.currentTimeMillis
102+
103+
val activeJobsTable =
104+
jobsTable(activeJobs.sortBy(_.startTime.getOrElse(-1L)).reverse)
105+
val completedJobsTable =
106+
jobsTable(completedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse)
107+
val failedJobsTable =
108+
jobsTable(failedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse)
109+
110+
val summary: NodeSeq =
111+
<div>
112+
<ul class="unstyled">
113+
{if (startTime.isDefined) {
114+
// Total duration is not meaningful unless the UI is live
115+
<li>
116+
<strong>Total Duration: </strong>
117+
{UIUtils.formatDuration(now - startTime.get)}
118+
</li>
119+
}}
120+
<li>
121+
<strong>Scheduling Mode: </strong>
122+
{listener.schedulingMode.map(_.toString).getOrElse("Unknown")}
123+
</li>
124+
<li>
125+
<a href="#active"><strong>Active Jobs:</strong></a>
126+
{activeJobs.size}
127+
</li>
128+
<li>
129+
<a href="#completed"><strong>Completed Jobs:</strong></a>
130+
{completedJobs.size}
131+
</li>
132+
<li>
133+
<a href="#failed"><strong>Failed Jobs:</strong></a>
134+
{failedJobs.size}
135+
</li>
136+
</ul>
137+
</div>
138+
139+
val content = summary ++
140+
<h4 id="active">Active Jobs ({activeJobs.size})</h4> ++ activeJobsTable ++
141+
<h4 id="completed">Completed Jobs ({completedJobs.size})</h4> ++ completedJobsTable ++
142+
<h4 id ="failed">Failed Jobs ({failedJobs.size})</h4> ++ failedJobsTable
143+
144+
val helpText = """A job is triggered by a action, like "count()" or "saveAsTextFile()".""" +
145+
" Click on a job's title to see information about the stages of tasks associated with" +
146+
" the job."
147+
148+
UIUtils.headerSparkPage("Spark Jobs", content, parent, helpText = Some(helpText))
149+
}
150+
}
151+
}

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala renamed to core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.apache.spark.scheduler.Schedulable
2525
import org.apache.spark.ui.{WebUIPage, UIUtils}
2626

2727
/** Page showing list of all ongoing and recently finished stages and pools */
28-
private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("") {
28+
private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
2929
private val sc = parent.sc
3030
private val listener = parent.listener
3131
private def isFairScheduler = parent.isFairScheduler
@@ -41,11 +41,14 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("")
4141

4242
val activeStagesTable =
4343
new StageTableBase(activeStages.sortBy(_.submissionTime).reverse,
44-
parent, parent.killEnabled)
44+
parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler,
45+
killEnabled = parent.killEnabled)
4546
val completedStagesTable =
46-
new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent)
47+
new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath,
48+
parent.listener, isFairScheduler = parent.isFairScheduler, killEnabled = false)
4749
val failedStagesTable =
48-
new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
50+
new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent.basePath,
51+
parent.listener, isFairScheduler = parent.isFairScheduler)
4952

5053
// For now, pool information is only accessible in live UIs
5154
val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable])
@@ -93,7 +96,7 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("")
9396
<h4 id ="failed">Failed Stages ({numFailedStages})</h4> ++
9497
failedStagesTable.toNodeSeq
9598

96-
UIUtils.headerSparkPage("Spark Stages", content, parent)
99+
UIUtils.headerSparkPage("Spark Stages (for all jobs)", content, parent)
97100
}
98101
}
99102
}

core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.apache.spark.ui.jobs.UIData.StageUIData
2525
import org.apache.spark.util.Utils
2626

2727
/** Stage summary grouped by executors. */
28-
private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: JobProgressTab) {
28+
private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: StagesTab) {
2929
private val listener = parent.listener
3030

3131
def toNodeSeq: Seq[Node] = {

0 commit comments

Comments
 (0)