
Commit b46f75a

mgaido91 authored and cloud-fan committed
[SPARK-26057][SQL] Transform also analyzed plans when dedup references
## What changes were proposed in this pull request?

In SPARK-24865, `AnalysisBarrier` was removed and, to improve resolution speed, the `analyzed` flag was (re-)introduced so that only plans which are not yet analyzed are processed. This should not apply to attribute deduplication: there we also need to transform plans which were already analyzed, otherwise we may miss rewriting some attributes, leading to invalid plans.

## How was this patch tested?

Added a unit test.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #23035 from mgaido91/SPARK-26057.

Authored-by: Marco Gaido <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
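As a side note for reviewers, the sketch below is a toy, self-contained Scala model of the behaviour difference (it is not Catalyst code; `Node`, `resolveDown` and `transformAllDown` are hypothetical names): a resolver-style traversal that skips subtrees flagged as analyzed leaves attributes inside an already-analyzed subquery untouched, while a plain top-down transform, which the patch switches to, rewrites them everywhere.

// Toy model only: mirrors the skip-if-analyzed shortcut of resolveOperatorsDown
// versus the visit-everything behaviour of transformDown. Not Catalyst code.
case class Node(name: String, analyzed: Boolean, children: Seq[Node] = Nil) {

  // Resolver-style traversal: stops at subtrees already marked as analyzed.
  // Fine for resolution rules, wrong for attribute deduplication.
  def resolveDown(rule: PartialFunction[Node, Node]): Node =
    if (analyzed) {
      this
    } else {
      val rewritten = rule.applyOrElse(this, (n: Node) => n)
      rewritten.copy(children = rewritten.children.map(_.resolveDown(rule)))
    }

  // Plain top-down transform: visits every node regardless of the flag.
  def transformAllDown(rule: PartialFunction[Node, Node]): Node = {
    val rewritten = rule.applyOrElse(this, (n: Node) => n)
    rewritten.copy(children = rewritten.children.map(_.transformAllDown(rule)))
  }
}

object DedupSketch extends App {
  // An already-analyzed subquery fragment referencing a stale attribute id.
  val analyzedSubquery = Node("outerRef#1", analyzed = true)
  val plan = Node("join", analyzed = false, Seq(analyzedSubquery))

  // The "deduplication": rewrite the stale attribute to the new id.
  val dedup: PartialFunction[Node, Node] = {
    case n if n.name == "outerRef#1" => n.copy(name = "outerRef#2")
  }

  println(plan.resolveDown(dedup))      // subtree skipped, outerRef#1 survives
  println(plan.transformAllDown(dedup)) // outerRef#1 rewritten to outerRef#2
}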
1 parent 44d4ef6 commit b46f75a

File tree: 2 files changed (+26, -1 lines)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 1 addition & 1 deletion
@@ -871,7 +871,7 @@ class Analyzer(
     private def dedupOuterReferencesInSubquery(
         plan: LogicalPlan,
         attrMap: AttributeMap[Attribute]): LogicalPlan = {
-      plan resolveOperatorsDown { case currentFragment =>
+      plan transformDown { case currentFragment =>
         currentFragment transformExpressions {
           case OuterReference(a: Attribute) =>
             OuterReference(dedupAttr(a, attrMap))
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 25 additions & 0 deletions
@@ -2554,4 +2554,29 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
 
     checkAnswer(swappedDf.filter($"key"($"map") > "a"), Row(2, Map(2 -> "b")))
   }
+
+  test("SPARK-26057: attribute deduplication on already analyzed plans") {
+    withTempView("a", "b", "v") {
+      val df1 = Seq(("1-1", 6)).toDF("id", "n")
+      df1.createOrReplaceTempView("a")
+      val df3 = Seq("1-1").toDF("id")
+      df3.createOrReplaceTempView("b")
+      spark.sql(
+        """
+          |SELECT a.id, n as m
+          |FROM a
+          |WHERE EXISTS(
+          |  SELECT 1
+          |  FROM b
+          |  WHERE b.id = a.id)
+        """.stripMargin).createOrReplaceTempView("v")
+      val res = spark.sql(
+        """
+          |SELECT a.id, n, m
+          |  FROM a
+          |  LEFT OUTER JOIN v ON v.id = a.id
+        """.stripMargin)
+      checkAnswer(res, Row("1-1", 6, 6))
+    }
+  }
 }
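For readers who want to try this locally, here is a minimal sketch (assuming a Spark session with the temp views "a", "b" and "v" created exactly as in the test above) showing how to inspect the analyzed plan and confirm that the attributes inside the EXISTS subquery were deduplicated.

// Sketch only: assumes the temp views "a", "b" and "v" from the test above
// already exist in the current SparkSession (`spark`).
val res = spark.sql(
  """
    |SELECT a.id, n, m
    |  FROM a
    |  LEFT OUTER JOIN v ON v.id = a.id
  """.stripMargin)

// With this patch the query analyzes successfully and returns a single row.
res.show()

// Print the analyzed plan to check the attribute ids that the deduplication
// rewrote inside the subquery of the view "v".
println(res.queryExecution.analyzed.treeString)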
