Skip to content

Commit fc9fa2d

Browse files
committed
PullOutJoinCondition
1 parent c16d606 commit fc9fa2d

File tree

29 files changed

+897
-823
lines changed

29 files changed

+897
-823
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
151151
ReplaceExpressions,
152152
RewriteNonCorrelatedExists,
153153
PullOutGroupingExpressions,
154+
PullOutJoinCondition,
154155
ComputeCurrentTime,
155156
ReplaceCurrentLike(catalogManager)) ::
156157
//////////////////////////////////////////////////////////////////////////////////////////
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.optimizer
19+
20+
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, PredicateHelper}
21+
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project}
22+
import org.apache.spark.sql.catalyst.rules.Rule
23+
import org.apache.spark.sql.catalyst.trees.TreePattern.JOIN
24+
25+
/**
 * This rule ensures that a [[Join]] condition doesn't contain complex expressions in the
 * optimization phase.
 *
 * Complex condition expressions are pulled out to a [[Project]] node under the [[Join]] and are
 * referenced by attribute in the join condition. A [[Project]] that restores the original join
 * output is placed on top of the rewritten [[Join]], so the pulled-out columns never become
 * visible to parent operators.
 *
 * {{{
 *   SELECT * FROM t1 JOIN t2 ON t1.a + 10 = t2.x   ==>
 *   Project [a#0, b#1, x#2, y#3]
 *   +- Join Inner, ((spark_catalog.default.t1.a + 10)#8 = x#2)
 *      :- Project [a#0, b#1, (a#0 + 10) AS (spark_catalog.default.t1.a + 10)#8]
 *      :  +- Filter isnotnull((a#0 + 10))
 *      :     +- Relation default.t1[a#0,b#1] parquet
 *      +- Filter isnotnull(x#2)
 *         +- Relation default.t2[x#2,y#3] parquet
 * }}}
 */
