From 93e819221801c15acdef389899b6c37bf43a375f Mon Sep 17 00:00:00 2001 From: Andre Schumacher Date: Mon, 21 Apr 2014 18:04:07 +0300 Subject: [PATCH 01/13] First commit Parquet record filtering --- .../catalyst/expressions/BoundAttribute.scala | 2 +- .../sql/catalyst/expressions/predicates.scala | 6 + .../spark/sql/catalyst/types/dataTypes.scala | 10 +- .../spark/sql/execution/SparkStrategies.scala | 9 +- .../spark/sql/parquet/ParquetFilters.scala | 129 ++++++++++++++++++ .../spark/sql/parquet/ParquetRelation.scala | 14 +- .../sql/parquet/ParquetTableOperations.scala | 43 ++++-- .../spark/sql/parquet/ParquetTestData.scala | 10 +- .../spark/sql/parquet/ParquetQuerySuite.scala | 6 +- 9 files changed, 204 insertions(+), 25 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala index 4ebf6c4584b94..40e7174fa8004 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala @@ -45,7 +45,7 @@ case class BoundReference(ordinal: Int, baseReference: Attribute) override def toString = s"$baseReference:$ordinal" - override def eval(input: Row): Any = input(ordinal) + override def eval(input: Row): Any = if (input != null) input(ordinal) else null } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index d111578530506..442b4959a8401 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -174,6 +174,12 @@ case class GreaterThanOrEqual(left: Expression, right: Expression) extends Binar override def eval(input: Row): Any = c2(input, left, right, _.gteq(_, _)) } +// A simple filter condition on a single column +/*case class ColumnFilterPredicate(val comparison: BinaryComparison) extends BinaryComparison { + override def eval(input: Row): Any = comparison.eval(input) + +} */ + case class If(predicate: Expression, trueValue: Expression, falseValue: Expression) extends Expression { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala index da34bd3a21503..dd0dc8c7b51c4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala @@ -19,10 +19,13 @@ package org.apache.spark.sql.catalyst.types import java.sql.Timestamp -import scala.reflect.runtime.universe.{typeTag, TypeTag} +import scala.reflect.ClassTag +import scala.reflect.runtime.universe.{typeTag, TypeTag, runtimeMirror} import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.util.Utils + abstract class DataType { /** Matches any expression that evaluates to this DataType */ def unapply(a: Expression): Boolean = a match { @@ -37,6 +40,11 @@ abstract class NativeType extends DataType { type JvmType @transient val tag: TypeTag[JvmType] val ordering: Ordering[JvmType] + + @transient val classTag = { + val mirror = 
runtimeMirror(Utils.getSparkClassLoader) + ClassTag[JvmType](mirror.runtimeClass(tag.tpe)) + } } case object StringType extends NativeType { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index f763106da4e0e..8ee142f27cccc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -141,11 +141,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) => InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil case PhysicalOperation(projectList, filters, relation: ParquetRelation) => - // TODO: Should be pushing down filters as well. - pruneFilterProject( - projectList, - filters, - ParquetTableScan(_, relation, None)(sparkContext)) :: Nil + pruneFilterProject( + projectList, + filters, + ParquetTableScan(_, relation, Some(filters))(sparkContext)) :: Nil case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala new file mode 100644 index 0000000000000..d50a9ac2eb3e9 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.parquet + +import parquet.filter._ +import parquet.column.ColumnReader +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.Equals +import org.apache.spark.sql.execution.SparkSqlSerializer +import org.apache.hadoop.conf.Configuration +import org.apache.spark.sql.catalyst.types.{IntegerType, BooleanType, NativeType} +import scala.reflect.runtime.universe.{typeTag, TypeTag} +import scala.reflect.ClassTag +import com.google.common.io.BaseEncoding +import parquet.filter +import parquet.filter.ColumnPredicates.BooleanPredicateFunction + +// Implicits +import collection.JavaConversions._ + +object ParquetFilters { + val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter" + + def createFilter(filterExpressions: Seq[Expression]): UnboundRecordFilter = { + def createEqualityFilter(name: String, literal: Literal) = literal.dataType match { + case BooleanType => new ComparisonFilter(name, literal.value.asInstanceOf[Boolean]) + case IntegerType => new ComparisonFilter(name, _ == literal.value.asInstanceOf[Int]) + } + + val filters: Seq[UnboundRecordFilter] = filterExpressions.map { + case Equals(left: Literal, right: NamedExpression) => { + val name: String = right.name + createEqualityFilter(name, left) + } + case Equals(left: NamedExpression, right: Literal) => { + val name: String = left.name + createEqualityFilter(name, right) + } + } + + if (filters.length > 0) filters.reduce(AndRecordFilter.and) else null + } + + // Note: Inside the Hadoop API we only have access to `Configuration`, not to + // [[SparkContext]], so we cannot use broadcasts to convey the actual filter + // predicate. + def serializeFilterExpressions(filters: Seq[Expression], conf: Configuration): Unit = { + val serialized: Array[Byte] = SparkSqlSerializer.serialize(filters) + val encoded: String = BaseEncoding.base64().encode(serialized) + conf.set(PARQUET_FILTER_DATA, encoded) + } + + // Note: Inside the Hadoop API we only have access to `Configuration`, not to + // [[SparkContext]], so we cannot use broadcasts to convey the actual filter + // predicate. 
+ def deserializeFilterExpressions(conf: Configuration): Option[Seq[Expression]] = { + val data = conf.get(PARQUET_FILTER_DATA) + if (data != null) { + val decoded: Array[Byte] = BaseEncoding.base64().decode(data) + Some(SparkSqlSerializer.deserialize(decoded)) + } else { + None + } + } +} + +class ComparisonFilter( + private val columnName: String, + private var filter: UnboundRecordFilter) + extends UnboundRecordFilter { + def this(columnName: String, value: Boolean) = + this( + columnName, + ColumnRecordFilter.column( + columnName, + ColumnPredicates.applyFunctionToBoolean( + new ColumnPredicates.BooleanPredicateFunction { + def functionToApply(input: Boolean): Boolean = input == value + }))) + def this(columnName: String, func: Int => Boolean) = + this( + columnName, + ColumnRecordFilter.column( + columnName, + ColumnPredicates.applyFunctionToInteger( + new ColumnPredicates.IntegerPredicateFunction { + def functionToApply(input: Int) = if (input != null) func(input) else false + }))) + override def bind(readers: java.lang.Iterable[ColumnReader]): RecordFilter = { + filter.bind(readers) + } +} + +/*class EqualityFilter( + private val columnName: String, + private var filter: UnboundRecordFilter) + extends UnboundRecordFilter { + def this(columnName: String, value: Boolean) = + this(columnName, ColumnRecordFilter.column(columnName, ColumnPredicates.equalTo(value))) + def this(columnName: String, value: Int) = + this(columnName, ColumnRecordFilter.column(columnName, ColumnPredicates.equalTo(value))) + def this(columnName: String, value: Long) = + this(columnName, ColumnRecordFilter.column(columnName, ColumnPredicates.equalTo(value))) + def this(columnName: String, value: Double) = + this(columnName, ColumnRecordFilter.column(columnName, ColumnPredicates.equalTo(value))) + def this(columnName: String, value: Float) = + this(columnName, ColumnRecordFilter.column(columnName, ColumnPredicates.equalTo(value))) + def this(columnName: String, value: String) = + this(columnName, ColumnRecordFilter.column(columnName, ColumnPredicates.equalTo(value))) + override def bind(readers: java.lang.Iterable[ColumnReader]): RecordFilter = { + filter.bind(readers) + } +}*/ + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index 32813a66de3c3..3b9ae2c67805f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -33,9 +33,11 @@ import parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeNa import parquet.schema.Type.Repetition import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException} -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode} import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.catalyst.types.ArrayType +import org.apache.spark.sql.catalyst.expressions.AttributeReference // Implicits import scala.collection.JavaConversions._ @@ -144,6 +146,16 @@ private[sql] object ParquetRelation { new ParquetRelation(path.toString) } + def checkPredicatePushdownPossible(filters: Seq[Expression]): Boolean = { + def checkFeasible(left: Expression, right: Expression) = + left.isInstanceOf[Literal] || right.isInstanceOf[Literal] + filters.forall { + case 
Equals(left, right) => checkFeasible(left, right) + case LessThan(left, right) => checkFeasible(left, right) + case _ => false + } + } + private def checkPath(pathStr: String, allowExisting: Boolean, conf: Configuration): Path = { if (pathStr == null) { throw new IllegalArgumentException("Unable to create ParquetRelation: path is null") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index f825ca3c028ef..85768ab0209f3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -27,15 +27,17 @@ import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat, FileOutputCommitter} -import parquet.hadoop.{ParquetInputFormat, ParquetOutputFormat} +import parquet.hadoop.{ParquetRecordReader, BadConfigurationException, ParquetInputFormat, ParquetOutputFormat} import parquet.hadoop.util.ContextUtil import parquet.io.InvalidRecordException import parquet.schema.MessageType import org.apache.spark.{SerializableWritable, SparkContext, TaskContext} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row} +import org.apache.spark.sql.catalyst.expressions.{BinaryComparison, Attribute, Expression, Row} import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode} +import parquet.filter.UnboundRecordFilter +import parquet.hadoop.api.ReadSupport /** * Parquet table scan operator. Imports the file that backs the given @@ -46,7 +48,7 @@ case class ParquetTableScan( // https://issues.apache.org/jira/browse/SPARK-1367 output: Seq[Attribute], relation: ParquetRelation, - columnPruningPred: Option[Expression])( + columnPruningPred: Option[Seq[Expression]])( @transient val sc: SparkContext) extends LeafNode { @@ -62,17 +64,21 @@ case class ParquetTableScan( for (path <- fileList if !path.getName.startsWith("_")) { NewFileInputFormat.addInputPath(job, path) } + + // Store Parquet schema in `Configuration` conf.set( RowReadSupport.PARQUET_ROW_REQUESTED_SCHEMA, ParquetTypesConverter.convertFromAttributes(output).toString) - // TODO: think about adding record filters - /* Comments regarding record filters: it would be nice to push down as much filtering - to Parquet as possible. However, currently it seems we cannot pass enough information - to materialize an (arbitrary) Catalyst [[Predicate]] inside Parquet's - ``FilteredRecordReader`` (via Configuration, for example). Simple - filter-rows-by-column-values however should be supported. - */ - sc.newAPIHadoopRDD(conf, classOf[ParquetInputFormat[Row]], classOf[Void], classOf[Row]) + + // Store record filtering predicate in `Configuration` + // Note: the input format ignores all predicates that cannot be expressed + // as simple column predicate filters in Parquet. Here we just record + // the whole pruning predicate. 
+ if (columnPruningPred.isDefined) { + ParquetFilters.serializeFilterExpressions(columnPruningPred.get, conf) + } + + sc.newAPIHadoopRDD(conf, classOf[org.apache.spark.sql.parquet.FilteringParquetRowInputFormat], classOf[Void], classOf[Row]) .map(_._2) } @@ -262,6 +268,21 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int) } } +// We extend ParquetInputFormat in order to have more control over which +// RecordFilter we want to use +private[parquet] class FilteringParquetRowInputFormat extends parquet.hadoop.ParquetInputFormat[Row] { + override def createRecordReader(inputSplit: InputSplit, taskAttemptContext: TaskAttemptContext): RecordReader[Void, Row] = { + val readSupport: ReadSupport[Row] = new RowReadSupport() + + val filterExpressions = ParquetFilters.deserializeFilterExpressions(ContextUtil.getConfiguration(taskAttemptContext)) + if (filterExpressions.isDefined) { + new ParquetRecordReader[Row](readSupport, ParquetFilters.createFilter(filterExpressions.get)) + } else { + new ParquetRecordReader[Row](readSupport) + } + } +} + private[parquet] object FileSystemHelper { def listFiles(pathStr: String, conf: Configuration): Seq[Path] = { val origPath = new Path(pathStr) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala index f37976f7313c1..25e5085722a3d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala @@ -43,7 +43,7 @@ private[sql] object ParquetTestData { // field names for test assertion error messages val testSchemaFieldNames = Seq( "myboolean:Boolean", - "mtint:Int", + "myint:Int", "mystring:String", "mylong:Long", "myfloat:Float", @@ -85,11 +85,11 @@ private[sql] object ParquetTestData { } else { data.update(0, false) } - if (i % 5 == 0) { - data.update(1, 5) - } else { + //if (i % 5 == 0) { + // data.update(1, 5) + //} else { data.update(1, null) // optional - } + //} data.update(2, "abc") data.update(3, i.toLong << 33) data.update(4, 2.5F) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index ff1677eb8a480..fb75c6953075d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -196,7 +196,6 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll { assert(true) } - test("insert (appending) to same table via Scala API") { sql("INSERT INTO testsource SELECT * FROM testsource").collect() val double_rdd = sql("SELECT * FROM testsource").collect() @@ -239,5 +238,10 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll { Utils.deleteRecursively(file) assert(true) } + + test("SELECT WHERE") { + val result = sql("SELECT * FROM testsource WHERE myint = 5").collect() + assert(result != null) + } } From 6d22666a801dac936311b4f6dbda615f35c250a8 Mon Sep 17 00:00:00 2001 From: Andre Schumacher Date: Tue, 22 Apr 2014 17:15:49 +0300 Subject: [PATCH 02/13] Extending ParquetFilters --- .../sql/catalyst/expressions/predicates.scala | 6 - .../spark/sql/parquet/ParquetFilters.scala | 220 +++++++++++++----- .../spark/sql/parquet/ParquetTestData.scala | 67 ++++-- .../spark/sql/parquet/ParquetQuerySuite.scala | 13 ++ 4 files changed, 230 insertions(+), 76 deletions(-) 
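Note: every helper this commit adds to ParquetFilters follows the same pattern from parquet-mr's original filter API: wrap a Scala predicate in the matching ColumnPredicates callback interface, attach it to a column with ColumnRecordFilter.column, and fold several such filters together with AndRecordFilter.and. A minimal sketch of that pattern, using only classes that already appear in this commit's diff; the column name "myint" and the bounds are made up for illustration and are not part of the patch:

    import parquet.filter.{AndRecordFilter, ColumnPredicates, ColumnRecordFilter, UnboundRecordFilter}
    import parquet.filter.ColumnPredicates.IntegerPredicateFunction

    object FilterSketch {
      // Wrap a Scala Int => Boolean predicate in Parquet's callback interface and
      // bind it to a named column, as ComparisonFilter.createIntFilter does below.
      def intFilter(columnName: String, func: Int => Boolean): UnboundRecordFilter =
        ColumnRecordFilter.column(
          columnName,
          ColumnPredicates.applyFunctionToInteger(
            new IntegerPredicateFunction {
              def functionToApply(input: Int): Boolean = func(input)
            }))

      // Conjunctions are folded together with AndRecordFilter.and, mirroring
      // the reduce step in ParquetFilters.createFilter.
      val myintBetween100And150: UnboundRecordFilter =
        AndRecordFilter.and(
          intFilter("myint", _ >= 100),
          intFilter("myint", _ < 150))
    }
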
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index 442b4959a8401..d111578530506 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -174,12 +174,6 @@ case class GreaterThanOrEqual(left: Expression, right: Expression) extends Binar override def eval(input: Row): Any = c2(input, left, right, _.gteq(_, _)) } -// A simple filter condition on a single column -/*case class ColumnFilterPredicate(val comparison: BinaryComparison) extends BinaryComparison { - override def eval(input: Row): Any = comparison.eval(input) - -} */ - case class If(predicate: Expression, trueValue: Expression, falseValue: Expression) extends Expression { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala index d50a9ac2eb3e9..084bc6e89ae7f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala @@ -17,42 +17,125 @@ package org.apache.spark.sql.parquet +import org.apache.hadoop.conf.Configuration + import parquet.filter._ +import parquet.filter.ColumnPredicates._ import parquet.column.ColumnReader -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.Equals -import org.apache.spark.sql.execution.SparkSqlSerializer -import org.apache.hadoop.conf.Configuration -import org.apache.spark.sql.catalyst.types.{IntegerType, BooleanType, NativeType} -import scala.reflect.runtime.universe.{typeTag, TypeTag} -import scala.reflect.ClassTag + import com.google.common.io.BaseEncoding -import parquet.filter -import parquet.filter.ColumnPredicates.BooleanPredicateFunction -// Implicits -import collection.JavaConversions._ +import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.SparkSqlSerializer object ParquetFilters { val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter" def createFilter(filterExpressions: Seq[Expression]): UnboundRecordFilter = { def createEqualityFilter(name: String, literal: Literal) = literal.dataType match { - case BooleanType => new ComparisonFilter(name, literal.value.asInstanceOf[Boolean]) - case IntegerType => new ComparisonFilter(name, _ == literal.value.asInstanceOf[Int]) + case BooleanType => + ComparisonFilter.createBooleanFilter(name, literal.value.asInstanceOf[Boolean]) + case IntegerType => + ComparisonFilter.createIntFilter(name, (x: Int) => x == literal.value.asInstanceOf[Int]) + case LongType => + ComparisonFilter.createLongFilter(name, (x: Long) => x == literal.value.asInstanceOf[Long]) + case DoubleType => + ComparisonFilter.createDoubleFilter( + name, + (x: Double) => x == literal.value.asInstanceOf[Double]) + case FloatType => + ComparisonFilter.createFloatFilter( + name, + (x: Float) => x == literal.value.asInstanceOf[Float]) + case StringType => + ComparisonFilter.createStringFilter(name, literal.value.asInstanceOf[String]) } - - val filters: Seq[UnboundRecordFilter] = filterExpressions.map { - case Equals(left: Literal, right: NamedExpression) => { - val name: String = right.name - createEqualityFilter(name, left) - } - case Equals(left: NamedExpression, 
right: Literal) => { - val name: String = left.name - createEqualityFilter(name, right) - } + def createLessThanFilter(name: String, literal: Literal) = literal.dataType match { + case IntegerType => + ComparisonFilter.createIntFilter(name, (x: Int) => x < literal.value.asInstanceOf[Int]) + case LongType => + ComparisonFilter.createLongFilter(name, (x: Long) => x < literal.value.asInstanceOf[Long]) + case DoubleType => + ComparisonFilter.createDoubleFilter( + name, + (x: Double) => x < literal.value.asInstanceOf[Double]) + case FloatType => + ComparisonFilter.createFloatFilter( + name, + (x: Float) => x < literal.value.asInstanceOf[Float]) } - + def createLessThanOrEqualFilter(name: String, literal: Literal) = literal.dataType match { + case IntegerType => + ComparisonFilter.createIntFilter(name, (x: Int) => x <= literal.value.asInstanceOf[Int]) + case LongType => + ComparisonFilter.createLongFilter(name, (x: Long) => x <= literal.value.asInstanceOf[Long]) + case DoubleType => + ComparisonFilter.createDoubleFilter( + name, + (x: Double) => x <= literal.value.asInstanceOf[Double]) + case FloatType => + ComparisonFilter.createFloatFilter( + name, + (x: Float) => x <= literal.value.asInstanceOf[Float]) + } + // TODO: combine these two types somehow? + def createGreaterThanFilter(name: String, literal: Literal) = literal.dataType match { + case IntegerType => + ComparisonFilter.createIntFilter(name, (x: Int) => x > literal.value.asInstanceOf[Int]) + case LongType => + ComparisonFilter.createLongFilter(name, (x: Long) => x > literal.value.asInstanceOf[Long]) + case DoubleType => + ComparisonFilter.createDoubleFilter( + name, + (x: Double) => x > literal.value.asInstanceOf[Double]) + case FloatType => + ComparisonFilter.createFloatFilter( + name, + (x: Float) => x > literal.value.asInstanceOf[Float]) + } + def createGreaterThanOrEqualFilter(name: String, literal: Literal) = literal.dataType match { + case IntegerType => + ComparisonFilter.createIntFilter(name, (x: Int) => x >= literal.value.asInstanceOf[Int]) + case LongType => + ComparisonFilter.createLongFilter(name, (x: Long) => x >= literal.value.asInstanceOf[Long]) + case DoubleType => + ComparisonFilter.createDoubleFilter( + name, + (x: Double) => x >= literal.value.asInstanceOf[Double]) + case FloatType => + ComparisonFilter.createFloatFilter( + name, + (x: Float) => x >= literal.value.asInstanceOf[Float]) + } + // TODO: can we actually rely on the predicate being normalized as in expression < literal? 
+ // That would simplify this pattern matching + // TODO: we currently only filter on non-nullable (Parquet REQUIRED) attributes until + // https://github.com/Parquet/parquet-mr/issues/371 + // has been resolved + val filters: Seq[UnboundRecordFilter] = filterExpressions.collect { + case Equals(left: Literal, right: NamedExpression) if !right.nullable => + createEqualityFilter(right.name, left) + case Equals(left: NamedExpression, right: Literal) if !left.nullable => + createEqualityFilter(left.name, right) + case LessThan(left: Literal, right: NamedExpression) if !right.nullable => + createLessThanFilter(right.name, left) + case LessThan(left: NamedExpression, right: Literal) if !left.nullable => + createLessThanFilter(left.name, right) + case LessThanOrEqual(left: Literal, right: NamedExpression) if !right.nullable => + createLessThanOrEqualFilter(right.name, left) + case LessThanOrEqual(left: NamedExpression, right: Literal) if !left.nullable => + createLessThanOrEqualFilter(left.name, right) + case GreaterThan(left: Literal, right: NamedExpression) if !right.nullable => + createGreaterThanFilter(right.name, left) + case GreaterThan(left: NamedExpression, right: Literal) if !left.nullable => + createGreaterThanFilter(left.name, right) + case GreaterThanOrEqual(left: Literal, right: NamedExpression) if !right.nullable => + createGreaterThanOrEqualFilter(right.name, left) + case GreaterThanOrEqual(left: NamedExpression, right: Literal) if !left.nullable => + createGreaterThanOrEqualFilter(left.name, right) + } + // TODO: How about disjunctions? (Or-ed) if (filters.length > 0) filters.reduce(AndRecordFilter.and) else null } @@ -83,47 +166,72 @@ class ComparisonFilter( private val columnName: String, private var filter: UnboundRecordFilter) extends UnboundRecordFilter { - def this(columnName: String, value: Boolean) = - this( + override def bind(readers: java.lang.Iterable[ColumnReader]): RecordFilter = { + filter.bind(readers) + } +} + +object ComparisonFilter { + def createBooleanFilter(columnName: String, value: Boolean): UnboundRecordFilter = + new ComparisonFilter( columnName, ColumnRecordFilter.column( columnName, ColumnPredicates.applyFunctionToBoolean( - new ColumnPredicates.BooleanPredicateFunction { + new BooleanPredicateFunction { def functionToApply(input: Boolean): Boolean = input == value - }))) - def this(columnName: String, func: Int => Boolean) = - this( + } + ))) + def createStringFilter(columnName: String, value: String): UnboundRecordFilter = + new ComparisonFilter( + columnName, + ColumnRecordFilter.column( + columnName, + ColumnPredicates.applyFunctionToString ( + new ColumnPredicates.PredicateFunction[String] { + def functionToApply(input: String): Boolean = input == value + } + ))) + def createIntFilter(columnName: String, func: Int => Boolean): UnboundRecordFilter = + new ComparisonFilter( columnName, ColumnRecordFilter.column( columnName, ColumnPredicates.applyFunctionToInteger( - new ColumnPredicates.IntegerPredicateFunction { - def functionToApply(input: Int) = if (input != null) func(input) else false - }))) - override def bind(readers: java.lang.Iterable[ColumnReader]): RecordFilter = { - filter.bind(readers) - } + new IntegerPredicateFunction { + def functionToApply(input: Int) = func(input) + } + ))) + def createLongFilter(columnName: String, func: Long => Boolean): UnboundRecordFilter = + new ComparisonFilter( + columnName, + ColumnRecordFilter.column( + columnName, + ColumnPredicates.applyFunctionToLong( + new LongPredicateFunction { + def 
functionToApply(input: Long) = func(input) + } + ))) + def createDoubleFilter(columnName: String, func: Double => Boolean): UnboundRecordFilter = + new ComparisonFilter( + columnName, + ColumnRecordFilter.column( + columnName, + ColumnPredicates.applyFunctionToDouble( + new DoublePredicateFunction { + def functionToApply(input: Double) = func(input) + } + ))) + def createFloatFilter(columnName: String, func: Float => Boolean): UnboundRecordFilter = + new ComparisonFilter( + columnName, + ColumnRecordFilter.column( + columnName, + ColumnPredicates.applyFunctionToFloat( + new FloatPredicateFunction { + def functionToApply(input: Float) = func(input) + } + ))) } -/*class EqualityFilter( - private val columnName: String, - private var filter: UnboundRecordFilter) - extends UnboundRecordFilter { - def this(columnName: String, value: Boolean) = - this(columnName, ColumnRecordFilter.column(columnName, ColumnPredicates.equalTo(value))) - def this(columnName: String, value: Int) = - this(columnName, ColumnRecordFilter.column(columnName, ColumnPredicates.equalTo(value))) - def this(columnName: String, value: Long) = - this(columnName, ColumnRecordFilter.column(columnName, ColumnPredicates.equalTo(value))) - def this(columnName: String, value: Double) = - this(columnName, ColumnRecordFilter.column(columnName, ColumnPredicates.equalTo(value))) - def this(columnName: String, value: Float) = - this(columnName, ColumnRecordFilter.column(columnName, ColumnPredicates.equalTo(value))) - def this(columnName: String, value: String) = - this(columnName, ColumnRecordFilter.column(columnName, ColumnPredicates.equalTo(value))) - override def bind(readers: java.lang.Iterable[ColumnReader]): RecordFilter = { - filter.bind(readers) - } -}*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala index 25e5085722a3d..33a31935e3cb7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala @@ -27,6 +27,29 @@ import parquet.schema.{MessageType, MessageTypeParser} import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.util.Utils +import parquet.hadoop.metadata.CompressionCodecName +import parquet.hadoop.api.WriteSupport +import parquet.example.data.{GroupWriter, Group} +import parquet.io.api.RecordConsumer +import parquet.hadoop.api.WriteSupport.WriteContext +import parquet.example.data.simple.SimpleGroup + +// Write support class for nested groups: +// ParquetWriter initializes GroupWriteSupport with an empty configuration +// (it is after all not intended to be used in this way?) 
+// and members are private so we need to make our own +private class TestGroupWriteSupport(schema: MessageType) extends WriteSupport[Group] { + var groupWriter: GroupWriter = null + override def prepareForWrite(recordConsumer: RecordConsumer): Unit = { + groupWriter = new GroupWriter(recordConsumer, schema) + } + override def init(configuration: Configuration): WriteContext = { + new WriteContext(schema, new java.util.HashMap[String, String]()) + } + override def write(record: Group) { + groupWriter.write(record) + } +} private[sql] object ParquetTestData { @@ -75,26 +98,42 @@ private[sql] object ParquetTestData { val configuration: Configuration = ContextUtil.getConfiguration(job) val schema: MessageType = MessageTypeParser.parseMessageType(testSchema) - val writeSupport = new RowWriteSupport() - writeSupport.setSchema(schema, configuration) - val writer = new ParquetWriter(path, writeSupport) + //val writeSupport = new MutableRowWriteSupport() + //writeSupport.setSchema(schema, configuration) + //val writer = new ParquetWriter(path, writeSupport) + val writeSupport = new TestGroupWriteSupport(schema) + //val writer = //new ParquetWriter[Group](path, writeSupport) + val writer = new ParquetWriter[Group](path, writeSupport) + for(i <- 0 until 15) { - val data = new Array[Any](6) + val record = new SimpleGroup(schema) + //val data = new Array[Any](6) if (i % 3 == 0) { - data.update(0, true) + //data.update(0, true) + record.add(0, true) } else { - data.update(0, false) + //data.update(0, false) + record.add(0, false) } - //if (i % 5 == 0) { + if (i % 5 == 0) { + record.add(1, 5) // data.update(1, 5) - //} else { - data.update(1, null) // optional + } else { + if (i % 5 == 1) record.add(1, 4) + } + //else { + // data.update(1, null) // optional //} - data.update(2, "abc") - data.update(3, i.toLong << 33) - data.update(4, 2.5F) - data.update(5, 4.5D) - writer.write(new GenericRow(data.toArray)) + //data.update(2, "abc") + record.add(2, "abc") + //data.update(3, i.toLong << 33) + record.add(3, i.toLong << 33) + //data.update(4, 2.5F) + record.add(4, 2.5F) + //data.update(5, 4.5D) + record.add(5, 4.5D) + //writer.write(new GenericRow(data.toArray)) + writer.write(record) } writer.close() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index fb75c6953075d..49734dc0d225f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -241,7 +241,20 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll { test("SELECT WHERE") { val result = sql("SELECT * FROM testsource WHERE myint = 5").collect() + /*test("SELECT WHERE") { + //val result = parquetFile("/home/andre/input.adam").registerAsTable("adamtable") + //sql("SELECT * FROM adamtable WHERE mapq = 0").collect() + //assert(result != null) + //val result = sql("SELECT * FROM testsource WHERE myint = 5").collect() + // TODO: ADD larger case SchemaRDD with filtering on REQUIRED field! 
+ implicit def anyToRow(value: Any): Row = value.asInstanceOf[Row] + TestSQLContext + .parquetFile(ParquetTestData.testNestedDir1.toString) + .toSchemaRDD.registerAsTable("xtmptable") + val result = sql("SELECT * FROM xtmptable WHERE owner = \"Julien Le Dem\"").collect() +>>>>>>> Extending ParquetFilters assert(result != null) + }*/ } } From a93a588b45384e9823c5fa8a6f9574ec3d6f1fe1 Mon Sep 17 00:00:00 2001 From: Andre Schumacher Date: Tue, 22 Apr 2014 18:16:34 +0300 Subject: [PATCH 03/13] Adding unit test for filtering --- .../sql/parquet/ParquetTableOperations.scala | 22 +++++-- .../spark/sql/parquet/ParquetTestData.scala | 66 ++++++++++++------- .../spark/sql/parquet/ParquetQuerySuite.scala | 27 ++++++-- 3 files changed, 78 insertions(+), 37 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index 85768ab0209f3..bd397c3c944a3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -32,7 +32,7 @@ import parquet.hadoop.util.ContextUtil import parquet.io.InvalidRecordException import parquet.schema.MessageType -import org.apache.spark.{SerializableWritable, SparkContext, TaskContext} +import org.apache.spark.{Logging, SerializableWritable, SparkContext, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.{BinaryComparison, Attribute, Expression, Row} import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode} @@ -78,8 +78,13 @@ case class ParquetTableScan( ParquetFilters.serializeFilterExpressions(columnPruningPred.get, conf) } - sc.newAPIHadoopRDD(conf, classOf[org.apache.spark.sql.parquet.FilteringParquetRowInputFormat], classOf[Void], classOf[Row]) - .map(_._2) + sc.newAPIHadoopRDD( + conf, + classOf[org.apache.spark.sql.parquet.FilteringParquetRowInputFormat], + classOf[Void], + classOf[Row]) + .map(_._2) + .filter(_ != null) // Parquet's record filters may produce null values } override def otherCopyArgs = sc :: Nil @@ -270,12 +275,17 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int) // We extend ParquetInputFormat in order to have more control over which // RecordFilter we want to use -private[parquet] class FilteringParquetRowInputFormat extends parquet.hadoop.ParquetInputFormat[Row] { - override def createRecordReader(inputSplit: InputSplit, taskAttemptContext: TaskAttemptContext): RecordReader[Void, Row] = { +private[parquet] class FilteringParquetRowInputFormat + extends parquet.hadoop.ParquetInputFormat[Row] with Logging { + override def createRecordReader( + inputSplit: InputSplit, + taskAttemptContext: TaskAttemptContext): RecordReader[Void, Row] = { val readSupport: ReadSupport[Row] = new RowReadSupport() - val filterExpressions = ParquetFilters.deserializeFilterExpressions(ContextUtil.getConfiguration(taskAttemptContext)) + val filterExpressions = + ParquetFilters.deserializeFilterExpressions(ContextUtil.getConfiguration(taskAttemptContext)) if (filterExpressions.isDefined) { + logInfo(s"Pushing down predicates for RecordFilter: ${filterExpressions.mkString(", ")}") new ParquetRecordReader[Row](readSupport, ParquetFilters.createFilter(filterExpressions.get)) } else { new ParquetRecordReader[Row](readSupport) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala index 33a31935e3cb7..2f690198fc784 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala @@ -34,10 +34,10 @@ import parquet.io.api.RecordConsumer import parquet.hadoop.api.WriteSupport.WriteContext import parquet.example.data.simple.SimpleGroup -// Write support class for nested groups: -// ParquetWriter initializes GroupWriteSupport with an empty configuration -// (it is after all not intended to be used in this way?) -// and members are private so we need to make our own +// Write support class for nested groups: ParquetWriter initializes GroupWriteSupport +// with an empty configuration (it is after all not intended to be used in this way?) +// and members are private so we need to make our own in order to pass the schema +// to the writer. private class TestGroupWriteSupport(schema: MessageType) extends WriteSupport[Group] { var groupWriter: GroupWriter = null override def prepareForWrite(recordConsumer: RecordConsumer): Unit = { @@ -81,6 +81,18 @@ private[sql] object ParquetTestData { |} """.stripMargin + val testFilterSchema = + """ + |message myrecord { + |required boolean myboolean; + |required int32 myint; + |required binary mystring; + |required int64 mylong; + |required float myfloat; + |required double mydouble; + |} + """.stripMargin + // field names for test assertion error messages val subTestSchemaFieldNames = Seq( "myboolean:Boolean", @@ -88,51 +100,55 @@ private[sql] object ParquetTestData { ) val testDir = Utils.createTempDir() + val testFilterDir = Utils.createTempDir() lazy val testData = new ParquetRelation(testDir.toURI.toString) def writeFile() = { testDir.delete val path: Path = new Path(new Path(testDir.toURI), new Path("part-r-0.parquet")) - val job = new Job() - val configuration: Configuration = ContextUtil.getConfiguration(job) val schema: MessageType = MessageTypeParser.parseMessageType(testSchema) - - //val writeSupport = new MutableRowWriteSupport() - //writeSupport.setSchema(schema, configuration) - //val writer = new ParquetWriter(path, writeSupport) val writeSupport = new TestGroupWriteSupport(schema) - //val writer = //new ParquetWriter[Group](path, writeSupport) val writer = new ParquetWriter[Group](path, writeSupport) for(i <- 0 until 15) { val record = new SimpleGroup(schema) - //val data = new Array[Any](6) if (i % 3 == 0) { - //data.update(0, true) record.add(0, true) } else { - //data.update(0, false) record.add(0, false) } if (i % 5 == 0) { record.add(1, 5) - // data.update(1, 5) - } else { - if (i % 5 == 1) record.add(1, 4) } - //else { - // data.update(1, null) // optional - //} - //data.update(2, "abc") record.add(2, "abc") - //data.update(3, i.toLong << 33) record.add(3, i.toLong << 33) - //data.update(4, 2.5F) record.add(4, 2.5F) - //data.update(5, 4.5D) record.add(5, 4.5D) - //writer.write(new GenericRow(data.toArray)) + writer.write(record) + } + writer.close() + } + + def writeFilterFile() = { + testFilterDir.delete + val path: Path = new Path(new Path(testFilterDir.toURI), new Path("part-r-0.parquet")) + val schema: MessageType = MessageTypeParser.parseMessageType(testFilterSchema) + val writeSupport = new TestGroupWriteSupport(schema) + val writer = new ParquetWriter[Group](path, writeSupport) + + for(i <- 0 to 200) { + val record = new SimpleGroup(schema) + if (i % 4 == 0) { + record.add(0, true) + } else { + record.add(0, false) + } + 
record.add(1, i) + record.add(2, i.toString) + record.add(3, i.toLong) + record.add(4, i.toFloat + 0.5f) + record.add(5, i.toDouble + 0.5d) writer.write(record) } writer.close() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index 49734dc0d225f..7c7c80f60bd28 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -17,25 +17,22 @@ package org.apache.spark.sql.parquet -import java.io.File - import org.scalatest.{BeforeAndAfterAll, FunSuite} import org.apache.hadoop.fs.{Path, FileSystem} import org.apache.hadoop.mapreduce.Job import parquet.hadoop.ParquetFileWriter -import parquet.schema.MessageTypeParser import parquet.hadoop.util.ContextUtil +import parquet.schema.MessageTypeParser import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.util.getTempFilePath -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row} +import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.test.TestSQLContext import org.apache.spark.sql.TestData +import org.apache.spark.sql.SchemaRDD import org.apache.spark.util.Utils -import org.apache.spark.sql.catalyst.types.{StringType, IntegerType, DataType} -import org.apache.spark.sql.{parquet, SchemaRDD} // Implicits import org.apache.spark.sql.test.TestSQLContext._ @@ -64,12 +61,16 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll { override def beforeAll() { ParquetTestData.writeFile() + ParquetTestData.writeFilterFile() testRDD = parquetFile(ParquetTestData.testDir.toString) testRDD.registerAsTable("testsource") + parquetFile(ParquetTestData.testFilterDir.toString) + .registerAsTable("testfiltersource") } override def afterAll() { Utils.deleteRecursively(ParquetTestData.testDir) + Utils.deleteRecursively(ParquetTestData.testFilterDir) // here we should also unregister the table?? 
} @@ -256,5 +257,19 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll { assert(result != null) }*/ } + + test("test filter by predicate pushdown") { + for(myval <- Seq("myint", "mylong", "mydouble", "myfloat")) { + println(s"testing field $myval") + val result1 = sql(s"SELECT * FROM testfiltersource WHERE $myval < 150 AND $myval >= 100").collect() + assert(result1.size === 50) + val result2 = sql(s"SELECT * FROM testfiltersource WHERE $myval > 150 AND $myval <= 200").collect() + assert(result2.size === 50) + } + val booleanResult = sql(s"SELECT * FROM testfiltersource WHERE myboolean = true AND myint < 40").collect() + assert(booleanResult.size === 10) + val stringResult = sql("SELECT * FROM testfiltersource WHERE mystring = \"100\"").collect() + assert(stringResult.size === 1) + } } From 210e9cbacf744bc0086acc0bbc2e9173311bf3af Mon Sep 17 00:00:00 2001 From: Andre Schumacher Date: Tue, 22 Apr 2014 20:13:00 +0300 Subject: [PATCH 04/13] Adding disjunctive filter predicates --- .../sql/catalyst/expressions/BoundAttribute.scala | 2 +- .../apache/spark/sql/parquet/ParquetFilters.scala | 13 ++++++++++++- .../apache/spark/sql/parquet/ParquetRelation.scala | 10 ---------- .../spark/sql/parquet/ParquetQuerySuite.scala | 8 ++++++++ 4 files changed, 21 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala index 40e7174fa8004..4ebf6c4584b94 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala @@ -45,7 +45,7 @@ case class BoundReference(ordinal: Int, baseReference: Attribute) override def toString = s"$baseReference:$ordinal" - override def eval(input: Row): Any = if (input != null) input(ordinal) else null + override def eval(input: Row): Any = input(ordinal) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala index 084bc6e89ae7f..3de074c1ecfe6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala @@ -114,6 +114,18 @@ object ParquetFilters { // https://github.com/Parquet/parquet-mr/issues/371 // has been resolved val filters: Seq[UnboundRecordFilter] = filterExpressions.collect { + case Or(left: Expression, right: Expression) + if createFilter(Seq(left)) != null && createFilter(Seq(right)) != null => { + // Note: if either side of this Or-predicate is empty then this means + // it contains a more complex comparison than between attribute and literal + // (e.g., it contained a CAST). The only safe thing to do is then to disregard + // this disjunction, which could be contained in a conjunction. If it stands + // alone then it is also safe to drop it, since a Null return value of this + // function is interpreted as having no filters at all. 
+ val leftFilter = createFilter(Seq(left)) + val rightFilter = createFilter(Seq(right)) + OrRecordFilter.or(leftFilter, rightFilter) + } case Equals(left: Literal, right: NamedExpression) if !right.nullable => createEqualityFilter(right.name, left) case Equals(left: NamedExpression, right: Literal) if !left.nullable => @@ -135,7 +147,6 @@ object ParquetFilters { case GreaterThanOrEqual(left: NamedExpression, right: Literal) if !left.nullable => createGreaterThanOrEqualFilter(left.name, right) } - // TODO: How about disjunctions? (Or-ed) if (filters.length > 0) filters.reduce(AndRecordFilter.and) else null } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index 3b9ae2c67805f..01823bb9b26f5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -146,16 +146,6 @@ private[sql] object ParquetRelation { new ParquetRelation(path.toString) } - def checkPredicatePushdownPossible(filters: Seq[Expression]): Boolean = { - def checkFeasible(left: Expression, right: Expression) = - left.isInstanceOf[Literal] || right.isInstanceOf[Literal] - filters.forall { - case Equals(left, right) => checkFeasible(left, right) - case LessThan(left, right) => checkFeasible(left, right) - case _ => false - } - } - private def checkPath(pathStr: String, allowExisting: Boolean, conf: Configuration): Path = { if (pathStr == null) { throw new IllegalArgumentException("Unable to create ParquetRelation: path is null") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index 7c7c80f60bd28..789023edf37e9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -266,6 +266,14 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll { val result2 = sql(s"SELECT * FROM testfiltersource WHERE $myval > 150 AND $myval <= 200").collect() assert(result2.size === 50) } + for(myval <- Seq("myint", "mylong")) { + val result3 = sql(s"SELECT * FROM testfiltersource WHERE $myval > 190 OR $myval < 10").collect() + assert(result3.size === 20) + } + for(myval <- Seq("mydouble", "myfloat")) { + val result3 = sql(s"SELECT * FROM testfiltersource WHERE $myval > 190.5 OR $myval < 10").collect() + assert(result3.size === 20) + } val booleanResult = sql(s"SELECT * FROM testfiltersource WHERE myboolean = true AND myint < 40").collect() assert(booleanResult.size === 10) val stringResult = sql("SELECT * FROM testfiltersource WHERE mystring = \"100\"").collect() From f0ad3cf4265f3a61b63e1a8ca107c69a3f1c7e6c Mon Sep 17 00:00:00 2001 From: Andre Schumacher Date: Tue, 22 Apr 2014 20:22:27 +0300 Subject: [PATCH 05/13] Undoing changes not needed for this PR --- .../apache/spark/sql/catalyst/types/dataTypes.scala | 10 +--------- .../apache/spark/sql/execution/SparkStrategies.scala | 5 +++++ .../org/apache/spark/sql/parquet/ParquetRelation.scala | 4 +--- 3 files changed, 7 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala index dd0dc8c7b51c4..da34bd3a21503 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala @@ -19,13 +19,10 @@ package org.apache.spark.sql.catalyst.types import java.sql.Timestamp -import scala.reflect.ClassTag -import scala.reflect.runtime.universe.{typeTag, TypeTag, runtimeMirror} +import scala.reflect.runtime.universe.{typeTag, TypeTag} import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.util.Utils - abstract class DataType { /** Matches any expression that evaluates to this DataType */ def unapply(a: Expression): Boolean = a match { @@ -40,11 +37,6 @@ abstract class NativeType extends DataType { type JvmType @transient val tag: TypeTag[JvmType] val ordering: Ordering[JvmType] - - @transient val classTag = { - val mirror = runtimeMirror(Utils.getSparkClassLoader) - ClassTag[JvmType](mirror.runtimeClass(tag.tpe)) - } } case object StringType extends NativeType { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 8ee142f27cccc..c76a1bc5ced6e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -141,6 +141,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) => InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil case PhysicalOperation(projectList, filters, relation: ParquetRelation) => + // Note: we do not actually remove the filters that were pushed down to Parquet from + // the plan, in case that some of the predicates cannot be evaluated there because + // they contain complex operations, such as CASTs. + // TODO: rethink whether conjuntions that are handed down to Parquet should be removed + // from the list of higher-level filters. 
pruneFilterProject( projectList, filters, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index 01823bb9b26f5..32813a66de3c3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -33,11 +33,9 @@ import parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeNa import parquet.schema.Type.Repetition import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException} -import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode} import org.apache.spark.sql.catalyst.types._ -import org.apache.spark.sql.catalyst.types.ArrayType -import org.apache.spark.sql.catalyst.expressions.AttributeReference // Implicits import scala.collection.JavaConversions._ From 85fea2df5a8912afe082e25cbc0b89690f0af7b2 Mon Sep 17 00:00:00 2001 From: Andre Schumacher Date: Wed, 23 Apr 2014 17:45:03 +0300 Subject: [PATCH 06/13] Adding SparkConf setting to disable filter predicate pushdown --- .../org/apache/spark/sql/parquet/ParquetFilters.scala | 4 ++++ .../apache/spark/sql/parquet/ParquetTableOperations.scala | 7 +++++-- .../org/apache/spark/sql/parquet/ParquetTestData.scala | 5 +++-- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala index 3de074c1ecfe6..6b26d56c0e1bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala @@ -31,6 +31,10 @@ import org.apache.spark.sql.execution.SparkSqlSerializer object ParquetFilters { val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter" + // set this to false if pushdown should be disabled + // Note: prefix is "spark.hadoop." so that it will be copied from SparkConf + // to Hadoop configuration + val PARQUET_FILTER_PUSHDOWN_ENABLED = "org.apache.spark.sql.parquet.filter.pushdown" def createFilter(filterExpressions: Seq[Expression]): UnboundRecordFilter = { def createEqualityFilter(name: String, literal: Literal) = literal.dataType match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index bd397c3c944a3..26e108c473a4c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -71,10 +71,13 @@ case class ParquetTableScan( ParquetTypesConverter.convertFromAttributes(output).toString) // Store record filtering predicate in `Configuration` - // Note: the input format ignores all predicates that cannot be expressed + // Note 1: the input format ignores all predicates that cannot be expressed // as simple column predicate filters in Parquet. Here we just record // the whole pruning predicate. - if (columnPruningPred.isDefined) { + // Note 2: you can disable filter predicate pushdown by setting + // "org.apache.spark.sql.parquet.filter.pushdown" to false inside SparkConf. 
+ if (columnPruningPred.isDefined && + sc.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) { ParquetFilters.serializeFilterExpressions(columnPruningPred.get, conf) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala index 2f690198fc784..10a4bc85e72ed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala @@ -130,14 +130,15 @@ private[sql] object ParquetTestData { writer.close() } - def writeFilterFile() = { + def writeFilterFile(records: Int = 200) = { + // for microbenchmark use: records = 300000000 testFilterDir.delete val path: Path = new Path(new Path(testFilterDir.toURI), new Path("part-r-0.parquet")) val schema: MessageType = MessageTypeParser.parseMessageType(testFilterSchema) val writeSupport = new TestGroupWriteSupport(schema) val writer = new ParquetWriter[Group](path, writeSupport) - for(i <- 0 to 200) { + for(i <- 0 to records) { val record = new SimpleGroup(schema) if (i % 4 == 0) { record.add(0, true) From b0f7806e40aba4ab0f663792a1464c03d8f5dffe Mon Sep 17 00:00:00 2001 From: Andre Schumacher Date: Wed, 23 Apr 2014 20:50:23 +0300 Subject: [PATCH 07/13] Optimizing imports in ParquetTestData --- .../apache/spark/sql/parquet/ParquetTestData.scala | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala index 10a4bc85e72ed..46c7172985642 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala @@ -19,20 +19,16 @@ package org.apache.spark.sql.parquet import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.hadoop.mapreduce.Job +import parquet.example.data.{GroupWriter, Group} +import parquet.example.data.simple.SimpleGroup import parquet.hadoop.ParquetWriter -import parquet.hadoop.util.ContextUtil +import parquet.hadoop.api.WriteSupport +import parquet.hadoop.api.WriteSupport.WriteContext +import parquet.io.api.RecordConsumer import parquet.schema.{MessageType, MessageTypeParser} -import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.util.Utils -import parquet.hadoop.metadata.CompressionCodecName -import parquet.hadoop.api.WriteSupport -import parquet.example.data.{GroupWriter, Group} -import parquet.io.api.RecordConsumer -import parquet.hadoop.api.WriteSupport.WriteContext -import parquet.example.data.simple.SimpleGroup // Write support class for nested groups: ParquetWriter initializes GroupWriteSupport // with an empty configuration (it is after all not intended to be used in this way?) From a86553bf35feef4565d1048cd27731b47b3236a5 Mon Sep 17 00:00:00 2001 From: Andre Schumacher Date: Fri, 25 Apr 2014 09:48:24 +0300 Subject: [PATCH 08/13] First round of code review feedback Changes: - Predicates that are pushed down to Parquet are now kept track off - Predicates which are pushed down are removed from the higher-level filters - Filter enable setting renamed to "spark.sql.hints.parquetFilterPushdown" - Smaller changes, code formatting, imports, etc. 
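
A condensed sketch of the pruning rule the first two bullets describe, assuming the
createFilter/findExpression signatures introduced in the SparkStrategies and
ParquetFilters changes below; the `pushdownEnabled` flag stands in for the
"spark.sql.hints.parquetFilterPushdown" SparkConf lookup:

    import org.apache.spark.sql.catalyst.expressions.Expression
    import org.apache.spark.sql.parquet.ParquetFilters

    object PushdownSketch {
      def remainingFilters(filters: Seq[Expression], pushdownEnabled: Boolean): Seq[Expression] =
        if (!pushdownEnabled) {
          filters
        } else {
          filters.filter { filter =>
            ParquetFilters.createFilter(filter) match {
              // No record filter could be built: Spark must still evaluate it.
              case None => true
              // A record filter was built, but keep the Spark-side filter unless
              // it covers the whole expression (e.g. only "A" of "A AND B" was pushed).
              case Some(recordFilter) =>
                ParquetFilters.findExpression(recordFilter, filter).isEmpty
            }
          }
        }
    }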
--- .../spark/sql/execution/SparkStrategies.scala | 39 ++- .../spark/sql/parquet/ParquetFilters.scala | 304 +++++++++++++----- .../sql/parquet/ParquetTableOperations.scala | 13 +- .../spark/sql/parquet/ParquetQuerySuite.scala | 113 ++++++- 4 files changed, 364 insertions(+), 105 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index c76a1bc5ced6e..cdcd678a3aeac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -140,16 +140,35 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { InsertIntoParquetTable(relation, planLater(child), overwrite=true)(sparkContext) :: Nil case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) => InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil - case PhysicalOperation(projectList, filters, relation: ParquetRelation) => - // Note: we do not actually remove the filters that were pushed down to Parquet from - // the plan, in case that some of the predicates cannot be evaluated there because - // they contain complex operations, such as CASTs. - // TODO: rethink whether conjuntions that are handed down to Parquet should be removed - // from the list of higher-level filters. - pruneFilterProject( - projectList, - filters, - ParquetTableScan(_, relation, Some(filters))(sparkContext)) :: Nil + case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) => { + val remainingFilters = + if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) { + filters.filter { + // Note: filters cannot be pushed down to Parquet if they contain more complex + // expressions than simple "Attribute cmp Literal" comparisons. Here we remove + // all filters that have been pushed down. Note that a predicate such as + // "A AND B" can result in "A" being pushed down. + filter => + val recordFilter = ParquetFilters.createFilter(filter) + if (!recordFilter.isDefined) { + // First case: the pushdown did not result in any record filter. + true + } else { + // Second case: a record filter was created; here we are conservative in + // the sense that even if "A" was pushed and we check for "A AND B" we + // still want to keep "A AND B" in the higher-level filter, not just "B". 
+ !ParquetFilters.findExpression(recordFilter.get, filter).isDefined + } + } + } else { + filters + } + pruneFilterProject( + projectList, + remainingFilters, + ParquetTableScan(_, relation, Some(filters))(sparkContext)) :: Nil + } + case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala index 6b26d56c0e1bd..ed691bdb74a3f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala @@ -26,134 +26,186 @@ import parquet.column.ColumnReader import com.google.common.io.BaseEncoding import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.catalyst.expressions.{Predicate => CatalystPredicate} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkSqlSerializer object ParquetFilters { val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter" // set this to false if pushdown should be disabled - // Note: prefix is "spark.hadoop." so that it will be copied from SparkConf - // to Hadoop configuration - val PARQUET_FILTER_PUSHDOWN_ENABLED = "org.apache.spark.sql.parquet.filter.pushdown" + val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.hints.parquetFilterPushdown" - def createFilter(filterExpressions: Seq[Expression]): UnboundRecordFilter = { - def createEqualityFilter(name: String, literal: Literal) = literal.dataType match { + def createRecordFilter(filterExpressions: Seq[Expression]): UnboundRecordFilter = { + val filters: Seq[CatalystFilter] = filterExpressions.collect { + case (expression: Expression) if createFilter(expression).isDefined => + createFilter(expression).get + } + if (filters.length > 0) filters.reduce(AndRecordFilter.and) else null + } + + def createFilter(expression: Expression): Option[CatalystFilter] = { + def createEqualityFilter( + name: String, + literal: Literal, + predicate: CatalystPredicate) = literal.dataType match { case BooleanType => - ComparisonFilter.createBooleanFilter(name, literal.value.asInstanceOf[Boolean]) + ComparisonFilter.createBooleanFilter(name, literal.value.asInstanceOf[Boolean], predicate) case IntegerType => - ComparisonFilter.createIntFilter(name, (x: Int) => x == literal.value.asInstanceOf[Int]) + ComparisonFilter.createIntFilter(name, (x: Int) => + x == literal.value.asInstanceOf[Int], predicate) case LongType => - ComparisonFilter.createLongFilter(name, (x: Long) => x == literal.value.asInstanceOf[Long]) + ComparisonFilter.createLongFilter(name, (x: Long) => + x == literal.value.asInstanceOf[Long], predicate) case DoubleType => ComparisonFilter.createDoubleFilter( name, - (x: Double) => x == literal.value.asInstanceOf[Double]) + (x: Double) => x == literal.value.asInstanceOf[Double], + predicate) case FloatType => ComparisonFilter.createFloatFilter( name, - (x: Float) => x == literal.value.asInstanceOf[Float]) + (x: Float) => x == literal.value.asInstanceOf[Float], + predicate) case StringType => - ComparisonFilter.createStringFilter(name, literal.value.asInstanceOf[String]) + ComparisonFilter.createStringFilter(name, literal.value.asInstanceOf[String], predicate) } - def createLessThanFilter(name: String, literal: Literal) = literal.dataType match { + def createLessThanFilter( + name: String, + literal: Literal, + predicate: CatalystPredicate) = literal.dataType match { case IntegerType => - ComparisonFilter.createIntFilter(name, (x: Int) => x < 
literal.value.asInstanceOf[Int]) + ComparisonFilter.createIntFilter(name, (x: Int) => + x < literal.value.asInstanceOf[Int], predicate) case LongType => - ComparisonFilter.createLongFilter(name, (x: Long) => x < literal.value.asInstanceOf[Long]) + ComparisonFilter.createLongFilter(name, (x: Long) => + x < literal.value.asInstanceOf[Long], predicate) case DoubleType => ComparisonFilter.createDoubleFilter( name, - (x: Double) => x < literal.value.asInstanceOf[Double]) + (x: Double) => x < literal.value.asInstanceOf[Double], + predicate) case FloatType => ComparisonFilter.createFloatFilter( name, - (x: Float) => x < literal.value.asInstanceOf[Float]) + (x: Float) => x < literal.value.asInstanceOf[Float], + predicate) } - def createLessThanOrEqualFilter(name: String, literal: Literal) = literal.dataType match { + def createLessThanOrEqualFilter( + name: String, + literal: Literal, + predicate: CatalystPredicate) = literal.dataType match { case IntegerType => - ComparisonFilter.createIntFilter(name, (x: Int) => x <= literal.value.asInstanceOf[Int]) + ComparisonFilter.createIntFilter(name, (x: Int) => + x <= literal.value.asInstanceOf[Int], predicate) case LongType => - ComparisonFilter.createLongFilter(name, (x: Long) => x <= literal.value.asInstanceOf[Long]) + ComparisonFilter.createLongFilter(name, (x: Long) => + x <= literal.value.asInstanceOf[Long], predicate) case DoubleType => ComparisonFilter.createDoubleFilter( name, - (x: Double) => x <= literal.value.asInstanceOf[Double]) + (x: Double) => x <= literal.value.asInstanceOf[Double], + predicate) case FloatType => ComparisonFilter.createFloatFilter( name, - (x: Float) => x <= literal.value.asInstanceOf[Float]) + (x: Float) => x <= literal.value.asInstanceOf[Float], + predicate) } // TODO: combine these two types somehow? 
- def createGreaterThanFilter(name: String, literal: Literal) = literal.dataType match { + def createGreaterThanFilter( + name: String, + literal: Literal, + predicate: CatalystPredicate) = literal.dataType match { case IntegerType => - ComparisonFilter.createIntFilter(name, (x: Int) => x > literal.value.asInstanceOf[Int]) + ComparisonFilter.createIntFilter(name, (x: Int) => + x > literal.value.asInstanceOf[Int], predicate) case LongType => - ComparisonFilter.createLongFilter(name, (x: Long) => x > literal.value.asInstanceOf[Long]) + ComparisonFilter.createLongFilter(name, (x: Long) => + x > literal.value.asInstanceOf[Long], predicate) case DoubleType => ComparisonFilter.createDoubleFilter( name, - (x: Double) => x > literal.value.asInstanceOf[Double]) + (x: Double) => x > literal.value.asInstanceOf[Double], + predicate) case FloatType => ComparisonFilter.createFloatFilter( name, - (x: Float) => x > literal.value.asInstanceOf[Float]) + (x: Float) => x > literal.value.asInstanceOf[Float], + predicate) } - def createGreaterThanOrEqualFilter(name: String, literal: Literal) = literal.dataType match { + def createGreaterThanOrEqualFilter( + name: String, + literal: Literal, + predicate: CatalystPredicate) = literal.dataType match { case IntegerType => - ComparisonFilter.createIntFilter(name, (x: Int) => x >= literal.value.asInstanceOf[Int]) + ComparisonFilter.createIntFilter(name, (x: Int) => + x >= literal.value.asInstanceOf[Int], predicate) case LongType => - ComparisonFilter.createLongFilter(name, (x: Long) => x >= literal.value.asInstanceOf[Long]) + ComparisonFilter.createLongFilter(name, (x: Long) => + x >= literal.value.asInstanceOf[Long], predicate) case DoubleType => ComparisonFilter.createDoubleFilter( name, - (x: Double) => x >= literal.value.asInstanceOf[Double]) + (x: Double) => x >= literal.value.asInstanceOf[Double], + predicate) case FloatType => ComparisonFilter.createFloatFilter( name, - (x: Float) => x >= literal.value.asInstanceOf[Float]) + (x: Float) => x >= literal.value.asInstanceOf[Float], + predicate) } - // TODO: can we actually rely on the predicate being normalized as in expression < literal? - // That would simplify this pattern matching // TODO: we currently only filter on non-nullable (Parquet REQUIRED) attributes until // https://github.com/Parquet/parquet-mr/issues/371 // has been resolved - val filters: Seq[UnboundRecordFilter] = filterExpressions.collect { - case Or(left: Expression, right: Expression) - if createFilter(Seq(left)) != null && createFilter(Seq(right)) != null => { - // Note: if either side of this Or-predicate is empty then this means + expression match { + case p @ Or(left: Expression, right: Expression) + if createFilter(left).isDefined && createFilter(right).isDefined => { + // If either side of this Or-predicate is empty then this means // it contains a more complex comparison than between attribute and literal // (e.g., it contained a CAST). The only safe thing to do is then to disregard // this disjunction, which could be contained in a conjunction. If it stands // alone then it is also safe to drop it, since a Null return value of this // function is interpreted as having no filters at all. 
- val leftFilter = createFilter(Seq(left)) - val rightFilter = createFilter(Seq(right)) - OrRecordFilter.or(leftFilter, rightFilter) + val leftFilter = createFilter(left).get + val rightFilter = createFilter(right).get + Some(new OrFilter(leftFilter, rightFilter)) } - case Equals(left: Literal, right: NamedExpression) if !right.nullable => - createEqualityFilter(right.name, left) - case Equals(left: NamedExpression, right: Literal) if !left.nullable => - createEqualityFilter(left.name, right) - case LessThan(left: Literal, right: NamedExpression) if !right.nullable => - createLessThanFilter(right.name, left) - case LessThan(left: NamedExpression, right: Literal) if !left.nullable => - createLessThanFilter(left.name, right) - case LessThanOrEqual(left: Literal, right: NamedExpression) if !right.nullable => - createLessThanOrEqualFilter(right.name, left) - case LessThanOrEqual(left: NamedExpression, right: Literal) if !left.nullable => - createLessThanOrEqualFilter(left.name, right) - case GreaterThan(left: Literal, right: NamedExpression) if !right.nullable => - createGreaterThanFilter(right.name, left) - case GreaterThan(left: NamedExpression, right: Literal) if !left.nullable => - createGreaterThanFilter(left.name, right) - case GreaterThanOrEqual(left: Literal, right: NamedExpression) if !right.nullable => - createGreaterThanOrEqualFilter(right.name, left) - case GreaterThanOrEqual(left: NamedExpression, right: Literal) if !left.nullable => - createGreaterThanOrEqualFilter(left.name, right) + case p @ And(left: Expression, right: Expression) => { + // This treats nested conjunctions; since either side of the conjunction + // may contain more complex filter expressions we may actually generate + // strictly weaker filter predicates in the process. 
+ val leftFilter = createFilter(left) + val rightFilter = createFilter(right) + (leftFilter, rightFilter) match { + case (None, Some(filter)) => Some(filter) + case (Some(filter), None) => Some(filter) + case (_, _) => + Some(new AndFilter(leftFilter.get, rightFilter.get)) + } + } + case p @ Equals(left: Literal, right: NamedExpression) if !right.nullable => + Some(createEqualityFilter(right.name, left, p)) + case p @ Equals(left: NamedExpression, right: Literal) if !left.nullable => + Some(createEqualityFilter(left.name, right, p)) + case p @ LessThan(left: Literal, right: NamedExpression) if !right.nullable => + Some(createLessThanFilter(right.name, left, p)) + case p @ LessThan(left: NamedExpression, right: Literal) if !left.nullable => + Some(createLessThanFilter(left.name, right, p)) + case p @ LessThanOrEqual(left: Literal, right: NamedExpression) if !right.nullable => + Some(createLessThanOrEqualFilter(right.name, left, p)) + case p @ LessThanOrEqual(left: NamedExpression, right: Literal) if !left.nullable => + Some(createLessThanOrEqualFilter(left.name, right, p)) + case p @ GreaterThan(left: Literal, right: NamedExpression) if !right.nullable => + Some(createGreaterThanFilter(right.name, left, p)) + case p @ GreaterThan(left: NamedExpression, right: Literal) if !left.nullable => + Some(createGreaterThanFilter(left.name, right, p)) + case p @ GreaterThanOrEqual(left: Literal, right: NamedExpression) if !right.nullable => + Some(createGreaterThanOrEqualFilter(right.name, left, p)) + case p @ GreaterThanOrEqual(left: NamedExpression, right: Literal) if !left.nullable => + Some(createGreaterThanOrEqualFilter(left.name, right, p)) + case _ => None } - if (filters.length > 0) filters.reduce(AndRecordFilter.and) else null } - // Note: Inside the Hadoop API we only have access to `Configuration`, not to // [[SparkContext]], so we cannot use broadcasts to convey the actual filter // predicate. @@ -175,19 +227,90 @@ object ParquetFilters { None } } + + // Try to find the given expression in the tree of filters in order to + // determine whether it is safe to remove it from the higher level filters. Note + // that strictly speaking we could stop the search whenever an expression is found + // that contains this expression as subexpression (e.g., when searching for "a" + // and "(a or c)" is found) but we don't care about optimizations here since the + // filter tree is assumed to be small. 
+ def findExpression( + filter: CatalystFilter, + expression: Expression): Option[CatalystFilter] = filter match { + case f @ OrFilter(_, leftFilter, rightFilter, _) => + if (f.predicate == expression) { + Some(f) + } else { + val left = findExpression(leftFilter, expression) + if (left.isDefined) left else findExpression(rightFilter, expression) + } + case f @ AndFilter(_, leftFilter, rightFilter, _) => + if (f.predicate == expression) { + Some(f) + } else { + val left = findExpression(leftFilter, expression) + if (left.isDefined) left else findExpression(rightFilter, expression) + } + case f @ ComparisonFilter(_, _, predicate) => + if (predicate == expression) Some(f) else None + case _ => None + } +} + +abstract private[parquet] class CatalystFilter( + @transient val predicate: CatalystPredicate) extends UnboundRecordFilter + +private[parquet] case class ComparisonFilter( + val columnName: String, + private var filter: UnboundRecordFilter, + @transient override val predicate: CatalystPredicate) + extends CatalystFilter(predicate) { + override def bind(readers: java.lang.Iterable[ColumnReader]): RecordFilter = { + filter.bind(readers) + } } -class ComparisonFilter( - private val columnName: String, - private var filter: UnboundRecordFilter) - extends UnboundRecordFilter { +private[parquet] case class OrFilter( + private var filter: UnboundRecordFilter, + @transient val left: CatalystFilter, + @transient val right: CatalystFilter, + @transient override val predicate: Or) + extends CatalystFilter(predicate) { + def this(l: CatalystFilter, r: CatalystFilter) = + this( + OrRecordFilter.or(l, r), + l, + r, + Or(l.predicate, r.predicate)) + override def bind(readers: java.lang.Iterable[ColumnReader]): RecordFilter = { filter.bind(readers) } } -object ComparisonFilter { - def createBooleanFilter(columnName: String, value: Boolean): UnboundRecordFilter = +private[parquet] case class AndFilter( + private var filter: UnboundRecordFilter, + @transient val left: CatalystFilter, + @transient val right: CatalystFilter, + @transient override val predicate: And) + extends CatalystFilter(predicate) { + def this(l: CatalystFilter, r: CatalystFilter) = + this( + AndRecordFilter.and(l, r), + l, + r, + And(l.predicate, r.predicate)) + + override def bind(readers: java.lang.Iterable[ColumnReader]): RecordFilter = { + filter.bind(readers) + } +} + +private[parquet] object ComparisonFilter { + def createBooleanFilter( + columnName: String, + value: Boolean, + predicate: CatalystPredicate): CatalystFilter = new ComparisonFilter( columnName, ColumnRecordFilter.column( @@ -196,8 +319,12 @@ object ComparisonFilter { new BooleanPredicateFunction { def functionToApply(input: Boolean): Boolean = input == value } - ))) - def createStringFilter(columnName: String, value: String): UnboundRecordFilter = + )), + predicate) + def createStringFilter( + columnName: String, + value: String, + predicate: CatalystPredicate): CatalystFilter = new ComparisonFilter( columnName, ColumnRecordFilter.column( @@ -206,8 +333,12 @@ object ComparisonFilter { new ColumnPredicates.PredicateFunction[String] { def functionToApply(input: String): Boolean = input == value } - ))) - def createIntFilter(columnName: String, func: Int => Boolean): UnboundRecordFilter = + )), + predicate) + def createIntFilter( + columnName: String, + func: Int => Boolean, + predicate: CatalystPredicate): CatalystFilter = new ComparisonFilter( columnName, ColumnRecordFilter.column( @@ -216,8 +347,12 @@ object ComparisonFilter { new IntegerPredicateFunction { def 
functionToApply(input: Int) = func(input) } - ))) - def createLongFilter(columnName: String, func: Long => Boolean): UnboundRecordFilter = + )), + predicate) + def createLongFilter( + columnName: String, + func: Long => Boolean, + predicate: CatalystPredicate): CatalystFilter = new ComparisonFilter( columnName, ColumnRecordFilter.column( @@ -226,8 +361,12 @@ object ComparisonFilter { new LongPredicateFunction { def functionToApply(input: Long) = func(input) } - ))) - def createDoubleFilter(columnName: String, func: Double => Boolean): UnboundRecordFilter = + )), + predicate) + def createDoubleFilter( + columnName: String, + func: Double => Boolean, + predicate: CatalystPredicate): CatalystFilter = new ComparisonFilter( columnName, ColumnRecordFilter.column( @@ -236,8 +375,12 @@ object ComparisonFilter { new DoublePredicateFunction { def functionToApply(input: Double) = func(input) } - ))) - def createFloatFilter(columnName: String, func: Float => Boolean): UnboundRecordFilter = + )), + predicate) + def createFloatFilter( + columnName: String, + func: Float => Boolean, + predicate: CatalystPredicate): CatalystFilter = new ComparisonFilter( columnName, ColumnRecordFilter.column( @@ -246,7 +389,6 @@ object ComparisonFilter { new FloatPredicateFunction { def functionToApply(input: Float) = func(input) } - ))) + )), + predicate) } - - diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index 26e108c473a4c..6f60e6b4e8e9b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -27,17 +27,16 @@ import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat, FileOutputCommitter} -import parquet.hadoop.{ParquetRecordReader, BadConfigurationException, ParquetInputFormat, ParquetOutputFormat} +import parquet.hadoop.{ParquetRecordReader, ParquetInputFormat, ParquetOutputFormat} +import parquet.hadoop.api.ReadSupport import parquet.hadoop.util.ContextUtil import parquet.io.InvalidRecordException import parquet.schema.MessageType import org.apache.spark.{Logging, SerializableWritable, SparkContext, TaskContext} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.{BinaryComparison, Attribute, Expression, Row} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row} import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode} -import parquet.filter.UnboundRecordFilter -import parquet.hadoop.api.ReadSupport /** * Parquet table scan operator. Imports the file that backs the given @@ -75,7 +74,7 @@ case class ParquetTableScan( // as simple column predicate filters in Parquet. Here we just record // the whole pruning predicate. // Note 2: you can disable filter predicate pushdown by setting - // "org.apache.spark.sql.parquet.filter.pushdown" to false inside SparkConf. + // "spark.sql.hints.parquetFilterPushdown" to false inside SparkConf. 
if (columnPruningPred.isDefined && sc.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) { ParquetFilters.serializeFilterExpressions(columnPruningPred.get, conf) @@ -289,7 +288,9 @@ private[parquet] class FilteringParquetRowInputFormat ParquetFilters.deserializeFilterExpressions(ContextUtil.getConfiguration(taskAttemptContext)) if (filterExpressions.isDefined) { logInfo(s"Pushing down predicates for RecordFilter: ${filterExpressions.mkString(", ")}") - new ParquetRecordReader[Row](readSupport, ParquetFilters.createFilter(filterExpressions.get)) + new ParquetRecordReader[Row]( + readSupport, + ParquetFilters.createRecordFilter(filterExpressions.get)) } else { new ParquetRecordReader[Row](readSupport) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index 789023edf37e9..67c1356d1a816 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -28,10 +28,13 @@ import parquet.schema.MessageTypeParser import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.util.getTempFilePath -import org.apache.spark.sql.catalyst.expressions.Row +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.test.TestSQLContext import org.apache.spark.sql.TestData import org.apache.spark.sql.SchemaRDD +import org.apache.spark.sql.catalyst.expressions.Row +import org.apache.spark.sql.catalyst.expressions.Equals +import org.apache.spark.sql.catalyst.types.IntegerType import org.apache.spark.util.Utils // Implicits @@ -258,26 +261,120 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll { }*/ } + test("create RecordFilter for simple predicates") { + val attribute1 = new AttributeReference("first", IntegerType, false)() + val predicate1 = new Equals(attribute1, new Literal(1, IntegerType)) + val filter1 = ParquetFilters.createFilter(predicate1) + assert(filter1.isDefined) + assert(filter1.get.predicate == predicate1, "predicates do not match") + assert(filter1.get.isInstanceOf[ComparisonFilter]) + val cmpFilter1 = filter1.get.asInstanceOf[ComparisonFilter] + assert(cmpFilter1.columnName == "first", "column name incorrect") + + val predicate2 = new LessThan(attribute1, new Literal(4, IntegerType)) + val filter2 = ParquetFilters.createFilter(predicate2) + assert(filter2.isDefined) + assert(filter2.get.predicate == predicate2, "predicates do not match") + assert(filter2.get.isInstanceOf[ComparisonFilter]) + val cmpFilter2 = filter2.get.asInstanceOf[ComparisonFilter] + assert(cmpFilter2.columnName == "first", "column name incorrect") + + val predicate3 = new And(predicate1, predicate2) + val filter3 = ParquetFilters.createFilter(predicate3) + assert(filter3.isDefined) + assert(filter3.get.predicate == predicate3, "predicates do not match") + assert(filter3.get.isInstanceOf[AndFilter]) + + val predicate4 = new Or(predicate1, predicate2) + val filter4 = ParquetFilters.createFilter(predicate4) + assert(filter4.isDefined) + assert(filter4.get.predicate == predicate4, "predicates do not match") + assert(filter4.get.isInstanceOf[OrFilter]) + + val attribute2 = new AttributeReference("second", IntegerType, false)() + val predicate5 = new GreaterThan(attribute1, attribute2) + val badfilter = ParquetFilters.createFilter(predicate5) + assert(badfilter.isDefined === false) + } + test("test filter by predicate pushdown") 
{ for(myval <- Seq("myint", "mylong", "mydouble", "myfloat")) { println(s"testing field $myval") - val result1 = sql(s"SELECT * FROM testfiltersource WHERE $myval < 150 AND $myval >= 100").collect() + val query1 = sql(s"SELECT * FROM testfiltersource WHERE $myval < 150 AND $myval >= 100") + assert( + query1.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan], + "Top operator should be ParquetTableScan after pushdown") + val result1 = query1.collect() assert(result1.size === 50) - val result2 = sql(s"SELECT * FROM testfiltersource WHERE $myval > 150 AND $myval <= 200").collect() + assert(result1(0)(1) === 100) + assert(result1(49)(1) === 149) + val query2 = sql(s"SELECT * FROM testfiltersource WHERE $myval > 150 AND $myval <= 200") + assert( + query2.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan], + "Top operator should be ParquetTableScan after pushdown") + val result2 = query2.collect() assert(result2.size === 50) + if (myval == "myint" || myval == "mylong") { + assert(result2(0)(1) === 151) + assert(result2(49)(1) === 200) + } else { + assert(result2(0)(1) === 150) + assert(result2(49)(1) === 199) + } } for(myval <- Seq("myint", "mylong")) { - val result3 = sql(s"SELECT * FROM testfiltersource WHERE $myval > 190 OR $myval < 10").collect() + val query3 = sql(s"SELECT * FROM testfiltersource WHERE $myval > 190 OR $myval < 10") + assert( + query3.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan], + "Top operator should be ParquetTableScan after pushdown") + val result3 = query3.collect() assert(result3.size === 20) + assert(result3(0)(1) === 0) + assert(result3(9)(1) === 9) + assert(result3(10)(1) === 191) + assert(result3(19)(1) === 200) } for(myval <- Seq("mydouble", "myfloat")) { - val result3 = sql(s"SELECT * FROM testfiltersource WHERE $myval > 190.5 OR $myval < 10").collect() - assert(result3.size === 20) + val result4 = + if (myval == "mydouble") { + val query4 = sql(s"SELECT * FROM testfiltersource WHERE $myval > 190.5 OR $myval < 10.0") + assert( + query4.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan], + "Top operator should be ParquetTableScan after pushdown") + query4.collect() + } else { + // CASTs are problematic. Here myfloat will be casted to a double and it seems there is + // currently no way to specify float constants in SqlParser? 
+ sql(s"SELECT * FROM testfiltersource WHERE $myval > 190.5 OR $myval < 10").collect() + } + assert(result4.size === 20) + assert(result4(0)(1) === 0) + assert(result4(9)(1) === 9) + assert(result4(10)(1) === 191) + assert(result4(19)(1) === 200) } - val booleanResult = sql(s"SELECT * FROM testfiltersource WHERE myboolean = true AND myint < 40").collect() + val query5 = sql(s"SELECT * FROM testfiltersource WHERE myboolean = true AND myint < 40") + assert( + query5.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan], + "Top operator should be ParquetTableScan after pushdown") + val booleanResult = query5.collect() assert(booleanResult.size === 10) - val stringResult = sql("SELECT * FROM testfiltersource WHERE mystring = \"100\"").collect() + for(i <- 0 until 10) { + if (!booleanResult(i).getBoolean(0)) { + fail(s"Boolean value in result row $i not true") + } + if (booleanResult(i).getInt(1) != i * 4) { + fail(s"Int value in result row $i should be ${4*i}") + } + } + val query6 = sql("SELECT * FROM testfiltersource WHERE mystring = \"100\"") + assert( + query6.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan], + "Top operator should be ParquetTableScan after pushdown") + val stringResult = query6.collect() assert(stringResult.size === 1) + assert(stringResult(0).getString(2) == "100", "stringvalue incorrect") + assert(stringResult(0).getInt(1) === 100) } } From 7a78265bbc0fa7ed020659ee4118314eebfd4fca Mon Sep 17 00:00:00 2001 From: Andre Schumacher Date: Fri, 25 Apr 2014 10:04:28 +0300 Subject: [PATCH 09/13] Fixing broken formatting in ParquetFilter --- .../spark/sql/parquet/ParquetFilters.scala | 59 ++++++++++++------- 1 file changed, 39 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala index ed691bdb74a3f..2a16a52f44f93 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala @@ -51,11 +51,15 @@ object ParquetFilters { case BooleanType => ComparisonFilter.createBooleanFilter(name, literal.value.asInstanceOf[Boolean], predicate) case IntegerType => - ComparisonFilter.createIntFilter(name, (x: Int) => - x == literal.value.asInstanceOf[Int], predicate) + ComparisonFilter.createIntFilter( + name, + (x: Int) => x == literal.value.asInstanceOf[Int], + predicate) case LongType => - ComparisonFilter.createLongFilter(name, (x: Long) => - x == literal.value.asInstanceOf[Long], predicate) + ComparisonFilter.createLongFilter( + name, + (x: Long) => x == literal.value.asInstanceOf[Long], + predicate) case DoubleType => ComparisonFilter.createDoubleFilter( name, @@ -74,11 +78,15 @@ object ParquetFilters { literal: Literal, predicate: CatalystPredicate) = literal.dataType match { case IntegerType => - ComparisonFilter.createIntFilter(name, (x: Int) => - x < literal.value.asInstanceOf[Int], predicate) + ComparisonFilter.createIntFilter( + name, + (x: Int) => x < literal.value.asInstanceOf[Int], + predicate) case LongType => - ComparisonFilter.createLongFilter(name, (x: Long) => - x < literal.value.asInstanceOf[Long], predicate) + ComparisonFilter.createLongFilter( + name, + (x: Long) => x < literal.value.asInstanceOf[Long], + predicate) case DoubleType => ComparisonFilter.createDoubleFilter( name, @@ -95,11 +103,15 @@ object ParquetFilters { literal: Literal, predicate: CatalystPredicate) = literal.dataType match { case 
IntegerType => - ComparisonFilter.createIntFilter(name, (x: Int) => - x <= literal.value.asInstanceOf[Int], predicate) + ComparisonFilter.createIntFilter( + name, + (x: Int) => x <= literal.value.asInstanceOf[Int], + predicate) case LongType => - ComparisonFilter.createLongFilter(name, (x: Long) => - x <= literal.value.asInstanceOf[Long], predicate) + ComparisonFilter.createLongFilter( + name, + (x: Long) => x <= literal.value.asInstanceOf[Long], + predicate) case DoubleType => ComparisonFilter.createDoubleFilter( name, @@ -117,11 +129,15 @@ object ParquetFilters { literal: Literal, predicate: CatalystPredicate) = literal.dataType match { case IntegerType => - ComparisonFilter.createIntFilter(name, (x: Int) => - x > literal.value.asInstanceOf[Int], predicate) + ComparisonFilter.createIntFilter( + name, + (x: Int) => x > literal.value.asInstanceOf[Int], + predicate) case LongType => - ComparisonFilter.createLongFilter(name, (x: Long) => - x > literal.value.asInstanceOf[Long], predicate) + ComparisonFilter.createLongFilter( + name, + (x: Long) => x > literal.value.asInstanceOf[Long], + predicate) case DoubleType => ComparisonFilter.createDoubleFilter( name, @@ -138,11 +154,14 @@ object ParquetFilters { literal: Literal, predicate: CatalystPredicate) = literal.dataType match { case IntegerType => - ComparisonFilter.createIntFilter(name, (x: Int) => - x >= literal.value.asInstanceOf[Int], predicate) + ComparisonFilter.createIntFilter( + name, (x: Int) => x >= literal.value.asInstanceOf[Int], + predicate) case LongType => - ComparisonFilter.createLongFilter(name, (x: Long) => - x >= literal.value.asInstanceOf[Long], predicate) + ComparisonFilter.createLongFilter( + name, + (x: Long) => x >= literal.value.asInstanceOf[Long], + predicate) case DoubleType => ComparisonFilter.createDoubleFilter( name, From 3da98db4cfa4a7b5b0e1d1fab7517f346c566782 Mon Sep 17 00:00:00 2001 From: Andre Schumacher Date: Sat, 26 Apr 2014 08:10:17 +0300 Subject: [PATCH 10/13] Second round of review feedback --- .../spark/sql/execution/SparkStrategies.scala | 4 +- .../spark/sql/parquet/ParquetFilters.scala | 60 ++++++++++++------- .../sql/parquet/ParquetTableOperations.scala | 44 +++++++++----- .../spark/sql/parquet/ParquetQuerySuite.scala | 2 +- 4 files changed, 71 insertions(+), 39 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index cdcd678a3aeac..394a59700dbaf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -147,7 +147,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // Note: filters cannot be pushed down to Parquet if they contain more complex // expressions than simple "Attribute cmp Literal" comparisons. Here we remove // all filters that have been pushed down. Note that a predicate such as - // "A AND B" can result in "A" being pushed down. + // "(A AND B) OR C" can result in "A OR C" being pushed down. 
filter => val recordFilter = ParquetFilters.createFilter(filter) if (!recordFilter.isDefined) { @@ -166,7 +166,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { pruneFilterProject( projectList, remainingFilters, - ParquetTableScan(_, relation, Some(filters))(sparkContext)) :: Nil + ParquetTableScan(_, relation, filters)(sparkContext)) :: Nil } case _ => Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala index 2a16a52f44f93..e07669865a3ef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala @@ -173,9 +173,12 @@ object ParquetFilters { (x: Float) => x >= literal.value.asInstanceOf[Float], predicate) } - // TODO: we currently only filter on non-nullable (Parquet REQUIRED) attributes until - // https://github.com/Parquet/parquet-mr/issues/371 - // has been resolved + + /** + * TODO: we currently only filter on non-nullable (Parquet REQUIRED) attributes until + * https://github.com/Parquet/parquet-mr/issues/371 + * has been resolved. + */ expression match { case p @ Or(left: Expression, right: Expression) if createFilter(left).isDefined && createFilter(right).isDefined => { @@ -225,34 +228,49 @@ object ParquetFilters { case _ => None } } - // Note: Inside the Hadoop API we only have access to `Configuration`, not to - // [[SparkContext]], so we cannot use broadcasts to convey the actual filter - // predicate. + + /** + * Note: Inside the Hadoop API we only have access to `Configuration`, not to + * [[org.apache.spark.SparkContext]], so we cannot use broadcasts to convey + * the actual filter predicate. + */ def serializeFilterExpressions(filters: Seq[Expression], conf: Configuration): Unit = { - val serialized: Array[Byte] = SparkSqlSerializer.serialize(filters) - val encoded: String = BaseEncoding.base64().encode(serialized) - conf.set(PARQUET_FILTER_DATA, encoded) + if (filters.length > 0) { + val serialized: Array[Byte] = SparkSqlSerializer.serialize(filters) + val encoded: String = BaseEncoding.base64().encode(serialized) + conf.set(PARQUET_FILTER_DATA, encoded) + } } - // Note: Inside the Hadoop API we only have access to `Configuration`, not to - // [[SparkContext]], so we cannot use broadcasts to convey the actual filter - // predicate. - def deserializeFilterExpressions(conf: Configuration): Option[Seq[Expression]] = { + /** + * Note: Inside the Hadoop API we only have access to `Configuration`, not to + * [[org.apache.spark.SparkContext]], so we cannot use broadcasts to convey + * the actual filter predicate. + */ + def deserializeFilterExpressions(conf: Configuration): Seq[Expression] = { val data = conf.get(PARQUET_FILTER_DATA) if (data != null) { val decoded: Array[Byte] = BaseEncoding.base64().decode(data) - Some(SparkSqlSerializer.deserialize(decoded)) + SparkSqlSerializer.deserialize(decoded) } else { - None + Seq() } } - // Try to find the given expression in the tree of filters in order to - // determine whether it is safe to remove it from the higher level filters. Note - // that strictly speaking we could stop the search whenever an expression is found - // that contains this expression as subexpression (e.g., when searching for "a" - // and "(a or c)" is found) but we don't care about optimizations here since the - // filter tree is assumed to be small. 
+ /** + * Try to find the given expression in the tree of filters in order to + * determine whether it is safe to remove it from the higher level filters. Note + * that strictly speaking we could stop the search whenever an expression is found + * that contains this expression as subexpression (e.g., when searching for "a" + * and "(a or c)" is found) but we don't care about optimizations here since the + * filter tree is assumed to be small. + * + * @param filter The [[org.apache.spark.sql.parquet.CatalystFilter]] to expand + * and search + * @param expression The expression to look for + * @return An optional [[org.apache.spark.sql.parquet.CatalystFilter]] that + * contains the expression. + */ def findExpression( filter: CatalystFilter, expression: Expression): Option[CatalystFilter] = filter match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index 6f60e6b4e8e9b..2c5433e4243e3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -40,14 +40,14 @@ import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode} /** * Parquet table scan operator. Imports the file that backs the given - * [[ParquetRelation]] as a RDD[Row]. + * [[org.apache.spark.sql.parquet.ParquetRelation]] as a ``RDD[Row]``. */ case class ParquetTableScan( // note: output cannot be transient, see // https://issues.apache.org/jira/browse/SPARK-1367 output: Seq[Attribute], relation: ParquetRelation, - columnPruningPred: Option[Seq[Expression]])( + columnPruningPred: Seq[Expression])( @transient val sc: SparkContext) extends LeafNode { @@ -75,9 +75,9 @@ case class ParquetTableScan( // the whole pruning predicate. // Note 2: you can disable filter predicate pushdown by setting // "spark.sql.hints.parquetFilterPushdown" to false inside SparkConf. - if (columnPruningPred.isDefined && + if (columnPruningPred.length > 0 && sc.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) { - ParquetFilters.serializeFilterExpressions(columnPruningPred.get, conf) + ParquetFilters.serializeFilterExpressions(columnPruningPred, conf) } sc.newAPIHadoopRDD( @@ -197,10 +197,18 @@ case class InsertIntoParquetTable( override def otherCopyArgs = sc :: Nil - // based on ``saveAsNewAPIHadoopFile`` in [[PairRDDFunctions]] - // TODO: Maybe PairRDDFunctions should use Product2 instead of Tuple2? - // .. then we could use the default one and could use [[MutablePair]] - // instead of ``Tuple2`` + /** + * Stores the given Row RDD as a Hadoop file. + * + * Note: We cannot use ``saveAsNewAPIHadoopFile`` from [[org.apache.spark.rdd.PairRDDFunctions]] + * together with [[org.apache.spark.util.MutablePair]] because ``PairRDDFunctions`` uses ``Tuple2`` + * and not ``Product2``. Also, we want to allow appending files to an existing directory and need + * to determine which was the largest written file index before starting to write. + * + * @param rdd The [[org.apache.spark.rdd.RDD]] to writer + * @param path The directory to write to. + * @param conf A [[org.apache.hadoop.conf.Configuration]]. 
+ */ private def saveAsHadoopFile( rdd: RDD[Row], path: String, @@ -257,8 +265,10 @@ case class InsertIntoParquetTable( } } -// TODO: this will be able to append to directories it created itself, not necessarily -// to imported ones +/** + * TODO: this will be able to append to directories it created itself, not necessarily + * to imported ones. + */ private[parquet] class AppendingParquetOutputFormat(offset: Int) extends parquet.hadoop.ParquetOutputFormat[Row] { // override to accept existing directories as valid output directory @@ -275,8 +285,10 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int) } } -// We extend ParquetInputFormat in order to have more control over which -// RecordFilter we want to use +/** + * We extend ParquetInputFormat in order to have more control over which + * RecordFilter we want to use. + */ private[parquet] class FilteringParquetRowInputFormat extends parquet.hadoop.ParquetInputFormat[Row] with Logging { override def createRecordReader( @@ -286,11 +298,11 @@ private[parquet] class FilteringParquetRowInputFormat val filterExpressions = ParquetFilters.deserializeFilterExpressions(ContextUtil.getConfiguration(taskAttemptContext)) - if (filterExpressions.isDefined) { + if (filterExpressions.length > 0) { logInfo(s"Pushing down predicates for RecordFilter: ${filterExpressions.mkString(", ")}") new ParquetRecordReader[Row]( readSupport, - ParquetFilters.createRecordFilter(filterExpressions.get)) + ParquetFilters.createRecordFilter(filterExpressions)) } else { new ParquetRecordReader[Row](readSupport) } @@ -313,7 +325,9 @@ private[parquet] object FileSystemHelper { fs.listStatus(path).map(_.getPath) } - // finds the maximum taskid in the output file names at the given path + /** + * Finds the maximum taskid in the output file names at the given path. 
+ */ def findMaxTaskId(pathStr: String, conf: Configuration): Int = { val files = FileSystemHelper.listFiles(pathStr, conf) // filename pattern is part-r-.parquet diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index 67c1356d1a816..8fe6410ab2381 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -124,7 +124,7 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll { val scanner = new ParquetTableScan( ParquetTestData.testData.output, ParquetTestData.testData, - None)(TestSQLContext.sparkContext) + Seq())(TestSQLContext.sparkContext) val projected = scanner.pruneColumns(ParquetTypesConverter .convertToAttributes(MessageTypeParser .parseMessageType(ParquetTestData.subTestSchema))) From c36d5cb7a750dc065b1df26855ca9ad922e9f534 Mon Sep 17 00:00:00 2001 From: Andre Schumacher Date: Sat, 26 Apr 2014 09:30:02 +0300 Subject: [PATCH 11/13] Scalastyle --- .../apache/spark/sql/parquet/ParquetTableOperations.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index 2c5433e4243e3..65ba1246fbf9a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -201,9 +201,10 @@ case class InsertIntoParquetTable( * Stores the given Row RDD as a Hadoop file. * * Note: We cannot use ``saveAsNewAPIHadoopFile`` from [[org.apache.spark.rdd.PairRDDFunctions]] - * together with [[org.apache.spark.util.MutablePair]] because ``PairRDDFunctions`` uses ``Tuple2`` - * and not ``Product2``. Also, we want to allow appending files to an existing directory and need - * to determine which was the largest written file index before starting to write. + * together with [[org.apache.spark.util.MutablePair]] because ``PairRDDFunctions`` uses + * ``Tuple2`` and not ``Product2``. Also, we want to allow appending files to an existing + * directory and need to determine which was the largest written file index before starting to + * write. * * @param rdd The [[org.apache.spark.rdd.RDD]] to writer * @param path The directory to write to. 
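As a rough illustration of the Configuration hand-off described in the serializeFilterExpressions / deserializeFilterExpressions scaladoc added above, the following sketch shows how a pushed-down predicate travels from the scan operator to the input format. It is not part of the patch and assumes that SparkSqlSerializer's Kryo setup is usable outside a running job:

  import org.apache.hadoop.conf.Configuration
  import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThanOrEqual, Literal}
  import org.apache.spark.sql.catalyst.types.IntegerType
  import org.apache.spark.sql.parquet.ParquetFilters

  // A simple "Attribute cmp Literal" predicate on a non-nullable column, the only
  // shape of predicate that createFilter currently translates.
  val myint = AttributeReference("myint", IntegerType, nullable = false)()
  val pred = GreaterThanOrEqual(myint, Literal(100, IntegerType))

  val hadoopConf = new Configuration()
  // ParquetTableScan side: Kryo-serialize and Base64-encode the predicates into the conf.
  ParquetFilters.serializeFilterExpressions(Seq(pred), hadoopConf)
  // FilteringParquetRowInputFormat side: decode them and build the Parquet record filter.
  val restored = ParquetFilters.deserializeFilterExpressions(hadoopConf)
  val recordFilter = ParquetFilters.createRecordFilter(restored)

Since deserializeFilterExpressions now returns an empty Seq rather than None when nothing was pushed down, the input format only needs to check restored.length before constructing the ParquetRecordReader with the filter.
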
From 7b304ca415d83e64e39d2e62971d44b622a3893d Mon Sep 17 00:00:00 2001 From: Andre Schumacher Date: Sat, 10 May 2014 12:03:20 +0300 Subject: [PATCH 12/13] Fixing formatting --- .../spark/sql/parquet/ParquetFilters.scala | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala index e07669865a3ef..052b0a9196717 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala @@ -356,8 +356,9 @@ private[parquet] object ComparisonFilter { new BooleanPredicateFunction { def functionToApply(input: Boolean): Boolean = input == value } - )), + )), predicate) + def createStringFilter( columnName: String, value: String, @@ -370,8 +371,9 @@ private[parquet] object ComparisonFilter { new ColumnPredicates.PredicateFunction[String] { def functionToApply(input: String): Boolean = input == value } - )), + )), predicate) + def createIntFilter( columnName: String, func: Int => Boolean, @@ -384,8 +386,9 @@ private[parquet] object ComparisonFilter { new IntegerPredicateFunction { def functionToApply(input: Int) = func(input) } - )), + )), predicate) + def createLongFilter( columnName: String, func: Long => Boolean, @@ -398,8 +401,9 @@ private[parquet] object ComparisonFilter { new LongPredicateFunction { def functionToApply(input: Long) = func(input) } - )), + )), predicate) + def createDoubleFilter( columnName: String, func: Double => Boolean, @@ -412,8 +416,9 @@ private[parquet] object ComparisonFilter { new DoublePredicateFunction { def functionToApply(input: Double) = func(input) } - )), + )), predicate) + def createFloatFilter( columnName: String, func: Float => Boolean, @@ -426,6 +431,6 @@ private[parquet] object ComparisonFilter { new FloatPredicateFunction { def functionToApply(input: Float) = func(input) } - )), + )), predicate) } From 16bfe83bcdc1ef0f18b1d342cedb3d07dc1eac86 Mon Sep 17 00:00:00 2001 From: Andre Schumacher Date: Fri, 16 May 2014 20:41:20 +0300 Subject: [PATCH 13/13] Removing leftovers from merge during rebase --- .../spark/sql/parquet/ParquetQuerySuite.scala | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index 8fe6410ab2381..65f4c17aeee3a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -243,24 +243,6 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll { assert(true) } - test("SELECT WHERE") { - val result = sql("SELECT * FROM testsource WHERE myint = 5").collect() - /*test("SELECT WHERE") { - //val result = parquetFile("/home/andre/input.adam").registerAsTable("adamtable") - //sql("SELECT * FROM adamtable WHERE mapq = 0").collect() - //assert(result != null) - //val result = sql("SELECT * FROM testsource WHERE myint = 5").collect() - // TODO: ADD larger case SchemaRDD with filtering on REQUIRED field! 
- implicit def anyToRow(value: Any): Row = value.asInstanceOf[Row] - TestSQLContext - .parquetFile(ParquetTestData.testNestedDir1.toString) - .toSchemaRDD.registerAsTable("xtmptable") - val result = sql("SELECT * FROM xtmptable WHERE owner = \"Julien Le Dem\"").collect() ->>>>>>> Extending ParquetFilters - assert(result != null) - }*/ - } - test("create RecordFilter for simple predicates") { val attribute1 = new AttributeReference("first", IntegerType, false)() val predicate1 = new Equals(attribute1, new Literal(1, IntegerType))