From bd17512271cebd375e3ddb7e06b5daeef62411ea Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 18 Nov 2014 14:50:54 +0800 Subject: [PATCH] Backports #3334 to branch-1.1 --- .../spark/sql/parquet/ParquetFilters.scala | 13 ++- .../spark/sql/parquet/ParquetQuerySuite.scala | 107 +++++++++++------- 2 files changed, 75 insertions(+), 45 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala index 7c83f1cad7d7..0365c34c8024 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala @@ -213,22 +213,27 @@ private[sql] object ParquetFilters { Some(createEqualityFilter(right.name, left, p)) case p @ EqualTo(left: NamedExpression, right: Literal) if !left.nullable => Some(createEqualityFilter(left.name, right, p)) + case p @ LessThan(left: Literal, right: NamedExpression) if !right.nullable => - Some(createLessThanFilter(right.name, left, p)) + Some(createGreaterThanFilter(right.name, left, p)) case p @ LessThan(left: NamedExpression, right: Literal) if !left.nullable => Some(createLessThanFilter(left.name, right, p)) + case p @ LessThanOrEqual(left: Literal, right: NamedExpression) if !right.nullable => - Some(createLessThanOrEqualFilter(right.name, left, p)) + Some(createGreaterThanOrEqualFilter(right.name, left, p)) case p @ LessThanOrEqual(left: NamedExpression, right: Literal) if !left.nullable => Some(createLessThanOrEqualFilter(left.name, right, p)) + case p @ GreaterThan(left: Literal, right: NamedExpression) if !right.nullable => - Some(createGreaterThanFilter(right.name, left, p)) + Some(createLessThanFilter(right.name, left, p)) case p @ GreaterThan(left: NamedExpression, right: Literal) if !left.nullable => Some(createGreaterThanFilter(left.name, right, p)) + case p @ GreaterThanOrEqual(left: Literal, right: NamedExpression) if !right.nullable => - Some(createGreaterThanOrEqualFilter(right.name, left, p)) + Some(createLessThanOrEqualFilter(right.name, left, p)) case p @ GreaterThanOrEqual(left: NamedExpression, right: Literal) if !left.nullable => Some(createGreaterThanOrEqualFilter(left.name, right, p)) + case _ => None } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index c6b790a4b6a2..10df1fac210a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -17,20 +17,19 @@ package org.apache.spark.sql.parquet +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.mapreduce.Job import org.scalatest.{BeforeAndAfterAll, FunSuiteLike} - import parquet.hadoop.ParquetFileWriter import parquet.hadoop.util.ContextUtil -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.mapreduce.Job import org.apache.spark.SparkContext import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.{SqlLexical, SqlParser} import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedAttribute} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.types.{BooleanType, IntegerType} import org.apache.spark.sql.catalyst.util.getTempFilePath +import org.apache.spark.sql.catalyst.{SqlLexical, SqlParser} import org.apache.spark.sql.test.TestSQLContext import org.apache.spark.sql.test.TestSQLContext._ import org.apache.spark.util.Utils @@ -453,43 +452,46 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA } test("create RecordFilter for simple predicates") { - val attribute1 = new AttributeReference("first", IntegerType, false)() - val predicate1 = new EqualTo(attribute1, new Literal(1, IntegerType)) - val filter1 = ParquetFilters.createFilter(predicate1) - assert(filter1.isDefined) - assert(filter1.get.predicate == predicate1, "predicates do not match") - assert(filter1.get.isInstanceOf[ComparisonFilter]) - val cmpFilter1 = filter1.get.asInstanceOf[ComparisonFilter] - assert(cmpFilter1.columnName == "first", "column name incorrect") - - val predicate2 = new LessThan(attribute1, new Literal(4, IntegerType)) - val filter2 = ParquetFilters.createFilter(predicate2) - assert(filter2.isDefined) - assert(filter2.get.predicate == predicate2, "predicates do not match") - assert(filter2.get.isInstanceOf[ComparisonFilter]) - val cmpFilter2 = filter2.get.asInstanceOf[ComparisonFilter] - assert(cmpFilter2.columnName == "first", "column name incorrect") - - val predicate3 = new And(predicate1, predicate2) - val filter3 = ParquetFilters.createFilter(predicate3) - assert(filter3.isDefined) - assert(filter3.get.predicate == predicate3, "predicates do not match") - assert(filter3.get.isInstanceOf[AndFilter]) - - val predicate4 = new Or(predicate1, predicate2) - val filter4 = ParquetFilters.createFilter(predicate4) - assert(filter4.isDefined) - assert(filter4.get.predicate == predicate4, "predicates do not match") - assert(filter4.get.isInstanceOf[OrFilter]) - - val attribute2 = new AttributeReference("second", IntegerType, false)() - val predicate5 = new GreaterThan(attribute1, attribute2) - val badfilter = ParquetFilters.createFilter(predicate5) - assert(badfilter.isDefined === false) - - val predicate6 = And(GreaterThan(attribute1, attribute2), GreaterThan(attribute1, attribute2)) - val badfilter2 = ParquetFilters.createFilter(predicate6) - assert(badfilter2.isDefined === false) + def checkFilter(predicate: Predicate): Option[CatalystFilter] = { + ParquetFilters.createFilter(predicate).map { f => + assertResult(predicate)(f.predicate) + f + }.orElse { + fail(s"filter $predicate not pushed down") + } + } + + def checkComparisonFilter(predicate: Predicate, columnName: String): Unit = { + assertResult(columnName, "column name incorrect") { + checkFilter(predicate).map(_.asInstanceOf[ComparisonFilter].columnName).get + } + } + + def checkInvalidFilter(predicate: Predicate): Unit = { + assert(ParquetFilters.createFilter(predicate).isEmpty) + } + + val a = 'a.int.notNull + val b = 'b.int.notNull + + checkComparisonFilter(a === 1, "a") + checkComparisonFilter(Literal(1) === a, "a") + + checkComparisonFilter(a < 4, "a") + checkComparisonFilter(a > 4, "a") + checkComparisonFilter(a <= 4, "a") + checkComparisonFilter(a >= 4, "a") + + checkComparisonFilter(Literal(4) > a, "a") + checkComparisonFilter(Literal(4) < a, "a") + checkComparisonFilter(Literal(4) >= a, "a") + checkComparisonFilter(Literal(4) <= a, "a") + + checkFilter(a === 1 && a < 4) + checkFilter(a === 1 || a < 4) + + checkInvalidFilter(a > b) + checkInvalidFilter((a > b) && (a > b)) } test("test filter by predicate pushdown") { @@ -516,6 +518,29 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA assert(result2(49)(1) === 199) } } + for(myval <- Seq("myint", "mylong", "mydouble", "myfloat")) { + val query1 = sql(s"SELECT * FROM testfiltersource WHERE 150 > $myval AND 100 <= $myval") + assert( + query1.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan], + "Top operator should be ParquetTableScan after pushdown") + val result1 = query1.collect() + assert(result1.size === 50) + assert(result1(0)(1) === 100) + assert(result1(49)(1) === 149) + val query2 = sql(s"SELECT * FROM testfiltersource WHERE 150 < $myval AND 200 >= $myval") + assert( + query2.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan], + "Top operator should be ParquetTableScan after pushdown") + val result2 = query2.collect() + assert(result2.size === 50) + if (myval == "myint" || myval == "mylong") { + assert(result2(0)(1) === 151) + assert(result2(49)(1) === 200) + } else { + assert(result2(0)(1) === 150) + assert(result2(49)(1) === 199) + } + } for(myval <- Seq("myint", "mylong")) { val query3 = sql(s"SELECT * FROM testfiltersource WHERE $myval > 190 OR $myval < 10") assert(