
Commit ab648ad

HyukjinKwon authored and falaki committed
Support specifying a short name for compression codecs

#234 This PR is similar to apache/spark#10805. It adds support for short names for compression codecs, and adds a `CompressionCodecs` class in place of the implicit function, since its use is not recommended. Author: hyukjinkwon <[email protected]> Closes #235 from HyukjinKwon/ISSUE-234-shorten-name.
1 parent e1e4712 commit ab648ad

File tree: 8 files changed (+126, -14 lines)


README.md

Lines changed: 1 addition & 1 deletion
@@ -55,7 +55,7 @@ When reading files the API accepts several options:
 * `charset`: defaults to 'UTF-8' but can be set to other valid charset names
 * `inferSchema`: automatically infers column types. It requires one extra pass over the data and is false by default
 * `comment`: skip lines beginning with this character. Default is `"#"`. Disable comments by setting this to `null`.
-* `codec`: compression codec to use when saving to file. Should be the fully qualified name of a class implementing `org.apache.hadoop.io.compress.CompressionCodec`. Defaults to no compression when a codec is not specified.
+* `codec`: compression codec to use when saving to file. Should be the fully qualified name of a class implementing `org.apache.hadoop.io.compress.CompressionCodec` or one of case-insensitive shorten names (`bzip2`, `gzip`, `lz4`, and `snappy`). Defaults to no compression when a codec is not specified.
 * `nullValue`: specificy a string that indicates a null value, any fields matching this string will be set as nulls in the DataFrame

 The package also support saving simple (non-nested) DataFrame. When saving you can specify the delimiter and whether we should generate a header row for the table. See following examples for more details.
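With this change a short name can be passed straight through the `codec` option instead of a fully qualified class name. A minimal Scala sketch of the usage, mirroring the new test added in this commit (the input/output paths here are placeholders, and `sqlContext` is assumed to exist):

import org.apache.spark.sql.SaveMode
import com.databricks.spark.csv._  // adds csvFile to SQLContext

// Read a CSV file, then write it back out gzip-compressed.
// "gzip" is resolved case-insensitively to
// org.apache.hadoop.io.compress.GzipCodec; before this commit the
// fully qualified class name was required.
val cars = sqlContext.csvFile("/tmp/cars.csv")  // placeholder path
cars.save("com.databricks.spark.csv", SaveMode.Overwrite,
  Map("path" -> "/tmp/cars-copy.csv", "header" -> "true", "codec" -> "gzip"))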

src/main/scala/com/databricks/spark/csv/CsvRelation.scala

Lines changed: 1 addition & 1 deletion
@@ -326,7 +326,7 @@ case class CsvRelation protected[spark] (
       }
       // Write the data. We assume that schema isn't changed, and we won't update it.

-      val codecClass = compresionCodecClass(codec)
+      val codecClass = CompressionCodecs.getCodecClass(codec)
       data.saveAsCsvFile(filesystemPath.toString, Map("delimiter" -> delimiter.toString),
         codecClass)
     } else {

src/main/scala/com/databricks/spark/csv/DefaultSource.scala

Lines changed: 2 additions & 2 deletions
@@ -19,7 +19,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
-import com.databricks.spark.csv.util.{ParserLibs, TextFile, TypeCast}
+import com.databricks.spark.csv.util.{CompressionCodecs, ParserLibs, TextFile, TypeCast}

 /**
  * Provides access to CSV data from pure SQL statements (i.e. for users of the
@@ -183,7 +183,7 @@ class DefaultSource
     }
     if (doSave) {
       // Only save data when the save mode is not ignore.
-      val codecClass = compresionCodecClass(parameters.getOrElse("codec", null))
+      val codecClass = CompressionCodecs.getCodecClass(parameters.getOrElse("codec", null))
       data.saveAsCsvFile(path, parameters, codecClass)
     }

src/main/scala/com/databricks/spark/csv/package.scala

Lines changed: 0 additions & 10 deletions
@@ -26,16 +26,6 @@ package object csv {
   val defaultCsvFormat =
     CSVFormat.DEFAULT.withRecordSeparator(System.getProperty("line.separator", "\n"))

-  private[csv] def compresionCodecClass(className: String): Class[_ <: CompressionCodec] = {
-    className match {
-      case null => null
-      case codec =>
-        // scalastyle:off classforname
-        Class.forName(codec).asInstanceOf[Class[CompressionCodec]]
-        // scalastyle:on classforname
-    }
-  }
-
   /**
    * Adds a method, `csvFile`, to SQLContext that allows reading CSV data.
    */
src/main/scala/com/databricks/spark/csv/util/CompressionCodecs.scala

Lines changed: 49 additions & 0 deletions

@@ -0,0 +1,49 @@
+/*
+ * Copyright 2014 Databricks
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.databricks.spark.csv.util
+
+import scala.util.control.Exception._
+
+import org.apache.hadoop.io.compress._
+
+private[csv] object CompressionCodecs {
+  private val shortCompressionCodecNames: Map[String, String] = {
+    val codecMap = collection.mutable.Map.empty[String, String]
+    allCatch toTry(codecMap += "bzip2" -> classOf[BZip2Codec].getName)
+    allCatch toTry(codecMap += "gzip" -> classOf[GzipCodec].getName)
+    allCatch toTry(codecMap += "lz4" -> classOf[Lz4Codec].getName)
+    allCatch toTry(codecMap += "snappy" -> classOf[SnappyCodec].getName)
+    codecMap.toMap
+  }
+
+  /**
+   * Return the codec class of the given name.
+   */
+  def getCodecClass: String => Class[_ <: CompressionCodec] = {
+    case null => null
+    case codec =>
+      val codecName = shortCompressionCodecNames.getOrElse(codec.toLowerCase, codec)
+      try {
+        // scalastyle:off classforname
+        Class.forName(codecName).asInstanceOf[Class[CompressionCodec]]
+        // scalastyle:on classforname
+      } catch {
+        case e: ClassNotFoundException =>
+          throw new IllegalArgumentException(s"Codec [$codecName] is not " +
+            s"available. Known codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.")
+      }
+  }
+}
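The `allCatch toTry(...)` wrappers above let the short-name map be built even when a codec class is absent from the classpath (for example `Lz4Codec`, which only exists from Hadoop 2.x, as the test suite below notes). A quick sketch of how `getCodecClass` resolves names, assuming a caller inside the `com.databricks.spark.csv` package since the object is `private[csv]`:

import org.apache.hadoop.io.compress.GzipCodec

// Short names are matched case-insensitively via shortCompressionCodecNames.
val byShortName = CompressionCodecs.getCodecClass("gZiP")                   // classOf[GzipCodec]
// Fully qualified class names are still accepted unchanged.
val byClassName = CompressionCodecs.getCodecClass(classOf[GzipCodec].getName) // classOf[GzipCodec]
// null keeps the old contract and means "no compression".
val noCodec = CompressionCodecs.getCodecClass(null)                         // null
// An unrecognized name ("zstd" is just an illustrative example) falls through to
// Class.forName, fails with ClassNotFoundException, and is rethrown as
// IllegalArgumentException listing the known codecs.
// CompressionCodecs.getCodecClass("zstd")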

src/test/scala/com/databricks/spark/csv/CsvSuite.scala

Lines changed: 18 additions & 0 deletions
@@ -473,6 +473,24 @@ abstract class AbstractCsvSuite extends FunSuite with BeforeAndAfterAll {
     assert(carsCopy.collect.map(_.toString).toSet == cars.collect.map(_.toString).toSet)
   }

+  test("Scala API save with gzip compression codec by shorten name") {
+    // Create temp directory
+    TestUtils.deleteRecursively(new File(tempEmptyDir))
+    new File(tempEmptyDir).mkdirs()
+    val copyFilePath = tempEmptyDir + "cars-copy.csv"
+
+    val cars = sqlContext.csvFile(carsFile, parserLib = parserLib)
+    cars.save("com.databricks.spark.csv", SaveMode.Overwrite,
+      Map("path" -> copyFilePath, "header" -> "true", "codec" -> "gZiP"))
+    val carsCopyPartFile = new File(copyFilePath, "part-00000.gz")
+    // Check that the part file has a .gz extension
+    assert(carsCopyPartFile.exists())
+
+    val carsCopy = sqlContext.csvFile(copyFilePath + "/")
+
+    assert(carsCopy.count == cars.count)
+    assert(carsCopy.collect.map(_.toString).toSet == cars.collect.map(_.toString).toSet)
+  }

   test("DSL save with quoting") {
     // Create temp directory
src/test/scala/com/databricks/spark/csv/util/CompressionCodecsSuite.scala

Lines changed: 40 additions & 0 deletions

@@ -0,0 +1,40 @@
+/*
+ * Copyright 2014 Databricks
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.databricks.spark.csv.util
+
+import org.apache.hadoop.io.compress._
+import org.scalatest.FunSuite
+
+class CompressionCodecsSuite extends FunSuite {
+
+  /**
+   * Note that Lz4 codec was added from Hadoop 2.x. So, some tests might fail with
+   * class-not-found exception when Hadoop version is lower.
+   */
+  test("Get classes of compression codecs") {
+    assert(CompressionCodecs.getCodecClass(classOf[GzipCodec].getName) == classOf[GzipCodec])
+    assert(CompressionCodecs.getCodecClass(classOf[SnappyCodec].getName) == classOf[SnappyCodec])
+    assert(CompressionCodecs.getCodecClass(classOf[Lz4Codec].getName) == classOf[Lz4Codec])
+    assert(CompressionCodecs.getCodecClass(classOf[BZip2Codec].getName) == classOf[BZip2Codec])
+  }
+
+  test("Get classes of compression codecs with short names") {
+    assert(CompressionCodecs.getCodecClass("GzIp") == classOf[GzipCodec])
+    assert(CompressionCodecs.getCodecClass("Snappy") == classOf[SnappyCodec])
+    assert(CompressionCodecs.getCodecClass("lz4") == classOf[Lz4Codec])
+    assert(CompressionCodecs.getCodecClass("bZip2") == classOf[BZip2Codec])
+  }
+}

src/test/scala/com/databricks/spark/csv/util/InferSchemaSuite.scala

Lines changed: 15 additions & 0 deletions
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2014 Databricks
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package com.databricks.spark.csv.util

 import org.apache.spark.sql.types._
