From 4255c094a72699bdc08ce2f8ed09ba0401927d5c Mon Sep 17 00:00:00 2001 From: Tao Lin Date: Fri, 17 Jun 2016 00:50:25 +0800 Subject: [PATCH 1/6] paginate stage table in stages tab --- .../apache/spark/ui/jobs/AllStagesPage.scala | 21 +- .../org/apache/spark/ui/jobs/JobPage.scala | 18 +- .../org/apache/spark/ui/jobs/PoolPage.scala | 6 +- .../org/apache/spark/ui/jobs/StageTable.scala | 503 ++++++++++++++---- 4 files changed, 409 insertions(+), 139 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala index e75f1c57a69d..c95eedb7831f 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala @@ -38,22 +38,19 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { val numCompletedStages = listener.numCompletedStages val failedStages = listener.failedStages.reverse.toSeq val numFailedStages = listener.numFailedStages - val now = System.currentTimeMillis val activeStagesTable = - new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, - parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler, - killEnabled = parent.killEnabled) + new StageTableBase(request, activeStages, "activeStage", parent.basePath, + parent.progressListener, parent.isFairScheduler, parent.killEnabled, false) val pendingStagesTable = - new StageTableBase(pendingStages.sortBy(_.submissionTime).reverse, - parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler, - killEnabled = false) + new StageTableBase(request, pendingStages, "pendingStage", parent.basePath, + parent.progressListener, parent.isFairScheduler, false, false) val completedStagesTable = - new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath, - parent.progressListener, isFairScheduler = parent.isFairScheduler, killEnabled = false) + 
new StageTableBase(request, completedStages, "completedStage", parent.basePath, + parent.progressListener, parent.isFairScheduler, false, false) val failedStagesTable = - new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent.basePath, - parent.progressListener, isFairScheduler = parent.isFairScheduler) + new StageTableBase(request, failedStages, "failedStage", parent.basePath, + parent.progressListener, parent.isFairScheduler, false, true) // For now, pool information is only accessible in live UIs val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable]) @@ -135,4 +132,4 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { UIUtils.headerSparkPage("Stages for All Jobs", content, parent) } } -} +} \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala index 99f2bd8bc1f2..ca38d88e2a95 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala @@ -230,19 +230,17 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { } val activeStagesTable = - new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, - parent.basePath, parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler, - killEnabled = parent.killEnabled) + new StageTableBase(request, activeStages, "activeStage", parent.basePath, + parent.jobProgresslistener, parent.isFairScheduler, parent.killEnabled, false) val pendingOrSkippedStagesTable = - new StageTableBase(pendingOrSkippedStages.sortBy(_.stageId).reverse, - parent.basePath, parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler, - killEnabled = false) + new StageTableBase(request, pendingOrSkippedStages, "pendingStage", parent.basePath, + parent.jobProgresslistener, parent.isFairScheduler, false, false) val completedStagesTable = - new 
StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath, - parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler, killEnabled = false) + new StageTableBase(request, completedStages, "completedStage", parent.basePath, + parent.jobProgresslistener, parent.isFairScheduler, false, false) val failedStagesTable = - new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent.basePath, - parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler) + new StageTableBase(request, failedStages, "failedStage", parent.basePath, + parent.jobProgresslistener, parent.isFairScheduler, false, true) val shouldShowActiveStages = activeStages.nonEmpty val shouldShowPendingStages = !isComplete && pendingOrSkippedStages.nonEmpty diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala index 6cd25919ca5f..2ccdf359fedb 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala @@ -42,9 +42,9 @@ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") { case Some(s) => s.values.toSeq case None => Seq[StageInfo]() } - val activeStagesTable = new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, - parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler, - killEnabled = parent.killEnabled) + val activeStagesTable = + new StageTableBase(request, activeStages, "activeStage", parent.basePath, + parent.progressListener, parent.isFairScheduler, parent.killEnabled, false) // For now, pool information is only accessible in live UIs val pools = sc.map(_.getPoolForName(poolName).getOrElse { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 0e020155a656..4fcfd460e9ab 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala 
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -17,61 +17,311 @@ package org.apache.spark.ui.jobs +import java.net.URLEncoder import java.util.Date +import javax.servlet.http.HttpServletRequest -import scala.xml.{Node, Text} +import scala.xml._ import org.apache.commons.lang3.StringEscapeUtils import org.apache.spark.scheduler.StageInfo -import org.apache.spark.ui.{ToolTips, UIUtils} +import org.apache.spark.ui._ +import org.apache.spark.ui.jobs.UIData.StageUIData import org.apache.spark.util.Utils -/** Page showing list of all ongoing and recently finished stages */ private[ui] class StageTableBase( + request: HttpServletRequest, stages: Seq[StageInfo], + stageTag: String, + basePath: String, + progressListener: JobProgressListener, + isFairScheduler: Boolean, + killEnabled: Boolean, + isFailedStage: Boolean) { + val parameterStagePage = request.getParameter(stageTag + ".page") + val parameterStageSortColumn = request.getParameter(stageTag + ".sort") + val parameterStageSortDesc = request.getParameter(stageTag + ".desc") + val parameterStagePageSize = request.getParameter(stageTag + ".pageSize") + val parameterStagePrevPageSize = request.getParameter(stageTag + ".prevPageSize") + + val stagePage = Option(parameterStagePage).map(_.toInt).getOrElse(1) + val stageSortColumn = Option(parameterStageSortColumn).map { sortColumn => + UIUtils.decodeURLParameter(sortColumn) + }.getOrElse("Stage Id") + val stageSortDesc = Option(parameterStageSortDesc).map(_.toBoolean).getOrElse( + // New stages should be shown above old jobs by default. + if (stageSortColumn == "Stage Id") true else false + ) + val stagePageSize = Option(parameterStagePageSize).map(_.toInt).getOrElse(100) + val stagePrevPageSize = Option(parameterStagePrevPageSize).map(_.toInt) + .getOrElse(stagePageSize) + + val page: Int = { + // If the user has changed to a larger page size, then go to page 1 in order to avoid + // IndexOutOfBoundsException. 
+ if (stagePageSize <= stagePrevPageSize) { + stagePage + } else { + 1 + } + } + val currentTime = System.currentTimeMillis() + + val toNodeSeq = try { + new StagePagedTable( + stages, + stageTag, + basePath, + progressListener, + isFairScheduler, + killEnabled, + currentTime, + stagePageSize, + stageSortColumn, + stageSortDesc, + isFailedStage + ).table(page) + } catch { + case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) => +
+

Error while rendering stage table:

+
+          {Utils.exceptionString(e)}
+        
+
+ } +} + +private[ui] class StageTableRowData( + val stageInfo: StageInfo, + val stageData: Option[StageUIData], + val stageId: Int, + val attemptId: Int, + val schedulingPool: String, + val description: String, + val descriptionOption: Option[String], + val submissionTime: Long, + val formattedSubmissionTime: String, + val duration: Long, + val formattedDuration: String, + val inputRead: Long, + val inputReadWithUnit: String, + val outputWrite: Long, + val outputWriteWithUnit: String, + val shuffleRead: Long, + val shuffleReadWithUnit: String, + val shuffleWrite: Long, + val shuffleWriteWithUnit: String) + +private[ui] class MissingStageTableRowData( + stageInfo: StageInfo, + stageId: Int, + attemptId: Int) extends StageTableRowData( + stageInfo, None, stageId, attemptId, "", "", None, 0, "", -1, "", 0, "", 0, "", 0, "", 0, "") + +/** Page showing list of all ongoing and recently finished stages */ +private[ui] class StagePagedTable( + stages: Seq[StageInfo], + stageTag: String, basePath: String, listener: JobProgressListener, isFairScheduler: Boolean, - killEnabled: Boolean) { - - protected def columns: Seq[Node] = { - Stage Id ++ - {if (isFairScheduler) {Pool Name} else Seq.empty} ++ - Description - Submitted - Duration - Tasks: Succeeded/Total - Input - Output - Shuffle Read - - - - Shuffle Write - - + killEnabled: Boolean, + currentTime: Long, + pageSize: Int, + sortColumn: String, + desc: Boolean, + isFailedStage: Boolean) extends PagedTable[StageTableRowData] { + + override def tableId: String = stageTag + "-table" + + override def tableCssClass: String = + "table table-bordered table-condensed table-striped table-head-clickable" + + override def pageSizeFormField: String = stageTag + ".pageSize" + + override def prevPageSizeFormField: String = stageTag + ".prevPageSize" + + override def pageNumberFormField: String = stageTag + ".page" + + override val dataSource = new StageDataSource( + stages, + listener, + basePath, + currentTime, + pageSize, + 
sortColumn, + desc + ) + + override def pageLink(page: Int): String = { + val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8") + basePath + + s"?$pageNumberFormField=$page" + + s"&$stageTag.sort=$encodedSortColumn" + + s"&$stageTag.desc=$desc" + + s"&$pageSizeFormField=$pageSize" + } + + override def goButtonFormPath: String = { + val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8") + s"$basePath?$stageTag.sort=$encodedSortColumn&$stageTag.desc=$desc" + } + + override def headers: Seq[Node] = { + val stageHeadersAndCssClasses: Seq[(String, Option[(String, Boolean)], Boolean)] = + Seq(("Stage Id", None, true)) ++ + {if (isFairScheduler) {Seq(("Pool Name", None, true))} else Seq.empty} ++ + Seq( + ("Description", None, true), ("Submitted", None, true), ("Duration", None, true), + ("Tasks: Succeeded/Total", None, false), + ("Input", Some((ToolTips.INPUT, false)), true), + ("Output", Some((ToolTips.OUTPUT, false)), true), + ("Shuffle Read", Some((ToolTips.SHUFFLE_READ, false)), true), + ("Shuffle Write", Some((ToolTips.SHUFFLE_WRITE, true)), true) + ) ++ + {if (isFailedStage) {Seq(("Failure Reason", None, false))} else Seq.empty} + + if (!stageHeadersAndCssClasses.filter(_._3).map(_._1).contains(sortColumn)) { + throw new IllegalArgumentException(s"Unknown column: $sortColumn") + } + + val headerRow: Seq[Node] = { + stageHeadersAndCssClasses.map { case (header, tooltip, sortable) => + val headerSpan = tooltip.map { case (title, left) => + if (left) { + /* Place the shuffle write tooltip on the left (rather than the default position + of on top) because the shuffle write column is the last column on the right side and + the tooltip is wider than the column, so it doesn't fit on top. 
*/ + + {header} + + } else { + + {header} + + } + }.getOrElse( + {header} + ) + + if (header == sortColumn) { + val headerLink = Unparsed( + basePath + + s"?$stageTag.sort=${URLEncoder.encode(header, "UTF-8")}" + + s"&$stageTag.desc=${!desc}" + + s"&$stageTag.pageSize=$pageSize") + val arrow = if (desc) "▾" else "▴" // UP or DOWN + + + + {headerSpan} +  {Unparsed(arrow)} + + + + } else { + if (sortable) { + val headerLink = Unparsed( + basePath + + s"?$stageTag.sort=${URLEncoder.encode(header, "UTF-8")}" + + s"&$stageTag.pageSize=$pageSize") + + + + {headerSpan} + + + } else { + + {headerSpan} + + } + } + } + } + {headerRow} + } + + override def row(data: StageTableRowData): Seq[Node] = { + + {rowContent(data)} + } - def toNodeSeq: Seq[Node] = { - listener.synchronized { - stageTable(renderStageRow, stages) + private def rowContent(data: StageTableRowData): Seq[Node] = { + data.stageData match { + case None => missingStageRow(data.stageId) + case Some(stageData) => + val info = data.stageInfo + + {if (data.attemptId > 0) { + {data.stageId} (retry {data.attemptId}) + } else { + {data.stageId} + }} ++ + {if (isFairScheduler) { + + + {data.schedulingPool} + + + } else { + Seq.empty + }} ++ + {makeDescription(info, data.descriptionOption)} + + {data.formattedSubmissionTime} + + {data.formattedDuration} + + {UIUtils.makeProgressBar(started = stageData.numActiveTasks, + completed = stageData.completedIndices.size, failed = stageData.numFailedTasks, + skipped = 0, killed = stageData.numKilledTasks, total = info.numTasks)} + + {data.inputReadWithUnit} + {data.outputWriteWithUnit} + {data.shuffleReadWithUnit} + {data.shuffleWriteWithUnit} ++ + { + if (isFailedStage) { + failureReasonHtml(info) + } else { + Seq.empty + } + } } } - /** Special table that merges two header cells. */ - protected def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = { - - {columns} - - {rows.map(r => makeRow(r))} - -
+ private def failureReasonHtml(s: StageInfo): Seq[Node] = { + val failureReason = s.failureReason.getOrElse("") + val isMultiline = failureReason.indexOf('\n') >= 0 + // Display the first line by default + val failureReasonSummary = StringEscapeUtils.escapeHtml4( + if (isMultiline) { + failureReason.substring(0, failureReason.indexOf('\n')) + } else { + failureReason + }) + val details = if (isMultiline) { + // scalastyle:off + + +details + ++ + + // scalastyle:on + } else { + "" + } + {failureReasonSummary}{details} } - private def makeDescription(s: StageInfo): Seq[Node] = { + private def makeDescription(s: StageInfo, descriptionOption: Option[String]): Seq[Node] = { val basePathUri = UIUtils.prependBaseUri(basePath) val killLink = if (killEnabled) { @@ -111,12 +361,7 @@ private[ui] class StageTableBase( } - val stageDesc = for { - stageData <- listener.stageIdToData.get((s.stageId, s.attemptId)) - desc <- stageData.description - } yield { - UIUtils.makeDescription(desc, basePathUri) - } + val stageDesc = descriptionOption.map(UIUtils.makeDescription(_, basePathUri))
{stageDesc.getOrElse("")} {killLink} {nameLink} {details}
} @@ -132,19 +377,45 @@ private[ui] class StageTableBase( ++ // Shuffle Read // Shuffle Write } +} + +private[ui] class StageDataSource( + stages: Seq[StageInfo], + listener: JobProgressListener, + basePath: String, + currentTime: Long, + pageSize: Int, + sortColumn: String, + desc: Boolean) extends PagedDataSource[StageTableRowData](pageSize) { + // Convert StageInfo to StageTableRowData which contains the final contents to show in the table + // so that we can avoid creating duplicate contents during sorting the data + private val data = stages.map(stageRow).sorted(ordering(sortColumn, desc)) + + private var _slicedStageIds: Set[Int] = null - protected def stageRow(s: StageInfo): Seq[Node] = { + override def dataSize: Int = data.size + + override def sliceData(from: Int, to: Int): Seq[StageTableRowData] = { + val r = data.slice(from, to) + _slicedStageIds = r.map(_.stageId).toSet + r + } + + private def stageRow(s: StageInfo): StageTableRowData = { val stageDataOption = listener.stageIdToData.get((s.stageId, s.attemptId)) + if (stageDataOption.isEmpty) { - return missingStageRow(s.stageId) + return new MissingStageTableRowData(s, s.stageId, s.attemptId) } - val stageData = stageDataOption.get - val submissionTime = s.submissionTime match { + + val description = stageData.description + + val formattedSubmissionTime = s.submissionTime match { case Some(t) => UIUtils.formatDate(new Date(t)) case None => "Unknown" } - val finishTime = s.completionTime.getOrElse(System.currentTimeMillis) + val finishTime = s.completionTime.getOrElse(currentTime) // The submission time for a stage is misleading because it counts the time // the stage waits to be launched. 
(SPARK-10930) @@ -156,7 +427,7 @@ private[ui] class StageTableBase( if (finishTime > startTime) { Some(finishTime - startTime) } else { - Some(System.currentTimeMillis() - startTime) + Some(currentTime - startTime) } } else { None @@ -172,76 +443,80 @@ private[ui] class StageTableBase( val shuffleWrite = stageData.shuffleWriteBytes val shuffleWriteWithUnit = if (shuffleWrite > 0) Utils.bytesToString(shuffleWrite) else "" - {if (s.attemptId > 0) { - {s.stageId} (retry {s.attemptId}) - } else { - {s.stageId} - }} ++ - {if (isFairScheduler) { - - - {stageData.schedulingPool} - - - } else { - Seq.empty - }} ++ - {makeDescription(s)} - - {submissionTime} - - {formattedDuration} - - {UIUtils.makeProgressBar(started = stageData.numActiveTasks, - completed = stageData.completedIndices.size, failed = stageData.numFailedTasks, - skipped = 0, killed = stageData.numKilledTasks, total = s.numTasks)} - - {inputReadWithUnit} - {outputWriteWithUnit} - {shuffleReadWithUnit} - {shuffleWriteWithUnit} - } - /** Render an HTML row that represents a stage */ - private def renderStageRow(s: StageInfo): Seq[Node] = - {stageRow(s)} -} - -private[ui] class FailedStageTable( - stages: Seq[StageInfo], - basePath: String, - listener: JobProgressListener, - isFairScheduler: Boolean) - extends StageTableBase(stages, basePath, listener, isFairScheduler, killEnabled = false) { - - override protected def columns: Seq[Node] = super.columns ++ Failure Reason + new StageTableRowData( + s, + stageDataOption, + s.stageId, + s.attemptId, + stageData.schedulingPool, + description.getOrElse(""), + description, + s.submissionTime.getOrElse(0), + formattedSubmissionTime, + duration.getOrElse(-1), + formattedDuration, + inputRead, + inputReadWithUnit, + outputWrite, + outputWriteWithUnit, + shuffleRead, + shuffleReadWithUnit, + shuffleWrite, + shuffleWriteWithUnit + ) + } - override protected def stageRow(s: StageInfo): Seq[Node] = { - val basicColumns = super.stageRow(s) - val failureReason = 
s.failureReason.getOrElse("") - val isMultiline = failureReason.indexOf('\n') >= 0 - // Display the first line by default - val failureReasonSummary = StringEscapeUtils.escapeHtml4( - if (isMultiline) { - failureReason.substring(0, failureReason.indexOf('\n')) - } else { - failureReason - }) - val details = if (isMultiline) { - // scalastyle:off - - +details - ++ - - // scalastyle:on + /** + * Return Ordering according to sortColumn and desc + */ + private def ordering(sortColumn: String, desc: Boolean): Ordering[StageTableRowData] = { + val ordering = sortColumn match { + case "Stage Id" => new Ordering[StageTableRowData] { + override def compare(x: StageTableRowData, y: StageTableRowData): Int = + Ordering.Int.compare(x.stageId, y.stageId) + } + case "Pool Name" => new Ordering[StageTableRowData] { + override def compare(x: StageTableRowData, y: StageTableRowData): Int = + Ordering.String.compare(x.schedulingPool, y.schedulingPool) + } + case "Description" => new Ordering[StageTableRowData] { + override def compare(x: StageTableRowData, y: StageTableRowData): Int = + Ordering.String.compare(x.description, y.description) + } + case "Submitted" => new Ordering[StageTableRowData] { + override def compare(x: StageTableRowData, y: StageTableRowData): Int = + Ordering.Long.compare(x.submissionTime, y.submissionTime) + } + case "Duration" => new Ordering[StageTableRowData] { + override def compare(x: StageTableRowData, y: StageTableRowData): Int = + Ordering.Long.compare(x.duration, y.duration) + } + case "Input" => new Ordering[StageTableRowData] { + override def compare(x: StageTableRowData, y: StageTableRowData): Int = + Ordering.Long.compare(x.inputRead, y.inputRead) + } + case "Output" => new Ordering[StageTableRowData] { + override def compare(x: StageTableRowData, y: StageTableRowData): Int = + Ordering.Long.compare(x.outputWrite, y.outputWrite) + } + case "Shuffle Read" => new Ordering[StageTableRowData] { + override def compare(x: StageTableRowData, y: 
StageTableRowData): Int = + Ordering.Long.compare(x.shuffleRead, y.shuffleRead) + } + case "Shuffle Write" => new Ordering[StageTableRowData] { + override def compare(x: StageTableRowData, y: StageTableRowData): Int = + Ordering.Long.compare(x.shuffleWrite, y.shuffleWrite) + } + case "Tasks: Succeeded/Total" => + throw new IllegalArgumentException(s"Unsortable column: $sortColumn") + case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn") + } + if (desc) { + ordering.reverse } else { - "" + ordering } - val failureReasonHtml = {failureReasonSummary}{details} - basicColumns ++ failureReasonHtml } } + From 4de6d687bd94fa2666e8d0b75806a800ff809a17 Mon Sep 17 00:00:00 2001 From: Tao Lin Date: Fri, 17 Jun 2016 01:17:58 +0800 Subject: [PATCH 2/6] add newline at end of AllStagesPage --- .../main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala index c95eedb7831f..638edee37733 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala @@ -132,4 +132,5 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { UIUtils.headerSparkPage("Stages for All Jobs", content, parent) } } -} \ No newline at end of file +} + From 246432dfa1c5fc58177820ad609ef6399c2c326a Mon Sep 17 00:00:00 2001 From: Tao Lin Date: Wed, 22 Jun 2016 02:14:05 +0800 Subject: [PATCH 3/6] fix problem of resetting other tables --- .../scala/org/apache/spark/ui/PagedTable.scala | 1 + .../org/apache/spark/ui/jobs/StageTable.scala | 16 +++++++++++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala index 9b6ed8cbbef1..2a7c16b04bf7 100644 --- 
a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala @@ -179,6 +179,7 @@ private[ui] trait PagedTable[T] { Splitter .on('&') .trimResults() + .omitEmptyStrings() .withKeyValueSeparator("=") .split(querystring) .asScala diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 4fcfd460e9ab..a6a4223f09a3 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -21,6 +21,7 @@ import java.net.URLEncoder import java.util.Date import javax.servlet.http.HttpServletRequest +import scala.collection.JavaConverters._ import scala.xml._ import org.apache.commons.lang3.StringEscapeUtils @@ -39,6 +40,11 @@ private[ui] class StageTableBase( isFairScheduler: Boolean, killEnabled: Boolean, isFailedStage: Boolean) { + val allParameters = request.getParameterMap().asScala.toMap + val parameterOtherTable = allParameters.filterNot(_._1.startsWith(stageTag)) + .map(para => para._1 + "=" + para._2(0)).mkString("&") + val reservedBasePath = basePath + "?" 
+ parameterOtherTable + val parameterStagePage = request.getParameter(stageTag + ".page") val parameterStageSortColumn = request.getParameter(stageTag + ".sort") val parameterStageSortDesc = request.getParameter(stageTag + ".desc") @@ -72,7 +78,7 @@ private[ui] class StageTableBase( new StagePagedTable( stages, stageTag, - basePath, + reservedBasePath, progressListener, isFairScheduler, killEnabled, @@ -158,7 +164,7 @@ private[ui] class StagePagedTable( override def pageLink(page: Int): String = { val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8") basePath + - s"?$pageNumberFormField=$page" + + s"&$pageNumberFormField=$page" + s"&$stageTag.sort=$encodedSortColumn" + s"&$stageTag.desc=$desc" + s"&$pageSizeFormField=$pageSize" @@ -166,7 +172,7 @@ private[ui] class StagePagedTable( override def goButtonFormPath: String = { val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8") - s"$basePath?$stageTag.sort=$encodedSortColumn&$stageTag.desc=$desc" + s"$basePath&$stageTag.sort=$encodedSortColumn&$stageTag.desc=$desc" } override def headers: Seq[Node] = { @@ -209,7 +215,7 @@ private[ui] class StagePagedTable( if (header == sortColumn) { val headerLink = Unparsed( basePath + - s"?$stageTag.sort=${URLEncoder.encode(header, "UTF-8")}" + + s"&$stageTag.sort=${URLEncoder.encode(header, "UTF-8")}" + s"&$stageTag.desc=${!desc}" + s"&$stageTag.pageSize=$pageSize") val arrow = if (desc) "▾" else "▴" // UP or DOWN @@ -225,7 +231,7 @@ private[ui] class StagePagedTable( if (sortable) { val headerLink = Unparsed( basePath + - s"?$stageTag.sort=${URLEncoder.encode(header, "UTF-8")}" + + s"&$stageTag.sort=${URLEncoder.encode(header, "UTF-8")}" + s"&$stageTag.pageSize=$pageSize") From 7ccda86fefda67424c8c1665078c3c72c6c69532 Mon Sep 17 00:00:00 2001 From: Tao Lin Date: Thu, 23 Jun 2016 00:45:25 +0800 Subject: [PATCH 4/6] fix the links to one-stage pages --- .../org/apache/spark/ui/jobs/StageTable.scala | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 
deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index a6a4223f09a3..ea2a4f78fb99 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -42,8 +42,7 @@ private[ui] class StageTableBase( isFailedStage: Boolean) { val allParameters = request.getParameterMap().asScala.toMap val parameterOtherTable = allParameters.filterNot(_._1.startsWith(stageTag)) - .map(para => para._1 + "=" + para._2(0)).mkString("&") - val reservedBasePath = basePath + "?" + parameterOtherTable + .map(para => para._1 + "=" + para._2(0)) val parameterStagePage = request.getParameter(stageTag + ".page") val parameterStageSortColumn = request.getParameter(stageTag + ".sort") @@ -78,7 +77,7 @@ private[ui] class StageTableBase( new StagePagedTable( stages, stageTag, - reservedBasePath, + basePath, progressListener, isFairScheduler, killEnabled, @@ -86,7 +85,8 @@ private[ui] class StageTableBase( stagePageSize, stageSortColumn, stageSortDesc, - isFailedStage + isFailedStage, + parameterOtherTable ).table(page) } catch { case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) => @@ -138,7 +138,8 @@ private[ui] class StagePagedTable( pageSize: Int, sortColumn: String, desc: Boolean, - isFailedStage: Boolean) extends PagedTable[StageTableRowData] { + isFailedStage: Boolean, + parameterOtherTable: Iterable[String]) extends PagedTable[StageTableRowData] { override def tableId: String = stageTag + "-table" @@ -151,6 +152,8 @@ private[ui] class StagePagedTable( override def pageNumberFormField: String = stageTag + ".page" + val parameterPath = basePath + "?" 
+ parameterOtherTable.mkString("&") + override val dataSource = new StageDataSource( stages, listener, @@ -163,7 +166,7 @@ private[ui] class StagePagedTable( override def pageLink(page: Int): String = { val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8") - basePath + + parameterPath + s"&$pageNumberFormField=$page" + s"&$stageTag.sort=$encodedSortColumn" + s"&$stageTag.desc=$desc" + @@ -172,7 +175,7 @@ private[ui] class StagePagedTable( override def goButtonFormPath: String = { val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8") - s"$basePath&$stageTag.sort=$encodedSortColumn&$stageTag.desc=$desc" + s"$parameterPath&$stageTag.sort=$encodedSortColumn&$stageTag.desc=$desc" } override def headers: Seq[Node] = { @@ -214,7 +217,7 @@ private[ui] class StagePagedTable( if (header == sortColumn) { val headerLink = Unparsed( - basePath + + parameterPath + s"&$stageTag.sort=${URLEncoder.encode(header, "UTF-8")}" + s"&$stageTag.desc=${!desc}" + s"&$stageTag.pageSize=$pageSize") @@ -230,7 +233,7 @@ private[ui] class StagePagedTable( } else { if (sortable) { val headerLink = Unparsed( - basePath + + parameterPath + s"&$stageTag.sort=${URLEncoder.encode(header, "UTF-8")}" + s"&$stageTag.pageSize=$pageSize") From 4af0630e729f176c93e41762afbb7eb0aea61b9c Mon Sep 17 00:00:00 2001 From: Tao Lin Date: Thu, 23 Jun 2016 02:39:42 +0800 Subject: [PATCH 5/6] fix linking problems in history page & not display empty tables in pool page --- .../scala/org/apache/spark/ui/jobs/AllStagesPage.scala | 9 +++++---- .../main/scala/org/apache/spark/ui/jobs/JobPage.scala | 10 ++++++---- .../main/scala/org/apache/spark/ui/jobs/PoolPage.scala | 10 ++++++---- .../scala/org/apache/spark/ui/jobs/StageTable.scala | 8 +++++--- 4 files changed, 22 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala index 638edee37733..61139fa3e46d 100644 --- 
a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala @@ -38,18 +38,19 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { val numCompletedStages = listener.numCompletedStages val failedStages = listener.failedStages.reverse.toSeq val numFailedStages = listener.numFailedStages + val subPath = "stages" val activeStagesTable = - new StageTableBase(request, activeStages, "activeStage", parent.basePath, + new StageTableBase(request, activeStages, "activeStage", parent.basePath, subPath, parent.progressListener, parent.isFairScheduler, parent.killEnabled, false) val pendingStagesTable = - new StageTableBase(request, pendingStages, "pendingStage", parent.basePath, + new StageTableBase(request, pendingStages, "pendingStage", parent.basePath, subPath, parent.progressListener, parent.isFairScheduler, false, false) val completedStagesTable = - new StageTableBase(request, completedStages, "completedStage", parent.basePath, + new StageTableBase(request, completedStages, "completedStage", parent.basePath, subPath, parent.progressListener, parent.isFairScheduler, false, false) val failedStagesTable = - new StageTableBase(request, failedStages, "failedStage", parent.basePath, + new StageTableBase(request, failedStages, "failedStage", parent.basePath, subPath, parent.progressListener, parent.isFairScheduler, false, true) // For now, pool information is only accessible in live UIs diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala index ca38d88e2a95..cb0a0b0d6122 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala @@ -229,18 +229,20 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { } } + val basePath = "jobs/job" + val activeStagesTable = new StageTableBase(request, activeStages, 
"activeStage", parent.basePath, - parent.jobProgresslistener, parent.isFairScheduler, parent.killEnabled, false) + basePath, parent.jobProgresslistener, parent.isFairScheduler, parent.killEnabled, false) val pendingOrSkippedStagesTable = new StageTableBase(request, pendingOrSkippedStages, "pendingStage", parent.basePath, - parent.jobProgresslistener, parent.isFairScheduler, false, false) + basePath, parent.jobProgresslistener, parent.isFairScheduler, false, false) val completedStagesTable = new StageTableBase(request, completedStages, "completedStage", parent.basePath, - parent.jobProgresslistener, parent.isFairScheduler, false, false) + basePath, parent.jobProgresslistener, parent.isFairScheduler, false, false) val failedStagesTable = new StageTableBase(request, failedStages, "failedStage", parent.basePath, - parent.jobProgresslistener, parent.isFairScheduler, false, true) + basePath, parent.jobProgresslistener, parent.isFairScheduler, false, true) val shouldShowActiveStages = activeStages.nonEmpty val shouldShowPendingStages = !isComplete && pendingOrSkippedStages.nonEmpty diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala index 2ccdf359fedb..d605087a8198 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala @@ -42,8 +42,9 @@ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") { case Some(s) => s.values.toSeq case None => Seq[StageInfo]() } + val shouldShowActiveStages = activeStages.nonEmpty val activeStagesTable = - new StageTableBase(request, activeStages, "activeStage", parent.basePath, + new StageTableBase(request, activeStages, "activeStage", parent.basePath, "stages/pool", parent.progressListener, parent.isFairScheduler, parent.killEnabled, false) // For now, pool information is only accessible in live UIs @@ -52,9 +53,10 @@ private[ui] class PoolPage(parent: StagesTab) 
extends WebUIPage("pool") { }).toSeq val poolTable = new PoolTable(pools, parent) - val content = -

<h4>Summary</h4>

++ poolTable.toNodeSeq ++ -

<h4>{activeStages.size} Active Stages</h4>

++ activeStagesTable.toNodeSeq + var content =

<h4>Summary</h4>

++ poolTable.toNodeSeq + if (shouldShowActiveStages) { + content ++=

<h4>{activeStages.size} Active Stages</h4>

++ activeStagesTable.toNodeSeq + } UIUtils.headerSparkPage("Fair Scheduler Pool: " + poolName, content, parent) } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index ea2a4f78fb99..b2f274519797 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -36,6 +36,7 @@ private[ui] class StageTableBase( stages: Seq[StageInfo], stageTag: String, basePath: String, + subPath: String, progressListener: JobProgressListener, isFairScheduler: Boolean, killEnabled: Boolean, @@ -78,6 +79,7 @@ private[ui] class StageTableBase( stages, stageTag, basePath, + subPath, progressListener, isFairScheduler, killEnabled, @@ -131,6 +133,7 @@ private[ui] class StagePagedTable( stages: Seq[StageInfo], stageTag: String, basePath: String, + subPath: String, listener: JobProgressListener, isFairScheduler: Boolean, killEnabled: Boolean, @@ -152,12 +155,12 @@ private[ui] class StagePagedTable( override def pageNumberFormField: String = stageTag + ".page" - val parameterPath = basePath + "?" + parameterOtherTable.mkString("&") + val parameterPath = UIUtils.prependBaseUri(basePath) + s"/$subPath/?" 
+ + parameterOtherTable.mkString("&") override val dataSource = new StageDataSource( stages, listener, - basePath, currentTime, pageSize, sortColumn, @@ -391,7 +394,6 @@ private[ui] class StagePagedTable( private[ui] class StageDataSource( stages: Seq[StageInfo], listener: JobProgressListener, - basePath: String, currentTime: Long, pageSize: Int, sortColumn: String, From 3a98c4d9946b765bd5a36db648af8a177749143a Mon Sep 17 00:00:00 2001 From: Tao Lin Date: Wed, 6 Jul 2016 21:13:15 +0800 Subject: [PATCH 6/6] fix nits to improve readability --- .../org/apache/spark/ui/jobs/AllStagesPage.scala | 12 ++++++++---- .../scala/org/apache/spark/ui/jobs/JobPage.scala | 12 ++++++++---- .../scala/org/apache/spark/ui/jobs/PoolPage.scala | 3 ++- .../scala/org/apache/spark/ui/jobs/StageTable.scala | 3 +++ 4 files changed, 21 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala index 61139fa3e46d..cba8f82dd77a 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala @@ -42,16 +42,20 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { val activeStagesTable = new StageTableBase(request, activeStages, "activeStage", parent.basePath, subPath, - parent.progressListener, parent.isFairScheduler, parent.killEnabled, false) + parent.progressListener, parent.isFairScheduler, + killEnabled = parent.killEnabled, isFailedStage = false) val pendingStagesTable = new StageTableBase(request, pendingStages, "pendingStage", parent.basePath, subPath, - parent.progressListener, parent.isFairScheduler, false, false) + parent.progressListener, parent.isFairScheduler, + killEnabled = false, isFailedStage = false) val completedStagesTable = new StageTableBase(request, completedStages, "completedStage", parent.basePath, subPath, - parent.progressListener, 
parent.isFairScheduler, false, false) + parent.progressListener, parent.isFairScheduler, + killEnabled = false, isFailedStage = false) val failedStagesTable = new StageTableBase(request, failedStages, "failedStage", parent.basePath, subPath, - parent.progressListener, parent.isFairScheduler, false, true) + parent.progressListener, parent.isFairScheduler, + killEnabled = false, isFailedStage = true) // For now, pool information is only accessible in live UIs val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable]) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala index cb0a0b0d6122..0ec42d68d3dc 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala @@ -233,16 +233,20 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { val activeStagesTable = new StageTableBase(request, activeStages, "activeStage", parent.basePath, - basePath, parent.jobProgresslistener, parent.isFairScheduler, parent.killEnabled, false) + basePath, parent.jobProgresslistener, parent.isFairScheduler, + killEnabled = parent.killEnabled, isFailedStage = false) val pendingOrSkippedStagesTable = new StageTableBase(request, pendingOrSkippedStages, "pendingStage", parent.basePath, - basePath, parent.jobProgresslistener, parent.isFairScheduler, false, false) + basePath, parent.jobProgresslistener, parent.isFairScheduler, + killEnabled = false, isFailedStage = false) val completedStagesTable = new StageTableBase(request, completedStages, "completedStage", parent.basePath, - basePath, parent.jobProgresslistener, parent.isFairScheduler, false, false) + basePath, parent.jobProgresslistener, parent.isFairScheduler, + killEnabled = false, isFailedStage = false) val failedStagesTable = new StageTableBase(request, failedStages, "failedStage", parent.basePath, - basePath, parent.jobProgresslistener, 
parent.isFairScheduler, false, true) + basePath, parent.jobProgresslistener, parent.isFairScheduler, + killEnabled = false, isFailedStage = true) val shouldShowActiveStages = activeStages.nonEmpty val shouldShowPendingStages = !isComplete && pendingOrSkippedStages.nonEmpty diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala index d605087a8198..f9cb71791859 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala @@ -45,7 +45,8 @@ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") { val shouldShowActiveStages = activeStages.nonEmpty val activeStagesTable = new StageTableBase(request, activeStages, "activeStage", parent.basePath, "stages/pool", - parent.progressListener, parent.isFairScheduler, parent.killEnabled, false) + parent.progressListener, parent.isFairScheduler, parent.killEnabled, + isFailedStage = false) // For now, pool information is only accessible in live UIs val pools = sc.map(_.getPoolForName(poolName).getOrElse { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index b2f274519797..2a04e8fc7d00 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -182,6 +182,9 @@ private[ui] class StagePagedTable( } override def headers: Seq[Node] = { + // stageHeadersAndCssClasses has three parts: header title, tooltip information, and sortable. + // The tooltip information could be None, which indicates it does not have a tooltip. + // Otherwise, it has two parts: tooltip text, and position (true for left, false for default). 
val stageHeadersAndCssClasses: Seq[(String, Option[(String, Boolean)], Boolean)] = Seq(("Stage Id", None, true)) ++ {if (isFairScheduler) {Seq(("Pool Name", None, true))} else Seq.empty} ++