Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,16 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
val scanBuilder = relation.newScanBuilder()

val (withSubquery, withoutSubquery) = filters.partition(SubqueryExpression.hasSubquery)
val normalizedFilters = DataSourceStrategy.normalizeFilters(
filters.filterNot(SubqueryExpression.hasSubquery), relation.output)
withoutSubquery, relation.output)

// `pushedFilters` will be pushed down and evaluated in the underlying data sources.
// `postScanFilters` need to be evaluated after the scan.
// `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter.
val (pushedFilters, postScanFilters) = pushFilters(scanBuilder, normalizedFilters)
val (pushedFilters, postScanFiltersWithoutSubquery) =
pushFilters(scanBuilder, normalizedFilters)
val postScanFilters = postScanFiltersWithoutSubquery ++ withSubquery
val (scan, output) = pruneColumns(scanBuilder, relation, project ++ postScanFilters)
logInfo(
s"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,19 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
}
}
}

// Regression test for SPARK-27411: a filter containing a scalar subquery over a
// DataSourceV2 relation must still appear in the executed plan and be applied.
test("SPARK-27411: DataSourceV2Strategy should not eliminate subquery") {
  withTempView("t1") {
    val source = spark.read.format(classOf[SimpleDataSourceV2].getName).load()
    // The subquery reads max(a) from this view, i.e. 3.
    Seq(2, 3).toDF("a").createTempView("t1")
    val filtered = source.where("i < (select max(a) from t1)").select('i)

    // Gather the subqueries attached to every node of the physical plan.
    val plannedSubqueries = filtered.queryExecution.executedPlan.flatMap(_.subqueries)
    assert(plannedSubqueries.length == 1)

    // Only rows with i in {0, 1, 2} satisfy i < 3.
    checkAnswer(filtered, (0 until 3).map(Row(_)))
  }
}
}


Expand Down