[SPARK-31078][SQL] Respect aliases in output ordering #27842
Conversation
Test build #119498 has finished for PR 27842 at commit
test("sort should not be introduced when aliases are used") {
  withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
    withTable("t") {
      df1.repartition(1).write.format("parquet").bucketBy(8, "i").sortBy("i").saveAsTable("t")
Could you add more tests, e.g., orderBy cases?
Do you mean something like the following?
val t1 = spark.range(10).selectExpr("id as k1").orderBy("k1")
  .selectExpr("k1 as k11").orderBy("k11")
val t2 = spark.range(10).selectExpr("id as k2").orderBy("k2")
t1.join(t2, t1("k11") === t2("k2")).explain(true)
The extra `Sort` is already optimized away in this case, so it doesn't affect the physical plan (no extra sort appears).
Ah, I see. How about the aggregate case?
I added a test for `SortAggregateExec`. `HashAggregateExec` and `ObjectHashAggregateExec` are not affected since they do not involve ordering.
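For context, a minimal sketch of the kind of query that exercises `SortAggregateExec` over an aliased grouping key. This is illustrative only, not necessarily the test added in this PR; forcing the sort-based path by disabling `ObjectHashAggregateExec` is an assumption here.

```scala
// Hedged sketch: force collect_list into a sort-based aggregate and inspect the plan.
import org.apache.spark.sql.functions.collect_list
import org.apache.spark.sql.internal.SQLConf

withSQLConf(
    SQLConf.USE_OBJECT_HASH_AGG_EXEC.key -> "false",          // fall back to SortAggregateExec
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
  // The grouping key k2 is an alias over an expression on id, similar to the
  // plan fragment shown later in this thread.
  val t2 = spark.range(0, 20, 1, 2)
    .selectExpr("floor(id / 4) as k2")
    .groupBy("k2")
    .agg(collect_list("k2"))
  t2.explain()  // check whether Sort nodes around SortAggregate are still required
}
```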
 * `outputPartitioning` and `outputOrdering` that satisfy distribution and ordering requirements.
 */
-trait AliasAwareOutputPartitioning extends UnaryExecNode {
+trait AliasAwareOutputs extends UnaryExecNode {
How about the name, AliasAwareOutputPartitioningAndOrdering?
Yes, good idea.
cc: @cloud-fan
Test build #119528 has finished for PR 27842 at commit
trait AliasAwareOutputPartitioningAndOrdering extends UnaryExecNode {
  protected def outputExpressions: Seq[NamedExpression]

  protected def orderingExpressions: Seq[SortOrder] = child.outputOrdering
This implicitly indicates that the plan inherits its output ordering from its child. That seems risky to me, as `SparkPlan.outputOrdering` is `Nil` by default.
Good catch. Now that you mention it, should I just separate this into AliasAwareOutputPartitioning and AliasAwareOutputOrdering and apply the latter only to SortAggregateExec and ProjectExec so that the intention is clear?
Ah, nice catch... Yea, could you try to split it into the two parts?
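For reference, a rough sketch of how such a split could look. This is illustrative only, not the exact code merged by this PR; the shared base trait and the helper name `normalizeExpression` are assumptions.

```scala
// Illustrative sketch of splitting alias handling into partitioning and ordering traits.
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeMap, AttributeReference,
  Expression, NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
import org.apache.spark.sql.execution.UnaryExecNode

trait AliasAwareOutputExpression extends UnaryExecNode {
  protected def outputExpressions: Seq[NamedExpression]

  // Maps each child attribute that is aliased in the output to the attribute it is exposed as.
  private lazy val aliasMap = AttributeMap(outputExpressions.collect {
    case a @ Alias(child: AttributeReference, _) => (child, a.toAttribute)
  })

  protected def hasAlias: Boolean = aliasMap.nonEmpty

  // Rewrites an expression so it refers to this node's output attributes.
  protected def normalizeExpression(exp: Expression): Expression = exp.transform {
    case attr: AttributeReference => aliasMap.getOrElse(attr, attr)
  }
}

// Partitioning side: rewrite HashPartitioning expressions through the alias map.
trait AliasAwareOutputPartitioning extends AliasAwareOutputExpression {
  final override def outputPartitioning: Partitioning = child.outputPartitioning match {
    case h: HashPartitioning if hasAlias =>
      h.copy(expressions = h.expressions.map(normalizeExpression))
    case other => other
  }
}

// Ordering side: only nodes that really propagate an ordering mix this in
// and supply orderingExpressions explicitly.
trait AliasAwareOutputOrdering extends AliasAwareOutputExpression {
  protected def orderingExpressions: Seq[SortOrder]

  final override def outputOrdering: Seq[SortOrder] =
    if (hasAlias) {
      orderingExpressions.map(normalizeExpression(_).asInstanceOf[SortOrder])
    } else {
      orderingExpressions
    }
}
```

Keeping the ordering trait separate means only nodes that genuinely preserve a child ordering (e.g. `ProjectExec`, `SortAggregateExec`) opt in, which addresses the concern above about `SparkPlan.outputOrdering` defaulting to `Nil`.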
// We expect two SortExec nodes on each side of join.
val sorts = planned.collect { case s: SortExec => s }
assert(sorts.size == 4)
FYI, this is 5 without the fix.
why do we have sorts between SortAggregateExec and SortMergeJoinExec?
Oh, nothing between them. Two sorts under SortAggregateExec:
+- SortAggregate(key=[k2#224L], functions=[collect_list(k2#224L, 0, 0)], output=[k2#224L, collect_list(k2)#236])
   +- *(5) Sort [k2#224L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(k2#224L, 5), true, [id=#151]
         +- SortAggregate(key=[k2#224L], functions=[partial_collect_list(k2#224L, 0, 0)], output=[k2#224L, buf#254])
            +- *(4) Sort [k2#224L ASC NULLS FIRST], false, 0
               +- *(4) Project [FLOOR((cast(id#222L as double) / 4.0)) AS k2#224L]
                  +- *(4) Range (0, 20, step=1, splits=2)
ah i see
Test build #119588 has finished for PR 27842 at commit
Test build #119598 has finished for PR 27842 at commit
retest this please
Test build #119605 has finished for PR 27842 at commit
thanks, merging to master!
late LGTM, thanks, guys.
### What changes were proposed in this pull request?
Currently, in the following scenario, an unnecessary `Sort` node is introduced:
```scala
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
  val df = (0 until 20).toDF("i").as("df")
  df.repartition(8, df("i")).write.format("parquet")
    .bucketBy(8, "i").sortBy("i").saveAsTable("t")
  val t1 = spark.table("t")
  val t2 = t1.selectExpr("i as ii")
  t1.join(t2, t1("i") === t2("ii")).explain
}
```
```
== Physical Plan ==
*(3) SortMergeJoin [i#8], [ii#10], Inner
:- *(1) Project [i#8]
:  +- *(1) Filter isnotnull(i#8)
:     +- *(1) ColumnarToRow
:        +- FileScan parquet default.t[i#8] Batched: true, DataFilters: [isnotnull(i#8)], Format: Parquet, Location: InMemoryFileIndex[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, SelectedBucketsCount: 8 out of 8
+- *(2) Sort [ii#10 ASC NULLS FIRST], false, 0 <==== UNNECESSARY
   +- *(2) Project [i#8 AS ii#10]
      +- *(2) Filter isnotnull(i#8)
         +- *(2) ColumnarToRow
            +- FileScan parquet default.t[i#8] Batched: true, DataFilters: [isnotnull(i#8)], Format: Parquet, Location: InMemoryFileIndex[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, SelectedBucketsCount: 8 out of 8
```
Notice that `Sort [ii#10 ASC NULLS FIRST], false, 0` is introduced even though the underlying data is already sorted. This is because `outputOrdering` doesn't handle aliases correctly. This PR proposes to fix this issue.
### Why are the changes needed?
To better handle aliases in `outputOrdering`.
### Does this PR introduce any user-facing change?
Yes, now with the fix, the `explain` prints out the following:
```
== Physical Plan ==
*(3) SortMergeJoin [i#8], [ii#10], Inner
:- *(1) Project [i#8]
:  +- *(1) Filter isnotnull(i#8)
:     +- *(1) ColumnarToRow
:        +- FileScan parquet default.t[i#8] Batched: true, DataFilters: [isnotnull(i#8)], Format: Parquet, Location: InMemoryFileIndex[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, SelectedBucketsCount: 8 out of 8
+- *(2) Project [i#8 AS ii#10]
   +- *(2) Filter isnotnull(i#8)
      +- *(2) ColumnarToRow
         +- FileScan parquet default.t[i#8] Batched: true, DataFilters: [isnotnull(i#8)], Format: Parquet, Location: InMemoryFileIndex[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, SelectedBucketsCount: 8 out of 8
```
### How was this patch tested?
Tests added.
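As a concrete illustration, here is a minimal regression-style check for the scenario above. It is a hedged sketch, not necessarily the exact test added, and assumes a test-suite context where `spark`, `withSQLConf`, and `withTable` are in scope.

```scala
// Both sides of the join come from the same bucketed, sorted table, so with
// alias-aware outputOrdering no Sort node should be required at all.
import org.apache.spark.sql.execution.SortExec
import org.apache.spark.sql.internal.SQLConf

withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
  withTable("t") {
    import spark.implicits._
    val df = (0 until 20).toDF("i")
    df.repartition(8, df("i")).write.format("parquet")
      .bucketBy(8, "i").sortBy("i").saveAsTable("t")

    val t1 = spark.table("t")
    val t2 = t1.selectExpr("i as ii")
    val planned = t1.join(t2, t1("i") === t2("ii")).queryExecution.executedPlan
    val sorts = planned.collect { case s: SortExec => s }
    assert(sorts.isEmpty, "the aliased side should reuse the bucket sort order on i")
  }
}
```

Without the fix, the same check finds one `SortExec`: the unnecessary sort on `ii` shown in the plan above.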
Closes apache#27842 from imback82/alias_aware_sort_order.
Authored-by: Terry Kim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…rtitioning and sortorder with respect to aliases to avoid unneeded exchange/sort nodes (#1092)

* [SPARK-31078][SQL] Respect aliases in output ordering

Closes #27842 from imback82/alias_aware_sort_order.

Authored-by: Terry Kim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>

* [SPARK-33399][SQL] Normalize output partitioning and sortorder with respect to aliases to avoid unneeded exchange/sort nodes

This pull request tries to remove unneeded exchanges/sorts by normalizing the output partitioning and sort order information correctly with respect to aliases.

Example: consider this join of three tables:

```sql
SELECT t2id, t3.id as t3id
FROM (
  SELECT t1.id as t1id, t2.id as t2id
  FROM t1, t2
  WHERE t1.id = t2.id
) t12, t3
WHERE t1id = t3.id
```

The plan for this looks like:

```
*(9) Project [t2id#1034L, id#1004L AS t3id#1035L]
+- *(9) SortMergeJoin [t1id#1033L], [id#1004L], Inner
   :- *(6) Sort [t1id#1033L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(t1id#1033L, 5), true, [id=#1343]   <------------------------------
   :     +- *(5) Project [id#996L AS t1id#1033L, id#1000L AS t2id#1034L]
   :        +- *(5) SortMergeJoin [id#996L], [id#1000L], Inner
   :           :- *(2) Sort [id#996L ASC NULLS FIRST], false, 0
   :           :  +- Exchange hashpartitioning(id#996L, 5), true, [id=#1329]
   :           :     +- *(1) Range (0, 10, step=1, splits=2)
   :           +- *(4) Sort [id#1000L ASC NULLS FIRST], false, 0
   :              +- Exchange hashpartitioning(id#1000L, 5), true, [id=#1335]
   :                 +- *(3) Range (0, 20, step=1, splits=2)
   +- *(8) Sort [id#1004L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#1004L, 5), true, [id=#1349]
         +- *(7) Range (0, 30, step=1, splits=2)
```

In this plan, the marked Exchange could have been avoided, as the data is already partitioned on "t1.id". This happens because the AliasAwareOutputPartitioning class handles aliases only related to HashPartitioning. This change normalizes all output partitioning based on the aliasing happening in Project.

Why are the changes needed? To remove unneeded exchanges.

Does this introduce any user-facing change? No.

How was this patch tested? New UT added. On TPCDS 1000 scale, this change improves the performance of query 95 from 330 seconds to 170 seconds by removing the extra Exchange.

Closes #30300 from prakharjain09/SPARK-33399-outputpartitioning.

Authored-by: Prakhar Jain <[email protected]>
Signed-off-by: Takeshi Yamamuro <[email protected]>

* [CARMEL-6306] Fix ut

* [CARMEL-6306] Fix alias not compatible with ebay skew implementation

Co-authored-by: Terry Kim <[email protected]>
Co-authored-by: Prakhar Jain <[email protected]>