Skip to content

Commit eb334a2

Browse files
committed
fix
1 parent 701fb70 commit eb334a2

File tree

4 files changed

+59
-41
lines changed

4 files changed

+59
-41
lines changed

core/src/main/resources/org/apache/spark/ui/static/webui.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@ $(function() {
8282
collapseTablePageLoad('collapse-aggregated-poolActiveStages','aggregated-poolActiveStages');
8383
collapseTablePageLoad('collapse-aggregated-tasks','aggregated-tasks');
8484
collapseTablePageLoad('collapse-aggregated-rdds','aggregated-rdds');
85-
collapseTablePageLoad('collapse-aggregated-activeBatches','aggregated-activeBatches');
85+
collapseTablePageLoad('collapse-aggregated-waitingBatches','aggregated-waitingBatches');
86+
collapseTablePageLoad('collapse-aggregated-runningBatches','aggregated-runningBatches');
8687
collapseTablePageLoad('collapse-aggregated-completedBatches','aggregated-completedBatches');
8788
collapseTablePageLoad('collapse-aggregated-runningExecutions','aggregated-runningExecutions');
8889
collapseTablePageLoad('collapse-aggregated-completedExecutions','aggregated-completedExecutions');

streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,14 @@ private[ui] class StreamingPagedTable(
3131
batches: Seq[BatchUIData],
3232
basePath: String,
3333
subPath: String,
34-
isRunningTable: Boolean,
35-
isWaitingTable: Boolean,
36-
isCompletedTable: Boolean,
3734
batchInterval: Long) extends PagedTable[BatchUIData] {
3835

3936
private val(sortColumn, desc, pageSize) = getTableParameters(request, tableTag, "Batch Time")
4037
private val parameterPath = s"$basePath/$subPath/?${getParameterOtherTable(request, tableTag)}"
4138
private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
4239

4340
private val firstFailureReason: Option[String] =
44-
if (!isWaitingTable) {
41+
if (!tableTag.equals("waitingBatches")) {
4542
getFirstFailureReason(batches)
4643
} else {
4744
None
@@ -64,13 +61,17 @@ private[ui] class StreamingPagedTable(
6461
}
6562

6663
private def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = {
67-
SparkUIUtils.makeProgressBar(
68-
started = batch.numActiveOutputOp,
69-
completed = batch.numCompletedOutputOp,
70-
failed = batch.numFailedOutputOp,
71-
skipped = 0,
72-
reasonToNumKilled = Map.empty,
73-
total = batch.outputOperations.size)
64+
<td class="progress-cell">
65+
{
66+
SparkUIUtils.makeProgressBar(
67+
started = batch.numActiveOutputOp,
68+
completed = batch.numCompletedOutputOp,
69+
failed = batch.numFailedOutputOp,
70+
skipped = 0,
71+
reasonToNumKilled = Map.empty,
72+
total = batch.outputOperations.size)
73+
}
74+
</td>
7475
}
7576

7677
override def tableId: String = s"$tableTag-table"
@@ -98,19 +99,17 @@ private[ui] class StreamingPagedTable(
9899
new StreamingDataSource(batches, pageSize, sortColumn, desc)
99100

100101
override def headers: Seq[Node] = {
101-
// tuple containing tooltips for header fields
102-
val tooltips = ("Time taken by Streaming scheduler to submit jobs of a batch",
103-
"Time taken to process all jobs of a batch", "Total time taken to handle a batch")
104102
// headers, sortable and tooltips
105103
val headersAndCssClasses: Seq[(String, Boolean, Option[String])] = {
106104
Seq(
107105
("Batch Time", true, None),
108106
("Records", true, None),
109-
("Scheduling Delay", true, Some(tooltips._1)),
110-
("Processing Time", true, Some(tooltips._2))) ++ {
111-
if (isCompletedTable) {
107+
("Scheduling Delay", true, Some("Time taken by Streaming scheduler to submit jobs " +
108+
"of a batch")),
109+
("Processing Time", true, Some("Time taken to process all jobs of a batch"))) ++ {
110+
if (tableTag.equals("completedBatches")) {
112111
Seq(
113-
("Total Delay", true, Some(tooltips._3)),
112+
("Total Delay", true, Some("Total time taken to handle a batch")),
114113
("Output Ops: Succeeded/Total", false, None))
115114
} else {
116115
Seq(
@@ -144,26 +143,26 @@ private[ui] class StreamingPagedTable(
144143
val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
145144

146145
<tr>
147-
<td id = {batchTimeId} isFailed = {batch.isFailed.toString}>
146+
<td id={batchTimeId} isFailed={batch.isFailed.toString}>
148147
<a href={s"batch?id=$batchTime"}>
149148
{formattedBatchTime}
150149
</a>
151150
</td>
152-
<td> {numRecords.toString} Records </td>
151+
<td> {numRecords.toString} records </td>
153152
<td> {formattedSchedulingDelay} </td>
154153
<td> {formattedProcessingTime} </td>
155154
{
156-
if (isCompletedTable) {
157-
<td> {formattedTotalDelay} </td>
158-
<td class="progress-cell"> {createOutputOperationProgressBar(batch)} </td> ++ {
155+
if (tableTag.equals("completedBatches")) {
156+
<td> {formattedTotalDelay} </td> ++
157+
createOutputOperationProgressBar(batch) ++ {
159158
if (firstFailureReason.nonEmpty) {
160159
getFirstFailureTableCell(batch)
161160
} else {
162161
Nil
163162
}
164163
}
165-
} else if (isRunningTable) {
166-
<td class="progress-cell"> {createOutputOperationProgressBar(batch)} </td>
164+
} else if (tableTag.equals("runningBatches")) {
165+
createOutputOperationProgressBar(batch) ++
167166
<td> processing </td> ++ {
168167
if (firstFailureReason.nonEmpty) {
169168
getFirstFailureTableCell(batch)
@@ -172,7 +171,7 @@ private[ui] class StreamingPagedTable(
172171
}
173172
}
174173
} else {
175-
<td class="progress-cell"> {createOutputOperationProgressBar(batch)} </td>
174+
createOutputOperationProgressBar(batch) ++
176175
<td> queued </td> ++ {
177176
if (firstFailureReason.nonEmpty) {
178177
// Waiting batches have not run yet, so must have no failure reasons.

streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -439,22 +439,13 @@ private[ui] class StreamingPage(parent: StreamingTab)
439439
val interval: Long = listener.batchDuration
440440
val streamingPage = Option(request.getParameter(s"$tableTag.page")).map(_.toInt).getOrElse(1)
441441

442-
val tableType = tableTag match {
443-
case "runningBatches" => (true, false, false)
444-
case "waitingBatches" => (false, true, false)
445-
case "completedBatches" => (false, false, true)
446-
}
447-
448442
try {
449443
new StreamingPagedTable(
450444
request,
451445
tableTag,
452446
batches,
453447
SparkUIUtils.prependBaseUri(request, parent.basePath),
454448
"streaming",
455-
tableType._1,
456-
tableType._2,
457-
tableType._3,
458449
interval
459450
).table(streamingPage)
460451
} catch {

streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ class UISeleniumSuite
6363
.setMaster("local")
6464
.setAppName("test")
6565
.set(UI_ENABLED, true)
66-
val ssc = new StreamingContext(conf, Seconds(1))
66+
val ssc = new StreamingContext(conf, Milliseconds(100))
6767
assert(ssc.sc.ui.isDefined, "Spark UI is not started!")
6868
ssc
6969
}
@@ -104,7 +104,7 @@ class UISeleniumSuite
104104
find(cssSelector( """ul li a[href*="streaming"]""")) should not be (None)
105105
}
106106

107-
eventually(timeout(10.seconds), interval(50.milliseconds)) {
107+
eventually(timeout(10.seconds), interval(500.milliseconds)) {
108108
// check whether streaming page exists
109109
go to (sparkUI.webUrl.stripSuffix("/") + "/streaming")
110110
val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq
@@ -125,20 +125,47 @@ class UISeleniumSuite
125125

126126
// Check batch tables
127127
val h4Text = findAll(cssSelector("h4")).map(_.text).toSeq
128+
h4Text.exists(_.matches("Running Batches \\(\\d+\\)")) should be (true)
129+
h4Text.exists(_.matches("Waiting Batches \\(\\d+\\)")) should be (true)
128130
h4Text.exists(_.matches("Completed Batches \\(last \\d+ out of \\d+\\)")) should be (true)
129131

130132
val arrow = 0x25BE.toChar
133+
findAll(cssSelector("""#runningBatches-table th""")).map(_.text).toList should be {
134+
List(s"Batch Time $arrow", "Records", "Scheduling Delay", "Processing Time",
135+
"Output Ops: Succeeded/Total", "Status")
136+
}
137+
findAll(cssSelector("""#waitingBatches-table th""")).map(_.text).toList should be {
138+
List(s"Batch Time $arrow", "Records", "Scheduling Delay", "Processing Time",
139+
"Output Ops: Succeeded/Total", "Status")
140+
}
131141
findAll(cssSelector("""#completedBatches-table th""")).map(_.text).toList should be {
132142
List(s"Batch Time $arrow", "Records", "Scheduling Delay", "Processing Time",
133143
"Total Delay", "Output Ops: Succeeded/Total")
134144
}
135145

136-
val batchLinks =
137-
findAll(cssSelector("""#completedBatches-table a""")).flatMap(_.attribute("href")).toSeq
146+
val pageSize = 3
147+
val pagedTablePath = "/streaming/?completedBatches.sort=Batch+Time" +
148+
"&completedBatches.desc=true&completedBatches.page=1" +
149+
s"&completedBatches.pageSize=$pageSize#completedBatches"
150+
151+
go to (sparkUI.webUrl.stripSuffix("/") + pagedTablePath)
152+
val completedTableRows = findAll(cssSelector("""#completedBatches-table tr"""))
153+
.map(_.text).toList
154+
// header row + pagesize
155+
completedTableRows.length should be (1 + pageSize)
156+
157+
val sortedBatchTimePath = "/streaming/?&completedBatches.sort=Batch+Time" +
158+
"&completedBatches.desc=false&completedBatches.pageSize=3#completedBatches"
159+
160+
// sort batches in ascending order of batch time
161+
go to (sparkUI.webUrl.stripSuffix("/") + sortedBatchTimePath)
162+
163+
val batchLinks = findAll(cssSelector("""#completedBatches-table td a"""))
164+
.flatMap(_.attribute("href")).toSeq
138165
batchLinks.size should be >= 1
139166

140167
// Check a normal batch page
141-
go to (batchLinks.last) // Last should be the first batch, so it will have some jobs
168+
go to (batchLinks.head) // Head is the first batch, so it will have some jobs
142169
val summaryText = findAll(cssSelector("li strong")).map(_.text).toSeq
143170
summaryText should contain ("Batch Duration:")
144171
summaryText should contain ("Input data size:")

0 commit comments

Comments
 (0)