```diff
@@ -54,7 +54,13 @@ object RemoveRedundantAggregates extends Rule[LogicalPlan] with AliasHelper {
         .map(_.toAttribute)
       ))

-    upperHasNoDuplicateSensitiveAgg && upperRefsOnlyDeterministicNonAgg
+    // If the lower aggregation is global, it is not redundant, because a project with
+    // non-aggregate expressions has different semantics from a global aggregation.
+    // E.g., if the input relation is empty, a project might be optimized to an empty
+    // relation, while a global aggregation will return a single row.
+    lazy val lowerIsGlobalAgg = lower.groupingExpressions.isEmpty
+
+    upperHasNoDuplicateSensitiveAgg && upperRefsOnlyDeterministicNonAgg && !lowerIsGlobalAgg
```
**Member:** According to the JIRA issue, does this bug start in Spark 3.4? Or is it a long-standing bug that we have had before?

**@viirya (Member, Author), Aug 6, 2025:** Yeah, the issue seems to have been there since `RemoveRedundantAggregates` was introduced.

```diff
   }

   private def isDuplicateSensitive(ae: AggregateExpression): Boolean = {
```
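The semantic difference the new comment calls out is easy to check with the DataFrame API. A minimal sketch, not part of the patch, assuming a `SparkSession` in scope as `spark` (as in `spark-shell`):

```scala
import org.apache.spark.sql.functions.lit
import spark.implicits._

val empty = Seq.empty[Int].toDF("x")   // an empty relation

// A projection over an empty input produces zero rows...
empty.select(lit(1).as("col1")).count()          // 0

// ...while a global aggregation always produces exactly one row, so
// rewriting one into the other changes results on empty inputs.
empty.groupBy().agg(lit(1).as("col1")).count()   // 1
```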
```diff
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer

 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Expression, PythonUDAF}
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, PythonUDAF}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi, PlanTest}
 import org.apache.spark.sql.catalyst.plans.logical.{Distinct, LocalRelation, LogicalPlan}
@@ -289,4 +289,23 @@ class RemoveRedundantAggregatesSuite extends PlanTest {
     val originalQuery = Distinct(x.groupBy($"a", $"b")($"a", TrueLiteral)).analyze
     comparePlans(Optimize.execute(originalQuery), originalQuery)
   }
+
+  test("SPARK-53155: global lower aggregation should not be removed") {
+    object OptimizeNonRemovedRedundantAgg extends RuleExecutor[LogicalPlan] {
+      val batches = Batch("RemoveRedundantAggregates", FixedPoint(10),
+        PropagateEmptyRelation,
+        RemoveRedundantAggregates) :: Nil
+    }
+
+    val query = relation
+      .groupBy()(Literal(1).as("col1"), Literal(2).as("col2"), Literal(3).as("col3"))
+      .groupBy($"col1")(max($"col1"))
+      .analyze
+    val expected = relation
```
**Contributor:** `expected = query`?

**@viirya (Member, Author):** Ah, it doesn't seem necessary, so I didn't change this, to save another CI round. Thank you.

```diff
+      .groupBy()(Literal(1).as("col1"), Literal(2).as("col2"), Literal(3).as("col3"))
+      .groupBy($"col1")(max($"col1"))
+      .analyze
+    val optimized = OptimizeNonRemovedRedundantAgg.execute(query)
+    comparePlans(optimized, expected)
+  }
 }
```
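A note on the test's batch: `PropagateEmptyRelation` is paired with `RemoveRedundantAggregates`, presumably because it is the rule that would fold the mis-rewritten grouped aggregate over the empty `relation` into an empty `LocalRelation`, turning the correct one-row answer into zero rows. With the fix, the lower global aggregate must survive optimization. A rough way to eyeball the plan shape outside the suite (a sketch, reusing `spark` from the snippet above):

```scala
import org.apache.spark.sql.functions.{lit, max}
import spark.implicits._

val df = Seq.empty[Int].toDF("x")
  .groupBy().agg(lit(1).as("col1"))      // global aggregate: one row even on empty input
  .groupBy($"col1").agg(max($"col1"))

// With the fix, the optimized plan should retain the lower global Aggregate
// rather than collapsing into an empty LocalRelation.
println(df.queryExecution.optimizedPlan)
df.count()                               // 1
```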
```diff
@@ -2594,6 +2594,13 @@ class DataFrameAggregateSuite extends QueryTest
       res,
       Row(LocalTime.of(22, 1, 0), LocalTime.of(3, 0, 0)))
   }
+
+  test("SPARK-53155: global lower aggregation should not be removed") {
+    val df = emptyTestData
+      .groupBy().agg(lit(1).as("col1"), lit(2).as("col2"), lit(3).as("col3"))
+      .groupBy($"col1").agg(max("col1"))
+    checkAnswer(df, Seq(Row(1, 1)))
+  }
 }

 case class B(c: Option[Double])
```
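The same end-to-end scenario can also be expressed in SQL. A sketch, not from the patch: `empty_tbl` is a hypothetical empty temp view, and `count(*)` is included only to force the inner query to be a global aggregate (a bare literal projection would not be one):

```scala
Seq.empty[Int].toDF("x").createOrReplaceTempView("empty_tbl")

spark.sql("""
  SELECT col1, max(col1)
  FROM (SELECT 1 AS col1, count(*) AS cnt FROM empty_tbl) t
  GROUP BY col1
""").show()
// Expected: a single row (1, 1). An optimizer that (incorrectly) removed the
// inner global aggregate could return zero rows here instead.
```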