[SPARK-42660][SQL] Infer filters for Join produced by IN and EXISTS clause (RewritePredicateSubquery rule) #40266
Conversation
cc @cloud-fan

cc: @wangyum @peter-toth

This change makes sense to me and the new plans look ok to me.

I had a change like this before: #22778.
Ah ok, thanks @wangyum! It looks like the very same discussion has come up before: #22778 (comment)

@wangyum @peter-toth Thanks for pointing out the previous attempts. It does seem moving … In this PR #17520, they tried to put RewritePredicateSubquery right after … If this seems right to you guys, I can update this PR to move …
Looks like there are a few failures after moving the rule (22e7886). @mskapilks, do you think you can look into those failures? |
Yup, I am working on them. I had a wrong SPARK_HOME setup, so I missed the plan changes.
@@ -1158,12 +1158,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
      var joinExec = assertJoin((
        "select * from testData where key not in (select a from testData2)",
        classOf[BroadcastHashJoinExec]))
-     assert(joinExec.asInstanceOf[BroadcastHashJoinExec].isNullAwareAntiJoin)
+     assert(!joinExec.asInstanceOf[BroadcastHashJoinExec].isNullAwareAntiJoin)
These two queries don't need NWAJ now due to more inferred filters.
Can you please elaborate on this a bit more?
Plan for this query before this change:
Join LeftAnti, ((key#13 = a#23) OR isnull((key#13 = a#23)))
:- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]
: +- ExternalRDD [obj#12]
+- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23]
+- ExternalRDD [obj#22]
New plan
Join LeftAnti, (key#13 = a#23)
:- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]
: +- ExternalRDD [obj#12]
+- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23]
+- ExternalRDD [obj#22]
The isnull((key#13 = a#23)) condition got removed by the NullPropagation rule (as all optimization rules now run after the subquery rewrite).
So the join doesn't get converted to a Null Aware Anti Join anymore, as that only happens when the condition has the shape from the previous plan: LeftAnti(condition: Or(EqualTo(a=b), IsNull(EqualTo(a=b)))).
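For illustration, here is a minimal sketch of that condition shape (not code from this PR; the view names `l` and `r` are hypothetical, and it assumes a spark-shell style session where `spark` and its implicits are available):

```scala
// Sketch only: with nullable keys, the NOT IN rewrite keeps the null-aware
// condition ((key = a) OR isnull((key = a))), which is exactly the LeftAnti
// shape that can be planned as a null-aware anti join. With non-nullable Ints
// the isnull(...) branch folds away, so a plain anti join is enough.
import spark.implicits._

Seq(Option(1), Option(2)).toDF("key").createOrReplaceTempView("l")
Seq(Option(1), None: Option[Int]).toDF("a").createOrReplaceTempView("r")

spark.sql("SELECT * FROM l WHERE key NOT IN (SELECT a FROM r)").explain(true)
// expect the optimized plan to contain: Join LeftAnti, ((key = a) OR isnull((key = a)))
```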
Hm, then I think we need to fix the test query (and not the expected result), as `not in` can't be rewritten to a simple (not null-aware) BroadcastHashJoinExec if we don't know `key`'s and `a`'s nullability. I think the problem here is that we use `TestData` and `TestData2`, where `key` and `a` are `Int`s and not `Integer`s.
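If I read this right, one way to keep the test exercising the null-aware path would be something like the following sketch (hypothetical fixture names, not from this PR; assumes a spark-shell style session), where the test data uses nullable columns instead of primitive Ints:

```scala
// Hypothetical nullable test data (not the actual TestData fixtures): with
// Option[Int] columns the keys' nullability is unknown to the optimizer, so
// NOT IN can't be reduced to a simple anti join and the null-aware path keeps
// being covered by the test.
case class NullableTestData(key: Option[Int], value: String)
case class NullableTestData2(a: Option[Int])

import spark.implicits._

Seq(NullableTestData(Some(1), "1"), NullableTestData(Some(2), "2"))
  .toDS().createOrReplaceTempView("testDataNullable")
Seq(NullableTestData2(Some(1)), NullableTestData2(None))
  .toDS().createOrReplaceTempView("testData2Nullable")

// A NULL on the subquery side makes `key NOT IN (...)` never TRUE, which is
// why the rewrite has to stay null-aware here.
spark.sql("SELECT * FROM testDataNullable WHERE key NOT IN (SELECT a FROM testData2Nullable)").show()
```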
+- * ColumnarToRow (42)
+- Scan parquet spark_catalog.default.customer_demographics (41)
+- * Filter (46)
+- * SortMergeJoin ExistenceJoin(exists#1) (45)
Seems pushdown is not happening? Need to check this
Seems PushLeftSemiLeftAntiThroughJoin / PushDownLeftSemiAntiJoin don't consider ExistenceJoin. We might need to update these rules, or do predicate pushdown before the subquery rewrite (which may not be ideal)?
Filter (46)'s condition is exists#2 OR exists#1, which can't be pushed down. But that's ok, as it is basically the same as the old Filter (30) was.
In the new plan the order of joins is a bit different, but I'm not sure the new plan would be worse. Actually we have 3 SMJ + 5 BHJ now whereas we had 4 + 4...
Can you please run a TPCDS benchmark to make sure we don't introduce a performance regression?
More failures. Seems this might take real effort to make it work, like other rule modifications.
Why was the latest commit (b1ed7be) needed?
@mskapilks, do you have any update on this? I can take over this PR and investigate the idea further if you don't have time for it. |
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.


What changes were proposed in this pull request?
We should run `InferFiltersFromConstraints` again after running the `RewritePredicateSubquery` rule. `RewritePredicateSubquery` rewrites IN and EXISTS queries to LEFT SEMI / LEFT ANTI joins, but we don't infer filters for these newly generated joins. We noticed on TPCH 1TB q21 that by inferring filters for these new joins, one `lineitem` table scan can be eliminated because a `ReusedExchange` gets introduced. Previously, due to a mismatch in filter predicates, the reuse was not happening.
Why are the changes needed?
Can improve query performance.
Does this PR introduce any user-facing change?
No
How was this patch tested?
PlanStability test
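As a usage note, a minimal sketch of the effect described above (hypothetical view names `t1` and `t2`, assuming a spark-shell style session; this is not part of the PR's test suite):

```scala
// Sketch: RewritePredicateSubquery turns the IN subquery into a LEFT SEMI join;
// the point of this PR is that InferFiltersFromConstraints should then also get
// to add IsNotNull (and other inferable) filters on the new join's keys.
import spark.implicits._

Seq((Option(1), "a"), (Option(2), "b")).toDF("key", "value").createOrReplaceTempView("t1")
Seq(Option(1), Option(3)).toDF("k").createOrReplaceTempView("t2")

// In the optimized plan, look for: Join LeftSemi, (key = k)
// and, with filter inference after the rewrite, Filter isnotnull(...) on both sides.
spark.sql("SELECT * FROM t1 WHERE key IN (SELECT k FROM t2)").explain(true)
```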