.toString.filter(_ != '\n')
-
- private val EXECUTORS_LEGEND =
- .toString.filter(_ != '\n')
-
- private def makeJobEvent(jobs: Seq[v1.JobData]): Seq[String] = {
- jobs.filter { job =>
- job.status != JobExecutionStatus.UNKNOWN && job.submissionTime.isDefined
- }.map { job =>
- val jobId = job.jobId
- val status = job.status
- val (_, lastStageDescription) = lastStageNameAndDescription(store, job)
- val jobDescription = UIUtils.makeDescription(lastStageDescription, "", plainText = true).text
-
- val submissionTime = job.submissionTime.get.getTime()
- val completionTime = job.completionTime.map(_.getTime()).getOrElse(System.currentTimeMillis())
- val classNameByStatus = status match {
- case JobExecutionStatus.SUCCEEDED => "succeeded"
- case JobExecutionStatus.FAILED => "failed"
- case JobExecutionStatus.RUNNING => "running"
- case JobExecutionStatus.UNKNOWN => "unknown"
- }
-
- // The timeline library treats contents as HTML, so we have to escape them. We need to add
- // extra layers of escaping in order to embed this in a Javascript string literal.
- val escapedDesc = Utility.escape(jobDescription)
- val jsEscapedDesc = StringEscapeUtils.escapeEcmaScript(escapedDesc)
- val jobEventJsonAsStr =
- s"""
- |{
- | 'className': 'job application-timeline-object ${classNameByStatus}',
- | 'group': 'jobs',
- | 'start': new Date(${submissionTime}),
- | 'end': new Date(${completionTime}),
- | 'content': '
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobGroupPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobGroupPage.scala
new file mode 100644
index 000000000000..20165990685a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobGroupPage.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.collection.mutable.ListBuffer
+import scala.xml.{Node, NodeSeq}
+
+import org.apache.spark.JobExecutionStatus
+import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.api.v1
+import org.apache.spark.ui._
+
+/** Page showing list of all ongoing and recently finished jobs belonging to a job group id */
+private[ui] class JobGroupPage(parent: JobsTab, store: AppStatusStore)
+  extends WebUIPage("jobgroup") {
+
+  /**
+   * Renders /jobs/jobgroup/?id=<jobGroupId>. Jobs belonging to the group are bucketed by
+   * status (SUCCEEDED -> completed, FAILED -> failed, anything else -> active) and each
+   * bucket is rendered as a table via JobsUtils.jobsTable.
+   *
+   * NOTE(review): the `summary` XML literal and the rest of the page assembly are absent
+   * from this view of the patch — the hunk appears truncated; verify against the full patch.
+   */
+  def render(request: HttpServletRequest): Seq[Node] = {
+    // stripXSS sanitizes the user-supplied query parameter before it is used anywhere.
+    val parameterId = UIUtils.stripXSS(request.getParameter("id"))
+    require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
+
+    val jobGroupId = parameterId
+
+    val activeJobs = new ListBuffer[v1.JobData]()
+    val completedJobs = new ListBuffer[v1.JobData]()
+    val failedJobs = new ListBuffer[v1.JobData]()
+
+    // Wall-clock time summed over all jobs in the group. Jobs that were never submitted
+    // contribute 0; still-running jobs are measured up to "now".
+    var totalJobExecutionTime = 0L
+    // NOTE(review): the `null` second argument presumably means "all statuses" — confirm
+    // against AppStatusStore.jobsInJobGroupList (not visible in this patch).
+    store.jobsInJobGroupList(jobGroupId, null).foreach { job =>
+      val duration: Option[Long] = {
+        job.submissionTime.map { start =>
+          val end = job.completionTime.map(_.getTime).getOrElse(System.currentTimeMillis())
+          end - start.getTime()
+        }
+      }
+
+      totalJobExecutionTime += duration.getOrElse(0L)
+
+      job.status match {
+        case JobExecutionStatus.SUCCEEDED =>
+          completedJobs += job
+        case JobExecutionStatus.FAILED =>
+          failedJobs += job
+        case _ =>
+          activeJobs += job
+      }
+    }
+
+    // One table per status bucket; only the active table exposes the kill link, and only
+    // when killing is enabled on the parent tab.
+    val activeJobsTable =
+      JobsUtils.jobsTable(store, parent.basePath, request, "active",
+        "activeJob", activeJobs, killEnabled = parent.killEnabled)
+    val completedJobsTable =
+      JobsUtils.jobsTable(store, parent.basePath, request, "completed",
+        "completedJob", completedJobs, killEnabled = false)
+    val failedJobsTable =
+      JobsUtils.jobsTable(store, parent.basePath, request, "failed",
+        "failedJob", failedJobs, killEnabled = false)
+
+    // Section headers/tables are only emitted for non-empty buckets (used by the summary
+    // markup that is truncated below).
+    val shouldShowActiveJobs = activeJobs.nonEmpty
+    val shouldShowCompletedJobs = completedJobs.nonEmpty
+    val shouldShowFailedJobs = failedJobs.nonEmpty
+
+    val summary: NodeSeq =
+
+  }
+}
+
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
index ff1b75e5c506..b47ec912e726 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
@@ -44,6 +44,7 @@ private[ui] class JobsTab(parent: SparkUI, store: AppStatusStore)
attachPage(new AllJobsPage(this, store))
attachPage(new JobPage(this, store))
+ attachPage(new JobGroupPage(this, store))
def handleKillRequest(request: HttpServletRequest): Unit = {
if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) {
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsUtils.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsUtils.scala
new file mode 100644
index 000000000000..705089750d63
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsUtils.scala
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.ui.jobs
+
+import java.util.Date
+import javax.servlet.http.HttpServletRequest
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+import scala.xml.{Node, Unparsed, Utility}
+
+import org.apache.commons.lang3.StringEscapeUtils
+
+import org.apache.spark.JobExecutionStatus
+import org.apache.spark.internal.Logging
+import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.api.v1
+import org.apache.spark.ui.{ToolTips, UIUtils}
+import org.apache.spark.util.Utils
+
+/**
+ * Shared helpers for the jobs pages (AllJobsPage / JobGroupPage): timeline legends,
+ * timeline event JSON generation, and the paginated jobs table.
+ *
+ * NOTE(review): several Scala XML literals (both legends, the timeline 'content' HTML in
+ * makeJobEvent, and the error <div> in jobsTable) are missing from this view of the
+ * patch — the hunks appear truncated; verify against the complete patch. Also note the
+ * missing space in `Logging{` on the next line (cannot be fixed here without the true
+ * hunk bytes).
+ */
+private object JobsUtils extends Logging{
+  import ApiHelper._
+
+  // Timeline legend markup (XML literal truncated in this view), flattened to one line
+  // so it can be embedded in a JavaScript string.
+  val JOBS_LEGEND =
+    .toString.filter(_ != '\n')
+
+  val EXECUTORS_LEGEND =
+    .toString.filter(_ != '\n')
+
+  // Builds one JSON-ish event string per job for the vis.js application timeline.
+  // Jobs with UNKNOWN status or no submission time are skipped up front.
+  private def makeJobEvent(store: AppStatusStore, jobs: Seq[v1.JobData]): Seq[String] = {
+    jobs.filter { job =>
+      job.status != JobExecutionStatus.UNKNOWN && job.submissionTime.isDefined
+    }.map { job =>
+      val jobId = job.jobId
+      val status = job.status
+      val (_, lastStageDescription) = lastStageNameAndDescription(store, job)
+      val jobDescription = UIUtils.makeDescription(lastStageDescription, "", plainText = true).text
+
+      // .get is safe: jobs without a submission time were filtered out above.
+      // Still-running jobs are drawn up to "now".
+      val submissionTime = job.submissionTime.get.getTime()
+      val completionTime = job.completionTime.map(_.getTime()).getOrElse(System.currentTimeMillis())
+      val classNameByStatus = status match {
+        case JobExecutionStatus.SUCCEEDED => "succeeded"
+        case JobExecutionStatus.FAILED => "failed"
+        case JobExecutionStatus.RUNNING => "running"
+        case JobExecutionStatus.UNKNOWN => "unknown"
+      }
+
+      // The timeline library treats contents as HTML, so we have to escape them. We need to add
+      // extra layers of escaping in order to embed this in a Javascript string literal.
+      val escapedDesc = Utility.escape(jobDescription)
+      val jsEscapedDesc = StringEscapeUtils.escapeEcmaScript(escapedDesc)
+      // NOTE(review): the remainder of this string literal (the 'content' HTML and the
+      // closing of the interpolator/map) is truncated in this view of the patch.
+      val jobEventJsonAsStr =
+        s"""
+          |{
+          |  'className': 'job application-timeline-object ${classNameByStatus}',
+          |  'group': 'jobs',
+          |  'start': new Date(${submissionTime}),
+          |  'end': new Date(${completionTime}),
+          |  'content': '
++
+
+  }
+
+  /**
+   * Renders a paginated, sortable table of jobs.
+   *
+   * @param tableHeaderId anchor id for the table header ("active"/"completed"/"failed")
+   * @param jobTag        prefix namespacing this table's pagination/sort request params
+   *                      so multiple tables on one page don't clash
+   * @param killEnabled   whether rows get a "kill" link (active jobs only)
+   */
+  def jobsTable(
+      store: AppStatusStore,
+      basePath: String,
+      request: HttpServletRequest,
+      tableHeaderId: String,
+      jobTag: String,
+      jobs: Seq[v1.JobData],
+      killEnabled: Boolean): Seq[Node] = {
+    // stripXSS is called to remove suspicious characters used in XSS attacks
+    val allParameters = request.getParameterMap.asScala.toMap.map { case (k, v) =>
+      UIUtils.stripXSS(k) -> v.map(UIUtils.stripXSS).toSeq
+    }
+    // Parameters belonging to the other tables on the page are carried through in
+    // pagination links so those tables keep their state.
+    val parameterOtherTable = allParameters.filterNot(_._1.startsWith(jobTag))
+      .map(para => para._1 + "=" + para._2(0))
+
+    val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined)
+    val jobIdTitle = if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id"
+
+    // stripXSS is called first to remove suspicious characters used in XSS attacks
+    val parameterJobPage = UIUtils.stripXSS(request.getParameter(jobTag + ".page"))
+    val parameterJobSortColumn = UIUtils.stripXSS(request.getParameter(jobTag + ".sort"))
+    val parameterJobSortDesc = UIUtils.stripXSS(request.getParameter(jobTag + ".desc"))
+    val parameterJobPageSize = UIUtils.stripXSS(request.getParameter(jobTag + ".pageSize"))
+    val parameterJobPrevPageSize = UIUtils.stripXSS(request.getParameter(jobTag + ".prevPageSize"))
+
+    val jobPage = Option(parameterJobPage).map(_.toInt).getOrElse(1)
+    val jobSortColumn = Option(parameterJobSortColumn).map { sortColumn =>
+      UIUtils.decodeURLParameter(sortColumn)
+    }.getOrElse(jobIdTitle)
+    val jobSortDesc = Option(parameterJobSortDesc).map(_.toBoolean).getOrElse(
+      // New jobs should be shown above old jobs by default.
+      jobSortColumn == jobIdTitle
+    )
+    val jobPageSize = Option(parameterJobPageSize).map(_.toInt).getOrElse(100)
+    val jobPrevPageSize = Option(parameterJobPrevPageSize).map(_.toInt).getOrElse(jobPageSize)
+
+    val page: Int = {
+      // If the user has changed to a larger page size, then go to page 1 in order to avoid
+      // IndexOutOfBoundsException.
+      if (jobPageSize <= jobPrevPageSize) {
+        jobPage
+      } else {
+        1
+      }
+    }
+    val currentTime = System.currentTimeMillis()
+
+    try {
+      new JobPagedTable(
+        store,
+        jobs,
+        tableHeaderId,
+        jobTag,
+        UIUtils.prependBaseUri(basePath),
+        "jobs", // subPath
+        parameterOtherTable,
+        killEnabled,
+        currentTime,
+        jobIdTitle,
+        pageSize = jobPageSize,
+        sortColumn = jobSortColumn,
+        desc = jobSortDesc
+      ).table(page)
+    } catch {
+      // Malformed page/sort parameters render an inline error panel instead of a 500.
+      // NOTE(review): the error <div> XML literal is truncated in this view of the patch,
+      // and the "Error while rendering job table:" line below lacks the diff '+' prefix —
+      // verify against the complete patch.
+      case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) =>
+
+
Error while rendering job table:
+
+ {Utils.exceptionString(e)}
+
+
+    }
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index 0f20eea73504..02bf199e1ff3 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -723,6 +723,48 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
}
}
+  test("jobs with job group id should show correct link to job group page on all jobs page") {
+    withSpark(newSparkContext()) { sc =>
+      val ui = sc.ui.get
+      // Once at least one job has been run in a job group, then we should display the link to
+      // job group page
+      val jobGroupId = "my-job-group"
+      sc.setJobGroup(jobGroupId, "my-job-group-description")
+      // Create an RDD that involves multiple stages
+      // :
+      val rdd =
+        sc.parallelize(Seq(1, 2, 3)).map(identity).groupBy(identity).map(identity).groupBy(identity)
+      // NOTE(review): the comment below says "Run it twice", but count() is invoked only
+      // once here — consistent with the "Completed Jobs (1)" assertion further down. The
+      // "phantom stages" remark appears copied from another test; confirm intent.
+      // Run it twice; this will cause the second job to have two "phantom" stages that were
+      // mentioned in its job start event but which were never actually executed:
+      rdd.count()
+      // On the all-jobs page each row should link to both the job group page and the job.
+      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+        goToUi(ui, "/jobs")
+        findAll(cssSelector("tbody tr")).foreach { row =>
+          val links = row.underlying.findElements(By.xpath(".//a"))
+          links.size should be (2)
+          links.get(0).getText().toLowerCase should include (jobGroupId)
+          // Lookahead regex: href must mention both the jobgroup route and the group id.
+          links.get(0).getAttribute("href") should include regex (
+            s"(?=.*jobgroup)(?=.*${jobGroupId})")
+          links.get(1).getText().toLowerCase should include ("count")
+        }
+      }
+
+      // The job group page itself should show exactly one completed job and no other buckets.
+      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+        goToUi(ui, s"/jobs/jobgroup/?id=${jobGroupId}")
+        find(id("pending")) should be (None)
+        find(id("active")) should be (None)
+        find(id("failed")) should be (None)
+        find(id("completed")).get.text should be ("Completed Jobs (1)")
+        findAll(cssSelector("tbody tr")).foreach { row =>
+          val links = row.underlying.findElements(By.xpath(".//a"))
+          links.size should be (2)
+          links.get(0).getText().toLowerCase should include (jobGroupId)
+          links.get(1).getText().toLowerCase should include ("count")
+        }
+      }
+    }
+  }
+
def goToUi(sc: SparkContext, path: String): Unit = {
goToUi(sc.ui.get, path)
}