diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index 08dc17d5887e9..3bda2000b7830 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -233,7 +233,11 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
       val detailUrl = "%s/jobs/job?id=%s".format(basePathUri, job.jobId)
       <tr>
         <td sorttable_customkey={job.jobId.toString}>
-          {job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
+          {job.jobId}{ job.jobGroup.map { id =>
+
+            {id}
+
+          }.getOrElse({job.jobGroup.map(id => s"($id)").getOrElse("")})}
         </td>
         <td>
           {jobDescription}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobGroupPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobGroupPage.scala
new file mode 100644
index 0000000000000..c1ef0b38c1876
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobGroupPage.scala
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import java.util.Date
+import javax.servlet.http.HttpServletRequest
+
+import org.apache.commons.lang3.StringEscapeUtils
+import org.apache.spark.JobExecutionStatus
+import org.apache.spark.ui.jobs.UIData.{ExecutorUIData, JobUIData}
+import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
+
+import scala.collection.mutable
+import scala.collection.mutable.{HashMap, ListBuffer}
+import scala.xml.{Node, NodeSeq, Unparsed, Utility}
+
+/** Page showing list of jobs under a job group id */
+private[ui] class JobGroupPage(parent: JobsTab) extends WebUIPage("jobgroup") {
+  private val JOBS_LEGEND =
+    <div class="legend-area"><svg width="150px" height="85px">
+      <rect class="succeeded-job-legend"
+        x="5px" y="5px" width="20px" height="15px" rx="2px" ry="2px"></rect>
+      <text x="35px" y="17px">Succeeded</text>
+      <rect class="failed-job-legend"
+        x="5px" y="30px" width="20px" height="15px" rx="2px" ry="2px"></rect>
+      <text x="35px" y="42px">Failed</text>
+      <rect class="running-job-legend"
+        x="5px" y="55px" width="20px" height="15px" rx="2px" ry="2px"></rect>
+      <text x="35px" y="67px">Running</text>
+    </svg></div>.toString.filter(_ != '\n')
+
+  private val EXECUTORS_LEGEND =
+    <div class="legend-area"><svg width="150px" height="55px">
+      <rect class="executor-added-legend"
+        x="5px" y="5px" width="20px" height="15px" rx="2px" ry="2px"></rect>
+      <text x="35px" y="17px">Added</text>
+      <rect class="executor-removed-legend"
+        x="5px" y="30px" width="20px" height="15px" rx="2px" ry="2px"></rect>
+      <text x="35px" y="42px">Removed</text>
+    </svg></div>.toString.filter(_ != '\n')
+
+  private def getLastStageNameAndDescription(job: JobUIData): (String, String) = {
+    val lastStageInfo = Option(job.stageIds)
+      .filter(_.nonEmpty)
+      .flatMap { ids => parent.jobProgresslistener.stageIdToInfo.get(ids.max)}
+    val lastStageData = lastStageInfo.flatMap { s =>
+      parent.jobProgresslistener.stageIdToData.get((s.stageId, s.attemptId))
+    }
+    val name = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
+    val description = lastStageData.flatMap(_.description).getOrElse("")
+    (name, description)
+  }
+
+  private def jobsTable(jobs: Seq[JobUIData]): Seq[Node] = {
+    val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined)
+
+    val columns: Seq[Node] = {
+      <th>{if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id"}</th>
+      <th>Description</th>
+      <th>Submitted</th>
+      <th>Duration</th>
+      <th class="sorttable_nosort">Stages: Succeeded/Total</th>
+      <th class="sorttable_nosort">Tasks (for all stages): Succeeded/Total</th>
+    }
+
+    def makeRow(job: JobUIData): Seq[Node] = {
+      val (lastStageName, lastStageDescription) = getLastStageNameAndDescription(job)
+      val duration: Option[Long] = {
+        job.submissionTime.map { start =>
+          val end = job.completionTime.getOrElse(System.currentTimeMillis())
+          end - start
+        }
+      }
+      val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
+      val formattedSubmissionTime = job.submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
+      val basePathUri = UIUtils.prependBaseUri(parent.basePath)
+      val jobDescription = UIUtils.makeDescription(lastStageDescription, basePathUri)
+
+      val detailUrl = "%s/jobs/job?id=%s".format(basePathUri, job.jobId)
+
+      <tr>
+        <td sorttable_customkey={job.jobId.toString}>
+          {job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
+        </td>
+        <td>
+          {jobDescription}
+          <a href={detailUrl} class="name-link">{lastStageName}</a>
+        </td>
+        <td sorttable_customkey={job.submissionTime.getOrElse(-1).toString}>
+          {formattedSubmissionTime}
+        </td>
+        <td sorttable_customkey={duration.getOrElse(-1).toString}>
+          {formattedDuration}
+        </td>
+        <td class="stage-progress-cell">
+          {job.completedStageIndices.size}/{job.stageIds.size - job.numSkippedStages}
+          {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
+          {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
+        </td>
+        <td class="progress-cell">
+          {UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,
+           failed = job.numFailedTasks, skipped = job.numSkippedTasks,
+           total = job.numTasks - job.numSkippedTasks)}
+        </td>
+      </tr>
+    }
+
+    <table class="table table-bordered table-striped table-condensed sortable">
+      <thead>{columns}</thead>
+      <tbody>
+        {jobs.map(makeRow)}
+      </tbody>
+    </table>
+  }
+
+  private def makeJobEvent(jobUIDatas: Seq[JobUIData]): Seq[String] = {
+    jobUIDatas.filter { jobUIData =>
+      jobUIData.status != JobExecutionStatus.UNKNOWN && jobUIData.submissionTime.isDefined
+    }.map { jobUIData =>
+      val jobId = jobUIData.jobId
+      val status = jobUIData.status
+      val (jobName, jobDescription) = getLastStageNameAndDescription(jobUIData)
+      val displayJobDescription = if (jobDescription.isEmpty) jobName else jobDescription
+      val submissionTime = jobUIData.submissionTime.get
+      val completionTimeOpt = jobUIData.completionTime
+      val completionTime = completionTimeOpt.getOrElse(System.currentTimeMillis())
+      val classNameByStatus = status match {
+        case JobExecutionStatus.SUCCEEDED => "succeeded"
+        case JobExecutionStatus.FAILED => "failed"
+        case JobExecutionStatus.RUNNING => "running"
+        case JobExecutionStatus.UNKNOWN => "unknown"
+      }
+
+      // The timeline library treats contents as HTML, so we have to escape them. We need to add
+      // extra layers of escaping in order to embed this in a Javascript string literal.
+      val escapedDesc = Utility.escape(displayJobDescription)
+      val jsEscapedDesc = StringEscapeUtils.escapeEcmaScript(escapedDesc)
+      val jobEventJsonAsStr =
+        s"""
+           |{
+           |  'className': 'job application-timeline-object ${classNameByStatus}',
+           |  'group': 'jobs',
+           |  'start': new Date(${submissionTime}),
+           |  'end': new Date(${completionTime}),
+           |  'content': '<div class="application-timeline-content"' +
+           |    'data-html="true" data-placement="top" data-toggle="tooltip"' +
+           |    'data-title="${jsEscapedDesc} (Job ${jobId})<br>' +
+           |    'Status: ${status}<br>' +
+           |    'Submitted: ${UIUtils.formatDate(new Date(submissionTime))}' +
+           |    '${
+                   if (status != JobExecutionStatus.RUNNING) {
+                     s"""<br>Completed: ${UIUtils.formatDate(new Date(completionTime))}"""
+                   } else {
+                     ""
+                   }
+                }">' +
+           |    '${jsEscapedDesc} (Job ${jobId})</div>'
+           |}
+         """.stripMargin
+      jobEventJsonAsStr
+    }
+  }
+
+  private def makeExecutorEvent(executorUIDatas: HashMap[String, ExecutorUIData]): Seq[String] = {
+    val events = ListBuffer[String]()
+    executorUIDatas.foreach {
+      case (executorId, event) =>
+        val addedEvent =
+          s"""
+             |{
+             |  'className': 'executor added',
+             |  'group': 'executors',
+             |  'start': new Date(${event.startTime}),
+             |  'content': '<div class="executor-event-content"' +
+             |    'data-toggle="tooltip" data-placement="bottom"' +
+             |    'data-title="Executor ${executorId}<br>' +
+             |    'Added at ${UIUtils.formatDate(new Date(event.startTime))}"' +
+             |    'data-html="true">Executor ${executorId} added</div>'
+             |}
+           """.stripMargin
+        events += addedEvent
+
+        if (event.finishTime.isDefined) {
+          val removedEvent =
+            s"""
+               |{
+               |  'className': 'executor removed',
+               |  'group': 'executors',
+               |  'start': new Date(${event.finishTime.get}),
+               |  'content': '<div class="executor-event-content"' +
+               |    'data-toggle="tooltip" data-placement="bottom"' +
+               |    'data-title="Executor ${executorId}<br>' +
+               |    'Removed at ${UIUtils.formatDate(new Date(event.finishTime.get))}' +
+               |    '${
+                       if (event.finishReason.isDefined) {
+                         s"""<br>Reason: ${event.finishReason.get.replace("\n", " ")}"""
+                       } else {
+                         ""
+                       }
+                    }"' +
+               |    'data-html="true">Executor ${executorId} removed</div>'
+               |}
+             """.stripMargin
+          events += removedEvent
+        }
+    }
+    events.toSeq
+  }
+
+
+  protected def makeTimeline(
+      jobs: Seq[JobUIData],
+      executors: HashMap[String, ExecutorUIData],
+      startTime: Long): Seq[Node] = {
+
+    val jobEventJsonAsStrSeq = makeJobEvent(jobs)
+    val executorEventJsonAsStrSeq = makeExecutorEvent(executors)
+
+    val groupJsonArrayAsStr =
+      s"""
+          |[
+          |  {
+          |    'id': 'executors',
+          |    'content': '<div>Executors</div>${EXECUTORS_LEGEND}',
+          |  },
+          |  {
+          |    'id': 'jobs',
+          |    'content': '<div>Jobs</div>${JOBS_LEGEND}',
+          |  }
+          |]
+        """.stripMargin
+
+    val eventArrayAsStr =
+      (jobEventJsonAsStrSeq ++ executorEventJsonAsStrSeq).mkString("[", ",", "]")
+
+    <span class="expand-application-timeline">
+      <span class="expand-application-timeline-arrow arrow-closed"></span>
+      <a data-toggle="tooltip" title={ToolTips.JOB_TIMELINE} data-placement="right">
+        Event Timeline
+      </a>
+    </span> ++
+    <div id="application-timeline" class="collapsed">
+      <div class="control-panel">
+        <div id="application-timeline-zoom-lock">
+          <input type="checkbox"></input>
+          <span>Enable zooming</span>
+        </div>
+      </div>
+    </div> ++
+    <script type="text/javascript">
+      {Unparsed(s"drawApplicationTimeline(${groupJsonArrayAsStr}," +
+      s"${eventArrayAsStr}, ${startTime});")}
+    </script>
+  }
+
+  override def render(request: HttpServletRequest): Seq[Node] = {
+    val listener = parent.jobProgresslistener
+    listener.synchronized {
+      val parameterId = request.getParameter("id")
+      require(parameterId != null && parameterId.nonEmpty, "Missing parameter id")
+
+      val jobGroupId = parameterId
+      val groupToJobsTable = listener.jobGroupToJobIds.get(jobGroupId)
+      if (groupToJobsTable.isEmpty) {
+        val content =
+          <div id="no-info">
+            <p>
+              No information to display for jobGroup
+              {jobGroupId}
+            </p>
+          </div>
+        return UIUtils.headerSparkPage(
+          s"Details for JobGroup $jobGroupId", content, parent)
+      }
+
+      val jobsInGroup = listener.jobIdToData
+
+      val activeJobsInGroup = mutable.Buffer[JobUIData]()
+      val completedJobsInGroup = mutable.Buffer[JobUIData]()
+      val failedJobsInGroup = mutable.Buffer[JobUIData]()
+      var totalDuration = 0L
+      groupToJobsTable.get.foreach { jobId =>
+        val job = jobsInGroup.get(jobId)
+        val duration: Option[Long] = {
+          job.get.submissionTime.map { start =>
+            val end = job.get.completionTime.getOrElse(System.currentTimeMillis())
+            end - start
+          }
+        }
+        totalDuration += duration.getOrElse(0L)
+        job.get.status match {
+          case JobExecutionStatus.RUNNING => activeJobsInGroup ++= job
+          case JobExecutionStatus.SUCCEEDED => completedJobsInGroup ++= job
+          case JobExecutionStatus.FAILED => failedJobsInGroup ++= job
+          case JobExecutionStatus.UNKNOWN => // not handling unknown status
+        }
+      }
+
+      val activeJobsTable =
+        jobsTable(activeJobsInGroup.sortBy(_.submissionTime.getOrElse(-1L)).reverse)
+      val completedJobsTable =
+        jobsTable(completedJobsInGroup.sortBy(_.completionTime.getOrElse(-1L)).reverse)
+      val failedJobsTable =
+        jobsTable(failedJobsInGroup.sortBy(_.completionTime.getOrElse(-1L)).reverse)
+
+      val shouldShowActiveJobs = activeJobsInGroup.nonEmpty
+      val shouldShowCompletedJobs = completedJobsInGroup.nonEmpty
+      val shouldShowFailedJobs = failedJobsInGroup.nonEmpty
+
+      val summary: NodeSeq =
+
+
+
+
+      var content = summary
+      val executorListener = parent.executorListener
+      content ++= makeTimeline(activeJobsInGroup ++ completedJobsInGroup ++ failedJobsInGroup,
+        executorListener.executorIdToData, listener.startTime)
+
+      if (shouldShowActiveJobs) {
+        content ++=
+          <h4 id="active">
+            Active Jobs ({activeJobsInGroup.size})
+          </h4> ++
+          activeJobsTable
+      }
+      if (shouldShowCompletedJobs) {
+        content ++=
+          <h4 id="completed">
+            Completed Jobs ({completedJobsInGroup.size})
+          </h4> ++
+          completedJobsTable
+      }
+      if (shouldShowFailedJobs) {
+        content ++=
+          <h4 id="failed">
+            Failed Jobs ({failedJobsInGroup.size})
+          </h4> ++
+          failedJobsTable
+      }
+
+      val helpText =
+        s"""A job is triggered by an action, like count() or saveAsTextFile().
+           | Click on a job to see information about the stages of tasks inside it.""".stripMargin
+
+      UIUtils.headerSparkPage("Spark Jobs", content, parent, helpText = Some(helpText))
+    }
+
+  }
+
+}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
index 77ca60b000a9b..3c56d9e026c0b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
@@ -33,4 +33,5 @@ private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") {
 
   attachPage(new AllJobsPage(this))
   attachPage(new JobPage(this))
+  attachPage(new JobGroupPage(this))
 }
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index 36c484dcfe8c9..61ea0f862d079 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -645,6 +645,44 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
     }
   }
 
+  test("jobs with job group id should show correct link to job group page on all jobs page") {
+    withSpark(newSparkContext()) { sc =>
+      val ui = sc.ui.get
+      // Once at least one job has been run in a job group, the all jobs page should display a
+      // link to the corresponding job group page
+      val jobGroupId = "my-job-group"
+      sc.setJobGroup(jobGroupId, "my-job-group-description")
+      // Create an RDD that involves multiple stages:
+      val rdd =
+        sc.parallelize(Seq(1, 2, 3)).map(identity).groupBy(identity).map(identity).groupBy(identity)
+      // Run a single job on it; once it completes, the job group should have exactly one
+      // completed job to display:
+      rdd.count()
+      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+        goToUi(ui, "/jobs")
+        findAll(cssSelector("tbody tr")).foreach { row =>
+          val links = row.underlying.findElements(By.xpath(".//a"))
+          links.size should be (2)
+          links.get(0).getText().toLowerCase should include (jobGroupId)
+          links.get(0).getAttribute("href") should include regex (
+            s"(?=.*jobgroup)(?=.*${jobGroupId})")
+          links.get(1).getText().toLowerCase should include ("count")
+        }
+      }
+
+      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+        goToUi(ui, s"/jobs/jobgroup/?id=${jobGroupId}")
+        find(id("pending")) should be (None)
+        find(id("active")) should be (None)
+        find(id("failed")) should be (None)
+        find(id("completed")).get.text should be ("Completed Jobs (1)")
+        findAll(cssSelector("tbody tr a")).foreach { link =>
+          link.text.toLowerCase should include ("count")
+        }
+      }
+    }
+  }
+
   def goToUi(sc: SparkContext, path: String): Unit = {
     goToUi(sc.ui.get, path)
   }
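For anyone who wants to exercise the new page by hand, a minimal spark-shell sketch mirroring the Selenium scenario above; the group id, the sample job, and the default UI port 4040 are illustrative assumptions rather than anything taken from the patch:

// Illustrative manual check only, not part of the patch.
// Assumes a spark-shell SparkContext `sc` with its web UI on the default port 4040.
sc.setJobGroup("my-job-group", "my-job-group-description")
sc.parallelize(1 to 100).groupBy(_ % 10).count()
// The all jobs page (http://localhost:4040/jobs/) should now render the group id next to the
// job id as a link, and the new per-group page should list the completed job:
//   http://localhost:4040/jobs/jobgroup/?id=my-job-group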