Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
2568a6c
Rename JobProgressPage to AllStagesPage:
JoshRosen Oct 29, 2014
4487dcb
[SPARK-4145] Web UI job pages
JoshRosen Oct 30, 2014
bfce2b9
Address review comments, except for progress bar.
JoshRosen Nov 6, 2014
4b206fb
Merge remote-tracking branch 'origin/master' into job-page
JoshRosen Nov 6, 2014
45343b8
More comments
JoshRosen Nov 6, 2014
a475ea1
Add progress bars to jobs page.
JoshRosen Nov 11, 2014
56701fa
Move last stage name / description logic out of markup.
JoshRosen Nov 11, 2014
1cf4987
Fix broken kill links; add Selenium test to avoid future regressions.
JoshRosen Nov 11, 2014
85e9c85
Extract startTime into separate variable.
JoshRosen Nov 11, 2014
4d58e55
Change label to "Tasks (for all stages)"
JoshRosen Nov 11, 2014
4846ce4
Hide "(Job Group)" if no jobs were submitted in job groups.
JoshRosen Nov 12, 2014
b7bf30e
Add stages progress bar; fix bug where active stages show as completed.
JoshRosen Nov 12, 2014
8a2351b
Add help tooltip to Spark Jobs page.
JoshRosen Nov 12, 2014
3d0a007
Merge remote-tracking branch 'origin/master' into job-page
JoshRosen Nov 17, 2014
1145c60
Display text instead of progress bar for stages.
JoshRosen Nov 17, 2014
d62ea7b
Add failing Selenium test for stage overcounting issue.
JoshRosen Nov 17, 2014
79793cd
Track indices of completed stage to avoid overcounting when failures …
JoshRosen Nov 18, 2014
5884f91
Add StageInfos to SparkListenerJobStart event.
JoshRosen Nov 18, 2014
8ab6c28
Compute numTasks from job start stage infos.
JoshRosen Nov 18, 2014
8955f4c
Display information for pending stages on jobs page.
JoshRosen Nov 19, 2014
e2f2c43
Fix sorting of stages in job details page.
JoshRosen Nov 19, 2014
171b53c
Move `startTime` to the start of SparkContext.
JoshRosen Nov 19, 2014
f2a15da
Add status field to job details page.
JoshRosen Nov 19, 2014
5eb39dc
Add pending stages table to job page.
JoshRosen Nov 19, 2014
d69c775
Fix table sorting on all jobs page.
JoshRosen Nov 19, 2014
7d10b97
Merge remote-tracking branch 'apache/master' into job-page
JoshRosen Nov 20, 2014
67080ba
Ensure that "phantom stages" don't cause memory leaks.
JoshRosen Nov 20, 2014
eebdc2c
Don’t display pending stages for completed jobs.
JoshRosen Nov 20, 2014
034aa8d
Use `.max()` to find result stage for job.
JoshRosen Nov 20, 2014
0b77e3e
More bug fixes for phantom stages.
JoshRosen Nov 20, 2014
1f45d44
Incorporate a bunch of minor review feedback.
JoshRosen Nov 20, 2014
61c265a
Add “skipped stages” table; only display non-empty tables.
JoshRosen Nov 20, 2014
2bbf41a
Update job progress bar to reflect skipped tasks/stages.
JoshRosen Nov 20, 2014
6f17f3f
Only store StageInfos in SparkListenerJobStart event.
JoshRosen Nov 21, 2014
ff804cd
Don't write "Stage Ids" field in JobStartEvent JSON.
JoshRosen Nov 21, 2014
b89c258
More JSON protocol backwards-compatibility fixes.
JoshRosen Nov 21, 2014
f00c851
Fix JsonProtocol compatibility
JoshRosen Nov 21, 2014
eb05e90
Disable kill button in completed stages tables.
JoshRosen Nov 24, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ class SparkContext(config: SparkConf) extends Logging {
// contains a map from hostname to a list of input format splits on the host.
private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()

val startTime = System.currentTimeMillis()

/**
* Create a SparkContext that loads settings from system properties (for instance, when
* launching with ./bin/spark-submit).
Expand Down Expand Up @@ -269,8 +271,6 @@ class SparkContext(config: SparkConf) extends Logging {
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, I moved `startTime` to the top of the class to avoid a race condition where a user could browse to the web UI before this field was initialized. There didn't seem to be any particular reason for it to be declared at this spot rather than anywhere else.


val startTime = System.currentTimeMillis()

// Add each JAR given through the constructor
if (jars != null) {
jars.foreach(addJar)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -751,14 +751,15 @@ class DAGScheduler(
localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
if (shouldRunLocally) {
// Compute very short actions like first() or take() with no parent stages locally.
listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
listenerBus.post(SparkListenerJobStart(job.jobId, Seq.empty, properties))
runLocally(job)
} else {
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.resultOfJob = Some(job)
listenerBus.post(SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray,
properties))
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(SparkListenerJobStart(job.jobId, stageInfos, properties))
submitStage(finalStage)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,15 @@ case class SparkListenerTaskEnd(
extends SparkListenerEvent

@DeveloperApi
case class SparkListenerJobStart(jobId: Int, stageIds: Seq[Int], properties: Properties = null)
extends SparkListenerEvent
/**
 * Listener event describing the start of a job.
 *
 * @param jobId      id of the job that started
 * @param stageInfos one [[StageInfo]] per stage reported for the job
 * @param properties optional properties attached to the job; defaults to `null`
 */
case class SparkListenerJobStart(
    jobId: Int,
    stageInfos: Seq[StageInfo],
    properties: Properties = null)
  extends SparkListenerEvent {

  // Kept so that code written against the earlier form of this event, which carried only
  // stage ids rather than full StageInfos, can still read `stageIds`.
  val stageIds: Seq[Int] = for (info <- stageInfos) yield info.stageId
}

@DeveloperApi
case class SparkListenerJobEnd(jobId: Int, jobResult: JobResult) extends SparkListenerEvent
Expand Down
13 changes: 8 additions & 5 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.spark.storage.StorageStatusListener
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab}
import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab}
import org.apache.spark.ui.jobs.{JobProgressListener, JobProgressTab}
import org.apache.spark.ui.jobs.{JobsTab, JobProgressListener, StagesTab}
import org.apache.spark.ui.storage.{StorageListener, StorageTab}

