From 2973024239f4b0b506cb334828e3fd10668b23b5 Mon Sep 17 00:00:00 2001 From: "Zhang, Liye" Date: Fri, 6 Mar 2015 13:21:06 +0800 Subject: [PATCH 1/3] handle json exception when file not finished writing --- .../apache/spark/deploy/master/Master.scala | 3 ++- .../spark/scheduler/ReplayListenerBus.scala | 25 ++++++++++++++++--- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 15814293227ab..59b1a3e6f31e5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -765,7 +765,8 @@ private[spark] class Master( val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf), appName + status, HistoryServer.UI_PATH_PREFIX + s"/${app.id}") try { - replayBus.replay(logInput, eventLogFile) + replayBus.replay(logInput, eventLogFile, + eventLogFile.endsWith(EventLoggingListener.IN_PROGRESS)) } finally { logInput.close() } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index 95273c716b3e2..d4e6375457113 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -17,6 +17,7 @@ package org.apache.spark.scheduler +import com.fasterxml.jackson.core.JsonParseException import java.io.{InputStream, IOException} import scala.io.Source @@ -40,15 +41,31 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { * * @param logData Stream containing event log data. * @param sourceName Filename (or other source identifier) from whence @logData is being read + * @param sourceTruncated Indicate whether log file might be truncated (some abnormal situation + * encountered, log file not finish writing) or not */ - def replay(logData: InputStream, sourceName: String): Unit = { + def replay( + logData: InputStream, + sourceName: String, + sourceTruncated: Boolean = false): Unit = { var currentLine: String = null var lineNumber: Int = 1 try { val lines = Source.fromInputStream(logData).getLines() - lines.foreach { line => - currentLine = line - postToAll(JsonProtocol.sparkEventFromJson(parse(line))) + while (lines.hasNext) { + currentLine = lines.next() + try { + postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine))) + } catch { + case jpe: JsonParseException => + // We can only ignore exception from last line of the file that might be truncated + if (!sourceTruncated || lines.hasNext) { + throw jpe + } else { + logWarning(s"Get json parse exception from log file $sourceName" + + s" in line $lineNumber, the file might not finished writing.") + } + } lineNumber += 1 } } catch { From 2b48831d2cebd0ca094a0ab4eed7aa2318849ecc Mon Sep 17 00:00:00 2001 From: "Zhang, Liye" Date: Fri, 6 Mar 2015 23:16:11 +0800 Subject: [PATCH 2/3] small changes with sean owen's comments --- .../org/apache/spark/deploy/master/Master.scala | 3 ++- .../apache/spark/scheduler/ReplayListenerBus.scala | 12 ++++++------ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 59b1a3e6f31e5..fcdd8513161de 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -764,9 +764,10 @@ private[spark] class Master( val replayBus = new ReplayListenerBus() val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf), appName + status, HistoryServer.UI_PATH_PREFIX + s"/${app.id}") + val maybeTruncated = eventLogFile.endsWith(EventLoggingListener.IN_PROGRESS) try { replayBus.replay(logInput, eventLogFile, - eventLogFile.endsWith(EventLoggingListener.IN_PROGRESS)) + maybeTruncated) } finally { logInput.close() } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index d4e6375457113..c49eba0d3f231 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -41,13 +41,13 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { * * @param logData Stream containing event log data. * @param sourceName Filename (or other source identifier) from whence @logData is being read - * @param sourceTruncated Indicate whether log file might be truncated (some abnormal situation - * encountered, log file not finish writing) or not + * @param maybeTruncated Indicate whether log file might be truncated (some abnormal situations + * encountered, log file might not finished writing) or not */ def replay( logData: InputStream, sourceName: String, - sourceTruncated: Boolean = false): Unit = { + maybeTruncated: Boolean = false): Unit = { var currentLine: String = null var lineNumber: Int = 1 try { @@ -59,11 +59,11 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { } catch { case jpe: JsonParseException => // We can only ignore exception from last line of the file that might be truncated - if (!sourceTruncated || lines.hasNext) { + if (!maybeTruncated || lines.hasNext) { throw jpe } else { - logWarning(s"Get json parse exception from log file $sourceName" + - s" in line $lineNumber, the file might not finished writing.") + logWarning(s"Got JsonParseException from log file $sourceName" + + s" at line $lineNumber, the file might not have finished writing cleanly.") } } lineNumber += 1 From 5cbdc823a1dd6db3b6bd3cce72568c5171b43b4f Mon Sep 17 00:00:00 2001 From: "Zhang, Liye" Date: Fri, 6 Mar 2015 23:37:40 +0800 Subject: [PATCH 3/3] without unnecessary wrap --- .../src/main/scala/org/apache/spark/deploy/master/Master.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index fcdd8513161de..22935c9b1d394 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -766,8 +766,7 @@ private[spark] class Master( appName + status, HistoryServer.UI_PATH_PREFIX + s"/${app.id}") val maybeTruncated = eventLogFile.endsWith(EventLoggingListener.IN_PROGRESS) try { - replayBus.replay(logInput, eventLogFile, - maybeTruncated) + replayBus.replay(logInput, eventLogFile, maybeTruncated) } finally { logInput.close() }