4 changes: 4 additions & 0 deletions core/pom.xml
@@ -48,6 +48,10 @@
      <groupId>com.ning</groupId>
      <artifactId>compress-lzf</artifactId>
    </dependency>
+    <dependency>
+      <groupId>org.xerial.snappy</groupId>
+      <artifactId>snappy-java</artifactId>
+    </dependency>
    <dependency>
      <groupId>org.ow2.asm</groupId>
      <artifactId>asm</artifactId>
36 changes: 20 additions & 16 deletions core/src/main/scala/spark/broadcast/HttpBroadcast.scala
@@ -17,21 +17,20 @@

package spark.broadcast

-import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-
-import java.io._
-import java.net._
-import java.util.UUID
+import java.io.{File, FileOutputStream, ObjectInputStream, OutputStream}
+import java.net.URL

import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream

-import spark._
+import spark.{HttpServer, Logging, SparkEnv, Utils}
+import spark.io.CompressionCodec
import spark.storage.StorageLevel
-import util.{MetadataCleaner, TimeStampedHashSet}
+import spark.util.{MetadataCleaner, TimeStampedHashSet}


private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
-extends Broadcast[T](id) with Logging with Serializable {
+  extends Broadcast[T](id) with Logging with Serializable {

def value = value_

@@ -85,6 +84,7 @@ private object HttpBroadcast extends Logging {
private val files = new TimeStampedHashSet[String]
private val cleaner = new MetadataCleaner("HttpBroadcast", cleanup)

+  private lazy val compressionCodec = CompressionCodec.createCodec()

def initialize(isDriver: Boolean) {
synchronized {
@@ -122,10 +122,12 @@

def write(id: Long, value: Any) {
val file = new File(broadcastDir, "broadcast-" + id)
-    val out: OutputStream = if (compress) {
-      new LZFOutputStream(new FileOutputStream(file)) // Does its own buffering
-    } else {
-      new FastBufferedOutputStream(new FileOutputStream(file), bufferSize)
+    val out: OutputStream = {
+      if (compress) {
+        compressionCodec.compressedOutputStream(new FileOutputStream(file))
+      } else {
+        new FastBufferedOutputStream(new FileOutputStream(file), bufferSize)
+      }
}
val ser = SparkEnv.get.serializer.newInstance()
val serOut = ser.serializeStream(out)
@@ -136,10 +138,12 @@

def read[T](id: Long): T = {
val url = serverUri + "/broadcast-" + id
-    var in = if (compress) {
-      new LZFInputStream(new URL(url).openStream()) // Does its own buffering
-    } else {
-      new FastBufferedInputStream(new URL(url).openStream(), bufferSize)
+    val in = {
+      if (compress) {
+        compressionCodec.compressedInputStream(new URL(url).openStream())
+      } else {
+        new FastBufferedInputStream(new URL(url).openStream(), bufferSize)
+      }
}
val ser = SparkEnv.get.serializer.newInstance()
val serIn = ser.deserializeStream(in)
82 changes: 82 additions & 0 deletions core/src/main/scala/spark/io/CompressionCodec.scala
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package spark.io

import java.io.{InputStream, OutputStream}

import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}

import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}


/**
* CompressionCodec allows the customization of the compression implementation used in
* block storage.
*/
trait CompressionCodec {

def compressedOutputStream(s: OutputStream): OutputStream

def compressedInputStream(s: InputStream): InputStream
}


private[spark] object CompressionCodec {

def createCodec(): CompressionCodec = {
// Set the default codec to Snappy since the LZF implementation initializes a pretty large
// buffer for every stream, which results in a lot of memory overhead when the number of
// shuffle reduce buckets is large.
createCodec(classOf[SnappyCompressionCodec].getName)
}

def createCodec(codecName: String): CompressionCodec = {
Class.forName(
System.getProperty("spark.io.compression.codec", codecName),
true,
Thread.currentThread.getContextClassLoader).newInstance().asInstanceOf[CompressionCodec]
}
}


/**
* LZF implementation of [[spark.io.CompressionCodec]].
*/
class LZFCompressionCodec extends CompressionCodec {

override def compressedOutputStream(s: OutputStream): OutputStream = {
new LZFOutputStream(s).setFinishBlockOnFlush(true)
}

override def compressedInputStream(s: InputStream): InputStream = new LZFInputStream(s)
}


/**
* Snappy implementation of [[spark.io.CompressionCodec]].
* Block size can be configured by spark.io.compression.snappy.block.size.
*/
class SnappyCompressionCodec extends CompressionCodec {

override def compressedOutputStream(s: OutputStream): OutputStream = {
val blockSize = System.getProperty("spark.io.compression.snappy.block.size", "32768").toInt
new SnappyOutputStream(s, blockSize)
}

override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
}
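
Because `createCodec()` resolves the class named by the `spark.io.compression.codec` system property reflectively via a no-arg constructor, third-party codecs can be swapped in without touching Spark. Below is a minimal sketch of that extension point; the `GzipCompressionCodec` class and `CodecSelectionExample` object are hypothetical, not part of this patch, and the sketch sits in package `spark.io` because the `CompressionCodec` companion object is `private[spark]`:

package spark.io

import java.io.{InputStream, OutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

// Hypothetical third-party codec: any class with a no-arg constructor
// that implements the CompressionCodec trait can be selected at runtime.
class GzipCompressionCodec extends CompressionCodec {
  override def compressedOutputStream(s: OutputStream): OutputStream = new GZIPOutputStream(s)
  override def compressedInputStream(s: InputStream): InputStream = new GZIPInputStream(s)
}

object CodecSelectionExample {
  def main(args: Array[String]) {
    // createCodec() consults the spark.io.compression.codec system property
    // and falls back to the Snappy codec when the property is unset.
    System.setProperty("spark.io.compression.codec", classOf[GzipCompressionCodec].getName)
    val codec = CompressionCodec.createCodec()
    println(codec.getClass.getName)  // prints spark.io.GzipCompressionCodec
  }
}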
15 changes: 2 additions & 13 deletions core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -18,16 +18,9 @@
package spark.scheduler

import java.io._
-import java.util.{HashMap => JHashMap}
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-
-import scala.collection.mutable.{ArrayBuffer, HashMap}
-import scala.collection.JavaConversions._
-
-import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
-
-import com.ning.compress.lzf.LZFInputStream
-import com.ning.compress.lzf.LZFOutputStream
+
+import scala.collection.mutable.HashMap

import spark._
import spark.executor.ShuffleWriteMetrics
@@ -109,11 +102,7 @@ private[spark] class ShuffleMapTask(
preferredLocs.foreach (hostPort => Utils.checkHost(Utils.parseHostPort(hostPort)._1, "preferredLocs : " + preferredLocs))
}

-  var split = if (rdd == null) {
-    null
-  } else {
-    rdd.partitions(partition)
-  }
+  var split = if (rdd == null) null else rdd.partitions(partition)

override def writeExternal(out: ObjectOutput) {
RDDCheckpointData.synchronized {
18 changes: 10 additions & 8 deletions core/src/main/scala/spark/storage/BlockManager.scala
@@ -27,11 +27,10 @@ import akka.dispatch.{Await, Future}
import akka.util.Duration
import akka.util.duration._

-import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream

import spark.{Logging, SparkEnv, SparkException, Utils}
+import spark.io.CompressionCodec
import spark.network._
import spark.serializer.Serializer
import spark.util.{ByteBufferInputStream, IdGenerator, MetadataCleaner, TimeStampedHashMap}
@@ -158,6 +157,13 @@ private[spark] class BlockManager(
val metadataCleaner = new MetadataCleaner("BlockManager", this.dropOldBlocks)
initialize()

+  // The compression codec to use. Note that the "lazy" val is necessary because we want to delay
+  // the initialization of the compression codec until it is first used. The reason is that a Spark
+  // program could be using a user-defined codec in a third party jar, which is loaded in
+  // Executor.updateDependencies. When the BlockManager is initialized, user-level jars haven't been
+  // loaded yet.
+  private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec()

/**
* Construct a BlockManager with a memory limit set based on system properties.
*/
@@ -919,18 +925,14 @@ private[spark] class BlockManager(
* Wrap an output stream for compression if block compression is enabled for its block type
*/
def wrapForCompression(blockId: String, s: OutputStream): OutputStream = {
-    if (shouldCompress(blockId)) {
-      (new LZFOutputStream(s)).setFinishBlockOnFlush(true)
-    } else {
-      s
-    }
+    if (shouldCompress(blockId)) compressionCodec.compressedOutputStream(s) else s
}

/**
* Wrap an input stream for compression if block compression is enabled for its block type
*/
def wrapForCompression(blockId: String, s: InputStream): InputStream = {
-    if (shouldCompress(blockId)) new LZFInputStream(s) else s
+    if (shouldCompress(blockId)) compressionCodec.compressedInputStream(s) else s
}

def dataSerialize(
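
For context, a minimal standalone sketch of the stream layering these wrappers produce — this is not BlockManager code; `CompressionLayeringSketch`, the use of `ObjectOutputStream` in place of Spark's serializer, and the stream names are all illustrative. Serialization writes through the compression stream into the raw sink, and the read path unwraps in the same order:

package spark.io

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object CompressionLayeringSketch {
  def main(args: Array[String]) {
    val codec: CompressionCodec = new SnappyCompressionCodec

    // Write path: serializer -> compressed stream -> raw sink.
    val sink = new ByteArrayOutputStream()
    val objOut = new ObjectOutputStream(codec.compressedOutputStream(sink))
    objOut.writeObject("hello block")
    objOut.close()

    // Read path: raw source -> decompressing stream -> deserializer.
    val objIn = new ObjectInputStream(
      codec.compressedInputStream(new ByteArrayInputStream(sink.toByteArray)))
    println(objIn.readObject())  // prints "hello block"
    objIn.close()
  }
}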
1 change: 0 additions & 1 deletion core/src/main/scala/spark/storage/DiskStore.scala
@@ -66,7 +66,6 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
override def close() {
if (initialized) {
objOut.close()
-        bs.close()
channel = null
bs = null
objOut = null
62 changes: 62 additions & 0 deletions core/src/test/scala/spark/io/CompressionCodecSuite.scala
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package spark.io

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import org.scalatest.FunSuite


class CompressionCodecSuite extends FunSuite {

def testCodec(codec: CompressionCodec) {
// Write 999 byte values (1 to 999, each taken mod 256) to the compressed output stream.
val outputStream = new ByteArrayOutputStream()
val out = codec.compressedOutputStream(outputStream)
for (i <- 1 until 1000) {
out.write(i % 256)
}
out.close()

// Read the 999 byte values back and verify them.
val inputStream = new ByteArrayInputStream(outputStream.toByteArray)
val in = codec.compressedInputStream(inputStream)
for (i <- 1 until 1000) {
assert(in.read() === i % 256)
}
in.close()
}

test("default compression codec") {
val codec = CompressionCodec.createCodec()
assert(codec.getClass === classOf[SnappyCompressionCodec])
testCodec(codec)
}

test("lzf compression codec") {
val codec = CompressionCodec.createCodec(classOf[LZFCompressionCodec].getName)
assert(codec.getClass === classOf[LZFCompressionCodec])
testCodec(codec)
}

test("snappy compression codec") {
val codec = CompressionCodec.createCodec(classOf[SnappyCompressionCodec].getName)
assert(codec.getClass === classOf[SnappyCompressionCodec])
testCodec(codec)
}
}
21 changes: 18 additions & 3 deletions docs/configuration.md
@@ -35,7 +35,7 @@ for these variables.
* `SPARK_JAVA_OPTS`, to add JVM options. This includes any system properties that you'd like to pass with `-D`.
* `SPARK_CLASSPATH`, to add elements to Spark's classpath.
* `SPARK_LIBRARY_PATH`, to add search directories for native libraries.
* `SPARK_MEM`, to set the amount of memory used per node. This should be in the same format as the
JVM's -Xmx option, e.g. `300m` or `1g`. Note that this option will soon be deprecated in favor of
the `spark.executor.memory` system property, so we recommend using that in new code.

@@ -77,7 +77,7 @@ there are at least five properties that you will commonly want to control:
Class to use for serializing objects that will be sent over the network or need to be cached
in serialized form. The default of Java serialization works with any Serializable Java object but is
quite slow, so we recommend <a href="tuning.html">using <code>spark.KryoSerializer</code>
and configuring Kryo serialization</a> when speed is necessary. Can be any subclass of
<a href="api/core/index.html#spark.Serializer"><code>spark.Serializer</code></a>).
</td>
</tr>
@@ -86,7 +86,7 @@ there are at least five properties that you will commonly want to control:
<td>(none)</td>
<td>
If you use Kryo serialization, set this class to register your custom classes with Kryo.
You need to set it to a class that extends
<a href="api/core/index.html#spark.KryoRegistrator"><code>spark.KryoRegistrator</code></a>).
See the <a href="tuning.html#data-serialization">tuning guide</a> for more details.
</td>
@@ -180,6 +180,21 @@ Apart from these, the following properties are also available, and may be useful
Can save substantial space at the cost of some extra CPU time.
</td>
</tr>
+<tr>
+  <td>spark.io.compression.codec</td>
+  <td>spark.io.SnappyCompressionCodec</td>
+  <td>
+    The compression codec class to use for various compressions. By default, Spark provides two
+    codecs: <code>spark.io.LZFCompressionCodec</code> and <code>spark.io.SnappyCompressionCodec</code>.
+  </td>
+</tr>
+<tr>
+  <td>spark.io.compression.snappy.block.size</td>
+  <td>32768</td>
+  <td>
+    Block size (in bytes) used in Snappy compression, when the Snappy compression codec is used.
+  </td>
+</tr>
<tr>
<td>spark.reducer.maxMbInFlight</td>
<td>48</td>
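
Both settings are plain Java system properties, so in this era of the API they take effect if set on the driver before the SparkContext (and hence the BlockManager) is created. A minimal sketch, assuming the properties are set programmatically rather than through `SPARK_JAVA_OPTS`; `CompressionConfigExample` is an illustrative name, not part of this patch:

object CompressionConfigExample {
  def main(args: Array[String]) {
    // Choose LZF instead of the Snappy default, and a 64 KB Snappy block
    // size for runs that switch back to the Snappy codec.
    System.setProperty("spark.io.compression.codec", "spark.io.LZFCompressionCodec")
    System.setProperty("spark.io.compression.snappy.block.size", "65536")
    // ... then create the SparkContext as usual.
  }
}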
5 changes: 5 additions & 0 deletions pom.xml
@@ -182,6 +182,11 @@
<artifactId>compress-lzf</artifactId>
<version>0.8.4</version>
</dependency>
+    <dependency>
+      <groupId>org.xerial.snappy</groupId>
+      <artifactId>snappy-java</artifactId>
+      <version>1.0.5</version>
+    </dependency>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>