
Commit 14cf753

yhuailiancheng authored and committed
[SPARK-11661][SQL] Still pushdown filters returned by unhandledFilters.
https://issues.apache.org/jira/browse/SPARK-11661

Author: Yin Huai <[email protected]>

Closes #9634 from yhuai/unhandledFilters.
1 parent e2957bc commit 14cf753

File tree

5 files changed: +71 additions, −24 deletions

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 11 additions & 4 deletions
@@ -453,8 +453,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
    *
    * @return A pair of `Seq[Expression]` and `Seq[Filter]`. The first element contains all Catalyst
    *         predicate [[Expression]]s that are either not convertible or cannot be handled by
-   *         `relation`. The second element contains all converted data source [[Filter]]s that can
-   *         be handled by `relation`.
+   *         `relation`. The second element contains all converted data source [[Filter]]s that
+   *         will be pushed down to the data source.
    */
   protected[sql] def selectFilters(
       relation: BaseRelation,
@@ -476,7 +476,9 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     // Catalyst predicate expressions that cannot be translated to data source filters.
     val unrecognizedPredicates = predicates.filterNot(translatedMap.contains)
 
-    // Data source filters that cannot be handled by `relation`
+    // Data source filters that cannot be handled by `relation`. An unhandled filter here
+    // means that the data source may not be able to apply this filter to every row
+    // of the underlying dataset.
     val unhandledFilters = relation.unhandledFilters(translatedMap.values.toArray).toSet
 
     val (unhandled, handled) = translated.partition {
@@ -491,6 +493,11 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     // Translated data source filters that can be handled by `relation`
     val (_, handledFilters) = handled.unzip
 
-    (unrecognizedPredicates ++ unhandledPredicates, handledFilters)
+    // `translated` contains all filters that have been converted to the public Filter interface.
+    // We should always push them to the data source, no matter whether the data source can apply
+    // a filter to every row or not.
+    val (_, translatedFilters) = translated.unzip
+
+    (unrecognizedPredicates ++ unhandledPredicates, translatedFilters)
   }
 }
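The net effect in selectFilters is that "pushed" filters and "fully handled" filters are now distinct sets. A minimal sketch of the new contract, using made-up filters rather than code from this commit:

import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan}

// Two filters translated to the public Filter interface; suppose the relation
// reports that it may not apply GreaterThan("b", 1) to every row.
val translated: Seq[Filter] = Seq(EqualTo("a", 2), GreaterThan("b", 1))
val unhandled: Set[Filter] = Set[Filter](GreaterThan("b", 1))

// Before this patch, only the fully handled subset was pushed down:
val pushedBefore = translated.filterNot(unhandled.contains) // Seq(EqualTo("a", 2))

// After this patch, every translated filter is pushed down, and Spark SQL
// still re-evaluates the unhandled subset on the rows the scan returns:
val pushedAfter = translated // both filters
val reEvaluatedBySpark = unhandled // Set(GreaterThan("b", 1))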

sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

Lines changed: 5 additions & 3 deletions
@@ -235,9 +235,11 @@ abstract class BaseRelation {
   def needConversion: Boolean = true
 
   /**
-   * Given an array of [[Filter]]s, returns an array of [[Filter]]s that this data source relation
-   * cannot handle. Spark SQL will apply all returned [[Filter]]s against rows returned by this
-   * data source relation.
+   * Returns the list of [[Filter]]s that this data source may not be able to handle.
+   * These returned [[Filter]]s will be evaluated by Spark SQL after data is output by a scan.
+   * By default, this function will return all filters, as it is always safe to
+   * double-evaluate a [[Filter]]. However, specific implementations can override this function to
+   * avoid double filtering when they are capable of processing a filter internally.
    *
    * @since 1.6.0
    */
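As a hedged illustration of this contract, here is a hypothetical relation (not part of this commit) that can fully evaluate equality filters itself and hands everything else back to Spark SQL:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, EqualTo, Filter}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical data source: equality filters are guaranteed to be applied to
// every row by the source itself, so only non-equality filters are reported
// back for Spark-side re-evaluation.
class EqualityOnlyRelation(override val sqlContext: SQLContext) extends BaseRelation {
  override def schema: StructType = StructType(StructField("a", IntegerType) :: Nil)

  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filterNot(_.isInstanceOf[EqualTo])
}

With SPARK-11661, the filters returned here no longer shrink what gets pushed to the scan; they only tell Spark SQL to keep a Filter operator on top of it.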

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

Lines changed: 25 additions & 0 deletions
@@ -336,4 +336,29 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       }
     }
   }
+
+  test("SPARK-11661 Still pushdown filters returned by unhandledFilters") {
+    import testImplicits._
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/part=1"
+        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
+        val df = sqlContext.read.parquet(path).filter("a = 2")
+
+        // This is the source RDD without Spark-side filtering.
+        val childRDD =
+          df
+            .queryExecution
+            .executedPlan.asInstanceOf[org.apache.spark.sql.execution.Filter]
+            .child
+            .execute()
+
+        // The result should be a single row.
+        // When a filter is pushed to Parquet, Parquet can apply it to every row,
+        // so we can check the number of rows returned by Parquet
+        // to make sure our filter pushdown works.
+        assert(childRDD.count == 1)
+      }
+    }
+  }
 }

sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala

Lines changed: 26 additions & 13 deletions
@@ -254,7 +254,11 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
   testPushDown("SELECT * FROM oneToTenFiltered WHERE a IN (1,3,5)", 3, Set("a", "b", "c"))
 
   testPushDown("SELECT * FROM oneToTenFiltered WHERE a = 20", 0, Set("a", "b", "c"))
-  testPushDown("SELECT * FROM oneToTenFiltered WHERE b = 1", 10, Set("a", "b", "c"))
+  testPushDown(
+    "SELECT * FROM oneToTenFiltered WHERE b = 1",
+    10,
+    Set("a", "b", "c"),
+    Set(EqualTo("b", 1)))
 
   testPushDown("SELECT * FROM oneToTenFiltered WHERE a < 5 AND a > 1", 3, Set("a", "b", "c"))
   testPushDown("SELECT * FROM oneToTenFiltered WHERE a < 3 OR a > 8", 4, Set("a", "b", "c"))
@@ -283,12 +287,23 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
       | WHERE a + b > 9
       | AND b < 16
       | AND c IN ('bbbbbBBBBB', 'cccccCCCCC', 'dddddDDDDD', 'foo')
-    """.stripMargin.split("\n").map(_.trim).mkString(" "), 3, Set("a", "b"))
+    """.stripMargin.split("\n").map(_.trim).mkString(" "),
+    3,
+    Set("a", "b"),
+    Set(LessThan("b", 16)))
 
   def testPushDown(
-    sqlString: String,
-    expectedCount: Int,
-    requiredColumnNames: Set[String]): Unit = {
+      sqlString: String,
+      expectedCount: Int,
+      requiredColumnNames: Set[String]): Unit = {
+    testPushDown(sqlString, expectedCount, requiredColumnNames, Set.empty[Filter])
+  }
+
+  def testPushDown(
+      sqlString: String,
+      expectedCount: Int,
+      requiredColumnNames: Set[String],
+      expectedUnhandledFilters: Set[Filter]): Unit = {
     test(s"PushDown Returns $expectedCount: $sqlString") {
       val queryExecution = sql(sqlString).queryExecution
       val rawPlan = queryExecution.executedPlan.collect {
@@ -300,15 +315,13 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
       val rawCount = rawPlan.execute().count()
       assert(ColumnsRequired.set === requiredColumnNames)
 
-      assert {
-        val table = caseInsensitiveContext.table("oneToTenFiltered")
-        val relation = table.queryExecution.logical.collectFirst {
-          case LogicalRelation(r, _) => r
-        }.get
+      val table = caseInsensitiveContext.table("oneToTenFiltered")
+      val relation = table.queryExecution.logical.collectFirst {
+        case LogicalRelation(r, _) => r
+      }.get
 
-        // `relation` should be able to handle all pushed filters
-        relation.unhandledFilters(FiltersPushed.list.toArray).isEmpty
-      }
+      assert(
+        relation.unhandledFilters(FiltersPushed.list.toArray).toSet === expectedUnhandledFilters)
 
       if (rawCount != expectedCount) {
         fail(
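These expectations line up if the test relation fully handles predicates on column a but not on column b. A simplified sketch, assumed rather than quoted from the suite, of an unhandledFilters override that would produce the EqualTo("b", 1) and LessThan("b", 16) results above:

import org.apache.spark.sql.sources._

// Simplified sketch: any comparison touching column "b" is reported as
// unhandled, so Spark SQL re-applies it after the scan; conjunctions,
// disjunctions, and negations are unhandled if any child is.
def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
  def unhandled(filter: Filter): Boolean = filter match {
    case EqualTo(col, _) => col == "b"
    case LessThan(col, _) => col == "b"
    case GreaterThan(col, _) => col == "b"
    case Not(pred) => unhandled(pred)
    case And(left, right) => unhandled(left) || unhandled(right)
    case Or(left, right) => unhandled(left) || unhandled(right)
    case _ => false
  }
  filters.filter(unhandled)
}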

sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala

Lines changed: 4 additions & 4 deletions
@@ -248,7 +248,7 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
       projections = Seq('c, 'p),
       filter = 'a < 3 && 'p > 0,
       requiredColumns = Seq("c", "a"),
-      pushedFilters = Nil,
+      pushedFilters = Seq(LessThan("a", 3)),
       inconvertibleFilters = Nil,
       unhandledFilters = Seq('a < 3),
       partitioningFilters = Seq('p > 0)
@@ -327,7 +327,7 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
       projections = Seq('b, 'p),
       filter = 'c > "val_7" && 'b < 18 && 'p > 0,
       requiredColumns = Seq("b"),
-      pushedFilters = Seq(GreaterThan("c", "val_7")),
+      pushedFilters = Seq(GreaterThan("c", "val_7"), LessThan("b", 18)),
       inconvertibleFilters = Nil,
       unhandledFilters = Seq('b < 18),
       partitioningFilters = Seq('p > 0)
@@ -344,7 +344,7 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
       projections = Seq('b, 'p),
       filter = 'a % 2 === 0 && 'c > "val_7" && 'b < 18 && 'p > 0,
       requiredColumns = Seq("b", "a"),
-      pushedFilters = Seq(GreaterThan("c", "val_7")),
+      pushedFilters = Seq(GreaterThan("c", "val_7"), LessThan("b", 18)),
       inconvertibleFilters = Seq('a % 2 === 0),
       unhandledFilters = Seq('b < 18),
       partitioningFilters = Seq('p > 0)
@@ -361,7 +361,7 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
       projections = Seq('b, 'p),
       filter = 'a > 7 && 'a < 9,
       requiredColumns = Seq("b", "a"),
-      pushedFilters = Seq(GreaterThan("a", 7)),
+      pushedFilters = Seq(GreaterThan("a", 7), LessThan("a", 9)),
       inconvertibleFilters = Nil,
       unhandledFilters = Seq('a < 9),
       partitioningFilters = Nil
