From 352bb3d3c4ba72276e8fc869d1f47d79df83d032 Mon Sep 17 00:00:00 2001 From: Adam Roberts Date: Tue, 6 Oct 2015 17:19:05 +0100 Subject: [PATCH 1/5] Update Snappy version to 1.1.2 and modify test case as concatenation of serialized streams is now supported --- .../scala/org/apache/spark/io/CompressionCodecSuite.scala | 6 ++---- pom.xml | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala index cbdb33c89d0fb..1553ab60bddaa 100644 --- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala +++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala @@ -100,12 +100,10 @@ class CompressionCodecSuite extends SparkFunSuite { testCodec(codec) } - test("snappy does not support concatenation of serialized streams") { + test("snappy supports concatenation of serialized streams") { val codec = CompressionCodec.createCodec(conf, classOf[SnappyCompressionCodec].getName) assert(codec.getClass === classOf[SnappyCompressionCodec]) - intercept[Exception] { - testConcatenationOfSerializedStreams(codec) - } + testConcatenationOfSerializedStreams(codec) } test("bad compression codec") { diff --git a/pom.xml b/pom.xml index d04ed1e798657..d370376f9358e 100644 --- a/pom.xml +++ b/pom.xml @@ -164,7 +164,7 @@ org.scala-lang 1.9.13 2.4.4 - 1.1.1.7 + 1.1.2 1.1.2 1.2.0-incubating 1.10 From ae13aa82242107ce428c7e729fa241c24e9a3f5d Mon Sep 17 00:00:00 2001 From: Adam Roberts Date: Sat, 17 Oct 2015 13:41:02 +0100 Subject: [PATCH 2/5] Update CompressionCodec.scala Add known compression codecs that support concatenation of serialized streams --- .../main/scala/org/apache/spark/io/CompressionCodec.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 9dc36704a676d..270e304099606 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -47,6 +47,11 @@ trait CompressionCodec { private[spark] object CompressionCodec { private val configKey = "spark.io.compression.codec" + + private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = { + codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec] + } + private val shortCompressionCodecNames = Map( "lz4" -> classOf[LZ4CompressionCodec].getName, "lzf" -> classOf[LZFCompressionCodec].getName, From 3d650c82eda1c6a3b826be8c8b052db5e3b95fc3 Mon Sep 17 00:00:00 2001 From: Adam Roberts Date: Sat, 17 Oct 2015 13:43:39 +0100 Subject: [PATCH 3/5] Update UnsafeShuffleWriter.java Update fastMergeIsSupported so we can support concatenation of serialized streams --- .../org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java index fdb309e365f69..4cd7e3d6cb9b9 100644 --- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java @@ -265,8 +265,8 @@ private long[] mergeSpills(SpillInfo[] spills) throws IOException { final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf); final boolean fastMergeEnabled = sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true); - final boolean fastMergeIsSupported = - !compressionEnabled || compressionCodec instanceof LZFCompressionCodec; + final boolean fastMergeIsSupported = !compressionEnabled || + CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec); try { if (spills.length == 0) { new FileOutputStream(outputFile).close(); // Create an empty file From 1949bcb017579de53b891f369ecf00087b3541b6 Mon Sep 17 00:00:00 2001 From: Adam Roberts Date: Sun, 18 Oct 2015 00:10:05 +0100 Subject: [PATCH 4/5] Update CompressionCodec.scala Scalastyle fixes for whitespace --- core/src/main/scala/org/apache/spark/io/CompressionCodec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 270e304099606..a962a481381bc 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -49,7 +49,7 @@ private[spark] object CompressionCodec { private val configKey = "spark.io.compression.codec" private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = { - codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec] + codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec] } private val shortCompressionCodecNames = Map( From 0f870521d1ed2183e4fe5ee0b8e4dc633ccb3cb5 Mon Sep 17 00:00:00 2001 From: Adam Roberts Date: Mon, 19 Oct 2015 12:59:57 +0100 Subject: [PATCH 5/5] Whitespace --- .../src/main/scala/org/apache/spark/io/CompressionCodec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index a962a481381bc..ca74eedf89be5 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -47,11 +47,11 @@ trait CompressionCodec { private[spark] object CompressionCodec { private val configKey = "spark.io.compression.codec" - + private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = { codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec] } - + private val shortCompressionCodecNames = Map( "lz4" -> classOf[LZ4CompressionCodec].getName, "lzf" -> classOf[LZFCompressionCodec].getName,