-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-53738][SQL] PlannedWrite should preserve custom sort order when query output contains literal #52474
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| val literalColumns = queryOutput.flatMap { ne => isLiteral(ne, ne.name) } | ||
|
|
||
| // We should first sort by dynamic partition columns, then bucket id, and finally sorting | ||
| // columns, then drop literal columns |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here, SortOrder(Literal) has been eliminated by EliminateSorts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So you basically change requiredOrdering and drop those columns that are defined as literals in outputExpressions of the top OrderPreservingUnaryExecNode node. But what if a literal definition is not in the top node?
Shouldn't we do the other way around and fix actualOrdering? I mean if we have a query:
+- Project [i, j, 0 AS k]
+ ...
+- Sort [i]
+- Relation
Then shouldn't actualOrdering (outputOrdering of the Project) be Seq(SortOrder(k), SortOrder(i)) as we know that k is a constant? I.e. Project could prepend its contants to the alias transformed child.outputOrdering.
And similarly, when we have:
+- Sort [i]
+ ...
+- Project [i, j, 0 AS k]
+- Relation
Then shoudn't outputOrdering of the Project node be Seq(SortOrder(k)), and outputOrdering of the Sort be Seq(SortOrder(k), SortOrder(i))? I.e. Project could somehow mark that SortOrder(k) as "constant order", and Sort should just extend "constant order" expressions from child.outputOrdering with the new order expressions (i).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@peter-toth thanks for your tips, that sounds reasonable, and I have updated the code in this approach, it's effective both w/ and w/o SPARK-53707
…n query output contains literal
9b549cd to
a0aa9f4
Compare
|
|
||
| val listener = new QueryExecutionListener { | ||
| override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { | ||
| val conf = qe.sparkSession.sessionState.conf |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a bugfix, the listener runs in another thread, without this change, conf.getConf actually gets conf from the thread local, thus may cause issues on concurrency running tests
|
cc @cloud-fan, could you please take a look? |
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
Outdated
Show resolved
Hide resolved
...alyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
Outdated
Show resolved
Hide resolved
|
@pan3793 , I can take a deeper look at this PR tomorrow, but the |
|
@peter-toth Let me take this example to explain what I'm trying to do, In and the idea is to leverage the alias information in BUT, when I debugged it last night, I found the issue had gone magically, and in After some investigation, I found this was accidentally fixed by SPARK-53707 (#52449), which got merged just a few days ago (I happened to start constructing the UT before it got in ...), it fixes the issue by adding a Note: the physics plan change in this PR is still required to satisfy the UT. Now, I'm not sure if this is still an issue ... |
|
I see, thanks for the details @pan3793. I feel that a more comprehensive fix would be to not change But this is a complex topic, so @ulysses-you or @cloud-fan or others might have better ideas. |
| newOrdering.takeWhile(_.isDefined).flatten.toSeq ++ outputExpressions.filter { | ||
| case Alias(child, _) => child.foldable | ||
| case expr => expr.foldable | ||
| }.map(SortOrder(_, Ascending).copy(isConstant = true)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, do we need to add the whole Alias to SortOrder expression or could adding only the generated attribute work?
Also, I wonder if it would be a breaking change to add Constant as a new SortDirection instead of using a boolean flag?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@peter-toth the Constant SortDirection sounds like a good idea, I have updated the code to use it.
while I have tried to add only the generated attribute (I left the code as comments)
newOrdering.takeWhile(_.isDefined).flatten.toSeq ++ outputExpressions.flatMap {
case alias @ Alias(child, _) if child.foldable =>
Some(SortOrder(alias.toAttribute, Constant))
case expr if expr.foldable =>
Some(SortOrder(expr, Constant))
case _ => None
}
there are two tests fail (haven't figured out the root cause)
[info] CachedTableSuite:
...
[info] - SPARK-36120: Support cache/uncache table with TimestampNTZ type *** FAILED *** (43 milliseconds)
[info] AttributeSet(TIMESTAMP_NTZ '2021-01-01 00:00:00'#17739) was not empty The optimized logical plan has missing inputs:
[info] InMemoryRelation [TIMESTAMP_NTZ '2021-01-01 00:00:00'#17776], StorageLevel(disk, memory, deserialized, 1 replicas)
[info] +- *(1) Project [2021-01-01 00:00:00 AS TIMESTAMP_NTZ '2021-01-01 00:00:00'#17739]
[info] +- *(1) Scan OneRowRelation[] (QueryTest.scala:241)
...
[info] - SPARK-52692: Support cache/uncache table with Time type *** FAILED *** (58 milliseconds)
[info] AttributeSet(TIME '22:00:00'#18852) was not empty The optimized logical plan has missing inputs:
[info] InMemoryRelation [TIME '22:00:00'#18889], StorageLevel(disk, memory, deserialized, 1 replicas)
[info] +- *(1) Project [22:00:00 AS TIME '22:00:00'#18852]
[info] +- *(1) Scan OneRowRelation[] (QueryTest.scala:241)
...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem seems to be that InMemoryRelation.withOutput() doesn't remap outputOrdering. And because outputOrdering is present in InMemoryRelation as case class argument the unmapped ordering attributes are considered missing inputs.
This seems to be another hidden issue with InMemoryRelation.outputOrdering and got exposed with this change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I opened a small PR into this PR: pan3793#2, hopefully it helps fixing the above tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@peter-toth Many thanks for your professionalism and patience! I tested locally, and it did fix the issue. Have educated a lot from your review.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It a pleasure working with you @pan3793!
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
Outdated
Show resolved
Hide resolved
|
Thank you @pan3793. I like the new approach, just have a minor suggestions. |
|
|
||
| override def makeCopy(newArgs: Array[AnyRef]): LogicalPlan = { | ||
| val copied = super.makeCopy(newArgs).asInstanceOf[InMemoryRelation] | ||
| copied.statsOfPlanToCache = this.statsOfPlanToCache |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel this is a hidden bug just exposed by this change.
|
We should adjust |
...catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
Outdated
Show resolved
Hide resolved
InMemoryRelation.withOutput fix
|
Suppose all code issues are fixed, let's wait for another round CI, I will update the comment soon. |
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
Outdated
Show resolved
Hide resolved
|
cc @cloud-fan , @ulysses-you |
|
I plan to merge this PR tomorrow if there are no more comments. |
|
Thanks @pan3793 for the fix! Merged to |
|
It seems overkill to me to introduce the new Constant SortDirection in many code places (may break thirdparty code repo since we add a new SortDirection). Can we just add a rule to remove foldable expression in sort order ? and we can even remove the whole sort operator if all sort orders are foldable. |
@ulysses-you |
|
just back from the holiday. I think a new constant SortOrder is too much, as it complicates the foundemental ordering framework. I agree that we need to propogate the constant information so that we can know if an attribute is a constant or not. The rule A straightforward fix: in the rule |
|
Allright and sorry @pan3793 for taking you to a wrong path. Shall I revert the commit from |
|
@peter-toth I'm away from my laptop now, you can directly revert the commit on master, and I will try @cloud-fan's suggestion later (likely Saturday) |
|
Reverted the change from |
…ble orderings ### What changes were proposed in this pull request? This is the second try of #52474, following [the suggestion from cloud-fan](#52474 (comment)) This PR fixes a bug in `plannedWrite`, where the `query` has foldable orderings in the partition columns. ``` CREATE TABLE t (i INT, j INT, k STRING) USING PARQUET PARTITIONED BY (k); INSERT OVERWRITE t SELECT j AS i, i AS j, '0' as k FROM t0 SORT BY k, i; ``` The evaluation of `FileFormatWriter.orderingMatched` fails because `SortOrder(Literal)` is eliminated by `EliminateSorts`. ### Why are the changes needed? `V1Writes` will override the custom sort order when the query output ordering does not satisfy the required ordering. Before SPARK-53707, when the query's output contains literals in partition columns, the judgment produces a false-negative result, thus causing the sort order not to take effect. SPARK-53707 partially fixes the issue on the logical plan by adding a `Project` of query in `V1Writes`. Before SPARK-53707 ``` Sort [0 ASC NULLS FIRST, i#280 ASC NULLS FIRST], false +- Project [j#287 AS i#280, i#286 AS j#281, 0 AS k#282] +- Relation spark_catalog.default.t0[i#286,j#287,k#288] parquet ``` After SPARK-53707 ``` Project [i#284, j#285, 0 AS k#290] +- Sort [0 ASC NULLS FIRST, i#284 ASC NULLS FIRST], false +- Project [i#284, j#285] +- Relation spark_catalog.default.t0[i#284,j#285,k#286] parquet ``` Note, note the issue still exists because there is another place to check the ordering match again in `FileFormatWriter`. This PR fixes the issue thoroughly, with new UTs added. ### Does this PR introduce _any_ user-facing change? Yes, it's a bug fix. ### How was this patch tested? New UTs are added. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #52584 from pan3793/SPARK-53738-rework. Authored-by: Cheng Pan <[email protected]> Signed-off-by: Peter Toth <[email protected]>
…ble orderings ### What changes were proposed in this pull request? This is the second try of #52474, following [the suggestion from cloud-fan](#52474 (comment)) This PR fixes a bug in `plannedWrite`, where the `query` has foldable orderings in the partition columns. ``` CREATE TABLE t (i INT, j INT, k STRING) USING PARQUET PARTITIONED BY (k); INSERT OVERWRITE t SELECT j AS i, i AS j, '0' as k FROM t0 SORT BY k, i; ``` The evaluation of `FileFormatWriter.orderingMatched` fails because `SortOrder(Literal)` is eliminated by `EliminateSorts`. ### Why are the changes needed? `V1Writes` will override the custom sort order when the query output ordering does not satisfy the required ordering. Before SPARK-53707, when the query's output contains literals in partition columns, the judgment produces a false-negative result, thus causing the sort order not to take effect. SPARK-53707 partially fixes the issue on the logical plan by adding a `Project` of query in `V1Writes`. Before SPARK-53707 ``` Sort [0 ASC NULLS FIRST, i#280 ASC NULLS FIRST], false +- Project [j#287 AS i#280, i#286 AS j#281, 0 AS k#282] +- Relation spark_catalog.default.t0[i#286,j#287,k#288] parquet ``` After SPARK-53707 ``` Project [i#284, j#285, 0 AS k#290] +- Sort [0 ASC NULLS FIRST, i#284 ASC NULLS FIRST], false +- Project [i#284, j#285] +- Relation spark_catalog.default.t0[i#284,j#285,k#286] parquet ``` Note, note the issue still exists because there is another place to check the ordering match again in `FileFormatWriter`. This PR fixes the issue thoroughly, with new UTs added. ### Does this PR introduce _any_ user-facing change? Yes, it's a bug fix. ### How was this patch tested? New UTs are added. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #52584 from pan3793/SPARK-53738-rework. Authored-by: Cheng Pan <[email protected]> Signed-off-by: Peter Toth <[email protected]> (cherry picked from commit f33d8aa) Signed-off-by: Peter Toth <[email protected]>
…ble orderings ### What changes were proposed in this pull request? This is the second try of apache#52474, following [the suggestion from cloud-fan](apache#52474 (comment)) This PR fixes a bug in `plannedWrite`, where the `query` has foldable orderings in the partition columns. ``` CREATE TABLE t (i INT, j INT, k STRING) USING PARQUET PARTITIONED BY (k); INSERT OVERWRITE t SELECT j AS i, i AS j, '0' as k FROM t0 SORT BY k, i; ``` The evaluation of `FileFormatWriter.orderingMatched` fails because `SortOrder(Literal)` is eliminated by `EliminateSorts`. ### Why are the changes needed? `V1Writes` will override the custom sort order when the query output ordering does not satisfy the required ordering. Before SPARK-53707, when the query's output contains literals in partition columns, the judgment produces a false-negative result, thus causing the sort order not to take effect. SPARK-53707 partially fixes the issue on the logical plan by adding a `Project` of query in `V1Writes`. Before SPARK-53707 ``` Sort [0 ASC NULLS FIRST, i#280 ASC NULLS FIRST], false +- Project [j#287 AS i#280, i#286 AS j#281, 0 AS k#282] +- Relation spark_catalog.default.t0[i#286,j#287,k#288] parquet ``` After SPARK-53707 ``` Project [i#284, j#285, 0 AS k#290] +- Sort [0 ASC NULLS FIRST, i#284 ASC NULLS FIRST], false +- Project [i#284, j#285] +- Relation spark_catalog.default.t0[i#284,j#285,k#286] parquet ``` Note, note the issue still exists because there is another place to check the ordering match again in `FileFormatWriter`. This PR fixes the issue thoroughly, with new UTs added. ### Does this PR introduce _any_ user-facing change? Yes, it's a bug fix. ### How was this patch tested? New UTs are added. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52584 from pan3793/SPARK-53738-rework. Authored-by: Cheng Pan <[email protected]> Signed-off-by: Peter Toth <[email protected]> (cherry picked from commit f33d8aa) Signed-off-by: Peter Toth <[email protected]>
…foldable orderings Backport #52584 to branch-3.5 ### What changes were proposed in this pull request? This is the second try of #52474, following [the suggestion from cloud-fan](#52474 (comment)) This PR fixes a bug in `plannedWrite`, where the `query` has foldable orderings in the partition columns. ``` CREATE TABLE t (i INT, j INT, k STRING) USING PARQUET PARTITIONED BY (k); INSERT OVERWRITE t SELECT j AS i, i AS j, '0' as k FROM t0 SORT BY k, i; ``` The evaluation of `FileFormatWriter.orderingMatched` fails because `SortOrder(Literal)` is eliminated by `EliminateSorts`. ### Why are the changes needed? `V1Writes` will override the custom sort order when the query output ordering does not satisfy the required ordering. Before SPARK-53707, when the query's output contains literals in partition columns, the judgment produces a false-negative result, thus causing the sort order not to take effect. SPARK-53707 partially fixes the issue on the logical plan by adding a `Project` of query in `V1Writes`. Before SPARK-53707 ``` Sort [0 ASC NULLS FIRST, i#280 ASC NULLS FIRST], false +- Project [j#287 AS i#280, i#286 AS j#281, 0 AS k#282] +- Relation spark_catalog.default.t0[i#286,j#287,k#288] parquet ``` After SPARK-53707 ``` Project [i#284, j#285, 0 AS k#290] +- Sort [0 ASC NULLS FIRST, i#284 ASC NULLS FIRST], false +- Project [i#284, j#285] +- Relation spark_catalog.default.t0[i#284,j#285,k#286] parquet ``` Note, note the issue still exists because there is another place to check the ordering match again in `FileFormatWriter`. This PR fixes the issue thoroughly, with new UTs added. ### Does this PR introduce _any_ user-facing change? Yes, it's a bug fix. ### How was this patch tested? New UTs are added. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #52697 from pan3793/SPARK-53694-3.5. Authored-by: Cheng Pan <[email protected]> Signed-off-by: Peter Toth <[email protected]>
…ble orderings ### What changes were proposed in this pull request? This is the second try of apache#52474, following [the suggestion from cloud-fan](apache#52474 (comment)) This PR fixes a bug in `plannedWrite`, where the `query` has foldable orderings in the partition columns. ``` CREATE TABLE t (i INT, j INT, k STRING) USING PARQUET PARTITIONED BY (k); INSERT OVERWRITE t SELECT j AS i, i AS j, '0' as k FROM t0 SORT BY k, i; ``` The evaluation of `FileFormatWriter.orderingMatched` fails because `SortOrder(Literal)` is eliminated by `EliminateSorts`. ### Why are the changes needed? `V1Writes` will override the custom sort order when the query output ordering does not satisfy the required ordering. Before SPARK-53707, when the query's output contains literals in partition columns, the judgment produces a false-negative result, thus causing the sort order not to take effect. SPARK-53707 partially fixes the issue on the logical plan by adding a `Project` of query in `V1Writes`. Before SPARK-53707 ``` Sort [0 ASC NULLS FIRST, i#280 ASC NULLS FIRST], false +- Project [j#287 AS i#280, i#286 AS j#281, 0 AS k#282] +- Relation spark_catalog.default.t0[i#286,j#287,k#288] parquet ``` After SPARK-53707 ``` Project [i#284, j#285, 0 AS k#290] +- Sort [0 ASC NULLS FIRST, i#284 ASC NULLS FIRST], false +- Project [i#284, j#285] +- Relation spark_catalog.default.t0[i#284,j#285,k#286] parquet ``` Note, note the issue still exists because there is another place to check the ordering match again in `FileFormatWriter`. This PR fixes the issue thoroughly, with new UTs added. ### Does this PR introduce _any_ user-facing change? Yes, it's a bug fix. ### How was this patch tested? New UTs are added. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52584 from pan3793/SPARK-53738-rework. Authored-by: Cheng Pan <[email protected]> Signed-off-by: Peter Toth <[email protected]> (cherry picked from commit 6289672) Signed-off-by: Peter Toth <[email protected]>
What changes were proposed in this pull request?
This PR fixes a bug in
plannedWrite, where thequeryhas a literal output of the partition column.The evaluation of
FileFormatWriter.orderingMatchedfails becauseSortOrder(Literal)is eliminated byEliminateSorts.The idea is to expose and keep "constant order" expressions from
child.outputOrderingWhy are the changes needed?
V1Writeswill override the custom sort order when the query output ordering does not satisfy the required ordering. Before SPARK-53707, when the query's output contains literals in partition columns, the judgment produces a false-negative result, thus causing the sort order not to take effect.SPARK-53707 fixes the issue accidentally(and partially) by adding a
Projectof query inV1Writes.Before SPARK-53707
After SPARK-53707
This PR fixes the issue thoroughly, with a new UT added.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
UT is added.
Was this patch authored or co-authored using generative AI tooling?
No.