
Commit cf95d72

HyukjinKwon authored and rxin committed
[SPARK-13543][SQL] Support for specifying compression codec for Parquet/ORC via option()
## What changes were proposed in this pull request?

This PR adds support for specifying compression codecs for both ORC and Parquet.

## How was this patch tested?

Unit tests within the IDE and code style tests with `dev/run_tests`.

Author: hyukjinkwon <[email protected]>

Closes #11464 from HyukjinKwon/SPARK-13543.
1 parent 511d492 commit cf95d72

File tree: 10 files changed, +301 −43 lines changed
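In short, every file-based writer gains a `compression` option (and, in PySpark, a matching keyword argument). A minimal Scala sketch of the resulting API, with placeholder paths and a `sqlContext` assumed to be in scope (a `HiveContext` for the ORC call):

```scala
// Illustrative sketch of the new per-write compression option (paths are placeholders;
// sqlContext must be a HiveContext for the ORC call).
val df = sqlContext.range(10).toDF("id")

// Parquet short names: none, snappy, gzip, lzo. Overrides spark.sql.parquet.compression.codec.
df.write.option("compression", "gzip").parquet("/tmp/example/parquet-gzip")

// ORC short names: none, snappy, zlib, lzo. Overrides orc.compress.
df.write.option("compression", "zlib").orc("/tmp/example/orc-zlib")

// "none" explicitly requests uncompressed output for this write.
df.write.option("compression", "none").parquet("/tmp/example/parquet-plain")
```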

python/pyspark/sql/readwriter.py

Lines changed: 34 additions & 21 deletions
@@ -454,7 +454,7 @@ def saveAsTable(self, name, format=None, mode=None, partitionBy=None, **options)
         self._jwrite.saveAsTable(name)
 
     @since(1.4)
-    def json(self, path, mode=None):
+    def json(self, path, mode=None, compression=None):
         """Saves the content of the :class:`DataFrame` in JSON format at the specified path.
 
         :param path: the path in any Hadoop supported file system
@@ -464,18 +464,19 @@ def json(self, path, mode=None):
             * ``overwrite``: Overwrite existing data.
             * ``ignore``: Silently ignore this operation if data already exists.
             * ``error`` (default case): Throw an exception if data already exists.
-
-        You can set the following JSON-specific option(s) for writing JSON files:
-            * ``compression`` (default ``None``): compression codec to use when saving to file.
-                This can be one of the known case-insensitive shorten names
-                (``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
+        :param compression: compression codec to use when saving to file. This can be one of the
+                            known case-insensitive shorten names (none, bzip2, gzip, lz4,
+                            snappy and deflate).
 
         >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
         """
-        self.mode(mode)._jwrite.json(path)
+        self.mode(mode)
+        if compression is not None:
+            self.option("compression", compression)
+        self._jwrite.json(path)
 
     @since(1.4)
-    def parquet(self, path, mode=None, partitionBy=None):
+    def parquet(self, path, mode=None, partitionBy=None, compression=None):
         """Saves the content of the :class:`DataFrame` in Parquet format at the specified path.
 
         :param path: the path in any Hadoop supported file system
@@ -486,32 +487,37 @@ def parquet(self, path, mode=None, partitionBy=None):
             * ``ignore``: Silently ignore this operation if data already exists.
             * ``error`` (default case): Throw an exception if data already exists.
         :param partitionBy: names of partitioning columns
+        :param compression: compression codec to use when saving to file. This can be one of the
+                            known case-insensitive shorten names (none, snappy, gzip, and lzo).
+                            This will overwrite ``spark.sql.parquet.compression.codec``.
 
         >>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)
         if partitionBy is not None:
             self.partitionBy(partitionBy)
+        if compression is not None:
+            self.option("compression", compression)
         self._jwrite.parquet(path)
 
     @since(1.6)
-    def text(self, path):
+    def text(self, path, compression=None):
         """Saves the content of the DataFrame in a text file at the specified path.
 
         :param path: the path in any Hadoop supported file system
+        :param compression: compression codec to use when saving to file. This can be one of the
+                            known case-insensitive shorten names (none, bzip2, gzip, lz4,
+                            snappy and deflate).
 
         The DataFrame must have only one column that is of string type.
         Each row becomes a new line in the output file.
-
-        You can set the following option(s) for writing text files:
-            * ``compression`` (default ``None``): compression codec to use when saving to file.
-                This can be one of the known case-insensitive shorten names
-                (``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
         """
+        if compression is not None:
+            self.option("compression", compression)
         self._jwrite.text(path)
 
     @since(2.0)
-    def csv(self, path, mode=None):
+    def csv(self, path, mode=None, compression=None):
         """Saves the content of the [[DataFrame]] in CSV format at the specified path.
 
         :param path: the path in any Hadoop supported file system
@@ -522,17 +528,19 @@ def csv(self, path, mode=None):
             * ``ignore``: Silently ignore this operation if data already exists.
             * ``error`` (default case): Throw an exception if data already exists.
 
-        You can set the following CSV-specific option(s) for writing CSV files:
-            * ``compression`` (default ``None``): compression codec to use when saving to file.
-                This can be one of the known case-insensitive shorten names
-                (``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
+        :param compression: compression codec to use when saving to file. This can be one of the
+                            known case-insensitive shorten names (none, bzip2, gzip, lz4,
+                            snappy and deflate).
 
         >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
         """
-        self.mode(mode)._jwrite.csv(path)
+        self.mode(mode)
+        if compression is not None:
+            self.option("compression", compression)
+        self._jwrite.csv(path)
 
     @since(1.5)
-    def orc(self, path, mode=None, partitionBy=None):
+    def orc(self, path, mode=None, partitionBy=None, compression=None):
         """Saves the content of the :class:`DataFrame` in ORC format at the specified path.
 
         ::Note: Currently ORC support is only available together with
@@ -546,13 +554,18 @@ def orc(self, path, mode=None, partitionBy=None):
             * ``ignore``: Silently ignore this operation if data already exists.
             * ``error`` (default case): Throw an exception if data already exists.
         :param partitionBy: names of partitioning columns
+        :param compression: compression codec to use when saving to file. This can be one of the
+                            known case-insensitive shorten names (none, snappy, zlib, and lzo).
+                            This will overwrite ``orc.compress``.
 
         >>> orc_df = hiveContext.read.orc('python/test_support/sql/orc_partitioned')
        >>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)
         if partitionBy is not None:
             self.partitionBy(partitionBy)
+        if compression is not None:
+            self.option("compression", compression)
         self._jwrite.orc(path)
 
     @since(1.4)

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 16 additions & 3 deletions
@@ -455,7 +455,8 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    *
    * You can set the following JSON-specific option(s) for writing JSON files:
    * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
-   * one of the known case-insensitive shorten names (`bzip2`, `gzip`, `lz4`, and `snappy`). </li>
+   * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
+   * `snappy` and `deflate`). </li>
    *
    * @since 1.4.0
    */
@@ -468,6 +469,11 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    *   format("parquet").save(path)
    * }}}
    *
+   * You can set the following Parquet-specific option(s) for writing Parquet files:
+   * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
+   * one of the known case-insensitive shorten names(`none`, `snappy`, `gzip`, and `lzo`).
+   * This will overwrite `spark.sql.parquet.compression.codec`. </li>
+   *
    * @since 1.4.0
    */
   def parquet(path: String): Unit = format("parquet").save(path)
@@ -479,6 +485,11 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    *   format("orc").save(path)
    * }}}
    *
+   * You can set the following ORC-specific option(s) for writing ORC files:
+   * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
+   * one of the known case-insensitive shorten names(`none`, `snappy`, `zlib`, and `lzo`).
+   * This will overwrite `orc.compress`. </li>
+   *
    * @since 1.5.0
    * @note Currently, this method can only be used together with `HiveContext`.
    */
@@ -498,7 +509,8 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    *
    * You can set the following option(s) for writing text files:
    * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
-   * one of the known case-insensitive shorten names (`bzip2`, `gzip`, `lz4`, and `snappy`). </li>
+   * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
+   * `snappy` and `deflate`). </li>
    *
    * @since 1.6.0
    */
@@ -513,7 +525,8 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    *
    * You can set the following CSV-specific option(s) for writing CSV files:
    * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
-   * one of the known case-insensitive shorten names (`bzip2`, `gzip`, `lz4`, and `snappy`). </li>
+   * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
+   * `snappy` and `deflate`). </li>
    *
    * @since 2.0.0
    */
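For the text-based sources (JSON, CSV, text) the option itself already existed; this change widens the documented short names to include `none` and `deflate`. A hedged usage sketch, where the input file and output paths are placeholders:

```scala
// Hedged usage sketch for the text-based writers (input file and output paths are placeholders).
val people = sqlContext.read.json("examples/src/main/resources/people.json")

// JSON, CSV and text accept none, bzip2, gzip, lz4, snappy and deflate as short names.
people.write.option("compression", "gzip").json("/tmp/example/json-gzip")
people.write.option("compression", "deflate").csv("/tmp/example/csv-deflate")

// text() requires a single string column; each row becomes one line in the output.
people.select("name").write.option("compression", "bzip2").text("/tmp/example/text-bzip2")
```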

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala

Lines changed: 16 additions & 6 deletions
@@ -25,6 +25,8 @@ import org.apache.spark.util.Utils
 
 private[datasources] object CompressionCodecs {
   private val shortCompressionCodecNames = Map(
+    "none" -> null,
+    "uncompressed" -> null,
     "bzip2" -> classOf[BZip2Codec].getName,
     "deflate" -> classOf[DeflateCodec].getName,
     "gzip" -> classOf[GzipCodec].getName,
@@ -39,7 +41,9 @@ private[datasources] object CompressionCodecs {
     val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name)
     try {
       // Validate the codec name
-      Utils.classForName(codecName)
+      if (codecName != null) {
+        Utils.classForName(codecName)
+      }
       codecName
     } catch {
       case e: ClassNotFoundException =>
@@ -53,10 +57,16 @@ private[datasources] object CompressionCodecs {
   * `codec` should be a full class path
   */
  def setCodecConfiguration(conf: Configuration, codec: String): Unit = {
-    conf.set("mapreduce.output.fileoutputformat.compress", "true")
-    conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
-    conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
-    conf.set("mapreduce.map.output.compress", "true")
-    conf.set("mapreduce.map.output.compress.codec", codec)
+    if (codec != null) {
+      conf.set("mapreduce.output.fileoutputformat.compress", "true")
+      conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
+      conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
+      conf.set("mapreduce.map.output.compress", "true")
+      conf.set("mapreduce.map.output.compress.codec", codec)
+    } else {
+      // This infers the option `compression` is set to `uncompressed` or `none`.
+      conf.set("mapreduce.output.fileoutputformat.compress", "false")
+      conf.set("mapreduce.map.output.compress", "false")
+    }
   }
 }
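The intended calling pattern is that a file-based data source resolves the user-supplied short name once and then pushes the result into the Hadoop job configuration; with `none`/`uncompressed` the resolved class name is `null` and compression is explicitly switched off. A sketch of that contract, assuming the lookup shown in the second hunk lives in the object's `getCodecClassName` helper (the method name is not visible in this diff):

```scala
// Hedged sketch of the caller-side contract; CompressionCodecs is private[datasources],
// so this only illustrates how the two helpers are meant to fit together.
import org.apache.hadoop.conf.Configuration

def configureCompression(conf: Configuration, parameters: Map[String, String]): Unit = {
  parameters.get("compression").foreach { shortName =>
    // "none"/"uncompressed" map to null; any other name must resolve to a loadable codec class.
    val codecClassName = CompressionCodecs.getCodecClassName(shortName)
    // With a null codec the mapreduce.*.compress flags are set to "false"
    // instead of a codec class being configured.
    CompressionCodecs.setCodecConfiguration(conf, codecClassName)
  }
}
```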

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala

Lines changed: 15 additions & 1 deletion
@@ -148,6 +148,19 @@ private[sql] class ParquetRelation(
     .get(ParquetRelation.METASTORE_SCHEMA)
     .map(DataType.fromJson(_).asInstanceOf[StructType])
 
+  private val compressionCodec: Option[String] = parameters
+    .get("compression")
+    .map { codecName =>
+      // Validate if given compression codec is supported or not.
+      val shortParquetCompressionCodecNames = ParquetRelation.shortParquetCompressionCodecNames
+      if (!shortParquetCompressionCodecNames.contains(codecName.toLowerCase)) {
+        val availableCodecs = shortParquetCompressionCodecNames.keys.map(_.toLowerCase)
+        throw new IllegalArgumentException(s"Codec [$codecName] " +
+          s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
+      }
+      codecName.toLowerCase
+    }
+
   private lazy val metadataCache: MetadataCache = {
     val meta = new MetadataCache
     meta.refresh()
@@ -286,7 +299,8 @@ private[sql] class ParquetRelation(
       ParquetRelation
         .shortParquetCompressionCodecNames
         .getOrElse(
-          sqlContext.conf.parquetCompressionCodec.toLowerCase(),
+          compressionCodec
+            .getOrElse(sqlContext.conf.parquetCompressionCodec.toLowerCase),
           CompressionCodecName.UNCOMPRESSED).name())
 
     new BucketedOutputWriterFactory {
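The effect is that a per-write `compression` option now takes precedence over `spark.sql.parquet.compression.codec`, and an unknown codec name is rejected up front rather than being silently ignored. A behavioral sketch under those assumptions, with placeholder paths:

```scala
// Behavior sketch: the per-write option wins over the session-wide SQLConf default,
// and an unsupported codec name fails fast with IllegalArgumentException.
sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")
val df = sqlContext.range(5).toDF("id")

// Written with snappy despite the gzip default, because the option overrides the conf value.
df.write.option("compression", "snappy").parquet("/tmp/example/parquet-override")

// Raises "Codec [zip] is not available. Available codecs are ..." while preparing the write.
try {
  df.write.option("compression", "zip").parquet("/tmp/example/parquet-bad-codec")
} catch {
  case e: IllegalArgumentException => println(e.getMessage)
}
```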

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

Lines changed: 46 additions & 0 deletions
@@ -21,6 +21,12 @@ import java.io.File
 import java.nio.charset.UnsupportedCharsetException
 import java.sql.Timestamp
 
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.SequenceFile.CompressionType
+import org.apache.hadoop.io.compress.GzipCodec
+
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
@@ -396,6 +402,46 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     }
   }
 
+  test("SPARK-13543 Write the output as uncompressed via option()") {
+    val clonedConf = new Configuration(hadoopConfiguration)
+    hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true")
+    hadoopConfiguration
+      .set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
+    hadoopConfiguration
+      .set("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec].getName)
+    hadoopConfiguration.set("mapreduce.map.output.compress", "true")
+    hadoopConfiguration.set("mapreduce.map.output.compress.codec", classOf[GzipCodec].getName)
+    withTempDir { dir =>
+      try {
+        val csvDir = new File(dir, "csv").getCanonicalPath
+        val cars = sqlContext.read
+          .format("csv")
+          .option("header", "true")
+          .load(testFile(carsFile))
+
+        cars.coalesce(1).write
+          .format("csv")
+          .option("header", "true")
+          .option("compression", "none")
+          .save(csvDir)
+
+        val compressedFiles = new File(csvDir).listFiles()
+        assert(compressedFiles.exists(!_.getName.endsWith(".gz")))
+
+        val carsCopy = sqlContext.read
+          .format("csv")
+          .option("header", "true")
+          .load(csvDir)
+
+        verifyCars(carsCopy, withHeader = true)
+      } finally {
+        // Hadoop 1 doesn't have `Configuration.unset`
+        hadoopConfiguration.clear()
+        clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
+      }
+    }
+  }
+
   test("Schema inference correctly identifies the datatype when data is sparse.") {
     val df = sqlContext.read
       .format("csv")

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala

Lines changed: 45 additions & 1 deletion
@@ -23,9 +23,10 @@ import java.sql.{Date, Timestamp}
 import scala.collection.JavaConverters._
 
 import com.fasterxml.jackson.core.JsonFactory
-import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, PathFilter}
+import org.apache.hadoop.io.SequenceFile.CompressionType
+import org.apache.hadoop.io.compress.GzipCodec
 import org.scalactic.Tolerance._
 
 import org.apache.spark.rdd.RDD
@@ -1524,6 +1525,49 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     }
   }
 
+  test("SPARK-13543 Write the output as uncompressed via option()") {
+    val clonedConf = new Configuration(hadoopConfiguration)
+    hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true")
+    hadoopConfiguration
+      .set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
+    hadoopConfiguration
+      .set("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec].getName)
+    hadoopConfiguration.set("mapreduce.map.output.compress", "true")
+    hadoopConfiguration.set("mapreduce.map.output.compress.codec", classOf[GzipCodec].getName)
+    withTempDir { dir =>
+      try {
+        val dir = Utils.createTempDir()
+        dir.delete()
+
+        val path = dir.getCanonicalPath
+        primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+
+        val jsonDF = sqlContext.read.json(path)
+        val jsonDir = new File(dir, "json").getCanonicalPath
+        jsonDF.coalesce(1).write
+          .format("json")
+          .option("compression", "none")
+          .save(jsonDir)
+
+        val compressedFiles = new File(jsonDir).listFiles()
+        assert(compressedFiles.exists(!_.getName.endsWith(".gz")))
+
+        val jsonCopy = sqlContext.read
+          .format("json")
+          .load(jsonDir)
+
+        assert(jsonCopy.count == jsonDF.count)
+        val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+        val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+        checkAnswer(jsonCopySome, jsonDFSome)
+      } finally {
+        // Hadoop 1 doesn't have `Configuration.unset`
+        hadoopConfiguration.clear()
+        clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
+      }
+    }
+  }
+
   test("Casting long as timestamp") {
     withTempTable("jsonTable") {
       val schema = (new StructType).add("ts", TimestampType)