Skip to content

Commit 9093df4

Browse files
author
zhaorongsheng
committed
extract non-deterministic expression in some case and add test case
1 parent 39fac02 commit 9093df4

File tree

6 files changed

+1592
-8
lines changed

6 files changed

+1592
-8
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ object PhysicalOperation extends PredicateHelper {
4343

4444
/**
4545
* Collects all deterministic projects and filters, in-lining/substituting aliases if necessary.
46+
* Note that if a filter condition is non-deterministic, only its deterministic
47+
* sub-expressions are extracted and collected.
4648
* Here are two examples for alias in-lining/substitution.
4749
* Before:
4850
* {{{
@@ -63,9 +65,8 @@ object PhysicalOperation extends PredicateHelper {
6365
val substitutedFields = fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]]
6466
(Some(substitutedFields), filters, other, collectAliases(substitutedFields))
6567

66-
case filter @ Filter(condition, child)
67-
if condition.deterministic || condition.isInstanceOf[And] =>
68-
val determinedCondition = getDeterminedExpression(condition)
68+
case filter @ Filter(condition, child) =>
69+
val determinedCondition = extractDeterministicExpressions(condition)
6970
if (determinedCondition.isDefined) {
7071
val (fields, filters, other, aliases) = collectProjectsAndFilters(child)
7172
val substitutedCondition = substitute(aliases)(determinedCondition.get)
@@ -98,21 +99,48 @@ object PhysicalOperation extends PredicateHelper {
9899
}
99100
}
100101

101-
private def getDeterminedExpression(expr: Expression): Option[Expression] = {
102+
/**
103+
* Extracts the deterministic sub-expressions of a non-deterministic 'And' or 'Or' expression.
104+
*
105+
* Example input:
106+
* {{{
107+
* col = 1 and rand() < 1
108+
* (col1 = 1 and rand() < 1) and col2 = 1
109+
* col1 = 1 or rand() < 1
110+
* (col1 = 1 and rand() < 1) or (col2 = 1 and rand() < 1)
111+
* }}}
112+
*
113+
* Result:
114+
* {{{
115+
* col = 1
116+
* col1 = 1 and col2 = 1
117+
* None
118+
* col1 = 1 or col2 = 1
119+
* }}}
120+
*/
121+
private[planning] def extractDeterministicExpressions(expr: Expression): Option[Expression] = {
102122
if (expr.deterministic) {
103123
Some(expr)
104124
} else {
105125
expr match {
106126
case And(left, right) =>
107-
val leftDeterminedExpr = getDeterminedExpression(left)
108-
val rightDeterminedExpr = getDeterminedExpression(right)
127+
val leftDeterminedExpr = extractDeterministicExpressions(left)
128+
val rightDeterminedExpr = extractDeterministicExpressions(right)
109129
if (leftDeterminedExpr.isDefined && rightDeterminedExpr.isDefined) {
110130
Some(And(leftDeterminedExpr.get, rightDeterminedExpr.get))
111131
} else if (leftDeterminedExpr.isDefined || rightDeterminedExpr.isDefined) {
112132
Some(leftDeterminedExpr.getOrElse(rightDeterminedExpr.get))
113133
} else {
114134
None
115135
}
136+
case Or(left, right) =>
137+
val leftDeterminedExpr = extractDeterministicExpressions(left)
138+
val rightDeterminedExpr = extractDeterministicExpressions(right)
139+
if (leftDeterminedExpr.isDefined && rightDeterminedExpr.isDefined) {
140+
Some(Or(leftDeterminedExpr.get, rightDeterminedExpr.get))
141+
} else {
142+
None
143+
}
116144
case _ =>
117145
None
118146
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.planning
19+
20+
import org.apache.spark.sql.catalyst.dsl.expressions._
21+
import org.apache.spark.sql.catalyst.expressions.Literal
22+
import org.apache.spark.sql.catalyst.plans.PlanTest
23+
24+
class PhysicalOperationSuite extends PlanTest {
25+
import PhysicalOperation.extractDeterministicExpressions
26+
27+
test("Extract deterministic expressions") {
28+
val expr1 = ('col.int === Literal(1)) && rand(1) < 1
29+
val expr2 = 'col.int === Literal(1)
30+
compareExpressions(extractDeterministicExpressions(expr1).get, expr2)
31+
32+
val expr3 = (('col1.int === Literal(1)) && rand(1) < 1) && ('col2.int === Literal(1))
33+
val expr4 = ('col1.int === Literal(1)) && ('col2.int === Literal(1))
34+
compareExpressions(extractDeterministicExpressions(expr3).get, expr4)
35+
36+
val expr5 = ('col.int === Literal(1)) || rand(1) < 1
37+
val expr6 = Literal(null)
38+
compareExpressions(extractDeterministicExpressions(expr5).getOrElse(Literal(null)), expr6)
39+
40+
val expr7 = (('col1.int === Literal(1)) && rand(1) < 1) ||
41+
(('col2.int === Literal(1)) && rand(1) < 1)
42+
val expr8 = ('col1.int === Literal(1)) || ('col2.int === Literal(1))
43+
compareExpressions(extractDeterministicExpressions(expr7).getOrElse(Literal(null)), expr8)
44+
}
45+
}

0 commit comments

Comments
 (0)