From 48b49a2e9853cc5b098aa55690745cfb1a572cf9 Mon Sep 17 00:00:00 2001
From: Anton Okolnychyi
Date: Wed, 20 Apr 2022 21:40:09 -0700
Subject: [PATCH 1/2] [SPARK-38977][SQL] Fix schema pruning with correlated subqueries

---
 .../catalyst/expressions/SchemaPruning.scala  |   4 +
 .../datasources/SchemaPruningSuite.scala      | 102 ++++++++++++++++++
 2 files changed, 106 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala
index fd5b2db61f31e..e14bcba0ace2d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala
@@ -152,6 +152,10 @@ object SchemaPruning extends SQLConfHelper {
         RootField(field, derivedFromAtt = false, prunedIfAnyChildAccessed = true) :: Nil
       case IsNotNull(_: Attribute) | IsNull(_: Attribute) =>
         expr.children.flatMap(getRootFields).map(_.copy(prunedIfAnyChildAccessed = true))
+      case s: SubqueryExpression =>
+        // use subquery references that only include outer attrs and
+        // ignore join conditions as those may include attributes from other tables
+        s.references.toSeq.flatMap(getRootFields)
       case _ =>
         expr.children.flatMap(getRootFields)
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
index 4eb8258830ce5..d12e1ebda75ff 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
@@ -935,4 +935,106 @@ abstract class SchemaPruningSuite
       .count()
     assert(count == 0)
   }
+
+  testSchemaPruning("SPARK-38977: schema pruning with correlated EXISTS subquery") {
+
+    import testImplicits._
+
+    withTempView("ids", "first_names") {
+      val df1 = Seq(1, 2, 3).toDF("value")
+      df1.createOrReplaceTempView("ids")
+
+      val df2 = Seq("John", "Bob").toDF("value")
+      df2.createOrReplaceTempView("first_names")
+
+      val query = sql(
+        s"""SELECT name FROM contacts c
+           |WHERE
+           |  EXISTS (SELECT 1 FROM ids i WHERE i.value = c.id)
+           |  AND
+           |  EXISTS (SELECT 1 FROM first_names n WHERE c.name.first = n.value)
+           |""".stripMargin)
+
+      checkScan(query, "struct<id:int,name:struct<first:string,middle:string,last:string>>")
+
+      checkAnswer(query, Row(Row("John", "Y.", "Doe")) :: Nil)
+    }
+  }
+
+  testSchemaPruning("SPARK-38977: schema pruning with correlated NOT EXISTS subquery") {
+
+    import testImplicits._
+
+    withTempView("ids", "first_names") {
+      val df1 = Seq(1, 2, 3).toDF("value")
+      df1.createOrReplaceTempView("ids")
+
+      val df2 = Seq("John", "Bob").toDF("value")
+      df2.createOrReplaceTempView("first_names")
+
+      val query = sql(
+        s"""SELECT name FROM contacts c
+           |WHERE
+           |  NOT EXISTS (SELECT 1 FROM ids i WHERE i.value = c.id)
+           |  AND
+           |  NOT EXISTS (SELECT 1 FROM first_names n WHERE c.name.first = n.value)
+           |""".stripMargin)
+
+      checkScan(query, "struct<id:int,name:struct<first:string,middle:string,last:string>>")
+
+      checkAnswer(query, Row(Row("Jane", "X.", "Doe")) :: Nil)
+    }
+  }
+
+  testSchemaPruning("SPARK-38977: schema pruning with correlated IN subquery") {
+
+    import testImplicits._
+
+    withTempView("ids", "first_names") {
+      val df1 = Seq(1, 2, 3).toDF("value")
+      df1.createOrReplaceTempView("ids")
+
+      val df2 = Seq("John", "Bob").toDF("value")
+      df2.createOrReplaceTempView("first_names")
+
+      val query = sql(
+        s"""SELECT name FROM contacts c
+           |WHERE
+           |  id IN (SELECT * FROM ids i WHERE c.pets > i.value)
+           |  AND
+           |  name.first IN (SELECT * FROM first_names n WHERE c.name.last < n.value)
+           |""".stripMargin)
+
+      checkScan(query,
+        "struct<id:int,name:struct<first:string,middle:string,last:string>,pets:int>")
+
+      checkAnswer(query, Row(Row("John", "Y.", "Doe")) :: Nil)
+    }
+  }
+
+  testSchemaPruning("SPARK-38977: schema pruning with correlated NOT IN subquery") {
+
+    import testImplicits._
+
+    withTempView("ids", "first_names") {
+      val df1 = Seq(1, 2, 3).toDF("value")
+      df1.createOrReplaceTempView("ids")
+
+      val df2 = Seq("John", "Janet", "Jim", "Bob").toDF("value")
+      df2.createOrReplaceTempView("first_names")
+
+      val query = sql(
+        s"""SELECT name FROM contacts c
+           |WHERE
+           |  id NOT IN (SELECT * FROM ids i WHERE c.pets > i.value)
+           |  AND
+           |  name.first NOT IN (SELECT * FROM first_names n WHERE c.name.last > n.value)
+           |""".stripMargin)
+
+      checkScan(query,
+        "struct<id:int,name:struct<first:string,middle:string,last:string>,pets:int>")
+
+      checkAnswer(query, Row(Row("Jane", "X.", "Doe")) :: Nil)
+    }
+  }
 }

