From 62a895febb8a91ce578133528a31060fe44981f9 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Wed, 14 Nov 2018 16:16:37 +0100 Subject: [PATCH 1/4] [SPARK-26054][SQL] Transform also analyzed plans when dedup references --- .../sql/catalyst/analysis/Analyzer.scala | 2 +- .../org/apache/spark/sql/DataFrameSuite.scala | 30 +++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 6dc5b3f28b914..46d98de6d0ec2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -871,7 +871,7 @@ class Analyzer( private def dedupOuterReferencesInSubquery( plan: LogicalPlan, attrMap: AttributeMap[Attribute]): LogicalPlan = { - plan resolveOperatorsDown { case currentFragment => + plan transformDown { case currentFragment => currentFragment transformExpressions { case OuterReference(a: Attribute) => OuterReference(dedupAttr(a, attrMap)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 2bb18f48e0ae2..aab5059964cb0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2554,4 +2554,34 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { checkAnswer(swappedDf.filter($"key"($"map") > "a"), Row(2, Map(2 -> "b"))) } + + test("SPARK-26057: attribute deduplication on already analyzed plans") { + withTempView("cc", "p", "c") { + val df1 = Seq(("1-1", "sp", 6)).toDF("id", "layout", "n") + df1.createOrReplaceTempView("cc") + val df2 = Seq(("sp", 1)).toDF("layout", "ts") + df2.createOrReplaceTempView("p") + val df3 = Seq(("1-1", "sp", 3)).toDF("id", "layout", "ts") + 
df3.createOrReplaceTempView("c") + spark.sql( + """ + |SELECT cc.id, cc.layout, count(*) as m + |FROM cc + |JOIN p USING(layout) + |WHERE EXISTS( + | SELECT 1 + | FROM c + | WHERE c.id = cc.id AND c.layout = cc.layout AND c.ts > p.ts) + |GROUP BY cc.id, cc.layout + """.stripMargin).createOrReplaceTempView("pcc") + val res = spark.sql( + """ + |SELECT cc.id, cc.layout, n, m + | FROM cc + | LEFT OUTER JOIN pcc ON pcc.id = cc.id AND pcc.layout = cc.layout + """.stripMargin) + checkAnswer(res, Row("1-1", "sp", 6, 1)) + + } + } } From 63c70e5591b82fe16dcfd7a30d892dccb79bc72e Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Thu, 15 Nov 2018 09:10:44 +0100 Subject: [PATCH 2/4] simplify ut --- .../org/apache/spark/sql/DataFrameSuite.scala | 32 ++++++++----------- 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index aab5059964cb0..0d8f75474e7ac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2556,31 +2556,27 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { } test("SPARK-26057: attribute deduplication on already analyzed plans") { - withTempView("cc", "p", "c") { - val df1 = Seq(("1-1", "sp", 6)).toDF("id", "layout", "n") - df1.createOrReplaceTempView("cc") - val df2 = Seq(("sp", 1)).toDF("layout", "ts") - df2.createOrReplaceTempView("p") - val df3 = Seq(("1-1", "sp", 3)).toDF("id", "layout", "ts") - df3.createOrReplaceTempView("c") + withTempView("a", "b", "c", "v") { + val df1 = Seq(("1-1", 6)).toDF("id", "n") + df1.createOrReplaceTempView("a") + val df3 = Seq("1-1").toDF("id") + df3.createOrReplaceTempView("b") spark.sql( """ - |SELECT cc.id, cc.layout, count(*) as m - |FROM cc - |JOIN p USING(layout) + |SELECT a.id, n as m + |FROM a |WHERE EXISTS( | SELECT 1 - | FROM c - | WHERE c.id = cc.id AND 
c.layout = cc.layout AND c.ts > p.ts) - |GROUP BY cc.id, cc.layout - """.stripMargin).createOrReplaceTempView("pcc") + | FROM b + | WHERE b.id = a.id) + """.stripMargin).createOrReplaceTempView("v") val res = spark.sql( """ - |SELECT cc.id, cc.layout, n, m - | FROM cc - | LEFT OUTER JOIN pcc ON pcc.id = cc.id AND pcc.layout = cc.layout + |SELECT a.id, n, m + | FROM a + | LEFT OUTER JOIN v ON v.id = a.id """.stripMargin) - checkAnswer(res, Row("1-1", "sp", 6, 1)) + checkAnswer(res, Row("1-1", 6, 6)) } } From f23a46e8e2543d6fead27606525ea2e574ff24bb Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Thu, 15 Nov 2018 09:11:09 +0100 Subject: [PATCH 3/4] fix --- .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 0d8f75474e7ac..963a5613a5281 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2556,7 +2556,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { } test("SPARK-26057: attribute deduplication on already analyzed plans") { - withTempView("a", "b", "c", "v") { + withTempView("a", "b", "v") { val df1 = Seq(("1-1", 6)).toDF("id", "n") df1.createOrReplaceTempView("a") val df3 = Seq("1-1").toDF("id") From 98d91a323d891b8995791fc991d80cee9c4f7f97 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Thu, 15 Nov 2018 09:11:36 +0100 Subject: [PATCH 4/4] remove newline --- .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 963a5613a5281..0ee2627814ba0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2577,7 +2577,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { | LEFT OUTER JOIN v ON v.id = a.id """.stripMargin) checkAnswer(res, Row("1-1", 6, 6)) - } } }