[SPARK-32921][SHUFFLE] MapOutputTracker extensions to support push-based shuffle #30480
Conversation
Seems all recent PRs in Spark are failing the build at the javadoc step.
ok to test
Test build #131687 has finished for PR 30480 at commit
Test build #131691 has finished for PR 30480 at commit
Test build #131710 has finished for PR 30480 at commit
Retest this please
Test build #131807 has finished for PR 30480 at commit
@tgravescs @attilapiros @Ngone51 @jiangxb1987 @otterc @mridulm @dongjoon-hyun PR ready for review.
Test build #131792 has finished for PR 30480 at commit
core/src/main/scala/org/apache/spark/scheduler/MergeStatus.scala
Is this necessary? (except the tests)
This is only used in tests; will clarify.
Test build #132071 has finished for PR 30480 at commit
mridulm left a comment
Thanks for the work! Took a pass through it @Victsm
If fetchMergeResult == true, is it right that there is an expectation that (mapOutputStatuses == null) == (mergeResultStatuses == null)?
If yes, can we simplify this?
a) Make this method simpler by using that condition.
b) Do we have any use case for GetMergeResultStatuses without also fetching GetMapOutputStatuses immediately before? If not, combine both to avoid two RPCs when fetchMergeResult == true?
That's not always true.
We currently fetch map status and merge status using 2 separate RPCs.
Although the fetching of these statuses is guarded by the lock, the initial check at line 1113 for these statuses being non-null is outside the lock.
So it is possible for a task to see the map status being non-null while the merge status is still null.
We always need to fetch both map status and merge status together, either during the initial fetch or during fallback.
Combining both RPCs into one would increase the code complexity.
For now, the RPC just returns the pre-serialized bytes for either the MapStatus array or the MergeStatus array.
If we want to combine both into a single RPC, we would need to define additional RPC messages so that we can encode the 2 byte arrays for the serialized MapStatus array and MergeStatus array together.
Combining both does not seem to bring enough benefit: we haven't observed any issue indicating a Spark driver performance regression from doubling the number of RPCs for fetching shuffle statuses, and keeping them separate also keeps the code simpler.
I would expect multiple RPCs not to be the preferred option given the impact on the driver, but code simplicity needs to be balanced against that.
+CC @JoshRosen, @Ngone51 who last made changes here. Any thoughts on this?
I'd prefer to combine them. Actually, the first time I reviewed this PR, I started thinking about a unified way to provide a consistent API for both map status and merged status in MapOutputTracker & ShuffleStatus. Unfortunately, I didn't come up with a good idea.
I think one RPC would ease the error handling for us. Not sure how much complexity you'd expect?
And I'd suggest adding an additional new RPC for the combined case and leaving the current one as it is, so that we don't affect the existing code path when push-based shuffle is disabled.
One option could be to replace GetMergeResultStatuses with GetMapOutputAndMergeResultStatuses.
That keeps non-push-based shuffle codepaths unchanged, and when push-based shuffle is enabled, a single RPC handles the response: the code change would mirror what has been done for GetMergeResultStatuses already.
Thoughts @Ngone51, @Victsm, @venkata91?
With the current RPC (`RpcCallContext`) mechanism in `MapOutputTracker`, we can only send one response, as opposed to other RPC mechanisms within Spark. If we have to combine getting both MapStatuses and MergeStatuses when push-based shuffle is enabled, then we have a couple of options:
- Encode both MapStatuses and MergeStatuses in the same `Array[Byte]` returned from `serializedOutputStatus`, with some encoding scheme like the length in bytes of the mapStatuses as the first part, then the mapStatuses themselves, and similarly for the mergeStatuses; in `deserializeOutputStatuses` we would then decode the output of the `GetMapOutputAndMergeResultStatuses` RPC call accordingly. This is a somewhat unclean approach, as the client rather than the RPC layer owns the semantics of encoding/decoding the byte array. Although something similar is already being done wrt whether the mapStatuses are a `DIRECT` fetch or a `BROADCAST` fetch.
- If not, we might need to make changes to `RpcCallContext` in order to respond with 2 byte arrays. This seems to be a lot of additional overhead just for this purpose.
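The length-prefixed scheme described in the first option could be sketched roughly as follows. This is a minimal illustration only; `StatusCodec`, `encodeStatuses`, and `decodeStatuses` are hypothetical names, not Spark APIs:

```scala
import java.nio.ByteBuffer

// Hypothetical sketch of packing two pre-serialized status arrays into one payload:
// [len(map)][map bytes][len(merge)][merge bytes]
object StatusCodec {
  def encodeStatuses(mapBytes: Array[Byte], mergeBytes: Array[Byte]): Array[Byte] = {
    val buf = ByteBuffer.allocate(8 + mapBytes.length + mergeBytes.length)
    buf.putInt(mapBytes.length).put(mapBytes)
    buf.putInt(mergeBytes.length).put(mergeBytes)
    buf.array()
  }

  def decodeStatuses(payload: Array[Byte]): (Array[Byte], Array[Byte]) = {
    val buf = ByteBuffer.wrap(payload)
    val mapBytes = new Array[Byte](buf.getInt())
    buf.get(mapBytes)
    val mergeBytes = new Array[Byte](buf.getInt())
    buf.get(mergeBytes)
    (mapBytes, mergeBytes)
  }
}
```

As the comment notes, the drawback is that this encode/decode contract lives in the client code rather than in the RPC layer.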
That is an implementation detail of what the response of GetMapOutputAndMergeResultStatuses is, right?
It can simply be an encoding of Array[Array[Byte]] (for example), where result(0) is for MapStatus and result(1) is for MergeStatus, keeping everything else the same?
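A rough sketch of this shape, where the combined message gets a single two-part reply while the existing map-status-only path stays untouched. The message and handler names here are hypothetical simplifications, not the actual MapOutputTracker RPC plumbing:

```scala
// Illustrative sketch only: a new combined message alongside the existing one,
// with the tracker replying with both pre-serialized arrays in one response.
object RpcSketch {
  sealed trait TrackerMessage
  case class GetMapOutputStatuses(shuffleId: Int) extends TrackerMessage
  case class GetMapAndMergeResultStatuses(shuffleId: Int) extends TrackerMessage

  def handle(msg: TrackerMessage,
             serializedMapStatuses: Int => Array[Byte],
             serializedMergeStatuses: Int => Array[Byte]): Any = msg match {
    case GetMapOutputStatuses(id) =>
      serializedMapStatuses(id) // existing code path, unchanged
    case GetMapAndMergeResultStatuses(id) =>
      // One RPC carries both serialized arrays when push-based shuffle is enabled.
      (serializedMapStatuses(id), serializedMergeStatuses(id))
  }
}
```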
I am fine with Tuple as well.
+CC @Ngone51 in case you have any other thoughts.
Just to provide an update here on this PR.
1. Handling of MergeResults from the executors in MapOutputTracker
2. Shuffle merge finalization in DAGScheduler
This also includes the following changes:
- LIHADOOP-52972 Tests for changes in MapOutputTracker and DAGScheduler related to push-based shuffle.
Author: Chandni Singh <[email protected]>
- LIHADOOP-52202 Utility to create a directory with 770 permission.
Author: Chandni Singh <[email protected]>
- LIHADOOP-52972 Moved isPushBasedShuffleEnabled to Utils and added a unit test for it.
Author: Ye Zhou <[email protected]>
…scheduler encounters a shuffle chunk failure RB=2151376 BUG=LIHADOOP-54115 G=spark-reviewers R=yezhou,mshen A=mshen
… a shuffle chunk fails
ok to test
Looks good to me, thanks for the changes @venkata91
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #137398 has finished for PR 30480 at commit
@mridulm It seems like it is still failing; not sure why these tests are failing. I ran the failing tests on my laptop and they worked fine. Checked the
```scala
if (fetchedMapStatuses == null || fetchedMergeStatuses == null) {
  logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
  val fetchedBytes =
    askTracker[(Array[Byte], Array[Byte])](GetMapAndMergeResultStatuses(shuffleId))
```
I may have missed some discussion since my last review, but I think this breaches the decision we made before:
we won't affect the existing code path in the map-status-only case.
I think you can return the map status only at the sender side to keep the same behavior?
Do you mean separating out the handling of GetMapStatusMessage and GetMapAndMergeStatusMessage, to avoid returning (mapStatuses, null) in the case of GetMapStatusMessage and keep it the same way as before, just returning mapStatuses?
Yes. (cc @mridulm)
@Ngone51 I updated the PR assuming that is what you meant with your above comment. Let me know if that's not the case.
```scala
if (shuffleStatus != null) {
  // Check if the map output is pre-merged and if the merge ratio is above the threshold.
  // If so, the location of the merged block is the preferred location.
  val preferredLoc = if (pushBasedShuffleEnabled) {
```
Doesn't this path need to respect shuffleLocalityEnabled too?
I agree that we should make it consistent, but there's also a clear difference between locality calculation for push-based shuffle and the original shuffle.
My understanding is that this flag was added because of the potentially costly computation for shuffle locality in the original shuffle.
For push-based shuffle, that cost is no longer a concern, and the reducer task can achieve much better locality.
Always calculating shuffle locality is preferred.
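The merge-ratio check being discussed might look roughly like this. It is an illustrative sketch only; `MergeInfo`, the threshold parameter, and the data shapes are hypothetical, not the actual Spark implementation:

```scala
// Hypothetical sketch: prefer the merger's host only when most of the partition's
// data was merged there; otherwise fall back to regular map-output locality.
object MergeLocality {
  // Per-partition merge metadata: bytes merged at a location vs. total partition size.
  case class MergeInfo(mergedBlockSize: Long, totalBlockSize: Long, location: String)

  def preferredMergedLocation(info: MergeInfo, ratioThreshold: Double): Option[String] = {
    val mergeRatio = info.mergedBlockSize.toDouble / info.totalBlockSize
    if (mergeRatio >= ratioThreshold) Some(info.location) else None
  }
}
```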
593f092 to e62e953
Kubernetes integration test unable to build dist. exiting with code: 1
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #137544 has finished for PR 30480 at commit
Test build #137556 has finished for PR 30480 at commit
Test build #137557 has finished for PR 30480 at commit
Kubernetes integration test unable to build dist. exiting with code: 1
Test build #137657 has finished for PR 30480 at commit
mridulm left a comment
Changes look good to me.
+CC @Ngone51, @attilapiros, @tgravescs for another pass before merging...
ok to test
Merged to master, thanks for working on this @venkata91 and @Victsm!
Kubernetes integration test starting
Kubernetes integration test status failure
…sed shuffle

What changes were proposed in this pull request?
This is one of the patches for SPIP SPARK-30602 for push-based shuffle. Summary of changes:
- Introduce `MergeStatus` which tracks the partition level metadata for a merged shuffle partition in the Spark driver
- Unify `MergeStatus` and `MapStatus` under a single trait to allow code reuse inside `MapOutputTracker`
- Extend `MapOutputTracker` to support registering / unregistering `MergeStatus`, calculating preferred locations for a shuffle taking merged shuffle partitions into consideration, and serving reducer requests for block fetching locations with merged shuffle partitions.

The added APIs in `MapOutputTracker` will be used by `DAGScheduler` in SPARK-32920 and by `ShuffleBlockFetcherIterator` in SPARK-32922

Why are the changes needed?
Refer to SPARK-30602

Does this PR introduce _any_ user-facing change?
No

How was this patch tested?
Added unit tests.

Lead-authored-by: Min Shen mshen@linkedin.com
Co-authored-by: Chandni Singh chsingh@linkedin.com
Co-authored-by: Venkata Sowrirajan vsowrirajan@linkedin.com

Closes apache#30480 from Victsm/SPARK-32921.

Lead-authored-by: Venkata krishnan Sowrirajan <[email protected]>
Co-authored-by: Min Shen <[email protected]>
Co-authored-by: Chandni Singh <[email protected]>
Co-authored-by: Chandni Singh <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>