[SPARK-34026][SQL] Inject repartition and sort nodes to satisfy required distribution and ordering #31083
Conversation
I should have added a TODO with a JIRA or removed these tests completely if we don't want to optimize such cases.
The original discussion is here.
I think there are cases where we can remove repartition nodes, but not in the general case.
For example, `RepartitionByExpr(c1, c2) -> Project -> RepartitionByExpr(c1, c2)` can be simplified.
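For concreteness, a minimal DataFrame sketch of that shape (a hypothetical, self-contained example following the column names above):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// RepartitionByExpr(c1, c2) -> Project -> RepartitionByExpr(c1, c2):
// the inner repartition is redundant because the projection preserves c1 and
// c2, so the outer repartition alone determines the final distribution.
val df = Seq((1, "a", 1.0)).toDF("c1", "c2", "c3")
  .repartition($"c1", $"c2") // inner RepartitionByExpr(c1, c2)
  .select($"c1", $"c2")      // Project in between
  .repartition($"c1", $"c2") // outer RepartitionByExpr(c1, c2)

df.explain(true) // as of this discussion, both repartitions survive optimization
```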
Moreover, we cannot even dedup cases like `RepartitionByExpr(c1, c2) -> RepartitionByExpr(c1, c2)`, as `CollapseRepartition` runs in the operator optimization batch.
I'd be interested to hear what everybody thinks about this.
I see a couple of solutions:
- Move `CollapseRepartition` out of the operator optimization batch.
- Introduce a new optimizer rule similar to `CollapseRepartition` and `EliminateSorts`.
- Introduce a preparation rule like `RemoveRedundantSorts` that would operate on `SparkPlan` nodes.
I guess we better do this in the optimizer so either option 1 or 2 can work.
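A rough sketch of what option 2 might look like; this is not an existing Spark rule, and the matching is deliberately simplified (it ignores partition counts and the projections-in-between case):

```scala
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, RepartitionByExpression}
import org.apache.spark.sql.catalyst.rules.Rule

// Collapses directly adjacent repartitions over identical expressions,
// keeping only the outer node.
object DedupRepartitions extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case outer @ RepartitionByExpression(exprs, inner: RepartitionByExpression, _)
        if exprs == inner.partitionExpressions =>
      outer.copy(child = inner.child)
  }
}
```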
Thank you, @aokolnychyi!
just curious why other transforms are not supported yet
We cannot map other transforms to valid Catalyst expressions as Spark is not capable of resolving data source transforms. We need a function catalog for that.
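To illustrate the gap (a sketch, not code from this PR): an identity transform maps straight to a column reference, while the other v2 transforms name functions Spark cannot resolve without a `FunctionCatalog`.

```scala
import org.apache.spark.sql.connector.expressions.{Expressions, Transform}

val id: Transform  = Expressions.identity("c1")  // translatable: a plain column reference
val bkt: Transform = Expressions.bucket(4, "c1") // bucket(4, c1): no Catalyst function to resolve it to
```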
Just for my understanding, is it possible that in certain situations the repartition and/or sort can be avoided? For instance, when the input data to write is already in the correct shape.
The idea of doing this in the optimizer is that Spark will be smart enough to remove redundant sorts and repartitions. For example, I've extended EliminateSorts to cover more cases in PR #29089.
Also, checkWriteRequirements is written in a way that ensures there is at most one shuffle node and one sort node. We can successfully dedup either local or global sorts now.
The situation with repartition nodes is worse. Specifically, CollapseRepartition runs in the operator optimization batch before we construct writes, and it does not cover cases where there are filters or projections in between. That's why the tests for duplicate repartition nodes are ignored for now.
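For context, the kind of check described above can be sketched like this (a simplified stand-in for the suite's actual helper):

```scala
import org.apache.spark.sql.execution.{SortExec, SparkPlan}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Asserts that the executed write plan contains at most one shuffle and at
// most one sort, i.e. that redundant nodes were successfully deduplicated.
def assertWriteRequirements(plan: SparkPlan): Unit = {
  val shuffles = plan.collect { case s: ShuffleExchangeExec => s }
  assert(shuffles.size <= 1, s"Found ${shuffles.size} shuffles: $plan")

  val sorts = plan.collect { case s: SortExec => s }
  assert(sorts.size <= 1, s"Found ${sorts.size} sorts: $plan")
}
```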
Thanks @aokolnychyi! This is very helpful. I guess #30093, which introduces a physical rule RemoveRedundantSorts, is also useful here. Not sure if the CollapseRepartition issue can be handled with a similar approach.
cc @dongjoon-hyun @rdblue @viirya @cloud-fan @HyukjinKwon as well
The code under review (excerpt):

```scala
val queryWithDistribution = if (distribution.nonEmpty) {
  val numShufflePartitions = conf.numShufflePartitions
```
This looks like a limitation for data sources; with DSv1 they could inject arbitrary operations, including calling the repartition method that Dataset provides. Most repartition methods have a numPartitions parameter, and the same goes for the repartitionByRange methods.
For example, I have a data source written as DSv1 which provides the ability to read the state store in a streaming query and rewrite it. While the number of partitions in the state store is determined by the number of shuffles in the streaming query, the value is not guaranteed to be the same across applications.
Furthermore, the data source supports rescaling, which should repartition to an arbitrary number of partitions. It would be weird if I had to say "You should change the Spark configuration to set the target number of partitions."
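A hedged DSv1 sketch of the pattern described above (the sink name and write body are hypothetical stand-ins):

```scala
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.functions.col

// With DSv1 the sink receives the DataFrame directly, so it can force an exact
// partition count (e.g. to match the state store layout) independently of
// spark.sql.shuffle.partitions.
class StateRewriteSink(numStatePartitions: Int) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val shaped = data
      .repartition(numStatePartitions, col("key")) // exact count is a hard requirement
      .sortWithinPartitions(col("key"))
    shaped.foreachPartition { rows: Iterator[Row] =>
      rows.foreach(_ => ()) // stand-in for rewriting one state store partition
    }
  }
}
```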
I have a commit to address this (HeartSaVioR@485c56b), but you deserve the full credit for this PR and I wouldn't like to take a part of it.
That said, I'd prefer to handle this in a follow-up JIRA issue (I'll submit a new PR once this PR is merged), but I'm also OK with addressing it in this PR (I'll submit a new PR to your repo) if we prefer that.
@HeartSaVioR, I think it is going to be useful for some data sources. I did not want to cover this in the first PRs as there was no consensus on the dev list around controlling the number of tasks. That's why I added this topic to the non-goals of the design doc.
I think one point to consider is who should control the parallelism. I guess the parallelism should depend on the incoming data volume in most cases (except when the number of requested partitions is static, as in the case mentioned above). Without statistics about the number of incoming records or their shape, it will be hard for a data source to determine the right number of partitions.
That being said, I think making that number optional, like in your change, can be a reasonable starting point. However, I'd like us to think about how this will look in the future. Should Spark report stats about the incoming batch so that data sources can make a better estimate? What will that API look like?
I agree with @aokolnychyi that it should be Spark that decides these physical details (like numShufflePartitions), for better performance. It's an anti-pattern to let the data source decide it.
BTW, why do we use conf.numShufflePartitions here? We can use None so that AQE can decide the number of partitions, which is even better.
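To sketch the two options being weighed (the constructor shape with an optional partition count is approximated from the change referenced below, not copied from this PR):

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, RepartitionByExpression}
import org.apache.spark.sql.internal.SQLConf

def buildShuffle(
    distribution: Seq[Expression],
    query: LogicalPlan,
    conf: SQLConf,
    letAqeDecide: Boolean): LogicalPlan = {
  val numPartitions =
    if (letAqeDecide) None // AQE picks (and can coalesce) the partition count
    else Some(conf.numShufflePartitions) // pinned to spark.sql.shuffle.partitions
  RepartitionByExpression(distribution, query, numPartitions)
}
```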
I guess I missed the change from @viirya that made it optional, @cloud-fan. I can switch to that.
My concern is mostly the "static partitions" case I gave as an example, like the state data source. It's not a matter of whether it's an anti-pattern or not, because in that case the ability to restrict the number of partitions is not optional but required: the data must be partitioned exactly the same way Spark partitions rows for a hash shuffle, and a partition shouldn't be written concurrently. I don't think end users should have to repartition manually in their queries to avoid breaking things.
That is easily achievable in DSv1 (I have an implementation based on DSv1 and want to migrate to DSv2), as Spark provides the DataFrame to the data source on write. While I don't expect such flexibility for DSv2 (that behavior seems too open), I'm not sure this case is something we'd like to define as "not supported in DSv2, so live with DSv1".
Even without static partitioning, there may be more to think about: a data source may know physical details about the actual storage that could help optimize writes, or, on the opposite side, may need to throttle parallelism so we don't effectively DDoS ourselves. I guess this is beyond the scope of this PR, but I wanted to bring it up as food for thought.
I knew this would require discussion, so I propose to do it in a follow-up like @HeartSaVioR suggested.
I don't feel strongly about this point, and it would be great to accommodate all use cases to drive the adoption of DSv2. However, we should be really careful, and I think we should continue to discuss.
I still believe the parallelism should depend on data volume in the general case. That's why this is going to be useful only if Spark propagates stats about the incoming batch.
The code under review (excerpt):

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.util.QueryExecutionListener

class WriteDistributionAndOrderingSuite
```
Could we please have the same sort of tests for streaming queries and see whether all functionality works with Structured Streaming as well? Repartition may work, but the analyzer/optimizer may reject sorting on the mistaken assumption that the sort should happen globally (across micro-batches), whereas it only needs to be per micro-batch; see the sketch below.
No need to consider continuous mode - it doesn't support repartition. Though it's still probably worth checking that Spark throws the right error in that case.
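A small sketch of the concern (assuming an active `spark` session; source and sink formats are arbitrary): the analyzer rejects a global sort on a streaming Dataset outside complete mode, while a per-micro-batch sort is fine.

```scala
import org.apache.spark.sql.DataFrame

val stream = spark.readStream.format("rate").load()

// stream.orderBy("value") // AnalysisException: global sort is unsupported here

// Sorting each micro-batch individually is legal:
val writeBatch: (DataFrame, Long) => Unit = (batch, _) =>
  batch.sortWithinPartitions("value")
    .write.format("noop").mode("append").save()

stream.writeStream.foreachBatch(writeBatch).start()
```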
I see the PR is already huge enough - it would be OK to deal with this in a follow-up issue. We just need to make sure both are addressed in the same release (3.2.0 probably).
Unfortunately, bringing this logic to streaming plans would require more effort. That's why there are no tests for that yet. The main problem is the absence of dedicated logical plans (SPARK-27484?) and the fact that we construct StreamingWrite right away.
Right now, MicroBatchExecution creates WriteToMicroBatchDataSource, which is replaced with WriteToDataSourceV2 in runBatch. StreamingWrite is constructed while generating the logical plan in createStreamingWrite and is then wrapped into MicroBatchWrite, which implements BatchWrite.
Locally, I had a prototype that introduces the following plans:
- `AppendDataToMicroBatchDataSource`
- `OverwriteDataInMicroBatchDataSource`
- `UpdateDataInMicroBatchDataSource`

Then MicroBatchExecution creates an appropriate logical plan node, and we handle those nodes in V2Writes next to the batch nodes.
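For illustration, the rough shape such a node could take (field names and shapes are assumptions; the actual prototype may differ):

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}

// A dedicated logical node for micro-batch appends. V2Writes could then match
// on it next to the batch AppendData node and construct the StreamingWrite
// lazily, instead of building it while creating the plan.
case class AppendDataToMicroBatchDataSource(query: LogicalPlan)
    extends UnaryNode {
  override def child: LogicalPlan = query
  override def output: Seq[Attribute] = Nil // writes produce no rows
}
```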
I need more feedback from folks who work on Structured Streaming to see whether this is aligned with their thoughts.
@sunchao, will separate plans for streaming make it easier to invalidate the cache for streaming writes?
I think SPARK-27484 is something we should deal with. I tried it before and it didn't go well, as handling DSv1 and continuous mode wasn't trivial. If I remember the difficulty correctly, Sink (DSv1) requires a DataFrame that is ready for write - I'm not sure that is something we can include in the logical/physical plan, or even inject later.
But honestly, I'm not an expert on Catalyst, and that might be the root cause of the difficulty. I'll probably try it again in the near future, but it would also be nice if someone interested in this tried it out in parallel.
@HeartSaVioR, I can share what I had if people think having those plans is reasonable.
OK, we'd probably like to also cc these people for major SS changes or anything that needs discussion on SS:
@tdas @zsxwing @jose-torres @brkyvz @xuanyuanking
One thing I can tell is that SS doesn't officially support upsert on the sink side - it's either "append" or "truncate and append".
Update mode simply leverages append and defers to the data source to deal with the upsert, without telling it which columns are used as the (composite) key. That's not quite right, but that's just where we are.
I initiated a discussion about the semantic mismatch between output mode in SS and write behavior on sinks on the dev mailing list, but it didn't get enough love: http://apache-spark-developers-list.1001551.n3.nabble.com/Output-mode-in-Structured-Streaming-and-DSv1-sink-DSv2-table-td30216.html#a30239
Sorry for the delay, I just noticed this - thanks for pinging me, @HeartSaVioR.
+1 for having the streaming write logical plans. Right now we mix the analysis logic with the stream write API code, which is indeed harmful to further extension. I'll have a try at SPARK-27484.
My major concern is how to specify the required distribution/ordering in the query plan. The current PR chooses to insert the repartition/sort nodes into the logical plan during optimization, while the existing framework declares the required child distribution/ordering on physical nodes and satisfies it during physical planning. I need more time to think about the tradeoffs between these two approaches, but probably it's better to follow the existing framework.
@cloud-fan, I agree that doing this via the required child distribution and ordering in the physical nodes seems like the way to go. In fact, that was the first idea that I tried to implement. Unfortunately, I could not make it work for the reasons described in the design doc here. Could you let me know if you see an alternative solution?
@cloud-fan, I guess we could make it work if we really wanted to by extending the existing requirements framework. I am not sure we would gain anything that way, but I am open to considering it.
I recalled why the existing framework adds the shuffle/sort at the physical phase. It's still OK to add the shuffle/sort at the optimizer phase and eliminate it later at the physical phase, but it's more natural to not add the shuffle/sort in the first place. The cast is a problem, but perhaps that can be fixed as well.
@cloud-fan, let's try to make the required distribution and ordering work. I have one case that I am not sure how to address with this approach: explicit sorts/repartitions that have no effect. We can probably cover cases where adding sorts/repartitions is redundant (as we can check the child distribution and ordering), but I am not sure we can remove sorts/repartitions that have no effect if we don't go through the optimizer. For example, suppose someone does a manual global sort by (c1, c2) but the table requires a global sort by (c3, c4). Even though the manual sort has no effect, we will still perform it, so we will end up with 2 global sorts. Under the current approach, this case is handled and the manual sort will be removed (thanks to EliminateSorts).
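Concretely, the case in question (a sketch; `df` is an arbitrary DataFrame and the table name is hypothetical):

```scala
// Assume the table behind "catalog.db.tbl" requires a global sort by (c3, c4).
// The user's orderBy below has no effect on the final layout, yet under the
// physical-requirement approach it still executes, yielding two global sorts.
// The optimizer-based approach can drop it via EliminateSorts.
df.orderBy("c1", "c2")
  .writeTo("catalog.db.tbl")
  .append()
```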
BTW, the point that sort-merge and shuffle-hash joins have different output ordering is a good one to have in mind.
Very good point. Perhaps all other operators should add the required shuffle/sort at the optimizer phase if they can, or we should add more physical optimizations to remove unnecessary shuffles/sorts, but that's unrelated to this PR. Let's move forward with this approach and figure out the best way later.
Sounds good, @cloud-fan. Thanks for thinking this through. I would be happy to reconsider this approach if we have a way to detect redundant repartitions and sorts.
Alright, I've updated this PR and I think there are no major outstanding questions related to this particular PR. Minor open points to be addressed in this PR:
Other open points to be addressed after this PR:
In order to make progress on this feature, I'd ask everybody to review this PR, as I think it is close to going in. We can discuss the other points in follow-up PRs.
I'm OK with dealing with Structured Streaming support in a further follow-up, but it would be really odd if the feature only worked with batch queries. I thought we only lacked tests for SS - isn't that the case? If this addition doesn't cover SS functionality, I feel we should have a JIRA issue marked as a blocker and ensure both are supported at the time the feature ships. I can file and submit a follow-up PR for the ability to define the number of partitions; we can probably discuss further from there. My honest opinion on DSv2 is that it should cover almost every case DSv1 does, so that DSv1 can be deprecated in the near future. It's not ideal for us to maintain both DSv1 and DSv2, and eventually we should drop DSv1 to reduce maintenance cost. Removing unnecessary repartitions sounds like a further improvement, not a requirement for this PR.
I agree with this. I've created SPARK-34183.
+1.
+1 too. I've created SPARK-34184 (I'll annotate the ignored tests next).
This is one of the required changes and looks good to me in general.
As @aokolnychyi summarized:
- `WriteDistributionAndOrderingSuite.scala`: +1 for filing JIRAs with descriptions of the ignored test cases and removing them from this PR.
https://github.com/apache/spark/pull/31083/files#r553252442
https://github.com/apache/spark/pull/31083/files#r553880583
- `InMemoryTable.scala`: I believe we can close the comment thread.
#31083 (comment)
- `StreamExecution.scala`: Let's resolve the comment thread, since we have kept it open for almost two weeks after I merged #31093.
#31083 (comment)
- `DistributionAndOrderingUtils.scala`: I hope we can revive the `FunctionCatalog` PR shortly after merging this PR.
#31083 (comment)
- `WriteDistributionAndOrderingSuite.scala`: For the SS feature and its test coverage parity, I'm not sure whether that's a blocker for 3.2.0. However, I'm happy with the as-is agreement and won't object to keeping it a blocker if that gives more visibility.
#31083 (comment)
- `DistributionAndOrderingUtils.scala`: Usage of a V2 `Expression` alias (discussed here). Let's fix it first since it's obvious.
@cloud-fan, I tried to implement the suggestion to let AQE set the number of partitions, but it changes the reported output partitioning. I created SPARK-34230 to reconsider this but will keep the current logic for now. I'd appreciate more input on AQE.
I've updated this PR to address the related points. Let me know if I missed something, but I think it should be ready for a final review round.
dongjoon-hyun
left a comment
Thank you for updating. +1, LGTM.
Thanks @aokolnychyi!
This is something we should fix in AQE later. We don't need to block this PR.
Thanks, merging to master!
Thanks, @cloud-fan @dongjoon-hyun @sunchao @rdblue @HyukjinKwon @HeartSaVioR! Looking forward to the follow-ups.
@aokolnychyi I notice this PR does not handle CTAS and RTAS commands, and it's not easy to handle them in the current design because we cannot get the Table (and thus its required distribution and ordering) before the table is created.
What changes were proposed in this pull request?
This PR adds repartition and sort nodes to satisfy the required distribution and ordering introduced in SPARK-33779.
Note: This PR contains the final part of changes discussed in PR #29066.
Why are the changes needed?
These changes are the next step as discussed in the design doc for SPARK-23889.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
This PR comes with a new test suite.