From fba05c7975682dba4e5c97b4765df689e93f667e Mon Sep 17 00:00:00 2001 From: "tanel.kiis@gmail.com" Date: Sun, 21 Mar 2021 11:03:07 +0200 Subject: [PATCH 1/3] Address comments --- .../sql/catalyst/optimizer/Optimizer.scala | 44 ------------ .../optimizer/RemoveRedundantAggregates.scala | 68 +++++++++++++++++++ 2 files changed, 68 insertions(+), 44 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 3e3550d5da89..6f8e51e1de7d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -497,50 +497,6 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = removeRedundantAliases(plan, AttributeSet.empty) } -/** - * Remove redundant aggregates from a query plan. A redundant aggregate is an aggregate whose - * only goal is to keep distinct values, while its parent aggregate would ignore duplicate values. - */ -object RemoveRedundantAggregates extends Rule[LogicalPlan] with AliasHelper { - def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { - case upper @ Aggregate(_, _, lower: Aggregate) if lowerIsRedundant(upper, lower) => - val aliasMap = getAliasMap(lower) - - val newAggregate = upper.copy( - child = lower.child, - groupingExpressions = upper.groupingExpressions.map(replaceAlias(_, aliasMap)), - aggregateExpressions = upper.aggregateExpressions.map( - replaceAliasButKeepName(_, aliasMap)) - ) - - // We might have introduces non-deterministic grouping expression - if (newAggregate.groupingExpressions.exists(!_.deterministic)) { - PullOutNondeterministic.applyLocally.applyOrElse(newAggregate, identity[LogicalPlan]) - } else { - newAggregate - } - } - - private def lowerIsRedundant(upper: Aggregate, lower: Aggregate): Boolean = { - val upperHasNoAggregateExpressions = !upper.aggregateExpressions.exists(isAggregate) - - lazy val upperRefsOnlyDeterministicNonAgg = upper.references.subsetOf(AttributeSet( - lower - .aggregateExpressions - .filter(_.deterministic) - .filter(!isAggregate(_)) - .map(_.toAttribute) - )) - - upperHasNoAggregateExpressions && upperRefsOnlyDeterministicNonAgg - } - - private def isAggregate(expr: Expression): Boolean = { - expr.find(e => e.isInstanceOf[AggregateExpression] || - PythonUDF.isGroupedAggPandasUDF(e)).isDefined - } -} - /** * Remove no-op operators from the query plan that do not make any modifications. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala new file mode 100644 index 000000000000..9daefd4ec900 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.analysis.PullOutNondeterministic +import org.apache.spark.sql.catalyst.expressions.{AliasHelper, AttributeSet, Expression, PythonUDF} +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule + +/** + * Remove redundant aggregates from a query plan. A redundant aggregate is an aggregate whose + * only goal is to keep distinct values, while its parent aggregate would ignore duplicate values. + */ +object RemoveRedundantAggregates extends Rule[LogicalPlan] with AliasHelper { + def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { + case upper @ Aggregate(_, _, lower: Aggregate) if isLowerRedundant(upper, lower) => + val aliasMap = getAliasMap(lower) + + val newAggregate = upper.copy( + child = lower.child, + groupingExpressions = upper.groupingExpressions.map(replaceAlias(_, aliasMap)), + aggregateExpressions = upper.aggregateExpressions.map( + replaceAliasButKeepName(_, aliasMap)) + ) + + // We might have introduced non-deterministic grouping expressions + if (newAggregate.groupingExpressions.exists(!_.deterministic)) { + PullOutNondeterministic.applyLocally.applyOrElse(newAggregate, identity[LogicalPlan]) + } else { + newAggregate + } + } + + private def isLowerRedundant(upper: Aggregate, lower: Aggregate): Boolean = { + val upperHasNoAggregateExpressions = !upper.aggregateExpressions.exists(isAggregate) + + lazy val upperRefsOnlyDeterministicNonAgg = upper.references.subsetOf(AttributeSet( + lower + .aggregateExpressions + .filter(_.deterministic) + .filterNot(isAggregate) + .map(_.toAttribute) + )) + + upperHasNoAggregateExpressions && upperRefsOnlyDeterministicNonAgg + } + + private def isAggregate(expr: Expression): Boolean = { + expr.find(e => e.isInstanceOf[AggregateExpression] || + PythonUDF.isGroupedAggPandasUDF(e)).isDefined + } +} From 19748dcc88108019071971aeac7d1370d24d46cd Mon Sep 17 00:00:00 2001 From: "tanel.kiis@gmail.com" Date: Sun, 21 Mar 2021 14:26:15 +0200 Subject: [PATCH 2/3] Allow duplicate angostic aggs --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- .../optimizer/RemoveRedundantAggregates.scala | 13 +++++++++++-- .../RemoveRedundantAggregatesSuite.scala | 16 +++++++++++++++- 3 files changed, 27 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 6f8e51e1de7d..a204f8552d04 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -358,7 +358,7 @@ object EliminateDistinct extends Rule[LogicalPlan] { ae.copy(isDistinct = false) } - private def isDuplicateAgnostic(af: AggregateFunction): Boolean = af match { + def isDuplicateAgnostic(af: AggregateFunction): Boolean = af match { case _: Max => true case 
_: Min => true case _: BitAndAgg => true diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala index 9daefd4ec900..8d938b93d664 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala @@ -48,7 +48,9 @@ object RemoveRedundantAggregates extends Rule[LogicalPlan] with AliasHelper { } private def isLowerRedundant(upper: Aggregate, lower: Aggregate): Boolean = { - val upperHasNoAggregateExpressions = !upper.aggregateExpressions.exists(isAggregate) + val upperHasNoDuplicateSensitiveAgg = !upper + .aggregateExpressions + .exists(isDuplicateSensitiveAggregate) lazy val upperRefsOnlyDeterministicNonAgg = upper.references.subsetOf(AttributeSet( lower @@ -58,11 +60,18 @@ object RemoveRedundantAggregates extends Rule[LogicalPlan] with AliasHelper { .map(_.toAttribute) )) - upperHasNoAggregateExpressions && upperRefsOnlyDeterministicNonAgg + upperHasNoDuplicateSensitiveAgg && upperRefsOnlyDeterministicNonAgg } private def isAggregate(expr: Expression): Boolean = { expr.find(e => e.isInstanceOf[AggregateExpression] || PythonUDF.isGroupedAggPandasUDF(e)).isDefined } + + private def isDuplicateSensitiveAggregate(expr: Expression): Boolean = { + expr.find { + case ae: AggregateExpression => !EliminateDistinct.isDuplicateAgnostic(ae.aggregateFunction) + case e => PythonUDF.isGroupedAggPandasUDF(e) + }.isDefined + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregatesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregatesSuite.scala index d376c31ef965..7ca7c45aeab5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregatesSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregatesSuite.scala @@ -126,7 +126,7 @@ class RemoveRedundantAggregatesSuite extends PlanTest { comparePlans(optimized, expected) } - test("Keep non-redundant aggregate - upper has agg expression") { + test("Keep non-redundant aggregate - upper has duplicate sensitive agg expression") { val relation = LocalRelation('a.int, 'b.int) for (agg <- aggregates('b)) { val query = relation @@ -139,6 +139,20 @@ class RemoveRedundantAggregatesSuite extends PlanTest { } } + test("Remove redundant aggregate - upper has duplicate agnostic agg expression") { + val relation = LocalRelation('a.int, 'b.int) + val query = relation + .groupBy('a, 'b)('a, 'b) + // The max does not change if there are duplicate values + .groupBy('a)('a, max('b)) + .analyze + val expected = relation + .groupBy('a)('a, max('b)) + .analyze + val optimized = Optimize.execute(query) + comparePlans(optimized, expected) + } + test("Keep non-redundant aggregate - upper references agg expression") { val relation = LocalRelation('a.int, 'b.int) for (agg <- aggregates('b)) { From 3fe698593cf78aeb7163130a2456fb7cca545719 Mon Sep 17 00:00:00 2001 From: Tanel Kiis Date: Mon, 24 May 2021 18:23:26 +0300 Subject: [PATCH 3/3] Merge master --- .../optimizer/RemoveRedundantAggregates.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala index 4cf1029a2cf3..a2f23acea92b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.AGGREGATE object RemoveRedundantAggregates extends Rule[LogicalPlan] with AliasHelper { def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning( _.containsPattern(AGGREGATE), ruleId) { - case upper @ Aggregate(_, _, lower: Aggregate) if lowerIsRedundant(upper, lower) => + case upper @ Aggregate(_, _, lower: Aggregate) if isLowerRedundant(upper, lower) => val aliasMap = getAliasMap(lower) val newAggregate = upper.copy( @@ -49,9 +49,13 @@ object RemoveRedundantAggregates extends Rule[LogicalPlan] with AliasHelper { } } - private def lowerIsRedundant(upper: Aggregate, lower: Aggregate): Boolean = { - val upperHasNoAggregateExpressions = - !upper.aggregateExpressions.exists(AggregateExpression.containsAggregate) + private def isLowerRedundant(upper: Aggregate, lower: Aggregate): Boolean = { + val upperHasNoDuplicateSensitiveAgg = upper + .aggregateExpressions + .forall(expr => expr.find { + case ae: AggregateExpression => !EliminateDistinct.isDuplicateAgnostic(ae.aggregateFunction) + case e => AggregateExpression.isAggregate(e) + }.isEmpty) lazy val upperRefsOnlyDeterministicNonAgg = upper.references.subsetOf(AttributeSet( lower @@ -61,6 +65,6 @@ object RemoveRedundantAggregates extends Rule[LogicalPlan] with AliasHelper { .map(_.toAttribute) )) - upperHasNoAggregateExpressions && upperRefsOnlyDeterministicNonAgg + upperHasNoDuplicateSensitiveAgg && upperRefsOnlyDeterministicNonAgg } }
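
Reviewer note (illustration only, not part of the patches above): the sketch below shows the intended end-to-end effect of RemoveRedundantAggregates after these patches, written REPL-style against the Catalyst test DSL in the same way as RemoveRedundantAggregatesSuite. The OnlyRemoveRedundantAggregates executor and the value names are made up for this example; the DSL calls and the rule itself are used exactly as they appear in the diffs.

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.dsl.plans._
    import org.apache.spark.sql.catalyst.optimizer.RemoveRedundantAggregates
    import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
    import org.apache.spark.sql.catalyst.rules.RuleExecutor

    // Hypothetical executor that applies only the rule under review.
    object OnlyRemoveRedundantAggregates extends RuleExecutor[LogicalPlan] {
      val batches = Batch("RemoveRedundantAggregates", FixedPoint(10), RemoveRedundantAggregates) :: Nil
    }

    val relation = LocalRelation('a.int, 'b.int)

    // The lower aggregate only deduplicates (a, b). max ignores duplicate values
    // (EliminateDistinct.isDuplicateAgnostic), so the lower aggregate is removed.
    val removed = OnlyRemoveRedundantAggregates.execute(
      relation
        .groupBy('a, 'b)('a, 'b)
        .groupBy('a)('a, max('b))
        .analyze)
    // removed is equivalent to: relation.groupBy('a)('a, max('b)).analyze

    // count is duplicate sensitive, so here the lower aggregate must be kept.
    val kept = OnlyRemoveRedundantAggregates.execute(
      relation
        .groupBy('a, 'b)('a, 'b)
        .groupBy('a)('a, count('b))
        .analyze)
    // kept still contains both Aggregate nodes.

The first case corresponds to the test "Remove redundant aggregate - upper has duplicate agnostic agg expression" added in PATCH 2/3; the second corresponds to the existing "Keep non-redundant aggregate" tests.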