[SPARK-39148][SQL] DS V2 aggregate push down can work with OFFSET or LIMIT #37195
Conversation
```scala
}

private def rewriteAggregate(agg: Aggregate): LogicalPlan = agg.child match {
  case ScanOperation(project, Nil, holder@ScanBuilderHolder(_, _, r: SupportsPushDownAggregates))
```
Suggested change:

```diff
-  case ScanOperation(project, Nil, holder@ScanBuilderHolder(_, _, r: SupportsPushDownAggregates))
+  case ScanOperation(project, Nil, holder @ ScanBuilderHolder(_, _, r: SupportsPushDownAggregates))
```
```scala
val newOutput = normalizedGroupingExpr.zipWithIndex.map { case (e, i) =>
  AttributeReference(s"group_col_$i", e.dataType)()
} ++ finalAggExprs.zipWithIndex.map { case (e, i) =>
  AttributeReference(s"agg_func_$i", e.dataType)()
}
val groupOutput = newOutput.take(normalizedGroupingExpr.length)
val aggOutput = newOutput.drop(normalizedGroupingExpr.length)
```
Suggested change:

```diff
-val newOutput = normalizedGroupingExpr.zipWithIndex.map { case (e, i) =>
-  AttributeReference(s"group_col_$i", e.dataType)()
-} ++ finalAggExprs.zipWithIndex.map { case (e, i) =>
-  AttributeReference(s"agg_func_$i", e.dataType)()
-}
-val groupOutput = newOutput.take(normalizedGroupingExpr.length)
-val aggOutput = newOutput.drop(normalizedGroupingExpr.length)
+val groupOutput = normalizedGroupingExpr.zipWithIndex.map { case (e, i) =>
+  AttributeReference(s"group_col_$i", e.dataType)()
+}
+val aggOutput = finalAggExprs.zipWithIndex.map { case (e, i) =>
+  AttributeReference(s"agg_func_$i", e.dataType)()
+}
+val newOutput = groupOutput ++ aggOutput
```
Good decoupling!
```scala
|Pushed Aggregate Functions:
| ${translatedAgg.aggregateExpressions().mkString(", ")}
|Pushed Group by:
| ${translatedAgg.groupByExpressions.mkString(", ")}
```
It seems we don't display the Output information here now.
Now this is completely inferred by Spark, according to the pushed agg exprs and group by exprs.
Got it.
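For illustration, the pushed info in the explain output might then look like the fragment below (hypothetical; the `DEPT`/`SALARY` column names are assumed), with the scan's Output no longer listed separately because Spark infers it from these expressions:

```
Pushed Aggregate Functions: MIN(SALARY), MAX(SALARY)
Pushed Group by: DEPT
```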
```scala
// The data source may return columns with arbitrary data types and it's safer to cast them
// to the expected data type.
assert(Cast.canCast(a1.dataType, a2.dataType))
Alias(addCastIfNeeded(a1, a2.dataType), a2.name)(a2.exprId)
```
Good decoupling!
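For context, the helper referenced above plausibly reduces to the following pattern (a minimal sketch, not the exact Spark source): cast only when the data source's actual type differs from the one Spark assumed while rewriting the plan.

```scala
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
import org.apache.spark.sql.types.DataType

// Minimal sketch: wrap the expression in a Cast only when the data
// source's actual type differs from the expected type.
def addCastIfNeeded(expr: Expression, expectedType: DataType): Expression =
  if (expr.dataType == expectedType) expr else Cast(expr, expectedType)
```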
```scala
if (r.supportCompletePushDown(translatedAggOpt.get)) {
  (actualResultExprs, normalizedAggExprs, translatedAggOpt.get, true)
} else if (!translatedAggOpt.get.aggregateExpressions().exists(_.isInstanceOf[Avg])) {
  (actualResultExprs, normalizedAggExprs, translatedAggOpt.get, false)
```
Why is `canCompletePushDown` false for this case? Previously it was treated like `supportCompletePushDown`, no?
It was false before this PR as well. The previous code invoked `r.supportCompletePushDown` one more time, which returned false.
The logic is also quite clear:
- the Aggregate can't be completely pushed
- the Aggregate can't be rewritten (avg -> sum/count)

Together, these two mean this Aggregate can't be completely pushed.
Got it. `canCompletePushDown` is false, but partial pushdown is still possible. It will be checked later.
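For background on the avg case mentioned above, here is a toy sketch (our own tiny ADT, not Spark's catalyst classes) of the avg -> sum/count rewrite: partial sums and counts can be merged correctly across tasks, while partial averages cannot, so an Aggregate containing avg is only partially pushable after this rewrite.

```scala
// Toy expression ADT for illustration only (not Spark's real classes).
sealed trait Expr
case class Avg(col: String) extends Expr
case class Sum(col: String) extends Expr
case class Count(col: String) extends Expr
case class Divide(left: Expr, right: Expr) extends Expr

// avg(c) is rewritten to sum(c) / count(c) so that the pieces pushed to
// the data source can be correctly re-aggregated by Spark afterwards.
def rewriteAvg(e: Expr): Expr = e match {
  case Avg(c) => Divide(Sum(c), Count(c))
  case other  => other
}
```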
```scala
if sHolder.pushedAggregate.isEmpty && filter.isEmpty &&
  CollapseProject.canCollapseExpressions(order, project, alwaysInline = true) =>
```
Hmm, is this a regression? I.e. previously we could push down, but now we cannot?
Previously we couldn't push down either. After agg pushdown, the `Scan` was already built, so limit/offset/topN pushdown wouldn't apply anyway.
I see. I was confused by the added comment and thought it was supported before.
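For example, a query of this shape is where the PR's main win shows up (hypothetical names, modeled on the JDBC test setup: a SparkSession `spark` and a registered `h2` catalog are assumed): both the aggregate and the LIMIT can now reach the data source, since the `Scan` is no longer built eagerly during aggregate pushdown.

```scala
// Hypothetical example; `h2.test.employee` is an assumed JDBC catalog table.
val df = spark.sql(
  "SELECT DEPT, MAX(SALARY) FROM h2.test.employee GROUP BY DEPT LIMIT 1")
// With this PR, explain should show both the pushed aggregate/grouping and
// a pushed limit, instead of a Spark-side GlobalLimit above the scan.
df.explain()
```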
```scala
// SELECT min(c1), max(c1) FROM t GROUP BY c2;
// Use c2, min(c1), max(c1) as output for DataSourceV2ScanRelation
// We want to have the following logical plan:
// == Optimized Logical Plan ==
```
I think this example is not accurate any more. Do we need to update this?
thanks for review, merging to master!
Merge commit: [SPARK-39148][SQL] DS V2 aggregate push down can work with OFFSET or LIMIT

Closes apache#37195 from cloud-fan/agg.

Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request?

This PR refactors the v2 agg pushdown code. The main change is that we no longer build the `Scan` immediately when pushing down an aggregate. We did that before because we wanted to know the data schema with the agg pushed, so that we could add casts when rewriting the query plan after pushdown. The problem is that building the `Scan` so early means no more operators can be pushed down, while it's common to see LIMIT/OFFSET after an agg.

The idea of the refactor is that we don't need to know the data schema with the agg pushed. We just state an expectation (the data types should be the same as those of Spark's agg functions), use it to define the output of `ScanBuilderHolder`, and then rewrite the query plan. Later on, when we build the `Scan` and replace `ScanBuilderHolder` with `DataSourceV2ScanRelation`, we check the actual data schema and add a `Project` to do type casts if necessary.

### Why are the changes needed?

Support pushing down LIMIT/OFFSET after an agg.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Updated tests.
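A condensed sketch of the reconciliation step described above (simplified signatures; the real rule tracks more state, and `reconcileOutput` is our own illustrative name): each scan column is exposed under the attribute the rewritten plan expects, casting when the actual type differs.

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, NamedExpression}

// Sketch only: reconcile the scan's actual output with the expected
// attributes the plan was rewritten against. The expected name and exprId
// are preserved so operators above the scan still resolve.
def reconcileOutput(
    expected: Seq[AttributeReference],
    actual: Seq[AttributeReference]): Seq[NamedExpression] = {
  expected.zip(actual).map { case (exp, act) =>
    val value = if (exp.dataType == act.dataType) act else Cast(act, exp.dataType)
    Alias(value, exp.name)(exp.exprId)
  }
}
```

These expressions would form a `Project` on top of the new `DataSourceV2ScanRelation`; in the real rule the `Project` is only added when a cast is actually needed, while this sketch always aliases for simplicity.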