@@ -50,27 +50,33 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
val output = predicate.collect { case a: Attribute => a }.distinct

withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
  val query = df
    .select(output.map(e => Column(e)): _*)
    .where(Column(predicate))

  val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
    case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation, _)) => filters
  }.flatten.reduceLeftOption(_ && _)
  assert(maybeAnalyzedPredicate.isDefined)

  val selectedFilters = maybeAnalyzedPredicate.flatMap(DataSourceStrategy.translateFilter)
  assert(selectedFilters.nonEmpty)

  selectedFilters.foreach { pred =>
    val maybeFilter = ParquetFilters.createFilter(df.schema, pred)
    assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
    maybeFilter.foreach { f =>
      // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
      assert(f.getClass === filterClass)
  withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
Contributor:
Nit: Actually you can specify multiple confs within a single withSQLConf call.

But I think it's OK to leave it here.

Member (author):
I see. Thanks!
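
For reference, a minimal sketch of the reviewer's suggestion, assuming withSQLConf accepts repeated key/value pairs as it does elsewhere in this suite:

withSQLConf(
    SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
    SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
  // Both settings apply inside this block and are restored to their
  // previous values when the block exits.
}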

    val query = df
      .select(output.map(e => Column(e)): _*)
      .where(Column(predicate))

    var maybeRelation: Option[ParquetRelation] = None
    val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
      case PhysicalOperation(_, filters, LogicalRelation(relation: ParquetRelation, _)) =>
        maybeRelation = Some(relation)
        filters
    }.flatten.reduceLeftOption(_ && _)
    assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query")

    val (_, selectedFilters) =
      DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
    assert(selectedFilters.nonEmpty, "No filter is pushed down")

    selectedFilters.foreach { pred =>
      val maybeFilter = ParquetFilters.createFilter(df.schema, pred)
      assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
      maybeFilter.foreach { f =>
        // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
        assert(f.getClass === filterClass)
      }
    }
    checker(stripSparkFilter(query), expected)
  }
  checker(query, expected)
}
}

@@ -104,6 +110,21 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df)
}

/**
 * Strips Spark-side filtering in order to check whether a datasource filters rows correctly.
 */
protected def stripSparkFilter(df: DataFrame): DataFrame = {
  val schema = df.schema
  val childRDD = df
    .queryExecution
    // Assumes the top physical node is Spark's own Filter; taking its child
    // leaves only the rows that survived datasource-side filtering.
    .executedPlan.asInstanceOf[org.apache.spark.sql.execution.Filter]
    .child
    .execute()
    .map(row => Row.fromSeq(row.toSeq(schema)))

  sqlContext.createDataFrame(childRDD, schema)
}
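
A minimal usage sketch, mirroring the pushdown test near the end of this diff (path is assumed to point at Parquet data in which exactly one row satisfies a = 2):

val df = sqlContext.read.parquet(path).filter("a = 2")
// Rows beneath Spark's Filter node come straight from Parquet, so the
// count shows whether the datasource actually applied the pushed filter.
assert(stripSparkFilter(df).count == 1)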

test("filter pushdown - boolean") {
  withParquetDataFrame((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit df =>
    checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
@@ -347,19 +368,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
val df = sqlContext.read.parquet(path).filter("a = 2")

// This is the source RDD without Spark-side filtering.
val childRDD =
  df
    .queryExecution
    .executedPlan.asInstanceOf[org.apache.spark.sql.execution.Filter]
    .child
    .execute()
// The result should be a single row.
// When a filter is pushed down to Parquet, Parquet can apply it to every row.
// So, we can check the number of rows returned from Parquet
// to make sure our filter pushdown works.
assert(childRDD.count == 1)
assert(stripSparkFilter(df).count == 1)
}
}
}