
Commit 0c67993

BryanCutler authored and srowen committed
[SPARK-9844][CORE] File appender race condition during shutdown
When an Executor process is destroyed, the FileAppender that is asynchronously reading the stderr stream of the process can throw an IOException during the read because the stream is closed. Before the ExecutorRunner destroys the process, the FileAppender thread is flagged to stop. This PR wraps the inputStream.read call of the FileAppender in a try/catch block so that if an IOException is thrown and the thread has been flagged to stop, the exception is safely ignored. Additionally, the FileAppender thread was changed to use Utils.tryWithSafeFinally to better log any exceptions that do occur.

Added unit tests to verify that an IOException is thrown and logged if the FileAppender is not flagged to stop, and that no IOException is logged when the flag is set.

Author: Bryan Cutler <[email protected]>

Closes #10714 from BryanCutler/file-appender-read-ioexception-SPARK-9844.

(cherry picked from commit 56cdbd6)
Signed-off-by: Sean Owen <[email protected]>
1 parent a490787 commit 0c67993
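
At its core, the fix is a guarded read loop: the reader thread swallows an IOException only when it has already been flagged to stop, so a genuine read failure still surfaces. A minimal, self-contained sketch of that pattern (StreamReader and its members are illustrative names, not the actual FileAppender source):

import java.io.{IOException, InputStream}

// Sketch of the guarded-read pattern described in the commit message.
class StreamReader(in: InputStream) {
  @volatile private var markedForStop = false

  // Called by the owner (e.g. ExecutorRunner) before it closes the stream.
  def stop(): Unit = { markedForStop = true }

  def readLoop(handle: (Array[Byte], Int) => Unit): Unit = {
    val buf = new Array[Byte](8192)
    var n = 0
    while (!markedForStop && n != -1) {
      try {
        n = in.read(buf)
      } catch {
        // The stream may be closed from another thread once stop() has been
        // called; in that case the IOException is expected and ignored, and
        // the loop condition ends the thread on the next check.
        case _: IOException if markedForStop => // fall through
      }
      if (n > 0) {
        handle(buf, n)
      }
    }
  }
}

Without the markedForStop guard, a destroyed Executor process would race the reader thread and turn an orderly shutdown into a logged IOException.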

2 files changed: 100 additions & 16 deletions

core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala

Lines changed: 22 additions & 14 deletions

@@ -17,7 +17,7 @@
 
 package org.apache.spark.util.logging
 
-import java.io.{File, FileOutputStream, InputStream}
+import java.io.{File, FileOutputStream, InputStream, IOException}
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.util.{IntParam, Utils}
@@ -63,24 +63,32 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
   protected def appendStreamToFile() {
     try {
       logDebug("Started appending thread")
-      openFile()
-      val buf = new Array[Byte](bufferSize)
-      var n = 0
-      while (!markedForStop && n != -1) {
-        n = inputStream.read(buf)
-        if (n != -1) {
-          appendToFile(buf, n)
+      Utils.tryWithSafeFinally {
+        openFile()
+        val buf = new Array[Byte](bufferSize)
+        var n = 0
+        while (!markedForStop && n != -1) {
+          try {
+            n = inputStream.read(buf)
+          } catch {
+            // An InputStream can throw IOException during read if the stream is closed
+            // asynchronously, so once appender has been flagged to stop these will be ignored
+            case _: IOException if markedForStop => // do nothing and proceed to stop appending
+          }
+          if (n > 0) {
+            appendToFile(buf, n)
+          }
+        }
+      } {
+        closeFile()
+        synchronized {
+          stopped = true
+          notifyAll()
         }
       }
     } catch {
       case e: Exception =>
         logError(s"Error writing stream to file $file", e)
-    } finally {
-      closeFile()
-      synchronized {
-        stopped = true
-        notifyAll()
-      }
     }
   }
 
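For context, Utils.tryWithSafeFinally runs a cleanup block the way finally does, but prevents a failure during cleanup from masking the original exception thrown by the body, which is what lets the logError call above report the real error. A simplified sketch of that contract (an assumed shape; the actual method lives in org.apache.spark.util.Utils and also logs the suppressed exception):

object SafeFinallySketch {
  def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
    var original: Throwable = null
    try {
      block
    } catch {
      case t: Throwable =>
        original = t
        throw t
    } finally {
      try {
        finallyBlock
      } catch {
        // Attach a cleanup failure to the original error as a suppressed
        // exception instead of letting it replace the root cause.
        case t: Throwable if original != null => original.addSuppressed(t)
      }
    }
  }
}

In appendStreamToFile this means closeFile() and the stopped/notifyAll handshake always run, and if both the read loop and the cleanup fail, the outer catch still sees the read loop's exception.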

core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala

Lines changed: 78 additions & 2 deletions

@@ -18,14 +18,18 @@
 package org.apache.spark.util
 
 import java.io._
+import java.util.concurrent.CountDownLatch
 
 import scala.collection.mutable.HashSet
 import scala.reflect._
 
-import org.scalatest.BeforeAndAfter
-
 import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
+import org.apache.log4j.{Appender, Level, Logger}
+import org.apache.log4j.spi.LoggingEvent
+import org.mockito.ArgumentCaptor
+import org.mockito.Mockito.{atLeast, mock, verify}
+import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
 import org.apache.spark.util.logging.{RollingFileAppender, SizeBasedRollingPolicy, TimeBasedRollingPolicy, FileAppender}
@@ -189,6 +193,67 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
     testAppenderSelection[FileAppender, Any](rollingStrategy("xyz"))
   }
 
+  test("file appender async close stream abruptly") {
+    // Test FileAppender reaction to closing InputStream using a mock logging appender
+    val mockAppender = mock(classOf[Appender])
+    val loggingEventCaptor = new ArgumentCaptor[LoggingEvent]
+
+    // Make sure only logging errors
+    val logger = Logger.getRootLogger
+    logger.setLevel(Level.ERROR)
+    logger.addAppender(mockAppender)
+
+    val testOutputStream = new PipedOutputStream()
+    val testInputStream = new PipedInputStream(testOutputStream)
+
+    // Close the stream before appender tries to read will cause an IOException
+    testInputStream.close()
+    testOutputStream.close()
+    val appender = FileAppender(testInputStream, testFile, new SparkConf)
+
+    appender.awaitTermination()
+
+    // If InputStream was closed without first stopping the appender, an exception will be logged
+    verify(mockAppender, atLeast(1)).doAppend(loggingEventCaptor.capture)
+    val loggingEvent = loggingEventCaptor.getValue
+    assert(loggingEvent.getThrowableInformation !== null)
+    assert(loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException])
+  }
+
+  test("file appender async close stream gracefully") {
+    // Test FileAppender reaction to closing InputStream using a mock logging appender
+    val mockAppender = mock(classOf[Appender])
+    val loggingEventCaptor = new ArgumentCaptor[LoggingEvent]
+
+    // Make sure only logging errors
+    val logger = Logger.getRootLogger
+    logger.setLevel(Level.ERROR)
+    logger.addAppender(mockAppender)
+
+    val testOutputStream = new PipedOutputStream()
+    val testInputStream = new PipedInputStream(testOutputStream) with LatchedInputStream
+
+    // Close the stream before appender tries to read will cause an IOException
+    testInputStream.close()
+    testOutputStream.close()
+    val appender = FileAppender(testInputStream, testFile, new SparkConf)
+
+    // Stop the appender before an IOException is called during read
+    testInputStream.latchReadStarted.await()
+    appender.stop()
+    testInputStream.latchReadProceed.countDown()
+
+    appender.awaitTermination()
+
+    // Make sure no IOException errors have been logged as a result of appender closing gracefully
+    verify(mockAppender, atLeast(0)).doAppend(loggingEventCaptor.capture)
+    import scala.collection.JavaConverters._
+    loggingEventCaptor.getAllValues.asScala.foreach { loggingEvent =>
+      assert(loggingEvent.getThrowableInformation === null
+        || !loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException])
+    }
+  }
+
   /**
    * Run the rolling file appender with data and see whether all the data was written correctly
    * across rolled over files.
@@ -229,4 +294,15 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
       file.getName.startsWith(testFile.getName)
     }.foreach { _.delete() }
   }
+
+  /** Used to synchronize when read is called on a stream */
+  private trait LatchedInputStream extends PipedInputStream {
+    val latchReadStarted = new CountDownLatch(1)
+    val latchReadProceed = new CountDownLatch(1)
+    abstract override def read(): Int = {
+      latchReadStarted.countDown()
+      latchReadProceed.await()
+      super.read()
+    }
+  }
 }
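
The LatchedInputStream helper relies on Scala's stackable-trait pattern: an abstract override method in a trait wraps the concrete method of the class it is mixed into and reaches it through super, which is how the test above pauses PipedInputStream.read() until the appender has been stopped. A minimal standalone sketch of the same pattern (Greeter and Shouting are illustrative names, not from the Spark code):

// Stackable-trait sketch: Shouting wraps the concrete greet() of whatever
// class it is mixed into, delegating to it via super.
class Greeter {
  def greet(): String = "hello"
}

trait Shouting extends Greeter {
  abstract override def greet(): String = super.greet().toUpperCase
}

object StackableDemo extends App {
  val g = new Greeter with Shouting
  println(g.greet()) // prints "HELLO"
}

In the suite, new PipedInputStream(testOutputStream) with LatchedInputStream works the same way: the trait's read() counts down latchReadStarted, blocks on latchReadProceed, and only then calls the real PipedInputStream.read().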
