@@ -43,8 +43,6 @@ trait CompressionCodec {
4343 def compressedOutputStream (s : OutputStream ): OutputStream
4444
4545 def compressedInputStream (s : InputStream ): InputStream
46-
47- def isAvailable () : Boolean = true
4846}
4947
5048private [spark] object CompressionCodec extends Logging {
@@ -67,9 +65,9 @@ private[spark] object CompressionCodec extends Logging {
6765 Some (ctor.newInstance(conf).asInstanceOf [CompressionCodec ])
6866 } catch {
6967 case e : ClassNotFoundException => None
68+ case e : IllegalArgumentException => None
7069 }
71- codec.filter(_.isAvailable())
72- .getOrElse(throw new IllegalArgumentException (s " Codec [ $codecName] is not available. " +
70+ codec.getOrElse(throw new IllegalArgumentException (s " Codec [ $codecName] is not available. " +
7371 s " Consider setting $configKey= $FALLBACK_COMPRESSION_CODEC" ))
7472 }
7573
@@ -131,19 +129,16 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
131129@ DeveloperApi
132130class SnappyCompressionCodec (conf : SparkConf ) extends CompressionCodec {
133131
132+ try {
133+ Snappy .getNativeLibraryVersion
134+ } catch {
135+ case e : Error => throw new IllegalArgumentException
136+ }
137+
134138 override def compressedOutputStream (s : OutputStream ): OutputStream = {
135139 val blockSize = conf.getInt(" spark.io.compression.snappy.block.size" , 32768 )
136140 new SnappyOutputStream (s, blockSize)
137141 }
138142
139143 override def compressedInputStream (s : InputStream ): InputStream = new SnappyInputStream (s)
140-
141- override def isAvailable () = {
142- try {
143- Snappy .getNativeLibraryVersion
144- true
145- } catch {
146- case e : Error => false
147- }
148- }
149144}
0 commit comments