From 52dfa8f49c3fc804a33bb47e6b763cd49a3ecb80 Mon Sep 17 00:00:00 2001 From: Kostas Sakellis Date: Tue, 4 Nov 2014 18:26:12 -0800 Subject: [PATCH 1/5] [SPARK-4079] [CORE] Default to LZF if Snappy not available By default, snappy is the compression codec used. If Snappy is not available, Spark currently throws a stack trace. Now Spark falls back to LZF if Snappy is not available on the cluster and logs a warning message. The only exception is if the user has explicitly set spark.io.compression.codec=snappy. In this case, if snappy is not available, an IllegalArgumentException is thrown. Because of the way the Snappy library uses static initialization, it was very difficult in a unit test to simulate Snappy not being available. The only way I could think of was to create multiple classloaders, which seemed excessive. As a result, most of this was tested ad hoc on a test cluster by modifying the system property: org.xerial.snappy.use.systemlib=true, which caused Snappy to not load, thus triggering this logic. 
--- .../apache/spark/io/CompressionCodec.scala | 48 ++++++++++++++++--- .../spark/io/CompressionCodecSuite.scala | 6 +++ 2 files changed, 47 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 1ac7f4e448eb..82c0f45ada76 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -21,11 +21,12 @@ import java.io.{InputStream, OutputStream} import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream} -import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream} +import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream} import org.apache.spark.SparkConf import org.apache.spark.annotation.DeveloperApi import org.apache.spark.util.Utils +import org.apache.spark.Logging /** * :: DeveloperApi :: @@ -42,28 +43,52 @@ trait CompressionCodec { def compressedOutputStream(s: OutputStream): OutputStream def compressedInputStream(s: InputStream): InputStream -} + def isAvailable() : Boolean = true +} -private[spark] object CompressionCodec { +private[spark] object CompressionCodec extends Logging { + private val configKey = "spark.io.compression.codec" private val shortCompressionCodecNames = Map( "lz4" -> classOf[LZ4CompressionCodec].getName, "lzf" -> classOf[LZFCompressionCodec].getName, "snappy" -> classOf[SnappyCompressionCodec].getName) def createCodec(conf: SparkConf): CompressionCodec = { - createCodec(conf, conf.get("spark.io.compression.codec", DEFAULT_COMPRESSION_CODEC)) + conf.getOption(configKey) + .map(createCodec(conf, _)) + .orElse(createCodecFromName(conf, DEFAULT_COMPRESSION_CODEC)) + .orElse({ + logWarning("Default codec " + DEFAULT_COMPRESSION_CODEC + + " is unavailable. 
Faling back to " + FALLBACK_COMPRESSION_CODEC) + createCodecFromName(conf, FALLBACK_COMPRESSION_CODEC) + }) + .getOrElse(throw new IllegalArgumentException("The codec [" + + FALLBACK_COMPRESSION_CODEC + "] is not available.")) } def createCodec(conf: SparkConf, codecName: String): CompressionCodec = { + createCodecFromName(conf, codecName) + .getOrElse(throw new IllegalArgumentException("The specified codec [" + + codecName + "] is not available.")) + } + + private def createCodecFromName(conf: SparkConf, codecName : String) + : Option[CompressionCodec] = { val codecClass = shortCompressionCodecNames.getOrElse(codecName.toLowerCase, codecName) - val ctor = Class.forName(codecClass, true, Utils.getContextOrSparkClassLoader) - .getConstructor(classOf[SparkConf]) - ctor.newInstance(conf).asInstanceOf[CompressionCodec] + try { + val ctor = Class.forName(codecClass, true, Utils.getContextOrSparkClassLoader) + .getConstructor(classOf[SparkConf]) + Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec]) + .filter(_.isAvailable()) + } catch { + case e: ClassNotFoundException => None + } } val DEFAULT_COMPRESSION_CODEC = "snappy" + val FALLBACK_COMPRESSION_CODEC = "lzf" val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq } @@ -126,4 +151,13 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec { } override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s) + + override def isAvailable() = { + try { + Snappy.getNativeLibraryVersion + true + } catch { + case e: Error => false + } + } } diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala index 25be7f25c21b..8c6035fb367f 100644 --- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala +++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala @@ -85,4 +85,10 @@ class CompressionCodecSuite extends FunSuite { assert(codec.getClass 
=== classOf[SnappyCompressionCodec]) testCodec(codec) } + + test("bad compression codec") { + intercept[IllegalArgumentException] { + CompressionCodec.createCodec(conf, "foobar") + } + } } From 64f3d27f8f8d473079ae69a93d2feb8f93a17922 Mon Sep 17 00:00:00 2001 From: Kostas Sakellis Date: Mon, 10 Nov 2014 17:01:08 -0800 Subject: [PATCH 2/5] [SPARK-4079] [CORE] Code review feedback Removed the fallback logic and now just throws an IllegalArgumentException if you specify a bad codec or one that is not available like Snappy. --- .../apache/spark/io/CompressionCodec.scala | 25 +++---------------- 1 file changed, 4 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 82c0f45ada76..8ad1004897b0 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -49,46 +49,29 @@ trait CompressionCodec { private[spark] object CompressionCodec extends Logging { - private val configKey = "spark.io.compression.codec" private val shortCompressionCodecNames = Map( "lz4" -> classOf[LZ4CompressionCodec].getName, "lzf" -> classOf[LZFCompressionCodec].getName, "snappy" -> classOf[SnappyCompressionCodec].getName) def createCodec(conf: SparkConf): CompressionCodec = { - conf.getOption(configKey) - .map(createCodec(conf, _)) - .orElse(createCodecFromName(conf, DEFAULT_COMPRESSION_CODEC)) - .orElse({ - logWarning("Default codec " + DEFAULT_COMPRESSION_CODEC + - " is unavailable. 
Faling back to " + FALLBACK_COMPRESSION_CODEC) - createCodecFromName(conf, FALLBACK_COMPRESSION_CODEC) - }) - .getOrElse(throw new IllegalArgumentException("The codec [" + - FALLBACK_COMPRESSION_CODEC + "] is not available.")) + createCodec(conf, conf.get("spark.io.compression.codec", DEFAULT_COMPRESSION_CODEC)) } def createCodec(conf: SparkConf, codecName: String): CompressionCodec = { - createCodecFromName(conf, codecName) - .getOrElse(throw new IllegalArgumentException("The specified codec [" + - codecName + "] is not available.")) - } - - private def createCodecFromName(conf: SparkConf, codecName : String) - : Option[CompressionCodec] = { val codecClass = shortCompressionCodecNames.getOrElse(codecName.toLowerCase, codecName) - try { + val codec = try { val ctor = Class.forName(codecClass, true, Utils.getContextOrSparkClassLoader) .getConstructor(classOf[SparkConf]) Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec]) - .filter(_.isAvailable()) } catch { case e: ClassNotFoundException => None } + codec.filter(_.isAvailable()) + .getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] is not available.")) } val DEFAULT_COMPRESSION_CODEC = "snappy" - val FALLBACK_COMPRESSION_CODEC = "lzf" val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq } From 1d0ef2fd9fdd7aa063caf53a87e836d6e546adea Mon Sep 17 00:00:00 2001 From: Kostas Sakellis Date: Thu, 20 Nov 2014 15:49:20 -0800 Subject: [PATCH 3/5] [SPARK-4079] [CORE] Added more information to exception Adds more information to the exception thrown when Snappy is not available. 
--- .../main/scala/org/apache/spark/io/CompressionCodec.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 8ad1004897b0..ac15012a3eb6 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -49,13 +49,14 @@ trait CompressionCodec { private[spark] object CompressionCodec extends Logging { + private val configKey = "spark.io.compression.codec" private val shortCompressionCodecNames = Map( "lz4" -> classOf[LZ4CompressionCodec].getName, "lzf" -> classOf[LZFCompressionCodec].getName, "snappy" -> classOf[SnappyCompressionCodec].getName) def createCodec(conf: SparkConf): CompressionCodec = { - createCodec(conf, conf.get("spark.io.compression.codec", DEFAULT_COMPRESSION_CODEC)) + createCodec(conf, conf.get(configKey, DEFAULT_COMPRESSION_CODEC)) } def createCodec(conf: SparkConf, codecName: String): CompressionCodec = { @@ -68,9 +69,11 @@ private[spark] object CompressionCodec extends Logging { case e: ClassNotFoundException => None } codec.filter(_.isAvailable()) - .getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] is not available.")) + .getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] is not available. 
" + + s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC")) } + val FALLBACK_COMPRESSION_CODEC = "lzf" val DEFAULT_COMPRESSION_CODEC = "snappy" val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq } From 63bfdd05d53a5a70a6e54359c27f2bcb0f812cd8 Mon Sep 17 00:00:00 2001 From: Kostas Sakellis Date: Mon, 1 Dec 2014 11:17:41 -0800 Subject: [PATCH 4/5] Removed isAvailable to preserve binary compatibility --- .../apache/spark/io/CompressionCodec.scala | 21 +++++++------------ 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index ac15012a3eb6..87313882e3ea 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -43,8 +43,6 @@ trait CompressionCodec { def compressedOutputStream(s: OutputStream): OutputStream def compressedInputStream(s: InputStream): InputStream - - def isAvailable() : Boolean = true } private[spark] object CompressionCodec extends Logging { @@ -67,9 +65,9 @@ private[spark] object CompressionCodec extends Logging { Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec]) } catch { case e: ClassNotFoundException => None + case e: IllegalArgumentException => None } - codec.filter(_.isAvailable()) - .getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] is not available. " + + codec.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] is not available. 
" + s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC")) } @@ -131,19 +129,16 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec { @DeveloperApi class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec { + try { + Snappy.getNativeLibraryVersion + } catch { + case e: Error => throw new IllegalArgumentException + } + override def compressedOutputStream(s: OutputStream): OutputStream = { val blockSize = conf.getInt("spark.io.compression.snappy.block.size", 32768) new SnappyOutputStream(s, blockSize) } override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s) - - override def isAvailable() = { - try { - Snappy.getNativeLibraryVersion - true - } catch { - case e: Error => false - } - } } From 9709c7c605d607b19655ea8f592394be77cadb5b Mon Sep 17 00:00:00 2001 From: Kostas Sakellis Date: Mon, 1 Dec 2014 11:21:19 -0800 Subject: [PATCH 5/5] Removed unnecessary Logging class --- core/src/main/scala/org/apache/spark/io/CompressionCodec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 87313882e3ea..f856890d279f 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -45,7 +45,7 @@ trait CompressionCodec { def compressedInputStream(s: InputStream): InputStream } -private[spark] object CompressionCodec extends Logging { +private[spark] object CompressionCodec { private val configKey = "spark.io.compression.codec" private val shortCompressionCodecNames = Map(