Commit 1300813

Follow up of SPARK-16036
1 parent 28d130d commit 1300813

8 files changed: +70 -38 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 2 additions & 0 deletions
@@ -369,6 +369,8 @@ case class InsertIntoTable(
     if (table.output.isEmpty) {
       None
     } else {
+      // Note: The parser (visitPartitionSpec in AstBuilder) already turns
+      // keys in partition to their lowercase forms.
       val staticPartCols = partition.filter(_._2.isDefined).keySet
       Some(table.output.filterNot(a => staticPartCols.contains(a.name)))
     }
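
To make the added note concrete, here is a minimal, self-contained sketch of the filtering logic above (simplified stand-in types, not the Catalyst API): columns bound to a static partition value are dropped from the expected output, while dynamic partition columns remain.

// Simplified stand-in for Catalyst attributes (an assumption, not Spark code).
case class Attr(name: String)

val tableOutput = Seq(Attr("key"), Attr("value"), Attr("ds"), Attr("hr"))
// PARTITION (ds='1', hr): ds is static, hr is dynamic; per the new comment,
// the parser has already lowercased the keys.
val partition: Map[String, Option[String]] = Map("ds" -> Some("1"), "hr" -> None)

val staticPartCols = partition.filter(_._2.isDefined).keySet // Set("ds")
val expected = tableOutput.filterNot(a => staticPartCols.contains(a.name))
// expected == Seq(Attr("key"), Attr("value"), Attr("hr"))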

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 1 addition & 1 deletion
@@ -446,7 +446,7 @@ case class DataSource(
     }.getOrElse(Seq.empty[String])
     // TODO: Case sensitivity.
     val sameColumns =
-      existingPartitionColumns.map(_.toLowerCase) == partitionColumns.map(_.toLowerCase)
+      existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
     if (existingPartitionColumns.size > 0 && !sameColumns) {
       throw new AnalysisException(
         s"""Requested partitioning does not match existing partitioning.

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala

Lines changed: 10 additions & 3 deletions
@@ -67,7 +67,7 @@ private[sql] class ResolveDataSource(sparkSession: SparkSession) extends Rule[Lo
  * table. It also does data type casting and field renaming, to make sure that the columns to be
  * inserted have the correct data type and fields have the correct names.
  */
-private[sql] object PreprocessTableInsertion extends Rule[LogicalPlan] {
+private[sql] case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
   private def preprocess(
       insert: InsertIntoTable,
       tblName: String,

@@ -84,7 +84,13 @@ private[sql] object PreprocessTableInsertion extends Rule[LogicalPlan] {
     if (insert.partition.nonEmpty) {
       // the query's partitioning must match the table's partitioning
       // this is set for queries like: insert into ... partition (one = "a", two = <expr>)
-      if (insert.partition.keySet != partColNames.toSet) {
+      val samePartitionColumns =
+        if (conf.caseSensitiveAnalysis) {
+          insert.partition.keySet == partColNames.toSet
+        } else {
+          insert.partition.keySet.map(_.toLowerCase) == partColNames.map(_.toLowerCase).toSet
+        }
+      if (!samePartitionColumns) {
         throw new AnalysisException(
           s"""
              |Requested partitioning does not match the table $tblName:

@@ -94,7 +100,8 @@ private[sql] object PreprocessTableInsertion extends Rule[LogicalPlan] {
       }
       expectedColumns.map(castAndRenameChildOutput(insert, _)).getOrElse(insert)
     } else {
-      // All partition columns are dynamic because this InsertIntoTable had no partitioning
+      // All partition columns are dynamic because the InsertIntoTable command does
+      // not explicitly specify partitioning columns.
      expectedColumns.map(castAndRenameChildOutput(insert, _)).getOrElse(insert)
        .copy(partition = partColNames.map(_ -> None).toMap)
     }
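
The core of this follow-up is the case-aware partition-spec check added above. A minimal standalone sketch of that logic (the helper signature is an assumption for illustration, not the rule's actual API):

// Under case-sensitive analysis the partition spec must match the table's
// partition columns exactly; otherwise both sides are lowercased first.
def samePartitionColumns(
    specKeys: Set[String],
    partColNames: Seq[String],
    caseSensitive: Boolean): Boolean = {
  if (caseSensitive) {
    specKeys == partColNames.toSet
  } else {
    specKeys.map(_.toLowerCase) == partColNames.map(_.toLowerCase).toSet
  }
}

samePartitionColumns(Set("B", "c"), Seq("b", "c"), caseSensitive = false) // true
samePartitionColumns(Set("B", "c"), Seq("b", "c"), caseSensitive = true)  // false

Since both sides are compared as sets, ordering is irrelevant; only the column names, modulo case, must agree.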

sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala

Lines changed: 1 addition & 1 deletion
@@ -111,7 +111,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
   lazy val analyzer: Analyzer = {
     new Analyzer(catalog, conf) {
       override val extendedResolutionRules =
-        PreprocessTableInsertion ::
+        PreprocessTableInsertion(conf) ::
         new FindDataSourceTable(sparkSession) ::
         DataSourceAnalysis ::
         (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil)
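
Because the rule is now constructed with the session's SQLConf, it honors spark.sql.caseSensitive (the flag behind conf.caseSensitiveAnalysis) per session. A hypothetical usage sketch, assuming a SparkSession named spark and a withparts table partitioned by ds/hr as in the test suites below:

// With case-insensitive analysis (the default), the partition spec may use a
// different casing than the table's partition columns.
spark.conf.set("spark.sql.caseSensitive", "false")
spark.sql("INSERT INTO TABLE withparts PARTITION (DS='1', HR='2') SELECT key, value FROM src")
// With spark.sql.caseSensitive=true, the same statement fails analysis with
// "Requested partitioning does not match the table".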

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala

Lines changed: 1 addition & 1 deletion
@@ -65,7 +65,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
       catalog.ParquetConversions ::
       catalog.OrcConversions ::
       catalog.CreateTables ::
-      PreprocessTableInsertion ::
+      PreprocessTableInsertion(conf) ::
       DataSourceAnalysis ::
       (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil)

sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala

Lines changed: 20 additions & 0 deletions
@@ -372,4 +372,24 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
       assert(!logical.resolved, "Should not resolve: missing partition data")
     }
   }
+
+  testPartitionedTable(
+    "SPARK-16036: better error message when insert into a table with mismatch schema") {
+    tableName =>
+      val e = intercept[AnalysisException] {
+        sql(s"INSERT INTO TABLE $tableName PARTITION(b=1, c=2) SELECT 1, 2, 3")
+      }
+      assert(e.message.contains("the number of columns are different"))
+  }
+
+  testPartitionedTable(
+    "SPARK-16037: INSERT statement should match columns by position") {
+    tableName =>
+      withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+        sql(s"INSERT INTO TABLE $tableName SELECT 1, 2 AS c, 3 AS b")
+        checkAnswer(sql(s"SELECT a, b, c FROM $tableName"), Row(1, 2, 3))
+        sql(s"INSERT OVERWRITE TABLE $tableName SELECT 1, 2, 3")
+        checkAnswer(sql(s"SELECT a, b, c FROM $tableName"), Row(1, 2, 3))
+      }
+  }
 }

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala

Lines changed: 35 additions & 0 deletions
@@ -1033,6 +1033,41 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     sql("SELECT * FROM boom").queryExecution.analyzed
   }

+  test("SPARK-3810: PreprocessTableInsertion static partitioning support") {
+    val analyzedPlan = {
+      loadTestTable("srcpart")
+      sql("DROP TABLE IF EXISTS withparts")
+      sql("CREATE TABLE withparts LIKE srcpart")
+      sql("INSERT INTO TABLE withparts PARTITION(ds='1', hr='2') SELECT key, value FROM src")
+        .queryExecution.analyzed
+    }
+
+    assertResult(1, "Duplicated project detected\n" + analyzedPlan) {
+      analyzedPlan.collect {
+        case _: Project => ()
+      }.size
+    }
+  }
+
+  test("SPARK-3810: PreprocessTableInsertion dynamic partitioning support") {
+    val analyzedPlan = {
+      loadTestTable("srcpart")
+      sql("DROP TABLE IF EXISTS withparts")
+      sql("CREATE TABLE withparts LIKE srcpart")
+      sql("SET hive.exec.dynamic.partition.mode=nonstrict")
+
+      sql("CREATE TABLE IF NOT EXISTS withparts LIKE srcpart")
+      sql("INSERT INTO TABLE withparts PARTITION(ds, hr) SELECT key, value, '1', '2' FROM src")
+        .queryExecution.analyzed
+    }
+
+    assertResult(2, "Duplicated project detected\n" + analyzedPlan) {
+      analyzedPlan.collect {
+        case _: Project => ()
+      }.size
+    }
+  }
+
   test("parse HQL set commands") {
     // Adapted from its SQL counterpart.
     val testKey = "spark.sql.key.usedfortestonly"

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 0 additions & 32 deletions
@@ -1684,36 +1684,4 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       )
     }
   }
-
-  test("SPARK-16036: better error message when insert into a table with mismatch schema") {
-    withTable("hive_table", "datasource_table") {
-      sql("CREATE TABLE hive_table(a INT) PARTITIONED BY (b INT, c INT)")
-      sql("CREATE TABLE datasource_table(a INT, b INT, c INT) USING parquet PARTITIONED BY (b, c)")
-      val e1 = intercept[AnalysisException] {
-        sql("INSERT INTO TABLE hive_table PARTITION(b=1, c=2) SELECT 1, 2, 3")
-      }
-      assert(e1.message.contains("the number of columns are different"))
-      val e2 = intercept[AnalysisException] {
-        sql("INSERT INTO TABLE datasource_table PARTITION(b=1, c=2) SELECT 1, 2, 3")
-      }
-      assert(e2.message.contains("the number of columns are different"))
-    }
-  }
-
-  test("SPARK-16037: INSERT statement should match columns by position") {
-    withTable("hive_table", "datasource_table") {
-      sql("CREATE TABLE hive_table(a INT) PARTITIONED BY (b INT, c INT)")
-      sql("CREATE TABLE datasource_table(a INT, b INT, c INT) USING parquet PARTITIONED BY (b, c)")
-
-      withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
-        sql("INSERT INTO TABLE hive_table SELECT 1, 2 AS c, 3 AS b")
-        checkAnswer(sql("SELECT a, b, c FROM hive_table"), Row(1, 2, 3))
-        sql("INSERT OVERWRITE TABLE hive_table SELECT 1, 2, 3")
-        checkAnswer(sql("SELECT a, b, c FROM hive_table"), Row(1, 2, 3))
-      }
-
-      sql("INSERT INTO TABLE datasource_table SELECT 1, 2 AS c, 3 AS b")
-      checkAnswer(sql("SELECT a, b, c FROM datasource_table"), Row(1, 2, 3))
-    }
-  }
 }
