From a0b115f1d56d7ac9a53986f25cdf40a7ae728a6d Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 7 Sep 2021 00:37:19 -0700 Subject: [PATCH 1/3] Add Hadoop sequence test for different codecs. --- .../scala/org/apache/spark/FileSuite.scala | 40 +++++++++++-------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index f953bf4043f33..b1e7599adf872 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -28,7 +28,7 @@ import com.google.common.io.Files import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io._ -import org.apache.hadoop.io.compress.DefaultCodec +import org.apache.hadoop.io.compress.{BZip2Codec, CompressionCodec, DefaultCodec, GzipCodec, Lz4Codec, SnappyCodec} import org.apache.hadoop.mapred.{FileAlreadyExistsException, FileSplit, JobConf, TextInputFormat, TextOutputFormat} import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat} @@ -113,25 +113,33 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)")) } - test("SequenceFile (compressed)") { - sc = new SparkContext("local", "test") - val normalDir = new File(tempDir, "output_normal").getAbsolutePath - val compressedOutputDir = new File(tempDir, "output_compressed").getAbsolutePath - val codec = new DefaultCodec() + def runSequenceFileCodecTest(codec: CompressionCodec, codecName: String): Unit = { + test(s"SequenceFile (compressed) - $codecName") { + sc = new SparkContext("local", "test") + val normalDir = new File(tempDir, "output_normal").getAbsolutePath + val compressedOutputDir = new File(tempDir, "output_compressed").getAbsolutePath - val data = 
sc.parallelize(Seq.fill(100)("abc"), 1).map(x => (x, x)) - data.saveAsSequenceFile(normalDir) - data.saveAsSequenceFile(compressedOutputDir, Some(classOf[DefaultCodec])) + val data = sc.parallelize(Seq.fill(100)("abc"), 1).map(x => (x, x)) + data.saveAsSequenceFile(normalDir) + data.saveAsSequenceFile(compressedOutputDir, Some(codec.getClass)) - val normalFile = new File(normalDir, "part-00000") - val normalContent = sc.sequenceFile[String, String](normalDir).collect - assert(normalContent === Array.fill(100)(("abc", "abc"))) + val normalFile = new File(normalDir, "part-00000") + val normalContent = sc.sequenceFile[String, String](normalDir).collect + assert(normalContent === Array.fill(100)(("abc", "abc"))) - val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension) - val compressedContent = sc.sequenceFile[String, String](compressedOutputDir).collect - assert(compressedContent === Array.fill(100)(("abc", "abc"))) + val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension) + val compressedContent = sc.sequenceFile[String, String](compressedOutputDir).collect + assert(compressedContent === Array.fill(100)(("abc", "abc"))) - assert(compressedFile.length < normalFile.length) + assert(compressedFile.length < normalFile.length) + } + } + + // Hadoop "gzip" codec doesn't support sequence file yet. + // Hadoop "zstd" codecs needs native library installed. + Seq((new DefaultCodec(), "default"), (new BZip2Codec(), "bzip2"), (new GzipCodec(), "gzip"), + (new Lz4Codec(), "lz4"), (new SnappyCodec, "snappy")).foreach { case (codec, codecName) => + runSequenceFileCodecTest(codec, codecName) } test("SequenceFile with writable key") { From a9139b34637e0c6b1eec8eb3be55569500a0fbfa Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 7 Sep 2021 10:21:21 -0700 Subject: [PATCH 2/3] Remove "snappy" and "lz4" which cannot work for now. 
--- core/src/test/scala/org/apache/spark/FileSuite.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index b1e7599adf872..83de1af7e9697 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -28,7 +28,7 @@ import com.google.common.io.Files import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io._ -import org.apache.hadoop.io.compress.{BZip2Codec, CompressionCodec, DefaultCodec, GzipCodec, Lz4Codec, SnappyCodec} +import org.apache.hadoop.io.compress.{BZip2Codec, CompressionCodec, DefaultCodec} import org.apache.hadoop.mapred.{FileAlreadyExistsException, FileSplit, JobConf, TextInputFormat, TextOutputFormat} import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat} @@ -136,10 +136,11 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { } // Hadoop "gzip" codec doesn't support sequence file yet. - // Hadoop "zstd" codecs needs native library installed. - Seq((new DefaultCodec(), "default"), (new BZip2Codec(), "bzip2"), (new GzipCodec(), "gzip"), - (new Lz4Codec(), "lz4"), (new SnappyCodec, "snappy")).foreach { case (codec, codecName) => - runSequenceFileCodecTest(codec, codecName) + // Hadoop "zstd" codec needs native library installed. + // "snappy" and "lz4" codecs do not work due to SPARK-36669 and SPARK-36681. + Seq((new DefaultCodec(), "default"), (new BZip2Codec(), "bzip2")).foreach { + case (codec, codecName) => + runSequenceFileCodecTest(codec, codecName) } test("SequenceFile with writable key") { From 4aee74c3bd946ff37c0297211c6fc9617bae2e97 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 7 Sep 2021 13:12:28 -0700 Subject: [PATCH 3/3] Revise comment. 
--- core/src/test/scala/org/apache/spark/FileSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 83de1af7e9697..67a9764ee63a9 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -135,8 +135,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { } } - // Hadoop "gzip" codec doesn't support sequence file yet. - // Hadoop "zstd" codec needs native library installed. + // Hadoop "gzip" and "zstd" codecs require the native library to be installed for sequence files. // "snappy" and "lz4" codecs do not work due to SPARK-36669 and SPARK-36681. Seq((new DefaultCodec(), "default"), (new BZip2Codec(), "bzip2")).foreach { case (codec, codecName) =>