Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions docs/sql-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -953,8 +953,10 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession
<td><code>spark.sql.parquet.compression.codec</code></td>
<td>snappy</td>
<td>
Sets the compression codec use when writing Parquet files. Acceptable values include:
uncompressed, snappy, gzip, lzo.
Sets the compression codec used when writing Parquet files. If either `compression` or
`parquet.compression` is specified in the table-specific options/properties, the precedence would be
`compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`. Acceptable values include:
none, uncompressed, snappy, gzip, lzo.
</td>
</tr>
<tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,11 +323,13 @@ object SQLConf {
.createWithDefault(false)

val PARQUET_COMPRESSION = buildConf("spark.sql.parquet.compression.codec")
.doc("Sets the compression codec use when writing Parquet files. Acceptable values include: " +
"uncompressed, snappy, gzip, lzo.")
.doc("Sets the compression codec used when writing Parquet files. If either `compression` or" +
"`parquet.compression` is specified in the table-specific options/properties, the precedence" +
"would be `compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`." +
"Acceptable values include: none, uncompressed, snappy, gzip, lzo.")
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(Set("uncompressed", "snappy", "gzip", "lzo"))
.checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo"))
.createWithDefault("snappy")

val PARQUET_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.filterPushdown")
Expand Down Expand Up @@ -364,8 +366,10 @@ object SQLConf {
.createWithDefault(true)

val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec")
.doc("Sets the compression codec use when writing ORC files. Acceptable values include: " +
"none, uncompressed, snappy, zlib, lzo.")
.doc("Sets the compression codec used when writing ORC files. If either `compression` or" +
"`orc.compress` is specified in the table-specific options/properties, the precedence" +
"would be `compression`, `orc.compress`, `spark.sql.orc.compression.codec`." +
"Acceptable values include: none, uncompressed, snappy, zlib, lzo.")
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet

import java.util.Locale

import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.parquet.hadoop.metadata.CompressionCodecName

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
Expand All @@ -42,8 +43,15 @@ private[parquet] class ParquetOptions(
* Acceptable values are defined in [[shortParquetCompressionCodecNames]].
*/
val compressionCodecClassName: String = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we change compressionCodecClassName to compressionCodec instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HyukjinKwon Seems you're right.
@gatorsmile Are we mistaken, shouldn't we change ParquetOptions's compressionCodec to compressionCodecClassName ? Because OrcOptions and TextOptions are all using compressionCodec .

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

compressionCodecClassName is a better name. We should change all the others to this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could alternatively say compressionCodecName here. It's rather names like UNCOMPRESSED, LZO, etc in this case. For the text based sources, they are canonical class names so I am okay with compressionCodecClassName but for ORC and Parquet these are not classes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

compressionCodecName is also fine to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, change all compressionCodecClassName and compressionCodec to compressionCodecName? In TextOptions ,JSONOptions and CSVOptions too ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gatorsmile @HyukjinKwon
In TextOptions ,JSONOptions and CSVOptions, it's "Option[String]", but in OrcOptions and ParquetOptions, it's a "String".
Just change compressionCodecClassName in OrcOptions and ParquetOptions to compressionCodecName is ok ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's do Parquet and ORC ones here for now if that's also fine to @gatorsmile.

val codecName = parameters.getOrElse("compression",
sqlConf.parquetCompressionCodec).toLowerCase(Locale.ROOT)
// `compression`, `parquet.compression`(i.e., ParquetOutputFormat.COMPRESSION), and
// `spark.sql.parquet.compression.codec`
// are in order of precedence from highest to lowest.
val parquetCompressionConf = parameters.get(ParquetOutputFormat.COMPRESSION)
val codecName = parameters
.get("compression")
.orElse(parquetCompressionConf)
.getOrElse(sqlConf.parquetCompressionCodec)
.toLowerCase(Locale.ROOT)
if (!shortParquetCompressionCodecNames.contains(codecName)) {
val availableCodecs =
shortParquetCompressionCodecNames.keys.map(_.toLowerCase(Locale.ROOT))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.parquet

import java.io.File

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetOutputFormat

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext

class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSQLContext {
test("Test `spark.sql.parquet.compression.codec` config") {
Seq("NONE", "UNCOMPRESSED", "SNAPPY", "GZIP", "LZO").foreach { c =>
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) {
val expected = if (c == "NONE") "UNCOMPRESSED" else c
val option = new ParquetOptions(Map.empty[String, String], spark.sessionState.conf)
assert(option.compressionCodecClassName == expected)
}
}
}

test("[SPARK-21786] Test Acquiring 'compressionCodecClassName' for parquet in right order.") {
// When "compression" is configured, it should be the first choice.
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
val props = Map("compression" -> "uncompressed", ParquetOutputFormat.COMPRESSION -> "gzip")
val option = new ParquetOptions(props, spark.sessionState.conf)
assert(option.compressionCodecClassName == "UNCOMPRESSED")
}

// When "compression" is not configured, "parquet.compression" should be the preferred choice.
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
val props = Map(ParquetOutputFormat.COMPRESSION -> "gzip")
val option = new ParquetOptions(props, spark.sessionState.conf)
assert(option.compressionCodecClassName == "GZIP")
}

// When both "compression" and "parquet.compression" are not configured,
// spark.sql.parquet.compression.codec should be the right choice.
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
val props = Map.empty[String, String]
val option = new ParquetOptions(props, spark.sessionState.conf)
assert(option.compressionCodecClassName == "SNAPPY")
}
}

private def getTableCompressionCodec(path: String): Seq[String] = {
val hadoopConf = spark.sessionState.newHadoopConf()
val codecs = for {
footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)
block <- footer.getParquetMetadata.getBlocks.asScala
column <- block.getColumns.asScala
} yield column.getCodec.name()
codecs.distinct
}

private def createTableWithCompression(
tableName: String,
isPartitioned: Boolean,
compressionCodec: String,
rootDir: File): Unit = {
val options =
s"""
|OPTIONS('path'='${rootDir.toURI.toString.stripSuffix("/")}/$tableName',
|'parquet.compression'='$compressionCodec')
""".stripMargin
val partitionCreate = if (isPartitioned) "PARTITIONED BY (p)" else ""
sql(
s"""
|CREATE TABLE $tableName USING Parquet $options $partitionCreate
|AS SELECT 1 AS col1, 2 AS p
""".stripMargin)
}

private def checkCompressionCodec(compressionCodec: String, isPartitioned: Boolean): Unit = {
withTempDir { tmpDir =>
val tempTableName = "TempParquetTable"
withTable(tempTableName) {
createTableWithCompression(tempTableName, isPartitioned, compressionCodec, tmpDir)
val partitionPath = if (isPartitioned) "p=2" else ""
val path = s"${tmpDir.getPath.stripSuffix("/")}/$tempTableName/$partitionPath"
val realCompressionCodecs = getTableCompressionCodec(path)
assert(realCompressionCodecs.forall(_ == compressionCodec))
}
}
}

test("Create parquet table with compression") {
Seq(true, false).foreach { isPartitioned =>
Seq("UNCOMPRESSED", "SNAPPY", "GZIP").foreach { compressionCodec =>
checkCompressionCodec(compressionCodec, isPartitioned)
}
}
}

test("Create table with unknown compression") {
Seq(true, false).foreach { isPartitioned =>
val exception = intercept[IllegalArgumentException] {
checkCompressionCodec("aa", isPartitioned)
}
assert(exception.getMessage.contains("Codec [aa] is not available"))
}
}
}