From 15b40eea444f69a08114edd3c82fb0998e4edab3 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 8 Oct 2015 02:11:27 +0800 Subject: [PATCH 1/2] SQL with windowing function should be able to refer to a column in an inner select block. --- .../sql/catalyst/analysis/Analyzer.scala | 14 ++++++++++ .../sql/hive/execution/SQLQuerySuite.scala | 27 +++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index bf72d47ce1ea..e215bb990003 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -761,6 +761,13 @@ class Analyzer( }.isDefined } + private def hasAggregation(expr: NamedExpression): Boolean = { + expr.find { + case agg: AggregateExpression => true + case _ => false + }.isDefined + } + /** * From a Seq of [[NamedExpression]]s, extract expressions containing window expressions and * other regular expressions that do not contain any window expression. 
For example, for @@ -831,6 +838,13 @@ class Analyzer( val withName = Alias(agg, s"_w${extractedExprBuffer.length}")() extractedExprBuffer += withName withName.toAttribute + + case ne: Alias if hasWindowFunction(ne) && !hasAggregation(ne) => + ne.children.map(_.transform { + case e: NamedExpression => extractExpr(e) + }) + ne + }.asInstanceOf[NamedExpression] } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 8c3f9ac20263..3a6900cdf6ef 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -833,6 +833,33 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { ).map(i => Row(i._1, i._2, i._3))) } + test("window function: refer column in inner select block") { + val data = Seq( + WindowData(1, "a", 5), + WindowData(2, "a", 6), + WindowData(3, "b", 7), + WindowData(4, "b", 8), + WindowData(5, "c", 9), + WindowData(6, "c", 10) + ) + sparkContext.parallelize(data).toDF().registerTempTable("windowData") + + checkAnswer( + sql( + """ + |select area, rank() over (partition by area order by tmp.month) + tmp.tmp1 as c1 + |from (select month, area, product, 1 as tmp1 from windowData) tmp + """.stripMargin), + Seq( + ("a", 2), + ("a", 3), + ("b", 2), + ("b", 3), + ("c", 2), + ("c", 3) + ).map(i => Row(i._1, i._2))) + } + test("window function: partition and order expressions") { val data = Seq( WindowData(1, "a", 5), From 23f106d25fdf3cd01117c8d5fc69cd6d2b8496d1 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 10 Oct 2015 09:20:22 +0800 Subject: [PATCH 2/2] Extracts all other attributes. 
--- .../spark/sql/catalyst/analysis/Analyzer.scala | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index e215bb990003..f5597a08d359 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -761,13 +761,6 @@ class Analyzer( }.isDefined } - private def hasAggregation(expr: NamedExpression): Boolean = { - expr.find { - case agg: AggregateExpression => true - case _ => false - }.isDefined - } - /** * From a Seq of [[NamedExpression]]s, extract expressions containing window expressions and * other regular expressions that do not contain any window expression. For example, for @@ -839,11 +832,8 @@ class Analyzer( extractedExprBuffer += withName withName.toAttribute - case ne: Alias if hasWindowFunction(ne) && !hasAggregation(ne) => - ne.children.map(_.transform { - case e: NamedExpression => extractExpr(e) - }) - ne + // Extracts other attributes + case attr: Attribute => extractExpr(attr) }.asInstanceOf[NamedExpression] }