object PullOutJoinCondition extends Rule[LogicalPlan]
  with JoinSelectionHelper with PredicateHelper {

  /**
   * Rewrites every resolved [[Join]] that has a condition: non-foldable operands of the
   * condition's conjuncts that have children and can be evaluated on a single side are aliased
   * in a [[Project]] over that side and replaced by the alias attribute in the condition.
   *
   * @param plan the logical plan to rewrite
   * @return the plan with complex join-condition operands pulled out; joins without such
   *         operands are returned unchanged
   */
  def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(_.containsPattern(JOIN)) {
    case j @ Join(left, right, _, Some(condition), _) if j.resolved =>
      // Candidates are the direct operands of each conjunct. Foldable expressions and leaf
      // expressions (no children, e.g. plain attributes) are left in place.
      val complexExpressions = splitConjunctivePredicates(condition)
        .flatMap(_.children)
        .filter(e => !e.foldable && e.children.nonEmpty)

      // Assign every candidate to at most one side. An expression that both sides could
      // evaluate is kept on the left only, so it is not projected twice.
      val (leftComplexExpressions, notLeft) = complexExpressions.partition(canEvaluate(_, left))
      val rightComplexExpressions = notLeft.filter(canEvaluate(_, right))

      val leftAliases = aliasByCanonicalForm(leftComplexExpressions)
      val rightAliases = aliasByCanonicalForm(rightComplexExpressions)
      val allAliases = leftAliases ++ rightAliases

      if (allAliases.isEmpty) {
        j
      } else {
        val newCondition = condition.transformDown {
          case e if e.children.nonEmpty && allAliases.contains(e.canonicalized) =>
            allAliases(e.canonicalized).toAttribute
        }
        val newLeft = Project(left.output ++ leftAliases.values, left)
        val newRight = Project(right.output ++ rightAliases.values, right)
        // The children now expose extra alias columns; project back to the original join output
        // so that the schema seen by parent operators is unchanged.
        Project(j.output,
          j.copy(left = newLeft, right = newRight, condition = Some(newCondition)))
      }
  }

  /**
   * Builds one [[Alias]] per distinct canonical form, keyed by that canonical form. The alias
   * is named after the expression's SQL text.
   */
  private def aliasByCanonicalForm(exprs: Seq[Expression]): Map[Expression, Alias] =
    exprs.map(e => e.canonicalized -> Alias(e, e.sql)()).toMap
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownJoinConditionEvaluation.scala

Lines changed: 0 additions & 63 deletions
This file was deleted.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,6 @@ object RuleIdCollection {
144144
"org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin" ::
145145
"org.apache.spark.sql.catalyst.optimizer.RewriteExceptAll" ::
146146
"org.apache.spark.sql.catalyst.optimizer.RewriteIntersectAll" ::
147-
"org.apache.spark.sql.catalyst.optimizer.PushDownJoinConditionEvaluation" ::
148147
"org.apache.spark.sql.catalyst.optimizer.SimplifyBinaryComparison" ::
149148
"org.apache.spark.sql.catalyst.optimizer.SimplifyCaseConversionExpressions" ::
150149
"org.apache.spark.sql.catalyst.optimizer.SimplifyCasts" ::
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.optimizer
19+
20+
import org.apache.spark.sql.catalyst.dsl.expressions._
21+
import org.apache.spark.sql.catalyst.dsl.plans._
22+
import org.apache.spark.sql.catalyst.expressions.{Alias, Coalesce, Substring, Upper}
23+
import org.apache.spark.sql.catalyst.plans._
24+
import org.apache.spark.sql.catalyst.plans.logical._
25+
import org.apache.spark.sql.catalyst.rules._
26+
27+
/**
 * Plan-level tests for [[PullOutJoinCondition]]. Each test optimizes an analyzed query and
 * compares the result with a hand-built expected plan.
 */
class PullOutJoinConditionSuite extends PlanTest {

  // Applies the rule under test followed by CollapseProject, in a single Once batch.
  private object Optimize extends RuleExecutor[LogicalPlan] {
    val batches =
      Batch("Pull out join condition", Once,
        PullOutJoinCondition,
        CollapseProject) :: Nil
  }

  private val testRelation = LocalRelation('a.string, 'b.int, 'c.int)
  private val testRelation1 = LocalRelation('d.string, 'e.int)
  private val x = testRelation.subquery('x)
  private val y = testRelation1.subquery('y)

  test("Pull out join condition (String expressions)") {
    val joinType = Inner
    // Each expression only references the right side, so it is expected to be aliased there.
    Seq(Upper("y.d".attr), Substring("y.d".attr, 1, 5)).foreach { complexExpr =>
      val originalQuery = x.join(y, joinType, Option("x.a".attr === complexExpr))
        .select("x.a".attr, "y.e".attr)
      val expectedLeft = x.select("x.a".attr, "x.b".attr, "x.c".attr)
      val expectedRight =
        y.select("y.d".attr, "y.e".attr, Alias(complexExpr, complexExpr.sql)())
      val correctAnswer = expectedLeft
        .join(expectedRight, joinType, Option("x.a".attr === s"`${complexExpr.sql}`".attr))
        .select("x.a".attr, "y.e".attr)

      comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze)
    }
  }

  test("Pull out join condition (null expressions)") {
    val joinType = Inner
    // The expression only references the left side, so it is expected to be aliased there.
    val complexExpr = Coalesce(Seq("x.b".attr, "x.c".attr))
    val originalQuery = x.join(y, joinType, Option(complexExpr === "y.e".attr))
      .select("x.a".attr, "y.e".attr)
    val expectedLeft =
      x.select("x.a".attr, "x.b".attr, "x.c".attr, Alias(complexExpr, complexExpr.sql)())
    val expectedRight = y.select("y.d".attr, "y.e".attr)
    val correctAnswer = expectedLeft
      .join(expectedRight, joinType, Option(s"`${complexExpr.sql}`".attr === "y.e".attr))
      .select("x.a".attr, "y.e".attr)

    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze)
  }

  test("Negative case: all children are Attributes") {
    val condition = Option("x.a".attr === "y.d".attr)
    val originalQuery = x.join(y, Inner, condition)
    val correctAnswer = x.join(y, Inner, condition)

    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze)
  }

  test("Negative case: contains Literal") {
    val condition = Option("x.a".attr === "string")
    val originalQuery = x.join(y, Inner, condition)
    val correctAnswer = x.join(y, Inner, condition)

    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze)
  }
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushDownJoinConditionEvaluationSuite.scala

Lines changed: 0 additions & 102 deletions
This file was deleted.

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,7 @@ class SparkOptimizer(
4848
PushDownPredicates) :+
4949
Batch("Cleanup filters that cannot be pushed down", Once,
5050
CleanupDynamicPruningFilters,
51-
PruneFilters) :+
52-
Batch("Push down join condition evaluation", Once,
53-
PushDownJoinConditionEvaluation,
54-
CollapseProject)) ++
51+
PruneFilters)) ++
5552
postHocOptimizationBatches :+
5653
Batch("Extract Python UDFs", Once,
5754
ExtractPythonUDFFromJoinCondition,

0 commit comments

Comments
 (0)