
Commit d8b50f7

cloud-fan authored and yhuai committed
[SPARK-11453][SQL] Appending data to a partitioned table messes up the result

The reason is twofold:

1. For a partitioned Hive table, the partition columns are moved after the data columns (e.g. `<a: Int, b: Int>` partitioned by `a` becomes `<b: Int, a: Int>`).
2. When appending data to a table, input columns are matched to the table's columns by position, so an append to a partitioned table matches the wrong columns between the input and the table.

The fix is to reorder the input columns before matching by position, as is already done for [`InsertIntoHadoopFsRelation`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala#L101-L105).

Author: Wenchen Fan <[email protected]>
Closes #9408 from cloud-fan/append.

1 parent 97b7080 commit d8b50f7
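
To make the failure mode concrete, here is a minimal sketch of the scenario described above, assuming a Spark 1.5-era spark-shell with Hive support, where `sqlContext` is predefined (the table name `t` is illustrative):

    import org.apache.spark.sql.SaveMode

    // Input schema is <a: Int, b: Int>, but partitioning by `a` stores the table
    // as <b: Int, a: Int>: the partition column is moved after the data columns.
    val df = sqlContext.range(0, 2).selectExpr("cast(id as int) as a", "cast(id * 10 as int) as b")
    df.write.partitionBy("a").saveAsTable("t")

    // Before this patch, the append below matched input columns to table columns
    // by position, effectively writing `a` into `b` and vice versa. With the
    // patch, the input is first reordered to <b, a> to match the table schema.
    df.write.mode(SaveMode.Append).partitionBy("a").saveAsTable("t")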

File tree

3 files changed: 53 additions & 4 deletions

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 25 additions & 4 deletions
@@ -23,8 +23,8 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.{SqlParser, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.plans.logical.{Project, InsertIntoTable}
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
 import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, ResolvedDataSource}
 import org.apache.spark.sql.sources.HadoopFsRelation
@@ -167,17 +167,38 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   }
 
   private def insertInto(tableIdent: TableIdentifier): Unit = {
-    val partitions = partitioningColumns.map(_.map(col => col -> (None: Option[String])).toMap)
+    val partitions = normalizedParCols.map(_.map(col => col -> (None: Option[String])).toMap)
     val overwrite = mode == SaveMode.Overwrite
+
+    // A partitioned relation's schema can be different from the input logicalPlan, since
+    // partition columns are all moved after data columns. We Project to adjust the ordering.
+    // TODO: this belongs to the analyzer.
+    val input = normalizedParCols.map { parCols =>
+      val (inputPartCols, inputDataCols) = df.logicalPlan.output.partition { attr =>
+        parCols.contains(attr.name)
+      }
+      Project(inputDataCols ++ inputPartCols, df.logicalPlan)
+    }.getOrElse(df.logicalPlan)
+
     df.sqlContext.executePlan(
       InsertIntoTable(
         UnresolvedRelation(tableIdent),
         partitions.getOrElse(Map.empty[String, Option[String]]),
-        df.logicalPlan,
+        input,
         overwrite,
         ifNotExists = false)).toRdd
   }
 
+  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { parCols =>
+    parCols.map { col =>
+      df.logicalPlan.output
+        .map(_.name)
+        .find(df.sqlContext.analyzer.resolver(_, col))
+        .getOrElse(throw new AnalysisException(s"Partition column $col not found in existing " +
+          s"columns (${df.logicalPlan.output.map(_.name).mkString(", ")})"))
+    }
+  }
+
   /**
    * Saves the content of the [[DataFrame]] as the specified table.
    *
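
The two additions above work together: `normalizedParCols` resolves the user-supplied partition column names against the plan's output using the analyzer's resolver (so `partitionBy("I")` matches a column named `i` under the default case-insensitive setting), and the `Project` moves partition columns after data columns so that position-based matching lines up with the table schema. A self-contained sketch of that logic, runnable in the Scala REPL, with illustrative names and a stand-in resolver (Spark's actual resolver depends on `spark.sql.caseSensitive`):

    // Stand-in for df.sqlContext.analyzer.resolver in case-insensitive mode.
    val resolver: (String, String) => Boolean = (a, b) => a.equalsIgnoreCase(b)

    val output = Seq("a", "b", "c")   // the logical plan's output column names
    val userParCols = Seq("A")        // what the user passed to partitionBy

    // Normalize user-supplied names to the exact names in the plan's output.
    val parCols = userParCols.map { col =>
      output.find(resolver(_, col)).getOrElse(
        sys.error(s"Partition column $col not found in existing columns (${output.mkString(", ")})"))
    }

    // Move partition columns after data columns, mirroring the table's layout.
    val (partCols, dataCols) = output.partition(parCols.contains)
    println(dataCols ++ partCols)     // List(b, c, a)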

sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala

Lines changed: 8 additions & 0 deletions
@@ -53,4 +53,12 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
 
     Utils.deleteRecursively(path)
   }
+
+  test("partitioned columns should appear at the end of schema") {
+    withTempPath { f =>
+      val path = f.getAbsolutePath
+      Seq(1 -> "a").toDF("i", "j").write.partitionBy("i").parquet(path)
+      assert(sqlContext.read.parquet(path).schema.map(_.name) == Seq("j", "i"))
+    }
+  }
 }
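
This test pins down the invariant the fix relies on: partition columns always come last in a partitioned relation's schema. That ordering falls out of the on-disk layout, since `partitionBy("i")` writes one directory per partition value and the data files inside contain only the data columns; the partition column is recovered from the directory names on read. A sketch of the layout the test produces (file names illustrative):

    path/
      i=1/
        part-r-00000-<uuid>.gz.parquet   (contains only column j)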

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 20 additions & 0 deletions
@@ -1428,4 +1428,24 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       checkAnswer(sql("SELECT val FROM tbl10562 WHERE Year == 2012"), Row("a"))
     }
   }
+
+  test("SPARK-11453: append data to partitioned table") {
+    withTable("tbl11453") {
+      Seq("1" -> "10", "2" -> "20").toDF("i", "j")
+        .write.partitionBy("i").saveAsTable("tbl11453")
+
+      Seq("3" -> "30").toDF("i", "j")
+        .write.mode(SaveMode.Append).partitionBy("i").saveAsTable("tbl11453")
+      checkAnswer(
+        sqlContext.read.table("tbl11453").select("i", "j").orderBy("i"),
+        Row("1", "10") :: Row("2", "20") :: Row("3", "30") :: Nil)
+
+      // make sure case sensitivity is correct.
+      Seq("4" -> "40").toDF("i", "j")
+        .write.mode(SaveMode.Append).partitionBy("I").saveAsTable("tbl11453")
+      checkAnswer(
+        sqlContext.read.table("tbl11453").select("i", "j").orderBy("i"),
+        Row("1", "10") :: Row("2", "20") :: Row("3", "30") :: Row("4", "40") :: Nil)
+    }
+  }
 }
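
Note that Append-mode `saveAsTable` on an existing table dispatches to the `insertInto(tableIdent)` path patched in `DataFrameWriter`, so the same reordering also covers an explicit insert; a hedged equivalent of the second write above (assuming `tbl11453` already exists):

    Seq("3" -> "30").toDF("i", "j").write.partitionBy("i").insertInto("tbl11453")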
