[SPARK-36290][SQL] Pull out join condition #33522
Conversation
Merge commit message:

```
# Conflicts:
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q72.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q72/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q72.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q72/explain.txt
```
CI status updates:
- Test build #141646 has finished for PR 33522 (Kubernetes integration test: success)
- Test build #141777 has finished for PR 33522 (Kubernetes integration test: success)
- Test build #141921 has finished for PR 33522 (Kubernetes integration test: success)
- Test build #141923 has finished for PR 33522 (Kubernetes integration test: success)
- Test build #141928 has finished for PR 33522 (Kubernetes integration test: success)
- Test build #141930 has finished for PR 33522 (Kubernetes integration test: success)
- Test build #144900 has finished for PR 33522 (Kubernetes integration test: failure)
Merge commit message:

```
# Conflicts:
#   sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
```
CI status updates:
- Test build #144902 has finished for PR 33522 (Kubernetes integration test: failure)
- Test build #144904 has finished for PR 33522 (Kubernetes integration test: failure)
- Test build #144918 has finished for PR 33522 (Kubernetes integration test: failure)
```scala
private val y = testRelation1.subquery('y)
```

```scala
test("Pull out join keys evaluation(String expressions)") {
  val joinType = Inner
```
can we just inline it? the join type doesn't matter anyway.
ditto for other places in this test suite
```scala
val joinType = Inner
Seq(Upper("y.d".attr), Substring("y.d".attr, 1, 5)).foreach { udf =>
  val originalQuery = x.join(y, joinType, Option("x.a".attr === udf))
    .select("x.a".attr, "y.e".attr)
```
we don't need to use qualified column names. There is no name conflict.
ditto for other places in this test suite
Removed it. But the UDF cases still need the qualifier, otherwise:

```
== FAIL: Plans do not match ===
 'Project [a#0, e#0]                                   'Project [a#0, e#0]
!+- 'Join Inner, (a#0 = upper(y.d)#0)                  +- 'Join Inner, (upper(d)#0 = a#0)
    :- Project [a#0, b#0, c#0]                            :- Project [a#0, b#0, c#0]
    :  +- LocalRelation <empty>, [a#0, b#0, c#0]          :  +- LocalRelation <empty>, [a#0, b#0, c#0]
!   +- Project [d#0, e#0, upper(d#0) AS upper(y.d)#0]     +- Project [d#0, e#0, upper(d#0) AS upper(d)#0]
       +- LocalRelation <empty>, [d#0, e#0]                  +- LocalRelation <empty>, [d#0, e#0]
```
```scala
test("Pull out join keys evaluation(null expressions)") {
  val joinType = Inner
  val udf = Coalesce(Seq("x.b".attr, "x.c".attr))
```
Can't we merge this test case with the one above?
`Seq(Upper("d".attr), Substring("d".attr, 1, 5), Coalesce(Seq("b".attr, "c".attr)))...`
Moved this test to SQLQuerySuite to check that more IsNotNull filters are inferred:
spark/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Lines 4231 to 4253 in 4aebe0a

```scala
test("SPARK-36290: Pull out join condition can infer more filter conditions") {
  import org.apache.spark.sql.catalyst.dsl.expressions.DslString
  withTable("t1", "t2") {
    spark.sql("CREATE TABLE t1(a int, b int) using parquet")
    spark.sql("CREATE TABLE t2(a string, b string, c string) using parquet")
    spark.sql("SELECT t1.* FROM t1 RIGHT JOIN t2 ON coalesce(t1.a, t1.b) = t2.a")
      .queryExecution.optimizedPlan.find(_.isInstanceOf[Filter]) match {
        case Some(Filter(condition, _)) =>
          condition === IsNotNull(Coalesce(Seq("a".attr, "b".attr)))
        case _ =>
          fail("It should contains Filter")
      }
    spark.sql("SELECT t1.* FROM t1 LEFT JOIN t2 ON t1.a = t2.a")
      .queryExecution.optimizedPlan.find(_.isInstanceOf[Filter]) match {
        case Some(Filter(condition, _)) =>
          condition === IsNotNull(Cast("a".attr, IntegerType))
        case _ =>
          fail("It should contains Filter")
      }
  }
}
```
```scala
test("Pull out EqualNullSafe join condition") {
  withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
```
do we need this?
Removed it.
```diff
   case p: BatchEvalPythonExec => p
 }
-assert(pythonEvals.size == 2)
+assert(pythonEvals.size == 4)
```
This looks like a big problem. Can you investigate why we run the Python UDF two more times?
It is because we infer two `isnotnull(cast(pythonUDF0 as int))` filters:

```
== Optimized Logical Plan ==
Project [a#225, b#226, c#236, d#237]
+- Join Inner, (CAST(udf(cast(a as string)) AS INT)#250 = CAST(udf(cast(c as string)) AS INT)#251)
   :- Project [_1#220 AS a#225, _2#221 AS b#226, cast(pythonUDF0#253 as int) AS CAST(udf(cast(a as string)) AS INT)#250]
   :  +- BatchEvalPython [udf(cast(_1#220 as string))], [pythonUDF0#253]
   :     +- Project [_1#220, _2#221]
   :        +- Filter isnotnull(cast(pythonUDF0#252 as int))
   :           +- BatchEvalPython [udf(cast(_1#220 as string))], [pythonUDF0#252]
   :              +- LocalRelation [_1#220, _2#221]
   +- Project [_1#231 AS c#236, _2#232 AS d#237, cast(pythonUDF0#255 as int) AS CAST(udf(cast(c as string)) AS INT)#251]
      +- BatchEvalPython [udf(cast(_1#231 as string))], [pythonUDF0#255]
         +- Project [_1#231, _2#232]
            +- Filter isnotnull(cast(pythonUDF0#254 as int))
               +- BatchEvalPython [udf(cast(_1#231 as string))], [pythonUDF0#254]
                  +- LocalRelation [_1#231, _2#232]
```

Before this PR:

```
== Optimized Logical Plan ==
Project [a#225, b#226, c#236, d#237]
+- Join Inner, (cast(pythonUDF0#250 as int) = cast(pythonUDF0#251 as int))
   :- BatchEvalPython [udf(cast(a#225 as string))], [pythonUDF0#250]
   :  +- Project [_1#220 AS a#225, _2#221 AS b#226]
   :     +- LocalRelation [_1#220, _2#221]
   +- BatchEvalPython [udf(cast(c#236 as string))], [pythonUDF0#251]
      +- Project [_1#231 AS c#236, _2#232 AS d#237]
         +- LocalRelation [_1#231, _2#232]
```
OK, so it's because filter push down can lead to extra expression evaluation?
Yes. If we disable `spark.sql.constraintPropagation.enabled`, the plan is:

```
== Optimized Logical Plan ==
Project [a#225, b#226, c#236, d#237]
+- Join Inner, (CAST(udf(cast(a as string)) AS INT)#250 = CAST(udf(cast(c as string)) AS INT)#251)
   :- Project [_1#220 AS a#225, _2#221 AS b#226, cast(pythonUDF0#252 as int) AS CAST(udf(cast(a as string)) AS INT)#250]
   :  +- BatchEvalPython [udf(cast(_1#220 as string))], [pythonUDF0#252]
   :     +- LocalRelation [_1#220, _2#221]
   +- Project [_1#231 AS c#236, _2#232 AS d#237, cast(pythonUDF0#253 as int) AS CAST(udf(cast(c as string)) AS INT)#251]
      +- BatchEvalPython [udf(cast(_1#231 as string))], [pythonUDF0#253]
         +- LocalRelation [_1#231, _2#232]
```
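A toy illustration of the duplication (my own sketch, not Spark code; `Node` and `evalCount` are hypothetical names): the inferred `IsNotNull` filter sits below the Project that computes the pulled-out alias, so both operators evaluate the UDF.

```scala
// Hypothetical toy model, not Spark's plan representation: a plan node
// carries the expression strings it evaluates, and we count how many
// operators end up evaluating a given (expensive) expression.
case class Node(name: String, exprs: Seq[String], children: Seq[Node])

def evalCount(p: Node, expr: String): Int =
  p.exprs.count(_.contains(expr)) + p.children.map(evalCount(_, expr)).sum

val udf = "udf(a)"

// With constraint propagation: the inferred IsNotNull filter below the
// pulled-out Project re-evaluates the same UDF.
val withInferredFilter =
  Node("Project", Seq(s"$udf AS key"),
    Seq(Node("Filter", Seq(s"isnotnull($udf)"),
      Seq(Node("Scan", Nil, Nil)))))

// Without it: only the Project evaluates the UDF.
val withoutInferredFilter =
  Node("Project", Seq(s"$udf AS key"), Seq(Node("Scan", Nil, Nil)))
```

Counting evaluations of `udf(a)` gives 2 with the inferred filter and 1 without, matching the two extra `BatchEvalPython` nodes above.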
@wangyum @cloud-fan Seems we can just put the rule after InferFiltersFromConstraints, and actually after the earlyScanPushDownRules. The python UDF test passed in my pr, see #36874.
I think pull-out has three advantages:
- Reduces complex join key evaluations from 3 to 2 for SMJ.
- Infers additional filters, which can sometimes avoid data skew. For example: [SPARK-31809][SQL] Infer IsNotNull from some special equality join keys #28642
- Avoids other rules having to handle this case. For example: https://github.com/apache/spark/blob/dee7396204e2f6e7346e220867953fc74cd4253d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushPartialAggregationThroughJoin.scala#L325-L327

It has two disadvantages:
- Increases complex join key evaluations from 1 to 2 for BHJ.
- It may increase the shuffle data size, for example when the join key is `concat(col1, col2, col3, col4 ...)`.

Personally, I think this rule is valuable. We have been using this rule for half a year.
> Increases complex join key evaluations from 1 to 2 for BHJ.

We can check whether the pulled-out side can be broadcast, so this should not be a blocker?

> It may increase the shuffle data size, for example when the join key is `concat(col1, col2, col3, col4 ...)`.

This is really a trade-off. One conservative option may be: we only pull out the complex keys whose underlying attributes are not part of the final output, so we avoid the extra shuffle data as far as possible. For example:

```sql
SELECT a FROM t1 JOIN t2 ON t1.a = t2.x + 1;
```

And a config should be introduced to enable or disable it easily.
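The conservative heuristic above can be sketched as follows (a minimal toy model, not Spark's actual API; `JoinKey`, `isComplex`, and `shouldPullOut` are hypothetical names):

```scala
// Toy sketch of the conservative pull-out heuristic: only pull out a
// complex join key when the attributes it references are NOT part of the
// query's final output, so the pre-computed key replaces (rather than
// adds to) the columns flowing through the shuffle.
case class JoinKey(expr: String, referencedAttrs: Set[String])

// A key is "complex" unless it is a bare attribute reference.
def isComplex(key: JoinKey): Boolean =
  key.referencedAttrs.size != 1 || key.expr != key.referencedAttrs.head

def shouldPullOut(key: JoinKey, finalOutput: Set[String]): Boolean =
  isComplex(key) && key.referencedAttrs.intersect(finalOutput).isEmpty
```

For `SELECT a FROM t1 JOIN t2 ON t1.a = t2.x + 1`, the key `t2.x + 1` references only `x`, which is not in the final output, so it would qualify; a key like `concat(c1, c2)` whose columns are also selected would not.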
CI status updates:
- Test build #144936 has finished for PR 33522 (Kubernetes integration test: failure)
```scala
  p.isInstanceOf[WholeStageCodegenExec] &&
    p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[BroadcastHashJoinExec]).isDefined)
val broadcastHashJoin = df.queryExecution.executedPlan.find {
  case WholeStageCodegenExec(ProjectExec(_, _: BroadcastHashJoinExec)) => true
```
Wondering why we now need an extra Project after the join here? Can we remove it? The join does not seem to have a complex key here.
There is a cast:

```
== Optimized Logical Plan ==
Project [id#6L, k#2, v#3]
+- Join Inner, (id#6L = CAST(k AS BIGINT)#14L), rightHint=(strategy=broadcast)
   :- Range (0, 10, step=1, splits=Some(2))
   +- Project [k#2, v#3, cast(k#2 as bigint) AS CAST(k AS BIGINT)#14L]
      +- Filter isnotnull(cast(k#2 as bigint))
         +- LogicalRDD [k#2, v#3], false
```
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?
Similar to PullOutGroupingExpressions, this PR adds a new rule (PullOutJoinCondition) to pull out the join condition. Otherwise, an expression in the join condition may be evaluated three times (in ShuffleExchangeExec, SortExec, and the join itself). For example:

Before this PR:

After this PR:
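The before/after plans above were elided, but the core transformation can be sketched on a toy expression tree (a simplified, hypothetical model; the real PullOutJoinCondition rule works on Catalyst plans and expressions):

```scala
// Toy model of "pulling out" complex join keys: each non-trivial key
// expression is replaced by an attribute reference, and the expression is
// computed once in a Project (as an alias) below the join, instead of at
// every operator (shuffle, sort, join) that needs the key.
// All names here are hypothetical, not Spark's real API.
sealed trait Expr
case class Attr(name: String) extends Expr
case class Upper(child: Expr) extends Expr // stands in for any complex expression
case class Alias(child: Expr, name: String) extends Expr

case class Join(leftKeys: Seq[Expr], rightKeys: Seq[Expr])
case class Rewritten(leftProject: Seq[Alias], rightProject: Seq[Alias], join: Join)

def isComplex(e: Expr): Boolean = e match {
  case _: Attr => false
  case _       => true
}

def pullOut(j: Join): Rewritten = {
  // Replace each complex key with an aliased pre-computed column.
  def rewrite(keys: Seq[Expr], side: String): (Seq[Expr], Seq[Alias]) = {
    val pairs = keys.zipWithIndex.map {
      case (k, i) if isComplex(k) =>
        val a = Alias(k, s"${side}key$i")
        (Attr(a.name): Expr, Some(a): Option[Alias])
      case (k, _) => (k, None: Option[Alias])
    }
    (pairs.map(_._1), pairs.flatMap(_._2))
  }
  val (lk, lp) = rewrite(j.leftKeys, "l")
  val (rk, rp) = rewrite(j.rightKeys, "r")
  Rewritten(lp, rp, Join(lk, rk))
}
```

For a join condition like `x.a = upper(y.d)`, only the right key is complex, so `upper(d)` is aliased into the right side's Project and the join compares two plain attributes.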
Why are the changes needed?
Improve query performance.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Unit test and benchmark test:
Benchmark result: