Skip to content

Commit f96997b

Browse files
sameeragarwal authored and
yhuai committed
[SPARK-13871][SQL] Support for inferring filters from data constraints
## What changes were proposed in this pull request? This PR generalizes the `NullFiltering` optimizer rule in catalyst to `InferFiltersFromConstraints`, which can automatically infer all relevant filters based on an operator's constraints while making sure of 2 things: (a) no redundant filters are generated, and (b) filters that do not contribute to any further optimizations are not generated. ## How was this patch tested? Extended all tests in `InferFiltersFromConstraintsSuite` (which were initially based on `NullFilteringSuite`) to test filter inference in `Filter` and `Join` operators. In particular, the 2 tests (`single inner join with pre-existing filters: filter out values on either side` and `multiple inner joins: filter out values on all sides on equi-join keys`) attempt to highlight/test the real potential of this rule for join optimization. Author: Sameer Agarwal <[email protected]> Closes #11665 from sameeragarwal/infer-filters.
1 parent b90c020 commit f96997b

File tree

2 files changed

+63
-62
lines changed

2 files changed

+63
-62
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 23 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -72,14 +72,14 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
7272
LimitPushDown,
7373
ColumnPruning,
7474
EliminateOperators,
75+
InferFiltersFromConstraints,
7576
// Operator combine
7677
CollapseRepartition,
7778
CollapseProject,
7879
CombineFilters,
7980
CombineLimits,
8081
CombineUnions,
8182
// Constant folding and strength reduction
82-
NullFiltering,
8383
NullPropagation,
8484
OptimizeIn,
8585
ConstantFolding,
@@ -607,50 +607,40 @@ object NullPropagation extends Rule[LogicalPlan] {
607607
}
608608

