@@ -34,6 +34,9 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.types._
import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}

@@ -848,4 +851,22 @@ object DDLUtils {
}
}
}

private[sql] def checkDataSchemaFieldNames(table: CatalogTable): Unit = {
table.provider.foreach {
_.toLowerCase(Locale.ROOT) match {
case HIVE_PROVIDER =>
val serde = table.storage.serde
if (serde == HiveSerDe.sourceToSerDe("orc").get.serde) {
OrcFileFormat.checkFieldNames(table.dataSchema)
} else if (serde == HiveSerDe.sourceToSerDe("parquet").get.serde ||
serde == Some("parquet.hive.serde.ParquetHiveSerDe")) {
ParquetSchemaConverter.checkFieldNames(table.dataSchema)
}
case "parquet" => ParquetSchemaConverter.checkFieldNames(table.dataSchema)
case "orc" => OrcFileFormat.checkFieldNames(table.dataSchema)
case _ =>
}
}
}
}
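To make the user-visible effect of this helper concrete, here is a rough spark-shell style illustration (a hypothetical session, not part of the PR; the error text is quoted from the ORC checker introduced below, and the Parquet wording may differ slightly):

spark.sql("CREATE TABLE ok (price INT) USING orc")       // ordinary names still work
spark.sql("CREATE TABLE bad (`col;1` INT) USING orc")
// org.apache.spark.sql.AnalysisException: Column name "col;1" contains invalid
// character(s). Please use alias to rename it.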
@@ -201,13 +201,14 @@ case class AlterTableAddColumnsCommand(

// make sure any partition columns are at the end of the fields
val reorderedSchema = catalogTable.dataSchema ++ columns ++ catalogTable.partitionSchema
val newSchema = catalogTable.schema.copy(fields = reorderedSchema.toArray)

SchemaUtils.checkColumnNameDuplication(
reorderedSchema.map(_.name), "in the table definition of " + table.identifier,
conf.caseSensitiveAnalysis)
DDLUtils.checkDataSchemaFieldNames(catalogTable.copy(schema = newSchema))
Member
newSchema also contains the partition schema. Do we have the same limits on it?

Member Author
It's okay. Inside checkDataSchemaFieldNames we only use table.dataSchema, as in the following:

ParquetSchemaConverter.checkFieldNames(table.dataSchema)

For the partition columns, we have always allowed special characters.
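As a hypothetical sketch of the distinction being made here (the names and values are illustrative, not taken from the PR): CatalogTable.dataSchema drops the partition columns, so only the fields that are physically written into the ORC/Parquet files are validated.

import org.apache.spark.sql.types._

// Full table schema: one data column plus one partition column containing a space.
val fullSchema = new StructType()
  .add("price", IntegerType)
  .add("col 2", StringType)

val partitionColumnNames = Seq("col 2")

// Roughly what dataSchema exposes: everything except the partition columns.
val dataSchema = StructType(fullSchema.filterNot(f => partitionColumnNames.contains(f.name)))

// Only dataSchema (just `price` here) reaches checkFieldNames, so the partition
// column named "col 2" is never rejected.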

Member
Could you add test cases to ensure that partitioning columns with special characters work?

Member Author
(test-case snippet not preserved in this extract)

Member Author
(test-case snippet not preserved in this extract)

I think this PR passes the above two test cases, too.


catalog.alterTableSchema(
table, catalogTable.schema.copy(fields = reorderedSchema.toArray))
catalog.alterTableSchema(table, newSchema)

Seq.empty[Row]
}
@@ -130,10 +130,12 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast

override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case CreateTable(tableDesc, mode, None) if DDLUtils.isDatasourceTable(tableDesc) =>
DDLUtils.checkDataSchemaFieldNames(tableDesc)
CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)

case CreateTable(tableDesc, mode, Some(query))
if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
DDLUtils.checkDataSchemaFieldNames(tableDesc.copy(schema = query.schema))
CreateDataSourceTableAsSelectCommand(tableDesc, mode, query)

case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _, _),
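For context (my reading of the diff, not stated in it): in the CTAS branch the table descriptor carries no schema of its own yet; the column names come from the query, which is why the check runs on a copy that borrows query.schema. A hypothetical example of a statement this now rejects, mirroring the CTAS test added later in this PR:

spark.sql("CREATE TABLE t USING parquet AS SELECT 1 AS `col 2`")
// org.apache.spark.sql.AnalysisException: ... contains invalid character(s) ...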
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.orc

import org.apache.orc.TypeDescription

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types.StructType

private[sql] object OrcFileFormat {
private def checkFieldName(name: String): Unit = {
try {
TypeDescription.fromString(s"struct<$name:int>")
Member (@viirya, Sep 6, 2017)
(comment not preserved in this extract)

Member
parseName doesn't look public, though... I don't like this line either, but I couldn't think of another alternative for now.

Member (@viirya, Sep 6, 2017)
Oh, right, I forgot that it's Java...

Member Author
Yep. I agree that it's a little ugly now.

} catch {
case _: IllegalArgumentException =>
throw new AnalysisException(
s"""Column name "$name" contains invalid character(s).
|Please use alias to rename it.
""".stripMargin.split("\n").mkString(" ").trim)
}
}

def checkFieldNames(schema: StructType): StructType = {
schema.fieldNames.foreach(checkFieldName)
schema
}
}
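A side note on the thread above: since org.apache.orc.TypeDescription.parseName is not public, the workaround is to hand the candidate name to the public fromString parser wrapped in a throwaway struct type. Below is a rough standalone demonstration (a hypothetical helper, assuming orc-core is on the classpath; the rejected characters mirror the ones exercised by the test added in this PR):

import org.apache.orc.TypeDescription

// Boolean variant of the private checkFieldName above, for quick experimentation.
def isValidOrcFieldName(name: String): Boolean =
  try {
    TypeDescription.fromString(s"struct<$name:int>")
    true
  } catch {
    case _: IllegalArgumentException => false
  }

Seq("price", "col 2", "a;b", "x=y").foreach { name =>
  println(s"'$name' -> ${if (isValidOrcFieldName(name)) "accepted" else "rejected"}")
}
// Expected: 'price' is accepted; the names containing ' ', ';', and '=' are rejected.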
Expand Up @@ -556,7 +556,7 @@ private[parquet] class ParquetSchemaConverter(
}
}

private[parquet] object ParquetSchemaConverter {
private[sql] object ParquetSchemaConverter {
val SPARK_PARQUET_SCHEMA_NAME = "spark_schema"

val EMPTY_MESSAGE: MessageType =
4 changes: 2 additions & 2 deletions sql/core/src/test/resources/sql-tests/inputs/show_columns.sql
@@ -2,9 +2,9 @@ CREATE DATABASE showdb;

USE showdb;

CREATE TABLE showcolumn1 (col1 int, `col 2` int) USING parquet;
CREATE TABLE showcolumn1 (col1 int, `col 2` int) USING json;
CREATE TABLE showcolumn2 (price int, qty int, year int, month int) USING parquet partitioned by (year, month);
CREATE TEMPORARY VIEW showColumn3 (col3 int, `col 4` int) USING parquet;
CREATE TEMPORARY VIEW showColumn3 (col3 int, `col 4` int) USING json;
CREATE GLOBAL TEMP VIEW showColumn4 AS SELECT 1 as col1, 'abc' as `col 5`;


@@ -19,7 +19,7 @@ struct<>


-- !query 2
CREATE TABLE showcolumn1 (col1 int, `col 2` int) USING parquet
CREATE TABLE showcolumn1 (col1 int, `col 2` int) USING json
-- !query 2 schema
Member
Please change it to JSON.

Member Author
Sure, thank you for the guidance!

struct<>
-- !query 2 output
@@ -35,7 +35,7 @@ struct<>


-- !query 4
CREATE TEMPORARY VIEW showColumn3 (col3 int, `col 4` int) USING parquet
CREATE TEMPORARY VIEW showColumn3 (col3 int, `col 4` int) USING json
-- !query 4 schema
struct<>
-- !query 4 output
@@ -151,9 +151,11 @@ object HiveAnalysis extends Rule[LogicalPlan] {
InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, ifPartitionNotExists)

case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
DDLUtils.checkDataSchemaFieldNames(tableDesc)
CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)

case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
DDLUtils.checkDataSchemaFieldNames(tableDesc)
CreateHiveTableAsSelectCommand(tableDesc, query, mode)
}
}
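The Hive-provider branch of checkDataSchemaFieldNames is the one exercised by statements like the following (a hypothetical sketch paraphrasing the USING hive test case added below, which disables CONVERT_METASTORE_PARQUET for this scenario):

spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
spark.sql("CREATE TABLE t (`col 2` INT) USING hive OPTIONS (fileFormat 'parquet')")
// org.apache.spark.sql.AnalysisException: ... contains invalid character(s) ...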
@@ -2000,4 +2000,38 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
assert(setOfPath.size() == pathSizeToDeleteOnExit)
}
}

test("SPARK-21912 ORC/Parquet table should not create invalid column names") {
Seq(" ", ",", ";", "{", "}", "(", ")", "\n", "\t", "=").foreach { name =>
withTable("t21912") {
Seq("ORC", "PARQUET").foreach { source =>
val m = intercept[AnalysisException] {
sql(s"CREATE TABLE t21912(`col$name` INT) USING $source")
}.getMessage
assert(m.contains(s"contains invalid character(s)"))

val m2 = intercept[AnalysisException] {
sql(s"CREATE TABLE t21912 USING $source AS SELECT 1 `col$name`")
}.getMessage
assert(m2.contains(s"contains invalid character(s)"))

withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false") {
val m3 = intercept[AnalysisException] {
sql(s"CREATE TABLE t21912(`col$name` INT) USING hive OPTIONS (fileFormat '$source')")
}.getMessage
assert(m3.contains(s"contains invalid character(s)"))
}
}

// TODO: After SPARK-21929, we need to check ORC, too.
Seq("PARQUET").foreach { source =>
Member Author
I added only the Parquet test case due to SPARK-21929.

sql(s"CREATE TABLE t21912(`col` INT) USING $source")
val m = intercept[AnalysisException] {
sql(s"ALTER TABLE t21912 ADD COLUMNS(`col$name` INT)")
}.getMessage
assert(m.contains(s"contains invalid character(s)"))
}
}
}
}
}