@GulajavaMinistudio
Owner

What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

jerryshao and others added 15 commits May 8, 2017 14:27
…ort configuration

## What changes were proposed in this pull request?

After SPARK-10997, the client-mode Netty RpcEnv no longer needs to start a server, so its port configurations are no longer used. This PR proposes removing these two configurations: "spark.executor.port" and "spark.am.port".

## How was this patch tested?

Existing UTs.

Author: jerryshao <[email protected]>

Closes #17866 from jerryshao/SPARK-20605.
## What changes were proposed in this pull request?
Cleaning existing temp tables before running tableNames tests

## How was this patch tested?
SparkR Unit tests

Author: Hossein <[email protected]>

Closes #17903 from falaki/SPARK-20661.
## What changes were proposed in this pull request?

Change it to check for relative count like in this test https://github.com/apache/spark/blame/master/R/pkg/inst/tests/testthat/test_sparkSQL.R#L3355 for catalog APIs

## How was this patch tested?

Unit tests. This needs to be combined with another commit containing the SQL change in order to be checked.

Author: Felix Cheung <[email protected]>

Closes #17905 from felixcheung/rtabletests.
The recommendForAll method of MLlib ALS is very slow.
GC is a key problem of the current method.
Each task uses the following code to keep the temporary result:
val output = new Array[(Int, (Int, Double))](m*n)
m = n = 4096 (the default value; there is no way to set it)
so output is about 4k * 4k * (4 + 4 + 8) = 256M. This large allocation causes serious GC problems and frequent OOM errors.

Actually, we don't need to save all the temporary results. Suppose we recommend the topK (topK is about 10 or 20) products for each user; then we only need 4k * topK * (4 + 4 + 8) of memory to save the temporary result.
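
The bounded-buffer idea can be sketched roughly as follows. This is a minimal illustration and not the code from the PR: `TopKSketch`, `topK`, `dot`, and `recommend` are hypothetical names, and a plain dot product over in-memory arrays stands in for the blocked computation. A min-heap of size k is kept per user, so memory grows with topK rather than with the number of items.

```scala
import scala.collection.mutable

object TopKSketch {
  /** Keeps only the k highest-scoring (itemId, score) pairs from a stream of candidates. */
  def topK(candidates: Iterator[(Int, Double)], k: Int): Seq[(Int, Double)] = {
    // Reversed ordering turns the queue into a min-heap on score:
    // the head is always the weakest of the current top k.
    val byScore = Ordering.by[(Int, Double), Double](_._2)
    val heap = mutable.PriorityQueue.empty[(Int, Double)](byScore.reverse)
    candidates.foreach { c =>
      if (heap.size < k) {
        heap.enqueue(c)
      } else if (heap.nonEmpty && c._2 > heap.head._2) {
        heap.dequeue() // drop the current weakest entry
        heap.enqueue(c)
      }
    }
    // Heap iteration order is unspecified, so sort by descending score.
    heap.toSeq.sortBy(-_._2)
  }

  /** Dot product of two equally sized factor vectors. */
  def dot(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => x * y }.sum

  /** Scores every item for one user but retains only k results at any time. */
  def recommend(user: Array[Double], items: Seq[(Int, Array[Double])], k: Int): Seq[(Int, Double)] =
    topK(items.iterator.map { case (id, factors) => (id, dot(user, factors)) }, k)
}
```

For example, `TopKSketch.recommend(Array(1.0, 0.0), items, 2)` scores all items lazily and never holds more than two candidates in the heap.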

The Test Environment:
3 workers, each with 10 cores, 30G memory, and 1 executor.
The Data: 480,000 users and 17,000 items

| BlockSize | 1024 | 2048 | 4096 | 8192 |
| --- | --- | --- | --- | --- |
| Old method | 245s | 332s | 488s | OOM |
| This solution | 121s | 118s | 117s | 120s |

The existing UT.

Author: Peng <[email protected]>
Author: Peng Meng <[email protected]>

Closes #17742 from mpjlu/OptimizeAls.
This PR is a `DataFrame` version of #17742 for [SPARK-11968](https://issues.apache.org/jira/browse/SPARK-11968), for improving the performance of `recommendAll` methods.

## How was this patch tested?

Existing unit tests.

Author: Nick Pentreath <[email protected]>

Closes #17845 from MLnick/ml-als-perf.
…ception

## What changes were proposed in this pull request?

Added a check for the number of defined values. Previously, the argmax function assumed that at least one value was defined whenever the vector size was greater than zero.
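
To illustrate the case being guarded against, here is a minimal sketch of argmax over a sparse vector. It is not the actual Spark implementation: `SparseArgmaxSketch` and its signature are hypothetical, and it assumes that a non-empty vector with no stored values consists only of implicit zeros, so index 0 is a valid answer.

```scala
object SparseArgmaxSketch {
  /**
   * argmax of a sparse vector given by its logical size, the sorted indices
   * of its stored entries, and the stored values. Entries that are not stored
   * are implicit zeros. Returns -1 for an empty vector.
   */
  def argmax(size: Int, indices: Array[Int], values: Array[Double]): Int = {
    if (size == 0) {
      -1
    } else if (indices.isEmpty) {
      // The check this sketch is about: no defined values at all,
      // so every entry is an implicit zero and index 0 is a maximum.
      0
    } else {
      // Position of the first maximum among the stored values.
      var best = 0
      for (i <- 1 until values.length) if (values(i) > values(best)) best = i
      if (values(best) > 0.0 || indices.length == size) {
        // Either a stored value beats every implicit zero, or nothing is implicit.
        indices(best)
      } else {
        // The overall maximum is 0.0: return the first index holding a zero,
        // whether it is implicit or explicitly stored.
        val stored = indices.zip(values).toMap
        (0 until size).find(j => stored.getOrElse(j, 0.0) == 0.0).get
      }
    }
  }
}
```

Without the `indices.isEmpty` branch, the loop-free access to `values(best)` would fail on a vector such as size 3 with no stored entries.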

## How was this patch tested?

Tests were added to the existing VectorsSuite to cover this case.

Author: Jon McLean <[email protected]>

Closes #17877 from jonmclean/vectorArgmaxIndexBug.
## What changes were proposed in this pull request?
Remove ML methods we deprecated in 2.1.

## How was this patch tested?
Existing tests.

Author: Yanbo Liang <[email protected]>

Closes #17867 from yanboliang/spark-20606.
…ting the package of sql/core and sql/hive

## What changes were proposed in this pull request?

So far, we do not drop all the cataloged objects after each package. Sometimes, we might hit strange test case errors because the previous test suite did not drop the cataloged/temporary objects (tables/functions/database). At least, we can first clean up the environment when completing the package of `sql/core` and `sql/hive`.

## How was this patch tested?
N/A

Author: Xiao Li <[email protected]>

Closes #17908 from gatorsmile/reset.
## What changes were proposed in this pull request?
This pr added parsing rules to support aliases in table value functions.

## How was this patch tested?
Added tests in `PlanParserSuite`.

Author: Takeshi Yamamuro <[email protected]>

Closes #17666 from maropu/SPARK-20311.
… headerpage

## What changes were proposed in this pull request?

The Spark version of a specific application is currently not displayed on the history page. It would be nice to show the matching Spark version on the UI when we click on a specific application.
There seems to be a way to do this, since SparkListenerLogStart records the application's Spark version. So it should be straightforward to listen to this event and surface the version on the UI.
For example:
<img width="1439" alt="screen shot 2017-04-06 at 3 23 41 pm" src="https://cloud.githubusercontent.com/assets/8295799/25092650/41f3970a-2354-11e7-9b0d-4646d0adeb61.png">
<img width="1399" alt="screen shot 2017-04-17 at 9 59 33 am" src="https://cloud.githubusercontent.com/assets/8295799/25092743/9f9e2f28-2354-11e7-9605-f2f1c63f21fe.png">

{"Event":"SparkListenerLogStart","Spark Version":"2.0.0"}
Modified the SparkUI for the History Server to listen to the SparkListenerLogStart event, extract the version, and display it.

## How was this patch tested?
Manual testing of UI page. Attaching the UI screenshot changes here

Author: Sanket <schintap@untilservice-lm>

Closes #17658 from redsanket/SPARK-20355.
## What changes were proposed in this pull request?

`ReplSuite.newProductSeqEncoder with REPL defined class` was flaky and frequently threw an OOM exception. By analyzing the heap dump, we found that the reason is that each test case in `ReplSuite` creates a REPL instance, which creates a classloader and loads a lot of classes related to `SparkContext`. For more details, see #17833 (comment).

In this PR, we create a new test suite, `SingletonReplSuite`, which shares one REPL instance among all the test cases. We then move most of the tests from `ReplSuite` to `SingletonReplSuite`, to avoid creating many REPL instances and to reduce the memory footprint.

## How was this patch tested?

Test-only change.

Author: Wenchen Fan <[email protected]>

Closes #17844 from cloud-fan/flaky-test.
## What changes were proposed in this pull request?
For some reason we don't have an API to register a UserDefinedFunction as a named UDF. It is a no-brainer to add one, in addition to the existing register functions we have.

## How was this patch tested?
Added a test case in UDFSuite for the new API.

Author: Reynold Xin <[email protected]>

Closes #17915 from rxin/SPARK-20674.
## What changes were proposed in this pull request?

Simply moves `Trigger.java` to `src/main/java` from `src/main/scala`
See #17219

## How was this patch tested?

Existing tests.

Author: Sean Owen <[email protected]>

Closes #17921 from srowen/SPARK-19876.2.
…hon version

## What changes were proposed in this pull request?

Drop the Hadoop distribution name from the Python version (PEP 440 - https://www.python.org/dev/peps/pep-0440/). We've been using the local version string to disambiguate between different Hadoop versions packaged with PySpark, but PEP 440 states that local versions should not be used when publishing upstream. Since we no longer make PySpark pip packages for different Hadoop versions, we can simply drop the Hadoop information. If at a later point we need to start publishing for different Hadoop versions, we can look at making different packages or similar.
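
As a small illustration of what dropping the local version means: in PEP 440, the local version label is whatever follows a `+` in the version string. The sketch below strips it. The helper name `publicVersion` and the sample version strings are hypothetical and are not taken from the Spark build scripts.

```scala
object Pep440Sketch {
  /**
   * Returns the public part of a PEP 440 version string by dropping the
   * local version label, i.e. everything from the first '+' onwards.
   */
  def publicVersion(version: String): String =
    version.split("\\+", 2)(0)
}
```

With this helper, a string such as `2.2.0.dev0+hadoop2.7` (an assumed example) would be reduced to `2.2.0.dev0`, which is the form suitable for publishing upstream.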

## How was this patch tested?

Ran `make-distribution` locally

Author: Holden Karau <[email protected]>

Closes #17885 from holdenk/SPARK-20627-remove-pip-local-version-string.
This reverts commit 2269155.

See JIRA ticket for more information.
@GulajavaMinistudio GulajavaMinistudio merged commit 51ec388 into GulajavaMinistudio:master May 9, 2017
GulajavaMinistudio pushed a commit that referenced this pull request May 30, 2020
### What changes were proposed in this pull request?

1. Make more expressions extend `NullIntolerant`.
2. Add a checker(in `ExpressionInfoSuite`) to identify whether the expression is `NullIntolerant`.

### Why are the changes needed?

This avoids a skewed join when the join column has many null values, which can improve query performance. For example:
```sql
CREATE TABLE t1(c1 string, c2 string) USING parquet;
CREATE TABLE t2(c1 string, c2 string) USING parquet;
EXPLAIN SELECT t1.* FROM t1 JOIN t2 ON upper(t1.c1) = upper(t2.c1);
```

Physical plans before (first) and after (second) this PR:
```sql
== Physical Plan ==
*(2) Project [c1#5, c2#6]
+- *(2) BroadcastHashJoin [upper(c1#5)], [upper(c1#7)], Inner, BuildLeft
   :- BroadcastExchange HashedRelationBroadcastMode(List(upper(input[0, string, true]))), [id=#41]
   :  +- *(1) ColumnarToRow
   :     +- FileScan parquet default.t1[c1#5,c2#6]
   +- *(2) ColumnarToRow
      +- FileScan parquet default.t2[c1#7]

== Physical Plan ==
*(2) Project [c1#5, c2#6]
+- *(2) BroadcastHashJoin [upper(c1#5)], [upper(c1#7)], Inner, BuildRight
   :- *(2) Project [c1#5, c2#6]
   :  +- *(2) Filter isnotnull(c1#5)
   :     +- *(2) ColumnarToRow
   :        +- FileScan parquet default.t1[c1#5,c2#6]
   +- BroadcastExchange HashedRelationBroadcastMode(List(upper(input[0, string, true]))), [id=#59]
      +- *(1) Project [c1#7]
         +- *(1) Filter isnotnull(c1#7)
            +- *(1) ColumnarToRow
               +- FileScan parquet default.t2[c1#7]

```
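
Why the extra `isnotnull` filters are safe can be seen in a toy model. The sketch below is only an illustration under stated assumptions and does not use Spark classes: SQL NULL is modelled as `None`, `upper` is modelled as a null-intolerant function (null in, null out), and `sqlEquals` mimics the fact that an equality join condition never matches when either side is null. All names are hypothetical.

```scala
object NullIntolerantSketch {
  type Key = Option[String] // None plays the role of SQL NULL

  /** Null-intolerant: the result is null whenever the input is null. */
  def upper(s: Key): Key = s.map(_.toUpperCase)

  /** An equality join condition never matches if either side is null. */
  def sqlEquals(a: Key, b: Key): Boolean =
    a.isDefined && b.isDefined && a == b

  /** Naive inner join on upper(l) = upper(r). */
  def innerJoin(left: Seq[Key], right: Seq[Key]): Seq[(Key, Key)] =
    for (l <- left; r <- right if sqlEquals(upper(l), upper(r))) yield (l, r)

  /** Same join, but null keys are filtered out first, as in the second plan. */
  def innerJoinWithNullFilter(left: Seq[Key], right: Seq[Key]): Seq[(Key, Key)] =
    innerJoin(left.filter(_.isDefined), right.filter(_.isDefined))
}
```

Because `upper` cannot turn a null into a non-null value, rows with a null key can never satisfy the join condition, so removing them early changes nothing in the result while shrinking the data that reaches the join.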

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes apache#28626 from wangyum/SPARK-28481.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
GulajavaMinistudio pushed a commit that referenced this pull request Aug 6, 2022
### What changes were proposed in this pull request?

Optimize the TransposeWindow rule to extend the applicable cases and reduce its time complexity.
The TransposeWindow rule tries to eliminate unnecessary shuffles.

However, the function compatiblePartitions only takes the first n elements of the window2 partition sequence, so in some cases the rule does not take effect, as in the case below:

val df = spark.range(10).selectExpr("id AS a", "id AS b", "id AS c", "id AS d")
df.selectExpr(
    "sum(`d`) OVER(PARTITION BY `b`,`a`) as e",
    "sum(`c`) OVER(PARTITION BY `a`) as f"
  ).explain

Current plan

== Physical Plan ==
*(5) Project [e#10L, f#11L]
+- Window [sum(c#4L) windowspecdefinition(a#2L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS f#11L], [a#2L]
   +- *(4) Sort [a#2L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(a#2L, 200), true, [id=#41]
         +- *(3) Project [a#2L, c#4L, e#10L]
            +- Window [sum(d#5L) windowspecdefinition(b#3L, a#2L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS e#10L], [b#3L, a#2L]
               +- *(2) Sort [b#3L ASC NULLS FIRST, a#2L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(b#3L, a#2L, 200), true, [id=#33]
                     +- *(1) Project [id#0L AS d#5L, id#0L AS b#3L, id#0L AS a#2L, id#0L AS c#4L]
                        +- *(1) Range (0, 10, step=1, splits=10)

Expected plan:

== Physical Plan ==
*(4) Project [e#924L, f#925L]
+- Window [sum(d#43L) windowspecdefinition(b#41L, a#40L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS e#924L], [b#41L, a#40L]
   +- *(3) Sort [b#41L ASC NULLS FIRST, a#40L ASC NULLS FIRST], false, 0
      +- *(3) Project [d#43L, b#41L, a#40L, f#925L]
         +- Window [sum(c#42L) windowspecdefinition(a#40L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS f#925L], [a#40L]
            +- *(2) Sort [a#40L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(a#40L, 200), true, [id=#282]
                  +- *(1) Project [id#38L AS d#43L, id#38L AS b#41L, id#38L AS a#40L, id#38L AS c#42L]
                     +- *(1) Range (0, 10, step=1, splits=10)

Also, the permutations method has O(n!) time complexity, which is very expensive when there are many partition columns, so we could try to optimize it.

### Why are the changes needed?

We could apply the rule in more cases, which could improve execution performance by eliminating unnecessary shuffles. By reducing the time complexity from O(n!) to O(n^2), the performance of the rule itself could also improve.
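
The difference between the two checks can be sketched on plain strings. This is a simplified model, not the rule's actual code: partition expressions are represented as column names, string equality stands in for semantic equality of expressions, and both function names are hypothetical. The first function follows the behaviour described above (first n elements, all permutations); the second is one possible containment check with quadratic cost.

```scala
object CompatiblePartitionsSketch {
  /**
   * Permutation-based check: only the first ps1.length elements of ps2 are
   * considered, and every ordering of them is compared against ps1 (O(n!)).
   */
  def compatibleByPermutations(ps1: Seq[String], ps2: Seq[String]): Boolean =
    ps1.length < ps2.length &&
      ps2.take(ps1.length).permutations.exists(_ == ps1)

  /**
   * Containment-based check: every element of ps1 must appear somewhere
   * in ps2, regardless of position (O(n * m) comparisons).
   */
  def compatibleByContainment(ps1: Seq[String], ps2: Seq[String]): Boolean =
    ps1.length < ps2.length && ps1.forall(p => ps2.contains(p))
}
```

For the example above, with partition columns `Seq("a")` and `Seq("b", "a")`, the permutation-based check only looks at `b` and rejects the pair, while the containment-based check accepts it.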

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

UT

Closes apache#35334 from constzhou/SPARK-38034_optimize_transpose_window_rule.

Authored-by: xzhou <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
GulajavaMinistudio pushed a commit that referenced this pull request Nov 18, 2025
### What changes were proposed in this pull request?

This PR proposes to add a `doCanonicalize` function for DataSourceV2ScanRelation. The implementation is similar to [the one in BatchScanExec](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L150), as well as [the one in LogicalRelation](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala#L52).

### Why are the changes needed?

Query optimization rules such as MergeScalarSubqueries check whether two plans are identical by [comparing their canonicalized forms](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala#L219). For DSv2 physical plans, canonicalization goes down the child hierarchy to the BatchScanExec, which [has a doCanonicalize function](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L150); for logical plans, canonicalization goes down to the DataSourceV2ScanRelation, which, however, does not have a doCanonicalize function. As a result, two logical plans that are semantically identical are not recognized as such.

Moreover, for reference, [DSv1 LogicalRelation](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala#L52) also has `doCanonicalize()`.
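
The effect of canonicalization can be shown with a toy model. The classes below are hypothetical stand-ins and not Spark's `DataSourceV2ScanRelation` or its real `doCanonicalize`; the sketch assumes that the only difference between two scans of the same table is the per-query expression ID attached to each output attribute (such as `i#0` versus `i#10` in the plans in this description).

```scala
object CanonicalizeSketch {
  /** An output attribute with a name and a query-specific expression ID. */
  final case class Attr(name: String, exprId: Long)

  /** A toy scan node: a table name plus its output attributes. */
  final case class ScanRelation(table: String, output: Seq[Attr]) {
    /** Replaces query-specific expression IDs with the attribute's position. */
    def canonicalized: ScanRelation =
      copy(output = output.zipWithIndex.map { case (a, i) => a.copy(exprId = i.toLong) })

    /** Two scans are treated as the same if their canonical forms are equal. */
    def sameResult(other: ScanRelation): Boolean =
      canonicalized == other.canonicalized
  }
}
```

Two scans such as `ScanRelation("df", Seq(Attr("i", 0), Attr("j", 1)))` and `ScanRelation("df", Seq(Attr("i", 10), Attr("j", 11)))` are unequal as written but compare equal after canonicalization, which is the property a merging rule relies on.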

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

A new unit test is added to show that `MergeScalarSubqueries` is working for DataSourceV2ScanRelation.

For a query
```sql
select (select max(i) from df) as max_i, (select min(i) from df) as min_i
```

Before introducing the canonicalization, the plan is
```
== Parsed Logical Plan ==
'Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5]
:  :- 'Project [unresolvedalias('max('i))]
:  :  +- 'UnresolvedRelation [df], [], false
:  +- 'Project [unresolvedalias('min('i))]
:     +- 'UnresolvedRelation [df], [], false
+- OneRowRelation

== Analyzed Logical Plan ==
max_i: int, min_i: int
Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5]
:  :- Aggregate [max(i#0) AS max(i)#7]
:  :  +- SubqueryAlias df
:  :     +- View (`df`, [i#0, j#1])
:  :        +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
:  +- Aggregate [min(i#10) AS min(i)#9]
:     +- SubqueryAlias df
:        +- View (`df`, [i#10, j#11])
:           +- RelationV2[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
+- OneRowRelation

== Optimized Logical Plan ==
Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5]
:  :- Aggregate [max(i#0) AS max(i)#7]
:  :  +- Project [i#0]
:  :     +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
:  +- Aggregate [min(i#10) AS min(i)#9]
:     +- Project [i#10]
:        +- RelationV2[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
+- OneRowRelation

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   ResultQueryStage 0
   +- *(1) Project [Subquery subquery#2, [id=#32] AS max_i#3, Subquery subquery#4, [id=#33] AS min_i#5]
      :  :- Subquery subquery#2, [id=#32]
      :  :  +- AdaptiveSparkPlan isFinalPlan=true
            +- == Final Plan ==
               ResultQueryStage 1
               +- *(2) HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7])
                  +- ShuffleQueryStage 0
                     +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=58]
                        +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14])
                           +- *(1) Project [i#0]
                              +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
            +- == Initial Plan ==
               HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7])
               +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=19]
                  +- HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14])
                     +- Project [i#0]
                        +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
      :  +- Subquery subquery#4, [id=#33]
      :     +- AdaptiveSparkPlan isFinalPlan=true
            +- == Final Plan ==
               ResultQueryStage 1
               +- *(2) HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9])
                  +- ShuffleQueryStage 0
                     +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=63]
                        +- *(1) HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15])
                           +- *(1) Project [i#10]
                              +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
            +- == Initial Plan ==
               HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9])
               +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=30]
                  +- HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15])
                     +- Project [i#10]
                        +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
      +- *(1) Scan OneRowRelation[]
+- == Initial Plan ==
   Project [Subquery subquery#2, [id=#32] AS max_i#3, Subquery subquery#4, [id=#33] AS min_i#5]
   :  :- Subquery subquery#2, [id=#32]
   :  :  +- AdaptiveSparkPlan isFinalPlan=true
         +- == Final Plan ==
            ResultQueryStage 1
            +- *(2) HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7])
               +- ShuffleQueryStage 0
                  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=58]
                     +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14])
                        +- *(1) Project [i#0]
                           +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
         +- == Initial Plan ==
            HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7])
            +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=19]
               +- HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14])
                  +- Project [i#0]
                     +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
   :  +- Subquery subquery#4, [id=#33]
   :     +- AdaptiveSparkPlan isFinalPlan=true
         +- == Final Plan ==
            ResultQueryStage 1
            +- *(2) HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9])
               +- ShuffleQueryStage 0
                  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=63]
                     +- *(1) HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15])
                        +- *(1) Project [i#10]
                           +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
         +- == Initial Plan ==
            HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9])
            +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=30]
               +- HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15])
                  +- Project [i#10]
                     +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
   +- Scan OneRowRelation[]
```

After introducing the canonicalization, the plan is as follows, where you can see **ReusedSubquery**
```
== Parsed Logical Plan ==
'Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5]
:  :- 'Project [unresolvedalias('max('i))]
:  :  +- 'UnresolvedRelation [df], [], false
:  +- 'Project [unresolvedalias('min('i))]
:     +- 'UnresolvedRelation [df], [], false
+- OneRowRelation

== Analyzed Logical Plan ==
max_i: int, min_i: int
Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5]
:  :- Aggregate [max(i#0) AS max(i)#7]
:  :  +- SubqueryAlias df
:  :     +- View (`df`, [i#0, j#1])
:  :        +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
:  +- Aggregate [min(i#10) AS min(i)#9]
:     +- SubqueryAlias df
:        +- View (`df`, [i#10, j#11])
:           +- RelationV2[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
+- OneRowRelation

== Optimized Logical Plan ==
Project [scalar-subquery#2 [].max(i) AS max_i#3, scalar-subquery#4 [].min(i) AS min_i#5]
:  :- Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
:  :  +- Aggregate [max(i#0) AS max(i)#7, min(i#0) AS min(i)#9]
:  :     +- Project [i#0]
:  :        +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
:  +- Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
:     +- Aggregate [max(i#0) AS max(i)#7, min(i#0) AS min(i)#9]
:        +- Project [i#0]
:           +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
+- OneRowRelation

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   ResultQueryStage 0
   +- *(1) Project [Subquery subquery#2, [id=#40].max(i) AS max_i#3, ReusedSubquery Subquery subquery#2, [id=#40].min(i) AS min_i#5]
      :  :- Subquery subquery#2, [id=#40]
      :  :  +- AdaptiveSparkPlan isFinalPlan=true
            +- == Final Plan ==
               ResultQueryStage 1
               +- *(2) Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
                  +- *(2) HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9])
                     +- ShuffleQueryStage 0
                        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=71]
                           +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17])
                              +- *(1) Project [i#0]
                                 +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
            +- == Initial Plan ==
               Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
               +- HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9])
                  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=22]
                     +- HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17])
                        +- Project [i#0]
                           +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
      :  +- ReusedSubquery Subquery subquery#2, [id=#40]
      +- *(1) Scan OneRowRelation[]
+- == Initial Plan ==
   Project [Subquery subquery#2, [id=#40].max(i) AS max_i#3, Subquery subquery#4, [id=#41].min(i) AS min_i#5]
   :  :- Subquery subquery#2, [id=#40]
   :  :  +- AdaptiveSparkPlan isFinalPlan=true
         +- == Final Plan ==
            ResultQueryStage 1
            +- *(2) Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
               +- *(2) HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9])
                  +- ShuffleQueryStage 0
                     +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=71]
                        +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17])
                           +- *(1) Project [i#0]
                              +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
         +- == Initial Plan ==
            Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
            +- HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9])
               +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=22]
                  +- HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17])
                     +- Project [i#0]
                        +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
   :  +- Subquery subquery#4, [id=#41]
   :     +- AdaptiveSparkPlan isFinalPlan=false
   :        +- Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
   :           +- HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9])
   :              +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=37]
   :                 +- HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17])
   :                    +- Project [i#0]
   :                       +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
   +- Scan OneRowRelation[]
```

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#52529 from yhuang-db/scan-canonicalization.

Authored-by: yhuang-db <[email protected]>
Signed-off-by: Peter Toth <[email protected]>