Contributor

@tanelk tanelk commented Oct 12, 2020

What changes were proposed in this pull request?

Added optimizer rule RemoveRedundantAggregates. It removes redundant aggregates from a query plan. A redundant aggregate is an aggregate whose only goal is to keep distinct values, while its parent aggregate would ignore duplicate values.
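To make the idea concrete, here is a minimal sketch on a toy plan model. The `Plan`, `Relation`, `Agg` and `ToyRule` names are hypothetical and are not Spark's Catalyst classes. The sketch assumes that a parent aggregate "ignores duplicates" when it has no aggregate functions and reads only the child's grouping keys.

```scala
// Toy model only: hypothetical classes, not Spark's Catalyst API.
sealed trait Plan
final case class Relation(name: String) extends Plan
// `functions` lists aggregate function names; an empty list means a pure DISTINCT.
final case class Agg(keys: Set[String], functions: List[String], child: Plan) extends Plan

object ToyRule {
  // Assumption of this sketch: the lower aggregate is redundant when the upper one
  // computes no aggregate functions and groups only by the lower's grouping keys.
  def lowerIsRedundant(upper: Agg, lower: Agg): Boolean =
    upper.functions.isEmpty && upper.keys.subsetOf(lower.keys)

  // Bottom-up rewrite: simplify the child first, then drop a redundant lower aggregate.
  def apply(plan: Plan): Plan = plan match {
    case Agg(keys, functions, child) =>
      Agg(keys, functions, apply(child)) match {
        case upper @ Agg(_, _, lower: Agg) if lowerIsRedundant(upper, lower) =>
          apply(upper.copy(child = lower.child))
        case other => other
      }
    case leaf => leaf
  }
}
```

Applied to a stack of identical key-only aggregates, like the repeated `HashAggregate` nodes in the plan shown for q87, this toy rule collapses them into one. An upper `count` over a distinct is left untouched, because `count` does depend on duplicates.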

The affected part of the query plan for TPCDS q87:

Before:

== Physical Plan ==
*(26) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition, true, [id=#785]
   +- *(25) HashAggregate(keys=[], functions=[partial_count(1)])
      +- *(25) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
         +- *(25) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
            +- *(25) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
               +- *(25) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
                  +- *(25) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
                     +- Exchange hashpartitioning(c_last_name#61, c_first_name#60, d_date#26, 5), true, [id=#724]
                        +- *(24) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
                           +- SortMergeJoin [coalesce(c_last_name#61, ), isnull(c_last_name#61), coalesce(c_first_name#60, ), isnull(c_first_name#60), coalesce(d_date#26, 0), isnull(d_date#26)], [coalesce(c_last_name#221, ), isnull(c_last_name#221), coalesce(c_first_name#220, ), isnull(c_first_name#220), coalesce(d_date#186, 0), isnull(d_date#186)], LeftAnti
                              :- ...

After:

== Physical Plan ==
*(26) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition, true, [id=#751]
   +- *(25) HashAggregate(keys=[], functions=[partial_count(1)])
      +- *(25) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
         +- Exchange hashpartitioning(c_last_name#61, c_first_name#60, d_date#26, 5), true, [id=#694]
            +- *(24) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
               +- SortMergeJoin [coalesce(c_last_name#61, ), isnull(c_last_name#61), coalesce(c_first_name#60, ), isnull(c_first_name#60), coalesce(d_date#26, 0), isnull(d_date#26)], [coalesce(c_last_name#221, ), isnull(c_last_name#221), coalesce(c_first_name#220, ), isnull(c_first_name#220), coalesce(d_date#186, 0), isnull(d_date#186)], LeftAnti
                  :- ...

Why are the changes needed?

Performance improvement: a few TPCDS queries contain this kind of duplicate aggregate.

Does this PR introduce any user-facing change?

No

How was this patch tested?

UT

Benchmarks (sf=5):

OpenJDK 64-Bit Server VM 1.8.0_265-b01 on Linux 5.8.13-arch1-1
Intel(R) Core(TM) i5-6500 CPU @ 3.20GHz

| Query | Before | After | Speedup |
|---|---|---|---|
| q14a | 44s | 44s | 1x |
| q14b | 41s | 41s | 1x |
| q38 | 6.5s | 5.9s | 1.1x |
| q87 | 7.2s | 6.8s | 1.1x |
| q14a-v2.7 | 55s | 53s | 1x |

* @param groupingExpressions expressions for grouping keys
* @param aggregateExpressions expressions for a project list, which could contain
- * [[AggregateFunction]]s.
+ * [[AggregateExpression]]s.
Contributor Author

This caused some confusion while making this PR

Contributor Author

tanelk commented Oct 12, 2020

I'll try to get the actual performance change for the TPCDS queries soon.

SparkQA commented Oct 12, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34311/

SparkQA commented Oct 12, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34311/

Member

maropu commented Oct 12, 2020

Could you describe more about the "redundant" case in the PR description? e.g., plan changes before/after this PR

SparkQA commented Oct 13, 2020

Test build #129705 has finished for PR 30018 at commit 14f3033.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor Author

tanelk commented Oct 14, 2020

Added a changed query plan sample and some TPCDS results. The change is not remarkable, but for bigger datasets it can add up.

SparkQA commented Oct 14, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34359/

SparkQA commented Oct 14, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34359/

SparkQA commented Oct 14, 2020

Test build #129753 has finished for PR 30018 at commit 29701dc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Oct 15, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34375/

SparkQA commented Oct 15, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34375/

SparkQA commented Oct 15, 2020

Test build #129769 has finished for PR 30018 at commit 4ce0644.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Oct 16, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34500/

SparkQA commented Oct 16, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34500/

SparkQA commented Oct 16, 2020

Test build #129895 has finished for PR 30018 at commit ef64abf.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait PredicateHelper extends Logging with AliasHelper
  • trait AliasHelper

Contributor Author

tanelk commented Oct 16, 2020

Test build #129895 has finished for PR 30018 at commit ef64abf.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait PredicateHelper extends Logging with AliasHelper
  • trait AliasHelper

The org.apache.spark.sql.hive.thriftserver.ThriftServerQueryTestSuite.subquery/scalar-subquery/scalar-subquery-select.sql test seems to be failing on other PRs as well. For example https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129890/

SparkQA commented Jan 26, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39100/

SparkQA commented Jan 26, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39100/

Member

maropu commented Mar 4, 2021

retest this please

SparkQA commented Mar 4, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40340/

SparkQA commented Mar 4, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40340/

SparkQA commented Mar 4, 2021

Test build #135757 has finished for PR 30018 at commit 37dc4b1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@maropu maropu left a comment

LGTM again (I've checked the latest changes)

Contributor Author

tanelk commented Mar 17, 2021

@maropu, this has been approved for a while now. Is there any chance that we can merge it?

Member

maropu commented Mar 17, 2021

Could anyone check this? @cloud-fan @viirya @dongjoon-hyun @HyukjinKwon If no one has more comments, I'll merge this into master in a few days.

Member

maropu commented Mar 17, 2021

retest this please

SparkQA commented Mar 17, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40740/

SparkQA commented Mar 17, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40740/

Member

maropu commented Mar 19, 2021

hm, Jenkins still looks unstable. Could you add an empty commit to invoke GA, @tanelk?

Contributor Author

tanelk commented Mar 19, 2021

@maropu , the checks did pass

Member

maropu commented Mar 20, 2021

Okay, we have plenty of time until the next release, so I'll merge this for now. If there are more comments, please feel free to leave them.

@maropu maropu closed this in 620cae0 Mar 20, 2021
Member

maropu commented Mar 20, 2021

Thanks! Merged to master. cc: @cloud-fan @viirya @dongjoon-hyun @HyukjinKwon

}
}

private def lowerIsRedundant(upper: Aggregate, lower: Aggregate): Boolean = {
Member

nit. Usually, isXXX is better and consistent with Apache Spark convention.

replaceAliasButKeepName(_, aliasMap))
)

// We might have introduces non-deterministic grouping expression
Member

@dongjoon-hyun dongjoon-hyun Mar 20, 2021

  • introduces -> introduced
  • expression -> expressions

* Remove redundant aggregates from a query plan. A redundant aggregate is an aggregate whose
* only goal is to keep distinct values, while its parent aggregate would ignore duplicate values.
*/
object RemoveRedundantAggregates extends Rule[LogicalPlan] with AliasHelper {
Member

Could you move this optimizer into a new file please, @tanelk ?

lower
.aggregateExpressions
.filter(_.deterministic)
.filter(!isAggregate(_))
Member

- .filter(!isAggregate(_))
+ .filterNot(isAggregate)
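
The suggested form is behavior-preserving. A small sketch on plain strings, where `isAggregate` is a hypothetical stand-in for the helper used in the rule, shows that both spellings keep the same elements:

```scala
// Hypothetical stand-in predicate: treats these names as aggregate functions.
def isAggregate(name: String): Boolean = Set("count", "max").contains(name)

val expressions = Seq("a", "max", "b", "count")

// Original spelling: negate the predicate inside a lambda.
val viaFilter = expressions.filter(!isAggregate(_))
// Suggested spelling: pass the predicate directly and let filterNot negate it.
val viaFilterNot = expressions.filterNot(isAggregate)
```

Both values contain only the non-aggregate names `a` and `b`; `filterNot` avoids the placeholder lambda and reads more directly.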

Member

@dongjoon-hyun dongjoon-hyun left a comment

Looks reasonable. I left a few minor comments. Thank you, @tanelk , @maropu , @peter-toth .

Member

maropu commented Mar 21, 2021

Thanks for the reviews, @dongjoon-hyun ! Please open a new follow-up PR to address them, @tanelk .

@cloud-fan
Contributor

The affected part of the query plan for TPCDS q87:

Why is the golden file of TPCDS q87 not updated in this PR?

Contributor Author

tanelk commented Mar 22, 2021

The affected part of the query plan for TPCDS q87:

Why is the golden file of TPCDS q87 not updated in this PR?

The LeftSemi/LeftAnti pushdown rule was changed while this PR was in review, so the situation where this rule applied no longer occurred.

maropu pushed a commit that referenced this pull request May 25, 2021
…er rule to apply to more cases

### What changes were proposed in this pull request?

Addressed @dongjoon-hyun's review comments on the previous PR #30018.
Extended the `RemoveRedundantAggregates` rule to remove redundant aggregations in even more queries. For example in
 ```
dataset
    .dropDuplicates()
    .groupBy('a)
    .agg(max('b))
```
the `dropDuplicates` is not needed, because the result of `max` does not depend on duplicate values.
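
The claim about `max` can be checked with plain Scala collections standing in for a Dataset. This is only an illustration; the `rows` data and the `aggregateBy` helper are made up for this sketch and are not part of Spark.

```scala
// Rows are (a, b) pairs; the data contains exact duplicate rows on purpose.
val rows = Seq(("x", 1), ("x", 3), ("x", 3), ("y", 2), ("y", 2))

// Group by the first field and aggregate the second field with the given function.
def aggregateBy(data: Seq[(String, Int)])(f: Seq[Int] => Int): Map[String, Int] =
  data.groupBy(_._1).map { case (a, group) => a -> f(group.map(_._2)) }

// max is insensitive to duplicates, so deduplicating first changes nothing.
val maxWithDuplicates = aggregateBy(rows)(_.max)
val maxAfterDistinct = aggregateBy(rows.distinct)(_.max)

// A row count is sensitive to duplicates, so here the deduplication matters.
val countWithDuplicates = aggregateBy(rows)(_.size)
val countAfterDistinct = aggregateBy(rows.distinct)(_.size)
```

The two `max` results are equal while the two counts differ. That is why a deduplicating step can be dropped below `max` but must be kept below a function such as `count`.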

### Why are the changes needed?

Improve performance.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

UT

Closes #31914 from tanelk/SPARK-33122_redundant_aggs_followup.

Lead-authored-by: [email protected] <[email protected]>
Co-authored-by: Tanel Kiis <[email protected]>
Signed-off-by: Takeshi Yamamuro <[email protected]>