Skip to content

Commit 94f2566

Browse files
committed
Change Utils.exceptionString to contain the inner exceptions and make the error information in Web UI more friendly
1 parent 76386e1 commit 94f2566

File tree

10 files changed

+121
-23
lines changed

10 files changed

+121
-23
lines changed

core/src/main/scala/org/apache/spark/TaskEndReason.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,11 @@ case class FetchFailed(
8888
case class ExceptionFailure(
8989
className: String,
9090
description: String,
91-
stackTrace: Array[StackTraceElement],
91+
stackTrace: String,
9292
metrics: Option[TaskMetrics])
9393
extends TaskFailedReason {
94-
override def toErrorString: String = Utils.exceptionString(className, description, stackTrace)
94+
95+
override def toErrorString: String = stackTrace
9596
}
9697

9798
/**

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,8 @@ private[spark] class Executor(
257257
m.executorRunTime = serviceTime
258258
m.jvmGCTime = gcTime - startGCTime
259259
}
260-
val reason = ExceptionFailure(t.getClass.getName, t.getMessage, t.getStackTrace, metrics)
260+
val reason = ExceptionFailure(t.getClass.getName, t.getMessage, Utils.exceptionString(t),
261+
metrics)
261262
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
262263

263264
// Don't forcibly exit unless the exception was inherently fatal, to avoid

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1063,7 +1063,7 @@ class DAGScheduler(
10631063
if (runningStages.contains(failedStage)) {
10641064
logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
10651065
s"due to a fetch failure from $mapStage (${mapStage.name})")
1066-
markStageAsFinished(failedStage, Some("Fetch failure: " + failureMessage))
1066+
markStageAsFinished(failedStage, Some(failureMessage))
10671067
runningStages -= failedStage
10681068
}
10691069

core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,30 @@ private[spark] class FetchFailedException(
3232
shuffleId: Int,
3333
mapId: Int,
3434
reduceId: Int,
35-
message: String)
36-
extends Exception(message) {
35+
message: String,
36+
cause: Throwable)
37+
extends Exception(message, cause) {
38+
39+
def this(
40+
bmAddress: BlockManagerId,
41+
shuffleId: Int,
42+
mapId: Int,
43+
reduceId: Int,
44+
message: String) {
45+
this(bmAddress, shuffleId, mapId, reduceId, message, null)
46+
}
47+
48+
def this(
49+
bmAddress: BlockManagerId,
50+
shuffleId: Int,
51+
mapId: Int,
52+
reduceId: Int,
53+
cause: Throwable) {
54+
this(bmAddress, shuffleId, mapId, reduceId, cause.getMessage, cause)
55+
}
3756

38-
def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId, message)
57+
def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId,
58+
Utils.exceptionString(this))
3959
}
4060

4161
/**

core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.apache.spark._
2525
import org.apache.spark.serializer.Serializer
2626
import org.apache.spark.shuffle.FetchFailedException
2727
import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockFetcherIterator, ShuffleBlockId}
28-
import org.apache.spark.util.{CompletionIterator, Utils}
28+
import org.apache.spark.util.CompletionIterator
2929

3030
private[hash] object BlockStoreShuffleFetcher extends Logging {
3131
def fetch[T](
@@ -64,8 +64,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
6464
blockId match {
6565
case ShuffleBlockId(shufId, mapId, _) =>
6666
val address = statuses(mapId.toInt)._1
67-
throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId,
68-
Utils.exceptionString(e))
67+
throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, e)
6968
case _ =>
7069
throw new SparkException(
7170
"Failed to get block " + blockId + ", which is not a shuffle block", e)

core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import javax.servlet.http.HttpServletRequest
2222

2323
import scala.xml.{Node, Unparsed}
2424

25+
import org.apache.commons.lang3.StringEscapeUtils
26+
2527
import org.apache.spark.executor.TaskMetrics
2628
import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils}
2729
import org.apache.spark.ui.jobs.UIData._
@@ -409,13 +411,35 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
409411
{diskBytesSpilledReadable}
410412
</td>
411413
}}
412-
<td>
413-
{errorMessage.map { e => <pre>{e}</pre> }.getOrElse("")}
414-
</td>
414+
{errorMessageCell(errorMessage)}
415415
</tr>
416416
}
417417
}
418418

419+
/**
 * Render the table cell that shows an error message.
 *
 * Only the first line of the message is shown by default. When the message spans several
 * lines, a "+details" toggle is added next to it; clicking it toggles the `collapsed` class
 * on a block that holds the complete text.
 *
 * @param errorMessage the error text, if any; `None` is treated as an empty message
 * @return a single `<td>` element
 */
private def errorMessageCell(errorMessage: Option[String]): Seq[Node] = {
  val error = errorMessage.getOrElse("")
  val firstLineBreak = error.indexOf('\n')
  val isMultiline = firstLineBreak >= 0
  // Display the first line by default. The text is embedded as-is: the XML literal below
  // escapes embedded strings when it is serialized (just as it does for the full text inside
  // <pre>), so escaping here as well would make entities such as "&lt;" show up literally.
  val errorSummary = if (isMultiline) error.substring(0, firstLineBreak) else error
  // Typed as nodes in both branches so the cell content never widens to a non-node type.
  val details: Seq[Node] = if (isMultiline) {
    <span onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
          class="expand-details">
      +details
    </span> ++
    <div class="stage-details collapsed">
      <pre>{error}</pre>
    </div>
  } else {
    Seq.empty[Node]
  }
  <td>{errorSummary}{details}</td>
}
442+
419443
private def getSchedulerDelay(info: TaskInfo, metrics: TaskMetrics): Long = {
420444
val totalExecutionTime = {
421445
if (info.gettingResultTime > 0) {

core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import scala.xml.Text
2222

2323
import java.util.Date
2424

25+
import org.apache.commons.lang3.StringEscapeUtils
26+
2527
import org.apache.spark.scheduler.StageInfo
2628
import org.apache.spark.ui.{ToolTips, UIUtils}
2729
import org.apache.spark.util.Utils
@@ -195,7 +197,27 @@ private[ui] class FailedStageTable(
195197

196198
override protected def stageRow(s: StageInfo): Seq[Node] = {
197199
val basicColumns = super.stageRow(s)
198-
val failureReason = <td valign="middle"><pre>{s.failureReason.getOrElse("")}</pre></td>
199-
basicColumns ++ failureReason
200+
val failureReason = s.failureReason.getOrElse("")
201+
val isMultiline = failureReason.indexOf('\n') >= 0
202+
// Display the first line by default
203+
val failureReasonSummary = StringEscapeUtils.escapeHtml4(
204+
if (isMultiline) {
205+
failureReason.substring(0, failureReason.indexOf('\n'))
206+
} else {
207+
failureReason
208+
})
209+
val details = if (isMultiline) {
210+
<span onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
211+
class="expand-details">
212+
+details
213+
</span> ++
214+
<div class="stage-details collapsed">
215+
<pre>{failureReason}</pre>
216+
</div>
217+
} else {
218+
""
219+
}
220+
val failureReasonHtml = <td valign="middle">{failureReasonSummary}{details}</td>
221+
basicColumns ++ failureReasonHtml
200222
}
201223
}

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -282,11 +282,10 @@ private[spark] object JsonProtocol {
282282
("Reduce ID" -> fetchFailed.reduceId) ~
283283
("Message" -> fetchFailed.message)
284284
case exceptionFailure: ExceptionFailure =>
285-
val stackTrace = stackTraceToJson(exceptionFailure.stackTrace)
286285
val metrics = exceptionFailure.metrics.map(taskMetricsToJson).getOrElse(JNothing)
287286
("Class Name" -> exceptionFailure.className) ~
288287
("Description" -> exceptionFailure.description) ~
289-
("Stack Trace" -> stackTrace) ~
288+
("Full Stack Trace" -> exceptionFailure.stackTrace) ~
290289
("Metrics" -> metrics)
291290
case ExecutorLostFailure(executorId) =>
292291
("Executor ID" -> executorId)
@@ -636,9 +635,14 @@ private[spark] object JsonProtocol {
636635
case `exceptionFailure` =>
637636
val className = (json \ "Class Name").extract[String]
638637
val description = (json \ "Description").extract[String]
639-
val stackTrace = stackTraceFromJson(json \ "Stack Trace")
638+
val stackTrace = Utils.jsonOption(json \ "Full Stack Trace").map(_.extract[String]).
639+
getOrElse {
640+
// backward compatibility
641+
val oldStackTrace = stackTraceFromJson(json \ "Stack Trace")
642+
Utils.exceptionString(className, description, oldStackTrace)
643+
}
640644
val metrics = Utils.jsonOption(json \ "Metrics").map(taskMetricsFromJson)
641-
new ExceptionFailure(className, description, stackTrace, metrics)
645+
ExceptionFailure(className, description, stackTrace, metrics)
642646
case `taskResultLost` => TaskResultLost
643647
case `taskKilled` => TaskKilled
644648
case `executorLostFailure` =>

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1598,10 +1598,19 @@ private[spark] object Utils extends Logging {
15981598

15991599
/** Return a nice string representation of the exception, including the stack trace. */
16001600
def exceptionString(e: Throwable): String = {
1601-
if (e == null) "" else exceptionString(getFormattedClassName(e), e.getMessage, e.getStackTrace)
1601+
if (e == null) ""
1602+
else {
1603+
val stringWriter = new StringWriter()
1604+
e.printStackTrace(new PrintWriter(stringWriter))
1605+
stringWriter.toString
1606+
}
16021607
}
16031608

1604-
/** Return a nice string representation of the exception, including the stack trace. */
1609+
/**
1610+
* Return a nice string representation of the exception, including the stack trace.
1611+
* It's only used for backward compatibility.
1612+
*/
1613+
@deprecated("Use exceptionString(Throwable) instead", "1.2.0")
16051614
def exceptionString(
16061615
className: String,
16071616
description: String,

core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ class JsonProtocolSuite extends FunSuite {
109109
// TaskEndReason
110110
val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19,
111111
"Some exception")
112-
val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, None)
112+
val exceptionFailure = ExceptionFailure("To be", "or not to be",
113+
Utils.exceptionString(exception), None)
113114
testTaskEndReason(Success)
114115
testTaskEndReason(Resubmitted)
115116
testTaskEndReason(fetchFailed)
@@ -127,6 +128,23 @@ class JsonProtocolSuite extends FunSuite {
127128
testBlockId(StreamBlockId(1, 2L))
128129
}
129130

131+
test("ExceptionFailure backward compatibility") {
  // JSON in the older layout: the trace is a structured "Stack Trace" array and there is no
  // "Full Stack Trace" field. The literal is wrapped across lines; stripMargin and the
  // newline removal below join it back into a single line before parsing.
  val exceptionFailureJson =
    """{"Reason":"ExceptionFailure","Class Name":"To be","Description":"or not to be",
      |"Stack Trace":[{"Declaring Class":"Apollo","Method Name":"Venus","File Name":"Mercury",
      |"Line Number":42},{"Declaring Class":"Afollo","Method Name":"Vemus","File Name":"Mercurry"
      |,"Line Number":420},{"Declaring Class":"Ayollo","Method Name":"Vesus","File Name":"Blackbe
      |rry","Line Number":4200}]}""".stripMargin.replaceAll("\r|\n", "")

  // Expected result: the legacy frames rendered into the string form.
  // NOTE(review): `stackTrace` is a suite-level fixture; presumably it holds the same three
  // frames as the JSON above -- confirm against the suite.
  val expectedExceptionFailure = ExceptionFailure("To be", "or not to be",
    Utils.exceptionString("To be", "or not to be", stackTrace), None)

  val exceptionFailure = JsonProtocol.taskEndReasonFromJson(parse(exceptionFailureJson))
  assertEquals(expectedExceptionFailure, exceptionFailure)
}
147+
130148
test("StageInfo backward compatibility") {
131149
val info = makeStageInfo(1, 2, 3, 4L, 5L)
132150
val newJson = JsonProtocol.stageInfoToJson(info)
@@ -401,7 +419,7 @@ class JsonProtocolSuite extends FunSuite {
401419
case (r1: ExceptionFailure, r2: ExceptionFailure) =>
402420
assert(r1.className === r2.className)
403421
assert(r1.description === r2.description)
404-
assertSeqEquals(r1.stackTrace, r2.stackTrace, assertStackTraceElementEquals)
422+
assert(r1.stackTrace === r2.stackTrace)
405423
assertOptionEquals(r1.metrics, r2.metrics, assertTaskMetricsEquals)
406424
case (TaskResultLost, TaskResultLost) =>
407425
case (TaskKilled, TaskKilled) =>

0 commit comments

Comments
 (0)