Commit fd711ea

[SPARK-20273][SQL] Disallow Non-deterministic Filter push-down into Join Conditions
## What changes were proposed in this pull request?

Running the following query:

```sql
sql("SELECT t1.b, rand(0) as r FROM cachedData, cachedData t1 GROUP BY t1.b having r > 0.5").show()
```

produces the following error:

```
Job aborted due to stage failure: Task 1 in stage 4.0 failed 1 times, most recent failure: Lost task 1.0 in stage 4.0 (TID 8, localhost, executor driver): java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
	at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec$$anonfun$org$apache$spark$sql$execution$joins$BroadcastNestedLoopJoinExec$$boundCondition$1.apply(BroadcastNestedLoopJoinExec.scala:87)
	at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec$$anonfun$org$apache$spark$sql$execution$joins$BroadcastNestedLoopJoinExec$$boundCondition$1.apply(BroadcastNestedLoopJoinExec.scala:87)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
```

Filters can be pushed down into join conditions by the optimizer rule `PushPredicateThroughJoin`. However, the Analyzer already [blocks users from adding non-deterministic conditions](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala#L386-L395) to a join (for details, see PR apache#7535). We should not push down non-deterministic conditions either; otherwise, we would need to explicitly initialize the non-deterministic expressions. This PR simply blocks such pushdown.

### How was this patch tested?

Added a test case.

Author: Xiao Li <[email protected]>

Closes apache#17585 from gatorsmile/joinRandCondition.
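The NPE above arises because stateful non-deterministic expressions (such as `Rand`) must be initialized before they are evaluated, and a predicate pushed into the join condition bypasses that initialization. The toy sketch below (hypothetical class names, not Spark's actual expression API) models the hazard: evaluating before initialization dereferences a null RNG.

```scala
// Minimal sketch of the initialization hazard; names are illustrative only.
object NondeterministicSketch {
  trait Expression { def eval(): Any }

  // Models a stateful non-deterministic expression like Spark's Rand:
  // its internal state is only created when initialize() is called.
  class RandExpr(seed: Long) extends Expression {
    private var rng: scala.util.Random = null   // null until initialized
    def initialize(partitionIndex: Int): Unit =
      rng = new scala.util.Random(seed + partitionIndex)
    // Throws NullPointerException if initialize() was never called --
    // analogous to evaluating a pushed-down non-deterministic predicate.
    def eval(): Any = rng.nextDouble()
  }
}
```

After `initialize` runs, `eval` works as expected; the bug class fixed by this PR is exactly an `eval` reached without that call.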
1 parent 5acaf8c commit fd711ea

File tree: 2 files changed (+12 −0 lines)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala

Lines changed: 2 additions & 0 deletions
Lines changed: 2 additions & 0 deletions

```diff
@@ -90,6 +90,8 @@ trait PredicateHelper {
    * Returns true iff `expr` could be evaluated as a condition within join.
    */
   protected def canEvaluateWithinJoin(expr: Expression): Boolean = expr match {
+    // Non-deterministic expressions are not allowed as join conditions.
+    case e if !e.deterministic => false
     case l: ListQuery =>
       // A ListQuery defines the query which we want to search in an IN subquery expression.
       // Currently the only way to evaluate an IN subquery is to convert it to a
```
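As a rough illustration of the guard above, the toy model below (hypothetical types, not Spark's real Catalyst expression classes) rejects any expression tree whose `deterministic` flag is false before it can be used as a join condition:

```scala
// Toy expression tree with a deterministic flag, mirroring the new guard.
object JoinConditionCheck {
  sealed trait Expr { def deterministic: Boolean }
  case class Attr(name: String) extends Expr { val deterministic = true }
  case class Lit(value: Double) extends Expr { val deterministic = true }
  case class Rand(seed: Long) extends Expr { val deterministic = false }
  // A comparison is deterministic only if both of its children are.
  case class GreaterThan(left: Expr, right: Expr) extends Expr {
    val deterministic: Boolean = left.deterministic && right.deterministic
  }

  // Mirrors the new case in predicates.scala: bail out on any
  // non-deterministic condition before pushdown is attempted.
  def canEvaluateWithinJoin(expr: Expr): Boolean = expr match {
    case e if !e.deterministic => false
    case _                     => true
  }
}
```

With this shape, `rand(10) > 5.0` is rejected while `b > 5.0` is accepted, which is the behavior the optimizer rule relies on.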

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala

Lines changed: 10 additions & 0 deletions
```diff
@@ -241,6 +241,16 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("joins: do not push down non-deterministic filters into join condition") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+
+    val originalQuery = x.join(y).where(Rand(10) > 5.0).analyze
+    val optimized = Optimize.execute(originalQuery)
+
+    comparePlans(optimized, originalQuery)
+  }
+
   test("joins: push to one side after transformCondition") {
     val x = testRelation.subquery('x)
     val y = testRelation1.subquery('y)
```
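The new test asserts that optimization leaves the plan unchanged when the filter is non-deterministic. A minimal sketch of that behavior, using a toy plan representation rather than Catalyst's, could look like:

```scala
// Toy plan nodes and a pushdown rule that skips non-deterministic filters.
object PushdownSketch {
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Join(left: Plan, right: Plan, condition: Option[Cond]) extends Plan
  case class Filter(cond: Cond, child: Plan) extends Plan
  case class Cond(deterministic: Boolean)

  def pushDown(plan: Plan): Plan = plan match {
    // Deterministic filter over a cross join: fold it into the join condition.
    case Filter(c, Join(l, r, None)) if c.deterministic =>
      Join(l, r, Some(c))
    // Non-deterministic filter: leave the plan exactly as it was.
    case other => other
  }
}
```

Comparing `pushDown(plan)` with `plan` for a non-deterministic condition plays the same role as `comparePlans(optimized, originalQuery)` in the suite.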
