Skip to content

Commit ffe6831

Browse files
zsxwing
authored and tdas committed
[SPARK-10885] [STREAMING] Display the failed output op in Streaming UI
This PR implements the following features for both `master` and `branch-1.5`. 1. Display the failed output op count in the batch list 2. Display the failure reason of output op in the batch detail page Screenshots: <img width="1356" alt="1" src="https://cloud.githubusercontent.com/assets/1000778/10198387/5b2b97ec-67ce-11e5-81c2-f818b9d2f3ad.png"> <img width="1356" alt="2" src="https://cloud.githubusercontent.com/assets/1000778/10198388/5b76ac14-67ce-11e5-8c8b-de2683c5b485.png"> There are still two remaining problems in the UI. 1. If an output operation doesn't run any Spark job, we cannot get its duration, since now it's the sum of all jobs' durations. 2. If an output operation doesn't run any Spark job, we cannot get the description, since it's the latest job's call site. We need to add a new `StreamingListenerEvent` about output operations to fix them. So I'd like to fix them only for `master` in another PR. Author: zsxwing <[email protected]> Closes #8950 from zsxwing/batch-failure.
1 parent 5e03540 commit ffe6831

File tree

6 files changed

+143
-27
lines changed

6 files changed

+143
-27
lines changed

streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala

Lines changed: 10 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -41,6 +41,8 @@ case class BatchInfo(
4141

4242
private var _failureReasons: Map[Int, String] = Map.empty
4343

44+
private var _numOutputOp: Int = 0
45+
4446
@deprecated("Use streamIdToInputInfo instead", "1.5.0")
4547
def streamIdToNumRecords: Map[Int, Long] = streamIdToInputInfo.mapValues(_.numRecords)
4648

@@ -77,4 +79,12 @@ case class BatchInfo(
7779

7880
/** Failure reasons corresponding to every output ops in the batch */
7981
private[streaming] def failureReasons = _failureReasons
82+
83+
/** Set the number of output operations in this batch */
84+
private[streaming] def setNumOutputOp(numOutputOp: Int): Unit = {
85+
_numOutputOp = numOutputOp
86+
}
87+
88+
/** Return the number of output operations in this batch */
89+
private[streaming] def numOutputOp: Int = _numOutputOp
8090
}

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala

Lines changed: 1 addition & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -81,6 +81,7 @@ case class JobSet(
8181
if (processingEndTime >= 0) Some(processingEndTime) else None
8282
)
8383
binfo.setFailureReason(failureReasons)
84+
binfo.setNumOutputOp(jobs.size)
8485
binfo
8586
}
8687
}

streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala

Lines changed: 12 additions & 3 deletions
Original file line number · Diff line number · Diff line change
@@ -107,9 +107,10 @@ private[ui] class ActiveBatchTable(
107107
private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long)
108108
extends BatchTableBase("completed-batches-table", batchInterval) {
109109

110-
override protected def columns: Seq[Node] = super.columns ++
111-
<th>Total Delay
112-
{SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th>
110+
override protected def columns: Seq[Node] = super.columns ++ {
111+
<th>Total Delay {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th>
112+
<th>Output Ops: Succeeded/Total</th>
113+
}
113114

114115
override protected def renderRows: Seq[Node] = {
115116
batches.flatMap(batch => <tr>{completedBatchRow(batch)}</tr>)
@@ -118,9 +119,17 @@ private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval:
118119
private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
119120
val totalDelay = batch.totalDelay
120121
val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
122+
val numFailedOutputOp = batch.failureReason.size
123+
val outputOpColumn = if (numFailedOutputOp > 0) {
124+
s"${batch.numOutputOp - numFailedOutputOp}/${batch.numOutputOp}" +
125+
s" (${numFailedOutputOp} failed)"
126+
} else {
127+
s"${batch.numOutputOp}/${batch.numOutputOp}"
128+
}
121129
baseRow(batch) ++
122130
<td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}>
123131
{formattedTotalDelay}
124132
</td>
133+
<td>{outputOpColumn}</td>
125134
}
126135
}

streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala

Lines changed: 113 additions & 21 deletions
Original file line number · Diff line number · Diff line change
@@ -38,6 +38,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
3838
<th>Output Op Id</th>
3939
<th>Description</th>
4040
<th>Duration</th>
41+
<th>Status</th>
4142
<th>Job Id</th>
4243
<th>Duration</th>
4344
<th class="sorttable_nosort">Stages: Succeeded/Total</th>
@@ -49,18 +50,42 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
4950
outputOpId: OutputOpId,
5051
outputOpDescription: Seq[Node],
5152
formattedOutputOpDuration: String,
53+
outputOpStatus: String,
5254
numSparkJobRowsInOutputOp: Int,
5355
isFirstRow: Boolean,
5456
sparkJob: SparkJobIdWithUIData): Seq[Node] = {
5557
if (sparkJob.jobUIData.isDefined) {
5658
generateNormalJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
57-
numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get)
59+
outputOpStatus, numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get)
5860
} else {
5961
generateDroppedJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
60-
numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
62+
outputOpStatus, numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
6163
}
6264
}
6365

