From cee1c73acc700f9712374e729e34a26df7eafa50 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Tue, 14 May 2019 17:45:27 +0800
Subject: [PATCH 1/9] Partially push down disjunctive predicates in Parquet/ORC

---
 .../datasources/parquet/ParquetFilters.scala  |  4 +-
 .../parquet/ParquetFilterSuite.scala          | 85 +++++++++++++------
 .../datasources/orc/OrcFilters.scala          |  8 +-
 .../datasources/orc/OrcFilterSuite.scala      | 60 ++++++++++---
 4 files changed, 112 insertions(+), 45 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 21ab9c78e53d..dd7f0ce19e32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -529,9 +529,9 @@ private[parquet] class ParquetFilters(
     case sources.Or(lhs, rhs) =>
       for {
         lhsFilter <-
-          createFilterHelper(nameToParquetField, lhs, canPartialPushDownConjuncts = false)
+          createFilterHelper(nameToParquetField, lhs, canPartialPushDownConjuncts = true)
         rhsFilter <-
-          createFilterHelper(nameToParquetField, rhs, canPartialPushDownConjuncts = false)
+          createFilterHelper(nameToParquetField, rhs, canPartialPushDownConjuncts = true)
       } yield FilterApi.or(lhsFilter, rhsFilter)
 
     case sources.Not(pred) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 255f7db8d135..f2b92ae61a52 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -827,34 +827,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       )))
     }
 
-    // Testing
-    // case sources.Or(lhs, rhs) =>
-    //   ...
-    //   lhsFilter <- createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd = false)
-    assertResult(None) {
-      parquetFilters.createFilter(
-        parquetSchema,
-        sources.Or(
-          sources.And(
-            sources.GreaterThan("a", 1),
-            sources.StringContains("b", "prefix")),
-          sources.GreaterThan("a", 2)))
-    }
-
-    // Testing
-    // case sources.Or(lhs, rhs) =>
-    //   ...
-    //   rhsFilter <- createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd = false)
-    assertResult(None) {
-      parquetFilters.createFilter(
-        parquetSchema,
-        sources.Or(
-          sources.GreaterThan("a", 2),
-          sources.And(
-            sources.GreaterThan("a", 1),
-            sources.StringContains("b", "prefix"))))
-    }
-
     // Testing
     // case sources.Not(pred) =>
     //   createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
@@ -938,6 +910,63 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
   }
 
