Skip to content

Commit 0c9947d

Browse files
aokolnychyiviirya
authored andcommitted
[SPARK-38977][SQL] Fix schema pruning with correlated subqueries
### What changes were proposed in this pull request? This PR fixes schema pruning for queries with multiple correlated subqueries. Previously, Spark would throw an exception trying to determine root fields in `SchemaPruning$identifyRootFields`. That was happening because expressions in predicates that referenced attributes in subqueries were not ignored. That's why attributes from multiple subqueries could conflict with each other (e.g. incompatible types) even though they should be ignored. For instance, the following query would throw a runtime exception. ``` SELECT name FROM contacts c WHERE EXISTS (SELECT 1 FROM ids i WHERE i.value = c.id) AND EXISTS (SELECT 1 FROM first_names n WHERE c.name.first = n.value) ``` ``` [info] org.apache.spark.SparkException: Failed to merge fields 'value' and 'value'. Failed to merge incompatible data types int and string [info] at org.apache.spark.sql.errors.QueryExecutionErrors$.failedMergingFieldsError(QueryExecutionErrors.scala:936) ``` ### Why are the changes needed? These changes are needed to avoid exceptions for some queries with multiple correlated subqueries. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? This PR comes with tests. Closes #36303 from aokolnychyi/spark-38977. Authored-by: Anton Okolnychyi <[email protected]> Signed-off-by: Liang-Chi Hsieh <[email protected]>
1 parent 41c4f91 commit 0c9947d

File tree

2 files changed

+106
-0
lines changed

2 files changed

+106
-0
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,10 @@ object SchemaPruning extends SQLConfHelper {
152152
RootField(field, derivedFromAtt = false, prunedIfAnyChildAccessed = true) :: Nil
153153
case IsNotNull(_: Attribute) | IsNull(_: Attribute) =>
154154
expr.children.flatMap(getRootFields).map(_.copy(prunedIfAnyChildAccessed = true))
155+
case s: SubqueryExpression =>
156+
// use subquery references that only include outer attrs and
157+
// ignore join conditions as those may include attributes from other tables
158+
s.references.toSeq.flatMap(getRootFields)
155159
case _ =>
156160
expr.children.flatMap(getRootFields)
157161
}

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -935,4 +935,106 @@ abstract class SchemaPruningSuite
935935
.count()
936936
assert(count == 0)
937937
}
938+
939+
testSchemaPruning("SPARK-38977: schema pruning with correlated EXISTS subquery") {
940+
941+
import testImplicits._
942+
943+
withTempView("ids", "first_names") {
944+
val df1 = Seq(1, 2, 3).toDF("value")
945+
df1.createOrReplaceTempView("ids")
946+
947+
val df2 = Seq("John", "Bob").toDF("value")
948+
df2.createOrReplaceTempView("first_names")
949+
950+
val query = sql(
951+
"""SELECT name FROM contacts c
952+
|WHERE
953+
| EXISTS (SELECT 1 FROM ids i WHERE i.value = c.id)
954+
| AND
955+
| EXISTS (SELECT 1 FROM first_names n WHERE c.name.first = n.value)
956+
|""".stripMargin)
957+
958+
checkScan(query, "struct<id:int,name:struct<first:string,middle:string,last:string>>")
959+
960+
checkAnswer(query, Row(Row("John", "Y.", "Doe")) :: Nil)
961+
}
962+
}
963+
964+
testSchemaPruning("SPARK-38977: schema pruning with correlated NOT EXISTS subquery") {
965+
966+
import testImplicits._
967+
968+
withTempView("ids", "first_names") {
969+
val df1 = Seq(1, 2, 3).toDF("value")
970+
df1.createOrReplaceTempView("ids")
971+
972+
val df2 = Seq("John", "Bob").toDF("value")
973+
df2.createOrReplaceTempView("first_names")
974+
975+
val query = sql(
976+
"""SELECT name FROM contacts c
977+
|WHERE
978+
| NOT EXISTS (SELECT 1 FROM ids i WHERE i.value = c.id)
979+
| AND
980+
| NOT EXISTS (SELECT 1 FROM first_names n WHERE c.name.first = n.value)
981+
|""".stripMargin)
982+
983+
checkScan(query, "struct<id:int,name:struct<first:string,middle:string,last:string>>")
984+
985+
checkAnswer(query, Row(Row("Jane", "X.", "Doe")) :: Nil)
986+
}
987+
}
988+
989+
testSchemaPruning("SPARK-38977: schema pruning with correlated IN subquery") {
990+
991+
import testImplicits._
992+
993+
withTempView("ids", "first_names") {
994+
val df1 = Seq(1, 2, 3).toDF("value")
995+
df1.createOrReplaceTempView("ids")
996+
997+
val df2 = Seq("John", "Bob").toDF("value")
998+
df2.createOrReplaceTempView("first_names")
999+
1000+
val query = sql(
1001+
"""SELECT name FROM contacts c
1002+
|WHERE
1003+
| id IN (SELECT * FROM ids i WHERE c.pets > i.value)
1004+
| AND
1005+
| name.first IN (SELECT * FROM first_names n WHERE c.name.last < n.value)
1006+
|""".stripMargin)
1007+
1008+
checkScan(query,
1009+
"struct<id:int,name:struct<first:string,middle:string,last:string>,pets:int>")
1010+
1011+
checkAnswer(query, Row(Row("John", "Y.", "Doe")) :: Nil)
1012+
}
1013+
}
1014+
1015+
testSchemaPruning("SPARK-38977: schema pruning with correlated NOT IN subquery") {
1016+
1017+
import testImplicits._
1018+
1019+
withTempView("ids", "first_names") {
1020+
val df1 = Seq(1, 2, 3).toDF("value")
1021+
df1.createOrReplaceTempView("ids")
1022+
1023+
val df2 = Seq("John", "Janet", "Jim", "Bob").toDF("value")
1024+
df2.createOrReplaceTempView("first_names")
1025+
1026+
val query = sql(
1027+
"""SELECT name FROM contacts c
1028+
|WHERE
1029+
| id NOT IN (SELECT * FROM ids i WHERE c.pets > i.value)
1030+
| AND
1031+
| name.first NOT IN (SELECT * FROM first_names n WHERE c.name.last > n.value)
1032+
|""".stripMargin)
1033+
1034+
checkScan(query,
1035+
"struct<id:int,name:struct<first:string,middle:string,last:string>,pets:int>")
1036+
1037+
checkAnswer(query, Row(Row("Jane", "X.", "Doe")) :: Nil)
1038+
}
1039+
}
9381040
}

0 commit comments

Comments
 (0)