Skip to content

Commit c4dedd2

Browse files
committed
address comments and added more test cases.
1 parent 5c4f4d3 commit c4dedd2

File tree

2 files changed

+88
-10
lines changed

2 files changed

+88
-10
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -959,15 +959,18 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
959959
project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
960960

961961
// Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be
962-
// pushed beneath must satisfy three conditions:
963-
// 1. All the columns are part of window partitioning key.
964-
// 2. Window partitioning key should be just a sequence of [[AttributeReference]].
965-
// 3. Deterministic
962+
// pushed beneath must satisfy the following two conditions:
963+
// 1. All the expressions are part of window partitioning key. The expressions can be compound.
964+
// 2. Deterministic
966965
case filter @ Filter(condition, w: Window)
967966
if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) =>
968967
val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references))
969968
val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
970-
cond.references.subsetOf(partitionAttrs) && cond.deterministic
969+
cond.references.subsetOf(partitionAttrs) && cond.deterministic &&
970+
// This ensures that all the partitioning expressions have been converted to aliases
971+
// in Analyzer. Thus, we do not need to check if the expressions in conditions are
972+
// the same as the expressions used in partitioning columns.
973+
partitionAttrs.forall(_.isInstanceOf[Attribute])
971974
}
972975
if (pushDown.nonEmpty) {
973976
val pushDownPredicate = pushDown.reduce(And)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala

Lines changed: 80 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -871,10 +871,39 @@ class FilterPushdownSuite extends PlanTest {
871871
comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
872872
}
873873

874+
// complex predicates with the same references and the same expressions
875+
// TODO: to enable this test, the Analyzer needs to convert the expression in conditions
876+
// to the alias that is defined as the same expression
877+
ignore("Window: predicate push down -- complex predicate with the same expressions") {
878+
val winSpec = windowSpec(
879+
partitionSpec = 'a.attr + 'b.attr :: Nil,
880+
orderSpec = 'b.asc :: Nil,
881+
UnspecifiedFrame)
882+
val winExpr = windowExpr(count('b), winSpec)
883+
884+
val winSpecAnalyzed = windowSpec(
885+
partitionSpec = '_w0.attr :: Nil,
886+
orderSpec = 'b.asc :: Nil,
887+
UnspecifiedFrame)
888+
val winExprAnalyzed = windowExpr(count('b), winSpecAnalyzed)
889+
890+
val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a + 'b > 1)
891+
val correctAnswer = testRelation
892+
.where('a + 'b > 1).select('a, 'b, 'c, ('a + 'b).as("_w0"))
893+
.window(winExprAnalyzed.as('window) :: Nil, '_w0 :: Nil, 'b.asc :: Nil)
894+
.select('a, 'b, 'c, 'window).analyze
895+
896+
comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
897+
}
898+
874899
test("Window: no predicate push down -- predicates are not from partitioning keys") {
875-
val winExpr =
876-
windowExpr(count('b), windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
900+
val winSpec = windowSpec(
901+
partitionSpec = 'a.attr :: 'b.attr :: Nil,
902+
orderSpec = 'b.asc :: Nil,
903+
UnspecifiedFrame)
904+
val winExpr = windowExpr(count('b), winSpec)
877905

906+
// No push down: the predicate is c > 1, but the partitioning key is (a, b).
878907
val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('c > 1)
879908
val correctAnswer = testRelation.select('a, 'b, 'c)
880909
.window(winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
@@ -883,17 +912,63 @@ class FilterPushdownSuite extends PlanTest {
883912
comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
884913
}
885914

886-
test("Window: no predicate push down -- compound partition key") {
887-
val winSpec = windowSpec('a.attr + 'b.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
915+
test("Window: no predicate push down -- partial compound partition key") {
916+
val winSpec = windowSpec(
917+
partitionSpec = 'a.attr + 'b.attr :: 'b.attr :: Nil,
918+
orderSpec = 'b.asc :: Nil,
919+
UnspecifiedFrame)
888920
val winExpr = windowExpr(count('b), winSpec)
921+
922+
// No push down: the predicate is a > 1, but the partitioning key is (a + b, b)
889923
val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a > 1)
890924

891-
val winSpecAnalyzed = windowSpec('_w0.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
925+
val winSpecAnalyzed = windowSpec(
926+
partitionSpec = '_w0.attr :: 'b.attr :: Nil,
927+
orderSpec = 'b.asc :: Nil,
928+
UnspecifiedFrame)
892929
val winExprAnalyzed = windowExpr(count('b), winSpecAnalyzed)
893930
val correctAnswer = testRelation.select('a, 'b, 'c, ('a + 'b).as("_w0"))
894931
.window(winExprAnalyzed.as('window) :: Nil, '_w0 :: 'b.attr :: Nil, 'b.asc :: Nil)
895932
.where('a > 1).select('a, 'b, 'c, 'window).analyze
896933

897934
comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
898935
}
936+
937+
test("Window: no predicate push down -- complex predicates containing non partitioning columns") {
938+
val winSpec =
939+
windowSpec(partitionSpec = 'b.attr :: Nil, orderSpec = 'b.asc :: Nil, UnspecifiedFrame)
940+
val winExpr = windowExpr(count('b), winSpec)
941+
942+
// No push down: the predicate is a + b > 1, but the partitioning key is b.
943+
val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a + 'b > 1)
944+
val correctAnswer = testRelation
945+
.select('a, 'b, 'c)
946+
.window(winExpr.as('window) :: Nil, 'b.attr :: Nil, 'b.asc :: Nil)
947+
.where('a + 'b > 1).select('a, 'b, 'c, 'window).analyze
948+
949+
comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
950+
}
951+
952+
// complex predicates with the same references but different expressions
953+
test("Window: no predicate push down -- complex predicate with different expressions") {
954+
val winSpec = windowSpec(
955+
partitionSpec = 'a.attr + 'b.attr :: Nil,
956+
orderSpec = 'b.asc :: Nil,
957+
UnspecifiedFrame)
958+
val winExpr = windowExpr(count('b), winSpec)
959+
960+
val winSpecAnalyzed = windowSpec(
961+
partitionSpec = '_w0.attr :: Nil,
962+
orderSpec = 'b.asc :: Nil,
963+
UnspecifiedFrame)
964+
val winExprAnalyzed = windowExpr(count('b), winSpecAnalyzed)
965+
966+
// No push down: the predicate is a - b > 1, but the partitioning key is a + b.
967+
val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a - 'b > 1)
968+
val correctAnswer = testRelation.select('a, 'b, 'c, ('a + 'b).as("_w0"))
969+
.window(winExprAnalyzed.as('window) :: Nil, '_w0 :: Nil, 'b.asc :: Nil)
970+
.where('a - 'b > 1).select('a, 'b, 'c, 'window).analyze
971+
972+
comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
973+
}
899974
}

0 commit comments

Comments
 (0)