Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,8 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
// implies that, for a given input row, the output are determined by the expression's initial
// state and all the input rows processed before. In another word, the order of input rows
// matters for non-deterministic expressions, while pushing down predicates changes the order.
case filter @ Filter(condition, project @ Project(fields, grandChild))
// This also applies to Aggregate.
case Filter(condition, project @ Project(fields, grandChild))
if fields.forall(_.deterministic) && canPushThroughCondition(grandChild, condition) =>

// Create a map of Aliases to their values from the child projection.
Expand All @@ -766,33 +767,8 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {

project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))

// Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be
// pushed beneath must satisfy the following conditions:
// 1. All the expressions are part of window partitioning key. The expressions can be compound.
// 2. Deterministic.
// 3. Placed before any non-deterministic predicates.
case filter @ Filter(condition, w: Window)
if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) =>
val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references))

val (candidates, containingNonDeterministic) =
splitConjunctivePredicates(condition).span(_.deterministic)

val (pushDown, rest) = candidates.partition { cond =>
cond.references.subsetOf(partitionAttrs)
}

val stayUp = rest ++ containingNonDeterministic

if (pushDown.nonEmpty) {
val pushDownPredicate = pushDown.reduce(And)
val newWindow = w.copy(child = Filter(pushDownPredicate, w.child))
if (stayUp.isEmpty) newWindow else Filter(stayUp.reduce(And), newWindow)
} else {
filter
}

case filter @ Filter(condition, aggregate: Aggregate) =>
case filter @ Filter(condition, aggregate: Aggregate)
if aggregate.aggregateExpressions.forall(_.deterministic) =>
// Find all the aliased expressions in the aggregate list that don't include any actual
// AggregateExpression, and create a map from the alias to the expression
val aliasMap = AttributeMap(aggregate.aggregateExpressions.collect {
Expand Down Expand Up @@ -823,6 +799,32 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
filter
}

// Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be
// pushed beneath must satisfy the following conditions:
// 1. All the expressions are part of window partitioning key. The expressions can be compound.
// 2. Deterministic.
// 3. Placed before any non-deterministic predicates.
case filter @ Filter(condition, w: Window)
if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) =>
val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references))

val (candidates, containingNonDeterministic) =
splitConjunctivePredicates(condition).span(_.deterministic)

val (pushDown, rest) = candidates.partition { cond =>
cond.references.subsetOf(partitionAttrs)
}

val stayUp = rest ++ containingNonDeterministic

if (pushDown.nonEmpty) {
val pushDownPredicate = pushDown.reduce(And)
val newWindow = w.copy(child = Filter(pushDownPredicate, w.child))
if (stayUp.isEmpty) newWindow else Filter(stayUp.reduce(And), newWindow)
} else {
filter
}

case filter @ Filter(condition, union: Union) =>
// Union could change the rows, so non-deterministic predicate can't be pushed down
val (pushDown, stayUp) = splitConjunctivePredicates(condition).span(_.deterministic)
Expand All @@ -848,7 +850,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
filter
}

case filter @ Filter(condition, u: UnaryNode)
case filter @ Filter(_, u: UnaryNode)
if canPushThrough(u) && u.expressions.forall(_.deterministic) =>
pushDownPredicate(filter, u.child) { predicate =>
u.withNewChildren(Seq(Filter(predicate, u.child)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,20 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}

test("nondeterministic: can't push down filter with nondeterministic condition through project") {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test was wrong, actually we can push down nondeterministic filter through project, as long as the project list is all deterministic.

test("nondeterministic: can always push down filter through project with deterministic field") {
val originalQuery = testRelation
.select(Rand(10).as('rand), 'a)
.where('rand > 5 || 'a > 5)
.select('a)
.where(Rand(10) > 5 || 'a > 5)
.analyze

val optimized = Optimize.execute(originalQuery)

comparePlans(optimized, originalQuery)
val correctAnswer = testRelation
.where(Rand(10) > 5 || 'a > 5)
.select('a)
.analyze

comparePlans(optimized, correctAnswer)
}

test("nondeterministic: can't push down filter through project with nondeterministic field") {
Expand All @@ -156,6 +161,34 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, originalQuery)
}

test("nondeterministic: can't push down filter through aggregate with nondeterministic field") {
val originalQuery = testRelation
.groupBy('a)('a, Rand(10).as('rand))
.where('a > 5)
.analyze

val optimized = Optimize.execute(originalQuery)

comparePlans(optimized, originalQuery)
}

test("nondeterministic: push down part of filter through aggregate with deterministic field") {
val originalQuery = testRelation
.groupBy('a)('a)
.where('a > 5 && Rand(10) > 5)
.analyze

val optimized = Optimize.execute(originalQuery.analyze)

val correctAnswer = testRelation
.where('a > 5)
.groupBy('a)('a)
.where(Rand(10) > 5)
.analyze

comparePlans(optimized, correctAnswer)
}

test("filters: combines filters") {
val originalQuery = testRelation
.select('a)
Expand Down