
Commit 06a9793

Authored by Eric Vandenberg and committed by Marcelo Vanzin
[SPARK-21447][WEB UI] Spark history server fails to render compressed
inprogress history file in some cases.

Add failure handling for the EOFException that can be thrown while
decompressing an inprogress Spark history file, treating it the same as the
case where the last line cannot be parsed.

## What changes were proposed in this pull request?

Handle an EOFException thrown within the ReplayListenerBus.replay method,
analogous to the existing handling of a JSON parse failure. This path can
arise with compressed inprogress history files, since an incomplete
compression block may be read (the writer has not yet flushed on a block
boundary). See the stack trace of this occurrence in the JIRA ticket
(https://issues.apache.org/jira/browse/SPARK-21447).

## How was this patch tested?

Added a unit test that specifically validates the failure-handling path when
maybeTruncated is true and when it is false.

Author: Eric Vandenberg <[email protected]>

Closes #18673 from ericvandenbergfb/fix_inprogress_compr_history_file.
1 parent 8de080d commit 06a9793
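
For context on the failure mode: block codecs such as LZ4 emit complete compressed blocks, so a reader that reaches the end of a still-being-written file can run out of bytes in the middle of a block and see an EOFException rather than a clean end-of-stream. A minimal standalone sketch, assuming the lz4-java library that Spark's LZ4CompressionCodec wraps (the class names and the 10-byte truncation are illustrative, not taken from the commit):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, EOFException}
import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}

object TruncatedLz4Demo {
  def main(args: Array[String]): Unit = {
    // Compress a few "event log" lines into an in-memory buffer.
    val buffer = new ByteArrayOutputStream()
    val lz4Out = new LZ4BlockOutputStream(buffer)
    lz4Out.write(("{\"Event\":\"SparkListenerApplicationStart\"}\n" * 1000).getBytes("UTF-8"))
    lz4Out.close()

    // Drop the tail of the compressed bytes, simulating an inprogress file
    // whose final compression block has not yet been flushed by the writer.
    val truncated = buffer.toByteArray.dropRight(10)

    val lz4In = new LZ4BlockInputStream(new ByteArrayInputStream(truncated))
    try {
      while (lz4In.read() != -1) {} // fails partway through the last block
    } catch {
      case e: EOFException => println(s"EOF inside a compression block: $e")
    }
  }
}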

File tree: 2 files changed, +77 −3 lines


core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
(2 additions, 1 deletion)

@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler
 
-import java.io.{InputStream, IOException}
+import java.io.{EOFException, InputStream, IOException}
 
 import scala.io.Source
 
@@ -107,6 +107,7 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
         }
       }
     } catch {
+      case _: EOFException if maybeTruncated =>
       case ioe: IOException =>
         throw ioe
       case e: Exception =>
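
The guard on maybeTruncated matters: the exception is swallowed only when the caller has declared that the input may legitimately be cut short. A hedged sketch of the call-site pattern (the real caller is FsHistoryProvider; this helper and its names are illustrative):

import java.io.InputStream
import org.apache.hadoop.fs.Path
import org.apache.spark.scheduler.{ReplayListenerBus, SparkListenerInterface}

// Hypothetical helper mirroring how the history provider drives replay: an
// .inprogress log may still be appended to, so torn trailing data (a partial
// JSON line or, after this patch, a partial compression block) is tolerated
// rather than failing the whole render.
def replayLog(stream: InputStream, logPath: Path, listener: SparkListenerInterface): Unit = {
  val bus = new ReplayListenerBus()
  bus.addListener(listener)
  val inProgress = logPath.getName.endsWith(".inprogress")
  bus.replay(stream, logPath.toString, maybeTruncated = inProgress)
}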

core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
(75 additions, 2 deletions)

@@ -17,15 +17,16 @@
 
 package org.apache.spark.scheduler
 
-import java.io.{File, PrintWriter}
+import java.io._
 import java.net.URI
+import java.util.concurrent.atomic.AtomicInteger
 
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.io.CompressionCodec
+import org.apache.spark.io.{CompressionCodec, LZ4CompressionCodec}
 import org.apache.spark.util.{JsonProtocol, JsonProtocolSuite, Utils}
 
 /**
@@ -72,6 +73,59 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
     assert(eventMonster.loggedEvents(1) === JsonProtocol.sparkEventToJson(applicationEnd))
   }
 
+  /**
+   * Test replaying a compressed spark history file that internally throws an EOFException. To
+   * avoid sensitivity to the compression specifics, the test forces an EOFException to occur
+   * while reading bytes from the underlying stream (as observed in actual history files in
+   * some cases) and exercises the failure handling. This validates correctness both when
+   * maybeTruncated is true and when it is false.
+   */
+  test("Replay compressed inprogress log file succeeding on partial read") {
+    val buffered = new ByteArrayOutputStream
+    val codec = new LZ4CompressionCodec(new SparkConf())
+    val compstream = codec.compressedOutputStream(buffered)
+    val writer = new PrintWriter(compstream)
+
+    val applicationStart = SparkListenerApplicationStart("AppStarts", None,
+      125L, "Mickey", None)
+    val applicationEnd = SparkListenerApplicationEnd(1000L)
+
+    // scalastyle:off println
+    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
+    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
+    // scalastyle:on println
+    writer.close()
+
+    val logFilePath = Utils.getFilePath(testDir, "events.lz4.inprogress")
+    val fstream = fileSystem.create(logFilePath)
+    val bytes = buffered.toByteArray
+
+    fstream.write(bytes, 0, buffered.size)
+    fstream.close()
+
+    // Read the compressed .inprogress file and verify only the first event was parsed.
+    val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath)
+    val replayer = new ReplayListenerBus()
+
+    val eventMonster = new EventMonster(conf)
+    replayer.addListener(eventMonster)
+
+    // Verify the replay succeeds, given the input may be truncated.
+    val logData = EventLoggingListener.openEventLog(logFilePath, fileSystem)
+    val failingStream = new EarlyEOFInputStream(logData, buffered.size - 10)
+    replayer.replay(failingStream, logFilePath.toString, true)
+
+    assert(eventMonster.loggedEvents.size === 1)
+    assert(failingStream.didFail)
+
+    // Verify the replay throws the EOF exception, since the input may not be truncated.
+    val logData2 = EventLoggingListener.openEventLog(logFilePath, fileSystem)
+    val failingStream2 = new EarlyEOFInputStream(logData2, buffered.size - 10)
+    intercept[EOFException] {
+      replayer.replay(failingStream2, logFilePath.toString, false)
+    }
+  }
+
   // This assumes the correctness of EventLoggingListener
   test("End-to-end replay") {
     testApplicationReplay()
@@ -156,4 +210,23 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
     override def start() { }
 
   }
+
+  /*
+   * This is a dummy input stream that wraps another input stream but ends prematurely when
+   * reading at the specified position, throwing an EOFException.
+   */
+  private class EarlyEOFInputStream(in: InputStream, failAtPos: Int) extends InputStream {
+    private val countDown = new AtomicInteger(failAtPos)
+
+    def didFail: Boolean = countDown.get == 0
+
+    @throws[IOException]
+    def read(): Int = {
+      if (countDown.get == 0) {
+        throw new EOFException("Stream ended prematurely")
+      }
+      countDown.decrementAndGet()
+      in.read()
+    }
+  }
 }
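
To run just this suite locally, an sbt invocation along these lines should work from the Spark source root (the exact command is an assumption, not part of the commit):

build/sbt "core/testOnly org.apache.spark.scheduler.ReplayListenerSuite"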
