From 8b2b540752a5a068bd10d1623f2771885a6b71c5 Mon Sep 17 00:00:00 2001
From: Jie
Date: Thu, 8 Jul 2021 13:55:40 +0800
Subject: [PATCH 1/3] [SPARK-35027][CORE] Close the inputStream in FileAppender
 when writing the logs fails

### What changes were proposed in this pull request?

1. Add an option to close the input streams in FileAppender.
2. Set closeStreams to true for the FileAppenders used in ExecutorRunner.

### Why are the changes needed?

The "inputStream" in FileAppender is not closed when an error happens while writing to the outputStream.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added new tests for FileAppender.

---
 .../spark/deploy/worker/ExecutorRunner.scala  |  4 +-
 .../spark/util/logging/FileAppender.scala     | 43 ++++++++++++++-----
 .../util/logging/RollingFileAppender.scala    |  5 ++-
 .../apache/spark/util/FileAppenderSuite.scala | 35 +++++++++++++++
 4 files changed, 73 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 2e26ccf671d88..974c2d670c234 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -185,11 +185,11 @@ private[deploy] class ExecutorRunner(
 
       // Redirect its stdout and stderr to files
       val stdout = new File(executorDir, "stdout")
-      stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
+      stdoutAppender = FileAppender(process.getInputStream, stdout, conf, true)
 
       val stderr = new File(executorDir, "stderr")
       Files.write(header, stderr, StandardCharsets.UTF_8)
-      stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
+      stderrAppender = FileAppender(process.getErrorStream, stderr, conf, true)
 
       state = ExecutorState.RUNNING
       worker.send(ExecutorStateChanged(appId, execId, state, None, None))
diff --git a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
index 7107be25eb505..a2b165f0a0813 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
@@ -26,8 +26,12 @@ import org.apache.spark.util.{IntParam, Utils}
 /**
  * Continuously appends the data from an input stream into the given file.
  */
-private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSize: Int = 8192)
-  extends Logging {
+private[spark] class FileAppender(
+    inputStream: InputStream,
+    file: File,
+    bufferSize: Int = 8192,
+    closeStreams: Boolean = false
+  ) extends Logging {
 
   @volatile private var outputStream: FileOutputStream = null
   @volatile private var markedForStop = false     // has the appender been asked to stopped
@@ -76,7 +80,13 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
         }
       }
     } {
-      closeFile()
+      try {
+        if (closeStreams) {
+          inputStream.close()
+        }
+      } finally {
+        closeFile()
+      }
     }
   } catch {
     case e: Exception =>
@@ -113,7 +123,18 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
 private[spark] object FileAppender extends Logging {
 
   /** Create the right appender based on Spark configuration */
-  def apply(inputStream: InputStream, file: File, conf: SparkConf): FileAppender = {
+  def apply(
+      inputStream: InputStream,
+      file: File,
+      conf: SparkConf) : FileAppender = {
+    apply(inputStream, file, conf, false)
+  }
+
+  def apply(
+      inputStream: InputStream,
+      file: File,
+      conf: SparkConf,
+      closeStreams: Boolean): FileAppender = {
     val rollingStrategy = conf.get(config.EXECUTOR_LOGS_ROLLING_STRATEGY)
     val rollingSizeBytes = conf.get(config.EXECUTOR_LOGS_ROLLING_MAX_SIZE)
 
@@ -141,9 +162,10 @@ private[spark] object FileAppender extends Logging {
       validatedParams.map {
         case (interval, pattern) =>
           new RollingFileAppender(
-            inputStream, file, new TimeBasedRollingPolicy(interval, pattern), conf)
+            inputStream, file, new TimeBasedRollingPolicy(interval, pattern), conf,
+            closeStreams = closeStreams)
       }.getOrElse {
-        new FileAppender(inputStream, file)
+        new FileAppender(inputStream, file, closeStreams = closeStreams)
       }
     }
 
@@ -151,17 +173,18 @@ private[spark] object FileAppender extends Logging {
       rollingSizeBytes match {
         case IntParam(bytes) =>
           logInfo(s"Rolling executor logs enabled for $file with rolling every $bytes bytes")
-          new RollingFileAppender(inputStream, file, new SizeBasedRollingPolicy(bytes), conf)
+          new RollingFileAppender(
+            inputStream, file, new SizeBasedRollingPolicy(bytes), conf, closeStreams = closeStreams)
         case _ =>
           logWarning(
             s"Illegal size [$rollingSizeBytes] for rolling executor logs, rolling logs not enabled")
-          new FileAppender(inputStream, file)
+          new FileAppender(inputStream, file, closeStreams = closeStreams)
       }
     }
 
     rollingStrategy match {
       case "" =>
-        new FileAppender(inputStream, file)
+        new FileAppender(inputStream, file, closeStreams = closeStreams)
       case "time" =>
         createTimeBasedAppender()
       case "size" =>
@@ -170,7 +193,7 @@ private[spark] object FileAppender extends Logging {
         logWarning(
           s"Illegal strategy [$rollingStrategy] for rolling executor logs, " +
             s"rolling logs not enabled")
-        new FileAppender(inputStream, file)
+        new FileAppender(inputStream, file, closeStreams = closeStreams)
     }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
index b73f422649312..a6eb169f66756 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
@@ -42,8 +42,9 @@ private[spark] class RollingFileAppender(
     activeFile: File,
     val rollingPolicy: RollingPolicy,
     conf: SparkConf,
-    bufferSize: Int = RollingFileAppender.DEFAULT_BUFFER_SIZE
-  ) extends FileAppender(inputStream, activeFile, bufferSize) {
+    bufferSize: Int = RollingFileAppender.DEFAULT_BUFFER_SIZE,
+    closeStreams: Boolean = false
+  ) extends FileAppender(inputStream, activeFile, bufferSize, closeStreams) {
 
   private val maxRetainedFiles = conf.get(config.EXECUTOR_LOGS_ROLLING_MAX_RETAINED_FILES)
   private val enableCompression = conf.get(config.EXECUTOR_LOGS_ROLLING_ENABLE_COMPRESSION)
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index 12d97573ff6ee..c178dcd8629f9 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -61,6 +61,15 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
     assert(Files.toString(testFile, StandardCharsets.UTF_8) === header + testString)
   }
 
+  test("basic file appender - close stream") {
+    val inputStream = mock(classOf[InputStream])
+    val appender = new FileAppender(inputStream, testFile, closeStreams = true)
+    Thread.sleep(10)
+    appender.stop()
+    appender.awaitTermination()
+    verify(inputStream).close()
+  }
+
   test("rolling file appender - time-based rolling") {
     // setup input stream and appender
     val testOutputStream = new PipedOutputStream()
@@ -96,6 +105,32 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
       appender, testOutputStream, textToAppend, rolloverIntervalMillis, isCompressed = true)
   }
 
+  test("rolling file appender - time-based rolling close stream") {
+    val inputStream = mock(classOf[InputStream])
+    val sparkConf = new SparkConf()
+    sparkConf.set(config.EXECUTOR_LOGS_ROLLING_STRATEGY.key, "time")
+    val appender = FileAppender(inputStream, testFile, sparkConf, closeStreams = true)
+    assert(
+      appender.asInstanceOf[RollingFileAppender].rollingPolicy.isInstanceOf[TimeBasedRollingPolicy])
+    Thread.sleep(10)
+    appender.stop()
+    appender.awaitTermination()
+    verify(inputStream).close()
+  }
+
+  test("rolling file appender - size-based rolling close stream") {
+    val inputStream = mock(classOf[InputStream])
+    val sparkConf = new SparkConf()
+    sparkConf.set(config.EXECUTOR_LOGS_ROLLING_STRATEGY.key, "size")
+    val appender = FileAppender(inputStream, testFile, sparkConf, closeStreams = true)
+    assert(
+      appender.asInstanceOf[RollingFileAppender].rollingPolicy.isInstanceOf[SizeBasedRollingPolicy])
+    Thread.sleep(10)
+    appender.stop()
+    appender.awaitTermination()
+    verify(inputStream).close()
+  }
+
   test("rolling file appender - size-based rolling") {
     // setup input stream and appender
     val testOutputStream = new PipedOutputStream()

From e4b90a067678a9fb2bbd30871d48970bc4d039ca Mon Sep 17 00:00:00 2001
From: Jie
Date: Mon, 12 Jul 2021 11:38:42 +0800
Subject: [PATCH 2/3] add optional "closeStreams" parameter rather than
 overriding apply(...)
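
As context for the diff below, here is a minimal, self-contained Scala sketch of why a defaulted parameter can replace the extra overload: old call sites keep compiling unchanged, while new callers opt in explicitly. This is illustrative only; `Appender` and `DefaultParamDemo` are hypothetical stand-ins, not the real FileAppender.

```scala
// Hypothetical stand-in showing the default-parameter pattern used in this commit.
object Appender {
  def apply(path: String, closeStreams: Boolean = false): String =
    s"appender(path=$path, closeStreams=$closeStreams)"
}

object DefaultParamDemo extends App {
  println(Appender("stdout"))                       // old call site: closeStreams defaults to false
  println(Appender("stdout", closeStreams = true))  // new call site that opts in to closing the stream
}
```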
---
 .../org/apache/spark/util/logging/FileAppender.scala | 10 ++--------
 1 file changed, 2 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
index a2b165f0a0813..2243239dce6fd 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
@@ -123,18 +123,12 @@ private[spark] class FileAppender(
 private[spark] object FileAppender extends Logging {
 
   /** Create the right appender based on Spark configuration */
-  def apply(
-      inputStream: InputStream,
-      file: File,
-      conf: SparkConf) : FileAppender = {
-    apply(inputStream, file, conf, false)
-  }
-
   def apply(
       inputStream: InputStream,
       file: File,
       conf: SparkConf,
-      closeStreams: Boolean): FileAppender = {
+      closeStreams: Boolean = false
+    ) : FileAppender = {
     val rollingStrategy = conf.get(config.EXECUTOR_LOGS_ROLLING_STRATEGY)
     val rollingSizeBytes = conf.get(config.EXECUTOR_LOGS_ROLLING_MAX_SIZE)

From 955d24651aa0af33bc317526411cf2a3f5116f0e Mon Sep 17 00:00:00 2001
From: Jie
Date: Tue, 20 Jul 2021 16:00:46 +0800
Subject: [PATCH 3/3] add SPARK-35027 prefix to newly added tests

---
 .../org/apache/spark/util/logging/RollingFileAppender.scala | 1 +
 .../scala/org/apache/spark/util/FileAppenderSuite.scala     | 6 +++---
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
index a6eb169f66756..68a59232c7a96 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
@@ -36,6 +36,7 @@ import org.apache.spark.internal.config
  * @param rollingPolicy Policy based on which files will be rolled over.
  * @param conf SparkConf that is used to pass on extra configurations
  * @param bufferSize Optional buffer size. Used mainly for testing.
+ * @param closeStreams Option flag: whether to close the inputStream at the end.
  */
 private[spark] class RollingFileAppender(
     inputStream: InputStream,
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index c178dcd8629f9..71010a10cb23c 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -61,7 +61,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
     assert(Files.toString(testFile, StandardCharsets.UTF_8) === header + testString)
   }
 
-  test("basic file appender - close stream") {
+  test("SPARK-35027: basic file appender - close stream") {
     val inputStream = mock(classOf[InputStream])
     val appender = new FileAppender(inputStream, testFile, closeStreams = true)
     Thread.sleep(10)
@@ -105,7 +105,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
       appender, testOutputStream, textToAppend, rolloverIntervalMillis, isCompressed = true)
   }
 
-  test("rolling file appender - time-based rolling close stream") {
+  test("SPARK-35027: rolling file appender - time-based rolling close stream") {
     val inputStream = mock(classOf[InputStream])
     val sparkConf = new SparkConf()
     sparkConf.set(config.EXECUTOR_LOGS_ROLLING_STRATEGY.key, "time")
@@ -118,7 +118,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
     verify(inputStream).close()
   }
 
-  test("rolling file appender - size-based rolling close stream") {
+  test("SPARK-35027: rolling file appender - size-based rolling close stream") {
     val inputStream = mock(classOf[InputStream])
    val sparkConf = new SparkConf()
     sparkConf.set(config.EXECUTOR_LOGS_ROLLING_STRATEGY.key, "size")
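
Taken together, the series makes the appending loop close its source stream even when writing fails. The following is a minimal, self-contained sketch of that try/finally pattern, which may help when reviewing the hunk in PATCH 1/3 that wraps closeFile(). It is illustrative only: `CloseStreamsSketch` and the local `appendStreamToFile` are simplified stand-ins, not Spark's FileAppender.

```scala
import java.io.{ByteArrayInputStream, File, FileOutputStream, InputStream}

// Stand-alone illustration of the pattern: close the input stream (when asked to)
// in a finally block, so a failure while writing no longer leaks the source stream.
object CloseStreamsSketch extends App {
  def appendStreamToFile(in: InputStream, file: File, closeStreams: Boolean): Unit = {
    val out = new FileOutputStream(file, true)
    try {
      val buf = new Array[Byte](8192)
      var n = in.read(buf)
      while (n != -1) {
        out.write(buf, 0, n) // an IOException here previously left `in` open
        n = in.read(buf)
      }
    } finally {
      try {
        if (closeStreams) in.close() // mirrors the new closeStreams handling
      } finally {
        out.close() // always release the output file, as closeFile() does
      }
    }
  }

  val tmp = File.createTempFile("close-streams-sketch", ".log")
  appendStreamToFile(new ByteArrayInputStream("hello\n".getBytes("UTF-8")), tmp, closeStreams = true)
  println(s"wrote ${tmp.length()} bytes to $tmp")
}
```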