Skip to content

Commit dd95aba

Browse files
committed
[SPARK-2399] Add support for LZ4 compression.
Based on Greg Bowyer's patch from JIRA https://issues.apache.org/jira/browse/SPARK-2399

Author: Reynold Xin <[email protected]>

Closes apache#1416 from rxin/lz4 and squashes the following commits:

6c8fefe [Reynold Xin] Fixed typo.
8a14d38 [Reynold Xin] [SPARK-2399] Add support for LZ4 compression.
1 parent 7446f5f commit dd95aba

File tree

5 files changed

+46
-1
lines changed

5 files changed

+46
-1
lines changed

core/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,10 @@
114114
<groupId>org.xerial.snappy</groupId>
115115
<artifactId>snappy-java</artifactId>
116116
</dependency>
117+
<dependency>
118+
<groupId>net.jpountz.lz4</groupId>
119+
<artifactId>lz4</artifactId>
120+
</dependency>
117121
<dependency>
118122
<groupId>com.twitter</groupId>
119123
<artifactId>chill_${scala.binary.version}</artifactId>

core/src/main/scala/org/apache/spark/io/CompressionCodec.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.spark.io
2020
import java.io.{InputStream, OutputStream}
2121

2222
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
23+
import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
2324
import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
2425

2526
import org.apache.spark.SparkConf
@@ -59,6 +60,27 @@ private[spark] object CompressionCodec {
5960
}
6061

6162

63+
/**
 * :: DeveloperApi ::
 * [[org.apache.spark.io.CompressionCodec]] backed by LZ4 block streams.
 * The block size (in bytes) is read from `spark.io.compression.lz4.block.size`
 * and defaults to 32768 when that property is not set.
 *
 * Note: The wire protocol for this codec is not guaranteed to be compatible across versions
 * of Spark. This is intended for use as an internal compression utility within a single Spark
 * application.
 */
@DeveloperApi
class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec {

  /** Wraps `s` in an LZ4 block output stream, using the block size configured in `conf`. */
  override def compressedOutputStream(s: OutputStream): OutputStream =
    new LZ4BlockOutputStream(s, conf.getInt("spark.io.compression.lz4.block.size", 32768))

  /** Wraps `s` in an LZ4 block input stream. */
  override def compressedInputStream(s: InputStream): InputStream = {
    new LZ4BlockInputStream(s)
  }
}
82+
83+
6284
/**
6385
* :: DeveloperApi ::
6486
* LZF implementation of [[org.apache.spark.io.CompressionCodec]].

core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ class CompressionCodecSuite extends FunSuite {
5050
testCodec(codec)
5151
}
5252

53+
test("lz4 compression codec") {
  // Create the codec from its class name, confirm the concrete type that came back,
  // then run it through the suite's shared testCodec helper.
  val expectedClass = classOf[LZ4CompressionCodec]
  val lz4Codec = CompressionCodec.createCodec(conf, expectedClass.getName)
  assert(lz4Codec.getClass === expectedClass)
  testCodec(lz4Codec)
}
58+
5359
test("lzf compression codec") {
5460
val codec = CompressionCodec.createCodec(conf, classOf[LZFCompressionCodec].getName)
5561
assert(codec.getClass === classOf[LZFCompressionCodec])

docs/configuration.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,15 @@ Apart from these, the following properties are also available, and may be useful
350350
<td>32768</td>
351351
<td>
352352
Block size (in bytes) used in Snappy compression, in the case when the Snappy compression codec
353-
is used.
353+
is used. Lowering this block size will also lower shuffle memory usage when Snappy is used.
354+
</td>
355+
</tr>
356+
<tr>
357+
<td><code>spark.io.compression.lz4.block.size</code></td>
358+
<td>32768</td>
359+
<td>
360+
Block size (in bytes) used in LZ4 compression, in the case when the LZ4 compression codec
361+
is used. Lowering this block size will also lower shuffle memory usage when LZ4 is used.
354362
</td>
355363
</tr>
356364
<tr>

pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,11 @@
297297
<artifactId>snappy-java</artifactId>
298298
<version>1.0.5</version>
299299
</dependency>
300+
<dependency>
301+
<groupId>net.jpountz.lz4</groupId>
302+
<artifactId>lz4</artifactId>
303+
<version>1.2.0</version>
304+
</dependency>
300305
<dependency>
301306
<groupId>com.clearspring.analytics</groupId>
302307
<artifactId>stream</artifactId>

0 commit comments

Comments
 (0)