Skip to content

Commit 9a9e47f

Browse files
committed
Added more test and doc
1 parent d37b2e8 commit 9a9e47f

File tree

2 files changed

+146
-5
lines changed

2 files changed

+146
-5
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -494,8 +494,17 @@ private[parquet] class ParquetFilters(
494494
.map(_(nameToParquetField(name).fieldName, value))
495495

496496
case sources.And(lhs, rhs) =>
497-
// If the unsupported predicate is in the top level `And` condition or in the child
498-
// `And` condition before hitting `Not` or `Or` condition, it can be safely removed.
497+
// At here, it is not safe to just convert one side and remove the other side
498+
// if we do not understand what the parent filters are.
499+
//
500+
// Here is an example used to explain the reason.
501+
// Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
502+
// convert b in ('1'). If we only convert a = 2, we will end up with a filter
503+
// NOT(a = 2), which will generate wrong results.
504+
//
505+
// Pushing one side of AND down is only safe to do at the top level or in the child
506+
// AND before hitting NOT or OR conditions, and in this case, the unsupported predicate
507+
// can be safely removed.
499508
val lhsFilterOption = createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd)
500509
val rhsFilterOption = createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd)
501510

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

Lines changed: 135 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -750,7 +750,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
750750
}
751751
}
752752

753-
test("SPARK-12218 Converting conjunctions into Parquet filter predicates") {
753+
test("SPARK-12218 and SPARK-25559 Converting conjunctions into Parquet filter predicates") {
754754
val schema = StructType(Seq(
755755
StructField("a", IntegerType, nullable = false),
756756
StructField("b", StringType, nullable = true),
@@ -770,6 +770,10 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
770770
sources.GreaterThan("c", 1.5D)))
771771
}
772772

773+
// Testing when `canRemoveOneSideInAnd == true`
774+
// case sources.And(lhs, rhs) =>
775+
// ...
776+
// case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter)
773777
assertResult(Some(lt(intColumn("a"), 10: Integer))) {
774778
parquetFilters.createFilter(
775779
parquetSchema,
@@ -778,6 +782,122 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
778782
sources.StringContains("b", "prefix")))
779783
}
780784

785+
// Testing when `canRemoveOneSideInAnd == true`
786+
// case sources.And(lhs, rhs) =>
787+
// ...
788+
// case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter)
789+
assertResult(Some(lt(intColumn("a"), 10: Integer))) {
790+
parquetFilters.createFilter(
791+
parquetSchema,
792+
sources.And(
793+
sources.StringContains("b", "prefix"),
794+
sources.LessThan("a", 10)))
795+
}
796+
797+
// Testing complex And conditions
798+
assertResult(Some(
799+
FilterApi.and(lt(intColumn("a"), 10: Integer), gt(intColumn("a"), 5: Integer)))) {
800+
parquetFilters.createFilter(
801+
parquetSchema,
802+
sources.And(
803+
sources.And(
804+
sources.LessThan("a", 10),
805+
sources.StringContains("b", "prefix")
806+
),
807+
sources.GreaterThan("a", 5)))
808+
}
809+
810+
// Testing complex And conditions
811+
assertResult(Some(
812+
FilterApi.and(gt(intColumn("a"), 5: Integer), lt(intColumn("a"), 10: Integer)))) {
813+
parquetFilters.createFilter(
814+
parquetSchema,
815+
sources.And(
816+
sources.GreaterThan("a", 5),
817+
sources.And(
818+
sources.StringContains("b", "prefix"),
819+
sources.LessThan("a", 10)
820+
)))
821+
}
822+
823+
// Testing
824+
// case sources.Or(lhs, rhs) =>
825+
// ...
826+
// lhsFilter <- createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd = false)
827+
assertResult(None) {
828+
parquetFilters.createFilter(
829+
parquetSchema,
830+
sources.Or(
831+
sources.And(
832+
sources.GreaterThan("a", 1),
833+
sources.StringContains("b", "prefix")),
834+
sources.GreaterThan("a", 2)))
835+
}
836+
837+
// Testing
838+
// case sources.Or(lhs, rhs) =>
839+
// ...
840+
// rhsFilter <- createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd = false)
841+
assertResult(None) {
842+
parquetFilters.createFilter(
843+
parquetSchema,
844+
sources.Or(
845+
sources.GreaterThan("a", 2),
846+
sources.And(
847+
sources.GreaterThan("a", 1),
848+
sources.StringContains("b", "prefix"))))
849+
}
850+
851+
// Testing
852+
// case sources.Not(pred) =>
853+
// createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
854+
// .map(FilterApi.not)
855+
//
856+
// and
857+
//
858+
// Testing when `canRemoveOneSideInAnd == false`
859+
// case sources.And(lhs, rhs) =>
860+
// ...
861+
// case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter)
862+
assertResult(None) {
863+
parquetFilters.createFilter(
864+
parquetSchema,
865+
sources.Not(
866+
sources.And(
867+
sources.GreaterThan("a", 1),
868+
sources.StringContains("b", "prefix"))))
869+
}
870+
871+
// Testing
872+
// case sources.Not(pred) =>
873+
// createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
874+
// .map(FilterApi.not)
875+
//
876+
// and
877+
//
878+
// Testing when `canRemoveOneSideInAnd == false`
879+
// case sources.And(lhs, rhs) =>
880+
// ...
881+
// case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter)
882+
assertResult(None) {
883+
parquetFilters.createFilter(
884+
parquetSchema,
885+
sources.Not(
886+
sources.And(
887+
sources.StringContains("b", "prefix"),
888+
sources.GreaterThan("a", 1))))
889+
}
890+
891+
// Testing
892+
// case sources.Not(pred) =>
893+
// createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
894+
// .map(FilterApi.not)
895+
//
896+
// and
897+
//
898+
// Testing passing `canRemoveOneSideInAnd = false` into
899+
// case sources.And(lhs, rhs) =>
900+
// val lhsFilterOption = createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd)
781901
assertResult(None) {
782902
parquetFilters.createFilter(
783903
parquetSchema,
@@ -789,13 +909,25 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
789909
sources.GreaterThan("a", 2))))
790910
}
791911

912+
// Testing
913+
// case sources.Not(pred) =>
914+
// createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
915+
// .map(FilterApi.not)
916+
//
917+
// and
918+
//
919+
// Testing passing `canRemoveOneSideInAnd = false` into
920+
// case sources.And(lhs, rhs) =>
921+
// val rhsFilterOption = createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd)
792922
assertResult(None) {
793923
parquetFilters.createFilter(
794924
parquetSchema,
795925
sources.Not(
796926
sources.And(
797-
sources.GreaterThan("a", 1),
798-
sources.StringContains("b", "prefix"))))
927+
sources.GreaterThan("a", 2),
928+
sources.And(
929+
sources.GreaterThan("a", 1),
930+
sources.StringContains("b", "prefix")))))
799931
}
800932
}
801933

0 commit comments

Comments
 (0)