From 4224f6299f3aa24a0c261f2bed35951980aeb7d8 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 6 Aug 2025 13:26:27 -0700 Subject: [PATCH 1/3] fix --- .../optimizer/RemoveRedundantAggregates.scala | 8 ++++++- .../RemoveRedundantAggregatesSuite.scala | 21 ++++++++++++++++++- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala index d6a4bd030c9d..b4602d0ddcc9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala @@ -54,7 +54,13 @@ object RemoveRedundantAggregates extends Rule[LogicalPlan] with AliasHelper { .map(_.toAttribute) )) - upperHasNoDuplicateSensitiveAgg && upperRefsOnlyDeterministicNonAgg + // If the lower aggregation is global, it is not redundant because a project with + // non-aggregate expressions is different with global aggregation in semantics. + // E.g., if the input relation is empty, a project might be optimized to an empty + // relation, while a global aggregation will return a single row. + lazy val lowerIsGlobalAgg = lower.groupingExpressions.isEmpty + + upperHasNoDuplicateSensitiveAgg && upperRefsOnlyDeterministicNonAgg && !lowerIsGlobalAgg } private def isDuplicateSensitive(ae: AggregateExpression): Boolean = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregatesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregatesSuite.scala index 2af3057c0b85..4c175b0bab6b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregatesSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregatesSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.expressions.{Expression, PythonUDAF} +import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, PythonUDAF} import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi, PlanTest} import org.apache.spark.sql.catalyst.plans.logical.{Distinct, LocalRelation, LogicalPlan} @@ -289,4 +289,23 @@ class RemoveRedundantAggregatesSuite extends PlanTest { val originalQuery = Distinct(x.groupBy($"a", $"b")($"a", TrueLiteral)).analyze comparePlans(Optimize.execute(originalQuery), originalQuery) } + + test("global lower agggregation should not be removed") { + object OptimizeNonRemovedRedundantAgg extends RuleExecutor[LogicalPlan] { + val batches = Batch("RemoveRedundantAggregates", FixedPoint(10), + PropagateEmptyRelation, + RemoveRedundantAggregates) :: Nil + } + + val query = relation + .groupBy()(Literal(1).as("col1"), Literal(2).as("col2"), Literal(3).as("col3")) + .groupBy($"col1")(max($"col1")) + .analyze + val expected = relation + .groupBy()(Literal(1).as("col1"), Literal(2).as("col2"), Literal(3).as("col3")) + .groupBy($"col1")(max($"col1")) + .analyze + val optimized = OptimizeNonRemovedRedundantAgg.execute(query) + comparePlans(optimized, expected) + } } From 75123c88ff009107ae0b0d274d4990afacfaaed5 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 6 Aug 2025 13:30:22 -0700 Subject: [PATCH 2/3] add jira --- .../sql/catalyst/optimizer/RemoveRedundantAggregatesSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregatesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregatesSuite.scala index 4c175b0bab6b..40b3d36d4bfc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregatesSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregatesSuite.scala @@ -290,7 +290,7 @@ class RemoveRedundantAggregatesSuite extends PlanTest { comparePlans(Optimize.execute(originalQuery), originalQuery) } - test("global lower agggregation should not be removed") { + test("SPARK-53155: global lower aggregation should not be removed") { object OptimizeNonRemovedRedundantAgg extends RuleExecutor[LogicalPlan] { val batches = Batch("RemoveRedundantAggregates", FixedPoint(10), PropagateEmptyRelation, From 808a35c6378c49cab2e738aa323d9cf54fc6b6bf Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 6 Aug 2025 13:43:23 -0700 Subject: [PATCH 3/3] add e2e test --- .../org/apache/spark/sql/DataFrameAggregateSuite.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index 656c739af246..721d1c1deea9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -2594,6 +2594,13 @@ class DataFrameAggregateSuite extends QueryTest res, Row(LocalTime.of(22, 1, 0), LocalTime.of(3, 0, 0))) } + + test("SPARK-53155: global lower aggregation should not be removed") { + val df = emptyTestData + .groupBy().agg(lit(1).as("col1"), lit(2).as("col2"), lit(3).as("col3")) + .groupBy($"col1").agg(max("col1")) + checkAnswer(df, Seq(Row(1, 1))) + } } case class B(c: Option[Double])