@@ -24,33 +24,33 @@ import org.apache.spark.sql.catalyst.plans._
2424import org .apache .spark .sql .catalyst .plans .logical ._
2525import org .apache .spark .sql .catalyst .rules ._
2626
27- class NullFilteringSuite extends PlanTest {
27+ class InferFiltersFromConstraintsSuite extends PlanTest {
2828
2929 object Optimize extends RuleExecutor [LogicalPlan ] {
30- val batches = Batch (" NullFiltering" , Once , NullFiltering ) ::
30+ val batches = Batch (" InferFilters" , FixedPoint (5 ), InferFiltersFromConstraints ) ::
31+ Batch (" PredicatePushdown" , FixedPoint (5 ), PushPredicateThroughJoin ) ::
3132 Batch (" CombineFilters" , FixedPoint (5 ), CombineFilters ) :: Nil
3233 }
3334
3435 val testRelation = LocalRelation (' a .int, ' b .int, ' c .int)
3536
36- test(" filter: filter out nulls in condition" ) {
37- val originalQuery = testRelation.where(' a === 1 ).analyze
38- val correctAnswer = testRelation.where(IsNotNull (' a ) && ' a === 1 ).analyze
37+ test(" filter: filter out constraints in condition" ) {
38+ val originalQuery = testRelation.where(' a === 1 && ' a === ' b ).analyze
39+ val correctAnswer = testRelation
40+ .where(IsNotNull (' a ) && IsNotNull (' b ) && ' a === ' b && ' a === 1 && ' b === 1 ).analyze
3941 val optimized = Optimize .execute(originalQuery)
4042 comparePlans(optimized, correctAnswer)
4143 }
4244
43- test(" single inner join: filter out nulls on either side on equi-join keys" ) {
45+ test(" single inner join: filter out values on either side on equi-join keys" ) {
4446 val x = testRelation.subquery(' x )
4547 val y = testRelation.subquery(' y )
4648 val originalQuery = x.join(y,
47- condition = Some ((" x.a" .attr === " y.a" .attr) && (" x.b" .attr === 1 ) && (" y.c" .attr > 5 )))
48- .analyze
49- val left = x.where(IsNotNull (' a ) && IsNotNull (' b ))
50- val right = y.where(IsNotNull (' a ) && IsNotNull (' c ))
51- val correctAnswer = left.join(right,
52- condition = Some ((" x.a" .attr === " y.a" .attr) && (" x.b" .attr === 1 ) && (" y.c" .attr > 5 )))
49+ condition = Some ((" x.a" .attr === " y.a" .attr) && (" x.a" .attr === 1 ) && (" y.c" .attr > 5 )))
5350 .analyze
51+ val left = x.where(IsNotNull (' a ) && " x.a" .attr === 1 )
52+ val right = y.where(IsNotNull (' a ) && IsNotNull (' c ) && " y.c" .attr > 5 && " y.a" .attr === 1 )
53+ val correctAnswer = left.join(right, condition = Some (" x.a" .attr === " y.a" .attr)).analyze
5454 val optimized = Optimize .execute(originalQuery)
5555 comparePlans(optimized, correctAnswer)
5656 }
@@ -61,24 +61,22 @@ class NullFilteringSuite extends PlanTest {
6161 val originalQuery = x.join(y,
6262 condition = Some ((" x.a" .attr =!= " y.a" .attr) && (" x.b" .attr === 1 ) && (" y.c" .attr > 5 )))
6363 .analyze
64- val left = x.where(IsNotNull (' a ) && IsNotNull (' b ))
65- val right = y.where(IsNotNull (' a ) && IsNotNull (' c ))
66- val correctAnswer = left.join(right,
67- condition = Some ((" x.a" .attr =!= " y.a" .attr) && (" x.b" .attr === 1 ) && (" y.c" .attr > 5 )))
68- .analyze
64+ val left = x.where(IsNotNull (' a ) && IsNotNull (' b ) && " x.b" .attr === 1 )
65+ val right = y.where(IsNotNull (' a ) && IsNotNull (' c ) && " y.c" .attr > 5 )
66+ val correctAnswer = left.join(right, condition = Some (" x.a" .attr =!= " y.a" .attr)).analyze
6967 val optimized = Optimize .execute(originalQuery)
7068 comparePlans(optimized, correctAnswer)
7169 }
7270
73- test(" single inner join with pre-existing filters: filter out nulls on either side" ) {
71+ test(" single inner join with pre-existing filters: filter out values on either side" ) {
7472 val x = testRelation.subquery(' x )
7573 val y = testRelation.subquery(' y )
76- val originalQuery = x.where(' b > 5 ).join(y.where(' c === 10 ),
77- condition = Some (" x.a" .attr === " y.a" .attr)).analyze
78- val left = x.where(IsNotNull (' a ) && IsNotNull (' b ) && ' b > 5 )
79- val right = y.where(IsNotNull (' a ) && IsNotNull (' c ) && ' c === 10 )
74+ val originalQuery = x.where(' b > 5 ).join(y.where(' a === 10 ),
75+ condition = Some (" x.a" .attr === " y.a" .attr && " x.b " .attr === " y.b " .attr )).analyze
76+ val left = x.where(IsNotNull (' a ) && ' a === 10 && IsNotNull (' b ) && ' b > 5 )
77+ val right = y.where(IsNotNull (' a ) && IsNotNull (' b ) && ' a === 10 && ' b > 5 )
8078 val correctAnswer = left.join(right,
81- condition = Some (" x.a" .attr === " y.a" .attr)).analyze
79+ condition = Some (" x.a" .attr === " y.a" .attr && " x.b " .attr === " y.b " .attr )).analyze
8280 val optimized = Optimize .execute(originalQuery)
8381 comparePlans(optimized, correctAnswer)
8482 }
@@ -92,20 +90,33 @@ class NullFilteringSuite extends PlanTest {
9290 comparePlans(optimized, originalQuery)
9391 }
9492
95- test(" multiple inner joins: filter out nulls on all sides on equi-join keys" ) {
93+ test(" multiple inner joins: filter out values on all sides on equi-join keys" ) {
9694 val t1 = testRelation.subquery(' t1 )
9795 val t2 = testRelation.subquery(' t2 )
9896 val t3 = testRelation.subquery(' t3 )
9997 val t4 = testRelation.subquery(' t4 )
10098
101- val originalQuery = t1
99+ val originalQuery = t1.where( ' b > 5 )
102100 .join(t2, condition = Some (" t1.b" .attr === " t2.b" .attr))
103101 .join(t3, condition = Some (" t2.b" .attr === " t3.b" .attr))
104102 .join(t4, condition = Some (" t3.b" .attr === " t4.b" .attr)).analyze
105- val correctAnswer = t1.where(IsNotNull (' b ))
106- .join(t2.where(IsNotNull (' b )), condition = Some (" t1.b" .attr === " t2.b" .attr))
107- .join(t3.where(IsNotNull (' b )), condition = Some (" t2.b" .attr === " t3.b" .attr))
108- .join(t4.where(IsNotNull (' b )), condition = Some (" t3.b" .attr === " t4.b" .attr)).analyze
103+ val correctAnswer = t1.where(IsNotNull (' b ) && ' b > 5 )
104+ .join(t2.where(IsNotNull (' b ) && ' b > 5 ), condition = Some (" t1.b" .attr === " t2.b" .attr))
105+ .join(t3.where(IsNotNull (' b ) && ' b > 5 ), condition = Some (" t2.b" .attr === " t3.b" .attr))
106+ .join(t4.where(IsNotNull (' b ) && ' b > 5 ), condition = Some (" t3.b" .attr === " t4.b" .attr))
107+ .analyze
108+ val optimized = Optimize .execute(originalQuery)
109+ comparePlans(optimized, correctAnswer)
110+ }
111+
112+ test(" inner join with filter: filter out values on all sides on equi-join keys" ) {
113+ val x = testRelation.subquery(' x )
114+ val y = testRelation.subquery(' y )
115+
116+ val originalQuery =
117+ x.join(y, Inner , Some (" x.a" .attr === " y.a" .attr)).where(" x.a" .attr > 5 ).analyze
118+ val correctAnswer = x.where(IsNotNull (' a ) && ' a .attr > 5 )
119+ .join(y.where(IsNotNull (' a ) && ' a .attr > 5 ), Inner , Some (" x.a" .attr === " y.a" .attr)).analyze
109120 val optimized = Optimize .execute(originalQuery)
110121 comparePlans(optimized, correctAnswer)
111122 }
0 commit comments