66+
private def generateOutputOpRowWithoutSparkJobs(
67+
outputOpId: OutputOpId,
68+
outputOpDescription: Seq[Node],
69+
formattedOutputOpDuration: String,
70+
outputOpStatus: String): Seq[Node] = {
71+
<tr>
72+
<td class="output-op-id-cell" >{outputOpId.toString}</td>
73+
<td>{outputOpDescription}</td>
74+
<td>{formattedOutputOpDuration}</td>
75+
{outputOpStatusCell(outputOpStatus, rowspan = 1)}
76+
<!-- Job Id -->
77+
<td>-</td>
78+
<!-- Duration -->
79+
<td>-</td>
80+
<!-- Stages: Succeeded/Total -->
81+
<td>-</td>
82+
<!-- Tasks (for all stages): Succeeded/Total -->
83+
<td>-</td>
84+
<!-- Error -->
85+
<td>-</td>
86+
</tr>
87+
}
88+
6489
/**
6590
* Generate a row for a Spark Job. Because duplicated output op infos needs to be collapsed into
6691
* one cell, we use "rowspan" for the first row of a output op.
@@ -69,6 +94,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
6994
outputOpId: OutputOpId,
7095
outputOpDescription: Seq[Node],
7196
formattedOutputOpDuration: String,
97+
outputOpStatus: String,
7298
numSparkJobRowsInOutputOp: Int,
7399
isFirstRow: Boolean,
74100
sparkJob: JobUIData): Seq[Node] = {
@@ -94,7 +120,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
94120
<td rowspan={numSparkJobRowsInOutputOp.toString}>
95121
{outputOpDescription}
96122
</td>
97-
<td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
123+
<td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td> ++
124+
{outputOpStatusCell(outputOpStatus, numSparkJobRowsInOutputOp)}
98125
} else {
99126
Nil
100127
}
@@ -125,7 +152,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
125152
total = sparkJob.numTasks - sparkJob.numSkippedTasks)
126153
}
127154
</td>
128-
{failureReasonCell(lastFailureReason)}
155+
{failureReasonCell(lastFailureReason, rowspan = 1)}
129156
</tr>
130157
}
131158

@@ -137,6 +164,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
137164
outputOpId: OutputOpId,
138165
outputOpDescription: Seq[Node],
139166
formattedOutputOpDuration: String,
167+
outputOpStatus: String,
140168
numSparkJobRowsInOutputOp: Int,
141169
isFirstRow: Boolean,
142170
jobId: Int): Seq[Node] = {
@@ -147,7 +175,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
147175
if (isFirstRow) {
148176
<td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
149177
<td rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpDescription}</td>
150-
<td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
178+
<td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td> ++
179+
{outputOpStatusCell(outputOpStatus, numSparkJobRowsInOutputOp)}
151180
} else {
152181
Nil
153182
}
@@ -156,7 +185,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
156185
<tr>
157186
{prefixCells}
158187
<td sorttable_customkey={jobId.toString}>
159-
{jobId.toString}
188+
{if (jobId >= 0) jobId.toString else "-"}
160189
</td>
161190
<!-- Duration -->
162191
<td>-</td>
@@ -170,7 +199,9 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
170199
}
171200

172201
private def generateOutputOpIdRow(
173-
outputOpId: OutputOpId, sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
202+
outputOpId: OutputOpId,
203+
outputOpStatus: String,
204+
sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
174205
// We don't count the durations of dropped jobs
175206
val sparkJobDurations = sparkJobs.filter(_.jobUIData.nonEmpty).map(_.jobUIData.get).
176207
map(sparkJob => {
@@ -189,12 +220,32 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
189220

190221
val description = generateOutputOpDescription(sparkJobs)
191222

192-
generateJobRow(
193-
outputOpId, description, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
194-
sparkJobs.tail.map { sparkJob =>
223+
if (sparkJobs.isEmpty) {
224+
generateOutputOpRowWithoutSparkJobs(
225+
outputOpId, description, formattedOutputOpDuration, outputOpStatus)
226+
} else {
227+
val firstRow =
195228
generateJobRow(
196-
outputOpId, description, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
197-
}.flatMap(x => x)
229+
outputOpId,
230+
description,
231+
formattedOutputOpDuration,
232+
outputOpStatus,
233+
sparkJobs.size,
234+
true,
235+
sparkJobs.head)
236+
val tailRows =
237+
sparkJobs.tail.map { sparkJob =>
238+
generateJobRow(
239+
outputOpId,
240+
description,
241+
formattedOutputOpDuration,
242+
outputOpStatus,
243+
sparkJobs.size,
244+
false,
245+
sparkJob)
246+
}
247+
(firstRow ++ tailRows).flatten
248+
}
198249
}
199250

200251
private def generateOutputOpDescription(sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
@@ -228,7 +279,10 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
228279
}
229280
}
230281

231-
private def failureReasonCell(failureReason: String): Seq[Node] = {
282+
private def failureReasonCell(
283+
failureReason: String,
284+
rowspan: Int,
285+
includeFirstLineInExpandDetails: Boolean = true): Seq[Node] = {
232286
val isMultiline = failureReason.indexOf('\n') >= 0
233287
// Display the first line by default
234288
val failureReasonSummary = StringEscapeUtils.escapeHtml4(
@@ -237,20 +291,34 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
237291
} else {
238292
failureReason
239293
})
294+
val failureDetails =
295+
if (isMultiline && !includeFirstLineInExpandDetails) {
296+
// Skip the first line
297+
failureReason.substring(failureReason.indexOf('\n') + 1)
298+
} else {
299+
failureReason
300+
}
240301
val details = if (isMultiline) {
241302
// scalastyle:off
242303
<span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
243304
class="expand-details">
244305
+details
245306
</span> ++
246307
<div class="stacktrace-details collapsed">
247-
<pre>{failureReason}</pre>
308+
<pre>{failureDetails}</pre>
248309
</div>
249310
// scalastyle:on
250311
} else {
251312
""
252313
}
253-
<td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td>
314+
315+
if (rowspan == 1) {
316+
<td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td>
317+
} else {
318+
<td valign="middle" style="max-width: 300px" rowspan={rowspan.toString}>
319+
{failureReasonSummary}{details}
320+
</td>
321+
}
254322
}
255323

256324
private def getJobData(sparkJobId: SparkJobId): Option[JobUIData] = {
@@ -265,16 +333,31 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
265333
* Generate the job table for the batch.
266334
*/
267335
private def generateJobTable(batchUIData: BatchUIData): Seq[Node] = {
268-
val outputOpIdToSparkJobIds = batchUIData.outputOpIdSparkJobIdPairs.groupBy(_.outputOpId).toSeq.
269-
sortBy(_._1). // sorted by OutputOpId
336+
val outputOpIdToSparkJobIds = batchUIData.outputOpIdSparkJobIdPairs.groupBy(_.outputOpId).
270337
map { case (outputOpId, outputOpIdAndSparkJobIds) =>
271338
// sort SparkJobIds for each OutputOpId
272339
(outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
273340
}
341+
val outputOps = (0 until batchUIData.numOutputOp).map { outputOpId =>
342+
val status = batchUIData.failureReason.get(outputOpId).map { failure =>
343+
if (failure.startsWith("org.apache.spark.SparkException")) {
344+
"Failed due to Spark job error\n" + failure
345+
} else {
346+
var nextLineIndex = failure.indexOf("\n")
347+
if (nextLineIndex < 0) {
348+
nextLineIndex = failure.size
349+
}
350+
val firstLine = failure.substring(0, nextLineIndex)
351+
s"Failed due to error: $firstLine\n$failure"
352+
}
353+
}.getOrElse("Succeeded")
354+
val sparkJobIds = outputOpIdToSparkJobIds.getOrElse(outputOpId, Seq.empty)
355+
(outputOpId, status, sparkJobIds)
356+
}
274357
sparkListener.synchronized {
275-
val outputOpIdWithJobs: Seq[(OutputOpId, Seq[SparkJobIdWithUIData])] =
276-
outputOpIdToSparkJobIds.map { case (outputOpId, sparkJobIds) =>
277-
(outputOpId,
358+
val outputOpIdWithJobs: Seq[(OutputOpId, String, Seq[SparkJobIdWithUIData])] =
359+
outputOps.map { case (outputOpId, status, sparkJobIds) =>
360+
(outputOpId, status,
278361
sparkJobIds.map(sparkJobId => SparkJobIdWithUIData(sparkJobId, getJobData(sparkJobId))))
279362
}
280363

@@ -285,7 +368,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
285368
<tbody>
286369
{
287370
outputOpIdWithJobs.map {
288-
case (outputOpId, sparkJobIds) => generateOutputOpIdRow(outputOpId, sparkJobIds)
371+
case (outputOpId, status, sparkJobIds) =>
372+
generateOutputOpIdRow(outputOpId, status, sparkJobIds)
289373
}
290374
}
291375
</tbody>
@@ -386,4 +470,12 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
386470
Unparsed(StringEscapeUtils.escapeHtml4(metadataDescription).
387471
replaceAllLiterally("\t", "&nbsp;&nbsp;&nbsp;&nbsp;").replaceAllLiterally("\n", "<br/>"))
388472
}
473+
474+
private def outputOpStatusCell(status: String, rowspan: Int): Seq[Node] = {
475+
if (status == "Succeeded") {
476+
<td rowspan={rowspan.toString}>Succeeded</td>
477+
} else {
478+
failureReasonCell(status, rowspan, includeFirstLineInExpandDetails = false)
479+
}
480+
}
389481
}

streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala

Lines changed: 5 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -30,6 +30,8 @@ private[ui] case class BatchUIData(
3030
val submissionTime: Long,
3131
val processingStartTime: Option[Long],
3232
val processingEndTime: Option[Long],
33+
val numOutputOp: Int,
34+
val failureReason: Map[Int, String],
3335
var outputOpIdSparkJobIdPairs: Seq[OutputOpIdAndSparkJobId] = Seq.empty) {
3436

3537
/**
@@ -69,7 +71,9 @@ private[ui] object BatchUIData {
6971
batchInfo.streamIdToInputInfo,
7072
batchInfo.submissionTime,
7173
batchInfo.processingStartTime,
72-
batchInfo.processingEndTime
74+
batchInfo.processingEndTime,
75+
batchInfo.numOutputOp,
76+
batchInfo.failureReasons
7377
)
7478
}
7579
}

streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -121,7 +121,7 @@ class UISeleniumSuite
121121
}
122122
findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be {
123123
List("Batch Time", "Input Size", "Scheduling Delay (?)", "Processing Time (?)",
124-
"Total Delay (?)")
124+
"Total Delay (?)", "Output Ops: Succeeded/Total")
125125
}
126126

127127
val batchLinks =
@@ -138,7 +138,7 @@ class UISeleniumSuite
138138
summaryText should contain ("Total delay:")
139139

140140
findAll(cssSelector("""#batch-job-table th""")).map(_.text).toSeq should be {
141-
List("Output Op Id", "Description", "Duration", "Job Id", "Duration",
141+
List("Output Op Id", "Description", "Duration", "Status", "Job Id", "Duration",
142142
"Stages: Succeeded/Total", "Tasks (for all stages): Succeeded/Total", "Error")
143143
}
144144

0 commit comments

Comments
 (0)