
Commit ec10032

liancheng authored and marmbrus committed
[SPARK-5465] [SQL] Fixes filter push-down for Parquet data source
Not all Catalyst filter expressions can be converted to Parquet filter predicates. We should try to convert each individual predicate and then collect those convertible ones.

Author: Cheng Lian <[email protected]>

Closes #4255 from liancheng/spark-5465 and squashes the following commits:

14ccd37 [Cheng Lian] Fixes filter push-down for Parquet data source
1 parent 8cf4a1f commit ec10032

File tree

1 file changed (+10 −8 lines)


sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 10 additions & 8 deletions
@@ -20,26 +20,26 @@ import java.util.{List => JList}
 
 import scala.collection.JavaConversions._
 
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapreduce.{JobContext, InputSplit, Job}
-
+import org.apache.hadoop.mapreduce.{InputSplit, Job, JobContext}
+import parquet.filter2.predicate.FilterApi
 import parquet.hadoop.ParquetInputFormat
 import parquet.hadoop.util.ContextUtil
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.{Partition => SparkPartition, Logging}
 import org.apache.spark.rdd.{NewHadoopPartition, RDD}
-import org.apache.spark.sql.{SQLConf, Row, SQLContext}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.sql.{Row, SQLConf, SQLContext}
+import org.apache.spark.{Logging, Partition => SparkPartition}
 
 
 /**
  * Allows creation of parquet based tables using the syntax
- * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option
+ * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option
  * required is `path`, which should be the location of a collection of, optionally partitioned,
  * parquet files.
  */
@@ -193,10 +193,12 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
       org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, selectedFiles: _*)
     }
 
-    // Push down filters when possible
+    // Push down filters when possible. Notice that not all filters can be converted to Parquet
+    // filter predicate. Here we try to convert each individual predicate and only collect those
+    // convertible ones.
     predicates
-      .reduceOption(And)
       .flatMap(ParquetFilters.createFilter)
+      .reduceOption(FilterApi.and)
       .filter(_ => sqlContext.conf.parquetFilterPushDown)
       .foreach(ParquetInputFormat.setFilterPredicate(jobConf, _))
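
For reference, here is a minimal, self-contained sketch of the convert-then-combine pattern the new code follows. The CatalystPred/ParquetPred types and the tryConvert helper are hypothetical stand-ins introduced only for illustration; in Spark itself the converter is ParquetFilters.createFilter and the combinator is FilterApi.and.

object FilterPushDownSketch {
  // Hypothetical stand-ins for Catalyst expressions and Parquet filter predicates.
  sealed trait CatalystPred
  case class Convertible(sql: String) extends CatalystPred
  case class Unconvertible(sql: String) extends CatalystPred

  sealed trait ParquetPred
  case class Leaf(sql: String) extends ParquetPred
  case class AndPred(lhs: ParquetPred, rhs: ParquetPred) extends ParquetPred

  // Stand-in for ParquetFilters.createFilter: only some predicates are convertible.
  def tryConvert(p: CatalystPred): Option[ParquetPred] = p match {
    case Convertible(s)   => Some(Leaf(s))
    case Unconvertible(_) => None
  }

  def main(args: Array[String]): Unit = {
    val predicates: Seq[CatalystPred] =
      Seq(Convertible("a = 1"), Unconvertible("udf(b)"), Convertible("c > 10"))

    // Convert each predicate individually, keep only the convertible ones, and
    // combine the survivors with a conjunction (FilterApi.and in the real code).
    val pushed: Option[ParquetPred] =
      predicates.flatMap(p => tryConvert(p)).reduceOption(AndPred(_, _))

    // Prints the conjunction of "a = 1" and "c > 10"; "udf(b)" is simply skipped.
    println(pushed)
  }
}

This is exactly why the diff reorders the two calls: with reduceOption(And) applied before createFilter, a single unconvertible conjunct made the whole combined expression unconvertible and nothing was pushed down, whereas converting first and reducing afterwards still pushes down the convertible subset.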

0 commit comments
