Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,7 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp
* `Int` field named `i`. Expression `s.i` is nullable because `s` can be null. However, for all
* non-null `s`, `s.i` can't be null.
*/
case class AssertNotNull(child: Expression, walkedTypePath: Seq[String])
case class AssertNotNull(child: Expression, walkedTypePath: Seq[String] = Nil)
extends UnaryExpression with NonSQLExpression {

override def dataType: DataType = child.dataType
Expand All @@ -1005,7 +1005,7 @@ case class AssertNotNull(child: Expression, walkedTypePath: Seq[String])
override def eval(input: InternalRow): Any = {
val result = child.eval(input)
if (result == null) {
throw new RuntimeException(errMsg)
throw new NullPointerException(errMsg)
}
result
}
Expand All @@ -1021,7 +1021,7 @@ case class AssertNotNull(child: Expression, walkedTypePath: Seq[String])
${childGen.code}

if (${childGen.isNull}) {
throw new RuntimeException($errMsgField);
throw new NullPointerException($errMsgField);
}
"""
ev.copy(code = code, isNull = "false", value = childGen.value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,8 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
// implies that, for a given input row, the output are determined by the expression's initial
// state and all the input rows processed before. In another word, the order of input rows
// matters for non-deterministic expressions, while pushing down predicates changes the order.
case filter @ Filter(condition, project @ Project(fields, grandChild))
// This also applies to Aggregate.
case Filter(condition, project @ Project(fields, grandChild))
if fields.forall(_.deterministic) && canPushThroughCondition(grandChild, condition) =>

// Create a map of Aliases to their values from the child projection.
Expand All @@ -766,33 +767,8 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {

project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))

// Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be
// pushed beneath must satisfy the following conditions:
// 1. All the expressions are part of window partitioning key. The expressions can be compound.
// 2. Deterministic.
// 3. Placed before any non-deterministic predicates.
case filter @ Filter(condition, w: Window)
if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) =>
val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references))

val (candidates, containingNonDeterministic) =
splitConjunctivePredicates(condition).span(_.deterministic)

val (pushDown, rest) = candidates.partition { cond =>
cond.references.subsetOf(partitionAttrs)
}

val stayUp = rest ++ containingNonDeterministic

if (pushDown.nonEmpty) {
val pushDownPredicate = pushDown.reduce(And)
val newWindow = w.copy(child = Filter(pushDownPredicate, w.child))
if (stayUp.isEmpty) newWindow else Filter(stayUp.reduce(And), newWindow)
} else {
filter
}

case filter @ Filter(condition, aggregate: Aggregate) =>
case filter @ Filter(condition, aggregate: Aggregate)
if aggregate.aggregateExpressions.forall(_.deterministic) =>
// Find all the aliased expressions in the aggregate list that don't include any actual
// AggregateExpression, and create a map from the alias to the expression
val aliasMap = AttributeMap(aggregate.aggregateExpressions.collect {
Expand Down Expand Up @@ -823,6 +799,32 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
filter
}

// Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be
// pushed beneath must satisfy the following conditions:
// 1. All the expressions are part of window partitioning key. The expressions can be compound.
// 2. Deterministic.
// 3. Placed before any non-deterministic predicates.
case filter @ Filter(condition, w: Window)
if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) =>
val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references))

val (candidates, containingNonDeterministic) =
splitConjunctivePredicates(condition).span(_.deterministic)

val (pushDown, rest) = candidates.partition { cond =>
cond.references.subsetOf(partitionAttrs)
}

val stayUp = rest ++ containingNonDeterministic

if (pushDown.nonEmpty) {
val pushDownPredicate = pushDown.reduce(And)
val newWindow = w.copy(child = Filter(pushDownPredicate, w.child))
if (stayUp.isEmpty) newWindow else Filter(stayUp.reduce(And), newWindow)
} else {
filter
}

case filter @ Filter(condition, union: Union) =>
// Union could change the rows, so non-deterministic predicate can't be pushed down
val (pushDown, stayUp) = splitConjunctivePredicates(condition).span(_.deterministic)
Expand All @@ -848,7 +850,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
filter
}

case filter @ Filter(condition, u: UnaryNode)
case filter @ Filter(_, u: UnaryNode)
if canPushThrough(u) && u.expressions.forall(_.deterministic) =>
pushDownPredicate(filter, u.child) { predicate =>
u.withNewChildren(Seq(Filter(predicate, u.child)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,20 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}

test("nondeterministic: can't push down filter with nondeterministic condition through project") {
test("nondeterministic: can always push down filter through project with deterministic field") {
val originalQuery = testRelation
.select(Rand(10).as('rand), 'a)
.where('rand > 5 || 'a > 5)
.select('a)
.where(Rand(10) > 5 || 'a > 5)
.analyze

val optimized = Optimize.execute(originalQuery)

comparePlans(optimized, originalQuery)
val correctAnswer = testRelation
.where(Rand(10) > 5 || 'a > 5)
.select('a)
.analyze

comparePlans(optimized, correctAnswer)
}

test("nondeterministic: can't push down filter through project with nondeterministic field") {
Expand All @@ -156,6 +161,34 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, originalQuery)
}

test("nondeterministic: can't push down filter through aggregate with nondeterministic field") {
val originalQuery = testRelation
.groupBy('a)('a, Rand(10).as('rand))
.where('a > 5)
.analyze

val optimized = Optimize.execute(originalQuery)

comparePlans(optimized, originalQuery)
}

test("nondeterministic: push down part of filter through aggregate with deterministic field") {
val originalQuery = testRelation
.groupBy('a)('a)
.where('a > 5 && Rand(10) > 5)
.analyze

val optimized = Optimize.execute(originalQuery.analyze)

val correctAnswer = testRelation
.where('a > 5)
.groupBy('a)('a)
.where(Rand(10) > 5)
.analyze

comparePlans(optimized, correctAnswer)
}

test("filters: combines filters") {
val originalQuery = testRelation
.select('a)
Expand Down