From af74d517312b01c2061b719aa52f5b5bca597ae8 Mon Sep 17 00:00:00 2001 From: francis0407 Date: Tue, 9 Apr 2019 14:18:18 +0800 Subject: [PATCH 1/3] SPARK-27411: DataSourceV2Strategy should not eliminate subquery --- .../datasources/v2/DataSourceV2Strategy.scala | 8 +++++--- .../spark/sql/sources/v2/DataSourceV2Suite.scala | 10 ++++++++++ 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index f8c7e2c826a36..5a2c3d2752621 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -106,13 +106,16 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { case PhysicalOperation(project, filters, relation: DataSourceV2Relation) => val scanBuilder = relation.newScanBuilder() + val (withSubquery, withoutSubquery) = filters.partition(SubqueryExpression.hasSubquery) val normalizedFilters = DataSourceStrategy.normalizeFilters( - filters.filterNot(SubqueryExpression.hasSubquery), relation.output) + withoutSubquery, relation.output) // `pushedFilters` will be pushed down and evaluated in the underlying data sources. // `postScanFilters` need to be evaluated after the scan. // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter. - val (pushedFilters, postScanFilters) = pushFilters(scanBuilder, normalizedFilters) + val (pushedFilters, postScanFiltersWithoutSubquery) = + pushFilters(scanBuilder, normalizedFilters) + val postScanFilters = postScanFiltersWithoutSubquery ++ withSubquery val (scan, output) = pruneColumns(scanBuilder, relation, project ++ postScanFilters) logInfo( s""" @@ -121,7 +124,6 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { |Post-Scan Filters: ${postScanFilters.mkString(",")} |Output: ${output.mkString(", ")} """.stripMargin) - val plan = BatchScanExec(output, scan) val filterCondition = postScanFilters.reduceLeftOption(And) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala index 587cfa9bd6647..12bf49d07a5ca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala @@ -392,6 +392,16 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext { } } } + + test("SPARK-27411: DataSourceV2Strategy should not eliminate subquery") { + val t2 = spark.read.format(classOf[SimpleDataSourceV2].getName).load() + sql("create temporary view t1 (a int) using parquet") + val df = t2.where("i < (select max(a) from t1)") + val subqueries = df.queryExecution.executedPlan.collect { + case p => p.subqueries + }.flatten + assert(subqueries.length == 1) + } } From d92e309acd5f7e755d7c7c9c0b5e4498742bc2e5 Mon Sep 17 00:00:00 2001 From: francis0407 Date: Tue, 9 Apr 2019 14:25:08 +0800 Subject: [PATCH 2/3] fix --- .../sql/execution/datasources/v2/DataSourceV2Strategy.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 5a2c3d2752621..7681dc8dfb37a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -124,6 +124,7 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { |Post-Scan Filters: ${postScanFilters.mkString(",")} |Output: ${output.mkString(", ")} """.stripMargin) + val plan = BatchScanExec(output, scan) val filterCondition = postScanFilters.reduceLeftOption(And) From 8afc71e1b8b312fcdbffcf9bca64c06ed7213598 Mon Sep 17 00:00:00 2001 From: francis0407 Date: Tue, 9 Apr 2019 18:27:39 +0800 Subject: [PATCH 3/3] fix test --- .../sql/sources/v2/DataSourceV2Suite.scala | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala index 12bf49d07a5ca..4e071c5af6a62 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala @@ -394,13 +394,16 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext { } test("SPARK-27411: DataSourceV2Strategy should not eliminate subquery") { - val t2 = spark.read.format(classOf[SimpleDataSourceV2].getName).load() - sql("create temporary view t1 (a int) using parquet") - val df = t2.where("i < (select max(a) from t1)") - val subqueries = df.queryExecution.executedPlan.collect { - case p => p.subqueries - }.flatten - assert(subqueries.length == 1) + withTempView("t1") { + val t2 = spark.read.format(classOf[SimpleDataSourceV2].getName).load() + Seq(2, 3).toDF("a").createTempView("t1") + val df = t2.where("i < (select max(a) from t1)").select('i) + val subqueries = df.queryExecution.executedPlan.collect { + case p => p.subqueries + }.flatten + assert(subqueries.length == 1) + checkAnswer(df, (0 until 3).map(i => Row(i))) + } } }