diff --git a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
index 9b6ed8cbbef1..2a7c16b04bf7 100644
--- a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
@@ -179,6 +179,7 @@ private[ui] trait PagedTable[T] {
Splitter
.on('&')
.trimResults()
+ .omitEmptyStrings()
.withKeyValueSeparator("=")
.split(querystring)
.asScala
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
index e75f1c57a69d..cba8f82dd77a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
@@ -38,22 +38,24 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
val numCompletedStages = listener.numCompletedStages
val failedStages = listener.failedStages.reverse.toSeq
val numFailedStages = listener.numFailedStages
- val now = System.currentTimeMillis
+ val subPath = "stages"
val activeStagesTable =
- new StageTableBase(activeStages.sortBy(_.submissionTime).reverse,
- parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler,
- killEnabled = parent.killEnabled)
+ new StageTableBase(request, activeStages, "activeStage", parent.basePath, subPath,
+ parent.progressListener, parent.isFairScheduler,
+ killEnabled = parent.killEnabled, isFailedStage = false)
val pendingStagesTable =
- new StageTableBase(pendingStages.sortBy(_.submissionTime).reverse,
- parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler,
- killEnabled = false)
+ new StageTableBase(request, pendingStages, "pendingStage", parent.basePath, subPath,
+ parent.progressListener, parent.isFairScheduler,
+ killEnabled = false, isFailedStage = false)
val completedStagesTable =
- new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath,
- parent.progressListener, isFairScheduler = parent.isFairScheduler, killEnabled = false)
+ new StageTableBase(request, completedStages, "completedStage", parent.basePath, subPath,
+ parent.progressListener, parent.isFairScheduler,
+ killEnabled = false, isFailedStage = false)
val failedStagesTable =
- new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent.basePath,
- parent.progressListener, isFairScheduler = parent.isFairScheduler)
+ new StageTableBase(request, failedStages, "failedStage", parent.basePath, subPath,
+ parent.progressListener, parent.isFairScheduler,
+ killEnabled = false, isFailedStage = true)
// For now, pool information is only accessible in live UIs
val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable])
@@ -136,3 +138,4 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
}
}
}
+
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
index 99f2bd8bc1f2..0ec42d68d3dc 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -229,20 +229,24 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
}
}
+ val basePath = "jobs/job"
+
val activeStagesTable =
- new StageTableBase(activeStages.sortBy(_.submissionTime).reverse,
- parent.basePath, parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler,
- killEnabled = parent.killEnabled)
+ new StageTableBase(request, activeStages, "activeStage", parent.basePath,
+ basePath, parent.jobProgresslistener, parent.isFairScheduler,
+ killEnabled = parent.killEnabled, isFailedStage = false)
val pendingOrSkippedStagesTable =
- new StageTableBase(pendingOrSkippedStages.sortBy(_.stageId).reverse,
- parent.basePath, parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler,
- killEnabled = false)
+ new StageTableBase(request, pendingOrSkippedStages, "pendingStage", parent.basePath,
+ basePath, parent.jobProgresslistener, parent.isFairScheduler,
+ killEnabled = false, isFailedStage = false)
val completedStagesTable =
- new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath,
- parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler, killEnabled = false)
+ new StageTableBase(request, completedStages, "completedStage", parent.basePath,
+ basePath, parent.jobProgresslistener, parent.isFairScheduler,
+ killEnabled = false, isFailedStage = false)
val failedStagesTable =
- new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent.basePath,
- parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler)
+ new StageTableBase(request, failedStages, "failedStage", parent.basePath,
+ basePath, parent.jobProgresslistener, parent.isFairScheduler,
+ killEnabled = false, isFailedStage = true)
val shouldShowActiveStages = activeStages.nonEmpty
val shouldShowPendingStages = !isComplete && pendingOrSkippedStages.nonEmpty
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
index 6cd25919ca5f..f9cb71791859 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -42,9 +42,11 @@ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") {
case Some(s) => s.values.toSeq
case None => Seq[StageInfo]()
}
- val activeStagesTable = new StageTableBase(activeStages.sortBy(_.submissionTime).reverse,
- parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler,
- killEnabled = parent.killEnabled)
+ val shouldShowActiveStages = activeStages.nonEmpty
+ val activeStagesTable =
+ new StageTableBase(request, activeStages, "activeStage", parent.basePath, "stages/pool",
+ parent.progressListener, parent.isFairScheduler, parent.killEnabled,
+ isFailedStage = false)
// For now, pool information is only accessible in live UIs
val pools = sc.map(_.getPoolForName(poolName).getOrElse {
@@ -52,9 +54,10 @@ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") {
}).toSeq
val poolTable = new PoolTable(pools, parent)
- val content =
-
Summary
++ poolTable.toNodeSeq ++
- {activeStages.size} Active Stages
++ activeStagesTable.toNodeSeq
+ var content = Summary
++ poolTable.toNodeSeq
+ if (shouldShowActiveStages) {
+ content ++= {activeStages.size} Active Stages
++ activeStagesTable.toNodeSeq
+ }
UIUtils.headerSparkPage("Fair Scheduler Pool: " + poolName, content, parent)
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 0e020155a656..2a04e8fc7d00 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -17,61 +17,326 @@
package org.apache.spark.ui.jobs
+import java.net.URLEncoder
import java.util.Date
+import javax.servlet.http.HttpServletRequest
-import scala.xml.{Node, Text}
+import scala.collection.JavaConverters._
+import scala.xml._
import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.scheduler.StageInfo
-import org.apache.spark.ui.{ToolTips, UIUtils}
+import org.apache.spark.ui._
+import org.apache.spark.ui.jobs.UIData.StageUIData
import org.apache.spark.util.Utils
-/** Page showing list of all ongoing and recently finished stages */
private[ui] class StageTableBase(
+ request: HttpServletRequest,
+ stages: Seq[StageInfo],
+ stageTag: String,
+ basePath: String,
+ subPath: String,
+ progressListener: JobProgressListener,
+ isFairScheduler: Boolean,
+ killEnabled: Boolean,
+ isFailedStage: Boolean) {
+ val allParameters = request.getParameterMap().asScala.toMap
+ val parameterOtherTable = allParameters.filterNot(_._1.startsWith(stageTag))
+ .map(para => para._1 + "=" + para._2(0))
+
+ val parameterStagePage = request.getParameter(stageTag + ".page")
+ val parameterStageSortColumn = request.getParameter(stageTag + ".sort")
+ val parameterStageSortDesc = request.getParameter(stageTag + ".desc")
+ val parameterStagePageSize = request.getParameter(stageTag + ".pageSize")
+ val parameterStagePrevPageSize = request.getParameter(stageTag + ".prevPageSize")
+
+ val stagePage = Option(parameterStagePage).map(_.toInt).getOrElse(1)
+ val stageSortColumn = Option(parameterStageSortColumn).map { sortColumn =>
+ UIUtils.decodeURLParameter(sortColumn)
+ }.getOrElse("Stage Id")
+ val stageSortDesc = Option(parameterStageSortDesc).map(_.toBoolean).getOrElse(
+ // New stages should be shown above old jobs by default.
+ if (stageSortColumn == "Stage Id") true else false
+ )
+ val stagePageSize = Option(parameterStagePageSize).map(_.toInt).getOrElse(100)
+ val stagePrevPageSize = Option(parameterStagePrevPageSize).map(_.toInt)
+ .getOrElse(stagePageSize)
+
+ val page: Int = {
+ // If the user has changed to a larger page size, then go to page 1 in order to avoid
+ // IndexOutOfBoundsException.
+ if (stagePageSize <= stagePrevPageSize) {
+ stagePage
+ } else {
+ 1
+ }
+ }
+ val currentTime = System.currentTimeMillis()
+
+ val toNodeSeq = try {
+ new StagePagedTable(
+ stages,
+ stageTag,
+ basePath,
+ subPath,
+ progressListener,
+ isFairScheduler,
+ killEnabled,
+ currentTime,
+ stagePageSize,
+ stageSortColumn,
+ stageSortDesc,
+ isFailedStage,
+ parameterOtherTable
+ ).table(page)
+ } catch {
+ case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) =>
+
+
Error while rendering stage table:
+
+ {Utils.exceptionString(e)}
+
+
+ }
+}
+
+private[ui] class StageTableRowData(
+ val stageInfo: StageInfo,
+ val stageData: Option[StageUIData],
+ val stageId: Int,
+ val attemptId: Int,
+ val schedulingPool: String,
+ val description: String,
+ val descriptionOption: Option[String],
+ val submissionTime: Long,
+ val formattedSubmissionTime: String,
+ val duration: Long,
+ val formattedDuration: String,
+ val inputRead: Long,
+ val inputReadWithUnit: String,
+ val outputWrite: Long,
+ val outputWriteWithUnit: String,
+ val shuffleRead: Long,
+ val shuffleReadWithUnit: String,
+ val shuffleWrite: Long,
+ val shuffleWriteWithUnit: String)
+
+private[ui] class MissingStageTableRowData(
+ stageInfo: StageInfo,
+ stageId: Int,
+ attemptId: Int) extends StageTableRowData(
+ stageInfo, None, stageId, attemptId, "", "", None, 0, "", -1, "", 0, "", 0, "", 0, "", 0, "")
+
+/** Page showing list of all ongoing and recently finished stages */
+private[ui] class StagePagedTable(
stages: Seq[StageInfo],
+ stageTag: String,
basePath: String,
+ subPath: String,
listener: JobProgressListener,
isFairScheduler: Boolean,
- killEnabled: Boolean) {
-
- protected def columns: Seq[Node] = {
- Stage Id | ++
- {if (isFairScheduler) {Pool Name | } else Seq.empty} ++
- Description |
- Submitted |
- Duration |
- Tasks: Succeeded/Total |
- Input |
- Output |
- Shuffle Read |
-
-
-
- Shuffle Write
-
- |
+ killEnabled: Boolean,
+ currentTime: Long,
+ pageSize: Int,
+ sortColumn: String,
+ desc: Boolean,
+ isFailedStage: Boolean,
+ parameterOtherTable: Iterable[String]) extends PagedTable[StageTableRowData] {
+
+ override def tableId: String = stageTag + "-table"
+
+ override def tableCssClass: String =
+ "table table-bordered table-condensed table-striped table-head-clickable"
+
+ override def pageSizeFormField: String = stageTag + ".pageSize"
+
+ override def prevPageSizeFormField: String = stageTag + ".prevPageSize"
+
+ override def pageNumberFormField: String = stageTag + ".page"
+
+ val parameterPath = UIUtils.prependBaseUri(basePath) + s"/$subPath/?" +
+ parameterOtherTable.mkString("&")
+
+ override val dataSource = new StageDataSource(
+ stages,
+ listener,
+ currentTime,
+ pageSize,
+ sortColumn,
+ desc
+ )
+
+ override def pageLink(page: Int): String = {
+ val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
+ parameterPath +
+ s"&$pageNumberFormField=$page" +
+ s"&$stageTag.sort=$encodedSortColumn" +
+ s"&$stageTag.desc=$desc" +
+ s"&$pageSizeFormField=$pageSize"
+ }
+
+ override def goButtonFormPath: String = {
+ val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
+ s"$parameterPath&$stageTag.sort=$encodedSortColumn&$stageTag.desc=$desc"
}
- def toNodeSeq: Seq[Node] = {
- listener.synchronized {
- stageTable(renderStageRow, stages)
+ override def headers: Seq[Node] = {
+ // stageHeadersAndCssClasses has three parts: header title, tooltip information, and sortable.
+ // The tooltip information could be None, which indicates it does not have a tooltip.
+ // Otherwise, it has two parts: tooltip text, and position (true for left, false for default).
+ val stageHeadersAndCssClasses: Seq[(String, Option[(String, Boolean)], Boolean)] =
+ Seq(("Stage Id", None, true)) ++
+ {if (isFairScheduler) {Seq(("Pool Name", None, true))} else Seq.empty} ++
+ Seq(
+ ("Description", None, true), ("Submitted", None, true), ("Duration", None, true),
+ ("Tasks: Succeeded/Total", None, false),
+ ("Input", Some((ToolTips.INPUT, false)), true),
+ ("Output", Some((ToolTips.OUTPUT, false)), true),
+ ("Shuffle Read", Some((ToolTips.SHUFFLE_READ, false)), true),
+ ("Shuffle Write", Some((ToolTips.SHUFFLE_WRITE, true)), true)
+ ) ++
+ {if (isFailedStage) {Seq(("Failure Reason", None, false))} else Seq.empty}
+
+ if (!stageHeadersAndCssClasses.filter(_._3).map(_._1).contains(sortColumn)) {
+ throw new IllegalArgumentException(s"Unknown column: $sortColumn")
}
+
+ val headerRow: Seq[Node] = {
+ stageHeadersAndCssClasses.map { case (header, tooltip, sortable) =>
+ val headerSpan = tooltip.map { case (title, left) =>
+ if (left) {
+ /* Place the shuffle write tooltip on the left (rather than the default position
+ of on top) because the shuffle write column is the last column on the right side and
+ the tooltip is wider than the column, so it doesn't fit on top. */
+
+ {header}
+
+ } else {
+
+ {header}
+
+ }
+ }.getOrElse(
+ {header}
+ )
+
+ if (header == sortColumn) {
+ val headerLink = Unparsed(
+ parameterPath +
+ s"&$stageTag.sort=${URLEncoder.encode(header, "UTF-8")}" +
+ s"&$stageTag.desc=${!desc}" +
+ s"&$stageTag.pageSize=$pageSize")
+ val arrow = if (desc) "▾" else "▴" // UP or DOWN
+
+
+
+ {headerSpan}
+ {Unparsed(arrow)}
+
+
+ |
+ } else {
+ if (sortable) {
+ val headerLink = Unparsed(
+ parameterPath +
+ s"&$stageTag.sort=${URLEncoder.encode(header, "UTF-8")}" +
+ s"&$stageTag.pageSize=$pageSize")
+
+
+
+ {headerSpan}
+
+ |
+ } else {
+
+ {headerSpan}
+ |
+ }
+ }
+ }
+ }
+ {headerRow}
+ }
+
+ override def row(data: StageTableRowData): Seq[Node] = {
+
+ {rowContent(data)}
+
}
- /** Special table that merges two header cells. */
- protected def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
-
- {columns}
-
- {rows.map(r => makeRow(r))}
-
-
+ private def rowContent(data: StageTableRowData): Seq[Node] = {
+ data.stageData match {
+ case None => missingStageRow(data.stageId)
+ case Some(stageData) =>
+ val info = data.stageInfo
+
+ {if (data.attemptId > 0) {
+ {data.stageId} (retry {data.attemptId}) |
+ } else {
+ {data.stageId} |
+ }} ++
+ {if (isFairScheduler) {
+
+
+ {data.schedulingPool}
+
+ |
+ } else {
+ Seq.empty
+ }} ++
+ {makeDescription(info, data.descriptionOption)} |
+
+ {data.formattedSubmissionTime}
+ |
+ {data.formattedDuration} |
+
+ {UIUtils.makeProgressBar(started = stageData.numActiveTasks,
+ completed = stageData.completedIndices.size, failed = stageData.numFailedTasks,
+ skipped = 0, killed = stageData.numKilledTasks, total = info.numTasks)}
+ |
+ {data.inputReadWithUnit} |
+ {data.outputWriteWithUnit} |
+ {data.shuffleReadWithUnit} |
+ {data.shuffleWriteWithUnit} | ++
+ {
+ if (isFailedStage) {
+ failureReasonHtml(info)
+ } else {
+ Seq.empty
+ }
+ }
+ }
}
- private def makeDescription(s: StageInfo): Seq[Node] = {
+ private def failureReasonHtml(s: StageInfo): Seq[Node] = {
+ val failureReason = s.failureReason.getOrElse("")
+ val isMultiline = failureReason.indexOf('\n') >= 0
+ // Display the first line by default
+ val failureReasonSummary = StringEscapeUtils.escapeHtml4(
+ if (isMultiline) {
+ failureReason.substring(0, failureReason.indexOf('\n'))
+ } else {
+ failureReason
+ })
+ val details = if (isMultiline) {
+ // scalastyle:off
+
+ +details
+ ++
+
+ // scalastyle:on
+ } else {
+ ""
+ }
+ {failureReasonSummary}{details} |
+ }
+
+ private def makeDescription(s: StageInfo, descriptionOption: Option[String]): Seq[Node] = {
val basePathUri = UIUtils.prependBaseUri(basePath)
val killLink = if (killEnabled) {
@@ -111,12 +376,7 @@ private[ui] class StageTableBase(
}
- val stageDesc = for {
- stageData <- listener.stageIdToData.get((s.stageId, s.attemptId))
- desc <- stageData.description
- } yield {
- UIUtils.makeDescription(desc, basePathUri)
- }
+ val stageDesc = descriptionOption.map(UIUtils.makeDescription(_, basePathUri))
{stageDesc.getOrElse("")} {killLink} {nameLink} {details}
}
@@ -132,19 +392,44 @@ private[ui] class StageTableBase(
| ++ // Shuffle Read
| // Shuffle Write
}
+}
+
+private[ui] class StageDataSource(
+ stages: Seq[StageInfo],
+ listener: JobProgressListener,
+ currentTime: Long,
+ pageSize: Int,
+ sortColumn: String,
+ desc: Boolean) extends PagedDataSource[StageTableRowData](pageSize) {
+ // Convert StageInfo to StageTableRowData which contains the final contents to show in the table
+ // so that we can avoid creating duplicate contents during sorting the data
+ private val data = stages.map(stageRow).sorted(ordering(sortColumn, desc))
+
+ private var _slicedStageIds: Set[Int] = null
- protected def stageRow(s: StageInfo): Seq[Node] = {
+ override def dataSize: Int = data.size
+
+ override def sliceData(from: Int, to: Int): Seq[StageTableRowData] = {
+ val r = data.slice(from, to)
+ _slicedStageIds = r.map(_.stageId).toSet
+ r
+ }
+
+ private def stageRow(s: StageInfo): StageTableRowData = {
val stageDataOption = listener.stageIdToData.get((s.stageId, s.attemptId))
+
if (stageDataOption.isEmpty) {
- return missingStageRow(s.stageId)
+ return new MissingStageTableRowData(s, s.stageId, s.attemptId)
}
-
val stageData = stageDataOption.get
- val submissionTime = s.submissionTime match {
+
+ val description = stageData.description
+
+ val formattedSubmissionTime = s.submissionTime match {
case Some(t) => UIUtils.formatDate(new Date(t))
case None => "Unknown"
}
- val finishTime = s.completionTime.getOrElse(System.currentTimeMillis)
+ val finishTime = s.completionTime.getOrElse(currentTime)
// The submission time for a stage is misleading because it counts the time
// the stage waits to be launched. (SPARK-10930)
@@ -156,7 +441,7 @@ private[ui] class StageTableBase(
if (finishTime > startTime) {
Some(finishTime - startTime)
} else {
- Some(System.currentTimeMillis() - startTime)
+ Some(currentTime - startTime)
}
} else {
None
@@ -172,76 +457,80 @@ private[ui] class StageTableBase(
val shuffleWrite = stageData.shuffleWriteBytes
val shuffleWriteWithUnit = if (shuffleWrite > 0) Utils.bytesToString(shuffleWrite) else ""
- {if (s.attemptId > 0) {
- {s.stageId} (retry {s.attemptId}) |
- } else {
- {s.stageId} |
- }} ++
- {if (isFairScheduler) {
-
-
- {stageData.schedulingPool}
-
- |
- } else {
- Seq.empty
- }} ++
- {makeDescription(s)} |
-
- {submissionTime}
- |
- {formattedDuration} |
-
- {UIUtils.makeProgressBar(started = stageData.numActiveTasks,
- completed = stageData.completedIndices.size, failed = stageData.numFailedTasks,
- skipped = 0, killed = stageData.numKilledTasks, total = s.numTasks)}
- |
- {inputReadWithUnit} |
- {outputWriteWithUnit} |
- {shuffleReadWithUnit} |
- {shuffleWriteWithUnit} |
- }
- /** Render an HTML row that represents a stage */
- private def renderStageRow(s: StageInfo): Seq[Node] =
- {stageRow(s)}
-}
-
-private[ui] class FailedStageTable(
- stages: Seq[StageInfo],
- basePath: String,
- listener: JobProgressListener,
- isFairScheduler: Boolean)
- extends StageTableBase(stages, basePath, listener, isFairScheduler, killEnabled = false) {
-
- override protected def columns: Seq[Node] = super.columns ++ Failure Reason |
+ new StageTableRowData(
+ s,
+ stageDataOption,
+ s.stageId,
+ s.attemptId,
+ stageData.schedulingPool,
+ description.getOrElse(""),
+ description,
+ s.submissionTime.getOrElse(0),
+ formattedSubmissionTime,
+ duration.getOrElse(-1),
+ formattedDuration,
+ inputRead,
+ inputReadWithUnit,
+ outputWrite,
+ outputWriteWithUnit,
+ shuffleRead,
+ shuffleReadWithUnit,
+ shuffleWrite,
+ shuffleWriteWithUnit
+ )
+ }
- override protected def stageRow(s: StageInfo): Seq[Node] = {
- val basicColumns = super.stageRow(s)
- val failureReason = s.failureReason.getOrElse("")
- val isMultiline = failureReason.indexOf('\n') >= 0
- // Display the first line by default
- val failureReasonSummary = StringEscapeUtils.escapeHtml4(
- if (isMultiline) {
- failureReason.substring(0, failureReason.indexOf('\n'))
- } else {
- failureReason
- })
- val details = if (isMultiline) {
- // scalastyle:off
-
- +details
- ++
-
- // scalastyle:on
+ /**
+ * Return Ordering according to sortColumn and desc
+ */
+ private def ordering(sortColumn: String, desc: Boolean): Ordering[StageTableRowData] = {
+ val ordering = sortColumn match {
+ case "Stage Id" => new Ordering[StageTableRowData] {
+ override def compare(x: StageTableRowData, y: StageTableRowData): Int =
+ Ordering.Int.compare(x.stageId, y.stageId)
+ }
+ case "Pool Name" => new Ordering[StageTableRowData] {
+ override def compare(x: StageTableRowData, y: StageTableRowData): Int =
+ Ordering.String.compare(x.schedulingPool, y.schedulingPool)
+ }
+ case "Description" => new Ordering[StageTableRowData] {
+ override def compare(x: StageTableRowData, y: StageTableRowData): Int =
+ Ordering.String.compare(x.description, y.description)
+ }
+ case "Submitted" => new Ordering[StageTableRowData] {
+ override def compare(x: StageTableRowData, y: StageTableRowData): Int =
+ Ordering.Long.compare(x.submissionTime, y.submissionTime)
+ }
+ case "Duration" => new Ordering[StageTableRowData] {
+ override def compare(x: StageTableRowData, y: StageTableRowData): Int =
+ Ordering.Long.compare(x.duration, y.duration)
+ }
+ case "Input" => new Ordering[StageTableRowData] {
+ override def compare(x: StageTableRowData, y: StageTableRowData): Int =
+ Ordering.Long.compare(x.inputRead, y.inputRead)
+ }
+ case "Output" => new Ordering[StageTableRowData] {
+ override def compare(x: StageTableRowData, y: StageTableRowData): Int =
+ Ordering.Long.compare(x.outputWrite, y.outputWrite)
+ }
+ case "Shuffle Read" => new Ordering[StageTableRowData] {
+ override def compare(x: StageTableRowData, y: StageTableRowData): Int =
+ Ordering.Long.compare(x.shuffleRead, y.shuffleRead)
+ }
+ case "Shuffle Write" => new Ordering[StageTableRowData] {
+ override def compare(x: StageTableRowData, y: StageTableRowData): Int =
+ Ordering.Long.compare(x.shuffleWrite, y.shuffleWrite)
+ }
+ case "Tasks: Succeeded/Total" =>
+ throw new IllegalArgumentException(s"Unsortable column: $sortColumn")
+ case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
+ }
+ if (desc) {
+ ordering.reverse
} else {
- ""
+ ordering
}
- val failureReasonHtml = {failureReasonSummary}{details} |
- basicColumns ++ failureReasonHtml
}
}
+