55 changes: 34 additions & 21 deletions python/pyspark/sql/readwriter.py
@@ -454,7 +454,7 @@ def saveAsTable(self, name, format=None, mode=None, partitionBy=None, **options)
self._jwrite.saveAsTable(name)

@since(1.4)
def json(self, path, mode=None):
def json(self, path, mode=None, compression=None):
"""Saves the content of the :class:`DataFrame` in JSON format at the specified path.

:param path: the path in any Hadoop supported file system
@@ -464,18 +464,19 @@ def json(self, path, mode=None):
* ``overwrite``: Overwrite existing data.
* ``ignore``: Silently ignore this operation if data already exists.
* ``error`` (default case): Throw an exception if data already exists.

You can set the following JSON-specific option(s) for writing JSON files:
* ``compression`` (default ``None``): compression codec to use when saving to file.
This can be one of the known case-insensitive shorten names
(``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
:param compression: compression codec to use when saving to file. This can be one of the
known case-insensitive shorten names (none, bzip2, gzip, lz4,
snappy and deflate).

>>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)._jwrite.json(path)
self.mode(mode)
if compression is not None:
self.option("compression", compression)
self._jwrite.json(path)

@since(1.4)
def parquet(self, path, mode=None, partitionBy=None):
def parquet(self, path, mode=None, partitionBy=None, compression=None):
"""Saves the content of the :class:`DataFrame` in Parquet format at the specified path.

:param path: the path in any Hadoop supported file system
@@ -486,32 +487,37 @@ def parquet(self, path, mode=None, partitionBy=None):
* ``ignore``: Silently ignore this operation if data already exists.
* ``error`` (default case): Throw an exception if data already exists.
:param partitionBy: names of partitioning columns
:param compression: compression codec to use when saving to file. This can be one of the
known case-insensitive shorten names (none, snappy, gzip, and lzo).
This will overwrite ``spark.sql.parquet.compression.codec``.

>>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
if partitionBy is not None:
self.partitionBy(partitionBy)
if compression is not None:
self.option("compression", compression)
self._jwrite.parquet(path)

@since(1.6)
def text(self, path):
def text(self, path, compression=None):
"""Saves the content of the DataFrame in a text file at the specified path.

:param path: the path in any Hadoop supported file system
:param compression: compression codec to use when saving to file. This can be one of the
known case-insensitive shorten names (none, bzip2, gzip, lz4,
snappy and deflate).

The DataFrame must have only one column that is of string type.
Each row becomes a new line in the output file.

You can set the following option(s) for writing text files:
* ``compression`` (default ``None``): compression codec to use when saving to file.
This can be one of the known case-insensitive shorten names
(``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
"""
if compression is not None:
self.option("compression", compression)
self._jwrite.text(path)

@since(2.0)
def csv(self, path, mode=None):
def csv(self, path, mode=None, compression=None):
"""Saves the content of the [[DataFrame]] in CSV format at the specified path.

:param path: the path in any Hadoop supported file system
@@ -522,17 +528,19 @@ def csv(self, path, mode=None):
* ``ignore``: Silently ignore this operation if data already exists.
* ``error`` (default case): Throw an exception if data already exists.

You can set the following CSV-specific option(s) for writing CSV files:
* ``compression`` (default ``None``): compression codec to use when saving to file.
This can be one of the known case-insensitive shorten names
(``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
:param compression: compression codec to use when saving to file. This can be one of the
known case-insensitive shorten names (none, bzip2, gzip, lz4,
snappy and deflate).

>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)._jwrite.csv(path)
self.mode(mode)
if compression is not None:
self.option("compression", compression)
self._jwrite.csv(path)

@since(1.5)
def orc(self, path, mode=None, partitionBy=None):
def orc(self, path, mode=None, partitionBy=None, compression=None):
"""Saves the content of the :class:`DataFrame` in ORC format at the specified path.

::Note: Currently ORC support is only available together with
@@ -546,13 +554,18 @@ def orc(self, path, mode=None, partitionBy=None):
* ``ignore``: Silently ignore this operation if data already exists.
* ``error`` (default case): Throw an exception if data already exists.
:param partitionBy: names of partitioning columns
:param compression: compression codec to use when saving to file. This can be one of the
known case-insensitive shorten names (none, snappy, zlib, and lzo).
This will overwrite ``orc.compress``.

>>> orc_df = hiveContext.read.orc('python/test_support/sql/orc_partitioned')
>>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
if partitionBy is not None:
self.partitionBy(partitionBy)
if compression is not None:
self.option("compression", compression)
self._jwrite.orc(path)

@since(1.4)
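For context, here is a minimal PySpark sketch of the new keyword arguments added above. It assumes a working `sqlContext` (as in the module's doctests) and a scratch directory; the short codec names are the ones listed in the docstrings, and the DataFrame is a throwaway example.

```python
import os
import tempfile

# Assumed setup: `sqlContext` is available, as in the doctests above.
df = sqlContext.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'value'])
out = tempfile.mkdtemp()

# Text-based writers (json, csv, text) accept bzip2, gzip, lz4, snappy, deflate, or none.
df.write.json(os.path.join(out, 'json_gzip'), compression='gzip')
df.write.csv(os.path.join(out, 'csv_plain'), compression='none')

# Parquet has its own codec set (none, snappy, gzip, lzo); the argument overrides
# spark.sql.parquet.compression.codec for this write only.
df.write.parquet(os.path.join(out, 'parquet_snappy'), compression='snappy')
```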
19 changes: 16 additions & 3 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -455,7 +455,8 @@ final class DataFrameWriter private[sql](df: DataFrame) {
*
* You can set the following JSON-specific option(s) for writing JSON files:
* <li>`compression` (default `null`): compression codec to use when saving to file. This can be
* one of the known case-insensitive shorten names (`bzip2`, `gzip`, `lz4`, and `snappy`). </li>
* one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
* `snappy` and `deflate`). </li>
*
* @since 1.4.0
*/
@@ -468,6 +469,11 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* format("parquet").save(path)
* }}}
*
* You can set the following Parquet-specific option(s) for writing Parquet files:
* <li>`compression` (default `null`): compression codec to use when saving to file. This can be
* one of the known case-insensitive shorten names(`none`, `snappy`, `gzip`, and `lzo`).
* This will overwrite `spark.sql.parquet.compression.codec`. </li>
*
* @since 1.4.0
*/
def parquet(path: String): Unit = format("parquet").save(path)
@@ -479,6 +485,11 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* format("orc").save(path)
* }}}
*
* You can set the following ORC-specific option(s) for writing ORC files:
* <li>`compression` (default `null`): compression codec to use when saving to file. This can be
* one of the known case-insensitive shorten names(`none`, `snappy`, `zlib`, and `lzo`).
* This will overwrite `orc.compress`. </li>
*
* @since 1.5.0
* @note Currently, this method can only be used together with `HiveContext`.
*/
@@ -498,7 +509,8 @@ final class DataFrameWriter private[sql](df: DataFrame) {
*
* You can set the following option(s) for writing text files:
* <li>`compression` (default `null`): compression codec to use when saving to file. This can be
* one of the known case-insensitive shorten names (`bzip2`, `gzip`, `lz4`, and `snappy`). </li>
* one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
* `snappy` and `deflate`). </li>
*
* @since 1.6.0
*/
@@ -513,7 +525,8 @@ final class DataFrameWriter private[sql](df: DataFrame) {
*
* You can set the following CSV-specific option(s) for writing CSV files:
* <li>`compression` (default `null`): compression codec to use when saving to file. This can be
* one of the known case-insensitive shorten names (`bzip2`, `gzip`, `lz4`, and `snappy`). </li>
* one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
* `snappy` and `deflate`). </li>
*
* @since 2.0.0
*/
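On the Scala side the codec is exposed purely as a data source option, as the Scaladoc above describes; the PySpark keyword argument is shorthand for the same `option("compression", ...)` call. A hedged sketch of the option-based spelling, reusing the `df` and `out` from the previous example:

```python
import os

# Same effect as df.write.json(path, compression='bzip2'), spelled as an option.
(df.write
    .format('json')
    .option('compression', 'bzip2')
    .save(os.path.join(out, 'json_bzip2')))

# 'none' (or 'uncompressed') explicitly turns compression off, even when the
# Hadoop configuration enables output compression by default.
(df.write
    .format('csv')
    .option('compression', 'none')
    .save(os.path.join(out, 'csv_none')))
```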
CompressionCodecs.scala
@@ -25,6 +25,8 @@ import org.apache.spark.util.Utils

private[datasources] object CompressionCodecs {
private val shortCompressionCodecNames = Map(
"none" -> null,
"uncompressed" -> null,
"bzip2" -> classOf[BZip2Codec].getName,
"deflate" -> classOf[DeflateCodec].getName,
"gzip" -> classOf[GzipCodec].getName,
@@ -39,7 +41,9 @@ private[datasources] object CompressionCodecs {
val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name)
try {
// Validate the codec name
Utils.classForName(codecName)
if (codecName != null) {
Utils.classForName(codecName)
}
codecName
} catch {
case e: ClassNotFoundException =>
@@ -53,10 +57,16 @@
* `codec` should be a full class path
*/
def setCodecConfiguration(conf: Configuration, codec: String): Unit = {
conf.set("mapreduce.output.fileoutputformat.compress", "true")
conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
conf.set("mapreduce.map.output.compress", "true")
conf.set("mapreduce.map.output.compress.codec", codec)
if (codec != null) {
conf.set("mapreduce.output.fileoutputformat.compress", "true")
conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
conf.set("mapreduce.map.output.compress", "true")
conf.set("mapreduce.map.output.compress.codec", codec)
} else {
// This means the option `compression` was set to `uncompressed` or `none`.
conf.set("mapreduce.output.fileoutputformat.compress", "false")
conf.set("mapreduce.map.output.compress", "false")
}
}
}
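The heart of this file is a short-name table in which `none` and `uncompressed` resolve to `null`, plus a guard that disables Hadoop output compression instead of setting a codec class. Below is a rough Python rendering of that logic for illustration only: the dictionary keys follow the docstrings above, the class names are the standard Hadoop codecs, and `conf` stands in for a Hadoop `Configuration`.

```python
# Illustrative sketch of the CompressionCodecs logic above; not Spark's actual API.
SHORT_CODEC_NAMES = {
    'none': None,
    'uncompressed': None,
    'bzip2': 'org.apache.hadoop.io.compress.BZip2Codec',
    'deflate': 'org.apache.hadoop.io.compress.DeflateCodec',
    'gzip': 'org.apache.hadoop.io.compress.GzipCodec',
    'lz4': 'org.apache.hadoop.io.compress.Lz4Codec',
    'snappy': 'org.apache.hadoop.io.compress.SnappyCodec',
}

def get_codec_class_name(name):
    """Resolve a short name; None means 'write uncompressed'."""
    return SHORT_CODEC_NAMES.get(name.lower(), name)

def set_codec_configuration(conf, codec_class_name):
    """Mirror setCodecConfiguration: only enable compression when a codec is given."""
    if codec_class_name is not None:
        conf['mapreduce.output.fileoutputformat.compress'] = 'true'
        conf['mapreduce.output.fileoutputformat.compress.type'] = 'BLOCK'
        conf['mapreduce.output.fileoutputformat.compress.codec'] = codec_class_name
        conf['mapreduce.map.output.compress'] = 'true'
        conf['mapreduce.map.output.compress.codec'] = codec_class_name
    else:
        # The `compression` option was `none` or `uncompressed`.
        conf['mapreduce.output.fileoutputformat.compress'] = 'false'
        conf['mapreduce.map.output.compress'] = 'false'
```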
ParquetRelation.scala
@@ -148,6 +148,19 @@ private[sql] class ParquetRelation(
.get(ParquetRelation.METASTORE_SCHEMA)
.map(DataType.fromJson(_).asInstanceOf[StructType])

private val compressionCodec: Option[String] = parameters
.get("compression")
.map { codecName =>
// Validate if given compression codec is supported or not.
val shortParquetCompressionCodecNames = ParquetRelation.shortParquetCompressionCodecNames
if (!shortParquetCompressionCodecNames.contains(codecName.toLowerCase)) {
val availableCodecs = shortParquetCompressionCodecNames.keys.map(_.toLowerCase)
throw new IllegalArgumentException(s"Codec [$codecName] " +
s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
}
codecName.toLowerCase
}

private lazy val metadataCache: MetadataCache = {
val meta = new MetadataCache
meta.refresh()
@@ -286,7 +299,8 @@ private[sql] class ParquetRelation(
ParquetRelation
.shortParquetCompressionCodecNames
.getOrElse(
sqlContext.conf.parquetCompressionCodec.toLowerCase(),
compressionCodec
.getOrElse(sqlContext.conf.parquetCompressionCodec.toLowerCase),
CompressionCodecName.UNCOMPRESSED).name())

new BucketedOutputWriterFactory {
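The Parquet change follows the same pattern: a `compression` option, when present, is validated against the short names from the docstring (`none`, `snappy`, `gzip`, `lzo`) and takes precedence over the session-wide `spark.sql.parquet.compression.codec` setting. A small sketch of that precedence rule, using placeholder names rather than Spark internals:

```python
# Illustrative sketch of the codec-selection precedence added to ParquetRelation.
PARQUET_SHORT_CODEC_NAMES = ('none', 'snappy', 'gzip', 'lzo')

def choose_parquet_codec(option_codec, session_codec):
    """Prefer the validated per-write option; otherwise fall back to the session conf."""
    if option_codec is not None:
        name = option_codec.lower()
        if name not in PARQUET_SHORT_CODEC_NAMES:
            available = ', '.join(PARQUET_SHORT_CODEC_NAMES)
            raise ValueError('Codec [%s] is not available. '
                             'Available codecs are %s.' % (option_codec, available))
        return name
    return session_codec.lower()

# The per-write option wins over the session default.
assert choose_parquet_codec('GZIP', 'snappy') == 'gzip'
assert choose_parquet_codec(None, 'snappy') == 'snappy'
```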
CSVSuite.scala
@@ -21,6 +21,12 @@ import java.io.File
import java.nio.charset.UnsupportedCharsetException
import java.sql.Timestamp

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec

import org.apache.spark.SparkException
import org.apache.spark.sql.{DataFrame, QueryTest, Row}
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
@@ -396,6 +402,46 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
}
}

test("SPARK-13543 Write the output as uncompressed via option()") {
val clonedConf = new Configuration(hadoopConfiguration)
hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true")
hadoopConfiguration
.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
hadoopConfiguration
.set("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec].getName)
hadoopConfiguration.set("mapreduce.map.output.compress", "true")
hadoopConfiguration.set("mapreduce.map.output.compress.codec", classOf[GzipCodec].getName)
withTempDir { dir =>
try {
val csvDir = new File(dir, "csv").getCanonicalPath
val cars = sqlContext.read
.format("csv")
.option("header", "true")
.load(testFile(carsFile))

cars.coalesce(1).write
.format("csv")
.option("header", "true")
.option("compression", "none")
.save(csvDir)

val compressedFiles = new File(csvDir).listFiles()
assert(compressedFiles.exists(!_.getName.endsWith(".gz")))

val carsCopy = sqlContext.read
.format("csv")
.option("header", "true")
.load(csvDir)

verifyCars(carsCopy, withHeader = true)
} finally {
// Hadoop 1 doesn't have `Configuration.unset`
hadoopConfiguration.clear()
clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
}
}
}

test("Schema inference correctly identifies the datatype when data is sparse.") {
val df = sqlContext.read
.format("csv")
JsonSuite.scala
@@ -23,9 +23,10 @@ import java.sql.{Date, Timestamp}
import scala.collection.JavaConverters._

import com.fasterxml.jackson.core.JsonFactory
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
import org.scalactic.Tolerance._

import org.apache.spark.rdd.RDD
@@ -1524,6 +1525,49 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
}

test("SPARK-13543 Write the output as uncompressed via option()") {
val clonedConf = new Configuration(hadoopConfiguration)
hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true")
hadoopConfiguration
.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
hadoopConfiguration
.set("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec].getName)
hadoopConfiguration.set("mapreduce.map.output.compress", "true")
hadoopConfiguration.set("mapreduce.map.output.compress.codec", classOf[GzipCodec].getName)
withTempDir { dir =>
try {
val dir = Utils.createTempDir()
dir.delete()

val path = dir.getCanonicalPath
primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)

val jsonDF = sqlContext.read.json(path)
val jsonDir = new File(dir, "json").getCanonicalPath
jsonDF.coalesce(1).write
.format("json")
.option("compression", "none")
.save(jsonDir)

val compressedFiles = new File(jsonDir).listFiles()
assert(compressedFiles.exists(!_.getName.endsWith(".gz")))

val jsonCopy = sqlContext.read
.format("json")
.load(jsonDir)

assert(jsonCopy.count == jsonDF.count)
val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
checkAnswer(jsonCopySome, jsonDFSome)
} finally {
// Hadoop 1 doesn't have `Configuration.unset`
hadoopConfiguration.clear()
clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
}
}
}

test("Casting long as timestamp") {
withTempTable("jsonTable") {
val schema = (new StructType).add("ts", TimestampType)