Skip to content

Commit a0b499f

Browse files
cloud-fan authored and gatorsmile committed
[SPARK-20246][SQL] should not push predicate down through aggregate with non-deterministic expressions
## What changes were proposed in this pull request?

Similar to `Project`, when `Aggregate` has non-deterministic expressions, we should not push predicate down through it, as it will change the number of input rows and thus change the evaluation result of non-deterministic expressions in `Aggregate`.

## How was this patch tested?

new regression test

Author: Wenchen Fan <[email protected]>

Closes #17562 from cloud-fan/filter.

(cherry picked from commit 7577e9c)
Signed-off-by: Xiao Li <[email protected]>
1 parent 9016e17 commit a0b499f

File tree

2 files changed

+68
-33
lines changed

2 files changed

+68
-33
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1045,7 +1045,8 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
10451045
// implies that, for a given input row, the output are determined by the expression's initial
10461046
// state and all the input rows processed before. In another word, the order of input rows
10471047
// matters for non-deterministic expressions, while pushing down predicates changes the order.
1048-
case filter @ Filter(condition, project @ Project(fields, grandChild))
1048+
// This also applies to Aggregate.
1049+
case Filter(condition, project @ Project(fields, grandChild))
10491050
if fields.forall(_.deterministic) && canPushThroughCondition(grandChild, condition) =>
10501051

10511052
// Create a map of Aliases to their values from the child projection.
@@ -1056,33 +1057,8 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
10561057

10571058
project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
10581059

1059-
// Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be
1060-
// pushed beneath must satisfy the following conditions:
1061-
// 1. All the expressions are part of window partitioning key. The expressions can be compound.
1062-
// 2. Deterministic.
1063-
// 3. Placed before any non-deterministic predicates.
1064-
case filter @ Filter(condition, w: Window)
1065-
if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) =>
1066-
val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references))
1067-
1068-
val (candidates, containingNonDeterministic) =
1069-
splitConjunctivePredicates(condition).span(_.deterministic)
1070-
1071-
val (pushDown, rest) = candidates.partition { cond =>
1072-
cond.references.subsetOf(partitionAttrs)
1073-
}
1074-
1075-
val stayUp = rest ++ containingNonDeterministic
1076-
1077-
if (pushDown.nonEmpty) {
1078-
val pushDownPredicate = pushDown.reduce(And)
1079-
val newWindow = w.copy(child = Filter(pushDownPredicate, w.child))
1080-
if (stayUp.isEmpty) newWindow else Filter(stayUp.reduce(And), newWindow)
1081-
} else {
1082-
filter
1083-
}
1084-
1085-
case filter @ Filter(condition, aggregate: Aggregate) =>
1060+
case filter @ Filter(condition, aggregate: Aggregate)
1061+
if aggregate.aggregateExpressions.forall(_.deterministic) =>
10861062
// Find all the aliased expressions in the aggregate list that don't include any actual
10871063
// AggregateExpression, and create a map from the alias to the expression
10881064
val aliasMap = AttributeMap(aggregate.aggregateExpressions.collect {
@@ -1113,6 +1089,32 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
11131089
filter
11141090
}
11151091

1092+
// Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be
1093+
// pushed beneath must satisfy the following conditions:
1094+
// 1. All the expressions are part of window partitioning key. The expressions can be compound.
1095+
// 2. Deterministic.
1096+
// 3. Placed before any non-deterministic predicates.
1097+
case filter @ Filter(condition, w: Window)
1098+
if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) =>
1099+
val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references))
1100+
1101+
val (candidates, containingNonDeterministic) =
1102+
splitConjunctivePredicates(condition).span(_.deterministic)
1103+
1104+
val (pushDown, rest) = candidates.partition { cond =>
1105+
cond.references.subsetOf(partitionAttrs)
1106+
}
1107+
1108+
val stayUp = rest ++ containingNonDeterministic
1109+
1110+
if (pushDown.nonEmpty) {
1111+
val pushDownPredicate = pushDown.reduce(And)
1112+
val newWindow = w.copy(child = Filter(pushDownPredicate, w.child))
1113+
if (stayUp.isEmpty) newWindow else Filter(stayUp.reduce(And), newWindow)
1114+
} else {
1115+
filter
1116+
}
1117+
11161118
case filter @ Filter(condition, union: Union) =>
11171119
// Union could change the rows, so non-deterministic predicate can't be pushed down
11181120
val (pushDown, stayUp) = splitConjunctivePredicates(condition).span(_.deterministic)
@@ -1138,7 +1140,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
11381140
filter
11391141
}
11401142

1141-
case filter @ Filter(condition, u: UnaryNode)
1143+
case filter @ Filter(_, u: UnaryNode)
11421144
if canPushThrough(u) && u.expressions.forall(_.deterministic) =>
11431145
pushDownPredicate(filter, u.child) { predicate =>
11441146
u.withNewChildren(Seq(Filter(predicate, u.child)))

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,15 +134,20 @@ class FilterPushdownSuite extends PlanTest {
134134
comparePlans(optimized, correctAnswer)
135135
}
136136

137-
test("nondeterministic: can't push down filter with nondeterministic condition through project") {
137+
test("nondeterministic: can always push down filter through project with deterministic field") {
138138
val originalQuery = testRelation
139-
.select(Rand(10).as('rand), 'a)
140-
.where('rand > 5 || 'a > 5)
139+
.select('a)
140+
.where(Rand(10) > 5 || 'a > 5)
141141
.analyze
142142

143143
val optimized = Optimize.execute(originalQuery)
144144

145-
comparePlans(optimized, originalQuery)
145+
val correctAnswer = testRelation
146+
.where(Rand(10) > 5 || 'a > 5)
147+
.select('a)
148+
.analyze
149+
150+
comparePlans(optimized, correctAnswer)
146151
}
147152

148153
test("nondeterministic: can't push down filter through project with nondeterministic field") {
@@ -156,6 +161,34 @@ class FilterPushdownSuite extends PlanTest {
156161
comparePlans(optimized, originalQuery)
157162
}
158163

164+
test("nondeterministic: can't push down filter through aggregate with nondeterministic field") {
165+
val originalQuery = testRelation
166+
.groupBy('a)('a, Rand(10).as('rand))
167+
.where('a > 5)
168+
.analyze
169+
170+
val optimized = Optimize.execute(originalQuery)
171+
172+
comparePlans(optimized, originalQuery)
173+
}
174+
175+
test("nondeterministic: push down part of filter through aggregate with deterministic field") {
176+
val originalQuery = testRelation
177+
.groupBy('a)('a)
178+
.where('a > 5 && Rand(10) > 5)
179+
.analyze
180+
181+
val optimized = Optimize.execute(originalQuery.analyze)
182+
183+
val correctAnswer = testRelation
184+
.where('a > 5)
185+
.groupBy('a)('a)
186+
.where(Rand(10) > 5)
187+
.analyze
188+
189+
comparePlans(optimized, correctAnswer)
190+
}
191+
159192
test("filters: combines filters") {
160193
val originalQuery = testRelation
161194
.select('a)

0 commit comments

Comments (0)