From 011039ef4e72c83ad8cf92061151778675be1db5 Mon Sep 17 00:00:00 2001
From: Anton Okolnychyi
Date: Fri, 22 Apr 2022 09:42:19 -0700
Subject: [PATCH 2/2] Review

---
 .../datasources/SchemaPruningSuite.scala      | 48 +++++++++----------
 1 file changed, 24 insertions(+), 24 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
index d12e1ebda75ff..becace3c69b98 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
@@ -948,12 +948,12 @@ abstract class SchemaPruningSuite
       df2.createOrReplaceTempView("first_names")
 
       val query = sql(
-        s"""SELECT name FROM contacts c
-           |WHERE
-           |  EXISTS (SELECT 1 FROM ids i WHERE i.value = c.id)
-           |  AND
-           |  EXISTS (SELECT 1 FROM first_names n WHERE c.name.first = n.value)
-           |""".stripMargin)
+        """SELECT name FROM contacts c
+          |WHERE
+          |  EXISTS (SELECT 1 FROM ids i WHERE i.value = c.id)
+          |  AND
+          |  EXISTS (SELECT 1 FROM first_names n WHERE c.name.first = n.value)
+          |""".stripMargin)
 
       checkScan(query, "struct<id:int,name:struct<first:string,middle:string,last:string>>")
 
@@ -973,12 +973,12 @@ abstract class SchemaPruningSuite
       df2.createOrReplaceTempView("first_names")
 
       val query = sql(
-        s"""SELECT name FROM contacts c
-           |WHERE
-           |  NOT EXISTS (SELECT 1 FROM ids i WHERE i.value = c.id)
-           |  AND
-           |  NOT EXISTS (SELECT 1 FROM first_names n WHERE c.name.first = n.value)
-           |""".stripMargin)
+        """SELECT name FROM contacts c
+          |WHERE
+          |  NOT EXISTS (SELECT 1 FROM ids i WHERE i.value = c.id)
+          |  AND
+          |  NOT EXISTS (SELECT 1 FROM first_names n WHERE c.name.first = n.value)
+          |""".stripMargin)
 
       checkScan(query, "struct<id:int,name:struct<first:string,middle:string,last:string>>")
 
@@ -998,12 +998,12 @@ abstract class SchemaPruningSuite
       df2.createOrReplaceTempView("first_names")
 
       val query = sql(
-        s"""SELECT name FROM contacts c
-           |WHERE
-           |  id IN (SELECT * FROM ids i WHERE c.pets > i.value)
-           |  AND
-           |  name.first IN (SELECT * FROM first_names n WHERE c.name.last < n.value)
-           |""".stripMargin)
+        """SELECT name FROM contacts c
+          |WHERE
+          |  id IN (SELECT * FROM ids i WHERE c.pets > i.value)
+          |  AND
+          |  name.first IN (SELECT * FROM first_names n WHERE c.name.last < n.value)
+          |""".stripMargin)
 
       checkScan(query,
         "struct<id:int,name:struct<first:string,middle:string,last:string>,pets:int>")
@@ -1024,12 +1024,12 @@ abstract class SchemaPruningSuite
       df2.createOrReplaceTempView("first_names")
 
       val query = sql(
-        s"""SELECT name FROM contacts c
-           |WHERE
-           |  id NOT IN (SELECT * FROM ids i WHERE c.pets > i.value)
-           |  AND
-           |  name.first NOT IN (SELECT * FROM first_names n WHERE c.name.last > n.value)
-           |""".stripMargin)
+        """SELECT name FROM contacts c
+          |WHERE
+          |  id NOT IN (SELECT * FROM ids i WHERE c.pets > i.value)
+          |  AND
+          |  name.first NOT IN (SELECT * FROM first_names n WHERE c.name.last > n.value)
+          |""".stripMargin)
 
       checkScan(query,
         "struct<id:int,name:struct<first:string,middle:string,last:string>,pets:int>")