Commit eaecabc

[SPARK-24112][SQL] Add convertMetastoreTableProperty conf
1 parent 4d5de4d commit eaecabc

File tree: 4 files changed (+64, -5 lines)

docs/sql-programming-guide.md

Lines changed: 2 additions & 0 deletions

@@ -1812,6 +1812,8 @@ working with timestamps in `pandas_udf`s to get the best performance, see
   - Since Spark 2.4, creating a managed table with a nonempty location is not allowed, and an exception is thrown when attempting to do so. Setting `spark.sql.allowCreatingManagedTableUsingNonemptyLocation` to `true` restores the previous behavior. This option will be removed in Spark 3.0.
   - Since Spark 2.4, the type coercion rules can automatically promote the argument types of the variadic SQL functions (e.g., IN/COALESCE) to the widest common type, regardless of the order of the input arguments. In prior Spark versions, the promotion could fail in some specific orders (e.g., TimestampType, IntegerType and StringType) and throw an exception.
   - In version 2.3 and earlier, `to_utc_timestamp` and `from_utc_timestamp` respect the timezone in the input timestamp string, which breaks the assumption that the input timestamp is in a specific timezone, so these two functions can return unexpected results. In version 2.4 and later, this problem has been fixed: `to_utc_timestamp` and `from_utc_timestamp` return null if the input timestamp string contains a timezone. As an example, `from_utc_timestamp('2000-10-10 00:00:00', 'GMT+1')` returns `2000-10-10 01:00:00` in both Spark 2.3 and 2.4. However, `from_utc_timestamp('2000-10-10 00:00:00+00:00', 'GMT+1')`, assuming a local timezone of GMT+8, returns `2000-10-10 09:00:00` in Spark 2.3 but `null` in 2.4. If you do not care about this problem and want to retain the previous behavior to keep your queries unchanged, you can set `spark.sql.function.rejectTimezoneInString` to false. This option will be removed in Spark 3.0 and should only be used as a temporary workaround.
+  - In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. The same happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 'NONE')` when `spark.sql.hive.convertMetastoreOrc=true`. Since Spark 2.4, Spark respects Parquet/ORC-specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy-compressed Parquet files during insertion in Spark 2.3, while in Spark 2.4 the result would be uncompressed Parquet files. Setting `spark.sql.hive.convertMetastoreTableProperty` to `false` restores the previous behavior.
+
 ## Upgrading From Spark SQL 2.2 to 2.3
 
   - Since Spark 2.3, queries over raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named `_corrupt_record` by default), for example `spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()` and `spark.read.schema(schema).json(file).select("_corrupt_record").show()`. Instead, you can cache or save the parsed results and then issue the same query, e.g. `val df = spark.read.schema(schema).json(file).cache()` followed by `df.filter($"_corrupt_record".isNotNull).count()`.
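As a quick illustration of the new migration-guide entry, here is a minimal sketch of opting back into the 2.3 behavior from a Hive-enabled session. The session setup, table name, and the quoted `TBLPROPERTIES` form are illustrative, not taken from this patch:

```scala
import org.apache.spark.sql.SparkSession

object ConvertMetastoreTablePropertyDemo {
  def main(args: Array[String]): Unit = {
    // Assumes Hive classes are on the classpath; the local master is illustrative.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("SPARK-24112 demo")
      .enableHiveSupport()
      .getOrCreate()

    // Opt back into the pre-2.4 behavior: Parquet/ORC table properties are
    // ignored when Spark converts the Hive table for reads and writes.
    spark.conf.set("spark.sql.hive.convertMetastoreTableProperty", "false")

    spark.sql(
      "CREATE TABLE t(id INT) STORED AS PARQUET " +
        "TBLPROPERTIES ('parquet.compression'='NONE')")
    // With the flag off, this insert writes Snappy files (the 2.3 behavior);
    // with the default (true), it would write uncompressed files instead.
    spark.sql("INSERT INTO t SELECT 1")

    spark.stop()
  }
}
```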

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 4 additions & 2 deletions

@@ -189,10 +189,12 @@ case class RelationConversions(
   // Return true for Apache ORC and Hive ORC-related configuration names.
   // Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`.
   private def isOrcProperty(key: String) =
-    key.startsWith("orc.") || key.contains(".orc.")
+    conf.getConf(HiveUtils.CONVERT_METASTORE_TABLE_PROPERTY) &&
+      (key.startsWith("orc.") || key.contains(".orc."))
 
   private def isParquetProperty(key: String) =
-    key.startsWith("parquet.") || key.contains(".parquet.")
+    conf.getConf(HiveUtils.CONVERT_METASTORE_TABLE_PROPERTY) &&
+      (key.startsWith("parquet.") || key.contains(".parquet."))
 
   private def convert(relation: HiveTableRelation): LogicalRelation = {
     val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
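A side note on the predicates above: in Scala, `&&` binds tighter than `||`, so the parentheses around the two name tests are what let the new flag gate both of them. A self-contained sketch of the pitfall (all names here are illustrative, not from the patch):

```scala
object PrecedenceDemo extends App {
  val enabled = false  // stand-in for the CONVERT_METASTORE_TABLE_PROPERTY flag
  val key = "spark.sql.parquet.compression.codec"  // contains ".parquet."

  // Parenthesized: the flag gates both name tests, as intended.
  val gated = enabled && (key.startsWith("parquet.") || key.contains(".parquet."))

  // Unparenthesized: parses as (enabled && startsWith) || contains, so the
  // property is still matched even though the flag is off.
  val leaky = enabled && key.startsWith("parquet.") || key.contains(".parquet.")

  println(s"gated = $gated, leaky = $leaky")  // prints: gated = false, leaky = true
}
```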

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala

Lines changed: 7 additions & 0 deletions

@@ -104,6 +104,13 @@ private[spark] object HiveUtils extends Logging {
       .booleanConf
       .createWithDefault(false)
 
+  val CONVERT_METASTORE_TABLE_PROPERTY =
+    buildConf("spark.sql.hive.convertMetastoreTableProperty")
+      .doc("When true, Parquet/ORC-specific table properties are respected while " +
+        "converting metastore Parquet/ORC tables.")
+      .booleanConf
+      .createWithDefault(true)
+
   val CONVERT_METASTORE_ORC = buildConf("spark.sql.hive.convertMetastoreOrc")
     .internal()
     .doc("When set to true, the built-in ORC reader and writer are used to process " +

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

Lines changed: 51 additions & 3 deletions

@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAl
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
 import org.apache.spark.sql.hive.HiveExternalCatalog
-import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET}
+import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET, CONVERT_METASTORE_TABLE_PROPERTY}
 import org.apache.spark.sql.hive.orc.OrcFileOperator
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}

@@ -2157,7 +2157,10 @@ class HiveDDLSuite
 
   test("SPARK-23355 convertMetastoreOrc should not ignore table properties - STORED AS") {
     Seq("native", "hive").foreach { orcImpl =>
-      withSQLConf(ORC_IMPLEMENTATION.key -> orcImpl, CONVERT_METASTORE_ORC.key -> "true") {
+      withSQLConf(
+        ORC_IMPLEMENTATION.key -> orcImpl,
+        CONVERT_METASTORE_ORC.key -> "true",
+        CONVERT_METASTORE_TABLE_PROPERTY.key -> "true") {
         withTable("t") {
           withTempPath { path =>
             sql(

@@ -2197,7 +2200,9 @@ class HiveDDLSuite
   }
 
   test("SPARK-23355 convertMetastoreParquet should not ignore table properties - STORED AS") {
-    withSQLConf(CONVERT_METASTORE_PARQUET.key -> "true") {
+    withSQLConf(
+      CONVERT_METASTORE_PARQUET.key -> "true",
+      CONVERT_METASTORE_TABLE_PROPERTY.key -> "true") {
       withTable("t") {
         withTempPath { path =>
           sql(

@@ -2225,6 +2230,49 @@ class HiveDDLSuite
     }
   }
 
+  test("Ignore ORC table properties for backward compatibility") {
+    Seq("native", "hive").foreach { orcImpl =>
+      withSQLConf(
+        ORC_IMPLEMENTATION.key -> orcImpl,
+        CONVERT_METASTORE_ORC.key -> "true",
+        CONVERT_METASTORE_TABLE_PROPERTY.key -> "false") {
+        withTable("t") {
+          withTempPath { path =>
+            sql(
+              s"""
+                |CREATE TABLE t(id int) STORED AS ORC
+                |TBLPROPERTIES (orc.compress 'NONE')
+                |LOCATION '${path.toURI}'
+              """.stripMargin)
+            sql("INSERT INTO t SELECT 1")
+            val maybeFile = path.listFiles().find(_.getName.startsWith("part"))
+            assertCompression(maybeFile, "orc", "SNAPPY")
+          }
+        }
+      }
+    }
+  }
+
+  test("Ignore Parquet table properties for backward compatibility") {
+    withSQLConf(
+      CONVERT_METASTORE_PARQUET.key -> "true",
+      CONVERT_METASTORE_TABLE_PROPERTY.key -> "false") {
+      withTable("t") {
+        withTempPath { path =>
+          sql(
+            s"""
+              |CREATE TABLE t(id int) STORED AS PARQUET
+              |TBLPROPERTIES (parquet.compression 'NONE')
+              |LOCATION '${path.toURI}'
+            """.stripMargin)
+          sql("INSERT INTO t SELECT 1")
+          val maybeFile = path.listFiles().find(_.getName.startsWith("part"))
+          assertCompression(maybeFile, "parquet", "SNAPPY")
+        }
+      }
+    }
+  }
+
   test("load command for non local invalid path validation") {
     withTable("tbl") {
       sql("CREATE TABLE tbl(i INT, j STRING)")
