core/src/test/scala/org/apache/spark/FileSuite.scala (40 changes: 24 additions & 16 deletions)
@@ -28,7 +28,7 @@ import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io._
-import org.apache.hadoop.io.compress.DefaultCodec
+import org.apache.hadoop.io.compress.{BZip2Codec, CompressionCodec, DefaultCodec}
 import org.apache.hadoop.mapred.{FileAlreadyExistsException, FileSplit, JobConf, TextInputFormat, TextOutputFormat}
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
@@ -113,25 +113,33 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
   }
 
-  test("SequenceFile (compressed)") {
-    sc = new SparkContext("local", "test")
-    val normalDir = new File(tempDir, "output_normal").getAbsolutePath
-    val compressedOutputDir = new File(tempDir, "output_compressed").getAbsolutePath
-    val codec = new DefaultCodec()
+  def runSequenceFileCodecTest(codec: CompressionCodec, codecName: String): Unit = {
+    test(s"SequenceFile (compressed) - $codecName") {
+      sc = new SparkContext("local", "test")
+      val normalDir = new File(tempDir, "output_normal").getAbsolutePath
+      val compressedOutputDir = new File(tempDir, "output_compressed").getAbsolutePath
 
-    val data = sc.parallelize(Seq.fill(100)("abc"), 1).map(x => (x, x))
-    data.saveAsSequenceFile(normalDir)
-    data.saveAsSequenceFile(compressedOutputDir, Some(classOf[DefaultCodec]))
+      val data = sc.parallelize(Seq.fill(100)("abc"), 1).map(x => (x, x))
+      data.saveAsSequenceFile(normalDir)
+      data.saveAsSequenceFile(compressedOutputDir, Some(codec.getClass))
 
-    val normalFile = new File(normalDir, "part-00000")
-    val normalContent = sc.sequenceFile[String, String](normalDir).collect
-    assert(normalContent === Array.fill(100)(("abc", "abc")))
+      val normalFile = new File(normalDir, "part-00000")
+      val normalContent = sc.sequenceFile[String, String](normalDir).collect
+      assert(normalContent === Array.fill(100)(("abc", "abc")))
 
-    val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension)
-    val compressedContent = sc.sequenceFile[String, String](compressedOutputDir).collect
-    assert(compressedContent === Array.fill(100)(("abc", "abc")))
+      val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension)
+      val compressedContent = sc.sequenceFile[String, String](compressedOutputDir).collect
+      assert(compressedContent === Array.fill(100)(("abc", "abc")))
 
-    assert(compressedFile.length < normalFile.length)
+      assert(compressedFile.length < normalFile.length)
+    }
   }
+
+  // Hadoop "gzip" and "zstd" codecs require native library installed for sequence files
+  // "snappy" and "lz4" codecs do not work due to SPARK-36669 and SPARK-36681.
+  Seq((new DefaultCodec(), "default"), (new BZip2Codec(), "bzip2")).foreach {
+    case (codec, codecName) =>
+      runSequenceFileCodecTest(codec, codecName)
+  }
 
   test("SequenceFile with writable key") {
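For readers unfamiliar with the API this diff exercises: saveAsSequenceFile takes an optional compression codec class, the writer appends the codec's default extension to each part file, and the codec is also recorded in the SequenceFile header, so reads need no codec argument. Below is a minimal standalone sketch of that round trip, not part of this PR; the object name and /tmp output paths are illustrative, and it assumes a local Spark installation with the bzip2 codec available (pure Java in Hadoop, so no native library is needed, consistent with the comment in the diff).

import org.apache.hadoop.io.compress.BZip2Codec
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object; paths and names are illustrative only.
object SequenceFileCodecDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("seq-codec-demo"))
    try {
      // Same shape of data as the test: 100 (String, String) pairs in one partition.
      val data = sc.parallelize(Seq.fill(100)("abc"), 1).map(x => (x, x))

      // Uncompressed: writes part-00000 under the output directory.
      data.saveAsSequenceFile("/tmp/seq_plain")

      // Compressed: the writer appends the codec's default extension
      // (".bz2" here) and records the codec in the SequenceFile header.
      data.saveAsSequenceFile("/tmp/seq_bzip2", Some(classOf[BZip2Codec]))

      // Reading back needs no codec argument: the header drives decompression.
      val roundTrip = sc.sequenceFile[String, String]("/tmp/seq_bzip2").collect()
      assert(roundTrip.sameElements(Array.fill(100)(("abc", "abc"))))
    } finally {
      sc.stop()
    }
  }
}

This also motivates the refactor itself: because the codec only enters through the Some(codec.getClass) argument and the expected file extension, the test body is codec-agnostic, and adding another working codec to the suite is a one-line change to the Seq of (codec, name) pairs.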