From 96130c30d90abf155731346488f79c034bcaaf6a Mon Sep 17 00:00:00 2001 From: Gavin Li Date: Wed, 3 Jul 2013 05:49:04 +0000 Subject: [PATCH 01/10] add compression codec trait and snappy compression --- .../scala/spark/storage/BlockManager.scala | 20 +++++++++++++++++-- .../spark/storage/CompressionCodec.scala | 13 ++++++++++++ .../main/scala/spark/storage/DiskStore.scala | 1 - .../spark/storage/LZFCompressionCodec.scala | 16 +++++++++++++++ .../storage/SnappyCompressionCodec.scala | 18 +++++++++++++++++ 5 files changed, 65 insertions(+), 3 deletions(-) create mode 100644 core/src/main/scala/spark/storage/CompressionCodec.scala create mode 100644 core/src/main/scala/spark/storage/LZFCompressionCodec.scala create mode 100644 core/src/main/scala/spark/storage/SnappyCompressionCodec.scala diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 9b39d3aadf..2d4a3502c6 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -141,6 +141,8 @@ private[spark] class BlockManager( val metadataCleaner = new MetadataCleaner("BlockManager", this.dropOldBlocks) initialize() + var compressionCodec: CompressionCodec = null + /** * Construct a BlockManager with a memory limit set based on system properties. */ @@ -902,8 +904,15 @@ private[spark] class BlockManager( * Wrap an output stream for compression if block compression is enabled for its block type */ def wrapForCompression(blockId: String, s: OutputStream): OutputStream = { + if (compressionCodec == null) { + compressionCodec = Class.forName(System.getProperty("spark.storage.compression.codec", + "spark.storage.LZFCompressionCodec"), true, Thread.currentThread.getContextClassLoader) + .newInstance().asInstanceOf[CompressionCodec] + } + if (shouldCompress(blockId)) { - (new LZFOutputStream(s)).setFinishBlockOnFlush(true) + //(new LZFOutputStream(s)).setFinishBlockOnFlush(true) + compressionCodec.compressionOutputStream(s) } else { s } @@ -913,7 +922,14 @@ private[spark] class BlockManager( * Wrap an input stream for compression if block compression is enabled for its block type */ def wrapForCompression(blockId: String, s: InputStream): InputStream = { - if (shouldCompress(blockId)) new LZFInputStream(s) else s + if (compressionCodec == null) { + compressionCodec = Class.forName(System.getProperty("spark.storage.compression.codec", + "spark.storage.LZFCompressionCodec"), true, Thread.currentThread.getContextClassLoader) + .newInstance().asInstanceOf[CompressionCodec] + } + + if (shouldCompress(blockId)) /*new LZFInputStream(s) */ + compressionCodec.compressionInputStream(s) else s } def dataSerialize( diff --git a/core/src/main/scala/spark/storage/CompressionCodec.scala b/core/src/main/scala/spark/storage/CompressionCodec.scala new file mode 100644 index 0000000000..cd80de33f6 --- /dev/null +++ b/core/src/main/scala/spark/storage/CompressionCodec.scala @@ -0,0 +1,13 @@ +package spark.storage + +import java.io.{InputStream, OutputStream} + + +/** + * CompressionCodec allows the customization of the compression codec + */ +trait CompressionCodec { + def compressionOutputStream(s: OutputStream): OutputStream + + def compressionInputStream(s: InputStream): InputStream +} diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala index da859eebcb..221e285192 100644 --- a/core/src/main/scala/spark/storage/DiskStore.scala +++ 
b/core/src/main/scala/spark/storage/DiskStore.scala @@ -49,7 +49,6 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) override def close() { if (initialized) { objOut.close() - bs.close() channel = null bs = null objOut = null diff --git a/core/src/main/scala/spark/storage/LZFCompressionCodec.scala b/core/src/main/scala/spark/storage/LZFCompressionCodec.scala new file mode 100644 index 0000000000..3328b949ef --- /dev/null +++ b/core/src/main/scala/spark/storage/LZFCompressionCodec.scala @@ -0,0 +1,16 @@ +package spark.storage + +import java.io.{InputStream, OutputStream} + +import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} + +/** + * LZF implementation of [[spark.storage.CompressionCodec]] + */ +class LZFCompressionCodec extends CompressionCodec { + def compressionOutputStream(s: OutputStream): OutputStream = + (new LZFOutputStream(s)).setFinishBlockOnFlush(true) + + def compressionInputStream(s: InputStream): InputStream = + new LZFInputStream(s) +} diff --git a/core/src/main/scala/spark/storage/SnappyCompressionCodec.scala b/core/src/main/scala/spark/storage/SnappyCompressionCodec.scala new file mode 100644 index 0000000000..dc8546b039 --- /dev/null +++ b/core/src/main/scala/spark/storage/SnappyCompressionCodec.scala @@ -0,0 +1,18 @@ +package spark.storage + +import java.io.{InputStream, OutputStream} + +import org.xerial.snappy.SnappyOutputStream + +/** + * Snappy implementation of [[spark.storage.CompressionCodec]] + * block size can be configured by spark.snappy.block.size + */ +class SnappyCompressionCodec extends CompressionCodec { + def compressionOutputStream(s: OutputStream): OutputStream = + new SnappyOutputStream(s, + System.getProperty("spark.snappy.block.size", "32768").toInt) + + def compressionInputStream(s: InputStream): InputStream = + new SnappyInputStream(s) +} From 94238aae57475030f6e88102a83c7809c5835494 Mon Sep 17 00:00:00 2001 From: Gavin Li Date: Wed, 3 Jul 2013 18:08:38 +0000 Subject: [PATCH 02/10] fix dependencies --- core/src/main/scala/spark/storage/SnappyCompressionCodec.scala | 2 +- project/SparkBuild.scala | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/spark/storage/SnappyCompressionCodec.scala b/core/src/main/scala/spark/storage/SnappyCompressionCodec.scala index dc8546b039..62b00ef3f6 100644 --- a/core/src/main/scala/spark/storage/SnappyCompressionCodec.scala +++ b/core/src/main/scala/spark/storage/SnappyCompressionCodec.scala @@ -2,7 +2,7 @@ package spark.storage import java.io.{InputStream, OutputStream} -import org.xerial.snappy.SnappyOutputStream +import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream} /** * Snappy implementation of [[spark.storage.CompressionCodec]] diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 07572201de..f824826af3 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -162,7 +162,8 @@ object SparkBuild extends Build { "cc.spray" % "spray-json_2.9.2" % "1.1.1" excludeAll(excludeNetty), "org.apache.mesos" % "mesos" % "0.9.0-incubating", "io.netty" % "netty-all" % "4.0.0.Beta2", - "org.apache.derby" % "derby" % "10.4.2.0" % "test" + "org.apache.derby" % "derby" % "10.4.2.0" % "test", + "org.xerial.snappy" % "snappy-java" % "1.0.5" ) ++ ( if (HADOOP_MAJOR_VERSION == "2") { if (HADOOP_YARN) { From ad7e9d0d64277f616f90f2ca8bf8a5844641883a Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 30 Jul 2013 17:11:54 -0700 Subject: [PATCH 03/10] CompressionCodec cleanup. Moved it to spark.io package. 
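The mechanism set up in the first two patches is worth spelling out: BlockManager resolves whatever class name is found in the spark.storage.compression.codec system property with Class.forName and a no-argument constructor, so a third-party codec only has to implement the two stream-wrapping methods of the trait and be present on the executor classpath. A minimal sketch of such a plug-in codec, written against the trait as introduced in patch 01 (the com.example package, the GZIP choice, and the property call are illustrative only; the patch below relocates the trait to spark.io):

    package com.example.spark.codec  // hypothetical third-party package

    import java.io.{InputStream, OutputStream}
    import java.util.zip.{GZIPInputStream, GZIPOutputStream}

    import spark.storage.CompressionCodec

    // A user-supplied codec: only the two wrapping methods and a no-arg constructor are required.
    class GzipCompressionCodec extends CompressionCodec {
      def compressionOutputStream(s: OutputStream): OutputStream = new GZIPOutputStream(s)
      def compressionInputStream(s: InputStream): InputStream = new GZIPInputStream(s)
    }

    // Selected before the first block is written, e.g.:
    //   System.setProperty("spark.storage.compression.codec", "com.example.spark.codec.GzipCompressionCodec")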
--- .../scala/spark/io/CompressionCodec.scala | 82 +++++++++++++++++++ .../scala/spark/storage/BlockManager.scala | 32 ++------ .../spark/storage/CompressionCodec.scala | 13 --- .../spark/storage/LZFCompressionCodec.scala | 16 ---- .../storage/SnappyCompressionCodec.scala | 18 ---- .../spark/io/CompressionCodecSuite.scala | 0 6 files changed, 91 insertions(+), 70 deletions(-) create mode 100644 core/src/main/scala/spark/io/CompressionCodec.scala delete mode 100644 core/src/main/scala/spark/storage/CompressionCodec.scala delete mode 100644 core/src/main/scala/spark/storage/LZFCompressionCodec.scala delete mode 100644 core/src/main/scala/spark/storage/SnappyCompressionCodec.scala create mode 100644 core/src/test/scala/spark/io/CompressionCodecSuite.scala diff --git a/core/src/main/scala/spark/io/CompressionCodec.scala b/core/src/main/scala/spark/io/CompressionCodec.scala new file mode 100644 index 0000000000..2ba104a737 --- /dev/null +++ b/core/src/main/scala/spark/io/CompressionCodec.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package spark.io + +import java.io.{InputStream, OutputStream} + +import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} + +import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream} + + +/** + * CompressionCodec allows the customization of choosing different compression implementations + * to be used in block storage. + */ +trait CompressionCodec { + + def compressionOutputStream(s: OutputStream): OutputStream + + def compressionInputStream(s: InputStream): InputStream +} + + +private[spark] object CompressionCodec { + + def createCodec(): CompressionCodec = { + // Set the default codec to Snappy since the LZF implementation initializes a pretty large + // buffer for every stream, which results in a lot of memory overhead when the number of + // shuffle reduce buckets are large. + createCodec(classOf[SnappyCompressionCodec].getName) + } + + def createCodec(codecName: String): CompressionCodec = { + Class.forName( + System.getProperty("spark.io.compression.codec", codecName), + true, + Thread.currentThread.getContextClassLoader).newInstance().asInstanceOf[CompressionCodec] + } +} + + +/** + * LZF implementation of [[spark.io.CompressionCodec]]. + */ +class LZFCompressionCodec extends CompressionCodec { + + override def compressionOutputStream(s: OutputStream): OutputStream = { + new LZFOutputStream(s).setFinishBlockOnFlush(true) + } + + override def compressionInputStream(s: InputStream): InputStream = new LZFInputStream(s) +} + + +/** + * Snappy implementation of [[spark.io.CompressionCodec]]. + * Block size can be configured by spark.io.compression.snappy.block.size. 
+ */ +class SnappyCompressionCodec extends CompressionCodec { + + override def compressionOutputStream(s: OutputStream): OutputStream = { + val blockSize = System.getProperty("spark.io.snappy.block.size", "32768").toInt + new SnappyOutputStream(s, blockSize) + } + + override def compressionInputStream(s: InputStream): InputStream = new SnappyInputStream(s) +} diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 4228c902f8..9ed4c01218 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -27,11 +27,10 @@ import akka.dispatch.{Await, Future} import akka.util.Duration import akka.util.duration._ -import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} - import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream import spark.{Logging, SparkEnv, SparkException, Utils} +import spark.io.CompressionCodec import spark.network._ import spark.serializer.Serializer import spark.util.{ByteBufferInputStream, IdGenerator, MetadataCleaner, TimeStampedHashMap} @@ -158,7 +157,12 @@ private[spark] class BlockManager( val metadataCleaner = new MetadataCleaner("BlockManager", this.dropOldBlocks) initialize() - var compressionCodec: CompressionCodec = null + // The compression codec to use. Note that the "lazy" val is necessary because we want to delay + // the initialization of the compression codec until it is first used. The reason is that a Spark + // program could be using a user-defined codec in a third party jar, which is loaded in + // Executor.updateDependencies. When the BlockManager is initialized, user level jars hasn't been + // loaded yet. + private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec() /** * Construct a BlockManager with a memory limit set based on system properties. 
@@ -921,32 +925,14 @@ private[spark] class BlockManager( * Wrap an output stream for compression if block compression is enabled for its block type */ def wrapForCompression(blockId: String, s: OutputStream): OutputStream = { - if (compressionCodec == null) { - compressionCodec = Class.forName(System.getProperty("spark.storage.compression.codec", - "spark.storage.LZFCompressionCodec"), true, Thread.currentThread.getContextClassLoader) - .newInstance().asInstanceOf[CompressionCodec] - } - - if (shouldCompress(blockId)) { - //(new LZFOutputStream(s)).setFinishBlockOnFlush(true) - compressionCodec.compressionOutputStream(s) - } else { - s - } + if (shouldCompress(blockId)) compressionCodec.compressionOutputStream(s) else s } /** * Wrap an input stream for compression if block compression is enabled for its block type */ def wrapForCompression(blockId: String, s: InputStream): InputStream = { - if (compressionCodec == null) { - compressionCodec = Class.forName(System.getProperty("spark.storage.compression.codec", - "spark.storage.LZFCompressionCodec"), true, Thread.currentThread.getContextClassLoader) - .newInstance().asInstanceOf[CompressionCodec] - } - - if (shouldCompress(blockId)) /*new LZFInputStream(s) */ - compressionCodec.compressionInputStream(s) else s + if (shouldCompress(blockId)) compressionCodec.compressionInputStream(s) else s } def dataSerialize( diff --git a/core/src/main/scala/spark/storage/CompressionCodec.scala b/core/src/main/scala/spark/storage/CompressionCodec.scala deleted file mode 100644 index cd80de33f6..0000000000 --- a/core/src/main/scala/spark/storage/CompressionCodec.scala +++ /dev/null @@ -1,13 +0,0 @@ -package spark.storage - -import java.io.{InputStream, OutputStream} - - -/** - * CompressionCodec allows the customization of the compression codec - */ -trait CompressionCodec { - def compressionOutputStream(s: OutputStream): OutputStream - - def compressionInputStream(s: InputStream): InputStream -} diff --git a/core/src/main/scala/spark/storage/LZFCompressionCodec.scala b/core/src/main/scala/spark/storage/LZFCompressionCodec.scala deleted file mode 100644 index 3328b949ef..0000000000 --- a/core/src/main/scala/spark/storage/LZFCompressionCodec.scala +++ /dev/null @@ -1,16 +0,0 @@ -package spark.storage - -import java.io.{InputStream, OutputStream} - -import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} - -/** - * LZF implementation of [[spark.storage.CompressionCodec]] - */ -class LZFCompressionCodec extends CompressionCodec { - def compressionOutputStream(s: OutputStream): OutputStream = - (new LZFOutputStream(s)).setFinishBlockOnFlush(true) - - def compressionInputStream(s: InputStream): InputStream = - new LZFInputStream(s) -} diff --git a/core/src/main/scala/spark/storage/SnappyCompressionCodec.scala b/core/src/main/scala/spark/storage/SnappyCompressionCodec.scala deleted file mode 100644 index 62b00ef3f6..0000000000 --- a/core/src/main/scala/spark/storage/SnappyCompressionCodec.scala +++ /dev/null @@ -1,18 +0,0 @@ -package spark.storage - -import java.io.{InputStream, OutputStream} - -import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream} - -/** - * Snappy implementation of [[spark.storage.CompressionCodec]] - * block size can be configured by spark.snappy.block.size - */ -class SnappyCompressionCodec extends CompressionCodec { - def compressionOutputStream(s: OutputStream): OutputStream = - new SnappyOutputStream(s, - System.getProperty("spark.snappy.block.size", "32768").toInt) - - def compressionInputStream(s: InputStream): 
InputStream = - new SnappyInputStream(s) -} diff --git a/core/src/test/scala/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/spark/io/CompressionCodecSuite.scala new file mode 100644 index 0000000000..e69de29bb2 From 5227043f84e29bdca9a3be95d03886b0acea4ac6 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 30 Jul 2013 17:12:16 -0700 Subject: [PATCH 04/10] Documentation update for compression codec. --- docs/configuration.md | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 5c06897cae..0bcd73ca99 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -35,7 +35,7 @@ for these variables. * `SPARK_JAVA_OPTS`, to add JVM options. This includes any system properties that you'd like to pass with `-D`. * `SPARK_CLASSPATH`, to add elements to Spark's classpath. * `SPARK_LIBRARY_PATH`, to add search directories for native libraries. -* `SPARK_MEM`, to set the amount of memory used per node. This should be in the same format as the +* `SPARK_MEM`, to set the amount of memory used per node. This should be in the same format as the JVM's -Xmx option, e.g. `300m` or `1g`. Note that this option will soon be deprecated in favor of the `spark.executor.memory` system property, so we recommend using that in new code. @@ -77,7 +77,7 @@ there are at least five properties that you will commonly want to control: Class to use for serializing objects that will be sent over the network or need to be cached in serialized form. The default of Java serialization works with any Serializable Java object but is quite slow, so we recommend using spark.KryoSerializer - and configuring Kryo serialization when speed is necessary. Can be any subclass of + and configuring Kryo serialization when speed is necessary. Can be any subclass of spark.Serializer). @@ -86,7 +86,7 @@ there are at least five properties that you will commonly want to control: (none) If you use Kryo serialization, set this class to register your custom classes with Kryo. - You need to set it to a class that extends + You need to set it to a class that extends spark.KryoRegistrator). See the tuning guide for more details. @@ -180,6 +180,21 @@ Apart from these, the following properties are also available, and may be useful Can save substantial space at the cost of some extra CPU time. + + spark.io.compression.codec + spark.io.SnappyCompressionCodec + + The compression codec class to use for various compressions. By default, Spark provides two + codecs: spark.io.LZFCompressionCodec and spark.io.SnappyCompressionCodec. + + + + spark.io.compression.snappy.block.size + 32768 + + Block size (in bytes) used in Snappy compression, in the case when Snappy compression codec is used. + + spark.reducer.maxMbInFlight 48 From 56774b176eb7e7a556bb23d9c524621e156c5633 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 30 Jul 2013 17:12:33 -0700 Subject: [PATCH 05/10] Added unit test for compression codecs. --- .../spark/io/CompressionCodecSuite.scala | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/core/src/test/scala/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/spark/io/CompressionCodecSuite.scala index e69de29bb2..1b5daf4c97 100644 --- a/core/src/test/scala/spark/io/CompressionCodecSuite.scala +++ b/core/src/test/scala/spark/io/CompressionCodecSuite.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package spark.io + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} + +import org.scalatest.FunSuite + + +class CompressionCodecSuite extends FunSuite { + + def testCodec(codec: CompressionCodec) { + // Write 1000 integers to the output stream, compressed. + val outputStream = new ByteArrayOutputStream() + val out = codec.compressionOutputStream(outputStream) + for (i <- 1 until 1000) { + out.write(i % 256) + } + out.close() + + // Read the 1000 integers back. + val inputStream = new ByteArrayInputStream(outputStream.toByteArray) + val in = codec.compressionInputStream(inputStream) + for (i <- 1 until 1000) { + assert(in.read() === i % 256) + } + in.close() + } + + test("default compression codec") { + val codec = CompressionCodec.createCodec() + assert(codec.getClass === classOf[SnappyCompressionCodec]) + testCodec(codec) + } + + test("lzf compression codec") { + val codec = CompressionCodec.createCodec(classOf[LZFCompressionCodec].getName) + assert(codec.getClass === classOf[LZFCompressionCodec]) + testCodec(codec) + } + + test("snappy compression codec") { + val codec = CompressionCodec.createCodec(classOf[SnappyCompressionCodec].getName) + assert(codec.getClass === classOf[SnappyCompressionCodec]) + testCodec(codec) + } +} From 3b1ced83fbd72703965aabf7f8dc52417e1cf166 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 30 Jul 2013 17:24:45 -0700 Subject: [PATCH 06/10] Exclude older version of Snappy in streaming and examples. 
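One gap in the suite added in the previous patch: each codec class is constructed explicitly, so selection through the spark.io.compression.codec system property is only covered implicitly by createCodec's default. A test along the following lines could exercise it directly (a hypothetical addition to CompressionCodecSuite, not part of this series; the property has to be cleared afterwards because it overrides the default for the whole JVM):

    test("codec selection via system property (sketch)") {
      // createCodec() consults spark.io.compression.codec before falling back to the Snappy default.
      System.setProperty("spark.io.compression.codec", classOf[LZFCompressionCodec].getName)
      try {
        val codec = CompressionCodec.createCodec()
        assert(codec.getClass === classOf[LZFCompressionCodec])
        testCodec(codec)
      } finally {
        System.clearProperty("spark.io.compression.codec")
      }
    }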
--- project/SparkBuild.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index be3ef1f148..07d7b76901 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -151,6 +151,7 @@ object SparkBuild extends Build { val excludeJackson = ExclusionRule(organization = "org.codehaus.jackson") val excludeNetty = ExclusionRule(organization = "org.jboss.netty") val excludeAsm = ExclusionRule(organization = "asm") + val excludeSnappy = ExclusionRule(organization = "org.xerial.snappy") def coreSettings = sharedSettings ++ Seq( name := "spark-core", @@ -236,6 +237,7 @@ object SparkBuild extends Build { exclude("jline","jline") exclude("log4j","log4j") exclude("org.apache.cassandra.deps", "avro") + excludeAll(excludeSnappy) ) ) @@ -258,7 +260,7 @@ object SparkBuild extends Build { "Akka Repository" at "http://repo.akka.io/releases/" ), libraryDependencies ++= Seq( - "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty), + "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy), "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty), "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty), "com.typesafe.akka" % "akka-zeromq" % "2.0.5" excludeAll(excludeNetty) From 311aae76a2177629a6afb75a36563b36e3aa7b66 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 30 Jul 2013 17:25:42 -0700 Subject: [PATCH 07/10] Added Snappy dependency to Maven build files. --- core/pom.xml | 4 ++++ pom.xml | 5 +++++ 2 files changed, 9 insertions(+) diff --git a/core/pom.xml b/core/pom.xml index f0c936c86a..ba0071f582 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -48,6 +48,10 @@ com.ning compress-lzf + + org.xerial.snappy + snappy-java + org.ow2.asm asm diff --git a/pom.xml b/pom.xml index 44729bd422..f3bca6c40b 100644 --- a/pom.xml +++ b/pom.xml @@ -182,6 +182,11 @@ compress-lzf 0.8.4 + + org.xerial.snappy + snappy-java + 1.0.5 + org.ow2.asm asm From dae12fef9ed473a9dcdfe00b04497eea21bfb96b Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 30 Jul 2013 17:49:31 -0700 Subject: [PATCH 08/10] Updated the configuration option for Snappy block size to be consistent with the documentation. --- core/src/main/scala/spark/io/CompressionCodec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/io/CompressionCodec.scala b/core/src/main/scala/spark/io/CompressionCodec.scala index 2ba104a737..b4d8481450 100644 --- a/core/src/main/scala/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/spark/io/CompressionCodec.scala @@ -74,7 +74,7 @@ class LZFCompressionCodec extends CompressionCodec { class SnappyCompressionCodec extends CompressionCodec { override def compressionOutputStream(s: OutputStream): OutputStream = { - val blockSize = System.getProperty("spark.io.snappy.block.size", "32768").toInt + val blockSize = System.getProperty("spark.io.compression.snappy.block.size", "32768").toInt new SnappyOutputStream(s, blockSize) } From 98024eadc3150a9a509132117875b8d0b18b1d50 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 30 Jul 2013 18:28:46 -0700 Subject: [PATCH 09/10] Renamed compressionOutputStream and compressionInputStream to compressedOutputStream and compressedInputStream. 
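A note on the option touched in the previous patch: SnappyCompressionCodec reads spark.io.compression.snappy.block.size each time it wraps an output stream, so the value can be set once per JVM and applies to every stream wrapped after that point. A rough usage sketch, using the compressedOutputStream name this patch introduces (the 64 KB value is purely illustrative, not a recommendation):

    // Default is 32768 bytes; larger Snappy blocks trade memory for compression ratio.
    System.setProperty("spark.io.compression.snappy.block.size", "65536")

    // With no spark.io.compression.codec override, createCodec() returns the Snappy codec,
    // so a stream wrapped through it now uses 64 KB Snappy blocks.
    val codec = spark.io.CompressionCodec.createCodec()
    val out = codec.compressedOutputStream(new java.io.FileOutputStream("/tmp/example.snappy"))
    out.write("some block data".getBytes("UTF-8"))
    out.close()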
--- core/src/main/scala/spark/io/CompressionCodec.scala | 12 ++++++------ core/src/main/scala/spark/storage/BlockManager.scala | 4 ++-- .../test/scala/spark/io/CompressionCodecSuite.scala | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/spark/io/CompressionCodec.scala b/core/src/main/scala/spark/io/CompressionCodec.scala index b4d8481450..0adebecadb 100644 --- a/core/src/main/scala/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/spark/io/CompressionCodec.scala @@ -30,9 +30,9 @@ import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream} */ trait CompressionCodec { - def compressionOutputStream(s: OutputStream): OutputStream + def compressedOutputStream(s: OutputStream): OutputStream - def compressionInputStream(s: InputStream): InputStream + def compressedInputStream(s: InputStream): InputStream } @@ -59,11 +59,11 @@ private[spark] object CompressionCodec { */ class LZFCompressionCodec extends CompressionCodec { - override def compressionOutputStream(s: OutputStream): OutputStream = { + override def compressedOutputStream(s: OutputStream): OutputStream = { new LZFOutputStream(s).setFinishBlockOnFlush(true) } - override def compressionInputStream(s: InputStream): InputStream = new LZFInputStream(s) + override def compressedInputStream(s: InputStream): InputStream = new LZFInputStream(s) } @@ -73,10 +73,10 @@ class LZFCompressionCodec extends CompressionCodec { */ class SnappyCompressionCodec extends CompressionCodec { - override def compressionOutputStream(s: OutputStream): OutputStream = { + override def compressedOutputStream(s: OutputStream): OutputStream = { val blockSize = System.getProperty("spark.io.compression.snappy.block.size", "32768").toInt new SnappyOutputStream(s, blockSize) } - override def compressionInputStream(s: InputStream): InputStream = new SnappyInputStream(s) + override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s) } diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 9ed4c01218..3a72474419 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -925,14 +925,14 @@ private[spark] class BlockManager( * Wrap an output stream for compression if block compression is enabled for its block type */ def wrapForCompression(blockId: String, s: OutputStream): OutputStream = { - if (shouldCompress(blockId)) compressionCodec.compressionOutputStream(s) else s + if (shouldCompress(blockId)) compressionCodec.compressedOutputStream(s) else s } /** * Wrap an input stream for compression if block compression is enabled for its block type */ def wrapForCompression(blockId: String, s: InputStream): InputStream = { - if (shouldCompress(blockId)) compressionCodec.compressionInputStream(s) else s + if (shouldCompress(blockId)) compressionCodec.compressedInputStream(s) else s } def dataSerialize( diff --git a/core/src/test/scala/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/spark/io/CompressionCodecSuite.scala index 1b5daf4c97..1ba82fe2b9 100644 --- a/core/src/test/scala/spark/io/CompressionCodecSuite.scala +++ b/core/src/test/scala/spark/io/CompressionCodecSuite.scala @@ -27,7 +27,7 @@ class CompressionCodecSuite extends FunSuite { def testCodec(codec: CompressionCodec) { // Write 1000 integers to the output stream, compressed. 
val outputStream = new ByteArrayOutputStream() - val out = codec.compressionOutputStream(outputStream) + val out = codec.compressedOutputStream(outputStream) for (i <- 1 until 1000) { out.write(i % 256) } @@ -35,7 +35,7 @@ class CompressionCodecSuite extends FunSuite { // Read the 1000 integers back. val inputStream = new ByteArrayInputStream(outputStream.toByteArray) - val in = codec.compressionInputStream(inputStream) + val in = codec.compressedInputStream(inputStream) for (i <- 1 until 1000) { assert(in.read() === i % 256) } From c61843a69fd50db66b01e9ef0fb2870baf51d351 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 30 Jul 2013 18:54:35 -0700 Subject: [PATCH 10/10] Changed other LZF uses to use the compression codec interface. --- .../scala/spark/broadcast/HttpBroadcast.scala | 36 ++++++++++--------- .../spark/scheduler/ShuffleMapTask.scala | 15 ++------ .../scala/spark/streaming/Checkpoint.scala | 26 +++++++++----- 3 files changed, 39 insertions(+), 38 deletions(-) diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala index c565876950..138a8c21bc 100644 --- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala @@ -17,21 +17,20 @@ package spark.broadcast -import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} - -import java.io._ -import java.net._ -import java.util.UUID +import java.io.{File, FileOutputStream, ObjectInputStream, OutputStream} +import java.net.URL import it.unimi.dsi.fastutil.io.FastBufferedInputStream import it.unimi.dsi.fastutil.io.FastBufferedOutputStream -import spark._ +import spark.{HttpServer, Logging, SparkEnv, Utils} +import spark.io.CompressionCodec import spark.storage.StorageLevel -import util.{MetadataCleaner, TimeStampedHashSet} +import spark.util.{MetadataCleaner, TimeStampedHashSet} + private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long) -extends Broadcast[T](id) with Logging with Serializable { + extends Broadcast[T](id) with Logging with Serializable { def value = value_ @@ -85,6 +84,7 @@ private object HttpBroadcast extends Logging { private val files = new TimeStampedHashSet[String] private val cleaner = new MetadataCleaner("HttpBroadcast", cleanup) + private lazy val compressionCodec = CompressionCodec.createCodec() def initialize(isDriver: Boolean) { synchronized { @@ -122,10 +122,12 @@ private object HttpBroadcast extends Logging { def write(id: Long, value: Any) { val file = new File(broadcastDir, "broadcast-" + id) - val out: OutputStream = if (compress) { - new LZFOutputStream(new FileOutputStream(file)) // Does its own buffering - } else { - new FastBufferedOutputStream(new FileOutputStream(file), bufferSize) + val out: OutputStream = { + if (compress) { + compressionCodec.compressedOutputStream(new FileOutputStream(file)) + } else { + new FastBufferedOutputStream(new FileOutputStream(file), bufferSize) + } } val ser = SparkEnv.get.serializer.newInstance() val serOut = ser.serializeStream(out) @@ -136,10 +138,12 @@ private object HttpBroadcast extends Logging { def read[T](id: Long): T = { val url = serverUri + "/broadcast-" + id - var in = if (compress) { - new LZFInputStream(new URL(url).openStream()) // Does its own buffering - } else { - new FastBufferedInputStream(new URL(url).openStream(), bufferSize) + val in = { + if (compress) { + compressionCodec.compressedInputStream(new URL(url).openStream()) + } else { + new FastBufferedInputStream(new 
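The change below replaces the remaining direct uses of com.ning.compress.lzf: HttpBroadcast and the streaming Checkpoint writer/reader now build their streams through CompressionCodec.createCodec(), while ShuffleMapTask simply loses its now-unused LZF imports. The net effect is that spark.io.compression.codec governs every compressed stream in the system, not just block storage. Condensed into a self-contained illustration (the helper object and its file-based API are hypothetical, not an additional call site in Spark):

    import java.io.{ByteArrayOutputStream, FileInputStream, FileOutputStream}

    import spark.io.CompressionCodec

    object CompressedFileExample {
      // Resolved once and reused, as the broadcast and checkpoint code below does.
      private lazy val compressionCodec = CompressionCodec.createCodec()

      // Previously such helpers constructed LZFOutputStream / LZFInputStream directly;
      // now the configured codec decides the on-disk format.
      def writeCompressed(path: String, payload: Array[Byte]) {
        val out = compressionCodec.compressedOutputStream(new FileOutputStream(path))
        try { out.write(payload) } finally { out.close() }
      }

      def readCompressed(path: String): Array[Byte] = {
        val in = compressionCodec.compressedInputStream(new FileInputStream(path))
        val buf = new ByteArrayOutputStream()
        try {
          var b = in.read()
          while (b != -1) { buf.write(b); b = in.read() }
        } finally { in.close() }
        buf.toByteArray
      }
    }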
URL(url).openStream(), bufferSize) + } } val ser = SparkEnv.get.serializer.newInstance() val serIn = ser.deserializeStream(in) diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala index 1c25605f75..e3bb6d1e60 100644 --- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala @@ -18,16 +18,9 @@ package spark.scheduler import java.io._ -import java.util.{HashMap => JHashMap} import java.util.zip.{GZIPInputStream, GZIPOutputStream} -import scala.collection.mutable.{ArrayBuffer, HashMap} -import scala.collection.JavaConversions._ - -import it.unimi.dsi.fastutil.io.FastBufferedOutputStream - -import com.ning.compress.lzf.LZFInputStream -import com.ning.compress.lzf.LZFOutputStream +import scala.collection.mutable.HashMap import spark._ import spark.executor.ShuffleWriteMetrics @@ -109,11 +102,7 @@ private[spark] class ShuffleMapTask( preferredLocs.foreach (hostPort => Utils.checkHost(Utils.parseHostPort(hostPort)._1, "preferredLocs : " + preferredLocs)) } - var split = if (rdd == null) { - null - } else { - rdd.partitions(partition) - } + var split = if (rdd == null) null else rdd.partitions(partition) override def writeExternal(out: ObjectOutput) { RDDCheckpointData.synchronized { diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala index 1e4c1e3742..070d930b5e 100644 --- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala @@ -17,16 +17,17 @@ package spark.streaming -import spark.{Logging, Utils} - -import org.apache.hadoop.fs.{FileUtil, Path} -import org.apache.hadoop.conf.Configuration - import java.io._ -import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} import java.util.concurrent.Executors import java.util.concurrent.RejectedExecutionException +import org.apache.hadoop.fs.Path +import org.apache.hadoop.conf.Configuration + +import spark.Logging +import spark.io.CompressionCodec + + private[streaming] class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) extends Logging with Serializable { @@ -49,6 +50,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) } } + /** * Convenience class to speed up the writing of graph checkpoint to file */ @@ -66,6 +68,8 @@ class CheckpointWriter(checkpointDir: String) extends Logging { val maxAttempts = 3 val executor = Executors.newFixedThreadPool(1) + private val compressionCodec = CompressionCodec.createCodec() + // Removed code which validates whether there is only one CheckpointWriter per path 'file' since // I did not notice any errors - reintroduce it ? 
@@ -103,7 +107,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging { def write(checkpoint: Checkpoint) { val bos = new ByteArrayOutputStream() - val zos = new LZFOutputStream(bos) + val zos = compressionCodec.compressedOutputStream(bos) val oos = new ObjectOutputStream(zos) oos.writeObject(checkpoint) oos.close() @@ -137,6 +141,8 @@ object CheckpointReader extends Logging { val fs = new Path(path).getFileSystem(new Configuration()) val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk")) + val compressionCodec = CompressionCodec.createCodec() + attempts.foreach(file => { if (fs.exists(file)) { logInfo("Attempting to load checkpoint from file '" + file + "'") @@ -147,7 +153,7 @@ object CheckpointReader extends Logging { // of ObjectInputStream is used to explicitly use the current thread's default class // loader to find and load classes. This is a well know Java issue and has popped up // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627) - val zis = new LZFInputStream(fis) + val zis = compressionCodec.compressedInputStream(fis) val ois = new ObjectInputStreamWithLoader(zis, Thread.currentThread().getContextClassLoader) val cp = ois.readObject.asInstanceOf[Checkpoint] ois.close() @@ -170,7 +176,9 @@ object CheckpointReader extends Logging { } private[streaming] -class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoader) extends ObjectInputStream(inputStream_) { +class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoader) + extends ObjectInputStream(inputStream_) { + override def resolveClass(desc: ObjectStreamClass): Class[_] = { try { return loader.loadClass(desc.getName())