From 0a49fbe9031608f71beb472713da8f14fe1d23e3 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Wed, 20 Jul 2022 18:08:31 +0800 Subject: [PATCH 1/8] [SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging --- .../v2/V2ScanRelationPushDown.scala | 40 ++++++- .../apache/spark/sql/jdbc/JDBCV2Suite.scala | 107 +++++++++++++----- 2 files changed, 112 insertions(+), 35 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index 01b0ae451b2a9..ea3de8c796eda 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -19,7 +19,8 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.collection.mutable -import org.apache.spark.sql.catalyst.expressions.{aggregate, Alias, And, Attribute, AttributeReference, AttributeSet, Cast, Expression, IntegerLiteral, Literal, NamedExpression, PredicateHelper, ProjectionOverSchema, SortOrder, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, AttributeSet, Cast, Expression, IntegerLiteral, Literal, NamedExpression, PredicateHelper, ProjectionOverSchema, SortOrder, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.aggregate import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.optimizer.CollapseProject import org.apache.spark.sql.catalyst.planning.ScanOperation @@ -400,6 +401,13 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { } } + private def findGroupColumn(alias: Alias): Option[AttributeReference] = alias match { + case alias @ Alias(attr: AttributeReference, name) if attr.name.startsWith("group_col_") => + Some(AttributeReference(name, attr.dataType)(alias.exprId)) + case Alias(alias: Alias, _) => findGroupColumn(alias) + case _ => None + } + private def pushDownLimit(plan: LogicalPlan, limit: Int): (LogicalPlan, Boolean) = plan match { case operation @ ScanOperation(_, filter, sHolder: ScanBuilderHolder) if filter.isEmpty => val (isPushed, isPartiallyPushed) = PushDownUtils.pushLimit(sHolder.builder, limit) @@ -410,12 +418,34 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { case s @ Sort(order, _, operation @ ScanOperation(project, filter, sHolder: ScanBuilderHolder)) // Without building the Scan, we do not know the resulting column names after aggregate // push-down, and thus can't push down Top-N which needs to know the ordering column names. - // TODO: we can support simple cases like GROUP BY columns directly and ORDER BY the same - // columns, which we know the resulting column names: the original table columns. - if sHolder.pushedAggregate.isEmpty && filter.isEmpty && + // In particular, we push down the simple cases like GROUP BY columns directly and ORDER BY + // the same columns, which we know the resulting column names: the original table columns. 
+ if filter.isEmpty && CollapseProject.canCollapseExpressions(order, project, alwaysInline = true) => val aliasMap = getAliasMap(project) - val newOrder = order.map(replaceAlias(_, aliasMap)).asInstanceOf[Seq[SortOrder]] + + def findGroupColForSortOrder(sortOrder: SortOrder): Option[SortOrder] = sortOrder match { + case SortOrder(attr: AttributeReference, direction, nullOrdering, sameOrderExpressions) + if aliasMap.contains(attr) => + val newAttr = findGroupColumn(aliasMap(attr)) + if (newAttr.isDefined) { + Some(SortOrder(newAttr.get, direction, nullOrdering, sameOrderExpressions)) + } else { + None + } + case _ => None + } + + lazy val orderByGroupCols = order.flatMap(findGroupColForSortOrder) + if (sHolder.pushedAggregate.isDefined && orderByGroupCols.length != order.length) { + return (s, false) + } + + val newOrder = if (sHolder.pushedAggregate.isDefined) { + orderByGroupCols + } else { + order.map(replaceAlias(_, aliasMap)).asInstanceOf[Seq[SortOrder]] + } val normalizedOrders = DataSourceStrategy.normalizeExprs( newOrder, sHolder.relation.output).asInstanceOf[Seq[SortOrder]] val orders = DataSourceStrategy.translateSortOrders(normalizedOrders) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index d64b181500753..d986c15640c0d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -718,56 +718,45 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df5, Seq(Row(1, "cathy", 9000.00, 1200.0, false), Row(1, "amy", 10000.00, 1000.0, true))) + val name = udf { (x: String) => x.matches("cat|dav|amy") } + val sub = udf { (x: String) => x.substring(0, 3) } val df6 = spark.read .table("h2.test.employee") - .groupBy("DEPT").sum("SALARY") - .orderBy("DEPT") + .select($"SALARY", $"BONUS", sub($"NAME").as("shortName")) + .filter(name($"shortName")) + .sort($"SALARY".desc) .limit(1) + // LIMIT is pushed down only if all the filters are pushed down checkSortRemoved(df6, false) checkLimitRemoved(df6, false) - checkPushedInfo(df6, "PushedAggregates: [SUM(SALARY)]," + - " PushedFilters: [], PushedGroupByExpressions: [DEPT], ") - checkAnswer(df6, Seq(Row(1, 19000.00))) + checkPushedInfo(df6, "PushedFilters: [], ") + checkAnswer(df6, Seq(Row(10000.00, 1000.0, "amy"))) - val name = udf { (x: String) => x.matches("cat|dav|amy") } - val sub = udf { (x: String) => x.substring(0, 3) } val df7 = spark.read .table("h2.test.employee") - .select($"SALARY", $"BONUS", sub($"NAME").as("shortName")) - .filter(name($"shortName")) - .sort($"SALARY".desc) + .sort(sub($"NAME")) .limit(1) - // LIMIT is pushed down only if all the filters are pushed down checkSortRemoved(df7, false) checkLimitRemoved(df7, false) checkPushedInfo(df7, "PushedFilters: [], ") - checkAnswer(df7, Seq(Row(10000.00, 1000.0, "amy"))) + checkAnswer(df7, Seq(Row(2, "alex", 12000.00, 1200.0, false))) val df8 = spark.read - .table("h2.test.employee") - .sort(sub($"NAME")) - .limit(1) - checkSortRemoved(df8, false) - checkLimitRemoved(df8, false) - checkPushedInfo(df8, "PushedFilters: [], ") - checkAnswer(df8, Seq(Row(2, "alex", 12000.00, 1200.0, false))) - - val df9 = spark.read .table("h2.test.employee") .select($"DEPT", $"name", $"SALARY", when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key")) .sort("key", "dept", "SALARY") .limit(3) - checkSortRemoved(df9) - 
checkLimitRemoved(df9) - checkPushedInfo(df9, "PushedFilters: [], " + + checkSortRemoved(df8) + checkLimitRemoved(df8) + checkPushedInfo(df8, "PushedFilters: [], " + "PushedTopN: " + "ORDER BY [CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " + "ASC NULLS FIRST, DEPT ASC NULLS FIRST, SALARY ASC NULLS FIRST] LIMIT 3,") - checkAnswer(df9, + checkAnswer(df8, Seq(Row(1, "amy", 10000, 0), Row(2, "david", 10000, 0), Row(2, "alex", 12000, 0))) - val df10 = spark.read + val df9 = spark.read .option("partitionColumn", "dept") .option("lowerBound", "0") .option("upperBound", "2") @@ -777,13 +766,13 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key")) .orderBy($"key", $"dept", $"SALARY") .limit(3) - checkSortRemoved(df10, false) - checkLimitRemoved(df10, false) - checkPushedInfo(df10, "PushedFilters: [], " + + checkSortRemoved(df9, false) + checkLimitRemoved(df9, false) + checkPushedInfo(df9, "PushedFilters: [], " + "PushedTopN: " + "ORDER BY [CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " + "ASC NULLS FIRST, DEPT ASC NULLS FIRST, SALARY ASC NULLS FIRST] LIMIT 3,") - checkAnswer(df10, + checkAnswer(df9, Seq(Row(1, "amy", 10000, 0), Row(2, "david", 10000, 0), Row(2, "alex", 12000, 0))) } @@ -811,6 +800,64 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df2, Seq(Row(2, "david", 10000.00))) } + test("scan with aggregate push-down and top N push-down") { + val df1 = spark.read + .table("h2.test.employee") + .groupBy("DEPT").sum("SALARY") + .orderBy("DEPT") + .limit(1) + checkSortRemoved(df1) + checkLimitRemoved(df1) + checkPushedInfo(df1, "PushedAggregates: [SUM(SALARY)]," + + " PushedFilters: [], PushedGroupByExpressions: [DEPT], ") + checkAnswer(df1, Seq(Row(1, 19000.00))) + + val df2 = sql( + """ + |SELECT dept AS my_dept, SUM(SALARY) FROM h2.test.employee + |GROUP BY dept + |ORDER BY my_dept + |LIMIT 1 + |""".stripMargin) + checkSortRemoved(df2) + checkLimitRemoved(df2) + checkPushedInfo(df2, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []") + checkAnswer(df2, Seq(Row(1, 19000.00))) + + val df4 = sql( + """ + |SELECT dept, SUM(SALARY) FROM h2.test.employee + |GROUP BY dept + |ORDER BY SUM(SALARY) + |LIMIT 1 + |""".stripMargin) + checkSortRemoved(df4, false) + checkLimitRemoved(df4, false) + checkPushedInfo(df4, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []") + checkAnswer(df4, Seq(Row(6, 12000.00))) + + val df5 = sql( + """ + |SELECT dept, SUM(SALARY) AS total FROM h2.test.employee + |GROUP BY dept + |ORDER BY total + |LIMIT 1 + |""".stripMargin) + checkSortRemoved(df5, false) + checkLimitRemoved(df5, false) + checkPushedInfo(df5, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []") + checkAnswer(df5, Seq(Row(6, 12000.00))) + } + test("scan with filter push-down") { val df = spark.table("h2.test.people").filter($"id" > 1) checkFiltersRemoved(df) From ae0940d7c06bb85c0bdacfc96ff110de82c54df0 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Wed, 20 Jul 2022 19:40:52 +0800 Subject: [PATCH 2/8] Update code --- .../v2/V2ScanRelationPushDown.scala | 14 +-- .../apache/spark/sql/jdbc/JDBCV2Suite.scala | 111 +++++++++++++++++- 2 files changed, 114 insertions(+), 11 deletions(-) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index ea3de8c796eda..3edea8f4dae86 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -424,15 +424,13 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { CollapseProject.canCollapseExpressions(order, project, alwaysInline = true) => val aliasMap = getAliasMap(project) + // TODO support push down ORDER BY expressions. def findGroupColForSortOrder(sortOrder: SortOrder): Option[SortOrder] = sortOrder match { - case SortOrder(attr: AttributeReference, direction, nullOrdering, sameOrderExpressions) - if aliasMap.contains(attr) => - val newAttr = findGroupColumn(aliasMap(attr)) - if (newAttr.isDefined) { - Some(SortOrder(newAttr.get, direction, nullOrdering, sameOrderExpressions)) - } else { - None - } + case SortOrder(attr: AttributeReference, direction, nullOrdering, sameOrderExpressions) => + findGroupColumn(aliasMap(attr)).filter { groupCol => + sHolder.relation.output.exists(out => out.name.equalsIgnoreCase(groupCol.name) && + out.exprId == groupCol.exprId) + }.map(SortOrder(_, direction, nullOrdering, sameOrderExpressions)) case _ => None } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index d986c15640c0d..80714c7cde517 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -808,8 +808,11 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel .limit(1) checkSortRemoved(df1) checkLimitRemoved(df1) - checkPushedInfo(df1, "PushedAggregates: [SUM(SALARY)]," + - " PushedFilters: [], PushedGroupByExpressions: [DEPT], ") + checkPushedInfo(df1, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") checkAnswer(df1, Seq(Row(1, 19000.00))) val df2 = sql( @@ -824,9 +827,26 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkPushedInfo(df2, "PushedAggregates: [SUM(SALARY)]", "PushedGroupByExpressions: [DEPT]", - "PushedFilters: []") + "PushedFilters: []", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 1") checkAnswer(df2, Seq(Row(1, 19000.00))) + val df3 = spark.read + .table("h2.test.employee") + .select($"SALARY", + when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key")) + .groupBy("key").sum("SALARY") + .orderBy("key") + .limit(1) + checkSortRemoved(df3, false) + checkLimitRemoved(df3, false) + checkPushedInfo(df3, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: " + + "[CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END]", + "PushedFilters: []") + checkAnswer(df3, Seq(Row(0, 44000.00))) + val df4 = sql( """ |SELECT dept, SUM(SALARY) FROM h2.test.employee @@ -858,6 +878,91 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df5, Seq(Row(6, 12000.00))) } + test("scan with aggregate push-down and paging push-down") { + val df1 = spark.read + .table("h2.test.employee") + .groupBy("DEPT").sum("SALARY") + .orderBy("DEPT") + 
.offset(1) + .limit(1) + checkSortRemoved(df1) + checkLimitRemoved(df1) + checkPushedInfo(df1, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedOffset: OFFSET 1", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 2") + checkAnswer(df1, Seq(Row(2, 22000.00))) + + val df2 = sql( + """ + |SELECT dept AS my_dept, SUM(SALARY) FROM h2.test.employee + |GROUP BY dept + |ORDER BY my_dept + |LIMIT 1 + |OFFSET 1 + |""".stripMargin) + checkSortRemoved(df2) + checkLimitRemoved(df2) + checkPushedInfo(df2, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []", + "PushedOffset: OFFSET 1", + "PushedTopN: ORDER BY [DEPT ASC NULLS FIRST] LIMIT 2") + checkAnswer(df2, Seq(Row(2, 22000.00))) + + val df3 = spark.read + .table("h2.test.employee") + .select($"SALARY", + when(($"SALARY" > 8000).and($"SALARY" < 10000), $"salary").otherwise(0).as("key")) + .groupBy("key").sum("SALARY") + .orderBy("key") + .offset(1) + .limit(1) + checkSortRemoved(df3, false) + checkLimitRemoved(df3, false) + checkPushedInfo(df3, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: " + + "[CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END]", + "PushedFilters: []") + checkAnswer(df3, Seq(Row(9000, 9000.00))) + + val df4 = sql( + """ + |SELECT dept, SUM(SALARY) FROM h2.test.employee + |GROUP BY dept + |ORDER BY SUM(SALARY) + |LIMIT 1 + |OFFSET 1 + |""".stripMargin) + checkSortRemoved(df4, false) + checkLimitRemoved(df4, false) + checkPushedInfo(df4, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []") + checkAnswer(df4, Seq(Row(1, 19000.00))) + + val df5 = sql( + """ + |SELECT dept, SUM(SALARY) AS total FROM h2.test.employee + |GROUP BY dept + |ORDER BY total + |LIMIT 1 + |OFFSET 1 + |""".stripMargin) + checkSortRemoved(df5, false) + checkLimitRemoved(df5, false) + checkPushedInfo(df5, + "PushedAggregates: [SUM(SALARY)]", + "PushedGroupByExpressions: [DEPT]", + "PushedFilters: []") + checkAnswer(df5, Seq(Row(1, 19000.00))) + } + test("scan with filter push-down") { val df = spark.table("h2.test.people").filter($"id" > 1) checkFiltersRemoved(df) From 6563aff4f680b6716ac9b1e0925d68b90d2bfd89 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Wed, 20 Jul 2022 19:42:40 +0800 Subject: [PATCH 3/8] Update code --- .../sql/execution/datasources/v2/V2ScanRelationPushDown.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index 3edea8f4dae86..c3e023548c4e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -428,8 +428,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { def findGroupColForSortOrder(sortOrder: SortOrder): Option[SortOrder] = sortOrder match { case SortOrder(attr: AttributeReference, direction, nullOrdering, sameOrderExpressions) => findGroupColumn(aliasMap(attr)).filter { groupCol => - sHolder.relation.output.exists(out => out.name.equalsIgnoreCase(groupCol.name) && - out.exprId == groupCol.exprId) + sHolder.relation.output.exists(out => out.semanticEquals(groupCol)) }.map(SortOrder(_, direction, nullOrdering, 
sameOrderExpressions)) case _ => None } From eedf418890e53c29f2ff3ba84f1c588d4c993a29 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Wed, 20 Jul 2022 19:43:21 +0800 Subject: [PATCH 4/8] Update code --- .../sql/execution/datasources/v2/V2ScanRelationPushDown.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index c3e023548c4e6..5558440676fa5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -424,7 +424,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { CollapseProject.canCollapseExpressions(order, project, alwaysInline = true) => val aliasMap = getAliasMap(project) - // TODO support push down ORDER BY expressions. + // TODO support push down Aggregate with ORDER BY expressions. def findGroupColForSortOrder(sortOrder: SortOrder): Option[SortOrder] = sortOrder match { case SortOrder(attr: AttributeReference, direction, nullOrdering, sameOrderExpressions) => findGroupColumn(aliasMap(attr)).filter { groupCol => From 55a965722650eb951701625cd0a72a874fa45067 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Fri, 22 Jul 2022 18:37:46 +0800 Subject: [PATCH 5/8] Update code --- .../sql/execution/datasources/v2/V2ScanRelationPushDown.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index 5558440676fa5..48d05e798918c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -420,11 +420,11 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { // push-down, and thus can't push down Top-N which needs to know the ordering column names. // In particular, we push down the simple cases like GROUP BY columns directly and ORDER BY // the same columns, which we know the resulting column names: the original table columns. + // TODO support push down Aggregate with ORDER BY expressions. if filter.isEmpty && CollapseProject.canCollapseExpressions(order, project, alwaysInline = true) => val aliasMap = getAliasMap(project) - // TODO support push down Aggregate with ORDER BY expressions. 
def findGroupColForSortOrder(sortOrder: SortOrder): Option[SortOrder] = sortOrder match { case SortOrder(attr: AttributeReference, direction, nullOrdering, sameOrderExpressions) => findGroupColumn(aliasMap(attr)).filter { groupCol => @@ -433,7 +433,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { case _ => None } - lazy val orderByGroupCols = order.flatMap(findGroupColForSortOrder) + val orderByGroupCols = order.flatMap(findGroupColForSortOrder) if (sHolder.pushedAggregate.isDefined && orderByGroupCols.length != order.length) { return (s, false) } From ad36dc11b0db86edf9f2f97b6a7a2fe618204296 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Mon, 25 Jul 2022 15:18:31 +0800 Subject: [PATCH 6/8] Update code --- .../sql/execution/datasources/v2/V2ScanRelationPushDown.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index 48d05e798918c..8d21ade982bbc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -433,7 +433,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { case _ => None } - val orderByGroupCols = order.flatMap(findGroupColForSortOrder) + lazy val orderByGroupCols = order.flatMap(findGroupColForSortOrder) if (sHolder.pushedAggregate.isDefined && orderByGroupCols.length != order.length) { return (s, false) } From 39e109a3b77ba112d2c1c366f3b760b341fbc3cf Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Mon, 25 Jul 2022 15:22:16 +0800 Subject: [PATCH 7/8] Update code --- .../datasources/v2/V2ScanRelationPushDown.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index 8d21ade982bbc..17411d277239f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -433,12 +433,11 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { case _ => None } - lazy val orderByGroupCols = order.flatMap(findGroupColForSortOrder) - if (sHolder.pushedAggregate.isDefined && orderByGroupCols.length != order.length) { - return (s, false) - } - val newOrder = if (sHolder.pushedAggregate.isDefined) { + val orderByGroupCols = order.flatMap(findGroupColForSortOrder) + if (orderByGroupCols.length != order.length) { + return (s, false) + } orderByGroupCols } else { order.map(replaceAlias(_, aliasMap)).asInstanceOf[Seq[SortOrder]] From eeba4e7a285dc43c75951fa14e5c50f7e93a349e Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Tue, 26 Jul 2022 11:03:57 +0800 Subject: [PATCH 8/8] Update code --- .../sql/execution/datasources/v2/V2ScanRelationPushDown.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index 17411d277239f..59954f88c36bb 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -19,8 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.collection.mutable -import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, AttributeSet, Cast, Expression, IntegerLiteral, Literal, NamedExpression, PredicateHelper, ProjectionOverSchema, SortOrder, SubqueryExpression} -import org.apache.spark.sql.catalyst.expressions.aggregate +import org.apache.spark.sql.catalyst.expressions.{aggregate, Alias, And, Attribute, AttributeReference, AttributeSet, Cast, Expression, IntegerLiteral, Literal, NamedExpression, PredicateHelper, ProjectionOverSchema, SortOrder, SubqueryExpression} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.optimizer.CollapseProject import org.apache.spark.sql.catalyst.planning.ScanOperation
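
Illustrative usage (not part of the patches above): a minimal sketch of the query shape this series lets V2ScanRelationPushDown push down in full — GROUP BY a table column and ORDER BY that same column, optionally with OFFSET for paging. It assumes a DS V2 JDBC catalog named h2 registered roughly the way JDBCV2Suite does, backed by an H2 database that already contains a test.employee table with DEPT and SALARY columns; the URL and option values below are placeholders, not the suite's actual configuration.

import org.apache.spark.sql.SparkSession

// Sketch only: catalog name, URL and table are placeholders mirroring JDBCV2Suite.
object AggregateTopNPushDownExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-39819-example")
      .config("spark.sql.catalog.h2",
        "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
      .config("spark.sql.catalog.h2.url", "jdbc:h2:mem:testdb0;user=testUser") // placeholder URL
      .config("spark.sql.catalog.h2.driver", "org.h2.Driver")
      .config("spark.sql.catalog.h2.pushDownAggregate", "true")
      .config("spark.sql.catalog.h2.pushDownLimit", "true")
      .config("spark.sql.catalog.h2.pushDownOffset", "true")
      .getOrCreate()

    // GROUP BY a column and ORDER BY the same column: with this change the aggregate
    // and the Top-N (ORDER BY ... LIMIT) are both compiled into the query sent to JDBC.
    val topN = spark.read
      .table("h2.test.employee") // assumes the table already exists in H2
      .groupBy("DEPT").sum("SALARY")
      .orderBy("DEPT")
      .limit(1)
    // Expect PushedAggregates, PushedGroupByExpressions and PushedTopN in the scan node.
    topN.explain()

    // Paging case: OFFSET is pushed as well, alongside a LIMIT covering offset + limit rows.
    val paged = spark.read
      .table("h2.test.employee")
      .groupBy("DEPT").sum("SALARY")
      .orderBy("DEPT")
      .offset(1)
      .limit(1)
    // Expect PushedOffset: OFFSET 1 in addition to the pushed Top-N.
    paged.explain()

    spark.stop()
  }
}

Note that ordering by an aggregate result (e.g. ORDER BY SUM(SALARY)) is deliberately not pushed down by these patches: the aggregate is still pushed, but the sort and limit stay in Spark, as the df4/df5 test cases above verify.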