From 6efff7495a6bd630baf20b5238d51644d5abca46 Mon Sep 17 00:00:00 2001
From: Bryan Cutler
Date: Mon, 11 Jan 2016 16:22:08 -0800
Subject: [PATCH 1/7] [SPARK-9844] Added check in FileAppender to not throw an
 IOException if marked as stopped and changed file operation block to use
 tryWithSafeFinally

---
 .../spark/util/logging/FileAppender.scala | 29 ++++++++++++-------
 1 file changed, 19 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
index 58c8560a3d049..478176c462101 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.util.logging
 
-import java.io.{File, FileOutputStream, InputStream}
+import java.io.{IOException, File, FileOutputStream, InputStream}
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.util.{IntParam, Utils}
@@ -58,20 +58,29 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
   protected def appendStreamToFile() {
     try {
       logDebug("Started appending thread")
-      openFile()
-      val buf = new Array[Byte](bufferSize)
-      var n = 0
-      while (!markedForStop && n != -1) {
-        n = inputStream.read(buf)
-        if (n != -1) {
-          appendToFile(buf, n)
+      Utils.tryWithSafeFinally {
+        openFile()
+        val buf = new Array[Byte](bufferSize)
+        var n = 0
+        while (!markedForStop && n != -1) {
+          try {
+            n = inputStream.read(buf)
+          } catch {
+            case e: IOException =>
+              // An InputStream can throw IOException during read if the stream is closed
+              // asynchronously, so once appender has been flagged to stop these will be ignored
+              if (!markedForStop) throw e
+          }
+          if (n != -1) {
+            appendToFile(buf, n)
+          }
         }
+      } {
+        closeFile()
       }
     } catch {
       case e: Exception =>
         logError(s"Error writing stream to file $file", e)
-    } finally {
-      closeFile()
-    }
+    }
   }
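The core idea of PATCH 1/7 is a cooperative-stop flag checked around a blocking read: once markedForStop is set, an IOException from the asynchronously closed stream is expected and must not surface as an error. Below is a minimal standalone sketch of that pattern, assuming nothing from the Spark tree; StreamDrainer and its members are illustrative names, not Spark API.

import java.io.{IOException, InputStream}

// Drains a stream on a background thread; after stop() is requested,
// an IOException from read() is treated as a normal end of stream.
class StreamDrainer(in: InputStream, bufferSize: Int = 8192) {
  @volatile private var markedForStop = false

  private val thread = new Thread("drainer") {
    override def run(): Unit = {
      val buf = new Array[Byte](bufferSize)
      var n = 0
      while (!markedForStop && n != -1) {
        try {
          n = in.read(buf)
        } catch {
          // Reading an asynchronously closed stream throws IOException; once
          // stop() was requested this is expected, so treat it as EOF.
          // (Setting n = -1 also avoids re-processing a stale buffer.)
          case _: IOException if markedForStop => n = -1
        }
        if (n > 0) {
          // process buf(0 until n) here
        }
      }
    }
  }
  thread.setDaemon(true)
  thread.start()

  def stop(): Unit = markedForStop = true
}

The same commit also routes the open/append/close sequence through Spark's Utils.tryWithSafeFinally, which runs the cleanup block and, if both blocks throw, keeps the original exception rather than masking it with the one thrown during cleanup.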
From f937e183d466d014f1545d823ecdf30f9becdf77 Mon Sep 17 00:00:00 2001
From: Bryan Cutler
Date: Mon, 11 Jan 2016 16:22:55 -0800
Subject: [PATCH 2/7] [SPARK-9844] Added unit test for closing FileAppender
 InputStream asynchronously

---
 .../apache/spark/util/FileAppenderSuite.scala | 77 +++++++++++++++++++
 1 file changed, 77 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index 98d1b28d5a167..649ed31c45659 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -18,12 +18,17 @@
 package org.apache.spark.util
 
 import java.io._
+import java.util.concurrent.CountDownLatch
 
 import scala.collection.mutable.HashSet
 import scala.reflect._
 
 import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
+import org.apache.log4j.{Level, Appender, Logger}
+import org.apache.log4j.spi.LoggingEvent
+import org.mockito.ArgumentCaptor
+import org.mockito.Mockito.{atLeast, mock, verify}
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
@@ -188,6 +193,67 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
     testAppenderSelection[FileAppender, Any](rollingStrategy("xyz"))
   }
 
+  test("file appender async close stream abruptly") {
+    // Test FileAppender reaction to closing InputStream using a mock logging appender
+    val mockAppender = mock(classOf[Appender])
+    val loggingEventCaptor = new ArgumentCaptor[LoggingEvent]
+
+    // Make sure only errors are logged
+    val logger = Logger.getRootLogger
+    logger.setLevel(Level.ERROR)
+    logger.addAppender(mockAppender)
+
+    val testOutputStream = new PipedOutputStream()
+    val testInputStream = new PipedInputStream(testOutputStream)
+
+    // Closing the stream before the appender tries to read will cause an IOException
+    testInputStream.close()
+    testOutputStream.close()
+    val appender = FileAppender(testInputStream, testFile, new SparkConf)
+
+    appender.awaitTermination()
+
+    // If InputStream was closed without first stopping the appender, an exception will be logged
+    verify(mockAppender, atLeast(1)).doAppend(loggingEventCaptor.capture)
+    val loggingEvent = loggingEventCaptor.getValue
+    assert(loggingEvent.getLevel == Level.ERROR)
+    assert(loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException])
+  }
+
+  test("file appender async close stream gracefully") {
+    // Test FileAppender reaction to closing InputStream using a mock logging appender
+    val mockAppender = mock(classOf[Appender])
+    val loggingEventCaptor = new ArgumentCaptor[LoggingEvent]
+
+    // Make sure only errors are logged
+    val logger = Logger.getRootLogger
+    logger.setLevel(Level.ERROR)
+    logger.addAppender(mockAppender)
+
+    val testOutputStream = new PipedOutputStream()
+    val testInputStream = new PipedInputStream(testOutputStream) with LatchedInputStream
+
+    // Closing the stream before the appender tries to read will cause an IOException
+    testInputStream.close()
+    testOutputStream.close()
+    val appender = FileAppender(testInputStream, testFile, new SparkConf)
+
+    // Stop the appender before an IOException is thrown during read
+    testInputStream.latchReadStarted.await()
+    appender.stop()
+    testInputStream.latchReadProceed.countDown()
+
+    appender.awaitTermination()
+
+    // Make sure no IOException errors have been logged as a result of the appender closing gracefully
+    verify(mockAppender, atLeast(0)).doAppend(loggingEventCaptor.capture)
+    import scala.collection.JavaConverters._
+    loggingEventCaptor.getAllValues.asScala.foreach { loggingEvent =>
+      assert(loggingEvent.getLevel != Level.ERROR
+        || !loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException])
+    }
+  }
+
   /**
    * Run the rolling file appender with data and see whether all the data was written correctly
    * across rolled over files.
@@ -228,4 +294,15 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
       file.getName.startsWith(testFile.getName)
     }.foreach { _.delete() }
   }
+
+  /** Used to synchronize when read is called on a stream */
+  private trait LatchedInputStream extends PipedInputStream {
+    val latchReadStarted = new CountDownLatch(1)
+    val latchReadProceed = new CountDownLatch(1)
+    abstract override def read(): Int = {
+      latchReadStarted.countDown()
+      latchReadProceed.await()
+      super.read()
+    }
+  }
 }
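The "gracefully" test above hinges on stopping the appender while its reader thread is parked inside read(), which is what the stacked LatchedInputStream trait at the end of this patch makes deterministic. Here is a condensed, self-contained sketch of that two-latch handshake, assuming only the JDK and Scala's stackable-trait mechanism; LatchedRead and LatchDemo are illustrative names, not part of the suite.

import java.io.{InputStream, PipedInputStream, PipedOutputStream}
import java.util.concurrent.CountDownLatch

// Stackable modification: announces entry into read(), then blocks until released.
trait LatchedRead extends InputStream {
  val readStarted = new CountDownLatch(1) // worker -> test: "read() was entered"
  val readProceed = new CountDownLatch(1) // test -> worker: "go ahead"
  abstract override def read(): Int = {
    readStarted.countDown()
    readProceed.await()
    super.read()
  }
}

object LatchDemo extends App {
  val out = new PipedOutputStream()
  val in = new PipedInputStream(out) with LatchedRead

  val worker = new Thread { override def run(): Unit = in.read() }
  worker.start()

  in.readStarted.await()     // worker is now parked inside read()
  // ...mutate state here with the reader frozen, e.g. stop an appender...
  out.write(42)              // give the pending read a byte to return
  in.readProceed.countDown()
  worker.join()
}

The abstract override is what lets the trait wrap PipedInputStream.read() at mix-in time (new PipedInputStream(out) with LatchedRead) without subclassing it directly.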
From 1775d98c1d9c9664e0f9ce80082715938c6e0aed Mon Sep 17 00:00:00 2001
From: Bryan Cutler
Date: Mon, 11 Jan 2016 16:27:03 -0800
Subject: [PATCH 3/7] fixed import order

---
 .../main/scala/org/apache/spark/util/logging/FileAppender.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
index 478176c462101..08f957df051f5 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.util.logging
 
-import java.io.{IOException, File, FileOutputStream, InputStream}
+import java.io.{File, FileOutputStream, InputStream, IOException}
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.util.{IntParam, Utils}

From dc6573715a77107e7fbaaf97b4dbbc18f652b9b3 Mon Sep 17 00:00:00 2001
From: Bryan Cutler
Date: Mon, 11 Jan 2016 16:57:09 -0800
Subject: [PATCH 4/7] Removed check for log level ERROR, since only errors are
 logged; check getThrowableInformation for null instead

---
 .../test/scala/org/apache/spark/util/FileAppenderSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index 649ed31c45659..d05b1ad07f667 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -216,7 +216,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
     // If InputStream was closed without first stopping the appender, an exception will be logged
     verify(mockAppender, atLeast(1)).doAppend(loggingEventCaptor.capture)
     val loggingEvent = loggingEventCaptor.getValue
-    assert(loggingEvent.getLevel == Level.ERROR)
+    assert(loggingEvent.getThrowableInformation !== null)
     assert(loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException])
   }
 
@@ -249,7 +249,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
     verify(mockAppender, atLeast(0)).doAppend(loggingEventCaptor.capture)
     import scala.collection.JavaConverters._
     loggingEventCaptor.getAllValues.asScala.foreach { loggingEvent =>
-      assert(loggingEvent.getLevel != Level.ERROR
+      assert(loggingEvent.getThrowableInformation === null
         || !loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException])
     }
   }
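PATCH 4/7 works because the mock Appender only ever sees ERROR-level events (the root logger is set to Level.ERROR), so re-checking the level is redundant, while getThrowableInformation is null for any event logged without a Throwable and must be guarded before dereferencing. A small sketch of that capture-and-inspect pattern against log4j 1.x and Mockito; loggedThrowables is an illustrative helper, not part of the suite.

import scala.collection.JavaConverters._

import org.apache.log4j.{Appender, Level, Logger}
import org.apache.log4j.spi.LoggingEvent
import org.mockito.ArgumentCaptor
import org.mockito.Mockito.{atLeastOnce, mock, verify}

object CaptureSketch {
  // Collects every Throwable attached to events that reached the root logger.
  def loggedThrowables(): Seq[Throwable] = {
    val mockAppender = mock(classOf[Appender])
    val captor = ArgumentCaptor.forClass(classOf[LoggingEvent])
    val root = Logger.getRootLogger
    root.setLevel(Level.ERROR)
    root.addAppender(mockAppender)

    Logger.getLogger(getClass).error("boom", new RuntimeException("example"))
    Logger.getLogger(getClass).error("no throwable attached")

    verify(mockAppender, atLeastOnce()).doAppend(captor.capture())
    captor.getAllValues.asScala.toSeq.flatMap { event =>
      // Null when the event carried no Throwable -- the case PATCH 4 guards.
      Option(event.getThrowableInformation).map(_.getThrowable)
    }
  }
}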
d05b1ad07f667..b367cc8358342 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -25,7 +25,7 @@ import scala.reflect._
 
 import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
-import org.apache.log4j.{Level, Appender, Logger}
+import org.apache.log4j.{Appender, Level, Logger}
 import org.apache.log4j.spi.LoggingEvent
 import org.mockito.ArgumentCaptor
 import org.mockito.Mockito.{atLeast, mock, verify}

From a28f073ab69217ffb6bc02623e4989ed2ea39214 Mon Sep 17 00:00:00 2001
From: Bryan Cutler
Date: Tue, 12 Jan 2016 11:02:05 -0800
Subject: [PATCH 6/7] Cleaned up exception handling and fixed check to append
 only if positive bytes are read

---
 .../org/apache/spark/util/logging/FileAppender.scala | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
index 08f957df051f5..3f6cf9bc9bede 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
@@ -66,12 +66,11 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
           try {
             n = inputStream.read(buf)
           } catch {
-            case e: IOException =>
-              // An InputStream can throw IOException during read if the stream is closed
-              // asynchronously, so once appender has been flagged to stop these will be ignored
-              if (!markedForStop) throw e
+            // An InputStream can throw IOException during read if the stream is closed
+            // asynchronously, so once appender has been flagged to stop these will be ignored
+            case e: IOException if markedForStop => // do nothing and proceed to stop appending
           }
-          if (n != -1) {
+          if (n > 0) {
             appendToFile(buf, n)
           }
         }

From 4684a582e2bad03ea92bdfa03b17ed933c0f6e62 Mon Sep 17 00:00:00 2001
From: Bryan Cutler
Date: Tue, 12 Jan 2016 11:55:44 -0800
Subject: [PATCH 7/7] Can use a placeholder for exception match since it is
 being ignored

---
 .../main/scala/org/apache/spark/util/logging/FileAppender.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
index 3f6cf9bc9bede..86bbaa20f6cf2 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
@@ -68,7 +68,7 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
           } catch {
             // An InputStream can throw IOException during read if the stream is closed
             // asynchronously, so once appender has been flagged to stop these will be ignored
-            case e: IOException if markedForStop => // do nothing and proceed to stop appending
+            case _: IOException if markedForStop => // do nothing and proceed to stop appending
           }
           if (n > 0) {
             appendToFile(buf, n)
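The last two patches converge on a single guarded catch: a pattern-match case in a catch block can carry an if guard, and once the caught value is unused, _ replaces the binding. A final minimal sketch of that idiom, independent of the Spark code; readOnce and markedForStop here are illustrative.

import java.io.IOException

object GuardedCatchSketch {
  @volatile private var markedForStop = false

  // Returns -1 (EOF) when the read fails *after* a stop was requested;
  // any IOException raised before that still propagates to the caller.
  private def readOnce(read: () => Int): Int =
    try {
      read()
    } catch {
      case _: IOException if markedForStop => -1
    }

  def main(args: Array[String]): Unit = {
    markedForStop = true
    val n = readOnce(() => throw new IOException("stream closed"))
    assert(n == -1) // the expected failure is folded into a normal EOF
  }
}

If the guard is false, the case simply does not match and the exception keeps propagating, which is exactly why the appender still reports IOExceptions that arrive before stop() is called.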