diff --git a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
index 9b6ed8cbbef1..2a7c16b04bf7 100644
--- a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
@@ -179,6 +179,7 @@ private[ui] trait PagedTable[T] {
Splitter
.on('&')
.trimResults()
+ .omitEmptyStrings()
.withKeyValueSeparator("=")
.split(querystring)
.asScala
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index 035d70601c8b..e5363ce8ca9d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -17,17 +17,21 @@
package org.apache.spark.ui.jobs
+import java.net.URLEncoder
import java.util.Date
import javax.servlet.http.HttpServletRequest
+import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, ListBuffer}
import scala.xml._
import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.JobExecutionStatus
-import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
-import org.apache.spark.ui.jobs.UIData.{ExecutorUIData, JobUIData}
+import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.ui._
+import org.apache.spark.ui.jobs.UIData.{ExecutorUIData, JobUIData, StageUIData}
+import org.apache.spark.util.Utils
/** Page showing list of all ongoing and recently finished jobs */
private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
@@ -210,64 +214,69 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
}
- private def jobsTable(jobs: Seq[JobUIData]): Seq[Node] = {
+ private def jobsTable(
+ request: HttpServletRequest,
+ jobTag: String,
+ jobs: Seq[JobUIData]): Seq[Node] = {
+ val allParameters = request.getParameterMap.asScala.toMap
+ val parameterOtherTable = allParameters.filterNot(_._1.startsWith(jobTag))
+ .map(para => para._1 + "=" + para._2(0))
+
val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined)
+ val jobIdTitle = if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id"
- val columns: Seq[Node] = {
-
{if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id"} |
- Description |
- Submitted |
- Duration |
- Stages: Succeeded/Total |
- Tasks (for all stages): Succeeded/Total |
- }
+ val parameterJobPage = request.getParameter(jobTag + ".page")
+ val parameterJobSortColumn = request.getParameter(jobTag + ".sort")
+ val parameterJobSortDesc = request.getParameter(jobTag + ".desc")
+ val parameterJobPageSize = request.getParameter(jobTag + ".pageSize")
+ val parameterJobPrevPageSize = request.getParameter(jobTag + ".prevPageSize")
- def makeRow(job: JobUIData): Seq[Node] = {
- val (lastStageName, lastStageDescription) = getLastStageNameAndDescription(job)
- val duration: Option[Long] = {
- job.submissionTime.map { start =>
- val end = job.completionTime.getOrElse(System.currentTimeMillis())
- end - start
- }
+ val jobPage = Option(parameterJobPage).map(_.toInt).getOrElse(1)
+ val jobSortColumn = Option(parameterJobSortColumn).map { sortColumn =>
+ UIUtils.decodeURLParameter(sortColumn)
+ }.getOrElse(jobIdTitle)
+ val jobSortDesc = Option(parameterJobSortDesc).map(_.toBoolean).getOrElse(
+ // New jobs should be shown above old jobs by default.
+ if (jobSortColumn == jobIdTitle) true else false
+ )
+ val jobPageSize = Option(parameterJobPageSize).map(_.toInt).getOrElse(100)
+ val jobPrevPageSize = Option(parameterJobPrevPageSize).map(_.toInt).getOrElse(jobPageSize)
+
+ val page: Int = {
+ // If the user has changed to a larger page size, then go to page 1 in order to avoid
+ // IndexOutOfBoundsException.
+ if (jobPageSize <= jobPrevPageSize) {
+ jobPage
+ } else {
+ 1
}
- val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
- val formattedSubmissionTime = job.submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
- val basePathUri = UIUtils.prependBaseUri(parent.basePath)
- val jobDescription =
- UIUtils.makeDescription(lastStageDescription, basePathUri, plainText = false)
-
- val detailUrl = "%s/jobs/job?id=%s".format(basePathUri, job.jobId)
-
- |
- {job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
- |
-
- {jobDescription}
- {lastStageName}
- |
-
- {formattedSubmissionTime}
- |
- {formattedDuration} |
-
- {job.completedStageIndices.size}/{job.stageIds.size - job.numSkippedStages}
- {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
- {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
- |
-
- {UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,
- failed = job.numFailedTasks, skipped = job.numSkippedTasks, killed = job.numKilledTasks,
- total = job.numTasks - job.numSkippedTasks)}
- |
-
}
+ val currentTime = System.currentTimeMillis()
-
- {columns}
-
- {jobs.map(makeRow)}
-
-
+ try {
+ new JobPagedTable(
+ jobs,
+ jobTag,
+ UIUtils.prependBaseUri(parent.basePath),
+ "jobs", // subPath
+ parameterOtherTable,
+ parent.jobProgresslistener.stageIdToInfo,
+ parent.jobProgresslistener.stageIdToData,
+ currentTime,
+ jobIdTitle,
+ pageSize = jobPageSize,
+ sortColumn = jobSortColumn,
+ desc = jobSortDesc
+ ).table(page)
+ } catch {
+ case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) =>
+
+
Error while rendering job table:
+
+ {Utils.exceptionString(e)}
+
+
+ }
}
def render(request: HttpServletRequest): Seq[Node] = {
@@ -279,12 +288,9 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
val completedJobs = listener.completedJobs.reverse.toSeq
val failedJobs = listener.failedJobs.reverse.toSeq
- val activeJobsTable =
- jobsTable(activeJobs.sortBy(_.submissionTime.getOrElse(-1L)).reverse)
- val completedJobsTable =
- jobsTable(completedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse)
- val failedJobsTable =
- jobsTable(failedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse)
+ val activeJobsTable = jobsTable(request, "activeJob", activeJobs)
+ val completedJobsTable = jobsTable(request, "completedJob", completedJobs)
+ val failedJobsTable = jobsTable(request, "failedJob", failedJobs)
val shouldShowActiveJobs = activeJobs.nonEmpty
val shouldShowCompletedJobs = completedJobs.nonEmpty
@@ -369,3 +375,246 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
}
}
}
+
+private[ui] class JobTableRowData(
+ val jobData: JobUIData,
+ val lastStageName: String,
+ val lastStageDescription: String,
+ val duration: Long,
+ val formattedDuration: String,
+ val submissionTime: Long,
+ val formattedSubmissionTime: String,
+ val jobDescription: NodeSeq,
+ val detailUrl: String)
+
+private[ui] class JobDataSource(
+ jobs: Seq[JobUIData],
+ stageIdToInfo: HashMap[Int, StageInfo],
+ stageIdToData: HashMap[(Int, Int), StageUIData],
+ basePath: String,
+ currentTime: Long,
+ pageSize: Int,
+ sortColumn: String,
+ desc: Boolean) extends PagedDataSource[JobTableRowData](pageSize) {
+
+ // Convert JobUIData to JobTableRowData which contains the final contents to show in the table
+ // so that we can avoid creating duplicate contents during sorting the data
+ private val data = jobs.map(jobRow).sorted(ordering(sortColumn, desc))
+
+ private var _slicedJobIds: Set[Int] = null
+
+ override def dataSize: Int = data.size
+
+ override def sliceData(from: Int, to: Int): Seq[JobTableRowData] = {
+ val r = data.slice(from, to)
+ _slicedJobIds = r.map(_.jobData.jobId).toSet
+ r
+ }
+
+ private def getLastStageNameAndDescription(job: JobUIData): (String, String) = {
+ val lastStageInfo = Option(job.stageIds)
+ .filter(_.nonEmpty)
+ .flatMap { ids => stageIdToInfo.get(ids.max)}
+ val lastStageData = lastStageInfo.flatMap { s =>
+ stageIdToData.get((s.stageId, s.attemptId))
+ }
+ val name = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
+ val description = lastStageData.flatMap(_.description).getOrElse("")
+ (name, description)
+ }
+
+ private def jobRow(jobData: JobUIData): JobTableRowData = {
+ val (lastStageName, lastStageDescription) = getLastStageNameAndDescription(jobData)
+ val duration: Option[Long] = {
+ jobData.submissionTime.map { start =>
+ val end = jobData.completionTime.getOrElse(System.currentTimeMillis())
+ end - start
+ }
+ }
+ val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
+ val submissionTime = jobData.submissionTime
+ val formattedSubmissionTime = submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
+ val jobDescription = UIUtils.makeDescription(lastStageDescription, basePath, plainText = false)
+
+ val detailUrl = "%s/jobs/job?id=%s".format(basePath, jobData.jobId)
+
+ new JobTableRowData (
+ jobData,
+ lastStageName,
+ lastStageDescription,
+ duration.getOrElse(-1),
+ formattedDuration,
+ submissionTime.getOrElse(-1),
+ formattedSubmissionTime,
+ jobDescription,
+ detailUrl
+ )
+ }
+
+ /**
+ * Return Ordering according to sortColumn and desc
+ */
+ private def ordering(sortColumn: String, desc: Boolean): Ordering[JobTableRowData] = {
+ val ordering = sortColumn match {
+ case "Job Id" | "Job Id (Job Group)" => new Ordering[JobTableRowData] {
+ override def compare(x: JobTableRowData, y: JobTableRowData): Int =
+ Ordering.Int.compare(x.jobData.jobId, y.jobData.jobId)
+ }
+ case "Description" => new Ordering[JobTableRowData] {
+ override def compare(x: JobTableRowData, y: JobTableRowData): Int =
+ Ordering.String.compare(x.lastStageDescription, y.lastStageDescription)
+ }
+ case "Submitted" => new Ordering[JobTableRowData] {
+ override def compare(x: JobTableRowData, y: JobTableRowData): Int =
+ Ordering.Long.compare(x.submissionTime, y.submissionTime)
+ }
+ case "Duration" => new Ordering[JobTableRowData] {
+ override def compare(x: JobTableRowData, y: JobTableRowData): Int =
+ Ordering.Long.compare(x.duration, y.duration)
+ }
+ case "Stages: Succeeded/Total" | "Tasks (for all stages): Succeeded/Total" =>
+ throw new IllegalArgumentException(s"Unsortable column: $sortColumn")
+ case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
+ }
+ if (desc) {
+ ordering.reverse
+ } else {
+ ordering
+ }
+ }
+
+}
+private[ui] class JobPagedTable(
+ data: Seq[JobUIData],
+ jobTag: String,
+ basePath: String,
+ subPath: String,
+ parameterOtherTable: Iterable[String],
+ stageIdToInfo: HashMap[Int, StageInfo],
+ stageIdToData: HashMap[(Int, Int), StageUIData],
+ currentTime: Long,
+ jobIdTitle: String,
+ pageSize: Int,
+ sortColumn: String,
+ desc: Boolean
+ ) extends PagedTable[JobTableRowData] {
+ val parameterPath = UIUtils.prependBaseUri(basePath) + s"/$subPath/?" +
+ parameterOtherTable.mkString("&")
+
+ override def tableId: String = jobTag + "-table"
+
+ override def tableCssClass: String =
+ "table table-bordered table-condensed table-striped table-head-clickable"
+
+ override def pageSizeFormField: String = jobTag + ".pageSize"
+
+ override def prevPageSizeFormField: String = jobTag + ".prevPageSize"
+
+ override def pageNumberFormField: String = jobTag + ".page"
+
+ override val dataSource = new JobDataSource(
+ data,
+ stageIdToInfo,
+ stageIdToData,
+ basePath,
+ currentTime,
+ pageSize,
+ sortColumn,
+ desc)
+
+ override def pageLink(page: Int): String = {
+ val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
+ parameterPath +
+ s"&$pageNumberFormField=$page" +
+ s"&$jobTag.sort=$encodedSortColumn" +
+ s"&$jobTag.desc=$desc" +
+ s"&$pageSizeFormField=$pageSize"
+ }
+
+ override def goButtonFormPath: String = {
+ val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
+ s"$parameterPath&$jobTag.sort=$encodedSortColumn&$jobTag.desc=$desc"
+ }
+
+ override def headers: Seq[Node] = {
+ // Information for each header: title, cssClass, and sortable
+ val jobHeadersAndCssClasses: Seq[(String, String, Boolean)] =
+ Seq(
+ (jobIdTitle, "", true),
+ ("Description", "", true), ("Submitted", "", true), ("Duration", "", true),
+ ("Stages: Succeeded/Total", "", false),
+ ("Tasks (for all stages): Succeeded/Total", "", false)
+ )
+
+ if (!jobHeadersAndCssClasses.filter(_._3).map(_._1).contains(sortColumn)) {
+ throw new IllegalArgumentException(s"Unknown column: $sortColumn")
+ }
+
+ val headerRow: Seq[Node] = {
+ jobHeadersAndCssClasses.map { case (header, cssClass, sortable) =>
+ if (header == sortColumn) {
+ val headerLink = Unparsed(
+ parameterPath +
+ s"&$jobTag.sort=${URLEncoder.encode(header, "UTF-8")}" +
+ s"&$jobTag.desc=${!desc}" +
+ s"&$jobTag.pageSize=$pageSize")
+ val arrow = if (desc) "▾" else "▴" // UP or DOWN
+
+
+
+ {header}
+ {Unparsed(arrow)}
+
+
+ |
+ } else {
+ if (sortable) {
+ val headerLink = Unparsed(
+ parameterPath +
+ s"&$jobTag.sort=${URLEncoder.encode(header, "UTF-8")}" +
+ s"&$jobTag.pageSize=$pageSize")
+
+
+
+ {header}
+
+ |
+ } else {
+
+ {header}
+ |
+ }
+ }
+ }
+ }
+ {headerRow}
+ }
+
+ override def row(jobTableRow: JobTableRowData): Seq[Node] = {
+ val job = jobTableRow.jobData
+
+
+ |
+ {job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
+ |
+
+ {jobTableRow.jobDescription}
+ {jobTableRow.lastStageName}
+ |
+
+ {jobTableRow.formattedSubmissionTime}
+ |
+ {jobTableRow.formattedDuration} |
+
+ {job.completedStageIndices.size}/{job.stageIds.size - job.numSkippedStages}
+ {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
+ {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
+ |
+
+ {UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,
+ failed = job.numFailedTasks, skipped = job.numSkippedTasks, killed = job.numKilledTasks,
+ total = job.numTasks - job.numSkippedTasks)}
+ |
+
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index b0a35fe8c331..fd12a21b7927 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -218,7 +218,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
eventually(timeout(5 seconds), interval(50 milliseconds)) {
goToUi(sc, "/jobs")
val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq
- tableHeaders should not contain "Job Id (Job Group)"
+ tableHeaders(0) should not startWith "Job Id (Job Group)"
}
// Once at least one job has been run in a job group, then we should display the group name:
sc.setJobGroup("my-job-group", "my-job-group-description")
@@ -226,7 +226,8 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
eventually(timeout(5 seconds), interval(50 milliseconds)) {
goToUi(sc, "/jobs")
val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq
- tableHeaders should contain ("Job Id (Job Group)")
+ // Can suffix up/down arrow in the header
+ tableHeaders(0) should startWith ("Job Id (Job Group)")
}
val jobJson = getJson(sc.ui.get, "jobs")