-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-38977][SQL] Fix schema pruning with correlated subqueries #36303
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -935,4 +935,106 @@ abstract class SchemaPruningSuite | |
| .count() | ||
| assert(count == 0) | ||
| } | ||
|
|
||
| testSchemaPruning("SPARK-38977: schema pruning with correlated EXISTS subquery") { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All of these queries would previously fail for V2 tables.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this bug only happens for v2 tables, not file source tables?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess it will fail for both as the same method is used. Tests cover V1 and V2 so it should work for both now. |
||
|
|
||
| import testImplicits._ | ||
|
|
||
| withTempView("ids", "first_names") { | ||
| val df1 = Seq(1, 2, 3).toDF("value") | ||
| df1.createOrReplaceTempView("ids") | ||
|
|
||
| val df2 = Seq("John", "Bob").toDF("value") | ||
| df2.createOrReplaceTempView("first_names") | ||
|
|
||
| val query = sql( | ||
| """SELECT name FROM contacts c | ||
| |WHERE | ||
| | EXISTS (SELECT 1 FROM ids i WHERE i.value = c.id) | ||
| | AND | ||
| | EXISTS (SELECT 1 FROM first_names n WHERE c.name.first = n.value) | ||
| |""".stripMargin) | ||
|
|
||
| checkScan(query, "struct<id:int,name:struct<first:string,middle:string,last:string>>") | ||
|
|
||
| checkAnswer(query, Row(Row("John", "Y.", "Doe")) :: Nil) | ||
| } | ||
| } | ||
|
|
||
| testSchemaPruning("SPARK-38977: schema pruning with correlated NOT EXISTS subquery") { | ||
|
|
||
| import testImplicits._ | ||
|
|
||
| withTempView("ids", "first_names") { | ||
| val df1 = Seq(1, 2, 3).toDF("value") | ||
| df1.createOrReplaceTempView("ids") | ||
|
|
||
| val df2 = Seq("John", "Bob").toDF("value") | ||
| df2.createOrReplaceTempView("first_names") | ||
|
|
||
| val query = sql( | ||
| """SELECT name FROM contacts c | ||
| |WHERE | ||
| | NOT EXISTS (SELECT 1 FROM ids i WHERE i.value = c.id) | ||
| | AND | ||
| | NOT EXISTS (SELECT 1 FROM first_names n WHERE c.name.first = n.value) | ||
| |""".stripMargin) | ||
|
|
||
| checkScan(query, "struct<id:int,name:struct<first:string,middle:string,last:string>>") | ||
|
|
||
| checkAnswer(query, Row(Row("Jane", "X.", "Doe")) :: Nil) | ||
| } | ||
| } | ||
|
|
||
| testSchemaPruning("SPARK-38977: schema pruning with correlated IN subquery") { | ||
|
|
||
| import testImplicits._ | ||
|
|
||
| withTempView("ids", "first_names") { | ||
| val df1 = Seq(1, 2, 3).toDF("value") | ||
| df1.createOrReplaceTempView("ids") | ||
|
|
||
| val df2 = Seq("John", "Bob").toDF("value") | ||
| df2.createOrReplaceTempView("first_names") | ||
|
|
||
| val query = sql( | ||
| """SELECT name FROM contacts c | ||
| |WHERE | ||
| | id IN (SELECT * FROM ids i WHERE c.pets > i.value) | ||
| | AND | ||
| | name.first IN (SELECT * FROM first_names n WHERE c.name.last < n.value) | ||
| |""".stripMargin) | ||
|
|
||
| checkScan(query, | ||
| "struct<id:int,name:struct<first:string,middle:string,last:string>,pets:int>") | ||
|
|
||
| checkAnswer(query, Row(Row("John", "Y.", "Doe")) :: Nil) | ||
| } | ||
| } | ||
|
|
||
| testSchemaPruning("SPARK-38977: schema pruning with correlated NOT IN subquery") { | ||
|
|
||
| import testImplicits._ | ||
|
|
||
| withTempView("ids", "first_names") { | ||
| val df1 = Seq(1, 2, 3).toDF("value") | ||
| df1.createOrReplaceTempView("ids") | ||
|
|
||
| val df2 = Seq("John", "Janet", "Jim", "Bob").toDF("value") | ||
| df2.createOrReplaceTempView("first_names") | ||
|
|
||
| val query = sql( | ||
| """SELECT name FROM contacts c | ||
| |WHERE | ||
| | id NOT IN (SELECT * FROM ids i WHERE c.pets > i.value) | ||
| | AND | ||
| | name.first NOT IN (SELECT * FROM first_names n WHERE c.name.last > n.value) | ||
| |""".stripMargin) | ||
|
|
||
| checkScan(query, | ||
| "struct<id:int,name:struct<first:string,middle:string,last:string>,pets:int>") | ||
|
|
||
| checkAnswer(query, Row(Row("Jane", "X.", "Doe")) :: Nil) | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Initially, I tried another approach. I was passing
AttributeSetwith table attributes and checking above if an attribute belongs to the table output. However, that required changing many places. This change is much smaller. Let me know if there are cases when this will not work.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change looks reasonable to me. I am not aware of cases when this will not work. Let's wait for feedback from others.