Skip to content

Commit ec12220

Browse files
committed
[SPARK-21165][SQL] FileFormatWriter should handle mismatched attribute ids between logical and physical plan
## What changes were proposed in this pull request? Due to the optimizer removing some unnecessary aliases, the logical and physical plans may have different output attribute ids. FileFormatWriter should handle this when creating the physical sort node. ## How was this patch tested? A new regression test. Author: Wenchen Fan <[email protected]> Closes #19483 from cloud-fan/bug2.
1 parent 3ff766f commit ec12220

File tree

3 files changed

+29
-2
lines changed

3 files changed

+29
-2
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,13 @@ object FileFormatWriter extends Logging {
180180
val rdd = if (orderingMatched) {
181181
queryExecution.toRdd
182182
} else {
183+
// SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
184+
// the physical plan may have different attribute ids due to optimizer removing some
185+
// aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch.
186+
val orderingExpr = requiredOrdering
187+
.map(SortOrder(_, Ascending)).map(BindReferences.bindReference(_, allColumns))
183188
SortExec(
184-
requiredOrdering.map(SortOrder(_, Ascending)),
189+
orderingExpr,
185190
global = false,
186191
child = queryExecution.executedPlan).execute()
187192
}

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class FileFormatWriterSuite extends QueryTest with SharedSQLContext {
3232
}
3333
}
3434

35-
test("FileFormatWriter should respect the input query schema") {
35+
test("SPARK-22252: FileFormatWriter should respect the input query schema") {
3636
withTable("t1", "t2", "t3", "t4") {
3737
spark.range(1).select('id as 'col1, 'id as 'col2).write.saveAsTable("t1")
3838
spark.sql("select COL1, COL2 from t1").write.saveAsTable("t2")

sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -728,4 +728,26 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
728728
assert(e.contains("mismatched input 'ROW'"))
729729
}
730730
}
731+
732+
test("SPARK-21165: FileFormatWriter should only rely on attributes from analyzed plan") {
733+
withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
734+
withTable("tab1", "tab2") {
735+
Seq(("a", "b", 3)).toDF("word", "first", "length").write.saveAsTable("tab1")
736+
737+
spark.sql(
738+
"""
739+
|CREATE TABLE tab2 (word string, length int)
740+
|PARTITIONED BY (first string)
741+
""".stripMargin)
742+
743+
spark.sql(
744+
"""
745+
|INSERT INTO TABLE tab2 PARTITION(first)
746+
|SELECT word, length, cast(first as string) as first FROM tab1
747+
""".stripMargin)
748+
749+
checkAnswer(spark.table("tab2"), Row("a", 3, "b"))
750+
}
751+
}
752+
}
731753
}

0 commit comments

Comments (0)