Commit ce3b98b

clockfly authored and yhuai committed
[SPARK-16034][SQL] Checks the partition columns when calling dataFrame.write.mode("append").saveAsTable
## What changes were proposed in this pull request?

`DataFrameWriter` can be used to append data to existing data source tables. It becomes tricky when the partition columns used in `DataFrameWriter.partitionBy(columns)` don't match the actual partition columns of the underlying table. This pull request enforces the check so that the partition columns of the two always match.

## How was this patch tested?

Unit test.

Author: Sean Zhong <[email protected]>

Closes #13749 from clockfly/SPARK-16034.
1 parent 3d010c8 commit ce3b98b
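
As a sketch of the user-facing behavior this enforces (the table name `t` is illustrative, and `spark` is assumed to be the session variable of a Spark 2.x shell; neither appears in the patch itself):

```scala
// Illustrative only: create a table partitioned by ("a", "b"), then append
// with a partitionBy() that does not match. After this patch the append fails fast.
val df = spark.range(3).selectExpr("id as a", "id as b", "id as c")
df.write.mode("overwrite").partitionBy("a", "b").saveAsTable("t")

// Throws org.apache.spark.sql.AnalysisException:
//   Requested partitioning does not match existing partitioning. ...
df.write.mode("append").partitionBy("b", "a").saveAsTable("t")
```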

3 files changed, +50 -22 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 7 additions & 2 deletions

@@ -242,8 +242,13 @@ case class CreateDataSourceTableAsSelectCommand(
       bucketSpec = bucketSpec,
       options = optionsWithPath)
 
-    val result = dataSource.write(mode, df)
-
+    val result = try {
+      dataSource.write(mode, df)
+    } catch {
+      case ex: AnalysisException =>
+        logError(s"Failed to write to table ${tableIdent.identifier} in $mode mode", ex)
+        throw ex
+    }
     if (createMetastoreTable) {
       // We will use the schema of resolved.relation as the schema of the table (instead of
       // the schema of df). It is important since the nullability may be changed by the relation
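
This first hunk only wraps the write so that a failure is logged with table and mode context before being rethrown. A minimal standalone sketch of that catch-log-rethrow pattern, using slf4j directly in place of Spark's internal `Logging` trait (the object and method names here are illustrative):

```scala
import org.slf4j.LoggerFactory

object WriteLogging {
  private val log = LoggerFactory.getLogger(getClass)

  // Run `body`, logging any failure with table/mode context, then rethrow
  // so the caller still observes the original exception.
  def logged[T](table: String, mode: String)(body: => T): T =
    try body catch {
      case ex: Exception =>
        log.error(s"Failed to write to table $table in $mode mode", ex)
        throw ex
    }
}
```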

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 19 additions & 20 deletions

@@ -435,26 +435,25 @@ case class DataSource(
     // If we are appending to a table that already exists, make sure the partitioning matches
     // up. If we fail to load the table for whatever reason, ignore the check.
     if (mode == SaveMode.Append) {
-      val existingPartitionColumnSet = try {
-        Some(
-          resolveRelation()
-            .asInstanceOf[HadoopFsRelation]
-            .location
-            .partitionSpec()
-            .partitionColumns
-            .fieldNames
-            .toSet)
-      } catch {
-        case e: Exception =>
-          None
-      }
-
-      existingPartitionColumnSet.foreach { ex =>
-        if (ex.map(_.toLowerCase) != partitionColumns.map(_.toLowerCase()).toSet) {
-          throw new AnalysisException(
-            s"Requested partitioning does not equal existing partitioning: " +
-            s"$ex != ${partitionColumns.toSet}.")
-        }
+      val existingColumns = Try {
+        resolveRelation()
+          .asInstanceOf[HadoopFsRelation]
+          .location
+          .partitionSpec()
+          .partitionColumns
+          .fieldNames
+          .toSeq
+      }.getOrElse(Seq.empty[String])
+      val sameColumns =
+        existingColumns.map(_.toLowerCase) == partitionColumns.map(_.toLowerCase)
+      if (existingColumns.size > 0 && !sameColumns) {
+        throw new AnalysisException(
+          s"""Requested partitioning does not match existing partitioning.
+             |Existing partitioning columns:
+             |  ${existingColumns.mkString(", ")}
+             |Requested partitioning columns:
+             |  ${partitionColumns.mkString(", ")}
+             |""".stripMargin)
       }
     }
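
A self-contained sketch of the comparison logic this hunk introduces (the column sequences below stand in for metadata resolved from the table, and `IllegalArgumentException` stands in for Spark's `AnalysisException`):

```scala
object PartitionCheck {
  // Compare existing vs. requested partition columns case-insensitively.
  // An empty `existing` means the table's columns could not be loaded,
  // in which case the check is skipped, mirroring the patched code.
  def check(existing: Seq[String], requested: Seq[String]): Unit = {
    val sameColumns = existing.map(_.toLowerCase) == requested.map(_.toLowerCase)
    if (existing.nonEmpty && !sameColumns) {
      throw new IllegalArgumentException(
        s"""Requested partitioning does not match existing partitioning.
           |Existing partitioning columns:
           |  ${existing.mkString(", ")}
           |Requested partitioning columns:
           |  ${requested.mkString(", ")}
           |""".stripMargin)
    }
  }
}

// PartitionCheck.check(Seq("a", "b"), Seq("a", "b"))  // passes
// PartitionCheck.check(Seq("a", "b"), Seq("b", "a"))  // throws: order now matters
```

Note the shift from `Set` to `Seq`: the old code compared unordered sets, so `partitionBy("b", "a")` against a table partitioned by `("a", "b")` would have passed; comparing ordered sequences also rejects reordered columns, which the new test covers.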

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 24 additions & 0 deletions

@@ -1317,4 +1317,28 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     assertUnsupported("TRUNCATE TABLE my_tab PARTITION (age=10)")
   }
 
+  test("SPARK-16034 Partition columns should match when appending to existing data source tables") {
+    import testImplicits._
+    val df = Seq((1, 2, 3)).toDF("a", "b", "c")
+    withTable("partitionedTable") {
+      df.write.mode("overwrite").partitionBy("a", "b").saveAsTable("partitionedTable")
+      // Misses some partition columns
+      intercept[AnalysisException] {
+        df.write.mode("append").partitionBy("a").saveAsTable("partitionedTable")
+      }
+      // Wrong order
+      intercept[AnalysisException] {
+        df.write.mode("append").partitionBy("b", "a").saveAsTable("partitionedTable")
+      }
+      // Partition columns not specified
+      intercept[AnalysisException] {
+        df.write.mode("append").saveAsTable("partitionedTable")
+      }
+      assert(sql("select * from partitionedTable").collect().size == 1)
+      // Inserts new data successfully when partition columns are correctly specified in
+      // partitionBy(...).
+      df.write.mode("append").partitionBy("a", "b").saveAsTable("partitionedTable")
+      assert(sql("select * from partitionedTable").collect().size == 2)
+    }
+  }
 }
