
Conversation

@nikolamand-db (Contributor) commented Jul 29, 2024

What changes were proposed in this pull request?

Fix the `RewriteDistinctAggregates` rule to properly handle aggregation on DISTINCT literals. Physical plan for `select count(distinct 1) from t`:

```
-- count(distinct 1)
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(distinct 1)], output=[count(DISTINCT 1)#2L])
   +- HashAggregate(keys=[], functions=[partial_count(distinct 1)], output=[count#6L])
      +- HashAggregate(keys=[], functions=[], output=[])
         +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=20]
            +- HashAggregate(keys=[], functions=[], output=[])
               +- FileScan parquet spark_catalog.default.t[] Batched: false, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/nikola.mandic/oss-spark/spark-warehouse/org.apache.spark.s..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>
```

The problem occurs when the `HashAggregate(keys=[], functions=[], output=[])` node yields one row to the `partial_count` node, which then counts that row: a global aggregate with no grouping keys always emits exactly one row, even over empty input. This four-node structure is constructed by `AggUtils.planAggregateWithOneDistinct`.

To fix the problem, we add an `Expand` node which forces non-empty grouping expressions in the `HashAggregateExec` nodes. This in turn enables streaming zero rows to the parent `partial_count` node, yielding the correct final result.
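Conceptually, the rewrite makes the query behave like a grouped aggregate. The sketch below is illustrative only (it is not the literal plan the rule produces) and shows why introducing a grouping key fixes the empty-table case:

```scala
// Illustrative sketch, not the literal rewritten plan: once a grouping key
// exists, an empty input produces zero groups, so the final count is 0.
// COUNT(DISTINCT 1) is semantically equivalent to:
spark.sql("SELECT COUNT(c) FROM (SELECT DISTINCT 1 AS c FROM t) AS sub")
// empty t     => inner subquery emits 0 rows => COUNT(c) = 0
// non-empty t => inner subquery emits 1 row  => COUNT(c) = 1
```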

Why are the changes needed?

Aggregation with a DISTINCT literal gives wrong results. For example, when running on an empty table `t`:
`select count(distinct 1) from t` returns 1, while the correct result should be 0.
For reference:
`select count(1) from t` returns 0, which is the correct and expected result.

Does this PR introduce any user-facing change?

Yes, this fixes a critical bug in Spark.

How was this patch tested?

New e2e SQL tests for aggregates with DISTINCT literals.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions bot added the SQL label Jul 29, 2024
@cloud-fan (Contributor)

can we move the tests to SQL golden files?

@uros-db (Contributor) commented Jul 30, 2024

@cloud-fan we've updated the tests quite a bit to try and limit the impact of e2e SQL testing, but we believe it's best to keep them like this instead of using golden files: we use loops and test cases to verify expected results against a table with varying numbers of rows, while also verifying whether Expand was injected.
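For illustration, a sketch of that test shape (assuming Spark's `QueryTest`/`SQLTestUtils` helpers such as `withTable`, `sql`, and `checkAnswer`; the test name and row counts are illustrative, not the exact suite in this PR):

```scala
// Sketch of the test approach described above; assumes:
//   import org.apache.spark.sql.Row
//   import org.apache.spark.sql.catalyst.plans.logical.Expand
test("count(distinct <literal>) over tables of varying size") {
  Seq(0, 1, 5).foreach { numRows =>
    withTable("t") {
      sql("CREATE TABLE t (col STRING) USING PARQUET")
      (1 to numRows).foreach(i => sql(s"INSERT INTO t VALUES ('$i')"))
      val df = sql("SELECT COUNT(DISTINCT 1) FROM t")
      // count(distinct 1) must be 0 for an empty table and 1 otherwise
      checkAnswer(df, Row(if (numRows == 0) 0L else 1L))
      // the fix should inject an Expand node into the optimized logical plan
      assert(df.queryExecution.optimizedPlan.collect { case e: Expand => e }.nonEmpty)
    }
  }
}
```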

@uros-db (Contributor) left a comment


all checks look good, @cloud-fan please review

@cloud-fan (Contributor)

Please fill the PR description

@uros-db (Contributor) commented Jul 31, 2024

I think @nikolamand-db will have to do that because it's his PR; I don't have access to edit the PR description.

### What changes were proposed in this pull request?
Fix `RewriteDistinctAggregates` rule to deal properly with aggregation on DISTINCT literals.


### Why are the changes needed?
Aggregation with DISTINCT literal gives wrong results. For example:
`select count(distinct 1) from t` returns 1, while the correct result should be 0.
For reference:
`select count(1) from t` returns 0, which is the correct and expected result.


### Does this PR introduce _any_ user-facing change?
Yes, this fixes a critical bug in Spark.


### How was this patch tested?
New e2e SQL tests for aggregates with DISTINCT literals.


### Was this patch authored or co-authored using generative AI tooling?
No.

@uros-db (Contributor) commented Jul 31, 2024

Also, I think it's worth noting (at least in this comment) that the optimizer rule `RewriteDistinctAggregates` is in `nonExcludableRules`; this is important because the changes address a correctness issue, so the rule cannot be disabled via `spark.sql.optimizer.excludedRules`.
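For context, the wiring in Catalyst's `Optimizer` looks roughly like this (a sketch; surrounding entries are elided and the exact list varies by Spark version):

```scala
// Sketch: rules named in nonExcludableRules cannot be disabled via
// spark.sql.optimizer.excludedRules, so this correctness fix always runs.
def nonExcludableRules: Seq[String] = Seq(
  // ... other non-excludable rules ...
  RewriteDistinctAggregates.ruleName
)
```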

@nikolamand-db changed the title [SPARK-49000][SQL][WIP] Fix "select count(distinct 1) from t" where t is empty table by expanding RewriteDistinctAggregates [SPARK-49000][SQL] Fix "select count(distinct 1) from t" where t is empty table by expanding RewriteDistinctAggregates Jul 31, 2024
@nikolamand-db requested a review from cloud-fan July 31, 2024 08:28
@nikolamand-db (Contributor, Author)

@cloud-fan resolved all threads, should we merge now? Thanks.

@cloud-fan (Contributor)

thanks, merging to master/3.5!

@cloud-fan closed this in dfa2133 Jul 31, 2024
cloud-fan pushed a commit that referenced this pull request Jul 31, 2024
[SPARK-49000][SQL] Fix "select count(distinct 1) from t" where t is empty table by expanding RewriteDistinctAggregates


Closes #47525 from nikolamand-db/SPARK-49000-spark-expand-approach.

Lead-authored-by: Uros Bojanic <[email protected]>
Co-authored-by: Nikola Mandic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit dfa2133)
Signed-off-by: Wenchen Fan <[email protected]>
@dongjoon-hyun (Member) left a comment


Thank you, @nikolamand-db and @cloud-fan .

According to the JIRA, this is filed against 3.0.0 with the following JIRA report. If so, can we have this on branch-3.4? Could you confirm the affected version number once more?

It appears that this bug affects all (or most) released versions of Spark.
...
Aggregation with DISTINCT literal gives wrong results. For example, when running on empty table t:
select count(distinct 1) from t returns 1, while the correct result should be 0.

When I use spark-sql, Apache Spark 3.5.1 and 3.4.2 seem to work correctly, as in the following. Is there a handy way to check this PR's case?

```
spark-sql (default)> select count(distinct 1) from (select * from range(1) where 1 = 0);
0
Time taken: 0.055 seconds, Fetched 1 row(s)
```

@dongjoon-hyun (Member)

cc @yaooqinn and @viirya, too

@uros-db (Contributor) commented Jul 31, 2024

@dongjoon-hyun please try this:

  test("minimal test") {
    withTable("tbl") {
      sql("create table tbl (col string) using parquet")
      checkAnswer(sql("select count(distinct 1) from tbl"), Row(1)) // wrong
    }
  }

@dongjoon-hyun (Member) commented Jul 31, 2024

Thank you, @uros-db. I confirmed with Spark 3.4.3.

```
spark-sql (default)> SELECT version();
3.4.3 1eb558c3a6fbdd59e5a305bc3ab12ce748f6511f
Time taken: 0.054 seconds, Fetched 1 row(s)

spark-sql (default)> select count(*) from tbl;
0
Time taken: 0.109 seconds, Fetched 1 row(s)

spark-sql (default)> select count(distinct 1) from tbl;
1
Time taken: 0.336 seconds, Fetched 1 row(s)
```

Inline review comment (Member) on:

```scala
object RewriteDistinctAggregates extends Rule[LogicalPlan] {
  private def mustRewrite(
      aggregateExpressions: Seq[AggregateExpression],
```

s/aggregateExpressions/distinctAggs/

Inline review comment (Member) on:

```scala
  private def mustRewrite(
      aggregateExpressions: Seq[AggregateExpression],
      groupingExpressions: Seq[Expression]): Boolean = {
    // If there are any AggregateExpressions with filter, we need to rewrite the query.
```

s/any/any distinct/

Inline review comment (Member) on lines 214 to 216:

```scala
    // clause for this rule because aggregation strategy can handle a single distinct aggregate
    // group without filter clause.
    // This check can produce false-positives, e.g., SUM(DISTINCT a) & COUNT(DISTINCT a).
```

It is better to update the comment.

Inline review comment from @viirya (Member), Jul 31, 2024, on:

```scala
      groupingExpressions: Seq[Expression]): Boolean = {
    // If there are any AggregateExpressions with filter, we need to rewrite the query.
    // Also, if there are no grouping expressions and all aggregate expressions are foldable,
    // we need to rewrite the query, e.g. SELECT COUNT(DISTINCT 1).
```

Compared to the comment in mayNeedtoRewrite, which explains why rewriting is necessary, this comment doesn't explain anything: it just states that the query needs to be rewritten. It simply describes what the code does, which is obvious.

To improve code readability, it would be better to explain why the rewriting is needed in this case.
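For illustration, a more explanatory comment along the lines of this feedback might read (a sketch, not necessarily the wording that was merged):

```scala
// If there are no grouping expressions and all aggregate expressions are
// foldable (e.g. SELECT COUNT(DISTINCT 1)), the single-distinct planning path
// builds a partial aggregate with no grouping keys, which emits one row even
// for empty input and therefore produces a wrong non-zero count. Rewriting
// with Expand introduces grouping keys, so empty input yields zero rows.
```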

@yaooqinn (Member) commented Aug 1, 2024

Removed this from branch-3.5 as it causes GA failures:

https://github.com/apache/spark/actions/runs/10182404165/job/28186327497

@yaooqinn yaooqinn reopened this Aug 1, 2024
s"""SELECT COUNT(DISTINCT 1, "col") FROM $t"""
),
AggregateTestCaseDefault(
s"""SELECT COUNT(DISTINCT collation("abc")) FROM $t"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test case cannot be merged into branch-3.5, as collation is a new function added in Spark 4.0.
cc @nikolamand-db @cloud-fan
also cc @yaooqinn

Reply (Contributor):

Yes, collation doesn't exist in older versions, so this test will need to be excluded.

I can take care of that in a follow-up.

@cloud-fan (Contributor)

@nikolamand-db please send a follow-up PR to address post-hoc review comments, and then create backport PRs for 3.5 and 3.4

@uros-db (Contributor) commented Aug 1, 2024

@cloud-fan I'll be creating the follow-up: #47565

let's first merge this into master, and we can backport later

@yaooqinn (Member) commented Aug 1, 2024

Closing this first, as a new PR is more appropriate for branch-3.5 and/or branch-3.4.

@yaooqinn closed this Aug 1, 2024