core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.util.logging
 
-import java.io.{File, FileOutputStream, InputStream}
+import java.io.{File, FileOutputStream, InputStream, IOException}
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.util.{IntParam, Utils}
@@ -58,20 +58,28 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
   protected def appendStreamToFile() {
     try {
       logDebug("Started appending thread")
-      openFile()
-      val buf = new Array[Byte](bufferSize)
-      var n = 0
-      while (!markedForStop && n != -1) {
-        n = inputStream.read(buf)
-        if (n != -1) {
-          appendToFile(buf, n)
+      Utils.tryWithSafeFinally {
+        openFile()
+        val buf = new Array[Byte](bufferSize)
+        var n = 0
+        while (!markedForStop && n != -1) {
+          try {
+            n = inputStream.read(buf)
+          } catch {
+            // An InputStream can throw IOException during read if the stream is closed
+            // asynchronously, so once the appender has been flagged to stop, these will be ignored
+            case _: IOException if markedForStop => // do nothing and proceed to stop appending
+          }
+          if (n > 0) {
+            appendToFile(buf, n)
+          }
         }
-      }
+      } {
+        closeFile()
+      }
     } catch {
       case e: Exception =>
         logError(s"Error writing stream to file $file", e)
-    } finally {
-      closeFile()
     }
   }

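Note on the change above: the cleanup moves from a plain `finally` into Spark's `Utils.tryWithSafeFinally`, which takes the cleanup as a second argument list. Below is a minimal sketch of that helper's semantics (simplified; the real helper in `org.apache.spark.util.Utils` also logs the suppressed exception): if both the body and the cleanup throw, the cleanup's exception is attached as suppressed so the original failure from the copy loop is the one that reaches the `catch` block.

object SafeFinallySketch {
  // Simplified sketch of Utils.tryWithSafeFinally's behavior, not the exact
  // Spark source (assumption: logging and other details are omitted).
  def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
    var originalThrowable: Throwable = null
    try {
      block
    } catch {
      case t: Throwable =>
        originalThrowable = t
        throw t
    } finally {
      try {
        finallyBlock
      } catch {
        // If cleanup (e.g. closeFile()) also throws, don't let it mask the
        // original error from the body; record it as suppressed instead.
        case t: Throwable if originalThrowable != null =>
          originalThrowable.addSuppressed(t)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Usage mirrors the patch: body in the first argument list, cleanup in the second.
    val result = tryWithSafeFinally {
      "copied stream"
    } {
      println("closeFile() equivalent runs here, on success or failure")
    }
    println(result)
  }
}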
core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala (77 additions, 0 deletions)
@@ -18,12 +18,17 @@
 package org.apache.spark.util
 
 import java.io._
+import java.util.concurrent.CountDownLatch
 
 import scala.collection.mutable.HashSet
 import scala.reflect._
 
 import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
 import org.apache.log4j.{Appender, Level, Logger}
+import org.apache.log4j.spi.LoggingEvent
+import org.mockito.ArgumentCaptor
+import org.mockito.Mockito.{atLeast, mock, verify}
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
@@ -188,6 +193,67 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
     testAppenderSelection[FileAppender, Any](rollingStrategy("xyz"))
   }
 
+  test("file appender async close stream abruptly") {
+    // Test FileAppender reaction to closing InputStream using a mock logging appender
+    val mockAppender = mock(classOf[Appender])
+    val loggingEventCaptor = new ArgumentCaptor[LoggingEvent]
+
+    // Make sure only errors are logged
+    val logger = Logger.getRootLogger
+    logger.setLevel(Level.ERROR)
+    logger.addAppender(mockAppender)
+
+    val testOutputStream = new PipedOutputStream()
+    val testInputStream = new PipedInputStream(testOutputStream)
+
+    // Closing the stream before the appender tries to read will cause an IOException
+    testInputStream.close()
+    testOutputStream.close()
+    val appender = FileAppender(testInputStream, testFile, new SparkConf)
+
+    appender.awaitTermination()
+
+    // If the InputStream was closed without first stopping the appender, an exception will be logged
+    verify(mockAppender, atLeast(1)).doAppend(loggingEventCaptor.capture)
+    val loggingEvent = loggingEventCaptor.getValue
+    assert(loggingEvent.getThrowableInformation !== null)
+    assert(loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException])
+  }
+
+  test("file appender async close stream gracefully") {
+    // Test FileAppender reaction to closing InputStream using a mock logging appender
+    val mockAppender = mock(classOf[Appender])
+    val loggingEventCaptor = new ArgumentCaptor[LoggingEvent]
+
+    // Make sure only errors are logged
+    val logger = Logger.getRootLogger
+    logger.setLevel(Level.ERROR)
+    logger.addAppender(mockAppender)
+
+    val testOutputStream = new PipedOutputStream()
+    val testInputStream = new PipedInputStream(testOutputStream) with LatchedInputStream
+
+    // Closing the stream before the appender tries to read will cause an IOException
+    testInputStream.close()
+    testOutputStream.close()
+    val appender = FileAppender(testInputStream, testFile, new SparkConf)
+
+    // Stop the appender before an IOException is thrown during read
+    testInputStream.latchReadStarted.await()
+    appender.stop()
+    testInputStream.latchReadProceed.countDown()
+
+    appender.awaitTermination()
+
+    // Make sure no IOException errors have been logged as a result of the appender closing gracefully
+    verify(mockAppender, atLeast(0)).doAppend(loggingEventCaptor.capture)
+    import scala.collection.JavaConverters._
+    loggingEventCaptor.getAllValues.asScala.foreach { loggingEvent =>
+      assert(loggingEvent.getThrowableInformation === null
+        || !loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException])
+    }
+  }
+
   /**
    * Run the rolling file appender with data and see whether all the data was written correctly
    * across rolled over files.
@@ -228,4 +294,15 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
       file.getName.startsWith(testFile.getName)
     }.foreach { _.delete() }
   }
+
+  /** Used to synchronize when read is called on a stream */
+  private trait LatchedInputStream extends PipedInputStream {
+    val latchReadStarted = new CountDownLatch(1)
+    val latchReadProceed = new CountDownLatch(1)
+    abstract override def read(): Int = {
+      latchReadStarted.countDown()
+      latchReadProceed.await()
+      super.read()
+    }
+  }
 }
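For reference, here is a self-contained sketch (the object name and demo stream are illustrative, not part of the patch) of how the stackable `abstract override` trait above freezes a reader inside `read()`. This is what lets the graceful test call `appender.stop()` at exactly the racy moment, after the appender thread has entered the read but before the IOException would surface:

import java.io.{ByteArrayInputStream, InputStream}
import java.util.concurrent.CountDownLatch

object LatchedStreamDemo {
  // Same pattern as LatchedInputStream, mixed into any concrete InputStream.
  trait Latched extends InputStream {
    val readStarted = new CountDownLatch(1)
    val readProceed = new CountDownLatch(1)
    abstract override def read(): Int = {
      readStarted.countDown()  // signal: a thread has entered read()
      readProceed.await()      // park until the test releases the read
      super.read()
    }
  }

  def main(args: Array[String]): Unit = {
    val in = new ByteArrayInputStream("x".getBytes) with Latched
    val reader = new Thread(new Runnable {
      def run(): Unit = println(s"read returned ${in.read()}")
    })
    reader.start()
    in.readStarted.await()     // the reader is now parked inside read()
    // ...this is the window where the test flips the stop flag via appender.stop()...
    in.readProceed.countDown()
    reader.join()
  }
}

The trait works because PipedInputStream's bulk read(byte[], int, int) funnels through the no-argument read(), so latching read() is enough to gate the appender's copy loop.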