Skip to content

Commit e5690a5

Browse files
ryan-williamsAndrew Or
authored andcommitted
[SPARK-5783] Better eventlog-parsing error messages
Author: Ryan Williams <[email protected]> Closes #4573 from ryan-williams/history and squashes the following commits: a8647ec [Ryan Williams] fix test calls to .replay() 98aa3fe [Ryan Williams] include filename in history-parsing error message 8deecf0 [Ryan Williams] add line number to history-parsing error message b668b52 [Ryan Williams] add log info line to history-eventlog parsing
1 parent 5e63942 commit e5690a5

File tree

4 files changed

+11
-7
lines changed

4 files changed

+11
-7
lines changed

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
247247
*/
248248
private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationHistoryInfo = {
249249
val logPath = eventLog.getPath()
250+
logInfo(s"Replaying log path: $logPath")
250251
val (logInput, sparkVersion) =
251252
if (isLegacyLogDirectory(eventLog)) {
252253
openLegacyEventLog(logPath)
@@ -256,7 +257,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
256257
try {
257258
val appListener = new ApplicationEventListener
258259
bus.addListener(appListener)
259-
bus.replay(logInput, sparkVersion)
260+
bus.replay(logInput, sparkVersion, logPath.toString)
260261
new FsApplicationHistoryInfo(
261262
logPath.getName(),
262263
appListener.appId.getOrElse(logPath.getName()),

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -761,7 +761,7 @@ private[spark] class Master(
761761
val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
762762
appName + " (completed)", HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
763763
try {
764-
replayBus.replay(logInput, sparkVersion)
764+
replayBus.replay(logInput, sparkVersion, eventLogFile)
765765
} finally {
766766
logInput.close()
767767
}

core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,21 +40,24 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
4040
*
4141
* @param logData Stream containing event log data.
4242
* @param version Spark version that generated the events.
43+
* @param sourceName Filename (or other source identifier) from whence @logData is being read
4344
*/
44-
def replay(logData: InputStream, version: String) {
45+
def replay(logData: InputStream, version: String, sourceName: String) {
4546
var currentLine: String = null
47+
var lineNumber: Int = 1
4648
try {
4749
val lines = Source.fromInputStream(logData).getLines()
4850
lines.foreach { line =>
4951
currentLine = line
5052
postToAll(JsonProtocol.sparkEventFromJson(parse(line)))
53+
lineNumber += 1
5154
}
5255
} catch {
5356
case ioe: IOException =>
5457
throw ioe
5558
case e: Exception =>
56-
logError("Exception in parsing Spark event log.", e)
57-
logError("Malformed line: %s\n".format(currentLine))
59+
logError(s"Exception parsing Spark event log: $sourceName", e)
60+
logError(s"Malformed line #$lineNumber: $currentLine\n")
5861
}
5962
}
6063

core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
6161
try {
6262
val replayer = new ReplayListenerBus()
6363
replayer.addListener(eventMonster)
64-
replayer.replay(logData, SPARK_VERSION)
64+
replayer.replay(logData, SPARK_VERSION, logFilePath.toString)
6565
} finally {
6666
logData.close()
6767
}
@@ -120,7 +120,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
120120
try {
121121
val replayer = new ReplayListenerBus()
122122
replayer.addListener(eventMonster)
123-
replayer.replay(logData, version)
123+
replayer.replay(logData, version, eventLog.getPath().toString)
124124
} finally {
125125
logData.close()
126126
}

0 commit comments

Comments
 (0)