Skip to content

Commit ef47b3b

Browse files
author
Cody Koeninger
committed
SPARK-3462 push down filters and projections into Unions
1 parent 2686233 commit ef47b3b

File tree

1 file changed

+31
-0
lines changed
  • sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer

1 file changed

+31
-0
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,43 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
4040
SimplifyCasts,
4141
SimplifyCaseConversionExpressions) ::
4242
Batch("Filter Pushdown", FixedPoint(100),
43+
UnionPushdown,
4344
CombineFilters,
4445
PushPredicateThroughProject,
4546
PushPredicateThroughJoin,
4647
ColumnPruning) :: Nil
4748
}
4849

50+
object UnionPushdown extends Rule[LogicalPlan] {
51+
def fixProject(project: Project, child: LogicalPlan): Project = {
52+
val pl = project.projectList.map(p => child.output.find { c =>
53+
c.name == p.name && c.qualifiers == p.qualifiers
54+
}.getOrElse(p))
55+
Project(pl, child)
56+
}
57+
58+
def fixFilter(filter: Filter, child: LogicalPlan): Filter = {
59+
val cond = filter.condition.transform {
60+
case a: AttributeReference =>
61+
child.output.find { c =>
62+
c.name == a.name && c.qualifiers == a.qualifiers
63+
}.getOrElse(a)
64+
}
65+
Filter(cond, child)
66+
}
67+
68+
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
69+
// Push down filter into union
70+
case f @ Filter(_, Union(left, right)) =>
71+
Union(fixFilter(f, left), fixFilter(f, right))
72+
73+
// Push down projection into union
74+
case p @ Project(_, Union(left, right)) =>
75+
Union(fixProject(p, left), fixProject(p, right))
76+
}
77+
}
78+
79+
4980
/**
5081
* Attempts to eliminate the reading of unneeded columns from the query plan using the following
5182
* transformations:

0 commit comments

Comments
 (0)