      <groupId>com.twitter</groupId>
      <artifactId>chill_${scala.binary.version}</artifactId>
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 4b0fe1ab8299..33402c927c73 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -20,6 +20,7 @@ package org.apache.spark.io
 import java.io.{InputStream, OutputStream}
 
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
+import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
 import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
 
 import org.apache.spark.SparkConf
@@ -59,6 +60,27 @@ private[spark] object CompressionCodec {
}
+/**
+ * :: DeveloperApi ::
+ * LZ4 implementation of [[org.apache.spark.io.CompressionCodec]].
+ * Block size can be configured by `spark.io.compression.lz4.block.size`.
+ *
+ * Note: The wire protocol for this codec is not guaranteed to be compatible across versions
+ * of Spark. This is intended for use as an internal compression utility within a single Spark
+ * application.
+ */
+@DeveloperApi
+class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec {
+
+ override def compressedOutputStream(s: OutputStream): OutputStream = {
+ val blockSize = conf.getInt("spark.io.compression.lz4.block.size", 32768)
+ new LZ4BlockOutputStream(s, blockSize)
+ }
+
+ override def compressedInputStream(s: InputStream): InputStream = new LZ4BlockInputStream(s)
+}
+
+
/**
* :: DeveloperApi ::
* LZF implementation of [[org.apache.spark.io.CompressionCodec]].
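
The factory path exercised by the test suite below doubles as the way an application obtains the new codec. A minimal sketch, assuming only the createCodec(conf, className) factory visible in this diff and an in-memory stream as a stand-in for a real sink:

import java.io.ByteArrayOutputStream
import org.apache.spark.SparkConf
import org.apache.spark.io.{CompressionCodec, LZ4CompressionCodec}

// Build the codec reflectively through the existing factory, then wrap a sink.
val conf = new SparkConf()
val codec = CompressionCodec.createCodec(conf, classOf[LZ4CompressionCodec].getName)
val compressed = codec.compressedOutputStream(new ByteArrayOutputStream())
compressed.write("hello".getBytes("UTF-8"))
compressed.close() // flushes the final LZ4 block
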
diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
index 68a0ea36aa54..42fc395fa698 100644
--- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
@@ -50,6 +50,12 @@ class CompressionCodecSuite extends FunSuite {
     testCodec(codec)
   }
 
+ test("lz4 compression codec") {
+ val codec = CompressionCodec.createCodec(conf, classOf[LZ4CompressionCodec].getName)
+ assert(codec.getClass === classOf[LZ4CompressionCodec])
+ testCodec(codec)
+ }
+
test("lzf compression codec") {
val codec = CompressionCodec.createCodec(conf, classOf[LZFCompressionCodec].getName)
assert(codec.getClass === classOf[LZFCompressionCodec])
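
The suite's shared testCodec helper is defined elsewhere in the file; as a self-contained illustration of the round trip it must guarantee, here is a sketch using the lz4-java block streams directly (names from the import added above):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}

// Compress a buffer through the block stream, then read it back unchanged.
val input = Array.tabulate[Byte](4096)(i => (i % 32).toByte)
val sink = new ByteArrayOutputStream()
val out = new LZ4BlockOutputStream(sink, 32768) // same default block size as the codec
out.write(input)
out.close()

val in = new LZ4BlockInputStream(new ByteArrayInputStream(sink.toByteArray))
val result = new Array[Byte](input.length)
var off = 0
while (off < result.length) {
  off += in.read(result, off, result.length - off)
}
assert(result.sameElements(input))
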
diff --git a/docs/configuration.md b/docs/configuration.md
index 07aa4c035446..19fd980e6088 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -350,7 +350,15 @@ Apart from these, the following properties are also available, and may be useful
   <td>32768</td>
   <td>
     Block size (in bytes) used in Snappy compression, in the case when Snappy compression codec
-    is used.
+    is used. Lowering this block size will also lower shuffle memory usage when Snappy is used.
+  </td>
+</tr>
+<tr>
+  <td>spark.io.compression.lz4.block.size</td>
+  <td>32768</td>
+  <td>
+    Block size (in bytes) used in LZ4 compression, in the case when LZ4 compression codec
+    is used. Lowering this block size will also lower shuffle memory usage when LZ4 is used.
   </td>
 </tr>
 <tr>
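
Since both block-size settings are plain SparkConf entries, the shuffle-memory note above translates directly into application code. A sketch, with 16384 as a purely illustrative value:

import org.apache.spark.SparkConf

// Select LZ4 by fully qualified class name and halve the block size to trade
// some compression ratio for lower shuffle memory usage.
val conf = new SparkConf()
  .set("spark.io.compression.codec", "org.apache.spark.io.LZ4CompressionCodec")
  .set("spark.io.compression.lz4.block.size", "16384")
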
diff --git a/pom.xml b/pom.xml
index fa80707d0929..d570f3e6b932 100644
--- a/pom.xml
+++ b/pom.xml
@@ -297,6 +297,11 @@
       <artifactId>snappy-java</artifactId>
       <version>1.0.5</version>
     </dependency>
+    <dependency>
+      <groupId>net.jpountz.lz4</groupId>
+      <artifactId>lz4</artifactId>
+      <version>1.2.0</version>
+    </dependency>
     <dependency>
       <groupId>com.clearspring.analytics</groupId>
       <artifactId>stream</artifactId>
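
For builds that resolve Spark dependencies through sbt instead of Maven, the same coordinates translate to a single line:

// sbt equivalent of the Maven dependency added above
libraryDependencies += "net.jpountz.lz4" % "lz4" % "1.2.0"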