++
- failedJobsTable
+ failedJobsTable.toNodeSeq
}
val helpText = """A job is triggered by an action, like count() or saveAsTextFile().""" +
@@ -615,7 +363,11 @@ private[ui] class JobPagedTable(
{jobTableRow.jobDescription} {killLink}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobGroupPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobGroupPage.scala
new file mode 100644
index 0000000000000..b14d8aaed4ad3
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobGroupPage.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.collection.mutable
+import scala.xml.{Node, NodeSeq}
+
+import org.apache.spark.JobExecutionStatus
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+import org.apache.spark.ui.jobs.UIData.JobUIData
+
+/** Page showing list of jobs under a job group id */
+private[ui] class JobGroupPage(parent: JobsTab) extends WebUIPage("jobgroup") {
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val listener = parent.jobProgresslistener
+ listener.synchronized {
+ val parameterId = request.getParameter("id")
+ require(parameterId != null && parameterId.nonEmpty, "Missing parameter id")
+
+ val jobGroupId = parameterId
+ val groupToJobsTable = listener.jobGroupToJobIds.get(jobGroupId)
+ if (groupToJobsTable.isEmpty) {
+ val content =
+
+
No information to display for jobGroup
+ {jobGroupId}
+
+
+ return UIUtils.headerSparkPage(
+ s"Details for JobGroup $jobGroupId", content, parent)
+ }
+
+ val jobsInGroup = listener.jobIdToData
+
+ val activeJobsInGroup = mutable.Buffer[JobUIData]()
+ val completedJobsInGroup = mutable.Buffer[JobUIData]()
+ val failedJobsInGroup = mutable.Buffer[JobUIData]()
+ var totalDuration = 0L
+ groupToJobsTable.get.foreach { jobId =>
+ val job = jobsInGroup.get(jobId)
+ val duration: Option[Long] = {
+ job.get.submissionTime.map { start =>
+ val end = job.get.completionTime.getOrElse(System.currentTimeMillis())
+ end - start
+ }
+ }
+ totalDuration += duration.getOrElse(0L)
+ job.get.status match {
+ case JobExecutionStatus.RUNNING => activeJobsInGroup ++= job
+ case JobExecutionStatus.SUCCEEDED => completedJobsInGroup ++= job
+ case JobExecutionStatus.FAILED => failedJobsInGroup ++= job
+ case JobExecutionStatus.UNKNOWN => // not handling unknown status
+ }
+ }
+
+ val activeJobsTable =
+ new JobsTable(request,
+ activeJobsInGroup.sortBy(_.submissionTime.getOrElse((-1L))).reverse,
+ "active", "activeJob", parent.basePath, listener, killEnabled = parent.killEnabled)
+ val completedJobsTable =
+ new JobsTable(request,
+ completedJobsInGroup.sortBy(_.completionTime.getOrElse(-1L)).reverse,
+ "completed", "completeJob", parent.basePath, listener, killEnabled = false)
+ val failedJobsTable =
+ new JobsTable(request,
+ failedJobsInGroup.sortBy(_.completionTime.getOrElse(-1L)).reverse,
+ "failed", "failedJob", parent.basePath, listener, killEnabled = false)
+
+ val shouldShowActiveJobs = activeJobsInGroup.nonEmpty
+ val shouldShowCompletedJobs = completedJobsInGroup.nonEmpty
+ val shouldShowFailedJobs = failedJobsInGroup.nonEmpty
+
+ val summary: NodeSeq =
+
++
+ failedJobsTable.toNodeSeq
+ }
+
+ val helpText =
+ s"""A job is triggered by an action, like count() or saveAsTextFile().
+ | Click on a job to see information about the stages of tasks inside it.""".stripMargin
+
+ UIUtils.headerSparkPage("Spark Jobs", content, parent, helpText = Some(helpText))
+ }
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
index cc173381879a6..141067e93db30 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
@@ -37,6 +37,7 @@ private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") {
attachPage(new AllJobsPage(this))
attachPage(new JobPage(this))
+ attachPage(new JobGroupPage(this))
def handleKillRequest(request: HttpServletRequest): Unit = {
if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) {
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTable.scala
new file mode 100644
index 0000000000000..2da4baf5efc41
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTable.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.jobs.UIData.JobUIData
+import org.apache.spark.util.Utils
+
+private[ui] class JobsTable(
+ request: HttpServletRequest,
+ jobs: Seq[JobUIData],
+ tableHeaderId: String,
+ jobTag: String,
+ basePath: String,
+ progressListener: JobProgressListener,
+ killEnabled: Boolean) {
+ // stripXSS is called to remove suspicious characters used in XSS attacks
+ val allParameters = request.getParameterMap.asScala.toMap.mapValues(_.map(UIUtils.stripXSS))
+ val parameterOtherTable = allParameters.filterNot(_._1.startsWith(jobTag))
+ .map(para => para._1 + "=" + para._2(0))
+
+ val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined)
+ val jobIdTitle = if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id"
+
+ // stripXSS is called first to remove suspicious characters used in XSS attacks
+ val parameterJobPage = UIUtils.stripXSS(request.getParameter(jobTag + ".page"))
+ val parameterJobSortColumn = UIUtils.stripXSS(request.getParameter(jobTag + ".sort"))
+ val parameterJobSortDesc = UIUtils.stripXSS(request.getParameter(jobTag + ".desc"))
+ val parameterJobPageSize = UIUtils.stripXSS(request.getParameter(jobTag + ".pageSize"))
+ val parameterJobPrevPageSize = UIUtils.stripXSS(request.getParameter(jobTag + ".prevPageSize"))
+
+ val jobPage = Option(parameterJobPage).map(_.toInt).getOrElse(1)
+ val jobSortColumn = Option(parameterJobSortColumn).map { sortColumn =>
+ UIUtils.decodeURLParameter(sortColumn)
+ }.getOrElse(jobIdTitle)
+ val jobSortDesc = Option(parameterJobSortDesc).map(_.toBoolean).getOrElse(
+ // New jobs should be shown above old jobs by default.
+ if (jobSortColumn == jobIdTitle) true else false
+ )
+ val jobPageSize = Option(parameterJobPageSize).map(_.toInt).getOrElse(100)
+ val jobPrevPageSize = Option(parameterJobPrevPageSize).map(_.toInt).getOrElse(jobPageSize)
+
+ val page: Int = {
+ // If the user has changed to a larger page size, then go to page 1 in order to avoid
+ // IndexOutOfBoundsException.
+ if (jobPageSize <= jobPrevPageSize) {
+ jobPage
+ } else {
+ 1
+ }
+ }
+ val currentTime = System.currentTimeMillis()
+
+ val toNodeSeq = try {
+ new JobPagedTable(
+ jobs,
+ tableHeaderId,
+ jobTag,
+ UIUtils.prependBaseUri(basePath),
+ "jobs", // subPath
+ parameterOtherTable,
+ progressListener.stageIdToInfo,
+ progressListener.stageIdToData,
+ killEnabled,
+ currentTime,
+ jobIdTitle,
+ pageSize = jobPageSize,
+ sortColumn = jobSortColumn,
+ desc = jobSortDesc
+ ).table(page)
+ } catch {
+ case e@(_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
+
+
Error while rendering job table:
+
+ {Utils.exceptionString(e)}
+
+
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsUtils.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsUtils.scala
new file mode 100644
index 0000000000000..db742fce07f1d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsUtils.scala
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.ui.jobs
+
+import java.util.Date
+
+import scala.collection.mutable.ListBuffer
+import scala.xml.{Node, Unparsed, Utility}
+
+import org.apache.commons.lang3.StringEscapeUtils
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.JobExecutionStatus
+import org.apache.spark.scheduler.{SparkListenerEvent, SparkListenerExecutorAdded, SparkListenerExecutorRemoved}
+import org.apache.spark.ui.{ToolTips, UIUtils}
+import org.apache.spark.ui.jobs.UIData.JobUIData
+
+/** Utility functions for generating Jobs related XML pages with spark content. */
+private object JobsUtils extends Logging {
+ val JOBS_LEGEND =
+ .toString.filter(_ != '\n')
+
+ val EXECUTORS_LEGEND =
+ .toString.filter(_ != '\n')
+
+ def getLastStageNameAndDescription(
+ job: JobUIData, progressListener: JobProgressListener): (String, String) = {
+ val lastStageInfo = Option(job.stageIds)
+ .filter(_.nonEmpty)
+ .flatMap { ids => progressListener.stageIdToInfo.get(ids.max)}
+ val lastStageData = lastStageInfo.flatMap { s =>
+ progressListener.stageIdToData.get((s.stageId, s.attemptId))
+ }
+ val name = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
+ val description = lastStageData.flatMap(_.description).getOrElse("")
+ (name, description)
+ }
+
+ def makeJobEvent(
+ jobUIDatas: Seq[JobUIData], progressListener: JobProgressListener): Seq[String] = {
+ jobUIDatas.filter { jobUIData =>
+ jobUIData.status != JobExecutionStatus.UNKNOWN && jobUIData.submissionTime.isDefined
+ }.map { jobUIData =>
+ val jobId = jobUIData.jobId
+ val status = jobUIData.status
+ val (jobName, jobDescription) = getLastStageNameAndDescription(jobUIData, progressListener)
+ val displayJobDescription =
+ if (jobDescription.isEmpty) {
+ jobName
+ } else {
+ UIUtils.makeDescription(jobDescription, "", plainText = true).text
+ }
+ val submissionTime = jobUIData.submissionTime.get
+ val completionTimeOpt = jobUIData.completionTime
+ val completionTime = completionTimeOpt.getOrElse(System.currentTimeMillis())
+ val classNameByStatus = status match {
+ case JobExecutionStatus.SUCCEEDED => "succeeded"
+ case JobExecutionStatus.FAILED => "failed"
+ case JobExecutionStatus.RUNNING => "running"
+ case JobExecutionStatus.UNKNOWN => "unknown"
+ }
+
+ // The timeline library treats contents as HTML, so we have to escape them. We need to add
+ // extra layers of escaping in order to embed this in a Javascript string literal.
+ val escapedDesc = Utility.escape(displayJobDescription)
+ val jsEscapedDesc = StringEscapeUtils.escapeEcmaScript(escapedDesc)
+ val jobEventJsonAsStr =
+ s"""
+ |{
+ | 'className': 'job application-timeline-object ${classNameByStatus}',
+ | 'group': 'jobs',
+ | 'start': new Date(${submissionTime}),
+ | 'end': new Date(${completionTime}),
+ | 'content': '
++
+
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index bdd148875e38a..18ee6a8ef4824 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -706,6 +706,47 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
}
}
+ test("jobs with job group id should show correct link to job group page on all jobs page") {
+ withSpark(newSparkContext()) { sc =>
+ val ui = sc.ui.get
+ // Once at least one job has been run in a job group, then we should display the link to
+ // job group page
+ val jobGroupId = "my-job-group"
+ sc.setJobGroup(jobGroupId, "my-job-group-description")
+ // Create an RDD that involves multiple stages:
+ val rdd =
+ sc.parallelize(Seq(1, 2, 3)).map(identity).groupBy(identity).map(identity).groupBy(identity)
+ // Run it twice; this will cause the second job to have two "phantom" stages that were
+ // mentioned in its job start event but which were never actually executed:
+ rdd.count()
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ goToUi(ui, "/jobs")
+ findAll(cssSelector("tbody tr")).foreach { row =>
+ val links = row.underlying.findElements(By.xpath(".//a"))
+ links.size should be (2)
+ links.get(0).getText().toLowerCase should include (jobGroupId)
+ links.get(0).getAttribute("href") should include regex (
+ s"(?=.*jobgroup)(?=.*${jobGroupId})")
+ links.get(1).getText().toLowerCase should include ("count")
+ }
+ }
+
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ goToUi(ui, s"/jobs/jobgroup/?id=${jobGroupId}")
+ find(id("pending")) should be (None)
+ find(id("active")) should be (None)
+ find(id("failed")) should be (None)
+ find(id("completed")).get.text should be ("Completed Jobs (1)")
+ findAll(cssSelector("tbody tr")).foreach { row =>
+ val links = row.underlying.findElements(By.xpath(".//a"))
+ links.size should be (2)
+ links.get(0).getText().toLowerCase should include (jobGroupId)
+ links.get(1).getText().toLowerCase should include ("count")
+ }
+ }
+ }
+ }
+
def goToUi(sc: SparkContext, path: String): Unit = {
goToUi(sc.ui.get, path)
}