3 changes: 2 additions & 1 deletion core/src/main/resources/org/apache/spark/ui/static/webui.js
@@ -82,7 +82,8 @@ $(function() {
collapseTablePageLoad('collapse-aggregated-poolActiveStages','aggregated-poolActiveStages');
collapseTablePageLoad('collapse-aggregated-tasks','aggregated-tasks');
collapseTablePageLoad('collapse-aggregated-rdds','aggregated-rdds');
collapseTablePageLoad('collapse-aggregated-activeBatches','aggregated-activeBatches');
collapseTablePageLoad('collapse-aggregated-waitingBatches','aggregated-waitingBatches');
collapseTablePageLoad('collapse-aggregated-runningBatches','aggregated-runningBatches');
collapseTablePageLoad('collapse-aggregated-completedBatches','aggregated-completedBatches');
collapseTablePageLoad('collapse-aggregated-runningExecutions','aggregated-runningExecutions');
collapseTablePageLoad('collapse-aggregated-completedExecutions','aggregated-completedExecutions');

@@ -17,30 +17,41 @@

package org.apache.spark.streaming.ui

import scala.xml.Node

import org.apache.spark.ui.{UIUtils => SparkUIUtils}
import java.net.URLEncoder
import java.nio.charset.StandardCharsets.UTF_8
import javax.servlet.http.HttpServletRequest

private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long) {
import scala.xml.Node

protected def columns: Seq[Node] = {
<th>Batch Time</th>
<th>Records</th>
<th>Scheduling Delay
{SparkUIUtils.tooltip("Time taken by Streaming scheduler to submit jobs of a batch", "top")}
</th>
<th>Processing Time
{SparkUIUtils.tooltip("Time taken to process all jobs of a batch", "top")}</th>
}
import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils => SparkUIUtils}

private[ui] class StreamingPagedTable(
request: HttpServletRequest,
tableTag: String,
batches: Seq[BatchUIData],
basePath: String,
subPath: String,
batchInterval: Long) extends PagedTable[BatchUIData] {

private val (sortColumn, desc, pageSize) = getTableParameters(request, tableTag, "Batch Time")
private val parameterPath = s"$basePath/$subPath/?${getParameterOtherTable(request, tableTag)}"
private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())

private val firstFailureReason: Option[String] =
if (!tableTag.equals("waitingBatches")) {
getFirstFailureReason(batches)
} else {
None
}

/**
* Return the first failure reason if one is found in the batches.
*/
protected def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = {
private def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = {
batches.flatMap(_.outputOperations.flatMap(_._2.failureReason)).headOption
}

protected def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = {
private def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = {
val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption
firstFailureReason.map { failureReason =>
val failureReasonForUI = UIUtils.createOutputOperationFailureForUI(failureReason)
@@ -49,147 +60,154 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long)
}.getOrElse(<td>-</td>)
}

protected def baseRow(batch: BatchUIData): Seq[Node] = {
val batchTime = batch.batchTime.milliseconds
val formattedBatchTime = SparkUIUtils.formatBatchTime(batchTime, batchInterval)
val numRecords = batch.numRecords
val schedulingDelay = batch.schedulingDelay
val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
val processingTime = batch.processingDelay
val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-")
val batchTimeId = s"batch-$batchTime"

<td id={batchTimeId} sorttable_customkey={batchTime.toString}
isFailed={batch.isFailed.toString}>
<a href={s"batch?id=$batchTime"}>
{formattedBatchTime}
</a>
</td>
<td sorttable_customkey={numRecords.toString}>{numRecords.toString} records</td>
<td sorttable_customkey={schedulingDelay.getOrElse(Long.MaxValue).toString}>
{formattedSchedulingDelay}
</td>
<td sorttable_customkey={processingTime.getOrElse(Long.MaxValue).toString}>
{formattedProcessingTime}
</td>
}

private def batchTable: Seq[Node] = {
<table id={tableId} class="table table-bordered table-striped table-sm sortable">
<thead>
{columns}
</thead>
<tbody>
{renderRows}
</tbody>
</table>
}

def toNodeSeq: Seq[Node] = {
batchTable
}

protected def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = {
private def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = {
<td class="progress-cell">
Member

Why do we need to move <td> and </td> outside this method?

Contributor Author

I removed them to maintain consistency, i.e.

<td> ...... </td>
<td> ...... </td>

Anyway, I will use the original code.
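
For illustration only, the two cell-building styles under discussion might look roughly like this (a hypothetical sketch, not code from this PR):

import scala.xml.Node

object CellStyles {
  // Stand-in for SparkUIUtils.makeProgressBar; assumed for illustration only.
  private def renderBar(completed: Int, total: Int): Seq[Node] = <span>{s"$completed/$total"}</span>

  // Style A (the one kept in the PR): the helper emits the complete cell, so every
  // column in the row reads uniformly as <td> ... </td>.
  def progressCellA(completed: Int, total: Int): Seq[Node] =
    <td class="progress-cell">{renderBar(completed, total)}</td>

  // Style B (the variant the reviewer asked about): the helper returns only the
  // inner content, and the caller wraps it in <td>.
  def progressCellB(completed: Int, total: Int): Seq[Node] = renderBar(completed, total)
}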

{
SparkUIUtils.makeProgressBar(
started = batch.numActiveOutputOp,
completed = batch.numCompletedOutputOp,
failed = batch.numFailedOutputOp,
skipped = 0,
reasonToNumKilled = Map.empty,
total = batch.outputOperations.size)
}
</td>
}

/**
* Return HTML for all rows of this table.
*/
protected def renderRows: Seq[Node]
}
override def tableId: String = s"$tableTag-table"

private[ui] class ActiveBatchTable(
runningBatches: Seq[BatchUIData],
waitingBatches: Seq[BatchUIData],
batchInterval: Long) extends BatchTableBase("active-batches-table", batchInterval) {
override def tableCssClass: String =
"table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited"

private val firstFailureReason = getFirstFailureReason(runningBatches)
override def pageSizeFormField: String = s"$tableTag.pageSize"

override protected def columns: Seq[Node] = super.columns ++ {
<th>Output Ops: Succeeded/Total</th>
<th>Status</th> ++ {
if (firstFailureReason.nonEmpty) {
<th>Error</th>
} else {
Nil
}
}
}
override def pageNumberFormField: String = s"$tableTag.page"

override protected def renderRows: Seq[Node] = {
// The "batchTime"s of "waitingBatches" must be greater than "runningBatches"'s, so display
// waiting batches before running batches
waitingBatches.flatMap(batch => <tr>{waitingBatchRow(batch)}</tr>) ++
runningBatches.flatMap(batch => <tr>{runningBatchRow(batch)}</tr>)
override def pageLink(page: Int): String = {
parameterPath +
s"&$tableTag.sort=$encodedSortColumn" +
s"&$tableTag.desc=$desc" +
s"&$pageNumberFormField=$page" +
s"&$pageSizeFormField=$pageSize" +
s"#$tableTag"
}

private def runningBatchRow(batch: BatchUIData): Seq[Node] = {
baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>processing</td> ++ {
if (firstFailureReason.nonEmpty) {
getFirstFailureTableCell(batch)
} else {
Nil
override def goButtonFormPath: String =
s"$parameterPath&$tableTag.sort=$encodedSortColumn&$tableTag.desc=$desc#$tableTag"

override def dataSource: PagedDataSource[BatchUIData] =
new StreamingDataSource(batches, pageSize, sortColumn, desc)

override def headers: Seq[Node] = {
// headers, sortable and tooltips
val headersAndCssClasses: Seq[(String, Boolean, Option[String])] = {
Seq(
("Batch Time", true, None),
("Records", true, None),
("Scheduling Delay", true, Some("Time taken by Streaming scheduler to submit jobs " +
"of a batch")),
("Processing Time", true, Some("Time taken to process all jobs of a batch"))) ++ {
if (tableTag.equals("completedBatches")) {
Seq(
("Total Delay", true, Some("Total time taken to handle a batch")),
("Output Ops: Succeeded/Total", false, None))
} else {
Seq(
("Output Ops: Succeeded/Total", false, None),
("Status", false, None))
}
} ++ {
if (firstFailureReason.nonEmpty) {
Seq(("Error", false, None))
} else {
Nil
}
}
}
// check if sort column is a valid sortable column
isSortColumnValid(headersAndCssClasses, sortColumn)

headerRow(headersAndCssClasses, desc, pageSize, sortColumn, parameterPath, tableTag, tableTag)
}

private def waitingBatchRow(batch: BatchUIData): Seq[Node] = {
baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>queued</td> ++ {
if (firstFailureReason.nonEmpty) {
// Waiting batches have not run yet, so must have no failure reasons.
<td>-</td>
} else {
Nil
override def row(batch: BatchUIData): Seq[Node] = {
val batchTime = batch.batchTime.milliseconds
val formattedBatchTime = SparkUIUtils.formatBatchTime(batchTime, batchInterval)
val numRecords = batch.numRecords
val schedulingDelay = batch.schedulingDelay
val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
val processingTime = batch.processingDelay
val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-")
val batchTimeId = s"batch-$batchTime"
val totalDelay = batch.totalDelay
val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")

<tr>
<td id={batchTimeId} isFailed={batch.isFailed.toString}>
<a href={s"batch?id=$batchTime"}>
{formattedBatchTime}
</a>
</td>
<td> {numRecords.toString} records </td>
<td> {formattedSchedulingDelay} </td>
<td> {formattedProcessingTime} </td>
{
if (tableTag.equals("completedBatches")) {
<td> {formattedTotalDelay} </td> ++
createOutputOperationProgressBar(batch) ++ {
if (firstFailureReason.nonEmpty) {
getFirstFailureTableCell(batch)
} else {
Nil
}
}
} else if (tableTag.equals("runningBatches")) {
createOutputOperationProgressBar(batch) ++
<td> processing </td> ++ {
if (firstFailureReason.nonEmpty) {
getFirstFailureTableCell(batch)
} else {
Nil
}
}
} else {
createOutputOperationProgressBar(batch) ++
<td> queued </td> ++ {
if (firstFailureReason.nonEmpty) {
// Waiting batches have not run yet, so must have no failure reasons.
<td>-</td>
} else {
Nil
}
}
}
}
}
</tr>
}
}

private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long)
extends BatchTableBase("completed-batches-table", batchInterval) {
private[ui] class StreamingDataSource(info: Seq[BatchUIData], pageSize: Int, sortColumn: String,
desc: Boolean) extends PagedDataSource[BatchUIData](pageSize) {

private val firstFailureReason = getFirstFailureReason(batches)
private val data = info.sorted(ordering(sortColumn, desc))

override protected def columns: Seq[Node] = super.columns ++ {
<th>Total Delay {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th>
<th>Output Ops: Succeeded/Total</th> ++ {
if (firstFailureReason.nonEmpty) {
<th>Error</th>
} else {
Nil
}
}
}
override protected def dataSize: Int = data.size

override protected def renderRows: Seq[Node] = {
batches.flatMap(batch => <tr>{completedBatchRow(batch)}</tr>)
}
override protected def sliceData(from: Int, to: Int): Seq[BatchUIData] = data.slice(from, to)

private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
val totalDelay = batch.totalDelay
val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")

baseRow(batch) ++ {
<td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}>
{formattedTotalDelay}
</td>
} ++ createOutputOperationProgressBar(batch) ++ {
if (firstFailureReason.nonEmpty) {
getFirstFailureTableCell(batch)
} else {
Nil
}
private def ordering(column: String, desc: Boolean): Ordering[BatchUIData] = {
val ordering: Ordering[BatchUIData] = column match {
case "Batch Time" => Ordering.by(_.batchTime.milliseconds)
case "Records" => Ordering.by(_.numRecords)
case "Scheduling Delay" => Ordering.by(_.schedulingDelay.getOrElse(Long.MaxValue))
case "Processing Time" => Ordering.by(_.processingDelay.getOrElse(Long.MaxValue))
case "Total Delay" => Ordering.by(_.totalDelay.getOrElse(Long.MaxValue))
case unknownColumn => throw new IllegalArgumentException(s"Unknown Column: $unknownColumn")
}
if (desc) {
ordering.reverse
} else {
ordering
}
}
}
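
A minimal sketch of how such a paged table might be wired into a streaming page's render method follows. The listener accessors (retainedCompletedBatches, batchDuration), the request/parent/basePath plumbing, and the page number used here are assumptions for illustration, not code from this diff:

// Build a paged table for completed batches; all names outside StreamingPagedTable
// itself are assumed for this sketch.
val completedBatchesTable = new StreamingPagedTable(
  request,
  tableTag = "completedBatches",
  batches = listener.retainedCompletedBatches,
  basePath = SparkUIUtils.prependBaseUri(request, parent.basePath),
  subPath = "streaming",
  batchInterval = listener.batchDuration)

// PagedTable.table(page) renders the requested page of rows as HTML nodes,
// falling back to an error message if the page number is out of range.
val completedBatchesNodes: Seq[Node] = completedBatchesTable.table(page = 1)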