Skip to content

Commit f858f46

Browse files
Cody Koeninger authored and marmbrus committed
SPARK-3462 push down filters and projections into Unions
Author: Cody Koeninger <[email protected]>

Closes #2345 from koeninger/SPARK-3462 and squashes the following commits:

5c8d24d [Cody Koeninger] SPARK-3462 remove now-unused parameter
0788691 [Cody Koeninger] SPARK-3462 add tests, handle compatible schema with different aliases, per marmbrus feedback
ef47b3b [Cody Koeninger] SPARK-3462 push down filters and projections into Unions
1 parent ce59725 commit f858f46

File tree

2 files changed

+110
-0
lines changed

2 files changed

+110
-0
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,60 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
4040
SimplifyCasts,
4141
SimplifyCaseConversionExpressions) ::
4242
Batch("Filter Pushdown", FixedPoint(100),
43+
UnionPushdown,
4344
CombineFilters,
4445
PushPredicateThroughProject,
4546
PushPredicateThroughJoin,
4647
ColumnPruning) :: Nil
4748
}
4849

50+
/**
 * Optimizer rule that moves a Filter or a Project sitting directly on top of a Union
 * into both children of that Union, so each side is filtered/projected before the union.
 */
object UnionPushdown extends Rule[LogicalPlan] {

  /**
   * Builds the left-to-right attribute mapping for a Union by pairing the outputs of
   * the two children positionally.
   *
   * @param union the Union whose children's outputs are paired up
   * @return a map from each left output Attribute to the right output Attribute at the
   *         same position
   */
  def buildRewrites(union: Union): AttributeMap[Attribute] = {
    val leftOutput = union.left.output
    val rightOutput = union.right.output
    // Positional pairing only makes sense when both sides expose the same number of columns.
    assert(leftOutput.size == rightOutput.size)

    AttributeMap(leftOutput.zip(rightOutput))
  }

  /**
   * Produces the version of an expression that is valid on the right child of a Union by
   * replacing every Attribute in it with its counterpart from `rewrites`.
   * The approach depends on a Union's output attributes being the same as its left
   * child's output.
   *
   * @param e        the expression written against the Union's (i.e. the left side's) output
   * @param rewrites left-to-right attribute mapping, as produced by [[buildRewrites]]
   * @return the rewritten expression, typed like the input
   */
  def pushToRight[A <: Expression](e: A, rewrites: AttributeMap[Attribute]): A =
    // The cast tells the compiler the result is still an `A` (e.g. project expressions keep
    // their names); the only substitution performed is Attribute => Attribute.
    e.transform { case attr: Attribute => rewrites(attr) }.asInstanceOf[A]

  /**
   * Rewrites `Filter(Union(l, r))` into `Union(Filter(l), Filter(r))` and
   * `Project(Union(l, r))` into `Union(Project(l), Project(r))`, translating the
   * right-hand expressions with [[pushToRight]].
   */
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Filter above a Union: apply the predicate to each child instead.
    case Filter(condition, union @ Union(leftChild, rightChild)) =>
      val rightCondition = pushToRight(condition, buildRewrites(union))
      Union(Filter(condition, leftChild), Filter(rightCondition, rightChild))

    // Projection above a Union: project each child instead.
    case Project(projectList, union @ Union(leftChild, rightChild)) =>
      val leftToRight = buildRewrites(union)
      val rightProjectList = projectList.map(pushToRight(_, leftToRight))
      Union(Project(projectList, leftChild), Project(rightProjectList, rightChild))
  }
}
95+
96+
4997
/**
5098
* Attempts to eliminate the reading of unneeded columns from the query plan using the following
5199
* transformations:
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.optimizer
19+
20+
import org.apache.spark.sql.catalyst.analysis
21+
import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
22+
import org.apache.spark.sql.catalyst.plans.logical._
23+
import org.apache.spark.sql.catalyst.plans.{PlanTest, LeftOuter, RightOuter}
24+
import org.apache.spark.sql.catalyst.rules._
25+
import org.apache.spark.sql.catalyst.dsl.plans._
26+
import org.apache.spark.sql.catalyst.dsl.expressions._
27+
28+
/**
 * Checks that [[UnionPushdown]] moves filters and projections into both children of a Union.
 */
class UnionPushdownSuite extends PlanTest {

  /** Minimal optimizer: strip analysis-only operators, then run the rule under test once. */
  object Optimize extends RuleExecutor[LogicalPlan] {
    val batches = List(
      Batch("Subqueries", Once, EliminateAnalysisOperators),
      Batch("Union Pushdown", Once, UnionPushdown))
  }

  // Two relations with the same column types but different column names, and their union.
  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
  val testRelation2 = LocalRelation('d.int, 'e.int, 'f.int)
  val testUnion = Union(testRelation, testRelation2)

  test("union: filter to each side") {
    val actual = Optimize(testUnion.where('a === 1).analyze)

    // The predicate on 'a must show up on the right side against the first column, 'd.
    val expected =
      Union(testRelation.where('a === 1), testRelation2.where('d === 1)).analyze

    comparePlans(actual, expected)
  }

  test("union: project to each side") {
    val actual = Optimize(testUnion.select('b).analyze)

    // Selecting 'b must become selecting the second column, 'e, on the right side.
    val expected =
      Union(testRelation.select('b), testRelation2.select('e)).analyze

    comparePlans(actual, expected)
  }
}

0 commit comments

Comments
 (0)