From 3df38658b8e9129af1bacf7e2d5e9197f2335052 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Wed, 10 Oct 2018 14:21:48 -0700 Subject: [PATCH 1/2] Remove SnappyOutputStreamWrapper and other workaround now that new Snappy fixes these --- .../apache/spark/io/CompressionCodec.scala | 63 ++----------------- 1 file changed, 5 insertions(+), 58 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 7722db56ee297..0664c5ac752c1 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -154,72 +154,19 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec { */ @DeveloperApi class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec { - val version = SnappyCompressionCodec.version - override def compressedOutputStream(s: OutputStream): OutputStream = { - val blockSize = conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt - new SnappyOutputStreamWrapper(new SnappyOutputStream(s, blockSize)) - } - - override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s) -} - -/** - * Object guards against memory leak bug in snappy-java library: - * (https://github.com/xerial/snappy-java/issues/131). - * Before a new version of the library, we only call the method once and cache the result. - */ -private final object SnappyCompressionCodec { - private lazy val version: String = try { + try { Snappy.getNativeLibraryVersion } catch { case e: Error => throw new IllegalArgumentException(e) } -} -/** - * Wrapper over `SnappyOutputStream` which guards against write-after-close and double-close - * issues. See SPARK-7660 for more details. This wrapping can be removed if we upgrade to a version - * of snappy-java that contains the fix for https://github.com/xerial/snappy-java/issues/107. - */ -private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends OutputStream { - - private[this] var closed: Boolean = false - - override def write(b: Int): Unit = { - if (closed) { - throw new IOException("Stream is closed") - } - os.write(b) - } - - override def write(b: Array[Byte]): Unit = { - if (closed) { - throw new IOException("Stream is closed") - } - os.write(b) - } - - override def write(b: Array[Byte], off: Int, len: Int): Unit = { - if (closed) { - throw new IOException("Stream is closed") - } - os.write(b, off, len) - } - - override def flush(): Unit = { - if (closed) { - throw new IOException("Stream is closed") - } - os.flush() + override def compressedOutputStream(s: OutputStream): OutputStream = { + val blockSize = conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt + new SnappyOutputStream(s, blockSize) } - override def close(): Unit = { - if (!closed) { - closed = true - os.close() - } - } + override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s) } /** From 8850c7a7d563cf6bc46a84b7480b4d338d58b80f Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Wed, 10 Oct 2018 15:27:03 -0700 Subject: [PATCH 2/2] Add MiMa exclude --- project/MimaExcludes.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 0b074fbf64eda..bf85fe0b4512c 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -36,6 +36,7 @@ object MimaExcludes { // Exclude rules for 3.0.x lazy val v30excludes = v24excludes ++ Seq( + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.io.SnappyCompressionCodec.version") ) // Exclude rules for 2.4.x