609609
/**
610-
* Attempts to eliminate reading (unnecessary) NULL values if they are not required for correctness
611-
* by inserting isNotNull filters in the query plan. These filters are currently inserted beneath
612-
* existing Filters and Join operators and are inferred based on their data constraints.
610+
* Generate a list of additional filters from an operator's existing constraints but remove those
611+
* that are either already part of the operator's condition or are part of the operator's child
612+
* constraints. These filters are currently added to the existing conditions of Filter and Join
613+
* operators; for joins, only constraints that reference a single side are considered.
613614
*
614615
* Note: While this optimization is applicable to all types of join, it primarily benefits Inner and
615616
* LeftSemi joins.
616617
*/
617-
object NullFiltering extends Rule[LogicalPlan] with PredicateHelper {
618+
object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
618619
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
619620
case filter @ Filter(condition, child) =>
620-
// We generate a list of additional isNotNull filters from the operator's existing constraints
621-
// but remove those that are either already part of the filter condition or are part of the
622-
// operator's child constraints.
623-
val newIsNotNullConstraints = filter.constraints.filter(_.isInstanceOf[IsNotNull]) --
621+
val newFilters = filter.constraints --
624622
(child.constraints ++ splitConjunctivePredicates(condition))
625-
if (newIsNotNullConstraints.nonEmpty) {
626-
Filter(And(newIsNotNullConstraints.reduce(And), condition), child)
623+
if (newFilters.nonEmpty) {
624+
Filter(And(newFilters.reduce(And), condition), child)
627625
} else {
628626
filter
629627
}
630628

631-
case join @ Join(left, right, joinType, condition) =>
632-
val leftIsNotNullConstraints = join.constraints
633-
.filter(_.isInstanceOf[IsNotNull])
634-
.filter(_.references.subsetOf(left.outputSet)) -- left.constraints
635-
val rightIsNotNullConstraints =
636-
join.constraints
637-
.filter(_.isInstanceOf[IsNotNull])
638-
.filter(_.references.subsetOf(right.outputSet)) -- right.constraints
639-
val newLeftChild = if (leftIsNotNullConstraints.nonEmpty) {
640-
Filter(leftIsNotNullConstraints.reduce(And), left)
641-
} else {
642-
left
643-
}
644-
val newRightChild = if (rightIsNotNullConstraints.nonEmpty) {
645-
Filter(rightIsNotNullConstraints.reduce(And), right)
646-
} else {
647-
right
648-
}
649-
if (newLeftChild != left || newRightChild != right) {
650-
Join(newLeftChild, newRightChild, joinType, condition)
651-
} else {
652-
join
629+
case join @ Join(left, right, joinType, conditionOpt) =>
630+
// Only consider constraints that can be pushed down completely to either the left or the
631+
// right child
632+
val constraints = join.constraints.filter { c =>
633+
c.references.subsetOf(left.outputSet) || c.references.subsetOf(right.outputSet)}
634+
// Remove those constraints that are already enforced by either the left or the right child
635+
val additionalConstraints = constraints -- (left.constraints ++ right.constraints)
636+
val newConditionOpt = conditionOpt match {
637+
case Some(condition) =>
638+
val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
639+
if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
640+
case None =>
641+
additionalConstraints.reduceOption(And)
653642
}
643+
if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
654644
}
655645
}
656646

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala renamed to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala

Lines changed: 40 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -24,33 +24,33 @@ import org.apache.spark.sql.catalyst.plans._
2424
import org.apache.spark.sql.catalyst.plans.logical._
2525
import org.apache.spark.sql.catalyst.rules._
2626

27-
class NullFilteringSuite extends PlanTest {
27+
class InferFiltersFromConstraintsSuite extends PlanTest {
2828

2929
object Optimize extends RuleExecutor[LogicalPlan] {
30-
val batches = Batch("NullFiltering", Once, NullFiltering) ::
30+
val batches = Batch("InferFilters", FixedPoint(5), InferFiltersFromConstraints) ::
31+
Batch("PredicatePushdown", FixedPoint(5), PushPredicateThroughJoin) ::
3132
Batch("CombineFilters", FixedPoint(5), CombineFilters) :: Nil
3233
}
3334

3435
val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
3536

36-
test("filter: filter out nulls in condition") {
37-
val originalQuery = testRelation.where('a === 1).analyze
38-
val correctAnswer = testRelation.where(IsNotNull('a) && 'a === 1).analyze
37+
test("filter: filter out constraints in condition") {
38+
val originalQuery = testRelation.where('a === 1 && 'a === 'b).analyze
39+
val correctAnswer = testRelation
40+
.where(IsNotNull('a) && IsNotNull('b) && 'a === 'b && 'a === 1 && 'b === 1).analyze
3941
val optimized = Optimize.execute(originalQuery)
4042
comparePlans(optimized, correctAnswer)
4143
}
4244

43-
test("single inner join: filter out nulls on either side on equi-join keys") {
45+
test("single inner join: filter out values on either side on equi-join keys") {
4446
val x = testRelation.subquery('x)
4547
val y = testRelation.subquery('y)
4648
val originalQuery = x.join(y,
47-
condition = Some(("x.a".attr === "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
48-
.analyze
49-
val left = x.where(IsNotNull('a) && IsNotNull('b))
50-
val right = y.where(IsNotNull('a) && IsNotNull('c))
51-
val correctAnswer = left.join(right,
52-
condition = Some(("x.a".attr === "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
49+
condition = Some(("x.a".attr === "y.a".attr) && ("x.a".attr === 1) && ("y.c".attr > 5)))
5350
.analyze
51+
val left = x.where(IsNotNull('a) && "x.a".attr === 1)
52+
val right = y.where(IsNotNull('a) && IsNotNull('c) && "y.c".attr > 5 && "y.a".attr === 1)
53+
val correctAnswer = left.join(right, condition = Some("x.a".attr === "y.a".attr)).analyze
5454
val optimized = Optimize.execute(originalQuery)
5555
comparePlans(optimized, correctAnswer)
5656
}
@@ -61,24 +61,22 @@ class NullFilteringSuite extends PlanTest {
6161
val originalQuery = x.join(y,
6262
condition = Some(("x.a".attr =!= "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
6363
.analyze
64-
val left = x.where(IsNotNull('a) && IsNotNull('b))
65-
val right = y.where(IsNotNull('a) && IsNotNull('c))
66-
val correctAnswer = left.join(right,
67-
condition = Some(("x.a".attr =!= "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
68-
.analyze
64+
val left = x.where(IsNotNull('a) && IsNotNull('b) && "x.b".attr === 1)
65+
val right = y.where(IsNotNull('a) && IsNotNull('c) && "y.c".attr > 5)
66+
val correctAnswer = left.join(right, condition = Some("x.a".attr =!= "y.a".attr)).analyze
6967
val optimized = Optimize.execute(originalQuery)
7068
comparePlans(optimized, correctAnswer)
7169
}
7270

73-
test("single inner join with pre-existing filters: filter out nulls on either side") {
71+
test("single inner join with pre-existing filters: filter out values on either side") {
7472
val x = testRelation.subquery('x)
7573
val y = testRelation.subquery('y)
76-
val originalQuery = x.where('b > 5).join(y.where('c === 10),
77-
condition = Some("x.a".attr === "y.a".attr)).analyze
78-
val left = x.where(IsNotNull('a) && IsNotNull('b) && 'b > 5)
79-
val right = y.where(IsNotNull('a) && IsNotNull('c) && 'c === 10)
74+
val originalQuery = x.where('b > 5).join(y.where('a === 10),
75+
condition = Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)).analyze
76+
val left = x.where(IsNotNull('a) && 'a === 10 && IsNotNull('b) && 'b > 5)
77+
val right = y.where(IsNotNull('a) && IsNotNull('b) && 'a === 10 && 'b > 5)
8078
val correctAnswer = left.join(right,
81-
condition = Some("x.a".attr === "y.a".attr)).analyze
79+
condition = Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)).analyze
8280
val optimized = Optimize.execute(originalQuery)
8381
comparePlans(optimized, correctAnswer)
8482
}
@@ -92,20 +90,33 @@ class NullFilteringSuite extends PlanTest {
9290
comparePlans(optimized, originalQuery)
9391
}
9492

95-
test("multiple inner joins: filter out nulls on all sides on equi-join keys") {
93+
test("multiple inner joins: filter out values on all sides on equi-join keys") {
9694
val t1 = testRelation.subquery('t1)
9795
val t2 = testRelation.subquery('t2)
9896
val t3 = testRelation.subquery('t3)
9997
val t4 = testRelation.subquery('t4)
10098

101-
val originalQuery = t1
99+
val originalQuery = t1.where('b > 5)
102100
.join(t2, condition = Some("t1.b".attr === "t2.b".attr))
103101
.join(t3, condition = Some("t2.b".attr === "t3.b".attr))
104102
.join(t4, condition = Some("t3.b".attr === "t4.b".attr)).analyze
105-
val correctAnswer = t1.where(IsNotNull('b))
106-
.join(t2.where(IsNotNull('b)), condition = Some("t1.b".attr === "t2.b".attr))
107-
.join(t3.where(IsNotNull('b)), condition = Some("t2.b".attr === "t3.b".attr))
108-
.join(t4.where(IsNotNull('b)), condition = Some("t3.b".attr === "t4.b".attr)).analyze
103+
val correctAnswer = t1.where(IsNotNull('b) && 'b > 5)
104+
.join(t2.where(IsNotNull('b) && 'b > 5), condition = Some("t1.b".attr === "t2.b".attr))
105+
.join(t3.where(IsNotNull('b) && 'b > 5), condition = Some("t2.b".attr === "t3.b".attr))
106+
.join(t4.where(IsNotNull('b) && 'b > 5), condition = Some("t3.b".attr === "t4.b".attr))
107+
.analyze
108+
val optimized = Optimize.execute(originalQuery)
109+
comparePlans(optimized, correctAnswer)
110+
}
111+
112+
test("inner join with filter: filter out values on all sides on equi-join keys") {
113+
val x = testRelation.subquery('x)
114+
val y = testRelation.subquery('y)
115+
116+
val originalQuery =
117+
x.join(y, Inner, Some("x.a".attr === "y.a".attr)).where("x.a".attr > 5).analyze
118+
val correctAnswer = x.where(IsNotNull('a) && 'a.attr > 5)
119+
.join(y.where(IsNotNull('a) && 'a.attr > 5), Inner, Some("x.a".attr === "y.a".attr)).analyze
109120
val optimized = Optimize.execute(originalQuery)
110121
comparePlans(optimized, correctAnswer)
111122
}

0 commit comments

Comments
 (0)