Skip to content

Commit 5124f1b

Browse files
committed
[SPARK-21786][SQL] When acquiring 'compressionCodecClassName' in 'ParquetOptions', `parquet.compression` needs to be considered.
## What changes were proposed in this pull request? 1. Added acquiring 'compressionCodecClassName' from `parquet.compression`; the order of precedence is `compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`, just like what we do in `OrcOptions`. 2. Changed `spark.sql.parquet.compression.codec` to support "none". Actually, in `ParquetOptions` we do support "none" as equivalent to "uncompressed", but it was not allowed to be configured to "none". 3. Renamed `compressionCodec` to `compressionCodecClassName`. ## How was this patch tested? Manual test.
1 parent 5dbd3ed commit 5124f1b

File tree

6 files changed

+74
-10
lines changed

6 files changed

+74
-10
lines changed

docs/sql-programming-guide.md

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -953,7 +953,9 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession
953953
<td><code>spark.sql.parquet.compression.codec</code></td>
954954
<td>snappy</td>
955955
<td>
956-
Sets the compression codec use when writing Parquet files. Acceptable values include:
956+
Sets the compression codec use when writing Parquet files. If other compression codec
957+
configuration was found through hive or parquet, the precedence would be `compression`,
958+
`parquet.compression`, `spark.sql.parquet.compression.codec`. Acceptable values include:
957959
none, uncompressed, snappy, gzip, lzo.
958960
</td>
959961
</tr>

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -94,7 +94,7 @@ class OrcFileFormat
9494

9595
conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute, dataSchema.catalogString)
9696

97-
conf.set(COMPRESS.getAttribute, orcOptions.compressionCodec)
97+
conf.set(COMPRESS.getAttribute, orcOptions.compressionCodecClassName)
9898

9999
conf.asInstanceOf[JobConf]
100100
.setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]])

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -41,7 +41,7 @@ class OrcOptions(
4141
* Compression codec to use.
4242
* Acceptable values are defined in [[shortOrcCompressionCodecNames]].
4343
*/
44-
val compressionCodec: String = {
44+
val compressionCodecClassName: String = {
4545
// `compression`, `orc.compress`(i.e., OrcConf.COMPRESS), and `spark.sql.orc.compression.codec`
4646
// are in order of precedence from highest to lowest.
4747
val orcCompressionConf = parameters.get(COMPRESS.getAttribute)

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala

Lines changed: 7 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -134,29 +134,30 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
134134
test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
135135
val conf = spark.sessionState.conf
136136
val option = new OrcOptions(Map(COMPRESS.getAttribute.toUpperCase(Locale.ROOT) -> "NONE"), conf)
137-
assert(option.compressionCodec == "NONE")
137+
assert(option.compressionCodecClassName == "NONE")
138138
}
139139

140140
test("SPARK-21839: Add SQL config for ORC compression") {
141141
val conf = spark.sessionState.conf
142142
// Test if the default of spark.sql.orc.compression.codec is snappy
143-
assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "SNAPPY")
143+
assert(new OrcOptions(Map.empty[String, String], conf).compressionCodecClassName == "SNAPPY")
144144

145145
// OrcOptions's parameters have a higher priority than SQL configuration.
146146
// `compression` -> `orc.compression` -> `spark.sql.orc.compression.codec`
147147
withSQLConf(SQLConf.ORC_COMPRESSION.key -> "uncompressed") {
148-
assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "NONE")
148+
assert(new OrcOptions(Map.empty[String, String], conf).compressionCodecClassName == "NONE")
149149
val map1 = Map(COMPRESS.getAttribute -> "zlib")
150150
val map2 = Map(COMPRESS.getAttribute -> "zlib", "compression" -> "lzo")
151-
assert(new OrcOptions(map1, conf).compressionCodec == "ZLIB")
152-
assert(new OrcOptions(map2, conf).compressionCodec == "LZO")
151+
assert(new OrcOptions(map1, conf).compressionCodecClassName == "ZLIB")
152+
assert(new OrcOptions(map2, conf).compressionCodecClassName == "LZO")
153153
}
154154

155155
// Test all the valid options of spark.sql.orc.compression.codec
156156
Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO").foreach { c =>
157157
withSQLConf(SQLConf.ORC_COMPRESSION.key -> c) {
158158
val expected = if (c == "UNCOMPRESSED") "NONE" else c
159-
assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == expected)
159+
assert(
160+
new OrcOptions(Map.empty[String, String], conf).compressionCodecClassName == expected)
160161
}
161162
}
162163
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -74,7 +74,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
7474

7575
val configuration = job.getConfiguration
7676

77-
configuration.set(COMPRESS.getAttribute, orcOptions.compressionCodec)
77+
configuration.set(COMPRESS.getAttribute, orcOptions.compressionCodecClassName)
7878
configuration match {
7979
case conf: JobConf =>
8080
conf.setOutputFormat(classOf[OrcOutputFormat])
Lines changed: 61 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.hive
19+
20+
import org.apache.parquet.hadoop.ParquetOutputFormat
21+
22+
import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
23+
import org.apache.spark.sql.hive.test.TestHiveSingleton
24+
import org.apache.spark.sql.internal.SQLConf
25+
import org.apache.spark.sql.test.SQLTestUtils
26+
27+
class CompressionCodecSuite extends TestHiveSingleton with SQLTestUtils {
28+
test("Test `spark.sql.parquet.compression.codec` config") {
29+
Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO").foreach { c =>
30+
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) {
31+
val expected = if (c == "NONE") "UNCOMPRESSED" else c
32+
val option = new ParquetOptions(Map.empty[String, String], spark.sessionState.conf)
33+
assert(option.compressionCodecClassName == expected)
34+
}
35+
}
36+
}
37+
38+
test("[SPARK-21786] Test Acquiring 'compressionCodecClassName' for parquet in right order.") {
39+
// When "compression" is configured, it should be the first choice.
40+
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
41+
val props = Map("compression" -> "uncompressed", ParquetOutputFormat.COMPRESSION -> "gzip")
42+
val option = new ParquetOptions(props, spark.sessionState.conf)
43+
assert(option.compressionCodecClassName == "UNCOMPRESSED")
44+
}
45+
46+
// When "compression" is not configured, "parquet.compression" should be the preferred choice.
47+
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
48+
val props = Map(ParquetOutputFormat.COMPRESSION -> "gzip")
49+
val option = new ParquetOptions(props, spark.sessionState.conf)
50+
assert(option.compressionCodecClassName == "GZIP")
51+
}
52+
53+
// When both "compression" and "parquet.compression" are not configured,
54+
// spark.sql.parquet.compression.codec should be the right choice.
55+
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
56+
val props = Map.empty[String, String]
57+
val option = new ParquetOptions(props, spark.sessionState.conf)
58+
assert(option.compressionCodecClassName == "SNAPPY")
59+
}
60+
}
61+
}

0 commit comments

Comments
 (0)