From d7162bfdb2a4389ea08cbb0d3627a429de6d7c45 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Thu, 1 Dec 2016 11:52:20 -0800
Subject: [PATCH 1/4] Add test case

---
 .../datasources/parquet/ParquetFilterSuite.scala | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 4246b54c21f0c..2c221f6fefaae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -47,7 +47,6 @@ import org.apache.spark.util.{AccumulatorContext, LongAccumulator}
  * data type is nullable.
  */
 class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
-
   private def checkFilterPredicate(
       df: DataFrame,
       predicate: Predicate,
@@ -558,4 +557,16 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       }
     }
   }
+
+  test("SPARK-17213: Broken Parquet filter push-down for string columns") {
+    withTempPath { dir =>
+      import testImplicits._
+
+      val path = dir.getCanonicalPath
+      // scalastyle:off nonascii
+      Seq("a", "é").toDF("name").write.parquet(path)
+      // scalastyle:on nonascii
+      assert(spark.read.parquet(path).where("name > 'a'").count() == 1)
+    }
+  }
 }
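Note: why the test in PATCH 1/4 fails before the fix. SPARK-17213 tracks PARQUET-686: affected parquet-mr releases computed min/max statistics for binary columns with a signed byte-wise comparator. UTF-8 encodes "é" as 0xC3 0xA9, and both bytes are negative when read as signed JVM bytes, so the statistics order "é" before "a". For the predicate name > 'a' the row group's recorded maximum is then "a", the group is wrongly skipped, and the query returns 0 rows instead of 1. The following self-contained Scala sketch of the two byte orderings is illustrative only and is not part of the patch series:

    object SignedVsUnsignedOrder {
      // Byte-wise comparison on *signed* bytes, as in the buggy statistics
      // (PARQUET-686).
      def compareSigned(a: Array[Byte], b: Array[Byte]): Int =
        a.zip(b).collectFirst { case (x, y) if x != y => x - y }
          .getOrElse(a.length - b.length)

      // Byte-wise comparison on *unsigned* bytes, the lexicographic UTF-8
      // order that Spark expects for strings.
      def compareUnsigned(a: Array[Byte], b: Array[Byte]): Int =
        a.zip(b).collectFirst { case (x, y) if x != y => (x & 0xFF) - (y & 0xFF) }
          .getOrElse(a.length - b.length)

      def main(args: Array[String]): Unit = {
        val a = "a".getBytes("UTF-8")  // 0x61
        val e = "é".getBytes("UTF-8")  // 0xC3 0xA9, negative as signed bytes
        println(compareSigned(e, a))   // < 0: statistics think "é" < "a"
        println(compareUnsigned(e, a)) // > 0: in UTF-8 order, "é" > "a"
      }
    }

PATCH 2/4 below therefore stops generating Parquet predicates for string and binary columns altogether, trading some read performance for correct answers.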
From 810c1533ae9ba959e9696a806b22e19f8d0b5c31 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Thu, 1 Dec 2016 11:54:09 -0800
Subject: [PATCH 2/4] Disable Parquet filter push-down for string and binary columns

---
 .../datasources/parquet/ParquetFilters.scala | 24 +++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index a6e9788097728..7730d1fccb0b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -40,6 +40,9 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+
+    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+    /*
     // Binary.fromString and Binary.fromByteArray don't accept null values
     case StringType =>
       (n: String, v: Any) => FilterApi.eq(
@@ -49,6 +52,7 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+    */
   }
 
   private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -62,6 +66,9 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+
+    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+    /*
     case StringType =>
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
@@ -70,6 +77,7 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+    */
   }
 
   private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -81,6 +89,9 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+
+    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.lt(binaryColumn(n),
@@ -88,6 +99,7 @@ private[parquet] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+    */
   }
 
   private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -99,6 +111,9 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+
+    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.ltEq(binaryColumn(n),
@@ -106,6 +121,7 @@ private[parquet] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+    */
   }
 
   private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -117,6 +133,9 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+
+    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.gt(binaryColumn(n),
@@ -124,6 +143,7 @@ private[parquet] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+    */
   }
 
   private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -135,6 +155,9 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+
+    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.gtEq(binaryColumn(n),
@@ -142,6 +165,7 @@ private[parquet] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+    */
   }
 
   /**
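Note: the mechanism of PATCH 2/4. Each make* converter above is a PartialFunction keyed on the column's Catalyst DataType, so commenting out the StringType and BinaryType cases makes the converters undefined for those types; the caller (ParquetFilters.createFilter, not shown in this diff) then builds no Parquet predicate for such columns, and Spark's own Filter operator still evaluates the predicate on every row that is read. Correctness is preserved; only the row-group-skipping optimization is lost. A small illustrative sketch of that lookup pattern (hypothetical names, assuming Spark's sql types on the classpath; this is not the real createFilter code):

    import org.apache.spark.sql.types._

    object PartialFunctionSketch {
      // Stand-in for a make* converter with its string/binary cases removed.
      val makeEqSketch: PartialFunction[DataType, String] = {
        case IntegerType => "int32 eq predicate"
        case DoubleType  => "double eq predicate"
        // StringType / BinaryType cases commented out, as in PATCH 2/4
      }

      def main(args: Array[String]): Unit = {
        // `lift` turns the PartialFunction into a DataType => Option[String];
        // None means "no Parquet-level filter", not a query failure.
        println(makeEqSketch.lift(IntegerType)) // Some(int32 eq predicate)
        println(makeEqSketch.lift(StringType))  // None
      }
    }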
From ce71cca44009d2f8508e02291ac6872b680cd28a Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Thu, 1 Dec 2016 11:59:37 -0800
Subject: [PATCH 3/4] Test more string comparison operators

---
 .../execution/datasources/parquet/ParquetFilterSuite.scala | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 2c221f6fefaae..aec3ad0fb0532 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -566,7 +566,14 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       // scalastyle:off nonascii
       Seq("a", "é").toDF("name").write.parquet(path)
       // scalastyle:on nonascii
+
       assert(spark.read.parquet(path).where("name > 'a'").count() == 1)
+      assert(spark.read.parquet(path).where("name >= 'a'").count() == 2)
+
+      // scalastyle:off nonascii
+      assert(spark.read.parquet(path).where("name < 'é'").count() == 1)
+      assert(spark.read.parquet(path).where("name <= 'é'").count() == 2)
+      // scalastyle:on nonascii
     }
   }
 }

From 7b12415ec7640399d50fa3812f1824e55e0dd07a Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Thu, 1 Dec 2016 14:34:03 -0800
Subject: [PATCH 4/4] Ignore test cases for string and binary column filter push-down

---
 .../execution/datasources/parquet/ParquetFilterSuite.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index aec3ad0fb0532..a0d57d79f045a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -229,7 +229,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
   }
 
-  test("filter pushdown - string") {
+  // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+  ignore("filter pushdown - string") {
     withParquetDataFrame((1 to 4).map(i => Tuple1(i.toString))) { implicit df =>
       checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
       checkFilterPredicate(
@@ -257,7 +258,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
   }
 
-  test("filter pushdown - binary") {
+  // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+  ignore("filter pushdown - binary") {
     implicit class IntToBinary(int: Int) {
       def b: Array[Byte] = int.toString.getBytes(StandardCharsets.UTF_8)
     }
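Note: PATCH 3/4 widens the regression test to all four comparison operators, and PATCH 4/4 ignores (rather than deletes) the existing push-down unit tests, which assert exactly the behavior PATCH 2/4 disables; they can be re-enabled once a parquet-mr release containing the PARQUET-686 fix is adopted. An end-to-end check mirroring the final regression test (illustrative only, assuming a local Spark 2.x build; not part of the patches):

    import org.apache.spark.sql.SparkSession

    object Spark17213Check {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("SPARK-17213 check")
          .getOrCreate()
        import spark.implicits._

        // Write a tiny Parquet table containing a non-ASCII string.
        val path = java.nio.file.Files.createTempDirectory("spark-17213").resolve("t").toString
        Seq("a", "é").toDF("name").write.parquet(path)

        // With push-down disabled for strings, all four answers are correct.
        val df = spark.read.parquet(path)
        assert(df.where("name > 'a'").count() == 1)
        assert(df.where("name >= 'a'").count() == 2)
        assert(df.where("name < 'é'").count() == 1)
        assert(df.where("name <= 'é'").count() == 2)

        spark.stop()
      }
    }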