+  test("SPARK-27699 Converting disjunctions into Parquet filter predicates") {
+    val schema = StructType(Seq(
+      StructField("a", IntegerType, nullable = false),
+      StructField("b", StringType, nullable = true),
+      StructField("c", DoubleType, nullable = true)
+    ))
+
+    val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
+    // Testing
+    // case sources.Or(lhs, rhs) =>
+    //   ...
+ // lhsFilter <- createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd = true) + assertResult(Some( + FilterApi.or(gt(intColumn("a"), 1: Integer), gt(intColumn("a"), 2: Integer)))) { + parquetFilters.createFilter( + parquetSchema, + sources.Or( + sources.And( + sources.GreaterThan("a", 1), + sources.StringContains("b", "prefix")), + sources.GreaterThan("a", 2))) + } + + // Testing + // case sources.Or(lhs, rhs) => + // ... + // rhsFilter <- createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd = true) + assertResult(Some( + FilterApi.or(gt(intColumn("a"), 2: Integer), gt(intColumn("a"), 1: Integer)))) { + parquetFilters.createFilter( + parquetSchema, + sources.Or( + sources.GreaterThan("a", 2), + sources.And( + sources.GreaterThan("a", 1), + sources.StringContains("b", "prefix")))) + } + + // Testing + // case sources.Or(lhs, rhs) => + // ... + // lhsFilter <- createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd = true) + // rhsFilter <- createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd = true) + assertResult(Some( + FilterApi.or(gt(intColumn("a"), 1: Integer), lt(intColumn("a"), 0: Integer)))) { + parquetFilters.createFilter( + parquetSchema, + sources.Or( + sources.And( + sources.GreaterThan("a", 1), + sources.StringContains("b", "prefix")), + sources.And( + sources.LessThan("a", 0), + sources.StringContains("b", "foobar")))) + } + } + test("SPARK-16371 Do not push down filters when inner name and outer name are the same") { withParquetDataFrame((1 to 4).map(i => Tuple1(Tuple1(i)))) { implicit df => // Here the schema becomes as below: diff --git a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index 112dcb2cb238..c8f1599b831a 100644 --- a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -176,11 +176,11 @@ private[sql] object OrcFilters extends OrcFiltersBase { case Or(left, right) => for { - _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts = false) - _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts = false) + _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts = true) + _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts = true) lhs <- createBuilder(dataTypeMap, left, - builder.startOr(), canPartialPushDownConjuncts = false) - rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts = false) + builder.startOr(), canPartialPushDownConjuncts = true) + rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts = true) } yield rhs.end() case Not(child) => diff --git a/sql/core/v1.2.1/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/v1.2.1/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala index e96c6fb7716c..535c32396b59 100644 --- a/sql/core/v1.2.1/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala +++ b/sql/core/v1.2.1/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala @@ -362,17 +362,6 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext { )).get.toString } - // Can not remove unsupported `StringContains` predicate since it is under `Or` operator. 
- assert(OrcFilters.createFilter(schema, Array( - Or( - LessThan("a", 10), - And( - StringContains("b", "prefix"), - GreaterThan("a", 1) - ) - ) - )).isEmpty) - // Safely remove unsupported `StringContains` predicate and push down `LessThan` assertResult("leaf-0 = (LESS_THAN a 10), expr = leaf-0") { OrcFilters.createFilter(schema, Array( @@ -398,6 +387,55 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext { } } + test("SPARK-27699 Converting disjunctions into ORC SearchArguments") { + import org.apache.spark.sql.sources._ + // The `LessThan` should be converted while the `StringContains` shouldn't + val schema = new StructType( + Array( + StructField("a", IntegerType, nullable = true), + StructField("b", StringType, nullable = true))) + + // The predicate `StringContains` predicate is not able to be pushed down. + assertResult("leaf-0 = (LESS_THAN_EQUALS a 10), leaf-1 = (LESS_THAN a 1)," + + " expr = (or (not leaf-0) leaf-1)") { + OrcFilters.createFilter(schema, Array( + Or( + GreaterThan("a", 10), + And( + StringContains("b", "prefix"), + LessThan("a", 1) + ) + ) + )).get.toString + } + + assertResult("leaf-0 = (LESS_THAN_EQUALS a 10), leaf-1 = (LESS_THAN a 1)," + + " expr = (or (not leaf-0) leaf-1)") { + OrcFilters.createFilter(schema, Array( + Or( + And( + GreaterThan("a", 10), + StringContains("b", "foobar") + ), + And( + StringContains("b", "prefix"), + LessThan("a", 1) + ) + ) + )).get.toString + } + + assert(OrcFilters.createFilter(schema, Array( + Or( + StringContains("b", "foobar"), + And( + StringContains("b", "prefix"), + LessThan("a", 1) + ) + ) + )).isEmpty) + } + test("SPARK-27160: Fix casting of the DecimalType literal") { import org.apache.spark.sql.sources._ val schema = StructType(Array(StructField("a", DecimalType(3, 2)))) From 3f7ef8a8eac5a1a3c3a3cf04ac38ebd2e0bcb176 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 15 May 2019 13:16:50 +0800 Subject: [PATCH 2/9] address comments --- .../datasources/parquet/ParquetFilters.scala | 11 ++++ .../datasources/orc/OrcFilters.scala | 11 ++++ .../datasources/orc/OrcFilters.scala | 19 ++++-- .../datasources/orc/OrcFilterSuite.scala | 60 +++++++++++++++---- 4 files changed, 86 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index dd7f0ce19e32..3796c9056865 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -527,6 +527,17 @@ private[parquet] class ParquetFilters( } case sources.Or(lhs, rhs) => + // The Or predicate is convertible when both of its children can be pushed down. + // That is to say, if one/both of the children can be partially pushed down, the Or + // predicate can be partially pushed down as well. + // + // Here is an example used to explain the reason. + // Let's say we have + // (a1 AND a2) OR (b1 AND b2), + // a1 and b1 is convertible, while a2 and b2 is not. + // The predicate can be converted as + // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2) + // As per the logical in And predicate, we can push down (a1 OR b1). 
for { lhsFilter <- createFilterHelper(nameToParquetField, lhs, canPartialPushDownConjuncts = true) diff --git a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index c8f1599b831a..472e105d6ffd 100644 --- a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -175,6 +175,17 @@ private[sql] object OrcFilters extends OrcFiltersBase { } case Or(left, right) => + // The Or predicate is convertible when both of its children can be pushed down. + // That is to say, if one/both of the children can be partially pushed down, the Or + // predicate can be partially pushed down as well. + // + // Here is an example used to explain the reason. + // Let's say we have + // (a1 AND a2) OR (b1 AND b2), + // a1 and b1 is convertible, while a2 and b2 is not. + // The predicate can be converted as + // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2) + // As per the logical in And predicate, we can push down (a1 OR b1). for { _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts = true) _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts = true) diff --git a/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index 85d61bccc8e3..fe6abffbb697 100644 --- a/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -175,12 +175,23 @@ private[sql] object OrcFilters extends OrcFiltersBase { } case Or(left, right) => + // The Or predicate is convertible when both of its children can be pushed down. + // That is to say, if one/both of the children can be partially pushed down, the Or + // predicate can be partially pushed down as well. + // + // Here is an example used to explain the reason. + // Let's say we have + // (a1 AND a2) OR (b1 AND b2), + // a1 and b1 is convertible, while a2 and b2 is not. + // The predicate can be converted as + // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2) + // As per the logical in And predicate, we can push down (a1 OR b1). 
for { - _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts = false) - _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts = false) + _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts = true) + _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts = true) lhs <- createBuilder(dataTypeMap, left, - builder.startOr(), canPartialPushDownConjuncts = false) - rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts = false) + builder.startOr(), canPartialPushDownConjuncts = true) + rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts = true) } yield rhs.end() case Not(child) => diff --git a/sql/core/v2.3.4/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/v2.3.4/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala index 1ed42f1c5f75..0f19c9e40b93 100644 --- a/sql/core/v2.3.4/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala +++ b/sql/core/v2.3.4/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala @@ -362,17 +362,6 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext { )).get.toString } - // Can not remove unsupported `StringContains` predicate since it is under `Or` operator. - assert(OrcFilters.createFilter(schema, Array( - Or( - LessThan("a", 10), - And( - StringContains("b", "prefix"), - GreaterThan("a", 1) - ) - ) - )).isEmpty) - // Safely remove unsupported `StringContains` predicate and push down `LessThan` assertResult("leaf-0 = (LESS_THAN a 10), expr = leaf-0") { OrcFilters.createFilter(schema, Array( @@ -398,6 +387,55 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext { } } + test("SPARK-27699 Converting disjunctions into ORC SearchArguments") { + import org.apache.spark.sql.sources._ + // The `LessThan` should be converted while the `StringContains` shouldn't + val schema = new StructType( + Array( + StructField("a", IntegerType, nullable = true), + StructField("b", StringType, nullable = true))) + + // The predicate `StringContains` predicate is not able to be pushed down. 
+ assertResult("leaf-0 = (LESS_THAN_EQUALS a 10), leaf-1 = (LESS_THAN a 1)," + + " expr = (or (not leaf-0) leaf-1)") { + OrcFilters.createFilter(schema, Array( + Or( + GreaterThan("a", 10), + And( + StringContains("b", "prefix"), + LessThan("a", 1) + ) + ) + )).get.toString + } + + assertResult("leaf-0 = (LESS_THAN_EQUALS a 10), leaf-1 = (LESS_THAN a 1)," + + " expr = (or (not leaf-0) leaf-1)") { + OrcFilters.createFilter(schema, Array( + Or( + And( + GreaterThan("a", 10), + StringContains("b", "foobar") + ), + And( + StringContains("b", "prefix"), + LessThan("a", 1) + ) + ) + )).get.toString + } + + assert(OrcFilters.createFilter(schema, Array( + Or( + StringContains("b", "foobar"), + And( + StringContains("b", "prefix"), + LessThan("a", 1) + ) + ) + )).isEmpty) + } + test("SPARK-27160: Fix casting of the DecimalType literal") { import org.apache.spark.sql.sources._ val schema = StructType(Array(StructField("a", DecimalType(3, 2)))) From 529b2078857c6950436e3d9d47b5e71dcc5cfb63 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 15 May 2019 15:23:41 +0800 Subject: [PATCH 3/9] address comment --- .../execution/datasources/parquet/ParquetFilters.scala | 4 ++-- .../spark/sql/execution/datasources/orc/OrcFilters.scala | 8 ++++---- .../spark/sql/execution/datasources/orc/OrcFilters.scala | 8 ++++---- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 3796c9056865..22d71659b928 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -540,9 +540,9 @@ private[parquet] class ParquetFilters( // As per the logical in And predicate, we can push down (a1 OR b1). for { lhsFilter <- - createFilterHelper(nameToParquetField, lhs, canPartialPushDownConjuncts = true) + createFilterHelper(nameToParquetField, lhs, canPartialPushDownConjuncts) rhsFilter <- - createFilterHelper(nameToParquetField, rhs, canPartialPushDownConjuncts = true) + createFilterHelper(nameToParquetField, rhs, canPartialPushDownConjuncts) } yield FilterApi.or(lhsFilter, rhsFilter) case sources.Not(pred) => diff --git a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index 472e105d6ffd..8bf9b24d7b63 100644 --- a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -187,11 +187,11 @@ private[sql] object OrcFilters extends OrcFiltersBase { // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2) // As per the logical in And predicate, we can push down (a1 OR b1). 
for { - _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts = true) - _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts = true) + _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts) + _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts) lhs <- createBuilder(dataTypeMap, left, - builder.startOr(), canPartialPushDownConjuncts = true) - rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts = true) + builder.startOr(), canPartialPushDownConjuncts) + rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts) } yield rhs.end() case Not(child) => diff --git a/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index fe6abffbb697..12d771f2c384 100644 --- a/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -187,11 +187,11 @@ private[sql] object OrcFilters extends OrcFiltersBase { // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2) // As per the logical in And predicate, we can push down (a1 OR b1). for { - _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts = true) - _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts = true) + _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts) + _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts) lhs <- createBuilder(dataTypeMap, left, - builder.startOr(), canPartialPushDownConjuncts = true) - rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts = true) + builder.startOr(), canPartialPushDownConjuncts) + rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjunct) } yield rhs.end() case Not(child) => From caeb64d089de9a3bc9237cc616062273d10dba09 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 15 May 2019 19:54:01 +0800 Subject: [PATCH 4/9] improve OrcFilters in Hive project as well --- .../spark/sql/hive/orc/OrcFilters.scala | 19 ++++-- .../sql/hive/orc/HiveOrcFilterSuite.scala | 60 +++++++++++++++---- 2 files changed, 64 insertions(+), 15 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala index dfac73ce62fa..3bfe157f5fe1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala @@ -155,12 +155,23 @@ private[orc] object OrcFilters extends Logging { } case Or(left, right) => + // The Or predicate is convertible when both of its children can be pushed down. + // That is to say, if one/both of the children can be partially pushed down, the Or + // predicate can be partially pushed down as well. + // + // Here is an example used to explain the reason. + // Let's say we have + // (a1 AND a2) OR (b1 AND b2), + // a1 and b1 is convertible, while a2 and b2 is not. + // The predicate can be converted as + // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2) + // As per the logical in And predicate, we can push down (a1 OR b1). 
for { - _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts = false) - _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts = false) + _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts) + _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts) lhs <- createBuilder(dataTypeMap, left, - builder.startOr(), canPartialPushDownConjuncts = false) - rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts = false) + builder.startOr(), canPartialPushDownConjuncts) + rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts) } yield rhs.end() case Not(child) => diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala index 6de47a68a9bc..6932a2d6174e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala @@ -399,17 +399,6 @@ class HiveOrcFilterSuite extends OrcTest with TestHiveSingleton { )).get.toString } - // Can not remove unsupported `StringContains` predicate since it is under `Or` operator. - assert(OrcFilters.createFilter(schema, Array( - Or( - LessThan("a", 10), - And( - StringContains("b", "prefix"), - GreaterThan("a", 1) - ) - ) - )).isEmpty) - // Safely remove unsupported `StringContains` predicate and push down `LessThan` assertResultWithDiffHiveVersion( """leaf-0 = (LESS_THAN a 10) @@ -442,4 +431,53 @@ class HiveOrcFilterSuite extends OrcTest with TestHiveSingleton { )).get.toString } } + + test("SPARK-27699 Converting disjunctions into ORC SearchArguments") { + import org.apache.spark.sql.sources._ + // The `LessThan` should be converted while the `StringContains` shouldn't + val schema = new StructType( + Array( + StructField("a", IntegerType, nullable = true), + StructField("b", StringType, nullable = true))) + + // The predicate `StringContains` predicate is not able to be pushed down. 
+ assertResult("leaf-0 = (LESS_THAN_EQUALS a 10)\nleaf-1 = (LESS_THAN a 1)\n" + + "expr = (or (not leaf-0) leaf-1)") { + OrcFilters.createFilter(schema, Array( + Or( + GreaterThan("a", 10), + And( + StringContains("b", "prefix"), + LessThan("a", 1) + ) + ) + )).get.toString + } + + assertResult("leaf-0 = (LESS_THAN_EQUALS a 10)\nleaf-1 = (LESS_THAN a 1)\n" + + "expr = (or (not leaf-0) leaf-1)") { + OrcFilters.createFilter(schema, Array( + Or( + And( + GreaterThan("a", 10), + StringContains("b", "foobar") + ), + And( + StringContains("b", "prefix"), + LessThan("a", 1) + ) + ) + )).get.toString + } + + assert(OrcFilters.createFilter(schema, Array( + Or( + StringContains("b", "foobar"), + And( + StringContains("b", "prefix"), + LessThan("a", 1) + ) + ) + )).isEmpty) + } } From 0fe71948d54383ad79831e9f97993fa1c6ab25fd Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 16 May 2019 17:18:28 +0800 Subject: [PATCH 5/9] fix convertibleFilters --- .../org/apache/spark/sql/SQLQuerySuite.scala | 20 +++++++++- .../datasources/orc/OrcFilters.scala | 40 +++++++++++++++++-- 2 files changed, 55 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 6ae77a8e68c9..c028715e0a99 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -26,7 +26,8 @@ import org.apache.spark.{AccumulatorSuite, SparkException} import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec} -import org.apache.spark.sql.execution.datasources.FilePartition +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -2978,6 +2979,23 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } } + test("SPARK-27699 Converting disjunctions into ORC SearchArguments") { + withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "") { + withTempPath { dir => + spark.range(10).map(i => (i, i.toString)).toDF("id", "s").write.orc(dir.getCanonicalPath) + val df = spark.read.orc(dir.getCanonicalPath) + .where(('id < 2 and 's.contains("foo")) or ('id > 10 and 's.contains("bar"))) + val scan = df.queryExecution.sparkPlan + .find(_.isInstanceOf[BatchScanExec]).get.asInstanceOf[BatchScanExec] + .scan + assert(scan.isInstanceOf[OrcScan]) + assertResult(Array(sources.Or(sources.LessThan("id", 2), sources.GreaterThan("id", 10)))) { + scan.asInstanceOf[OrcScan].pushedFilters + } + } + } + } + test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") { Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery => withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) { diff --git a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index 8bf9b24d7b63..9e4bf22ff7e3 100644 --- a/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ 
b/sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -75,10 +75,42 @@ private[sql] object OrcFilters extends OrcFiltersBase { schema: StructType, dataTypeMap: Map[String, DataType], filters: Seq[Filter]): Seq[Filter] = { - for { - filter <- filters - _ <- buildSearchArgument(dataTypeMap, filter, newBuilder()) - } yield filter + import org.apache.spark.sql.sources._ + + def convertibleFiltersHelper( + filter: Filter, + canPartialPushDown: Boolean): Option[Filter] = filter match { + case And(left, right) => + val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown) + val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown) + (leftResultOptional, rightResultOptional) match { + case (Some(leftResult), Some(rightResult)) => Some(And(leftResult, rightResult)) + case (Some(leftResult), None) if canPartialPushDown => Some(leftResult) + case (None, Some(rightResult)) if canPartialPushDown => Some(rightResult) + case _ => None + } + + case Or(left, right) => + val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown) + val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown) + if (leftResultOptional.isEmpty || rightResultOptional.isEmpty) { + None + } else { + Some(Or(leftResultOptional.get, rightResultOptional.get)) + } + case Not(pred) => + val resultOptional = convertibleFiltersHelper(pred, canPartialPushDown = false) + resultOptional.map(Not) + case other => + if (buildSearchArgument(dataTypeMap, other, newBuilder()).isDefined) { + Some(other) + } else { + None + } + } + filters.flatMap { filter => + convertibleFiltersHelper(filter, true) + } } /** From 5f412bd8bbfa18cb877f4b8eb0e132d4c993eb12 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 16 May 2019 23:39:40 +0800 Subject: [PATCH 6/9] fix DataSourceV2Strategy --- .../datasources/DataSourceStrategy.scala | 179 +++++++++++------- .../datasources/v2/DataSourceV2Strategy.scala | 19 +- .../org/apache/spark/sql/SQLQuerySuite.scala | 26 ++- 3 files changed, 141 insertions(+), 83 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index c907ac21af38..d8587864fb49 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources import java.util.Locale +import scala.collection.mutable + import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging @@ -436,60 +438,94 @@ object DataSourceStrategy { } } + private def translateLeafNodeFilter(predicate: Expression): Option[Filter] = predicate match { + case expressions.EqualTo(a: Attribute, Literal(v, t)) => + Some(sources.EqualTo(a.name, convertToScala(v, t))) + case expressions.EqualTo(Literal(v, t), a: Attribute) => + Some(sources.EqualTo(a.name, convertToScala(v, t))) + + case expressions.EqualNullSafe(a: Attribute, Literal(v, t)) => + Some(sources.EqualNullSafe(a.name, convertToScala(v, t))) + case expressions.EqualNullSafe(Literal(v, t), a: Attribute) => + Some(sources.EqualNullSafe(a.name, convertToScala(v, t))) + + case expressions.GreaterThan(a: Attribute, Literal(v, t)) => + Some(sources.GreaterThan(a.name, convertToScala(v, t))) + case expressions.GreaterThan(Literal(v, t), a: Attribute) => + 
Some(sources.LessThan(a.name, convertToScala(v, t))) + + case expressions.LessThan(a: Attribute, Literal(v, t)) => + Some(sources.LessThan(a.name, convertToScala(v, t))) + case expressions.LessThan(Literal(v, t), a: Attribute) => + Some(sources.GreaterThan(a.name, convertToScala(v, t))) + + case expressions.GreaterThanOrEqual(a: Attribute, Literal(v, t)) => + Some(sources.GreaterThanOrEqual(a.name, convertToScala(v, t))) + case expressions.GreaterThanOrEqual(Literal(v, t), a: Attribute) => + Some(sources.LessThanOrEqual(a.name, convertToScala(v, t))) + + case expressions.LessThanOrEqual(a: Attribute, Literal(v, t)) => + Some(sources.LessThanOrEqual(a.name, convertToScala(v, t))) + case expressions.LessThanOrEqual(Literal(v, t), a: Attribute) => + Some(sources.GreaterThanOrEqual(a.name, convertToScala(v, t))) + + case expressions.InSet(a: Attribute, set) => + val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType) + Some(sources.In(a.name, set.toArray.map(toScala))) + + // Because we only convert In to InSet in Optimizer when there are more than certain + // items. So it is possible we still get an In expression here that needs to be pushed + // down. + case expressions.In(a: Attribute, list) if list.forall(_.isInstanceOf[Literal]) => + val hSet = list.map(_.eval(EmptyRow)) + val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType) + Some(sources.In(a.name, hSet.toArray.map(toScala))) + + case expressions.IsNull(a: Attribute) => + Some(sources.IsNull(a.name)) + case expressions.IsNotNull(a: Attribute) => + Some(sources.IsNotNull(a.name)) + case expressions.StartsWith(a: Attribute, Literal(v: UTF8String, StringType)) => + Some(sources.StringStartsWith(a.name, v.toString)) + + case expressions.EndsWith(a: Attribute, Literal(v: UTF8String, StringType)) => + Some(sources.StringEndsWith(a.name, v.toString)) + + case expressions.Contains(a: Attribute, Literal(v: UTF8String, StringType)) => + Some(sources.StringContains(a.name, v.toString)) + + case expressions.Literal(true, BooleanType) => + Some(sources.AlwaysTrue) + + case expressions.Literal(false, BooleanType) => + Some(sources.AlwaysFalse) + + case _ => None + } + /** * Tries to translate a Catalyst [[Expression]] into data source [[Filter]]. * * @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`. 
*/ protected[sql] def translateFilter(predicate: Expression): Option[Filter] = { - predicate match { - case expressions.EqualTo(a: Attribute, Literal(v, t)) => - Some(sources.EqualTo(a.name, convertToScala(v, t))) - case expressions.EqualTo(Literal(v, t), a: Attribute) => - Some(sources.EqualTo(a.name, convertToScala(v, t))) - - case expressions.EqualNullSafe(a: Attribute, Literal(v, t)) => - Some(sources.EqualNullSafe(a.name, convertToScala(v, t))) - case expressions.EqualNullSafe(Literal(v, t), a: Attribute) => - Some(sources.EqualNullSafe(a.name, convertToScala(v, t))) - - case expressions.GreaterThan(a: Attribute, Literal(v, t)) => - Some(sources.GreaterThan(a.name, convertToScala(v, t))) - case expressions.GreaterThan(Literal(v, t), a: Attribute) => - Some(sources.LessThan(a.name, convertToScala(v, t))) - - case expressions.LessThan(a: Attribute, Literal(v, t)) => - Some(sources.LessThan(a.name, convertToScala(v, t))) - case expressions.LessThan(Literal(v, t), a: Attribute) => - Some(sources.GreaterThan(a.name, convertToScala(v, t))) - - case expressions.GreaterThanOrEqual(a: Attribute, Literal(v, t)) => - Some(sources.GreaterThanOrEqual(a.name, convertToScala(v, t))) - case expressions.GreaterThanOrEqual(Literal(v, t), a: Attribute) => - Some(sources.LessThanOrEqual(a.name, convertToScala(v, t))) - - case expressions.LessThanOrEqual(a: Attribute, Literal(v, t)) => - Some(sources.LessThanOrEqual(a.name, convertToScala(v, t))) - case expressions.LessThanOrEqual(Literal(v, t), a: Attribute) => - Some(sources.GreaterThanOrEqual(a.name, convertToScala(v, t))) - - case expressions.InSet(a: Attribute, set) => - val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType) - Some(sources.In(a.name, set.toArray.map(toScala))) - - // Because we only convert In to InSet in Optimizer when there are more than certain - // items. So it is possible we still get an In expression here that needs to be pushed - // down. - case expressions.In(a: Attribute, list) if list.forall(_.isInstanceOf[Literal]) => - val hSet = list.map(_.eval(EmptyRow)) - val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType) - Some(sources.In(a.name, hSet.toArray.map(toScala))) - - case expressions.IsNull(a: Attribute) => - Some(sources.IsNull(a.name)) - case expressions.IsNotNull(a: Attribute) => - Some(sources.IsNotNull(a.name)) + translateFilterWithMapping(predicate, None) + } + /** + * Tries to translate a Catalyst [[Expression]] into data source [[Filter]]. + * + * @param predicate The input [[Expression]] to be translated as [[Filter]] + * @param translatedFilterToExpr Optional Mapping of all the leaf node filter expression to its + * translated [[Filter]]. The map is used for rebuilding Expression + * from [[Filter]]s. + * @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`. + */ + protected[sql] def translateFilterWithMapping( + predicate: Expression, + translatedFilterToExpr: Option[mutable.HashMap[sources.Filter, Expression]]) + : Option[Filter] = { + predicate match { case expressions.And(left, right) => // See SPARK-12218 for detailed discussion // It is not safe to just convert one side if we do not understand the @@ -501,35 +537,44 @@ object DataSourceStrategy { // Pushing one leg of AND down is only safe to do at the top level. // You can see ParquetFilters' createFilter for more details. 
for { - leftFilter <- translateFilter(left) - rightFilter <- translateFilter(right) + leftFilter <- translateFilterWithMapping(left, translatedFilterToExpr) + rightFilter <- translateFilterWithMapping(right, translatedFilterToExpr) } yield sources.And(leftFilter, rightFilter) case expressions.Or(left, right) => for { - leftFilter <- translateFilter(left) - rightFilter <- translateFilter(right) + leftFilter <- translateFilterWithMapping(left, translatedFilterToExpr) + rightFilter <- translateFilterWithMapping(right, translatedFilterToExpr) } yield sources.Or(leftFilter, rightFilter) case expressions.Not(child) => - translateFilter(child).map(sources.Not) - - case expressions.StartsWith(a: Attribute, Literal(v: UTF8String, StringType)) => - Some(sources.StringStartsWith(a.name, v.toString)) - - case expressions.EndsWith(a: Attribute, Literal(v: UTF8String, StringType)) => - Some(sources.StringEndsWith(a.name, v.toString)) - - case expressions.Contains(a: Attribute, Literal(v: UTF8String, StringType)) => - Some(sources.StringContains(a.name, v.toString)) - - case expressions.Literal(true, BooleanType) => - Some(sources.AlwaysTrue) + translateFilterWithMapping(child, translatedFilterToExpr).map(sources.Not) - case expressions.Literal(false, BooleanType) => - Some(sources.AlwaysFalse) + case other => + val filter = translateLeafNodeFilter(other) + if (filter.isDefined && translatedFilterToExpr.isDefined) { + translatedFilterToExpr.get(filter.get) = predicate + } + filter + } + } - case _ => None + protected[sql] def rebuildExpressionFromFilter( + filter: Filter, + translatedFilterToExpr: mutable.HashMap[sources.Filter, Expression]): Expression = { + filter match { + case sources.And(left, right) => + expressions.And(rebuildExpressionFromFilter(left, translatedFilterToExpr), + rebuildExpressionFromFilter(right, translatedFilterToExpr)) + case sources.Or(left, right) => + expressions.Or(rebuildExpressionFromFilter(left, translatedFilterToExpr), + rebuildExpressionFromFilter(right, translatedFilterToExpr)) + case sources.Not(pred) => + expressions.Not(rebuildExpressionFromFilter(pred, translatedFilterToExpr)) + case other => + translatedFilterToExpr.getOrElse(other, + throw new AnalysisException( + s"Fail to rebuild expression: missing key $filter in `translatedFilterToExpr`")) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 4325aaa4e993..aba7b62d4d5e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -46,25 +46,30 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { case r: SupportsPushDownFilters => // A map from translated data source filters to original catalyst filter expressions. val translatedFilterToExpr = mutable.HashMap.empty[sources.Filter, Expression] + val translatedFilters = mutable.ArrayBuffer.empty[sources.Filter] // Catalyst filter expression that can't be translated to data source filters. 
val untranslatableExprs = mutable.ArrayBuffer.empty[Expression] for (filterExpr <- filters) { - val translated = DataSourceStrategy.translateFilter(filterExpr) - if (translated.isDefined) { - translatedFilterToExpr(translated.get) = filterExpr - } else { + val translated = + DataSourceStrategy.translateFilterWithMapping(filterExpr, Some(translatedFilterToExpr)) + if (translated.isEmpty) { untranslatableExprs += filterExpr + } else { + translatedFilters += translated.get } } // Data source filters that need to be evaluated again after scanning. which means // the data source cannot guarantee the rows returned can pass these filters. // As a result we must return it so Spark can plan an extra filter operator. - val postScanFilters = r.pushFilters(translatedFilterToExpr.keys.toArray) - .map(translatedFilterToExpr) + val postScanFilters = r.pushFilters(translatedFilters.toArray).map { filter => + DataSourceStrategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr) + } // The filters which are marked as pushed to this data source - val pushedFilters = r.pushedFilters().map(translatedFilterToExpr) + val pushedFilters = r.pushedFilters().map { filter => + DataSourceStrategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr) + } (pushedFilters, untranslatableExprs ++ postScanFilters) case _ => (Nil, filters) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index c028715e0a99..da2645ccca96 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2979,19 +2979,27 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } } - test("SPARK-27699 Converting disjunctions into ORC SearchArguments") { + test("SPARK-27699 Validate pushed down filters") { + def checkPushedFilters(df: DataFrame, filters: Array[sources.Filter]): Unit = { + val scan = df.queryExecution.sparkPlan + .find(_.isInstanceOf[BatchScanExec]).get.asInstanceOf[BatchScanExec] + .scan + assert(scan.isInstanceOf[OrcScan]) + assert(scan.asInstanceOf[OrcScan].pushedFilters === filters) + } withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "") { withTempPath { dir => spark.range(10).map(i => (i, i.toString)).toDF("id", "s").write.orc(dir.getCanonicalPath) val df = spark.read.orc(dir.getCanonicalPath) - .where(('id < 2 and 's.contains("foo")) or ('id > 10 and 's.contains("bar"))) - val scan = df.queryExecution.sparkPlan - .find(_.isInstanceOf[BatchScanExec]).get.asInstanceOf[BatchScanExec] - .scan - assert(scan.isInstanceOf[OrcScan]) - assertResult(Array(sources.Or(sources.LessThan("id", 2), sources.GreaterThan("id", 10)))) { - scan.asInstanceOf[OrcScan].pushedFilters - } + checkPushedFilters( + df.where(('id < 2 and 's.contains("foo")) or ('id > 10 and 's.contains("bar"))), + Array(sources.Or(sources.LessThan("id", 2), sources.GreaterThan("id", 10)))) + checkPushedFilters( + df.where('s.contains("foo") or ('id > 10 and 's.contains("bar"))), + Array.empty) + checkPushedFilters( + df.where('id < 2 and not('id > 10 and 's.contains("bar"))), + Array(sources.IsNotNull("id"), sources.LessThan("id", 2))) } } } From b8cb8430bd933033fe765d37f6fea09986f739aa Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 17 May 2019 00:29:18 +0800 Subject: [PATCH 7/9] add comment --- .../sql/execution/datasources/DataSourceStrategy.scala | 6 +++--- .../sql/execution/datasources/v2/DataSourceV2Strategy.scala | 5 ++++- 2 files changed, 7 
insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index d8587864fb49..b8bed8569ace 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -516,9 +516,9 @@ object DataSourceStrategy { * Tries to translate a Catalyst [[Expression]] into data source [[Filter]]. * * @param predicate The input [[Expression]] to be translated as [[Filter]] - * @param translatedFilterToExpr Optional Mapping of all the leaf node filter expression to its - * translated [[Filter]]. The map is used for rebuilding Expression - * from [[Filter]]s. + * @param translatedFilterToExpr An optional map from leaf node filter expressions to its + * translated [[Filter]]. The map is used for rebuilding + * [[Expression]] from [[Filter]]. * @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`. */ protected[sql] def translateFilterWithMapping( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index aba7b62d4d5e..e1d443b9a388 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -44,7 +44,10 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = { scanBuilder match { case r: SupportsPushDownFilters => - // A map from translated data source filters to original catalyst filter expressions. + // A map from translated data source leaf node filters to original catalyst filter + // expressions. For a `And`/`Or` predicate, it is possible that the predicate is partially + // pushed down. This map can used to construct a catalyst filter expression from the input + // filter, or a superset(partial push down filter) of the input filter. val translatedFilterToExpr = mutable.HashMap.empty[sources.Filter, Expression] val translatedFilters = mutable.ArrayBuffer.empty[sources.Filter] // Catalyst filter expression that can't be translated to data source filters. From 4d840607490b52ebdf65a103a3502a1442fc2198 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 17 May 2019 07:59:00 +0800 Subject: [PATCH 8/9] revise --- .../sql/execution/datasources/v2/DataSourceV2Strategy.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index e1d443b9a388..92596aca5190 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -46,8 +46,8 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { case r: SupportsPushDownFilters => // A map from translated data source leaf node filters to original catalyst filter // expressions. For a `And`/`Or` predicate, it is possible that the predicate is partially - // pushed down. 
This map can used to construct a catalyst filter expression from the input - // filter, or a superset(partial push down filter) of the input filter. + // pushed down. This map can be used to construct a catalyst filter expression from the + // input filter, or a superset(partial push down filter) of the input filter. val translatedFilterToExpr = mutable.HashMap.empty[sources.Filter, Expression] val translatedFilters = mutable.ArrayBuffer.empty[sources.Filter] // Catalyst filter expression that can't be translated to data source filters. From 90b0b697246251b1e0b8acfe07f53f1153aefe45 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 17 May 2019 08:34:49 +0800 Subject: [PATCH 9/9] update 2.3.4 --- .../datasources/orc/OrcFilters.scala | 45 ++++++++++++++++--- 1 file changed, 38 insertions(+), 7 deletions(-) diff --git a/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index 12d771f2c384..632a72a32abd 100644 --- a/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -75,10 +75,42 @@ private[sql] object OrcFilters extends OrcFiltersBase { schema: StructType, dataTypeMap: Map[String, DataType], filters: Seq[Filter]): Seq[Filter] = { - for { - filter <- filters - _ <- buildSearchArgument(dataTypeMap, filter, newBuilder()) - } yield filter + import org.apache.spark.sql.sources._ + + def convertibleFiltersHelper( + filter: Filter, + canPartialPushDown: Boolean): Option[Filter] = filter match { + case And(left, right) => + val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown) + val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown) + (leftResultOptional, rightResultOptional) match { + case (Some(leftResult), Some(rightResult)) => Some(And(leftResult, rightResult)) + case (Some(leftResult), None) if canPartialPushDown => Some(leftResult) + case (None, Some(rightResult)) if canPartialPushDown => Some(rightResult) + case _ => None + } + + case Or(left, right) => + val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown) + val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown) + if (leftResultOptional.isEmpty || rightResultOptional.isEmpty) { + None + } else { + Some(Or(leftResultOptional.get, rightResultOptional.get)) + } + case Not(pred) => + val resultOptional = convertibleFiltersHelper(pred, canPartialPushDown = false) + resultOptional.map(Not) + case other => + if (buildSearchArgument(dataTypeMap, other, newBuilder()).isDefined) { + Some(other) + } else { + None + } + } + filters.flatMap { filter => + convertibleFiltersHelper(filter, true) + } } /** @@ -189,9 +221,8 @@ private[sql] object OrcFilters extends OrcFiltersBase { for { _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts) _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts) - lhs <- createBuilder(dataTypeMap, left, - builder.startOr(), canPartialPushDownConjuncts) - rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjunct) + lhs <- createBuilder(dataTypeMap, left, builder.startOr(), canPartialPushDownConjuncts) + rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts) } yield rhs.end() case Not(child) =>