
Commit 7b78041

fjh100456 authored and gatorsmile committed

[SPARK-21786][SQL] When acquiring 'compressionCodecClassName' in 'ParquetOptions', `parquet.compression` needs to be considered.

## What changes were proposed in this pull request?

Since Hive 1.1, Hive allows users to set the parquet compression codec via the table-level property `parquet.compression`. See the JIRA: https://issues.apache.org/jira/browse/HIVE-7858. We already support `orc.compress` for ORC, so for external users it is more straightforward to support both. See the Stack Overflow question: https://stackoverflow.com/questions/36941122/spark-sql-ignores-parquet-compression-propertie-specified-in-tblproperties

On the Spark side, the table-level compression conf `compression` was added by #11464 in Spark 2.0, so we need to support both table-level confs. Users might also use the session-level conf `spark.sql.parquet.compression.codec`. When the compression codec is configured in more than one place, the precedence, from highest to lowest, is `compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`. Acceptable values include: none, uncompressed, snappy, gzip, lzo. After this change, the rule for Parquet is consistent with the one for ORC.

Changes:
1. Acquire 'compressionCodecClassName' from `parquet.compression` as well, with the precedence order `compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`, just like what we do in `OrcOptions`.
2. Change `spark.sql.parquet.compression.codec` to accept "none". `ParquetOptions` already treats "none" as equivalent to "uncompressed", but the conf did not allow it to be set to "none".
3. Rename `compressionCodec` to `compressionCodecClassName`.

## How was this patch tested?

Added tests.

Author: fjh100456 <[email protected]>

Closes #20076 from fjh100456/ParquetOptionIssue.
1 parent be9a804 commit 7b78041
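To make the precedence rule concrete, here is a minimal sketch of how the three levels interact from the user's side (the table name, output path, and codec values are illustrative, not part of the patch):

// Session-level default (lowest precedence).
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")

// Hive-style table-level property (middle precedence): the table's files
// are written with gzip even though the session default is snappy.
spark.sql(
  """CREATE TABLE compressed_t
    |USING parquet
    |OPTIONS('parquet.compression'='gzip')
    |AS SELECT 1 AS col1""".stripMargin)

// Spark's own table-level option (highest precedence): "none" is now
// accepted and wins over both settings above.
spark.range(10).write
  .option("compression", "none")
  .parquet("/tmp/illustrative_path")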

File tree

4 files changed (+145, -9 lines)

docs/sql-programming-guide.md

Lines changed: 4 additions & 2 deletions

@@ -953,8 +953,10 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession
   <td><code>spark.sql.parquet.compression.codec</code></td>
   <td>snappy</td>
   <td>
-    Sets the compression codec use when writing Parquet files. Acceptable values include:
-    uncompressed, snappy, gzip, lzo.
+    Sets the compression codec used when writing Parquet files. If either `compression` or
+    `parquet.compression` is specified in the table-specific options/properties, the precedence would be
+    `compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`. Acceptable values include:
+    none, uncompressed, snappy, gzip, lzo.
   </td>
 </tr>
 <tr>
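As the guide notes, this conf is set per session via `setConf`; a minimal sketch of the newly accepted "none" value (the output path is illustrative):

// "none" is now a valid value; ParquetOptions treats it as "uncompressed".
spark.conf.set("spark.sql.parquet.compression.codec", "none")
spark.range(5).write.mode("overwrite").parquet("/tmp/parquet_no_compression")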

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 5 deletions

@@ -325,11 +325,13 @@ object SQLConf {
     .createWithDefault(false)
 
   val PARQUET_COMPRESSION = buildConf("spark.sql.parquet.compression.codec")
-    .doc("Sets the compression codec use when writing Parquet files. Acceptable values include: " +
-      "uncompressed, snappy, gzip, lzo.")
+    .doc("Sets the compression codec used when writing Parquet files. If either `compression` or " +
+      "`parquet.compression` is specified in the table-specific options/properties, the precedence " +
+      "would be `compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`. " +
+      "Acceptable values include: none, uncompressed, snappy, gzip, lzo.")
     .stringConf
     .transform(_.toLowerCase(Locale.ROOT))
-    .checkValues(Set("uncompressed", "snappy", "gzip", "lzo"))
+    .checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo"))
     .createWithDefault("snappy")
 
   val PARQUET_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.filterPushdown")
@@ -366,8 +368,10 @@ object SQLConf {
     .createWithDefault(true)
 
   val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec")
-    .doc("Sets the compression codec use when writing ORC files. Acceptable values include: " +
-      "none, uncompressed, snappy, zlib, lzo.")
+    .doc("Sets the compression codec used when writing ORC files. If either `compression` or " +
+      "`orc.compress` is specified in the table-specific options/properties, the precedence " +
+      "would be `compression`, `orc.compress`, `spark.sql.orc.compression.codec`. " +
+      "Acceptable values include: none, uncompressed, snappy, zlib, lzo.")
     .stringConf
     .transform(_.toLowerCase(Locale.ROOT))
     .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
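The `transform` plus `checkValues` pair above lowercases the user-supplied value and then validates it against the allowed set; a standalone sketch of that pipeline (a simplification, not Spark's actual `ConfigBuilder` internals):

import java.util.Locale

val allowedCodecs = Set("none", "uncompressed", "snappy", "gzip", "lzo")

// Mirrors .transform(_.toLowerCase(Locale.ROOT)) followed by .checkValues(allowedCodecs).
def validateParquetCodec(raw: String): String = {
  val value = raw.toLowerCase(Locale.ROOT)
  require(allowedCodecs.contains(value),
    s"Value must be one of ${allowedCodecs.mkString(", ")}, got: $value")
  value
}

validateParquetCodec("GZIP")   // returns "gzip"
// validateParquetCodec("lz4") // would throw IllegalArgumentException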

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala

Lines changed: 10 additions & 2 deletions

@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.util.Locale
 
+import org.apache.parquet.hadoop.ParquetOutputFormat
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -42,8 +43,15 @@ private[parquet] class ParquetOptions(
    * Acceptable values are defined in [[shortParquetCompressionCodecNames]].
    */
   val compressionCodecClassName: String = {
-    val codecName = parameters.getOrElse("compression",
-      sqlConf.parquetCompressionCodec).toLowerCase(Locale.ROOT)
+    // `compression`, `parquet.compression` (i.e., ParquetOutputFormat.COMPRESSION), and
+    // `spark.sql.parquet.compression.codec`
+    // are in order of precedence from highest to lowest.
+    val parquetCompressionConf = parameters.get(ParquetOutputFormat.COMPRESSION)
+    val codecName = parameters
+      .get("compression")
+      .orElse(parquetCompressionConf)
+      .getOrElse(sqlConf.parquetCompressionCodec)
+      .toLowerCase(Locale.ROOT)
     if (!shortParquetCompressionCodecNames.contains(codecName)) {
       val availableCodecs =
         shortParquetCompressionCodecNames.keys.map(_.toLowerCase(Locale.ROOT))
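The heart of the change is the `Option` chain; extracted as a standalone sketch (a simplification of the code above, not the class itself):

import java.util.Locale

// Precedence: `compression` > `parquet.compression` > session default.
def resolveCodecName(parameters: Map[String, String], sessionDefault: String): String =
  parameters.get("compression")
    .orElse(parameters.get("parquet.compression")) // i.e., ParquetOutputFormat.COMPRESSION
    .getOrElse(sessionDefault)
    .toLowerCase(Locale.ROOT)

assert(resolveCodecName(
  Map("compression" -> "uncompressed", "parquet.compression" -> "gzip"), "snappy") == "uncompressed")
assert(resolveCodecName(Map("parquet.compression" -> "gzip"), "snappy") == "gzip")
assert(resolveCodecName(Map.empty, "snappy") == "snappy")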
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala

Lines changed: 122 additions & 0 deletions

@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetOutputFormat
+
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSQLContext {
+  test("Test `spark.sql.parquet.compression.codec` config") {
+    Seq("NONE", "UNCOMPRESSED", "SNAPPY", "GZIP", "LZO").foreach { c =>
+      withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) {
+        val expected = if (c == "NONE") "UNCOMPRESSED" else c
+        val option = new ParquetOptions(Map.empty[String, String], spark.sessionState.conf)
+        assert(option.compressionCodecClassName == expected)
+      }
+    }
+  }
+
+  test("[SPARK-21786] Test Acquiring 'compressionCodecClassName' for parquet in right order.") {
+    // When "compression" is configured, it should be the first choice.
+    withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
+      val props = Map("compression" -> "uncompressed", ParquetOutputFormat.COMPRESSION -> "gzip")
+      val option = new ParquetOptions(props, spark.sessionState.conf)
+      assert(option.compressionCodecClassName == "UNCOMPRESSED")
+    }
+
+    // When "compression" is not configured, "parquet.compression" should be the preferred choice.
+    withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
+      val props = Map(ParquetOutputFormat.COMPRESSION -> "gzip")
+      val option = new ParquetOptions(props, spark.sessionState.conf)
+      assert(option.compressionCodecClassName == "GZIP")
+    }
+
+    // When both "compression" and "parquet.compression" are not configured,
+    // spark.sql.parquet.compression.codec should be the right choice.
+    withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
+      val props = Map.empty[String, String]
+      val option = new ParquetOptions(props, spark.sessionState.conf)
+      assert(option.compressionCodecClassName == "SNAPPY")
+    }
+  }
+
+  private def getTableCompressionCodec(path: String): Seq[String] = {
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    val codecs = for {
+      footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)
+      block <- footer.getParquetMetadata.getBlocks.asScala
+      column <- block.getColumns.asScala
+    } yield column.getCodec.name()
+    codecs.distinct
+  }
+
+  private def createTableWithCompression(
+      tableName: String,
+      isPartitioned: Boolean,
+      compressionCodec: String,
+      rootDir: File): Unit = {
+    val options =
+      s"""
+        |OPTIONS('path'='${rootDir.toURI.toString.stripSuffix("/")}/$tableName',
+        |'parquet.compression'='$compressionCodec')
+      """.stripMargin
+    val partitionCreate = if (isPartitioned) "PARTITIONED BY (p)" else ""
+    sql(
+      s"""
+        |CREATE TABLE $tableName USING Parquet $options $partitionCreate
+        |AS SELECT 1 AS col1, 2 AS p
+      """.stripMargin)
+  }
+
+  private def checkCompressionCodec(compressionCodec: String, isPartitioned: Boolean): Unit = {
+    withTempDir { tmpDir =>
+      val tempTableName = "TempParquetTable"
+      withTable(tempTableName) {
+        createTableWithCompression(tempTableName, isPartitioned, compressionCodec, tmpDir)
+        val partitionPath = if (isPartitioned) "p=2" else ""
+        val path = s"${tmpDir.getPath.stripSuffix("/")}/$tempTableName/$partitionPath"
+        val realCompressionCodecs = getTableCompressionCodec(path)
+        assert(realCompressionCodecs.forall(_ == compressionCodec))
+      }
+    }
+  }
+
+  test("Create parquet table with compression") {
+    Seq(true, false).foreach { isPartitioned =>
+      Seq("UNCOMPRESSED", "SNAPPY", "GZIP").foreach { compressionCodec =>
+        checkCompressionCodec(compressionCodec, isPartitioned)
+      }
+    }
+  }
+
+  test("Create table with unknown compression") {
+    Seq(true, false).foreach { isPartitioned =>
+      val exception = intercept[IllegalArgumentException] {
+        checkCompressionCodec("aa", isPartitioned)
+      }
+      assert(exception.getMessage.contains("Codec [aa] is not available"))
+    }
+  }
+}
