@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources

import java.util.Locale

import scala.collection.mutable

import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging
@@ -436,60 +438,94 @@ object DataSourceStrategy {
}
}

private def translateLeafNodeFilter(predicate: Expression): Option[Filter] = predicate match {
case expressions.EqualTo(a: Attribute, Literal(v, t)) =>
Some(sources.EqualTo(a.name, convertToScala(v, t)))
case expressions.EqualTo(Literal(v, t), a: Attribute) =>
Some(sources.EqualTo(a.name, convertToScala(v, t)))

case expressions.EqualNullSafe(a: Attribute, Literal(v, t)) =>
Some(sources.EqualNullSafe(a.name, convertToScala(v, t)))
case expressions.EqualNullSafe(Literal(v, t), a: Attribute) =>
Some(sources.EqualNullSafe(a.name, convertToScala(v, t)))

case expressions.GreaterThan(a: Attribute, Literal(v, t)) =>
Some(sources.GreaterThan(a.name, convertToScala(v, t)))
case expressions.GreaterThan(Literal(v, t), a: Attribute) =>
Some(sources.LessThan(a.name, convertToScala(v, t)))

case expressions.LessThan(a: Attribute, Literal(v, t)) =>
Some(sources.LessThan(a.name, convertToScala(v, t)))
case expressions.LessThan(Literal(v, t), a: Attribute) =>
Some(sources.GreaterThan(a.name, convertToScala(v, t)))

case expressions.GreaterThanOrEqual(a: Attribute, Literal(v, t)) =>
Some(sources.GreaterThanOrEqual(a.name, convertToScala(v, t)))
case expressions.GreaterThanOrEqual(Literal(v, t), a: Attribute) =>
Some(sources.LessThanOrEqual(a.name, convertToScala(v, t)))

case expressions.LessThanOrEqual(a: Attribute, Literal(v, t)) =>
Some(sources.LessThanOrEqual(a.name, convertToScala(v, t)))
case expressions.LessThanOrEqual(Literal(v, t), a: Attribute) =>
Some(sources.GreaterThanOrEqual(a.name, convertToScala(v, t)))

case expressions.InSet(a: Attribute, set) =>
val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType)
Some(sources.In(a.name, set.toArray.map(toScala)))

// The Optimizer converts In to InSet only when the value list exceeds a certain size,
// so it is still possible to get an In expression here that needs to be pushed down.
case expressions.In(a: Attribute, list) if list.forall(_.isInstanceOf[Literal]) =>
val hSet = list.map(_.eval(EmptyRow))
val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType)
Some(sources.In(a.name, hSet.toArray.map(toScala)))

case expressions.IsNull(a: Attribute) =>
Some(sources.IsNull(a.name))
case expressions.IsNotNull(a: Attribute) =>
Some(sources.IsNotNull(a.name))
case expressions.StartsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
Some(sources.StringStartsWith(a.name, v.toString))

case expressions.EndsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
Some(sources.StringEndsWith(a.name, v.toString))

case expressions.Contains(a: Attribute, Literal(v: UTF8String, StringType)) =>
Some(sources.StringContains(a.name, v.toString))

case expressions.Literal(true, BooleanType) =>
Some(sources.AlwaysTrue)

case expressions.Literal(false, BooleanType) =>
Some(sources.AlwaysFalse)

case _ => None
}

/**
* Tries to translate a Catalyst [[Expression]] into data source [[Filter]].
*
* @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`.
*/
protected[sql] def translateFilter(predicate: Expression): Option[Filter] = {
predicate match {
case expressions.EqualTo(a: Attribute, Literal(v, t)) =>
Some(sources.EqualTo(a.name, convertToScala(v, t)))
case expressions.EqualTo(Literal(v, t), a: Attribute) =>
Some(sources.EqualTo(a.name, convertToScala(v, t)))

case expressions.EqualNullSafe(a: Attribute, Literal(v, t)) =>
Some(sources.EqualNullSafe(a.name, convertToScala(v, t)))
case expressions.EqualNullSafe(Literal(v, t), a: Attribute) =>
Some(sources.EqualNullSafe(a.name, convertToScala(v, t)))

case expressions.GreaterThan(a: Attribute, Literal(v, t)) =>
Some(sources.GreaterThan(a.name, convertToScala(v, t)))
case expressions.GreaterThan(Literal(v, t), a: Attribute) =>
Some(sources.LessThan(a.name, convertToScala(v, t)))

case expressions.LessThan(a: Attribute, Literal(v, t)) =>
Some(sources.LessThan(a.name, convertToScala(v, t)))
case expressions.LessThan(Literal(v, t), a: Attribute) =>
Some(sources.GreaterThan(a.name, convertToScala(v, t)))

case expressions.GreaterThanOrEqual(a: Attribute, Literal(v, t)) =>
Some(sources.GreaterThanOrEqual(a.name, convertToScala(v, t)))
case expressions.GreaterThanOrEqual(Literal(v, t), a: Attribute) =>
Some(sources.LessThanOrEqual(a.name, convertToScala(v, t)))

case expressions.LessThanOrEqual(a: Attribute, Literal(v, t)) =>
Some(sources.LessThanOrEqual(a.name, convertToScala(v, t)))
case expressions.LessThanOrEqual(Literal(v, t), a: Attribute) =>
Some(sources.GreaterThanOrEqual(a.name, convertToScala(v, t)))

case expressions.InSet(a: Attribute, set) =>
val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType)
Some(sources.In(a.name, set.toArray.map(toScala)))

// Because we only convert In to InSet in Optimizer when there are more than certain
// items. So it is possible we still get an In expression here that needs to be pushed
// down.
case expressions.In(a: Attribute, list) if list.forall(_.isInstanceOf[Literal]) =>
val hSet = list.map(_.eval(EmptyRow))
val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType)
Some(sources.In(a.name, hSet.toArray.map(toScala)))

case expressions.IsNull(a: Attribute) =>
Some(sources.IsNull(a.name))
case expressions.IsNotNull(a: Attribute) =>
Some(sources.IsNotNull(a.name))
translateFilterWithMapping(predicate, None)
Member:
Could you add more explanation of why this PR needs to have translateFilterWithMapping and translateLeafNodeFilter? Is this refactoring inevitable?

Member Author:
This is inevitable. See my comments below or the ending part of the PR description.
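
To make the relationship concrete, here is a minimal, hypothetical sketch of the two entry points after the refactoring (it uses the catalyst DSL and assumes protected[sql] access, e.g. from a test in org.apache.spark.sql; it is not part of the diff):

```scala
import scala.collection.mutable

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources

// Or('a < 2, 'b > 10), built with the catalyst DSL.
val pred = ('a.int < 2) || ('b.int > 10)

// The existing behavior is preserved: translateFilter is now a thin wrapper.
val translated = DataSourceStrategy.translateFilter(pred)
// translated: Some(Or(LessThan("a", 2), GreaterThan("b", 10)))

// The new variant additionally records every successfully translated leaf, so the
// caller can later rebuild a catalyst expression from a partially pushed-down filter.
val leaves = mutable.HashMap.empty[sources.Filter, Expression]
DataSourceStrategy.translateFilterWithMapping(pred, Some(leaves))
// leaves: LessThan("a", 2) -> ('a < 2), GreaterThan("b", 10) -> ('b > 10)
```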

}

/**
* Tries to translate a Catalyst [[Expression]] into data source [[Filter]].
*
* @param predicate The input [[Expression]] to be translated into a [[Filter]]
* @param translatedFilterToExpr An optional map from translated leaf node [[Filter]]s to
* their original [[Expression]]s. The map is used for
* rebuilding an [[Expression]] from a [[Filter]].
* @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`.
*/
protected[sql] def translateFilterWithMapping(
predicate: Expression,
translatedFilterToExpr: Option[mutable.HashMap[sources.Filter, Expression]])
: Option[Filter] = {
predicate match {
case expressions.And(left, right) =>
// See SPARK-12218 for detailed discussion
// It is not safe to just convert one side if we do not understand the
@@ -501,35 +537,44 @@ object DataSourceStrategy {
// Pushing one leg of AND down is only safe to do at the top level.
// You can see ParquetFilters' createFilter for more details.
for {
leftFilter <- translateFilter(left)
rightFilter <- translateFilter(right)
leftFilter <- translateFilterWithMapping(left, translatedFilterToExpr)
rightFilter <- translateFilterWithMapping(right, translatedFilterToExpr)
} yield sources.And(leftFilter, rightFilter)

case expressions.Or(left, right) =>
for {
leftFilter <- translateFilter(left)
rightFilter <- translateFilter(right)
leftFilter <- translateFilterWithMapping(left, translatedFilterToExpr)
rightFilter <- translateFilterWithMapping(right, translatedFilterToExpr)
} yield sources.Or(leftFilter, rightFilter)

case expressions.Not(child) =>
translateFilter(child).map(sources.Not)

case expressions.StartsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
Some(sources.StringStartsWith(a.name, v.toString))

case expressions.EndsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
Some(sources.StringEndsWith(a.name, v.toString))

case expressions.Contains(a: Attribute, Literal(v: UTF8String, StringType)) =>
Some(sources.StringContains(a.name, v.toString))

case expressions.Literal(true, BooleanType) =>
Some(sources.AlwaysTrue)
translateFilterWithMapping(child, translatedFilterToExpr).map(sources.Not)

case expressions.Literal(false, BooleanType) =>
Some(sources.AlwaysFalse)
case other =>
val filter = translateLeafNodeFilter(other)
if (filter.isDefined && translatedFilterToExpr.isDefined) {
translatedFilterToExpr.get(filter.get) = predicate
}
filter
}
}

case _ => None
protected[sql] def rebuildExpressionFromFilter(
filter: Filter,
translatedFilterToExpr: mutable.HashMap[sources.Filter, Expression]): Expression = {
filter match {
case sources.And(left, right) =>
expressions.And(rebuildExpressionFromFilter(left, translatedFilterToExpr),
rebuildExpressionFromFilter(right, translatedFilterToExpr))
case sources.Or(left, right) =>
expressions.Or(rebuildExpressionFromFilter(left, translatedFilterToExpr),
rebuildExpressionFromFilter(right, translatedFilterToExpr))
case sources.Not(pred) =>
expressions.Not(rebuildExpressionFromFilter(pred, translatedFilterToExpr))
case other =>
translatedFilterToExpr.getOrElse(other,
throw new AnalysisException(
s"Failed to rebuild expression: missing key $filter in `translatedFilterToExpr`"))
}
}

@@ -527,11 +527,22 @@ private[parquet] class ParquetFilters(
}

case sources.Or(lhs, rhs) =>
// The Or predicate is convertible when both of its children can be pushed down.
// That is to say, if one or both of the children can be partially pushed down, the Or
// predicate can be partially pushed down as well.
//
// Here is an example that explains the reason.
// Let's say we have
// (a1 AND a2) OR (b1 AND b2),
// where a1 and b1 are convertible, while a2 and b2 are not.
// The predicate can be converted as
// (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
Member:
The rule is correct, but it looks a little dangerous to set true blindly without considering the given parameter canPartialPushDownConjuncts. Could you add a test case for that complex example, like OR under NOT in a deep predicate tree?

Also, it would be great to add a higher-level test case in SQLQuerySuite.scala to show the benefit of this additional predicate pushdown (a1 OR b1). Could you add that, too?

Member Author (gengliangwang), May 15, 2019:

> The rule is correct, but it looks a little dangerous to set true blindly without considering the given parameter canPartialPushDownConjuncts. Could you add a test case for that complex example, like OR under NOT in a deep predicate tree?

@dongjoon-hyun nice catch. However, here the Not predicate won't have an Or or And predicate as a child because of the BooleanSimplification optimization rule.
I have updated the code, but a new test case seems unnecessary: Not(a Or b) can be converted to Not(a) And Not(b).
Also, I previously created a PR for pushing down the Not operator (for extra safety), but it seems that PR made things too complex: #22687

> Also, it would be great to add a higher-level test case in SQLQuerySuite.scala to show the benefit of this additional predicate pushdown (a1 OR b1). Could you add that, too?

How can we verify that the predicate is pushed down? Match the OrcScan and check the pushedFilters? Only ORC V2 can be checked this way currently.

Contributor:
The conversion from (a1 AND a2) OR (b1 AND b2) to (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2) always works, regardless of whether it is a top-level boolean expression or not.
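
The identity can be checked exhaustively over all truth assignments; a quick stand-alone sanity check (not part of the PR) could look like this:

```scala
// Verify (a1 && a2) || (b1 && b2) ==
//        (a1 || b1) && (a1 || b2) && (a2 || b1) && (a2 || b2) for all inputs.
val bools = Seq(true, false)
val equivalent = (for {
  a1 <- bools; a2 <- bools; b1 <- bools; b2 <- bools
} yield {
  val lhs = (a1 && a2) || (b1 && b2)
  val rhs = (a1 || b1) && (a1 || b2) && (a2 || b1) && (a2 || b2)
  lhs == rhs
}).forall(identity)
assert(equivalent) // the rewrite is an equivalence, so it is safe at any depth
```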

Member:
@gengliangwang You can use explain with Console.withOut. What we need to check is PushedFilters: in the plan.

> How can we verify that the predicate is pushed down? Match the OrcScan and check the pushedFilters? Only ORC V2 can be checked this way currently.

@cloud-fan and @gengliangwang Yes. Of course, I already agreed that the rule is correct. I want a test case in a complete end-to-end query form that shows the new benefit clearly. Could you please add a real use case you have met?
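
For reference, a minimal sketch of the Console.withOut suggestion above (the DataFrame df and the exact plan text are assumptions, not taken from the PR):

```scala
import java.io.{ByteArrayOutputStream, PrintStream}

val output = new ByteArrayOutputStream()
Console.withOut(new PrintStream(output)) {
  df.explain(true) // explain prints through the console, which Console.withOut redirects
}
// The captured plan should list the accepted filters, e.g.
// "PushedFilters: [IsNotNull(id), LessThan(id,2)]".
assert(output.toString.contains("PushedFilters:"))
```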

Contributor:
+1 for an end-to-end test

// Following the same logic as in the And predicate, we can push down (a1 OR b1).
for {
lhsFilter <-
createFilterHelper(nameToParquetField, lhs, canPartialPushDownConjuncts = false)
Member:
Can we add some comments like we did in AND?

createFilterHelper(nameToParquetField, lhs, canPartialPushDownConjuncts)
rhsFilter <-
createFilterHelper(nameToParquetField, rhs, canPartialPushDownConjuncts = false)
createFilterHelper(nameToParquetField, rhs, canPartialPushDownConjuncts)
} yield FilterApi.or(lhsFilter, rhsFilter)

case sources.Not(pred) =>
case sources.Not(pred) =>
@@ -44,27 +44,35 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
scanBuilder match {
case r: SupportsPushDownFilters =>
// A map from translated data source filters to original catalyst filter expressions.
// A map from translated data source leaf node filters to original catalyst filter
// expressions. For an `And`/`Or` predicate, it is possible that the predicate is only
// partially pushed down. This map can be used to construct a catalyst filter expression
// from the input filter, or from a superset (a partially pushed-down filter) of the input filter.
val translatedFilterToExpr = mutable.HashMap.empty[sources.Filter, Expression]
val translatedFilters = mutable.ArrayBuffer.empty[sources.Filter]
// Catalyst filter expression that can't be translated to data source filters.
val untranslatableExprs = mutable.ArrayBuffer.empty[Expression]

for (filterExpr <- filters) {
val translated = DataSourceStrategy.translateFilter(filterExpr)
if (translated.isDefined) {
translatedFilterToExpr(translated.get) = filterExpr
} else {
val translated =
Member Author:
With partial filter push down in the Or operator, the result of pushedFilters() might not exist in the mapping translatedFilterToExpr. To fix this, this PR changes translatedFilterToExpr to map translated leaf-node filters (sources.Filter) to their original catalyst expressions, and later rebuilds the whole expression from the pushed filters using that mapping.
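
A small, hypothetical sketch of the situation described above (names and values are illustrative, and it assumes protected[sql] access):

```scala
import scala.collection.mutable

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GreaterThan, LessThan, Literal}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources
import org.apache.spark.sql.types.LongType

val id = AttributeReference("id", LongType)()

// Leaves recorded by translateFilterWithMapping while translating the full predicate.
val translatedFilterToExpr = mutable.HashMap[sources.Filter, Expression](
  sources.LessThan("id", 2L)     -> LessThan(id, Literal(2L)),
  sources.GreaterThan("id", 10L) -> GreaterThan(id, Literal(10L)))

// After partial push-down, the source may report a filter that was never a key itself.
val reported = sources.Or(sources.LessThan("id", 2L), sources.GreaterThan("id", 10L))

// Rebuild the catalyst expression by looking up the leaves and recombining them.
val rebuilt = DataSourceStrategy.rebuildExpressionFromFilter(reported, translatedFilterToExpr)
// rebuilt: Or(LessThan('id, 2L), GreaterThan('id, 10L))
```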

Member (dongjoon-hyun), May 16, 2019:
Yep. Actually, when I tested your PR before, I noticed that too: the new Or didn't work in the end-to-end case. That's the reason I asked. I'll test more.

Thank you for making this PR work, @gengliangwang!

Member:
BTW, it would be great to add a real comment in the code. It's non-trivial to figure out.

Member Author:
Thanks for the testing. Appreciate it!

DataSourceStrategy.translateFilterWithMapping(filterExpr, Some(translatedFilterToExpr))
if (translated.isEmpty) {
untranslatableExprs += filterExpr
} else {
translatedFilters += translated.get
}
}

// Data source filters that need to be evaluated again after scanning, which means
// the data source cannot guarantee that the rows returned will pass these filters.
// As a result, we must return them so Spark can plan an extra filter operator.
val postScanFilters = r.pushFilters(translatedFilterToExpr.keys.toArray)
.map(translatedFilterToExpr)
val postScanFilters = r.pushFilters(translatedFilters.toArray).map { filter =>
DataSourceStrategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr)
}
// The filters which are marked as pushed to this data source
val pushedFilters = r.pushedFilters().map(translatedFilterToExpr)
val pushedFilters = r.pushedFilters().map { filter =>
DataSourceStrategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr)
}
(pushedFilters, untranslatableExprs ++ postScanFilters)

case _ => (Nil, filters)
28 changes: 27 additions & 1 deletion sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -26,7 +26,8 @@ import org.apache.spark.{AccumulatorSuite, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -2978,6 +2979,31 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}

test("SPARK-27699 Validate pushed down filters") {
def checkPushedFilters(df: DataFrame, filters: Array[sources.Filter]): Unit = {
val scan = df.queryExecution.sparkPlan
.find(_.isInstanceOf[BatchScanExec]).get.asInstanceOf[BatchScanExec]
.scan
assert(scan.isInstanceOf[OrcScan])
assert(scan.asInstanceOf[OrcScan].pushedFilters === filters)
}
withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "") {
withTempPath { dir =>
spark.range(10).map(i => (i, i.toString)).toDF("id", "s").write.orc(dir.getCanonicalPath)
val df = spark.read.orc(dir.getCanonicalPath)
Member (dongjoon-hyun), May 16, 2019:
It would be better to have Parquet testing because it's the default format in Apache Spark. But yes, I get it: you want to test via DSv2.

Member Author:
We can have a follow-up after Parquet V2 is merged.

checkPushedFilters(
df.where(('id < 2 and 's.contains("foo")) or ('id > 10 and 's.contains("bar"))),
Array(sources.Or(sources.LessThan("id", 2), sources.GreaterThan("id", 10))))
checkPushedFilters(
df.where('s.contains("foo") or ('id > 10 and 's.contains("bar"))),
Array.empty)
checkPushedFilters(
df.where('id < 2 and not('id > 10 and 's.contains("bar"))),
Array(sources.IsNotNull("id"), sources.LessThan("id", 2)))
}
}
}

test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery =>
withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) {