/**
Expand All @@ -43,17 +43,20 @@ private[spark] class SparkUI private (
extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI")
with Logging {

val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)

/** Initialize all components of the server. */
def initialize() {
val jobProgressTab = new JobProgressTab(this)
attachTab(jobProgressTab)
attachTab(new JobsTab(this))
val stagesTab = new StagesTab(this)
attachTab(stagesTab)
attachTab(new StorageTab(this))
attachTab(new EnvironmentTab(this))
attachTab(new ExecutorsTab(this))
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
attachHandler(createRedirectHandler("/", "/stages", basePath = basePath))
attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath))
attachHandler(
createRedirectHandler("/stages/stage/kill", "/stages", jobProgressTab.handleKillRequest))
createRedirectHandler("/stages/stage/kill", "/stages", stagesTab.handleKillRequest))
// If the UI is live, then serve
sc.foreach { _.env.metricsSystem.getServletHandlers.foreach(attachHandler) }
}
Expand Down
27 changes: 26 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/UIUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ private[spark] object UIUtils extends Logging {
title: String,
content: => Seq[Node],
activeTab: SparkUITab,
refreshInterval: Option[Int] = None): Seq[Node] = {
refreshInterval: Option[Int] = None,
helpText: Option[String] = None): Seq[Node] = {

val appName = activeTab.appName
val shortAppName = if (appName.length < 36) appName else appName.take(32) + "..."
Expand All @@ -178,6 +179,9 @@ private[spark] object UIUtils extends Logging {
<a href={prependBaseUri(activeTab.basePath, "/" + tab.prefix + "/")}>{tab.name}</a>
</li>
}
val helpButton: Seq[Node] = helpText.map { helpText =>
<a data-toggle="tooltip" data-placement="bottom" title={helpText}>(?)</a>
}.getOrElse(Seq.empty)

<html>
<head>
Expand All @@ -201,6 +205,7 @@ private[spark] object UIUtils extends Logging {
<div class="span12">
<h3 style="vertical-align: bottom; display: inline-block;">
{title}
{helpButton}
</h3>
</div>
</div>
Expand Down Expand Up @@ -283,4 +288,24 @@ private[spark] object UIUtils extends Logging {
</tbody>
</table>
}

/**
 * Build the markup for a two-segment progress bar labelled "completed/total", followed by
 * optional "(N failed)" and "(N skipped)" annotations.
 *
 * @param started   number of items currently running; drawn as the "running" segment
 * @param completed number of items that have finished; drawn as the "completed" segment
 * @param failed    number of failed items; only mentioned in the label when positive
 * @param skipped   number of skipped items; only mentioned in the label when positive
 * @param total     denominator used for the label and for both segment widths
 * @return the `div.progress` element containing the label and the two bar segments
 */
def makeProgressBar(
    started: Int,
    completed: Int,
    failed: Int,
    skipped: Int,
    total: Int): Seq[Node] = {
  // Dividing by a zero total would produce "width: NaN%" / "width: Infinity%", which is not
  // valid CSS. When there is nothing to run, draw the bar as fully complete instead.
  val (completeRatio, startRatio) =
    if (total <= 0) {
      (100.0, 0.0)
    } else {
      // started + completed may exceed total; cap the running segment at whatever is left so
      // the two segments together never exceed the width of the bar.
      val boundedStarted = math.max(0, math.min(started, total - completed))
      ((completed.toDouble / total) * 100, (boundedStarted.toDouble / total) * 100)
    }
  val completeWidth = "width: %s%%".format(completeRatio)
  val startWidth = "width: %s%%".format(startRatio)

  <div class="progress">
    <span style="text-align:center; position:absolute; width:100%; left:0;">
      {completed}/{total}
      { if (failed > 0) s"($failed failed)" }
      { if (skipped > 0) s"($skipped skipped)" }
    </span>
    <div class="bar bar-completed" style={completeWidth}></div>
    <div class="bar bar-running" style={startWidth}></div>
  </div>
}
}
151 changes: 151 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ui.jobs

import scala.xml.{Node, NodeSeq}

import javax.servlet.http.HttpServletRequest

import org.apache.spark.JobExecutionStatus
import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.ui.jobs.UIData.JobUIData

/**
 * Page showing list of all ongoing and recently finished jobs.
 *
 * Renders a summary followed by three tables (active, completed and failed jobs) built from
 * the state held by the parent tab's listener.
 */
private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
  // Start time of the SparkContext, when the parent tab has one.
  private val startTime: Option[Long] = parent.sc.map(_.startTime)
  private val listener = parent.listener

  /**
   * Build a sortable HTML table with one row per job.
   *
   * @param jobs the jobs to display, in the order the rows should appear
   * @return the `table` element
   */
  private def jobsTable(jobs: Seq[JobUIData]): Seq[Node] = {
    val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined)

    val columns: Seq[Node] = {
      <th>{if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id"}</th>
      <th>Description</th>
      <th>Submitted</th>
      <th>Duration</th>
      <th class="sorttable_nosort">Stages: Succeeded/Total</th>
      <th class="sorttable_nosort">Tasks (for all stages): Succeeded/Total</th>
    }

    def makeRow(job: JobUIData): Seq[Node] = {
      // The job's row is titled after its last stage, i.e. the stage with the highest id.
      // A job may have no stage ids at all (a job start event can carry an empty stage list),
      // in which case calling `max` directly would throw; fall back to "no stage info".
      val lastStageInfo = Option(job.stageIds)
        .filter(_.nonEmpty)
        .flatMap { ids => listener.stageIdToInfo.get(ids.max) }
      val lastStageData = lastStageInfo.flatMap { s =>
        listener.stageIdToData.get((s.stageId, s.attemptId))
      }
      val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
      val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
      // Jobs that have not ended yet are measured against the current time.
      val duration: Option[Long] = {
        job.startTime.map { start =>
          val end = job.endTime.getOrElse(System.currentTimeMillis())
          end - start
        }
      }
      val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
      val formattedSubmissionTime = job.startTime.map(UIUtils.formatDate).getOrElse("Unknown")
      val detailUrl =
        "%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), job.jobId)
      <tr>
        <td sorttable_customkey={job.jobId.toString}>
          {job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
        </td>
        <td>
          <div><em>{lastStageDescription}</em></div>
          <a href={detailUrl}>{lastStageName}</a>
        </td>
        <td sorttable_customkey={job.startTime.getOrElse(-1L).toString}>
          {formattedSubmissionTime}
        </td>
        <td sorttable_customkey={duration.getOrElse(-1L).toString}>{formattedDuration}</td>
        <td class="stage-progress-cell">
          {job.completedStageIndices.size}/{job.stageIds.size - job.numSkippedStages}
          {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
          {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
        </td>
        <td class="progress-cell">
          {UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,
          failed = job.numFailedTasks, skipped = job.numSkippedTasks,
          total = job.numTasks - job.numSkippedTasks)}
        </td>
      </tr>
    }

    <table class="table table-bordered table-striped table-condensed sortable">
      <thead>{columns}</thead>
      <tbody>
        {jobs.map(makeRow)}
      </tbody>
    </table>
  }

  /**
   * Render the "Spark Jobs" page.
   *
   * The listener's job collections are read while holding the listener's monitor, so the
   * summary counts and the three tables are built from the same snapshot.
   *
   * @param request the incoming HTTP request (not inspected by this page)
   * @return the complete page markup, wrapped by `UIUtils.headerSparkPage`
   */
  def render(request: HttpServletRequest): Seq[Node] = {
    listener.synchronized {
      val activeJobs = listener.activeJobs.values.toSeq
      val completedJobs = listener.completedJobs.reverse.toSeq
      val failedJobs = listener.failedJobs.reverse.toSeq
      val now = System.currentTimeMillis

      // Most recent first: active jobs by start time, finished jobs by end time.
      val activeJobsTable =
        jobsTable(activeJobs.sortBy(_.startTime.getOrElse(-1L)).reverse)
      val completedJobsTable =
        jobsTable(completedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse)
      val failedJobsTable =
        jobsTable(failedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse)

      val summary: NodeSeq =
        <div>
          <ul class="unstyled">
            {startTime.map { start =>
              // Total duration is not meaningful unless the UI is live
              <li>
                <strong>Total Duration: </strong>
                {UIUtils.formatDuration(now - start)}
              </li>
            }.getOrElse(Seq.empty)}
            <li>
              <strong>Scheduling Mode: </strong>
              {listener.schedulingMode.map(_.toString).getOrElse("Unknown")}
            </li>
            <li>
              <a href="#active"><strong>Active Jobs:</strong></a>
              {activeJobs.size}
            </li>
            <li>
              <a href="#completed"><strong>Completed Jobs:</strong></a>
              {completedJobs.size}
            </li>
            <li>
              <a href="#failed"><strong>Failed Jobs:</strong></a>
              {failedJobs.size}
            </li>
          </ul>
        </div>

      val content = summary ++
        <h4 id="active">Active Jobs ({activeJobs.size})</h4> ++ activeJobsTable ++
        <h4 id="completed">Completed Jobs ({completedJobs.size})</h4> ++ completedJobsTable ++
        <h4 id ="failed">Failed Jobs ({failedJobs.size})</h4> ++ failedJobsTable

      val helpText = """A job is triggered by an action, like "count()" or "saveAsTextFile()".""" +
        " Click on a job's title to see information about the stages of tasks associated with" +
        " the job."

      UIUtils.headerSparkPage("Spark Jobs", content, parent, helpText = Some(helpText))
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.scheduler.Schedulable
import org.apache.spark.ui.{WebUIPage, UIUtils}

/** Page showing list of all ongoing and recently finished stages and pools */
private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("") {
private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This naming change is great

private val sc = parent.sc
private val listener = parent.listener
private def isFairScheduler = parent.isFairScheduler
Expand All @@ -41,11 +41,14 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("")

val activeStagesTable =
new StageTableBase(activeStages.sortBy(_.submissionTime).reverse,
parent, parent.killEnabled)
parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler,
killEnabled = parent.killEnabled)
val completedStagesTable =
new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent)
new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath,
parent.listener, isFairScheduler = parent.isFairScheduler, killEnabled = false)
val failedStagesTable =
new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent.basePath,
parent.listener, isFairScheduler = parent.isFairScheduler)

// For now, pool information is only accessible in live UIs
val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable])
Expand Down Expand Up @@ -93,7 +96,7 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("")
<h4 id ="failed">Failed Stages ({numFailedStages})</h4> ++
failedStagesTable.toNodeSeq

UIUtils.headerSparkPage("Spark Stages", content, parent)
UIUtils.headerSparkPage("Spark Stages (for all jobs)", content, parent)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.ui.jobs.UIData.StageUIData
import org.apache.spark.util.Utils

/** Stage summary grouped by executors. */
private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: JobProgressTab) {
private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: StagesTab) {
private val listener = parent.listener

def toNodeSeq: Seq[Node] = {
Expand Down
Loading