
Commit eea2b87

dongjoon-hyun authored and gatorsmile committed
[SPARK-21912][SQL] ORC/Parquet table should not create invalid column names
## What changes were proposed in this pull request?

Currently, users meet job abortions while creating or altering ORC/Parquet tables with invalid column names. We had better prevent this by raising an **AnalysisException** with a guide to use aliases instead, as Parquet data source tables already do.

**BEFORE**
```scala
scala> sql("CREATE TABLE orc1 USING ORC AS SELECT 1 `a b`")
17/09/04 13:28:21 ERROR Utils: Aborting task
java.lang.IllegalArgumentException: Error: : expected at the position 8 of 'struct<a b:int>' but ' ' is found.
17/09/04 13:28:21 ERROR FileFormatWriter: Job job_20170904132821_0001 aborted.
17/09/04 13:28:21 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.SparkException: Task failed while writing rows.
```

**AFTER**
```scala
scala> sql("CREATE TABLE orc1 USING ORC AS SELECT 1 `a b`")
17/09/04 13:27:40 ERROR CreateDataSourceTableAsSelectCommand: Failed to write to table orc1
org.apache.spark.sql.AnalysisException: Attribute name "a b" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.;
```

## How was this patch tested?

Pass the Jenkins with a new test case.

Author: Dongjoon Hyun <[email protected]>

Closes #19124 from dongjoon-hyun/SPARK-21912.
1 parent ce7293c commit eea2b87
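A note on the mechanism before the per-file diffs: the new ORC-side check simply delegates to the ORC library's own schema parser, the same code path that used to blow up mid-task. A minimal sketch of the failure mode, assuming the `org.apache.orc` artifact is on the classpath:

```scala
import org.apache.orc.TypeDescription

// ORC's schema parser rejects field names containing special characters such
// as spaces. Before this patch the exception below surfaced at write time as
// a task failure; the new check converts it into an up-front AnalysisException.
TypeDescription.fromString("struct<a b:int>")
// => java.lang.IllegalArgumentException:
//    Error: : expected at the position 8 of 'struct<a b:int>' but ' ' is found.
```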

9 files changed: +109 −7 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 21 additions & 0 deletions
```diff
@@ -34,6 +34,9 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
+import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
 
@@ -848,4 +851,22 @@ object DDLUtils {
       }
     }
   }
+
+  private[sql] def checkDataSchemaFieldNames(table: CatalogTable): Unit = {
+    table.provider.foreach {
+      _.toLowerCase(Locale.ROOT) match {
+        case HIVE_PROVIDER =>
+          val serde = table.storage.serde
+          if (serde == HiveSerDe.sourceToSerDe("orc").get.serde) {
+            OrcFileFormat.checkFieldNames(table.dataSchema)
+          } else if (serde == HiveSerDe.sourceToSerDe("parquet").get.serde ||
+              serde == Some("parquet.hive.serde.ParquetHiveSerDe")) {
+            ParquetSchemaConverter.checkFieldNames(table.dataSchema)
+          }
+        case "parquet" => ParquetSchemaConverter.checkFieldNames(table.dataSchema)
+        case "orc" => OrcFileFormat.checkFieldNames(table.dataSchema)
+        case _ =>
+      }
+    }
+  }
 }
```
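As the new error message advises, the user-side fix is to alias the offending column before writing; a quick spark-shell illustration based on the example in the commit message:

```scala
// Rejected with AnalysisException after this patch:
sql("CREATE TABLE orc1 USING ORC AS SELECT 1 `a b`")

// Works: rename the column with an alias so the resulting ORC schema is valid.
sql("CREATE TABLE orc2 USING ORC AS SELECT 1 AS a_b")
```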

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 3 additions & 2 deletions
```diff
@@ -201,13 +201,14 @@ case class AlterTableAddColumnsCommand(
 
     // make sure any partition columns are at the end of the fields
     val reorderedSchema = catalogTable.dataSchema ++ columns ++ catalogTable.partitionSchema
+    val newSchema = catalogTable.schema.copy(fields = reorderedSchema.toArray)
 
     SchemaUtils.checkColumnNameDuplication(
       reorderedSchema.map(_.name), "in the table definition of " + table.identifier,
       conf.caseSensitiveAnalysis)
+    DDLUtils.checkDataSchemaFieldNames(catalogTable.copy(schema = newSchema))
 
-    catalog.alterTableSchema(
-      table, catalogTable.schema.copy(fields = reorderedSchema.toArray))
+    catalog.alterTableSchema(table, newSchema)
 
     Seq.empty[Row]
   }
```
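The same guard now covers `ALTER TABLE ... ADD COLUMNS`, mirroring the new test case at the bottom of this commit; a rough spark-shell sketch:

```scala
sql("CREATE TABLE t21912(`col` INT) USING PARQUET")

// Raises AnalysisException up front instead of failing later at write time:
sql("ALTER TABLE t21912 ADD COLUMNS(`col 2` INT)")
```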

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -130,10 +130,12 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case CreateTable(tableDesc, mode, None) if DDLUtils.isDatasourceTable(tableDesc) =>
+      DDLUtils.checkDataSchemaFieldNames(tableDesc)
       CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
 
     case CreateTable(tableDesc, mode, Some(query))
         if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
+      DDLUtils.checkDataSchemaFieldNames(tableDesc.copy(schema = query.schema))
       CreateDataSourceTableAsSelectCommand(tableDesc, mode, query)
 
     case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _, _),
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala

Lines changed: 42 additions & 0 deletions

```diff
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.orc.TypeDescription
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.types.StructType
+
+private[sql] object OrcFileFormat {
+  private def checkFieldName(name: String): Unit = {
+    try {
+      TypeDescription.fromString(s"struct<$name:int>")
+    } catch {
+      case _: IllegalArgumentException =>
+        throw new AnalysisException(
+          s"""Column name "$name" contains invalid character(s).
+             |Please use alias to rename it.
+           """.stripMargin.split("\n").mkString(" ").trim)
+    }
+  }
+
+  def checkFieldNames(schema: StructType): StructType = {
+    schema.fieldNames.foreach(checkFieldName)
+    schema
+  }
+}
```
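A minimal sketch of the new helper's contract (it is `private[sql]`, so this only compiles inside Spark's own `sql` packages):

```scala
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val ok = StructType(Seq(StructField("col", IntegerType)))
OrcFileFormat.checkFieldNames(ok)   // passes; returns the schema unchanged

val bad = StructType(Seq(StructField("a b", IntegerType)))
OrcFileFormat.checkFieldNames(bad)  // throws AnalysisException: Column name "a b" ...
```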

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -556,7 +556,7 @@ private[parquet] class ParquetSchemaConverter(
   }
 }
 
-private[parquet] object ParquetSchemaConverter {
+private[sql] object ParquetSchemaConverter {
   val SPARK_PARQUET_SCHEMA_NAME = "spark_schema"
 
   val EMPTY_MESSAGE: MessageType =
```

sql/core/src/test/resources/sql-tests/inputs/show_columns.sql

Lines changed: 2 additions & 2 deletions
```diff
@@ -2,9 +2,9 @@ CREATE DATABASE showdb;
 
 USE showdb;
 
-CREATE TABLE showcolumn1 (col1 int, `col 2` int) USING parquet;
+CREATE TABLE showcolumn1 (col1 int, `col 2` int) USING json;
 CREATE TABLE showcolumn2 (price int, qty int, year int, month int) USING parquet partitioned by (year, month);
-CREATE TEMPORARY VIEW showColumn3 (col3 int, `col 4` int) USING parquet;
+CREATE TEMPORARY VIEW showColumn3 (col3 int, `col 4` int) USING json;
 CREATE GLOBAL TEMP VIEW showColumn4 AS SELECT 1 as col1, 'abc' as `col 5`;
 
 
```

sql/core/src/test/resources/sql-tests/results/show_columns.sql.out

Lines changed: 2 additions & 2 deletions
```diff
@@ -19,7 +19,7 @@ struct<>
 
 
 -- !query 2
-CREATE TABLE showcolumn1 (col1 int, `col 2` int) USING parquet
+CREATE TABLE showcolumn1 (col1 int, `col 2` int) USING json
 -- !query 2 schema
 struct<>
 -- !query 2 output
@@ -35,7 +35,7 @@ struct<>
 
 
 -- !query 4
-CREATE TEMPORARY VIEW showColumn3 (col3 int, `col 4` int) USING parquet
+CREATE TEMPORARY VIEW showColumn3 (col3 int, `col 4` int) USING json
 -- !query 4 schema
 struct<>
 -- !query 4 output
```

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -151,9 +151,11 @@ object HiveAnalysis extends Rule[LogicalPlan] {
       InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, ifPartitionNotExists)
 
     case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
+      DDLUtils.checkDataSchemaFieldNames(tableDesc)
       CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
 
    case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
+      DDLUtils.checkDataSchemaFieldNames(tableDesc)
       CreateHiveTableAsSelectCommand(tableDesc, query, mode)
   }
 }
```

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 34 additions & 0 deletions
```diff
@@ -2000,4 +2000,38 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       assert(setOfPath.size() == pathSizeToDeleteOnExit)
     }
   }
+
+  test("SPARK-21912 ORC/Parquet table should not create invalid column names") {
+    Seq(" ", ",", ";", "{", "}", "(", ")", "\n", "\t", "=").foreach { name =>
+      withTable("t21912") {
+        Seq("ORC", "PARQUET").foreach { source =>
+          val m = intercept[AnalysisException] {
+            sql(s"CREATE TABLE t21912(`col$name` INT) USING $source")
+          }.getMessage
+          assert(m.contains(s"contains invalid character(s)"))
+
+          val m2 = intercept[AnalysisException] {
+            sql(s"CREATE TABLE t21912 USING $source AS SELECT 1 `col$name`")
+          }.getMessage
+          assert(m2.contains(s"contains invalid character(s)"))
+
+          withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false") {
+            val m3 = intercept[AnalysisException] {
+              sql(s"CREATE TABLE t21912(`col$name` INT) USING hive OPTIONS (fileFormat '$source')")
+            }.getMessage
+            assert(m3.contains(s"contains invalid character(s)"))
+          }
+        }
+
+        // TODO: After SPARK-21929, we need to check ORC, too.
+        Seq("PARQUET").foreach { source =>
+          sql(s"CREATE TABLE t21912(`col` INT) USING $source")
+          val m = intercept[AnalysisException] {
+            sql(s"ALTER TABLE t21912 ADD COLUMNS(`col$name` INT)")
+          }.getMessage
+          assert(m.contains(s"contains invalid character(s)"))
+        }
+      }
+    }
+  }
 }
```
