Skip to content

Commit 06b96ed

Browse files
committed
address comments
1 parent 1e19944 commit 06b96ed

File tree

2 files changed

+27
-7
lines changed

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -167,15 +167,17 @@ final class DataFrameWriter private[sql](df: DataFrame) {
167167
}
168168

169169
private def insertInto(tableIdent: TableIdentifier): Unit = {
170-
val partitions = partitioningColumns.map(_.map(col => col -> (None: Option[String])).toMap)
170+
val partitions = normalizedParCols.map(_.map(col => col -> (None: Option[String])).toMap)
171171
val overwrite = mode == SaveMode.Overwrite
172172

173-
// A partitioned relation schema's can be different from the input logicalPlan, since
174-
// partition columns are all moved after data column. We Project to adjust the ordering.
175-
// TODO: this belongs in the analyzer.
176-
val input = partitioningColumns.map { parCols =>
177-
val projectList = df.logicalPlan.output.filterNot(c => parCols.contains(c.name)) ++
178-
parCols.map(UnresolvedAttribute(_))
173+
// A partitioned relation's schema can be different from the input logicalPlan, since
174+
// partition columns are all moved after data columns. We Project to adjust the ordering.
175+
// TODO: this belongs to the analyzer.
176+
val input = normalizedParCols.map { parCols =>
177+
val (inputPartCols, inputDataCols) = df.logicalPlan.output.partition { attr =>
178+
parCols.contains(attr.name)
179+
}
180+
val projectList = inputDataCols ++ inputPartCols.map(c => UnresolvedAttribute(c.name))
179181
Project(projectList, df.logicalPlan)
180182
}.getOrElse(df.logicalPlan)
181183

@@ -188,6 +190,16 @@ final class DataFrameWriter private[sql](df: DataFrame) {
188190
ifNotExists = false)).toRdd
189191
}
190192

193+
private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { parCols =>
194+
parCols.map { col =>
195+
df.logicalPlan.output
196+
.map(_.name)
197+
.find(df.queryExecution.analyzer.resolver(_, col))
198+
.getOrElse(throw new AnalysisException(
199+
s"Partition column $col not found in schema ${df.logicalPlan.schema}"))
200+
}
201+
}
202+
191203
/**
192204
* Saves the content of the [[DataFrame]] as the specified table.
193205
*

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1426,11 +1426,19 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
14261426
withTable("tbl11453") {
14271427
Seq("1" -> "10", "2" -> "20").toDF("i", "j")
14281428
.write.partitionBy("i").saveAsTable("tbl11453")
1429+
14291430
Seq("3" -> "30").toDF("i", "j")
14301431
.write.mode(SaveMode.Append).partitionBy("i").saveAsTable("tbl11453")
14311432
checkAnswer(
14321433
sqlContext.read.table("tbl11453").select("i", "j").orderBy("i"),
14331434
Row("1", "10") :: Row("2", "20") :: Row("3", "30") :: Nil)
1435+
1436+
// make sure case sensitivity is correct.
1437+
Seq("4" -> "40").toDF("i", "j")
1438+
.write.mode(SaveMode.Append).partitionBy("I").saveAsTable("tbl11453")
1439+
checkAnswer(
1440+
sqlContext.read.table("tbl11453").select("i", "j").orderBy("i"),
1441+
Row("1", "10") :: Row("2", "20") :: Row("3", "30") :: Row("4", "40") :: Nil)
14341442
}
14351443
}
14361444
}

0 commit comments

Comments (0)