-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-18969][SQL] Support grouping by nondeterministic expressions #16404
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
cc @rxin @gatorsmile |
|
Test build #70597 has finished for PR 16404 at commit
|
|
Test build #70603 has finished for PR 16404 at commit
|
|
Test build #70617 has finished for PR 16404 at commit
|
|
|
||
| private def getNondeterToAttr(exprs: Seq[Expression]): Map[Expression, NamedExpression] = { | ||
| exprs.filterNot(_.deterministic).flatMap { expr => | ||
| val leafNondeterministic = expr.collect { case n: Nondeterministic => n } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not all the non-deterministic expressions extend Nondeterministic. This might not cover all the cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- this is same with the previous behavior
- according to the variable name:
leafNondeterministic, it seems reasonable to collectNondeterministichere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This rule is running once. Thus, it should be safe; otherwise, it might generate many useless Project when some expressions are not extending Nondeterministic but its deterministics is false.
Maybe added a nagative test case for this check
sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
val df = Seq((1, 1)).toDF("a", "b")
df.createOrReplaceTempView("data")
sql("select a, statefulUDF(), sum(b) from data group by a, 2").show()There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it same with the existing test? select a, rand(0), sum(b) from data group by a, 2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
statefulUDF() is a stateful/non-deterministic UDF which does not exend Nondeterministic, but its deterministic is equal to false
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems we need to add a new API, here we wanna get non-deterministic leaf nodes, and trait Nondeterministic is not suitable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this problem was already there, let's send a new PR to fix it.
| // todo: It's hard to write a general rule to pull out nondeterministic expressions | ||
| // from LogicalPlan, currently we only do it for UnaryNode which has same output | ||
| // schema with its child. | ||
| case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a question. What is the reason why we need to have such a condition p.output == p.child.output?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to narrow down the scope of the affected operators, but ideally we should use a white-list
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! I see
|
retest this please |
|
Test build #70753 has finished for PR 16404 at commit
|
|
Test build #70754 has finished for PR 16404 at commit
|
|
LGTM cc @rxin |
|
Found a bug filed in a JIRA https://issues.apache.org/jira/browse/SPARK-19035. This PR does not resolves it. |
sql("select a + rand() from testData2 group by a, a + rand()").explain(true)After we merging this PR, I am afraid we might hitting a common misunderstanding. Users might assume |
|
How do other databases handle this case? Do they forbid using non-deterministic expressions in GROUP BY, or give a better error message? |
|
DB2 has such a limit. See the error message
It documents the same workaround:
|
|
Oracle allows it. It sounds like they treat SQL> select (username || dbms_random.string('a', 10)) from all_users group by (username || dbms_random.string('a', 10));
(USERNAME||DBMS_RANDOM.STRING('A',10))
--------------------------------------------------------------------------------
APEX_040000cklbMYhekl
FLOWS_FILESVmTbIIeiUs
CTXSYSPmgqeRFPry
SYSTEMxQLrzXxHth
XDBRRTfatsLlU
SYSoLDWRKMvlZ
XS$NULLXAaOykZCDH
APEX_PUBLIC_USERvcLswvpbcw
ANONYMOUSgupWiktQKh
OUTLNjLdKOTZoFI
MDSYSxEOhwTwQqa
(USERNAME||DBMS_RANDOM.STRING('A',10))
--------------------------------------------------------------------------------
HRkovpxQztYU
12 rows selected.If I change the order, I got the error: SQL> select (dbms_random.string('a', 10) || username) from all_users group by (username || dbms_random.string('a', 10))
2 ;
select (dbms_random.string('a', 10) || username) from all_users group by (username || dbms_random.string('a', 10))
*
ERROR at line 1:
ORA-00979: not a GROUP BY expression |
|
MySQL 5.7 treats them differently... mysql> select c1, concat(rand(), c1) from t1 group by c1;
+------+----------------------+
| c1 | concat(rand(), c1) |
+------+----------------------+
| 1 | 0.084388771172974981 |
| 3 | 0.116890648488784823 |
+------+----------------------+
2 rows in set (0.00 sec)
mysql> select c1, concat(rand(), c1) from t1 group by c1, concat(rand(), c1);
+------+----------------------+
| c1 | concat(rand(), c1) |
+------+----------------------+
| 1 | 0.16241911441313021 |
| 1 | 0.461423657332941551 |
| 3 | 0.81986097415896223 |
+------+----------------------+
3 rows in set (0.00 sec)MySQL 5.5 is even more crazy... It allows the following query: mysql> select c1, rand() from t1 group by rand();
+------+---------------------+
| c1 | rand() |
+------+---------------------+
| 2 | 0.10147774974021852 |
| 1 | 0.15563200614939632 |
| 1 | 0.22556077058013602 |
+------+---------------------+
3 rows in set (0.00 sec) |
|
Presto allows it and uses the same value of rand() computed during projection for doing aggregation. Internally, every aggregation column is hashed and hash values are appended to the intermediate row. These hashes are used for local aggregation and final aggregation after a shuffle. |
|
postgres handles Hive has the same behavior. |
|
how about we fix this in follow-up PR? Looks like the fix is not trivial. |
|
retest this please |
|
It sounds like different RDBMS have different behaviors. Have we decided which way we should follow? |
|
Test build #71244 has finished for PR 16404 at commit
|
|
Since Hive and postgres have same behavior on this, I'd like to follow them. |
|
Agree. Oracle behaves the same. |
|
I am fine to merge this at first, as long as we can fix it before the release. |
|
also cc @rxin to take a look |
|
LGTM on the behavior |
|
Make sure you update the pull request and jira ticket description before you merge. |
|
updated. Merging to master/2.1/2.0! |
## What changes were proposed in this pull request? Currently nondeterministic expressions are allowed in `Aggregate`(see the [comment](https://github.com/apache/spark/blob/v2.0.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala#L249-L251)), but the `PullOutNondeterministic` analyzer rule failed to handle `Aggregate`, this PR fixes it. close #16379 There is still one remaining issue: `SELECT a + rand() FROM t GROUP BY a + rand()` is not allowed, because the 2 `rand()` are different(we generate random seed as the default seed for `rand()`). https://issues.apache.org/jira/browse/SPARK-19035 is tracking this issue. ## How was this patch tested? a new test suite Author: Wenchen Fan <[email protected]> Closes #16404 from cloud-fan/groupby. (cherry picked from commit 871d266) Signed-off-by: Wenchen Fan <[email protected]>
## What changes were proposed in this pull request? Currently nondeterministic expressions are allowed in `Aggregate`(see the [comment](https://github.com/apache/spark/blob/v2.0.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala#L249-L251)), but the `PullOutNondeterministic` analyzer rule failed to handle `Aggregate`, this PR fixes it. close #16379 There is still one remaining issue: `SELECT a + rand() FROM t GROUP BY a + rand()` is not allowed, because the 2 `rand()` are different(we generate random seed as the default seed for `rand()`). https://issues.apache.org/jira/browse/SPARK-19035 is tracking this issue. ## How was this patch tested? a new test suite Author: Wenchen Fan <[email protected]> Closes #16404 from cloud-fan/groupby. (cherry picked from commit 871d266) Signed-off-by: Wenchen Fan <[email protected]>
## What changes were proposed in this pull request? Currently nondeterministic expressions are allowed in `Aggregate`(see the [comment](https://github.com/apache/spark/blob/v2.0.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala#L249-L251)), but the `PullOutNondeterministic` analyzer rule failed to handle `Aggregate`, this PR fixes it. close apache#16379 There is still one remaining issue: `SELECT a + rand() FROM t GROUP BY a + rand()` is not allowed, because the 2 `rand()` are different(we generate random seed as the default seed for `rand()`). https://issues.apache.org/jira/browse/SPARK-19035 is tracking this issue. ## How was this patch tested? a new test suite Author: Wenchen Fan <[email protected]> Closes apache#16404 from cloud-fan/groupby.
## What changes were proposed in this pull request? Currently nondeterministic expressions are allowed in `Aggregate`(see the [comment](https://github.com/apache/spark/blob/v2.0.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala#L249-L251)), but the `PullOutNondeterministic` analyzer rule failed to handle `Aggregate`, this PR fixes it. close apache#16379 There is still one remaining issue: `SELECT a + rand() FROM t GROUP BY a + rand()` is not allowed, because the 2 `rand()` are different(we generate random seed as the default seed for `rand()`). https://issues.apache.org/jira/browse/SPARK-19035 is tracking this issue. ## How was this patch tested? a new test suite Author: Wenchen Fan <[email protected]> Closes apache#16404 from cloud-fan/groupby.
What changes were proposed in this pull request?
Currently nondeterministic expressions are allowed in
Aggregate(see the comment), but thePullOutNondeterministicanalyzer rule failed to handleAggregate, this PR fixes it.close #16379
There is still one remaining issue:
SELECT a + rand() FROM t GROUP BY a + rand()is not allowed, because the 2rand()are different(we generate random seed as the default seed forrand()). https://issues.apache.org/jira/browse/SPARK-19035 is tracking this issue.How was this patch tested?
a new test suite