From 65d4132d7979f6e7be7fad437cb8d11eb0f72ea7 Mon Sep 17 00:00:00 2001
From: iRakson
Date: Sun, 3 May 2020 14:38:40 +0530
Subject: [PATCH 1/4] [SPARK-30119] Add Pagination Support to Streaming Page

---
 .../spark/streaming/ui/AllBatchesTable.scala  | 329 +++++++++++-------
 .../spark/streaming/ui/StreamingPage.scala    | 145 ++++++--
 .../spark/streaming/UISeleniumSuite.scala     |  13 +-
 3 files changed, 318 insertions(+), 169 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index 1e443f656734..0324817b031f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -17,30 +17,46 @@
 
 package org.apache.spark.streaming.ui
 
-import scala.xml.Node
+import java.net.URLEncoder
+import java.nio.charset.StandardCharsets.UTF_8
 
-import org.apache.spark.ui.{UIUtils => SparkUIUtils}
+import scala.xml.{Node, Unparsed}
 
-private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long) {
+import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils => SparkUIUtils}
 
-  protected def columns: Seq[Node] = {
-    <th>Batch Time</th>
-      <th>Records</th>
-      <th>Scheduling Delay
-        {SparkUIUtils.tooltip("Time taken by Streaming scheduler to submit jobs of a batch", "top")}
-      </th>
-      <th>Processing Time
-        {SparkUIUtils.tooltip("Time taken to process all jobs of a batch", "top")}</th>
-  }
+private[ui] class StreamingPagedTable(
+    tableTag: String,
+    batches: Seq[BatchUIData],
+    basePath: String,
+    subPath: String,
+    parameterOtherTable: Iterable[String],
+    pageSize: Int,
+    sortColumn: String,
+    desc: Boolean,
+    isRunningTable: Boolean,
+    isWaitingTable: Boolean,
+    isCompletedTable: Boolean,
+    batchInterval: Long) extends PagedTable[BatchUIData] {
+
+  private val parameterPath = s"$basePath/$subPath/?${parameterOtherTable.mkString("&")}"
+
+  private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
+
+  private val firstFailureReason: Option[String] =
+    if (!isWaitingTable) {
+      getFirstFailureReason(batches)
+    } else {
+      None
+    }
 
   /**
    * Return the first failure reason if one is found in the batches.
    */
-  protected def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = {
+  private def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = {
     batches.flatMap(_.outputOperations.flatMap(_._2.failureReason)).headOption
   }
 
-  protected def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = {
+  private def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = {
     val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption
     firstFailureReason.map { failureReason =>
       val failureReasonForUI = UIUtils.createOutputOperationFailureForUI(failureReason)
@@ -49,147 +65,200 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long)
     }.getOrElse(<td>-</td>)
   }
 
-  protected def baseRow(batch: BatchUIData): Seq[Node] = {
-    val batchTime = batch.batchTime.milliseconds
-    val formattedBatchTime = SparkUIUtils.formatBatchTime(batchTime, batchInterval)
-    val numRecords = batch.numRecords
-    val schedulingDelay = batch.schedulingDelay
-    val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
-    val processingTime = batch.processingDelay
-    val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-")
-    val batchTimeId = s"batch-$batchTime"
-
-    <td id={batchTimeId} sorttable_customkey={batchTime.toString}
-        isFailed={batch.isFailed.toString}>
-      <a href={s"batch?id=$batchTime"}>
-        {formattedBatchTime}
-      </a>
-    </td>
-      <td sorttable_customkey={numRecords.toString}>{numRecords.toString} records</td>
-      <td sorttable_customkey={schedulingDelay.getOrElse(Long.MaxValue).toString}>
-        {formattedSchedulingDelay}
-      </td>
-      <td sorttable_customkey={processingTime.getOrElse(Long.MaxValue).toString}>
-        {formattedProcessingTime}
-      </td>
+  private def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = {
+    SparkUIUtils.makeProgressBar(
+      started = batch.numActiveOutputOp,
+      completed = batch.numCompletedOutputOp,
+      failed = batch.numFailedOutputOp,
+      skipped = 0,
+      reasonToNumKilled = Map.empty,
+      total = batch.outputOperations.size)
   }
 
-  private def batchTable: Seq[Node] = {
-    <table id={tableId} class="table table-bordered table-striped table-sm sortable">
-      <thead>
-        {columns}
-      </thead>
-      <tbody>
-        {renderRows}
-      </tbody>
-    </table>
-  }
+  override def tableId: String = s"$tableTag-table"
 
-  def toNodeSeq: Seq[Node] = {
-    batchTable
-  }
+  override def tableCssClass: String =
+    "table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited"
 
-  protected def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = {
-    <td class="progress-cell">
-      {
-        SparkUIUtils.makeProgressBar(
-          started = batch.numActiveOutputOp,
-          completed = batch.numCompletedOutputOp,
-          failed = batch.numFailedOutputOp,
-          skipped = 0,
-          reasonToNumKilled = Map.empty,
-          total = batch.outputOperations.size)
-      }
-    </td>
-  }
+  override def pageSizeFormField: String = s"$tableTag.pageSize"
 
-  /**
-   * Return HTML for all rows of this table.
-   */
-  protected def renderRows: Seq[Node]
-}
+  override def pageNumberFormField: String = s"$tableTag.page"
 
-private[ui] class ActiveBatchTable(
-    runningBatches: Seq[BatchUIData],
-    waitingBatches: Seq[BatchUIData],
-    batchInterval: Long) extends BatchTableBase("active-batches-table", batchInterval) {
-
-  private val firstFailureReason = getFirstFailureReason(runningBatches)
-
-  override protected def columns: Seq[Node] = super.columns ++ {
-    <th>Output Ops: Succeeded/Total</th>
-      <th>Status</th> ++ {
-      if (firstFailureReason.nonEmpty) {
-        <th>Error</th>
-      } else {
-        Nil
-      }
-    }
+  override def pageLink(page: Int): String = {
+    parameterPath +
+      s"&$tableTag.sort=$encodedSortColumn" +
+      s"&$tableTag.desc=$desc" +
+      s"&$pageNumberFormField=$page" +
+      s"&$pageSizeFormField=$pageSize" +
+      s"#$tableTag"
   }
 
-  override protected def renderRows: Seq[Node] = {
-    // The "batchTime"s of "waitingBatches" must be greater than "runningBatches"'s, so display
-    // waiting batches before running batches
-    waitingBatches.flatMap(batch => <tr>{waitingBatchRow(batch)}</tr>) ++
-      runningBatches.flatMap(batch => <tr>{runningBatchRow(batch)}</tr>)
-  }
+  override def goButtonFormPath: String =
+    s"$parameterPath&$tableTag.sort=$encodedSortColumn&$tableTag.desc=$desc#$tableTag"
 
-  private def runningBatchRow(batch: BatchUIData): Seq[Node] = {
-    baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>processing</td> ++ {
-      if (firstFailureReason.nonEmpty) {
-        getFirstFailureTableCell(batch)
-      } else {
-        Nil
-      }
-    }
-  }
+  override def dataSource: PagedDataSource[BatchUIData] =
+    new StreamingDataSource(batches, pageSize, sortColumn, desc)
 
-  private def waitingBatchRow(batch: BatchUIData): Seq[Node] = {
-    baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>queued</td> ++ {
-      if (firstFailureReason.nonEmpty) {
-        // Waiting batches have not run yet, so must have no failure reasons.
-        <td>-</td>
-      } else {
-        Nil
+  override def headers: Seq[Node] = {
+    // tuple containing tooltips for header fields
+    val tooltips = ("Time taken by Streaming scheduler to submit jobs of a batch",
+      "Time taken to process all jobs of a batch", "Total time taken to handle a batch")
+    val headersAndCssClasses: Seq[(String, Boolean, Option[String])] = {
+      Seq(
+        ("Batch Time", true, None),
+        ("Records", true, None),
+        ("Scheduling Delay", true, Some(tooltips._1)),
+        ("Processing Time", true, Some(tooltips._2))) ++ {
+        if (isCompletedTable) {
+          Seq(
+            ("Total Delay", true, Some(tooltips._3)),
+            ("Output Ops: Succeeded/Total", false, None))
+        } else {
+          Seq(
+            ("Output Ops: Succeeded/Total", false, None),
+            ("Status", false, None))
+        }
+      } ++ {
+        if (firstFailureReason.nonEmpty) {
+          Seq(("Error", false, None))
+        } else {
+          Nil
+        }
       }
     }
-  }
-}
 
-private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long)
-  extends BatchTableBase("completed-batches-table", batchInterval) {
+    val sortableColumnHeader = headersAndCssClasses.filter {
+      case (_, sortable, _) => sortable
+    }.map { case (column, _, _) => column }
 
-  private val firstFailureReason = getFirstFailureReason(batches)
+    // verify that given column to sort is a valid sortable column
+    require(sortableColumnHeader.contains(sortColumn), s"Unknown Column: $sortColumn")
 
-  override protected def columns: Seq[Node] = super.columns ++ {
-    <th>Total Delay {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th>
-      <th>Output Ops: Succeeded/Total</th> ++ {
-      if (firstFailureReason.nonEmpty) {
-        <th>Error</th>
-      } else {
-        Nil
+    val headerRow: Seq[Node] = {
+      headersAndCssClasses.map { case (header, sortable, tooltip) =>
+        if (header == sortColumn) {
+          val headerLink = Unparsed(
+            parameterPath +
+              s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" +
+              s"&$tableTag.desc=${!desc}" +
+              s"&$tableTag.pageSize=$pageSize" +
+              s"#$tableTag")
+          val arrow = if (desc) "▾" else "▴" // UP or DOWN
+          <th>
+            <a href={headerLink}>
+              <span data-toggle="tooltip" data-placement="top" title={tooltip.getOrElse("")}>
+                {header} {Unparsed(arrow)}
+              </span>
+            </a>
+          </th>
+        } else {
+          if (sortable) {
+            val headerLink = Unparsed(
+              parameterPath +
+                s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" +
+                s"&$tableTag.pageSize=$pageSize" +
+                s"#$tableTag")
+            <th>
+              <a href={headerLink}>
+                <span data-toggle="tooltip" data-placement="top" title={tooltip.getOrElse("")}>
+                  {header}
+                </span>
+              </a>
+            </th>
+          } else {
+            <th>
+              {header}
+            </th>
+          }
+        }
       }
     }
-  }
 
-  override protected def renderRows: Seq[Node] = {
-    batches.flatMap(batch => <tr>{completedBatchRow(batch)}</tr>)
+    <thead>
+      {headerRow}
+    </thead>
   }
 
-  private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
+  override def row(batch: BatchUIData): Seq[Node] = {
+    val batchTime = batch.batchTime.milliseconds
+    val formattedBatchTime = SparkUIUtils.formatBatchTime(batchTime, batchInterval)
+    val numRecords = batch.numRecords
+    val schedulingDelay = batch.schedulingDelay
+    val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
+    val processingTime = batch.processingDelay
+    val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-")
+    val batchTimeId = s"batch-$batchTime"
     val totalDelay = batch.totalDelay
     val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
-    baseRow(batch) ++ {
-      <td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}>
-        {formattedTotalDelay}
-      </td>
-    } ++ createOutputOperationProgressBar(batch)++ {
-      if (firstFailureReason.nonEmpty) {
-        getFirstFailureTableCell(batch)
-      } else {
-        Nil
+
+    <tr>
+      <td id={batchTimeId} sorttable_customkey={batchTime.toString}
+          isFailed={batch.isFailed.toString}>
+        <a href={s"batch?id=$batchTime"}>
+          {formattedBatchTime}
+        </a>
+      </td>
+      <td>{numRecords.toString} Records</td>
+      <td>{formattedSchedulingDelay}</td>
+      <td>{formattedProcessingTime}</td>
+      {
+        if (isCompletedTable) {
+          <td>{formattedTotalDelay}</td>
+          {createOutputOperationProgressBar(batch)} ++ {
+            if (firstFailureReason.nonEmpty) {
+              getFirstFailureTableCell(batch)
+            } else {
+              Nil
+            }
+          }
+        } else if (isRunningTable) {
+          {createOutputOperationProgressBar(batch)}
+          <td>processing</td> ++ {
+            if (firstFailureReason.nonEmpty) {
+              getFirstFailureTableCell(batch)
+            } else {
+              Nil
+            }
+          }
+        } else {
+          {createOutputOperationProgressBar(batch)}
+          <td>queued</td> ++ {
+            if (firstFailureReason.nonEmpty) {
+              // Waiting batches have not run yet, so must have no failure reasons.
+              <td>-</td>
+            } else {
+              Nil
+            }
+          }
+        }
       }
+    </tr>
+  }
+}
+
+private[ui] class StreamingDataSource(info: Seq[BatchUIData], pageSize: Int, sortColumn: String,
+    desc: Boolean) extends PagedDataSource[BatchUIData](pageSize) {
+
+  private val data = info.sorted(ordering(sortColumn, desc))
+
+  override protected def dataSize: Int = data.size
+
+  override protected def sliceData(from: Int, to: Int): Seq[BatchUIData] = data.slice(from, to)
+
+  private def ordering(column: String, desc: Boolean): Ordering[BatchUIData] = {
+    val ordering: Ordering[BatchUIData] = column match {
+      case "Batch Time" => Ordering.by(_.batchTime.milliseconds)
+      case "Records" => Ordering.by(_.numRecords)
+      case "Scheduling Delay" => Ordering.by(_.schedulingDelay.getOrElse(Long.MaxValue))
+      case "Processing Time" => Ordering.by(_.processingDelay.getOrElse(Long.MaxValue))
+      case "Total Delay" => Ordering.by(_.totalDelay.getOrElse(Long.MaxValue))
+      case unknownColumn => throw new IllegalArgumentException(s"Unknown Column: $unknownColumn")
+    }
+    if (desc) {
+      ordering.reverse
+    } else {
+      ordering
     }
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 3bdf009dbce6..65a814823001 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -20,10 +20,13 @@ package org.apache.spark.streaming.ui
 import java.util.concurrent.TimeUnit
 import javax.servlet.http.HttpServletRequest
 
+import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.xml.{Node, Unparsed}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.ui.{GraphUIData, JsCollector, UIUtils => SparkUIUtils, WebUIPage}
+import org.apache.spark.util.Utils
 
 /**
  * A helper class for "scheduling delay", "processing time" and "total delay" to generate data that
@@ -86,7 +89,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
       onClickTimelineFunc ++ basicInfo ++
       listener.synchronized {
         generateStatTable() ++
-          generateBatchListTables()
+          generateBatchListTables(request)
       }
     SparkUIUtils.headerSparkPage(request, "Streaming Statistics", content, parent)
   }
@@ -432,50 +435,128 @@ private[ui] class StreamingPage(parent: StreamingTab)
   }
 
-  private def generateBatchListTables(): Seq[Node] = {
+  private def streamingTable(request: HttpServletRequest, batches: Seq[BatchUIData],
+      tableTag: String): Seq[Node] = {
+    val interval: Long = listener.batchDuration
+
+    val parameterOtherTable = request.getParameterMap.asScala
+      .filterNot(_._1.contains(tableTag))
+      .map { case (name, vals) =>
+        name + "=" + vals(0)
+      }
+
+    val tableType = tableTag match {
+      case "runningBatches" => (true, false, false)
+      case "waitingBatches" => (false, true, false)
+      case "completedBatches" => (false, false, true)
+    }
+
+    val parameterPage = request.getParameter(s"$tableTag.page")
+    val parameterDesc = request.getParameter(s"$tableTag.desc")
+    val parameterSortColumn = request.getParameter(s"$tableTag.sort")
+    val parameterPageSize = request.getParameter(s"$tableTag.pageSize")
+
+    val streamingPage = Option(parameterPage).map(_.toInt).getOrElse(1)
+    val streamingSortColumn = Option(parameterSortColumn).map { sortColumn =>
+      SparkUIUtils.decodeURLParameter(sortColumn)
+    }.getOrElse("Batch Time")
+    val streamingDesc = Option(parameterDesc).map(_.toBoolean).getOrElse(
+      streamingSortColumn == "Batch Time"
+    )
+    val streamingPageSize = Option(parameterPageSize).map(_.toInt).getOrElse(100)
+
+    try {
+      new StreamingPagedTable(
+        tableTag,
+        batches,
+        SparkUIUtils.prependBaseUri(request, parent.basePath),
+        "streaming",
+        parameterOtherTable,
+        streamingPageSize,
+        streamingSortColumn,
+        streamingDesc,
+        tableType._1,
+        tableType._2,
+        tableType._3,
+        interval
+      ).table(streamingPage)
+    } catch {
+      case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
+        <div class="alert alert-error">
+          <p>Error while rendering streaming table:</p>
+          <pre>
+            {Utils.exceptionString(e)}
+          </pre>
+        </div>
+    }
+  }
+
+  private def generateBatchListTables(request: HttpServletRequest): Seq[Node] = {
     val runningBatches = listener.runningBatches.sortBy(_.batchTime.milliseconds).reverse
     val waitingBatches = listener.waitingBatches.sortBy(_.batchTime.milliseconds).reverse
     val completedBatches = listener.retainedCompletedBatches.
       sortBy(_.batchTime.milliseconds).reverse
 
-    val activeBatchesContent = {
-      <div class="row">
-        <div class="col-12">
-          <span id="activeBatches" class="collapse-aggregated-activeBatches collapse-table"
-              onClick="collapseTable('collapse-aggregated-activeBatches',
-              'aggregated-activeBatches')">
-            <h4>
-              <span class="collapse-table-arrow arrow-open"></span>
-              <a>Active Batches ({runningBatches.size + waitingBatches.size})</a>
-            </h4>
-          </span>
-          <div class="aggregated-activeBatches collapsible-table">
-            {new ActiveBatchTable(runningBatches, waitingBatches, listener.batchDuration).toNodeSeq}
-          </div>
-        </div>
-      </div>
-    }
+    val content = mutable.ListBuffer[Node]()
+
+    if (runningBatches.nonEmpty) {
+      content ++=
+        <div class="row">
+          <div class="col-12">
+            <span id="runningBatches" class="collapse-aggregated-runningBatches collapse-table"
+                onClick="collapseTable('collapse-aggregated-runningBatches',
+                'aggregated-activeBatches')">
+              <h4>
+                <span class="collapse-table-arrow arrow-open"></span>
+                <a>Running Batches ({runningBatches.size})</a>
+              </h4>
+            </span>
+            <div class="aggregated-runningBatches collapsible-table">
+              { streamingTable(request, runningBatches, "runningBatches") }
+            </div>
+          </div>
+        </div>
+    }
 
-    val completedBatchesContent = {
-      <div class="row">
-        <div class="col-12">
-          <span id="completedBatches" class="collapse-aggregated-completedBatches collapse-table"
-              onClick="collapseTable('collapse-aggregated-completedBatches',
-              'aggregated-completedBatches')">
-            <h4>
-              <span class="collapse-table-arrow arrow-open"></span>
-              <a>Completed Batches (last {completedBatches.size}
-                out of {listener.numTotalCompletedBatches})</a>
-            </h4>
-          </span>
-          <div class="aggregated-completedBatches collapsible-table">
-            {new CompletedBatchTable(completedBatches, listener.batchDuration).toNodeSeq}
-          </div>
-        </div>
-      </div>
-    }
+    if (waitingBatches.nonEmpty) {
+      content ++=
+        <div class="row">
+          <div class="col-12">
+            <span id="waitingBatches" class="collapse-aggregated-waitingBatches collapse-table"
+                onClick="collapseTable('collapse-aggregated-waitingBatches',
+                'aggregated-activeBatches')">
+              <h4>
+                <span class="collapse-table-arrow arrow-open"></span>
+                <a>Waiting Batches ({waitingBatches.size})</a>
+              </h4>
+            </span>
+            <div class="aggregated-waitingBatches collapsible-table">
+              { streamingTable(request, waitingBatches, "waitingBatches") }
+            </div>
+          </div>
+        </div>
+    }
 
-    activeBatchesContent ++ completedBatchesContent
+    if (completedBatches.nonEmpty) {
+      content ++=
+        <div class="row">
+          <div class="col-12">
+            <span id="completedBatches" class="collapse-aggregated-completedBatches collapse-table"
+                onClick="collapseTable('collapse-aggregated-completedBatches',
+                'aggregated-activeBatches')">
+              <h4>
+                <span class="collapse-table-arrow arrow-open"></span>
+                <a>Completed Batches (last {completedBatches.size}
+                  out of {listener.numTotalCompletedBatches})</a>
+              </h4>
+            </span>
+            <div class="aggregated-completedBatches collapsible-table">
+              { streamingTable(request, completedBatches, "completedBatches") }
+            </div>
+          </div>
+        </div>
+    }
+    content
   }
 }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index bdc9e9ee2aed..6dd798a13011 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -125,20 +125,19 @@ class UISeleniumSuite
 
       // Check batch tables
       val h4Text = findAll(cssSelector("h4")).map(_.text).toSeq
-      h4Text.exists(_.matches("Active Batches \\(\\d+\\)")) should be (true)
       h4Text.exists(_.matches("Completed Batches \\(last \\d+ out of \\d+\\)")) should be (true)
 
-      findAll(cssSelector("""#active-batches-table th""")).map(_.text).toSeq should be {
-        List("Batch Time", "Records", "Scheduling Delay (?)", "Processing Time (?)",
+      findAll(cssSelector("""#runningBatches-table th""")).map(_.text).toList should be {
+        List("Batch Time", "Records", "Scheduling Delay", "Processing Time",
           "Output Ops: Succeeded/Total", "Status")
       }
-      findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be {
-        List("Batch Time", "Records", "Scheduling Delay (?)", "Processing Time (?)",
-          "Total Delay (?)", "Output Ops: Succeeded/Total")
+      findAll(cssSelector("""#completedBatches-table th""")).map(_.text).toList should be {
+        List("Batch Time", "Records", "Scheduling Delay", "Processing Time",
+          "Total Delay", "Output Ops: Succeeded/Total")
       }
 
       val batchLinks =
-        findAll(cssSelector("""#completed-batches-table a""")).flatMap(_.attribute("href")).toSeq
+        findAll(cssSelector("""#completedBatches-table a""")).flatMap(_.attribute("href")).toSeq
       batchLinks.size should be >= 1
 
       // Check a normal batch page
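Background for patch 1: a PagedTable renders only the requested page of rows and delegates sorting and slicing to its PagedDataSource (here, StreamingDataSource). The sketch below is a minimal, self-contained illustration of that sort-then-slice flow; `Batch` and `pageOf` are hypothetical stand-ins for BatchUIData and the trait machinery, not Spark API.

```scala
// Minimal sketch of the sort-then-slice pagination StreamingDataSource implements.
// `Batch` and `pageOf` are illustrative stand-ins, not Spark API.
case class Batch(batchTime: Long, numRecords: Long, schedulingDelay: Option[Long])

def pageOf(
    batches: Seq[Batch],
    sortColumn: String,
    desc: Boolean,
    page: Int,
    pageSize: Int): Seq[Batch] = {
  val ordering: Ordering[Batch] = sortColumn match {
    case "Batch Time" => Ordering.by(_.batchTime)
    case "Records" => Ordering.by(_.numRecords)
    // Missing delays sort last, mirroring getOrElse(Long.MaxValue) in the patch.
    case "Scheduling Delay" => Ordering.by(_.schedulingDelay.getOrElse(Long.MaxValue))
    case other => throw new IllegalArgumentException(s"Unknown Column: $other")
  }
  val sorted = batches.sorted(if (desc) ordering.reverse else ordering)
  val from = (page - 1) * pageSize // PagedDataSource.sliceData receives the same offsets
  sorted.slice(from, from + pageSize)
}
```

For instance, `pageOf(batches, "Batch Time", desc = true, page = 1, pageSize = 100)` reproduces the defaults the streaming page falls back to when the request carries no table parameters.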
From f534bc9ecc329705c238d9e6524646ab62cce584 Mon Sep 17 00:00:00 2001
From: iRakson
Date: Sun, 3 May 2020 21:27:13 +0530
Subject: [PATCH 2/4] fix

---
 .../org/apache/spark/streaming/ui/StreamingPage.scala      | 6 +++---
 .../scala/org/apache/spark/streaming/UISeleniumSuite.scala | 7 ++-----
 2 files changed, 5 insertions(+), 8 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 65a814823001..15b217c8f8f1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -505,7 +505,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
           <div class="col-12">
             <span id="runningBatches" class="collapse-aggregated-runningBatches collapse-table"
                 onClick="collapseTable('collapse-aggregated-runningBatches',
-                'aggregated-activeBatches')">
+                'aggregated-runningBatches')">
               <h4>
                 <span class="collapse-table-arrow arrow-open"></span>
                 <a>Running Batches ({runningBatches.size})</a>
@@ -524,7 +524,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
           <div class="col-12">
             <span id="waitingBatches" class="collapse-aggregated-waitingBatches collapse-table"
                 onClick="collapseTable('collapse-aggregated-waitingBatches',
-                'aggregated-activeBatches')">
+                'aggregated-waitingBatches')">
               <h4>
                 <span class="collapse-table-arrow arrow-open"></span>
                 <a>Waiting Batches ({waitingBatches.size})</a>
@@ -543,7 +543,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
           <div class="col-12">
             <span id="completedBatches" class="collapse-aggregated-completedBatches collapse-table"
                 onClick="collapseTable('collapse-aggregated-completedBatches',
-                'aggregated-activeBatches')">
+                'aggregated-completedBatches')">
               <h4>
                 <span class="collapse-table-arrow arrow-open"></span>
                 <a>Completed Batches (last {completedBatches.size}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index 6dd798a13011..aff912e2dd04 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -127,12 +127,9 @@ class UISeleniumSuite
       val h4Text = findAll(cssSelector("h4")).map(_.text).toSeq
       h4Text.exists(_.matches("Completed Batches \\(last \\d+ out of \\d+\\)")) should be (true)
 
-      findAll(cssSelector("""#runningBatches-table th""")).map(_.text).toList should be {
-        List("Batch Time", "Records", "Scheduling Delay", "Processing Time",
-          "Output Ops: Succeeded/Total", "Status")
-      }
+      val arrow = 0x25BE.toChar
       findAll(cssSelector("""#completedBatches-table th""")).map(_.text).toList should be {
-        List("Batch Time", "Records", "Scheduling Delay", "Processing Time",
+        List(s"Batch Time $arrow", "Records", "Scheduling Delay", "Processing Time",
           "Total Delay", "Output Ops: Succeeded/Total")
       }
 
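Background for patch 2: besides pointing the three collapseTable calls at their own table ids, it updates the header assertion because the current sort column is rendered with a trailing arrow glyph. A tiny illustrative check follows; the exact header layout is an assumption, the authoritative encoding being the `0x25BE.toChar` in the test above.

```scala
// Illustrative only: the descending default sort on "Batch Time" shows U+25BE '▾';
// an ascending sort would show U+25B4 '▴' instead.
val desc = true
val arrow = if (desc) 0x25BE.toChar else 0x25B4.toChar
assert(s"Batch Time $arrow" == "Batch Time \u25BE")
```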
From 701fb70a0c318c1331164b96b07734a0cc89946b Mon Sep 17 00:00:00 2001
From: iRakson
Date: Fri, 22 May 2020 16:59:33 +0530
Subject: [PATCH 3/4] rebase and update

---
 .../spark/streaming/ui/AllBatchesTable.scala  | 68 +++----------------
 .../spark/streaming/ui/StreamingPage.scala    | 27 +-------
 2 files changed, 11 insertions(+), 84 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index 0324817b031f..8154834a28dc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -19,27 +19,25 @@ package org.apache.spark.streaming.ui
 
 import java.net.URLEncoder
 import java.nio.charset.StandardCharsets.UTF_8
+import javax.servlet.http.HttpServletRequest
 
-import scala.xml.{Node, Unparsed}
+import scala.xml.Node
 
 import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils => SparkUIUtils}
 
 private[ui] class StreamingPagedTable(
+    request: HttpServletRequest,
     tableTag: String,
     batches: Seq[BatchUIData],
     basePath: String,
     subPath: String,
-    parameterOtherTable: Iterable[String],
-    pageSize: Int,
-    sortColumn: String,
-    desc: Boolean,
     isRunningTable: Boolean,
     isWaitingTable: Boolean,
     isCompletedTable: Boolean,
     batchInterval: Long) extends PagedTable[BatchUIData] {
 
-  private val parameterPath = s"$basePath/$subPath/?${parameterOtherTable.mkString("&")}"
-
+  private val (sortColumn, desc, pageSize) = getTableParameters(request, tableTag, "Batch Time")
+  private val parameterPath = s"$basePath/$subPath/?${getParameterOtherTable(request, tableTag)}"
   private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
 
   private val firstFailureReason: Option[String] =
@@ -103,6 +101,7 @@ private[ui] class StreamingPagedTable(
     // tuple containing tooltips for header fields
     val tooltips = ("Time taken by Streaming scheduler to submit jobs of a batch",
       "Time taken to process all jobs of a batch", "Total time taken to handle a batch")
+    // headers, sortable and tooltips
     val headersAndCssClasses: Seq[(String, Boolean, Option[String])] = {
       Seq(
         ("Batch Time", true, None),
@@ -126,59 +125,10 @@ private[ui] class StreamingPagedTable(
         }
       }
     }
+    // check if sort column is a valid sortable column
+    isSortColumnValid(headersAndCssClasses, sortColumn)
 
-    val sortableColumnHeader = headersAndCssClasses.filter {
-      case (_, sortable, _) => sortable
-    }.map { case (column, _, _) => column }
-
-    // verify that given column to sort is a valid sortable column
-    require(sortableColumnHeader.contains(sortColumn), s"Unknown Column: $sortColumn")
-
-    val headerRow: Seq[Node] = {
-      headersAndCssClasses.map { case (header, sortable, tooltip) =>
-        if (header == sortColumn) {
-          val headerLink = Unparsed(
-            parameterPath +
-              s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" +
-              s"&$tableTag.desc=${!desc}" +
-              s"&$tableTag.pageSize=$pageSize" +
-              s"#$tableTag")
-          val arrow = if (desc) "▾" else "▴" // UP or DOWN
-          <th>
-            <a href={headerLink}>
-              <span data-toggle="tooltip" data-placement="top" title={tooltip.getOrElse("")}>
-                {header} {Unparsed(arrow)}
-              </span>
-            </a>
-          </th>
-        } else {
-          if (sortable) {
-            val headerLink = Unparsed(
-              parameterPath +
-                s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" +
-                s"&$tableTag.pageSize=$pageSize" +
-                s"#$tableTag")
-            <th>
-              <a href={headerLink}>
-                <span data-toggle="tooltip" data-placement="top" title={tooltip.getOrElse("")}>
-                  {header}
-                </span>
-              </a>
-            </th>
-          } else {
-            <th>
-              {header}
-            </th>
-          }
-        }
-      }
-    }
-
-    <thead>
-      {headerRow}
-    </thead>
+    headerRow(headersAndCssClasses, desc, pageSize, sortColumn, parameterPath, tableTag, tableTag)
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 15b217c8f8f1..dd0150ee9aef 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -20,7 +20,6 @@ package org.apache.spark.streaming.ui
 import java.util.concurrent.TimeUnit
 import javax.servlet.http.HttpServletRequest
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.xml.{Node, Unparsed}
 
@@ -438,12 +437,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
   private def streamingTable(request: HttpServletRequest, batches: Seq[BatchUIData],
       tableTag: String): Seq[Node] = {
     val interval: Long = listener.batchDuration
-
-    val parameterOtherTable = request.getParameterMap.asScala
-      .filterNot(_._1.contains(tableTag))
-      .map { case (name, vals) =>
-        name + "=" + vals(0)
-      }
+    val streamingPage = Option(request.getParameter(s"$tableTag.page")).map(_.toInt).getOrElse(1)
 
     val tableType = tableTag match {
       case "runningBatches" => (true, false, false)
@@ -451,30 +445,13 @@ private[ui] class StreamingPage(parent: StreamingTab)
       case "completedBatches" => (false, false, true)
     }
 
-    val parameterPage = request.getParameter(s"$tableTag.page")
-    val parameterDesc = request.getParameter(s"$tableTag.desc")
-    val parameterSortColumn = request.getParameter(s"$tableTag.sort")
-    val parameterPageSize = request.getParameter(s"$tableTag.pageSize")
-
-    val streamingPage = Option(parameterPage).map(_.toInt).getOrElse(1)
-    val streamingSortColumn = Option(parameterSortColumn).map { sortColumn =>
-      SparkUIUtils.decodeURLParameter(sortColumn)
-    }.getOrElse("Batch Time")
-    val streamingDesc = Option(parameterDesc).map(_.toBoolean).getOrElse(
-      streamingSortColumn == "Batch Time"
-    )
-    val streamingPageSize = Option(parameterPageSize).map(_.toInt).getOrElse(100)
-
     try {
       new StreamingPagedTable(
+        request,
         tableTag,
         batches,
         SparkUIUtils.prependBaseUri(request, parent.basePath),
         "streaming",
-        parameterOtherTable,
-        streamingPageSize,
-        streamingSortColumn,
-        streamingDesc,
         tableType._1,
         tableType._2,
         tableType._3,
         interval
       ).table(streamingPage)
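Background for patch 3: the hand-rolled parameter parsing moves into shared PagedTable helpers. A simplified approximation of what getTableParameters recovers from the `<tableTag>.*` query parameters is sketched below — it is not the actual Spark implementation (for one, it skips URL decoding), just the shape of the contract:

```scala
import javax.servlet.http.HttpServletRequest

// Simplified approximation of PagedTable.getTableParameters, for orientation only.
def getTableParameters(
    request: HttpServletRequest,
    tableTag: String,
    defaultSort: String): (String, Boolean, Int) = {
  val sort = Option(request.getParameter(s"$tableTag.sort")).getOrElse(defaultSort)
  // Descending by default only for the default column, matching the
  // streamingDesc logic this patch deletes from StreamingPage.
  val desc = Option(request.getParameter(s"$tableTag.desc"))
    .map(_.toBoolean)
    .getOrElse(sort == defaultSort)
  val pageSize = Option(request.getParameter(s"$tableTag.pageSize"))
    .map(_.toInt)
    .getOrElse(100)
  (sort, desc, pageSize)
}
```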
From eb334a2ca0020b1f47f0169a3f03eacde33e2f3a Mon Sep 17 00:00:00 2001
From: iRakson
Date: Tue, 2 Jun 2020 19:31:38 +0530
Subject: [PATCH 4/4] fix

---
 .../org/apache/spark/ui/static/webui.js       |  3 +-
 .../spark/streaming/ui/AllBatchesTable.scala  | 51 +++++++++----------
 .../spark/streaming/ui/StreamingPage.scala    |  9 ----
 .../spark/streaming/UISeleniumSuite.scala     | 37 ++++++++++++--
 4 files changed, 59 insertions(+), 41 deletions(-)

diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.js b/core/src/main/resources/org/apache/spark/ui/static/webui.js
index 0ba461f02317..9c57283f5543 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.js
@@ -82,7 +82,8 @@ $(function() {
   collapseTablePageLoad('collapse-aggregated-poolActiveStages','aggregated-poolActiveStages');
   collapseTablePageLoad('collapse-aggregated-tasks','aggregated-tasks');
   collapseTablePageLoad('collapse-aggregated-rdds','aggregated-rdds');
-  collapseTablePageLoad('collapse-aggregated-activeBatches','aggregated-activeBatches');
+  collapseTablePageLoad('collapse-aggregated-waitingBatches','aggregated-waitingBatches');
+  collapseTablePageLoad('collapse-aggregated-runningBatches','aggregated-runningBatches');
   collapseTablePageLoad('collapse-aggregated-completedBatches','aggregated-completedBatches');
   collapseTablePageLoad('collapse-aggregated-runningExecutions','aggregated-runningExecutions');
   collapseTablePageLoad('collapse-aggregated-completedExecutions','aggregated-completedExecutions');
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index 8154834a28dc..c0eec0e0b0a8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -31,9 +31,6 @@ private[ui] class StreamingPagedTable(
     batches: Seq[BatchUIData],
     basePath: String,
     subPath: String,
-    isRunningTable: Boolean,
-    isWaitingTable: Boolean,
-    isCompletedTable: Boolean,
     batchInterval: Long) extends PagedTable[BatchUIData] {
 
   private val (sortColumn, desc, pageSize) = getTableParameters(request, tableTag, "Batch Time")
@@ -41,7 +38,7 @@ private[ui] class StreamingPagedTable(
   private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
 
   private val firstFailureReason: Option[String] =
-    if (!isWaitingTable) {
+    if (!tableTag.equals("waitingBatches")) {
       getFirstFailureReason(batches)
     } else {
       None
@@ -64,13 +61,17 @@ private[ui] class StreamingPagedTable(
   }
 
   private def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = {
-    SparkUIUtils.makeProgressBar(
-      started = batch.numActiveOutputOp,
-      completed = batch.numCompletedOutputOp,
-      failed = batch.numFailedOutputOp,
-      skipped = 0,
-      reasonToNumKilled = Map.empty,
-      total = batch.outputOperations.size)
+    <td class="progress-cell">
+      {
+        SparkUIUtils.makeProgressBar(
+          started = batch.numActiveOutputOp,
+          completed = batch.numCompletedOutputOp,
+          failed = batch.numFailedOutputOp,
+          skipped = 0,
+          reasonToNumKilled = Map.empty,
+          total = batch.outputOperations.size)
+      }
+    </td>
   }
 
   override def tableId: String = s"$tableTag-table"
@@ -98,19 +99,17 @@ private[ui] class StreamingPagedTable(
     new StreamingDataSource(batches, pageSize, sortColumn, desc)
 
   override def headers: Seq[Node] = {
-    // tuple containing tooltips for header fields
-    val tooltips = ("Time taken by Streaming scheduler to submit jobs of a batch",
-      "Time taken to process all jobs of a batch", "Total time taken to handle a batch")
     // headers, sortable and tooltips
     val headersAndCssClasses: Seq[(String, Boolean, Option[String])] = {
       Seq(
         ("Batch Time", true, None),
         ("Records", true, None),
-        ("Scheduling Delay", true, Some(tooltips._1)),
-        ("Processing Time", true, Some(tooltips._2))) ++ {
-        if (isCompletedTable) {
+        ("Scheduling Delay", true, Some("Time taken by Streaming scheduler to submit jobs " +
+          "of a batch")),
+        ("Processing Time", true, Some("Time taken to process all jobs of a batch"))) ++ {
+        if (tableTag.equals("completedBatches")) {
           Seq(
-            ("Total Delay", true, Some(tooltips._3)),
+            ("Total Delay", true, Some("Total time taken to handle a batch")),
             ("Output Ops: Succeeded/Total", false, None))
         } else {
           Seq(
@@ -144,26 +143,26 @@ private[ui] class StreamingPagedTable(
     val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
 
     <tr>
-      <td id={batchTimeId} sorttable_customkey={batchTime.toString}
-          isFailed={batch.isFailed.toString}>
+      <td id={batchTimeId}
+          isFailed={batch.isFailed.toString}>
         <a href={s"batch?id=$batchTime"}>
           {formattedBatchTime}
         </a>
       </td>
-      <td>{numRecords.toString} Records</td>
+      <td>{numRecords.toString} records</td>
       <td>{formattedSchedulingDelay}</td>
       <td>{formattedProcessingTime}</td>
       {
-        if (isCompletedTable) {
-          <td>{formattedTotalDelay}</td>
-          {createOutputOperationProgressBar(batch)} ++ {
+        if (tableTag.equals("completedBatches")) {
+          <td>{formattedTotalDelay}</td> ++
+            createOutputOperationProgressBar(batch) ++ {
             if (firstFailureReason.nonEmpty) {
               getFirstFailureTableCell(batch)
             } else {
               Nil
             }
           }
-        } else if (isRunningTable) {
-          {createOutputOperationProgressBar(batch)}
+        } else if (tableTag.equals("runningBatches")) {
+          createOutputOperationProgressBar(batch) ++
             <td>processing</td> ++ {
             if (firstFailureReason.nonEmpty) {
               getFirstFailureTableCell(batch)
             } else {
               Nil
             }
           }
         } else {
-          {createOutputOperationProgressBar(batch)}
+          createOutputOperationProgressBar(batch) ++
             <td>queued</td> ++ {
             if (firstFailureReason.nonEmpty) {
               // Waiting batches have not run yet, so must have no failure reasons.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index dd0150ee9aef..42d0e50a068e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -439,12 +439,6 @@ private[ui] class StreamingPage(parent: StreamingTab)
     val interval: Long = listener.batchDuration
     val streamingPage = Option(request.getParameter(s"$tableTag.page")).map(_.toInt).getOrElse(1)
 
-    val tableType = tableTag match {
-      case "runningBatches" => (true, false, false)
-      case "waitingBatches" => (false, true, false)
-      case "completedBatches" => (false, false, true)
-    }
-
     try {
       new StreamingPagedTable(
         request,
@@ -452,9 +446,6 @@ private[ui] class StreamingPage(parent: StreamingTab)
         batches,
         SparkUIUtils.prependBaseUri(request, parent.basePath),
         "streaming",
-        tableType._1,
-        tableType._2,
-        tableType._3,
         interval
       ).table(streamingPage)
     } catch {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index aff912e2dd04..952ef6c374f3 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -63,7 +63,7 @@ class UISeleniumSuite
       .setMaster("local")
       .setAppName("test")
       .set(UI_ENABLED, true)
-    val ssc = new StreamingContext(conf, Seconds(1))
+    val ssc = new StreamingContext(conf, Milliseconds(100))
     assert(ssc.sc.ui.isDefined, "Spark UI is not started!")
     ssc
   }
@@ -104,7 +104,7 @@ class UISeleniumSuite
       find(cssSelector( """ul li a[href*="streaming"]""")) should not be (None)
     }
 
-    eventually(timeout(10.seconds), interval(50.milliseconds)) {
+    eventually(timeout(10.seconds), interval(500.milliseconds)) {
       // check whether streaming page exists
       go to (sparkUI.webUrl.stripSuffix("/") + "/streaming")
       val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq
@@ -125,20 +125,47 @@ class UISeleniumSuite
 
       // Check batch tables
       val h4Text = findAll(cssSelector("h4")).map(_.text).toSeq
+      h4Text.exists(_.matches("Running Batches \\(\\d+\\)")) should be (true)
+      h4Text.exists(_.matches("Waiting Batches \\(\\d+\\)")) should be (true)
      h4Text.exists(_.matches("Completed Batches \\(last \\d+ out of \\d+\\)")) should be (true)
 
       val arrow = 0x25BE.toChar
+      findAll(cssSelector("""#runningBatches-table th""")).map(_.text).toList should be {
+        List(s"Batch Time $arrow", "Records", "Scheduling Delay", "Processing Time",
+          "Output Ops: Succeeded/Total", "Status")
+      }
+      findAll(cssSelector("""#waitingBatches-table th""")).map(_.text).toList should be {
+        List(s"Batch Time $arrow", "Records", "Scheduling Delay", "Processing Time",
+          "Output Ops: Succeeded/Total", "Status")
+      }
       findAll(cssSelector("""#completedBatches-table th""")).map(_.text).toList should be {
         List(s"Batch Time $arrow", "Records", "Scheduling Delay", "Processing Time",
           "Total Delay", "Output Ops: Succeeded/Total")
       }
 
-      val batchLinks =
-        findAll(cssSelector("""#completedBatches-table a""")).flatMap(_.attribute("href")).toSeq
+      val pageSize = 3
+      val pagedTablePath = "/streaming/?completedBatches.sort=Batch+Time" +
+        "&completedBatches.desc=true&completedBatches.page=1" +
+        s"&completedBatches.pageSize=$pageSize#completedBatches"
+
+      go to (sparkUI.webUrl.stripSuffix("/") + pagedTablePath)
+      val completedTableRows = findAll(cssSelector("""#completedBatches-table tr"""))
+        .map(_.text).toList
+      // header row + pagesize
+      completedTableRows.length should be (1 + pageSize)
+
+      val sortedBatchTimePath = "/streaming/?&completedBatches.sort=Batch+Time" +
+        "&completedBatches.desc=false&completedBatches.pageSize=3#completedBatches"
+
+      // sort batches in ascending order of batch time
+      go to (sparkUI.webUrl.stripSuffix("/") + sortedBatchTimePath)
+
+      val batchLinks = findAll(cssSelector("""#completedBatches-table td a"""))
+        .flatMap(_.attribute("href")).toSeq
       batchLinks.size should be >= 1
 
       // Check a normal batch page
-      go to (batchLinks.last) // Last should be the first batch, so it will have some jobs
+      go to (batchLinks.head) // Head is the first batch, so it will have some jobs
       val summaryText = findAll(cssSelector("li strong")).map(_.text).toSeq
       summaryText should contain ("Batch Duration:")
       summaryText should contain ("Input data size:")
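The updated Selenium assertions drive the table purely through its URL contract — the same query string pageLink emits. Composed by hand (values illustrative):

```scala
// Sketch of the URL the test navigates to; values are illustrative.
val tableTag = "completedBatches"
val sort = java.net.URLEncoder.encode("Batch Time", "UTF-8") // "Batch+Time"
val pagedTablePath =
  s"/streaming/?$tableTag.sort=$sort" +
    s"&$tableTag.desc=true&$tableTag.page=1" +
    s"&$tableTag.pageSize=3#$tableTag"
// => /streaming/?completedBatches.sort=Batch+Time&completedBatches.desc=true
//      &completedBatches.page=1&completedBatches.pageSize=3#completedBatches
```

Requesting that path and counting `#completedBatches-table tr` rows (one header row plus pageSize data rows) is exactly what the new assertions in patch 4 verify.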