@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce.{JobContext, InputSplit, Job}
+import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate

 import parquet.hadoop.ParquetInputFormat
 import parquet.hadoop.util.ContextUtil
@@ -31,8 +32,8 @@ import org.apache.spark.{Partition => SparkPartition, Logging}
 import org.apache.spark.rdd.{NewHadoopPartition, RDD}

 import org.apache.spark.sql.{SQLConf, Row, SQLContext}
-import org.apache.spark.sql.catalyst.expressions.{SpecificMutableRow, And, Expression, Attribute}
-import org.apache.spark.sql.catalyst.types.{IntegerType, StructField, StructType}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types.{StringType, IntegerType, StructField, StructType}
 import org.apache.spark.sql.sources._

 import scala.collection.JavaConversions._
@@ -151,44 +152,41 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
   override def buildScan(output: Seq[Attribute], predicates: Seq[Expression]): RDD[Row] = {
     // This is mostly a hack so that we can use the existing parquet filter code.
     val requiredColumns = output.map(_.name)
-    // TODO: Parquet filters should be based on data sources API, not catalyst expressions.
-    val filters = DataSourceStrategy.selectFilters(predicates)

     val job = new Job(sparkContext.hadoopConfiguration)
     ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
     val jobConf: Configuration = ContextUtil.getConfiguration(job)

     val requestedSchema = StructType(requiredColumns.map(schema(_)))

-    // TODO: Make folder based partitioning a first class citizen of the Data Sources API.
-    val partitionFilters = filters.collect {
-      case e @ EqualTo(attr, value) if partitionKeys.contains(attr) =>
-        logInfo(s"Parquet scan partition filter: $attr=$value")
-        (p: Partition) => p.partitionValues(attr) == value
-
-      case e @ In(attr, values) if partitionKeys.contains(attr) =>
-        logInfo(s"Parquet scan partition filter: $attr IN ${values.mkString("{", ",", "}")}")
-        val set = values.toSet
-        (p: Partition) => set.contains(p.partitionValues(attr))
-
-      case e @ GreaterThan(attr, value) if partitionKeys.contains(attr) =>
-        logInfo(s"Parquet scan partition filter: $attr > $value")
-        (p: Partition) => p.partitionValues(attr).asInstanceOf[Int] > value.asInstanceOf[Int]
-
-      case e @ GreaterThanOrEqual(attr, value) if partitionKeys.contains(attr) =>
-        logInfo(s"Parquet scan partition filter: $attr >= $value")
-        (p: Partition) => p.partitionValues(attr).asInstanceOf[Int] >= value.asInstanceOf[Int]
+    val partitionKeySet = partitionKeys.toSet
+    val rawPredicate =
+      predicates
+        .filter(_.references.map(_.name).toSet.subsetOf(partitionKeySet))
+        .reduceOption(And)
+        .getOrElse(Literal(true))
+
+    // Translate the predicate so that it reads from the information derived from the
+    // folder structure
+    val castedPredicate = rawPredicate transform {
+      case a: AttributeReference =>
+        val idx = partitionKeys.indexWhere(a.name == _)
+        BoundReference(idx, IntegerType, nullable = true)
+    }

-      case e @ LessThan(attr, value) if partitionKeys.contains(attr) =>
-        logInfo(s"Parquet scan partition filter: $attr < $value")
-        (p: Partition) => p.partitionValues(attr).asInstanceOf[Int] < value.asInstanceOf[Int]
+    val inputData = new GenericMutableRow(partitionKeys.size)
+    val pruningCondition = InterpretedPredicate(castedPredicate)

-      case e @ LessThanOrEqual(attr, value) if partitionKeys.contains(attr) =>
-        logInfo(s"Parquet scan partition filter: $attr <= $value")
-        (p: Partition) => p.partitionValues(attr).asInstanceOf[Int] <= value.asInstanceOf[Int]
-    }
+    val selectedPartitions =
+      if (partitionKeys.nonEmpty && predicates.nonEmpty) {
+        partitions.filter { part =>
+          inputData(0) = part.partitionValues.values.head
+          pruningCondition(inputData)
+        }
+      } else {
+        partitions
+      }

-    val selectedPartitions = partitions.filter(p => partitionFilters.forall(_(p)))
     val fs = FileSystem.get(new java.net.URI(path), sparkContext.hadoopConfiguration)
     val selectedFiles = selectedPartitions.flatMap(_.files).map(f => fs.makeQualified(f.getPath))
     // FileInputFormat cannot handle empty lists.
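The rewritten hunk above replaces the hand-written, per-operator partition filters with a generic flow: keep only the predicates whose references are all partition keys, AND them into a single expression, rewrite column references into positional ones, and evaluate the bound predicate against each partition's key values. The sketch below mirrors that flow in a self-contained toy model; Expr, Col, Lit, BoundRef, LiteralTrue, bind, and eval are simplified stand-ins invented for illustration, not the Catalyst classes (AttributeReference, BoundReference, Literal, InterpretedPredicate) used in the patch.

// Standalone sketch of the pruning flow, under the assumptions stated above.
object PartitionPruningSketch {

  sealed trait Expr { def references: Set[String] }
  case class Col(name: String) extends Expr { def references: Set[String] = Set(name) }
  case class Lit(value: Int) extends Expr { def references: Set[String] = Set.empty }
  case class BoundRef(ordinal: Int) extends Expr { def references: Set[String] = Set.empty }
  case object LiteralTrue extends Expr { def references: Set[String] = Set.empty }
  case class EqualTo(left: Expr, right: Expr) extends Expr {
    def references: Set[String] = left.references ++ right.references
  }
  case class GreaterThan(left: Expr, right: Expr) extends Expr {
    def references: Set[String] = left.references ++ right.references
  }
  case class And(left: Expr, right: Expr) extends Expr {
    def references: Set[String] = left.references ++ right.references
  }

  // Rewrite named column references into positional ones, mirroring the
  // AttributeReference -> BoundReference transform in the patch.
  def bind(e: Expr, partitionKeys: Seq[String]): Expr = e match {
    case Col(name)         => BoundRef(partitionKeys.indexOf(name))
    case EqualTo(l, r)     => EqualTo(bind(l, partitionKeys), bind(r, partitionKeys))
    case GreaterThan(l, r) => GreaterThan(bind(l, partitionKeys), bind(r, partitionKeys))
    case And(l, r)         => And(bind(l, partitionKeys), bind(r, partitionKeys))
    case other             => other
  }

  // Evaluate a bound expression against one partition's key values,
  // playing the role of InterpretedPredicate in the patch.
  def eval(e: Expr, row: Seq[Int]): Any = e match {
    case BoundRef(i)       => row(i)
    case Lit(v)            => v
    case LiteralTrue       => true
    case EqualTo(l, r)     => eval(l, row) == eval(r, row)
    case GreaterThan(l, r) => eval(l, row).asInstanceOf[Int] > eval(r, row).asInstanceOf[Int]
    case And(l, r)         => eval(l, row).asInstanceOf[Boolean] && eval(r, row).asInstanceOf[Boolean]
    case Col(_)            => sys.error("unbound column reference")
  }

  def main(args: Array[String]): Unit = {
    // One partition key whose values were parsed from folder names,
    // e.g. path/year=2013, path/year=2014, path/year=2015.
    val partitionKeys = Seq("year")
    val partitions = Seq(Seq(2013), Seq(2014), Seq(2015))
    val predicates: Seq[Expr] = Seq(GreaterThan(Col("year"), Lit(2013)))

    // Keep only predicates that touch partition keys exclusively,
    // then AND them together; default to "true" (no pruning) otherwise.
    val partitionKeySet = partitionKeys.toSet
    val rawPredicate = predicates
      .filter(_.references.subsetOf(partitionKeySet))
      .reduceOption(And)
      .getOrElse(LiteralTrue)

    // Bind once, then evaluate against each partition's values.
    val bound = bind(rawPredicate, partitionKeys)
    val selected = partitions.filter(values => eval(bound, values).asInstanceOf[Boolean])

    println(selected) // List(List(2014), List(2015))
  }
}

Binding the predicate to ordinals once and reusing a single input row (GenericMutableRow in the patch) keeps the per-partition check to a plain expression evaluation, with no per-operator special cases to maintain.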