From 808dfe0fcd9de2f43b33f0d1d084172b5624f2a8 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 4 Sep 2017 13:46:15 -0700 Subject: [PATCH 01/12] [SPARK-21912][SQL] Creating ORC datasource table should check invalid column names --- .../spark/sql/hive/orc/OrcFileFormat.scala | 16 ++++++++++++++-- .../spark/sql/hive/execution/SQLQuerySuite.scala | 9 +++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index edf2013a4c93..ac8cb1bb6c75 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -34,12 +34,12 @@ import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit} import org.apache.spark.TaskContext -import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.hive.{HiveInspectors, HiveShim} -import org.apache.spark.sql.sources.{Filter, _} +import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType import org.apache.spark.util.SerializableConfiguration @@ -83,6 +83,8 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable classOf[MapRedOutputFormat[_, _]]) } + dataSchema.map(_.name).foreach(checkFieldName) + new OutputWriterFactory { override def newInstance( path: String, @@ -169,6 +171,16 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable } } } + + private def checkFieldName(name: String): Unit = { + // ,;{}()\n\t= and space are special characters in ORC schema + if (name.matches(".*[ ,;{}()\n\t=].*")) { + throw new AnalysisException( + s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=". + |Please use alias to rename it. + """.stripMargin.split("\n").mkString(" ").trim) + } + } } private[orc] class OrcSerializer(dataSchema: StructType, conf: Configuration) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index d2a6ef7b2b37..3b68906bacea 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -2000,4 +2000,13 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { assert(setOfPath.size() == pathSizeToDeleteOnExit) } } + + test("SPARK-21912 Creating ORC datasource table should check invalid column names") { + withTable("orc1") { + val m = intercept[AnalysisException] { + sql("CREATE TABLE orc1 USING ORC AS SELECT 1 `a b`") + }.getMessage + assert(m.contains("""Attribute name "a b" contains invalid character(s)""")) + } + } } From a73894374d284484d9b28123db02dfe6f264567a Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 4 Sep 2017 18:33:37 -0700 Subject: [PATCH 02/12] Address comments. 
--- .../apache/spark/sql/hive/orc/OrcFileFormat.scala | 15 +++++++++------ .../spark/sql/hive/execution/SQLQuerySuite.scala | 10 ++++++---- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index ac8cb1bb6c75..95c48f1590d8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -32,6 +32,7 @@ import org.apache.hadoop.io.{NullWritable, Writable} import org.apache.hadoop.mapred.{JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter} import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit} +import org.apache.orc.TypeDescription import org.apache.spark.TaskContext import org.apache.spark.sql._ @@ -173,12 +174,14 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable } private def checkFieldName(name: String): Unit = { - // ,;{}()\n\t= and space are special characters in ORC schema - if (name.matches(".*[ ,;{}()\n\t=].*")) { - throw new AnalysisException( - s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=". - |Please use alias to rename it. - """.stripMargin.split("\n").mkString(" ").trim) + try { + TypeDescription.fromString(s"struct<$name:int>") + } catch { + case _: IllegalArgumentException => + throw new AnalysisException( + s"""Attribute name "$name" contains invalid character(s). + |Please use alias to rename it. + """.stripMargin.split("\n").mkString(" ").trim) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 3b68906bacea..44a823ba24ab 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -2003,10 +2003,12 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { test("SPARK-21912 Creating ORC datasource table should check invalid column names") { withTable("orc1") { - val m = intercept[AnalysisException] { - sql("CREATE TABLE orc1 USING ORC AS SELECT 1 `a b`") - }.getMessage - assert(m.contains("""Attribute name "a b" contains invalid character(s)""")) + Seq(" ", "?", ",", ";", "{", "}", "(", ")", "\n", "\t", "=").foreach { name => + val m = intercept[AnalysisException] { + sql(s"CREATE TABLE orc1 USING ORC AS SELECT 1 `column$name`") + }.getMessage + assert(m.contains(s"contains invalid character(s)")) + } } } } From cd539fe56a0b15a9fde9b6878971eebc185fc94e Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 5 Sep 2017 01:37:11 -0700 Subject: [PATCH 03/12] Address comments. 
--- .../command/createDataSourceTables.scala | 9 +++++ .../datasources/orc/OrcFileFormat.scala | 36 +++++++++++++++++++ .../parquet/ParquetSchemaConverter.scala | 2 +- .../spark/sql/hive/orc/OrcFileFormat.scala | 18 ++-------- .../sql/hive/execution/SQLQuerySuite.scala | 19 ++++++---- 5 files changed, 62 insertions(+), 22 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 04b2534ca5eb..8e0352a70c22 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -23,6 +23,8 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat +import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter import org.apache.spark.sql.sources.BaseRelation /** @@ -85,6 +87,13 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo } } + table.provider.get.toLowerCase match { + case "parquet" => + dataSource.schema.map(_.name).foreach(ParquetSchemaConverter.checkFieldName) + case "orc" => + dataSource.schema.map(_.name).foreach(OrcFileFormat.checkFieldName) + } + val newTable = table.copy( schema = dataSource.schema, partitionColumnNames = partitionColumnNames, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala new file mode 100644 index 000000000000..ff2ec698bb34 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.orc + +import org.apache.orc.TypeDescription + +import org.apache.spark.sql.AnalysisException + +private[sql] object OrcFileFormat { + def checkFieldName(name: String): Unit = { + try { + TypeDescription.fromString(s"struct<$name:int>") + } catch { + case _: IllegalArgumentException => + throw new AnalysisException( + s"""Attribute name "$name" contains invalid character(s). + |Please use alias to rename it. 
+ """.stripMargin.split("\n").mkString(" ").trim) + } + } +} \ No newline at end of file diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala index 0b805e436288..b3781cfc4a60 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala @@ -556,7 +556,7 @@ private[parquet] class ParquetSchemaConverter( } } -private[parquet] object ParquetSchemaConverter { +private[sql] object ParquetSchemaConverter { val SPARK_PARQUET_SCHEMA_NAME = "spark_schema" val EMPTY_MESSAGE: MessageType = diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index 95c48f1590d8..29a901b9bbe3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -32,13 +32,13 @@ import org.apache.hadoop.io.{NullWritable, Writable} import org.apache.hadoop.mapred.{JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter} import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit} -import org.apache.orc.TypeDescription import org.apache.spark.TaskContext -import org.apache.spark.sql._ +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.hive.{HiveInspectors, HiveShim} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType @@ -84,7 +84,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable classOf[MapRedOutputFormat[_, _]]) } - dataSchema.map(_.name).foreach(checkFieldName) + dataSchema.map(_.name).foreach(OrcFileFormat.checkFieldName) new OutputWriterFactory { override def newInstance( @@ -172,18 +172,6 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable } } } - - private def checkFieldName(name: String): Unit = { - try { - TypeDescription.fromString(s"struct<$name:int>") - } catch { - case _: IllegalArgumentException => - throw new AnalysisException( - s"""Attribute name "$name" contains invalid character(s). - |Please use alias to rename it. 
- """.stripMargin.split("\n").mkString(" ").trim) - } - } } private[orc] class OrcSerializer(dataSchema: StructType, conf: Configuration) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 44a823ba24ab..cc86bbc8c822 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -2001,13 +2001,20 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } - test("SPARK-21912 Creating ORC datasource table should check invalid column names") { + test("SPARK-21912 Creating ORC/Parquet datasource table should check invalid column names") { withTable("orc1") { - Seq(" ", "?", ",", ";", "{", "}", "(", ")", "\n", "\t", "=").foreach { name => - val m = intercept[AnalysisException] { - sql(s"CREATE TABLE orc1 USING ORC AS SELECT 1 `column$name`") - }.getMessage - assert(m.contains(s"contains invalid character(s)")) + Seq(" ", ",", ";", "{", "}", "(", ")", "\n", "\t", "=").foreach { name => + Seq("ORC", "PARQUET").foreach { dataSource => + val m = intercept[AnalysisException] { + sql(s"CREATE TABLE orc1(`column$name` INT) USING $dataSource") + }.getMessage + assert(m.contains(s"contains invalid character(s)")) + + val m2 = intercept[AnalysisException] { + sql(s"CREATE TABLE orc1 USING $dataSource AS SELECT 1 `column$name`") + }.getMessage + assert(m2.contains(s"contains invalid character(s)")) + } } } } From 66aff54914a323cc61d2a9a95a4cf669dee3c6d3 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 5 Sep 2017 01:47:40 -0700 Subject: [PATCH 04/12] Add a newline at the end of the file. 
--- .../spark/sql/execution/datasources/orc/OrcFileFormat.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index ff2ec698bb34..e946c29ac637 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -33,4 +33,4 @@ private[sql] object OrcFileFormat { """.stripMargin.split("\n").mkString(" ").trim) } } -} \ No newline at end of file +} From aa78eaf6d1724ca53eca42c785fbbaaf0064ab08 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 5 Sep 2017 02:13:51 -0700 Subject: [PATCH 05/12] fix --- .../scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index 29a901b9bbe3..b78b3050f113 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -38,7 +38,6 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.hive.{HiveInspectors, HiveShim} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType @@ -84,7 +83,8 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable classOf[MapRedOutputFormat[_, _]]) } - dataSchema.map(_.name).foreach(OrcFileFormat.checkFieldName) + dataSchema.map(_.name).foreach( + org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.checkFieldName) new OutputWriterFactory { override def newInstance( From 0bf3b43626063c005f9fc930b6b81534d2a756fa Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 5 Sep 2017 03:51:02 -0700 Subject: [PATCH 06/12] Update answer file to show the result. 
--- .../command/createDataSourceTables.scala | 5 ++-- .../sql-tests/results/show_columns.sql.out | 30 ++++++++++--------- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 8e0352a70c22..3da5432bedd3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -89,9 +89,10 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo table.provider.get.toLowerCase match { case "parquet" => - dataSource.schema.map(_.name).foreach(ParquetSchemaConverter.checkFieldName) + table.dataSchema.map(_.name).foreach(ParquetSchemaConverter.checkFieldName) case "orc" => - dataSource.schema.map(_.name).foreach(OrcFileFormat.checkFieldName) + table.dataSchema.map(_.name).foreach(OrcFileFormat.checkFieldName) + case _ => } val newTable = table.copy( diff --git a/sql/core/src/test/resources/sql-tests/results/show_columns.sql.out b/sql/core/src/test/resources/sql-tests/results/show_columns.sql.out index 05c3a083ee3b..1bf257973c50 100644 --- a/sql/core/src/test/resources/sql-tests/results/show_columns.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/show_columns.sql.out @@ -23,7 +23,8 @@ CREATE TABLE showcolumn1 (col1 int, `col 2` int) USING parquet -- !query 2 schema struct<> -- !query 2 output - +org.apache.spark.sql.AnalysisException +Attribute name "col 2" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.; -- !query 3 @@ -53,28 +54,28 @@ struct<> -- !query 6 SHOW COLUMNS IN showcolumn1 -- !query 6 schema -struct +struct<> -- !query 6 output -col 2 -col1 +org.apache.spark.sql.catalyst.analysis.NoSuchTableException +Table or view 'showcolumn1' not found in database 'showdb'; -- !query 7 SHOW COLUMNS IN showdb.showcolumn1 -- !query 7 schema -struct +struct<> -- !query 7 output -col 2 -col1 +org.apache.spark.sql.catalyst.analysis.NoSuchTableException +Table or view 'showcolumn1' not found in database 'showdb'; -- !query 8 SHOW COLUMNS IN showcolumn1 FROM showdb -- !query 8 schema -struct +struct<> -- !query 8 output -col 2 -col1 +org.apache.spark.sql.catalyst.analysis.NoSuchTableException +Table or view 'showcolumn1' not found in database 'showdb'; -- !query 9 @@ -100,10 +101,10 @@ Table or view 'badtable' not found in database 'showdb'; -- !query 11 SHOW COLUMNS IN showdb.showcolumn1 from SHOWDB -- !query 11 schema -struct +struct<> -- !query 11 output -col 2 -col1 +org.apache.spark.sql.catalyst.analysis.NoSuchTableException +Table or view 'showcolumn1' not found in database 'showdb'; -- !query 12 @@ -174,7 +175,8 @@ DROP TABLE showcolumn1 -- !query 19 schema struct<> -- !query 19 output - +org.apache.spark.sql.catalyst.analysis.NoSuchTableException +Table or view 'showcolumn1' not found in database 'showdb'; -- !query 20 From 79929e907da3bc02a591bb4f4da193d6c698e8c4 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 5 Sep 2017 09:07:06 -0700 Subject: [PATCH 07/12] Replace parquet with json. 
--- .../sql-tests/inputs/show_columns.sql | 6 ++-- .../sql-tests/results/show_columns.sql.out | 36 +++++++++---------- 2 files changed, 20 insertions(+), 22 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/inputs/show_columns.sql b/sql/core/src/test/resources/sql-tests/inputs/show_columns.sql index 1e02c2f045ea..e086ea233698 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/show_columns.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/show_columns.sql @@ -2,9 +2,9 @@ CREATE DATABASE showdb; USE showdb; -CREATE TABLE showcolumn1 (col1 int, `col 2` int) USING parquet; -CREATE TABLE showcolumn2 (price int, qty int, year int, month int) USING parquet partitioned by (year, month); -CREATE TEMPORARY VIEW showColumn3 (col3 int, `col 4` int) USING parquet; +CREATE TABLE showcolumn1 (col1 int, `col 2` int) USING json; +CREATE TABLE showcolumn2 (price int, qty int, year int, month int) USING json partitioned by (year, month); +CREATE TEMPORARY VIEW showColumn3 (col3 int, `col 4` int) USING json; CREATE GLOBAL TEMP VIEW showColumn4 AS SELECT 1 as col1, 'abc' as `col 5`; diff --git a/sql/core/src/test/resources/sql-tests/results/show_columns.sql.out b/sql/core/src/test/resources/sql-tests/results/show_columns.sql.out index 1bf257973c50..c2d70a9eae85 100644 --- a/sql/core/src/test/resources/sql-tests/results/show_columns.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/show_columns.sql.out @@ -19,16 +19,15 @@ struct<> -- !query 2 -CREATE TABLE showcolumn1 (col1 int, `col 2` int) USING parquet +CREATE TABLE showcolumn1 (col1 int, `col 2` int) USING json -- !query 2 schema struct<> -- !query 2 output -org.apache.spark.sql.AnalysisException -Attribute name "col 2" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.; + -- !query 3 -CREATE TABLE showcolumn2 (price int, qty int, year int, month int) USING parquet partitioned by (year, month) +CREATE TABLE showcolumn2 (price int, qty int, year int, month int) USING json partitioned by (year, month) -- !query 3 schema struct<> -- !query 3 output @@ -36,7 +35,7 @@ struct<> -- !query 4 -CREATE TEMPORARY VIEW showColumn3 (col3 int, `col 4` int) USING parquet +CREATE TEMPORARY VIEW showColumn3 (col3 int, `col 4` int) USING json -- !query 4 schema struct<> -- !query 4 output @@ -54,28 +53,28 @@ struct<> -- !query 6 SHOW COLUMNS IN showcolumn1 -- !query 6 schema -struct<> +struct -- !query 6 output -org.apache.spark.sql.catalyst.analysis.NoSuchTableException -Table or view 'showcolumn1' not found in database 'showdb'; +col 2 +col1 -- !query 7 SHOW COLUMNS IN showdb.showcolumn1 -- !query 7 schema -struct<> +struct -- !query 7 output -org.apache.spark.sql.catalyst.analysis.NoSuchTableException -Table or view 'showcolumn1' not found in database 'showdb'; +col 2 +col1 -- !query 8 SHOW COLUMNS IN showcolumn1 FROM showdb -- !query 8 schema -struct<> +struct -- !query 8 output -org.apache.spark.sql.catalyst.analysis.NoSuchTableException -Table or view 'showcolumn1' not found in database 'showdb'; +col 2 +col1 -- !query 9 @@ -101,10 +100,10 @@ Table or view 'badtable' not found in database 'showdb'; -- !query 11 SHOW COLUMNS IN showdb.showcolumn1 from SHOWDB -- !query 11 schema -struct<> +struct -- !query 11 output -org.apache.spark.sql.catalyst.analysis.NoSuchTableException -Table or view 'showcolumn1' not found in database 'showdb'; +col 2 +col1 -- !query 12 @@ -175,8 +174,7 @@ DROP TABLE showcolumn1 -- !query 19 schema struct<> -- !query 19 output 
-org.apache.spark.sql.catalyst.analysis.NoSuchTableException -Table or view 'showcolumn1' not found in database 'showdb'; + -- !query 20 From 368b2421f924026ee76fe6bb787fdb90af7af5c3 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 5 Sep 2017 12:49:55 -0700 Subject: [PATCH 08/12] Address comments. --- .../command/createDataSourceTables.scala | 7 ++++--- .../execution/datasources/orc/OrcFileFormat.scala | 6 ++++++ .../apache/spark/sql/hive/HiveStrategies.scala | 15 ++++++++++++++- .../apache/spark/sql/hive/orc/OrcFileFormat.scala | 3 +-- .../spark/sql/hive/execution/SQLQuerySuite.scala | 15 +++++++++++---- 5 files changed, 36 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 3da5432bedd3..f0dcf741149f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.command import java.net.URI +import java.util.Locale import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ @@ -87,11 +88,11 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo } } - table.provider.get.toLowerCase match { + table.provider.get.toLowerCase(Locale.ROOT) match { case "parquet" => - table.dataSchema.map(_.name).foreach(ParquetSchemaConverter.checkFieldName) + ParquetSchemaConverter.checkFieldNames(table.dataSchema) case "orc" => - table.dataSchema.map(_.name).foreach(OrcFileFormat.checkFieldName) + OrcFileFormat.checkFieldNames(table.dataSchema) case _ => } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index e946c29ac637..e2236fc99476 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.orc import org.apache.orc.TypeDescription import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.types.StructType private[sql] object OrcFileFormat { def checkFieldName(name: String): Unit = { @@ -33,4 +34,9 @@ private[sql] object OrcFileFormat { """.stripMargin.split("\n").mkString(" ").trim) } } + + def checkFieldNames(schema: StructType): StructType = { + schema.fieldNames.foreach(checkFieldName) + schema + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index ae1e7e72e8c3..7d01fdd50fb9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -31,7 +31,8 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils} import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation} -import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions} +import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat +import 
org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions, ParquetSchemaConverter} import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.hive.orc.OrcFileFormat import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} @@ -145,15 +146,27 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { * `PreprocessTableInsertion`. */ object HiveAnalysis extends Rule[LogicalPlan] { + private def checkFieldNames(table: CatalogTable): Unit = { + val serde = table.storage.serde + if (serde == HiveSerDe.sourceToSerDe("orc").get.serde) { + OrcFileFormat.checkFieldNames(table.dataSchema) + } else if (serde == HiveSerDe.sourceToSerDe("parquet").get.serde) { + ParquetSchemaConverter.checkFieldNames(table.dataSchema) + } + } + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case InsertIntoTable(r: HiveTableRelation, partSpec, query, overwrite, ifPartitionNotExists) if DDLUtils.isHiveTable(r.tableMeta) => + checkFieldNames(r.tableMeta) InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, ifPartitionNotExists) case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) => + checkFieldNames(tableDesc) CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore) case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) => + checkFieldNames(tableDesc) CreateHiveTableAsSelectCommand(tableDesc, query, mode) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index b78b3050f113..81156ef8699b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -83,8 +83,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable classOf[MapRedOutputFormat[_, _]]) } - dataSchema.map(_.name).foreach( - org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.checkFieldName) + org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.checkFieldNames(dataSchema) new OutputWriterFactory { override def newInstance( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index cc86bbc8c822..cb0ac9656bce 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -2002,18 +2002,25 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } test("SPARK-21912 Creating ORC/Parquet datasource table should check invalid column names") { - withTable("orc1") { + withTable("t21912") { Seq(" ", ",", ";", "{", "}", "(", ")", "\n", "\t", "=").foreach { name => - Seq("ORC", "PARQUET").foreach { dataSource => + Seq("ORC", "PARQUET").foreach { source => val m = intercept[AnalysisException] { - sql(s"CREATE TABLE orc1(`column$name` INT) USING $dataSource") + sql(s"CREATE TABLE t21912(`col$name` INT) USING $source") }.getMessage assert(m.contains(s"contains invalid character(s)")) val m2 = intercept[AnalysisException] { - sql(s"CREATE TABLE orc1 USING $dataSource AS SELECT 1 `column$name`") + sql(s"CREATE TABLE t21912 USING $source AS SELECT 1 `col$name`") }.getMessage assert(m2.contains(s"contains invalid character(s)")) + + withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> 
"false") { + val m3 = intercept[AnalysisException] { + sql(s"CREATE TABLE t21912(`col$name` INT) USING hive OPTIONS (fileFormat '$source')") + }.getMessage + assert(m3.contains(s"contains invalid character(s)")) + } } } } From c70c03cb6a46c1a53366ea3b3a96aec843d02608 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 5 Sep 2017 14:07:54 -0700 Subject: [PATCH 09/12] Use DataSourceStategy. --- .../command/createDataSourceTables.scala | 11 ----------- .../datasources/DataSourceStrategy.scala | 15 +++++++++++++++ .../execution/datasources/orc/OrcFileFormat.scala | 2 +- .../resources/sql-tests/inputs/show_columns.sql | 2 +- .../sql-tests/results/show_columns.sql.out | 2 +- .../apache/spark/sql/hive/orc/OrcFileFormat.scala | 6 ++---- 6 files changed, 20 insertions(+), 18 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index f0dcf741149f..04b2534ca5eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -18,14 +18,11 @@ package org.apache.spark.sql.execution.command import java.net.URI -import java.util.Locale import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat -import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter import org.apache.spark.sql.sources.BaseRelation /** @@ -88,14 +85,6 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo } } - table.provider.get.toLowerCase(Locale.ROOT) match { - case "parquet" => - ParquetSchemaConverter.checkFieldNames(table.dataSchema) - case "orc" => - OrcFileFormat.checkFieldNames(table.dataSchema) - case _ => - } - val newTable = table.copy( schema = dataSource.schema, partitionColumnNames = partitionColumnNames, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 0deac1984bd6..c6622a0a968f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources +import java.util.Locale import java.util.concurrent.Callable import org.apache.spark.internal.Logging @@ -34,6 +35,8 @@ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.command._ +import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat +import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ @@ -128,12 +131,24 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast projectList } + private def checkFieldNames(table: CatalogTable): Unit = { + table.provider.get.toLowerCase(Locale.ROOT) match { + case "parquet" => + 
ParquetSchemaConverter.checkFieldNames(table.dataSchema) + case "orc" => + OrcFileFormat.checkFieldNames(table.dataSchema) + case _ => + } + } + override def apply(plan: LogicalPlan): LogicalPlan = plan transform { case CreateTable(tableDesc, mode, None) if DDLUtils.isDatasourceTable(tableDesc) => + checkFieldNames(tableDesc) CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore) case CreateTable(tableDesc, mode, Some(query)) if query.resolved && DDLUtils.isDatasourceTable(tableDesc) => + checkFieldNames(tableDesc.copy(schema = query.schema)) CreateDataSourceTableAsSelectCommand(tableDesc, mode, query) case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _, _), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index e2236fc99476..0c16de6ceec5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.types.StructType private[sql] object OrcFileFormat { - def checkFieldName(name: String): Unit = { + private def checkFieldName(name: String): Unit = { try { TypeDescription.fromString(s"struct<$name:int>") } catch { diff --git a/sql/core/src/test/resources/sql-tests/inputs/show_columns.sql b/sql/core/src/test/resources/sql-tests/inputs/show_columns.sql index e086ea233698..521018e94e50 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/show_columns.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/show_columns.sql @@ -3,7 +3,7 @@ CREATE DATABASE showdb; USE showdb; CREATE TABLE showcolumn1 (col1 int, `col 2` int) USING json; -CREATE TABLE showcolumn2 (price int, qty int, year int, month int) USING json partitioned by (year, month); +CREATE TABLE showcolumn2 (price int, qty int, year int, month int) USING parquet partitioned by (year, month); CREATE TEMPORARY VIEW showColumn3 (col3 int, `col 4` int) USING json; CREATE GLOBAL TEMP VIEW showColumn4 AS SELECT 1 as col1, 'abc' as `col 5`; diff --git a/sql/core/src/test/resources/sql-tests/results/show_columns.sql.out b/sql/core/src/test/resources/sql-tests/results/show_columns.sql.out index c2d70a9eae85..71d6e120e894 100644 --- a/sql/core/src/test/resources/sql-tests/results/show_columns.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/show_columns.sql.out @@ -27,7 +27,7 @@ struct<> -- !query 3 -CREATE TABLE showcolumn2 (price int, qty int, year int, month int) USING json partitioned by (year, month) +CREATE TABLE showcolumn2 (price int, qty int, year int, month int) USING parquet partitioned by (year, month) -- !query 3 schema struct<> -- !query 3 output diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index 81156ef8699b..edf2013a4c93 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -34,12 +34,12 @@ import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit} import org.apache.spark.TaskContext -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.InternalRow 
import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.hive.{HiveInspectors, HiveShim} -import org.apache.spark.sql.sources._ +import org.apache.spark.sql.sources.{Filter, _} import org.apache.spark.sql.types.StructType import org.apache.spark.util.SerializableConfiguration @@ -83,8 +83,6 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable classOf[MapRedOutputFormat[_, _]]) } - org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.checkFieldNames(dataSchema) - new OutputWriterFactory { override def newInstance( path: String, From 8ee87dd0d799d0e4504ca11c1f1d31f1141a0844 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 5 Sep 2017 15:18:31 -0700 Subject: [PATCH 10/12] Move to DDLUtils. --- .../spark/sql/execution/command/ddl.scala | 23 +++++++++++++++++++ .../datasources/DataSourceStrategy.scala | 17 ++------------ .../spark/sql/hive/HiveStrategies.scala | 18 ++++----------- 3 files changed, 29 insertions(+), 29 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index dae160f1bbb1..a32a236317e7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -34,6 +34,9 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.execution.datasources.PartitioningUtils +import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat +import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter +import org.apache.spark.sql.internal.HiveSerDe import org.apache.spark.sql.types._ import org.apache.spark.util.{SerializableConfiguration, ThreadUtils} @@ -848,4 +851,24 @@ object DDLUtils { } } } + + private[sql] def checkFieldNames(table: CatalogTable): Unit = { + table.provider.get.toLowerCase(Locale.ROOT) match { + case "hive" => + val serde = table.storage.serde + if (serde == HiveSerDe.sourceToSerDe("orc").get.serde) { + OrcFileFormat.checkFieldNames(table.dataSchema) + } else if (serde == HiveSerDe.sourceToSerDe("parquet").get.serde) { + ParquetSchemaConverter.checkFieldNames(table.dataSchema) + } + + case "parquet" => + ParquetSchemaConverter.checkFieldNames(table.dataSchema) + + case "orc" => + OrcFileFormat.checkFieldNames(table.dataSchema) + + case _ => + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index c6622a0a968f..65d69a269fa0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution.datasources -import java.util.Locale import java.util.concurrent.Callable import org.apache.spark.internal.Logging @@ -35,8 +34,6 @@ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.command._ -import 
org.apache.spark.sql.execution.datasources.orc.OrcFileFormat -import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ @@ -131,24 +128,14 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast projectList } - private def checkFieldNames(table: CatalogTable): Unit = { - table.provider.get.toLowerCase(Locale.ROOT) match { - case "parquet" => - ParquetSchemaConverter.checkFieldNames(table.dataSchema) - case "orc" => - OrcFileFormat.checkFieldNames(table.dataSchema) - case _ => - } - } - override def apply(plan: LogicalPlan): LogicalPlan = plan transform { case CreateTable(tableDesc, mode, None) if DDLUtils.isDatasourceTable(tableDesc) => - checkFieldNames(tableDesc) + DDLUtils.checkFieldNames(tableDesc) CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore) case CreateTable(tableDesc, mode, Some(query)) if query.resolved && DDLUtils.isDatasourceTable(tableDesc) => - checkFieldNames(tableDesc.copy(schema = query.schema)) + DDLUtils.checkFieldNames(tableDesc.copy(schema = query.schema)) CreateDataSourceTableAsSelectCommand(tableDesc, mode, query) case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _, _), diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 7d01fdd50fb9..9ad3fb06d214 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -31,8 +31,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils} import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation} -import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat -import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions, ParquetSchemaConverter} +import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions} import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.hive.orc.OrcFileFormat import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} @@ -146,27 +145,18 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { * `PreprocessTableInsertion`. 
*/ object HiveAnalysis extends Rule[LogicalPlan] { - private def checkFieldNames(table: CatalogTable): Unit = { - val serde = table.storage.serde - if (serde == HiveSerDe.sourceToSerDe("orc").get.serde) { - OrcFileFormat.checkFieldNames(table.dataSchema) - } else if (serde == HiveSerDe.sourceToSerDe("parquet").get.serde) { - ParquetSchemaConverter.checkFieldNames(table.dataSchema) - } - } - override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case InsertIntoTable(r: HiveTableRelation, partSpec, query, overwrite, ifPartitionNotExists) if DDLUtils.isHiveTable(r.tableMeta) => - checkFieldNames(r.tableMeta) + DDLUtils.checkFieldNames(r.tableMeta) InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, ifPartitionNotExists) case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) => - checkFieldNames(tableDesc) + DDLUtils.checkFieldNames(tableDesc) CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore) case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) => - checkFieldNames(tableDesc) + DDLUtils.checkFieldNames(tableDesc) CreateHiveTableAsSelectCommand(tableDesc, query, mode) } } From c6e9ab6291dda034fe39263202ea5bc2373cd86c Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 5 Sep 2017 18:24:40 -0700 Subject: [PATCH 11/12] Add ALTER TABLE and address comments. --- .../spark/sql/execution/command/ddl.scala | 27 ++++++++----------- .../spark/sql/execution/command/tables.scala | 3 +++ .../datasources/orc/OrcFileFormat.scala | 2 +- .../sql/hive/execution/SQLQuerySuite.scala | 15 ++++++++--- 4 files changed, 27 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index a32a236317e7..a6f8306e67e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -853,22 +853,17 @@ object DDLUtils { } private[sql] def checkFieldNames(table: CatalogTable): Unit = { - table.provider.get.toLowerCase(Locale.ROOT) match { - case "hive" => - val serde = table.storage.serde - if (serde == HiveSerDe.sourceToSerDe("orc").get.serde) { - OrcFileFormat.checkFieldNames(table.dataSchema) - } else if (serde == HiveSerDe.sourceToSerDe("parquet").get.serde) { - ParquetSchemaConverter.checkFieldNames(table.dataSchema) - } - - case "parquet" => - ParquetSchemaConverter.checkFieldNames(table.dataSchema) - - case "orc" => - OrcFileFormat.checkFieldNames(table.dataSchema) - - case _ => + val serde = table.storage.serde + if (serde == HiveSerDe.sourceToSerDe("orc").get.serde) { + OrcFileFormat.checkFieldNames(table.dataSchema) + } else if (serde == HiveSerDe.sourceToSerDe("parquet").get.serde) { + ParquetSchemaConverter.checkFieldNames(table.dataSchema) + } else { + table.provider.get.toLowerCase(Locale.ROOT) match { + case "parquet" => ParquetSchemaConverter.checkFieldNames(table.dataSchema) + case "orc" => OrcFileFormat.checkFieldNames(table.dataSchema) + case _ => + } } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 694d517668a2..40e0e73abbe0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -206,6 +206,9 @@ case class 
AlterTableAddColumnsCommand( reorderedSchema.map(_.name), "in the table definition of " + table.identifier, conf.caseSensitiveAnalysis) + val newDataSchema = StructType(catalogTable.dataSchema ++ columns) + DDLUtils.checkFieldNames(catalogTable.copy(schema = newDataSchema)) + catalog.alterTableSchema( table, catalogTable.schema.copy(fields = reorderedSchema.toArray)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 0c16de6ceec5..2eeb0065455f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -29,7 +29,7 @@ private[sql] object OrcFileFormat { } catch { case _: IllegalArgumentException => throw new AnalysisException( - s"""Attribute name "$name" contains invalid character(s). + s"""Column name "$name" contains invalid character(s). |Please use alias to rename it. """.stripMargin.split("\n").mkString(" ").trim) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index cb0ac9656bce..85a6a77cedc4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -2001,9 +2001,9 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } - test("SPARK-21912 Creating ORC/Parquet datasource table should check invalid column names") { - withTable("t21912") { - Seq(" ", ",", ";", "{", "}", "(", ")", "\n", "\t", "=").foreach { name => + test("SPARK-21912 ORC/Parquet table should not create invalid column names") { + Seq(" ", ",", ";", "{", "}", "(", ")", "\n", "\t", "=").foreach { name => + withTable("t21912") { Seq("ORC", "PARQUET").foreach { source => val m = intercept[AnalysisException] { sql(s"CREATE TABLE t21912(`col$name` INT) USING $source") @@ -2022,6 +2022,15 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { assert(m3.contains(s"contains invalid character(s)")) } } + + // TODO: After SPARK-21929, we need to check ORC, too. + Seq("PARQUET").foreach { source => + sql(s"CREATE TABLE t21912(`col` INT) USING $source") + val m = intercept[AnalysisException] { + sql(s"ALTER TABLE t21912 ADD COLUMNS(`col$name` INT)") + }.getMessage + assert(m.contains(s"contains invalid character(s)")) + } } } } From 46847f860d9ce445f8b0878c4bf64b983f3242f8 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 5 Sep 2017 23:45:54 -0700 Subject: [PATCH 12/12] Address comments. 
--- .../spark/sql/execution/command/ddl.scala | 19 +++++++++++-------- .../spark/sql/execution/command/tables.scala | 8 +++----- .../datasources/DataSourceStrategy.scala | 4 ++-- .../spark/sql/hive/HiveStrategies.scala | 5 ++--- 4 files changed, 18 insertions(+), 18 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index a6f8306e67e0..7611e1c2e268 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -852,14 +852,17 @@ object DDLUtils { } } - private[sql] def checkFieldNames(table: CatalogTable): Unit = { - val serde = table.storage.serde - if (serde == HiveSerDe.sourceToSerDe("orc").get.serde) { - OrcFileFormat.checkFieldNames(table.dataSchema) - } else if (serde == HiveSerDe.sourceToSerDe("parquet").get.serde) { - ParquetSchemaConverter.checkFieldNames(table.dataSchema) - } else { - table.provider.get.toLowerCase(Locale.ROOT) match { + private[sql] def checkDataSchemaFieldNames(table: CatalogTable): Unit = { + table.provider.foreach { + _.toLowerCase(Locale.ROOT) match { + case HIVE_PROVIDER => + val serde = table.storage.serde + if (serde == HiveSerDe.sourceToSerDe("orc").get.serde) { + OrcFileFormat.checkFieldNames(table.dataSchema) + } else if (serde == HiveSerDe.sourceToSerDe("parquet").get.serde || + serde == Some("parquet.hive.serde.ParquetHiveSerDe")) { + ParquetSchemaConverter.checkFieldNames(table.dataSchema) + } case "parquet" => ParquetSchemaConverter.checkFieldNames(table.dataSchema) case "orc" => OrcFileFormat.checkFieldNames(table.dataSchema) case _ => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 40e0e73abbe0..1dddc1ca324b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -201,16 +201,14 @@ case class AlterTableAddColumnsCommand( // make sure any partition columns are at the end of the fields val reorderedSchema = catalogTable.dataSchema ++ columns ++ catalogTable.partitionSchema + val newSchema = catalogTable.schema.copy(fields = reorderedSchema.toArray) SchemaUtils.checkColumnNameDuplication( reorderedSchema.map(_.name), "in the table definition of " + table.identifier, conf.caseSensitiveAnalysis) + DDLUtils.checkDataSchemaFieldNames(catalogTable.copy(schema = newSchema)) - val newDataSchema = StructType(catalogTable.dataSchema ++ columns) - DDLUtils.checkFieldNames(catalogTable.copy(schema = newDataSchema)) - - catalog.alterTableSchema( - table, catalogTable.schema.copy(fields = reorderedSchema.toArray)) + catalog.alterTableSchema(table, newSchema) Seq.empty[Row] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 65d69a269fa0..5d6223dffd28 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -130,12 +130,12 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast override def apply(plan: LogicalPlan): LogicalPlan = plan transform { case CreateTable(tableDesc, 
mode, None) if DDLUtils.isDatasourceTable(tableDesc) => - DDLUtils.checkFieldNames(tableDesc) + DDLUtils.checkDataSchemaFieldNames(tableDesc) CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore) case CreateTable(tableDesc, mode, Some(query)) if query.resolved && DDLUtils.isDatasourceTable(tableDesc) => - DDLUtils.checkFieldNames(tableDesc.copy(schema = query.schema)) + DDLUtils.checkDataSchemaFieldNames(tableDesc.copy(schema = query.schema)) CreateDataSourceTableAsSelectCommand(tableDesc, mode, query) case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _, _), diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 9ad3fb06d214..47203a80c37b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -148,15 +148,14 @@ object HiveAnalysis extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case InsertIntoTable(r: HiveTableRelation, partSpec, query, overwrite, ifPartitionNotExists) if DDLUtils.isHiveTable(r.tableMeta) => - DDLUtils.checkFieldNames(r.tableMeta) InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, ifPartitionNotExists) case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) => - DDLUtils.checkFieldNames(tableDesc) + DDLUtils.checkDataSchemaFieldNames(tableDesc) CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore) case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) => - DDLUtils.checkFieldNames(tableDesc) + DDLUtils.checkDataSchemaFieldNames(tableDesc) CreateHiveTableAsSelectCommand(tableDesc, query, mode) } }
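
For reference, the check that the series converges on (DDLUtils.checkDataSchemaFieldNames dispatching to ParquetSchemaConverter.checkFieldNames for Parquet and to the new sql/core OrcFileFormat.checkFieldNames for ORC) can be exercised on its own. Below is a minimal sketch of the ORC path: the object name and the sample column names are hypothetical, while TypeDescription.fromString, the struct<$name:int> wrapper, and the IllegalArgumentException handling come directly from the diffs above; it assumes the org.apache.orc artifact that sql/core already depends on is on the classpath.

import org.apache.orc.TypeDescription

object FieldNameCheckSketch {
  // Mirrors checkFieldName in sql/core's datasources.orc.OrcFileFormat:
  // ORC's own schema parser decides whether a column name is usable.
  private def isValidOrcFieldName(name: String): Boolean = {
    try {
      TypeDescription.fromString(s"struct<$name:int>")
      true
    } catch {
      case _: IllegalArgumentException => false
    }
  }

  def main(args: Array[String]): Unit = {
    // "col 2" and "a,b" fail to parse as an ORC field, "col1" and "price" are fine.
    Seq("col1", "col 2", "a,b", "price").foreach { name =>
      println(s"$name -> ${if (isValidOrcFieldName(name)) "ok" else "invalid for ORC"}")
    }
  }
}

With these patches applied, the same invalid names surface to users as an AnalysisException whose message contains "contains invalid character(s)", raised at CREATE TABLE, CTAS, and ALTER TABLE ADD COLUMNS time, as asserted by the SPARK-21912 test in SQLQuerySuite.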