
Conversation

@venkata91
Contributor

venkata91 commented Oct 27, 2020

What changes were proposed in this pull request?

Driver-side changes for coordinating push-based shuffle by selecting external shuffle services for merging partitions.

This PR includes changes related to ShuffleMapStage preparation, namely the selection of merger locations and their initialization as part of ShuffleDependency.

Currently this code is not used, as some of the changes will come subsequently as part of https://issues.apache.org/jira/browse/SPARK-32917 (shuffle block push as part of ShuffleMapTask), https://issues.apache.org/jira/browse/SPARK-32918 (support for the finalize API) and https://issues.apache.org/jira/browse/SPARK-32920 (finalization of the push/merge phase). This is also why the tests here are partial; once the above-mentioned changes are raised as PRs, we will have sufficient tests for the DAGScheduler code as well.
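As a rough illustration of that preparation step, here is a hedged, self-contained sketch in plain Scala. Class and method names here are hypothetical, not the actual Spark classes; the real PR threads this through DAGScheduler, SchedulerBackend and ShuffleDependency.

```scala
// Hypothetical stand-in for the dependency that records merger locations.
class MergerAwareDependency {
  private var mergerLocs: Seq[String] = Nil
  def setMergerLocs(locs: Seq[String]): Unit = { mergerLocs = locs }
  def getMergerLocs: Seq[String] = mergerLocs
}

object ShuffleStagePrep {
  // Ask the cluster manager for merger (external shuffle service) hosts and
  // record them on the dependency before the stage's tasks are submitted.
  def prepareShuffleServices(
      dep: MergerAwareDependency,
      getMergerLocations: Int => Seq[String],
      numMergersWanted: Int): Unit = {
    if (dep.getMergerLocs.isEmpty) {
      // An empty result simply means push-based shuffle falls back to
      // regular shuffle for this stage.
      dep.setMergerLocs(getMergerLocations(numMergersWanted))
    }
  }
}
```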

Why are the changes needed?

A new API is added in SchedulerBackend to get merger locations for push-based shuffle. It is currently implemented for YARN; other cluster managers can provide their own implementations, which is why a new API is introduced.
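A hedged sketch of the shape of such an API; the parameter names are borrowed from the GetShufflePushMergerLocations message visible in the test output further down, and the exact signature in the PR may differ.

```scala
// Illustrative trait: cluster managers that support push-based shuffle
// override the method; the empty default means "no mergers available".
trait MergerLocationSupport {
  def getShufflePushMergerLocations(
      numMergersNeeded: Int,
      hostsToFilter: Set[String]): Seq[String] = Seq.empty
}

// e.g. a YARN-style backend could answer from its view of cluster nodes:
class YarnLikeBackend(allNodes: Seq[String]) extends MergerLocationSupport {
  override def getShufflePushMergerLocations(
      numMergersNeeded: Int,
      hostsToFilter: Set[String]): Seq[String] =
    allNodes.filterNot(hostsToFilter).take(numMergersNeeded)
}
```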

Does this PR introduce any user-facing change?

Yes, a user-facing config to enable push-based shuffle is introduced.
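For illustration, opting in would look roughly like this, assuming the new key is spark.shuffle.push.enabled; per the isPushBasedShuffleEnabled snippet further down, the external shuffle service must also be enabled.

```scala
import org.apache.spark.SparkConf

// Both flags are required; see isPushBasedShuffleEnabled below.
val conf = new SparkConf()
  .set("spark.shuffle.push.enabled", "true")     // new config in this PR (assumed key)
  .set("spark.shuffle.service.enabled", "true")  // external shuffle service
```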

How was this patch tested?

Unit tests were partially added; since some of the changes in DAGScheduler depend on future changes, the DAGScheduler tests will be added along with those changes.

Lead-authored-by: Venkata krishnan Sowrirajan [email protected]
Co-authored-by: Min Shen [email protected]

…huffle by selecting external shuffle services for merging partitions
venkata91 changed the title [SPARK-32919][CORE] Driver side changes for coordinating push based s… [SPARK-32919][SHUFFLE] Driver side changes for coordinating push based s… Oct 27, 2020
venkata91 changed the title [SPARK-32919][SHUFFLE] Driver side changes for coordinating push based s… [SPARK-32919][SHUFFLE] Driver side changes for coordinating push based shuffle by selecting external shuffle services for merging partitions Oct 27, 2020

@tgravescs
Contributor

ok to test

@SparkQA

SparkQA commented Oct 28, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34979/

@SparkQA

SparkQA commented Oct 28, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34979/

@tgravescs
Contributor

the paper talks about:
we choose Magnet shuffle services in locations beyond the active Spark executors, and launch Spark executors later via dynamic allocation based on locations of the chosen Magnet shuffle services. This way, instead of choosing Magnet shuffle services based on Spark executor locations, we launch Spark executors based on locations of Magnet shuffle services. This optimization is possible because of Magnet's integration with Spark native…

I know this was also talked about in the SPIP. The current implementation does not seem to do this. Can the description here please be updated to state what it does and why? Is this something to be PR'd later, or was it just talked about as a possibility?

@Victsm
Contributor

Victsm commented Oct 28, 2020

@tgravescs
What we have is the same as what's described in the paper and the SPIP doc.
For handling DRA, we are essentially doing 2 things:

  1. Choose shuffle service locations beyond the current active Spark executors.
  2. Launch Spark executors with DRA based on locations of the chosen shuffle services.

This PR enables the first.
By keeping track of all historical locations of executors launched for a given Spark application, we get 2 benefits (see the sketch after this list).

  1. When DRA kicks in later on, and significantly reduces the number of available active executors, we can still look into the historical locations of past executors to get sufficient shuffle service locations to perform block push/merge.
  2. On a YARN cluster with authentication enabled, picking historical locations of past executors ensures that the executor can talk to the shuffle service with SASL authentication, and that when the application finishes, the local dirs storing the merged shuffle files get cleaned up.
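A minimal sketch of this bookkeeping, in plain Scala with hypothetical names; the actual logic lives in the YARN scheduler backend changes in this PR.

```scala
import scala.collection.mutable

// Hypothetical tracker: remembers every host an executor was ever launched on,
// so merger selection can fall back to past hosts when DRA scales down.
class MergerLocationTracker {
  private val historicalHosts = mutable.LinkedHashSet.empty[String]

  def executorAdded(host: String): Unit = synchronized {
    historicalHosts += host
  }

  // Prefer hosts with currently active executors, then historical hosts,
  // so enough shuffle services are available for block push/merge.
  def selectMergers(numNeeded: Int, activeHosts: Set[String]): Seq[String] =
    synchronized {
      val (active, past) = historicalHosts.toSeq.partition(activeHosts)
      (active ++ past).take(numNeeded)
    }
}
```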

In a follow-up patch for the driver-side change (MapOutputTracker#getPreferredLocationsForShuffle), the second is enabled.
The preferred locations for a shuffle now take the shuffle service locations for that shuffle into consideration.
This also sets the preferred locations for the corresponding ShuffleRDD, which then has 2 impacts (see the sketch after this list).

  1. When TaskSetManager schedules tasks to executors, this would impact the task placement strategy.
  2. When ExecutorAllocationManager requests more executors for DRA, this preferred location would be passed to YARN to request containers with the preferred locality.
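A hedged sketch of that second piece, with illustrative names; the real change is in MapOutputTracker#getPreferredLocationsForShuffle.

```scala
// Illustrative only: with push-based shuffle, the merged file on the merger
// host dominates the reduce task's input, so report the merger hosts as the
// preferred locations instead of the hosts holding the raw map output.
def preferredLocationsForReduce(
    mapOutputHosts: Seq[String],   // hosts with the most map output bytes
    mergerHosts: Seq[String],      // shuffle services merging this partition
    pushBasedShuffleEnabled: Boolean): Seq[String] = {
  if (pushBasedShuffleEnabled && mergerHosts.nonEmpty) mergerHosts
  else mapOutputHosts
}
```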

@SparkQA

SparkQA commented Oct 28, 2020

Test build #130376 has finished for PR 30164 at commit 2688df2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

thanks for the explanation

 * at the same time. Will improve this in a later version.
 */
def isPushBasedShuffleEnabled(conf: SparkConf): Boolean = {
  conf.get(PUSH_BASED_SHUFFLE_ENABLED) && conf.get(SHUFFLE_SERVICE_ENABLED)
}
Contributor

I look forward to the day when the second condition will be disabled :-)
It will be relevant for both k8s and spark streaming !

+CC @dongjoon-hyun, you might be interested in this in the future.

@SparkQA

SparkQA commented Nov 2, 2020

Test build #130533 has finished for PR 30164 at commit 0423970.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class GetShufflePushMergerLocations(numMergersNeeded: Int, hostsToFilter: Set[String])

@SparkQA

SparkQA commented Nov 2, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35133/

@SparkQA

SparkQA commented Nov 2, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35133/

@SparkQA

SparkQA commented Nov 3, 2020

Test build #130575 has finished for PR 30164 at commit 3a6219f.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 3, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35176/

@SparkQA

SparkQA commented Nov 3, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35176/

@SparkQA

SparkQA commented Nov 5, 2020

Test build #130628 has finished for PR 30164 at commit ca44d03.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 18, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35904/

@SparkQA

SparkQA commented Nov 18, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35906/

@SparkQA

SparkQA commented Nov 19, 2020

Test build #131302 has finished for PR 30164 at commit 050a5ae.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 19, 2020

Test build #131298 has finished for PR 30164 at commit 050a5ae.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 19, 2020

Test build #131303 has finished for PR 30164 at commit 1714829.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class ExecutorSource(

@SparkQA

SparkQA commented Nov 19, 2020

Test build #131297 has finished for PR 30164 at commit 2849051.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mridulm
Contributor

mridulm commented Nov 19, 2020

The scala-2.13 failure is unrelated to this PR, and is getting fixed.
Given all review comments have been addressed, I will merge this once there is a clean build.
@venkata91 If there are pending commits, please let me know - I saw some recent changes made after reviews.

@venkata91
Contributor Author

The scala-2.13 failure is unrelated to this PR, and is getting fixed.
Given all review comments have been addressed, I will merge this once there is a clean build.
@venkata91 If there are pending commits, please let me know - I saw some recent changes made after reviews.

No, that's it; only addressing @dongjoon-hyun's comments and merging the upstream master, as it had some merge conflicts.

@SparkQA

SparkQA commented Nov 19, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35971/

@SparkQA

SparkQA commented Nov 19, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35971/

@SparkQA

SparkQA commented Nov 19, 2020

Test build #131367 has finished for PR 30164 at commit 1ba7668.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class LikeAllBase extends UnaryExpression with ImplicitCastInputTypes with NullIntolerant
  • case class LikeAll(child: Expression, patterns: Seq[UTF8String]) extends LikeAllBase
  • case class NotLikeAll(child: Expression, patterns: Seq[UTF8String]) extends LikeAllBase
  • implicit class MetadataColumnsHelper(metadata: Array[MetadataColumn])

@mridulm
Contributor

mridulm commented Nov 20, 2020

Can you update the PR, @venkata91? The failures are unrelated, but I want to make sure - the upstream changes should help fix the issues. Thx!

@venkata91
Contributor Author

Can you update the PR, @venkata91? The failures are unrelated, but I want to make sure - the upstream changes should help fix the issues. Thx!

Done. Thanks.

@mridulm
Contributor

mridulm commented Nov 20, 2020

ok to test

@SparkQA

SparkQA commented Nov 20, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35981/

@SparkQA

SparkQA commented Nov 20, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35982/

@SparkQA

SparkQA commented Nov 20, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35982/

@SparkQA

SparkQA commented Nov 20, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35981/

@mridulm
Contributor

mridulm commented Nov 20, 2020

+CC @dongjoon-hyun Looks like the test failures in Jenkins are from executor decommissioning. The GitHub Actions runs went through fine.
Is this known to be flaky? Thx

@SparkQA

SparkQA commented Nov 20, 2020

Test build #131378 has finished for PR 30164 at commit 5ce2934.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

I agree with you, @mridulm. Please proceed to merge.

@dongjoon-hyun
Member

Due to the K8s IT flakiness of `Test basic decommissioning *** FAILED ***`, I have been monitoring it.

asfgit closed this in 8218b48 Nov 20, 2020
@mridulm
Contributor

mridulm commented Nov 20, 2020

Merging to master, thanks for working on this @venkata91 !
Thanks for the reviews, @tgravescs, @Ngone51, @attilapiros, @Victsm, @otterc, @dongjoon-hyun

@dongjoon-hyun
Member

Thank you all!

FYI, K8s IT is happy on master branch with this patch.

@venkata91
Contributor Author

Merging to master, thanks for working on this @venkata91 !
Thanks for the reviews, @tgravescs, @Ngone51, @attilapiros, @Victsm, @otterc, @dongjoon-hyun

Thanks Mridul for shepherding this all along :)

wangyum pushed a commit that referenced this pull request May 26, 2023
…s for coordinating push based shuffle by selecting external shuffle services for merging partitions

Driver-side changes for coordinating push-based shuffle by selecting external shuffle services for merging partitions.

This PR includes changes related to `ShuffleMapStage` preparation, namely the selection of merger locations and their initialization as part of `ShuffleDependency`.

Currently this code is not used, as some of the changes will come subsequently as part of https://issues.apache.org/jira/browse/SPARK-32917 (shuffle block push as part of `ShuffleMapTask`), https://issues.apache.org/jira/browse/SPARK-32918 (support for the finalize API) and https://issues.apache.org/jira/browse/SPARK-32920 (finalization of the push/merge phase). This is also why the tests here are partial; once the above-mentioned changes are raised as PRs, we will have sufficient tests for the DAGScheduler code as well.

A new API is added in `SchedulerBackend` to get merger locations for push-based shuffle. It is currently implemented for YARN; other cluster managers can provide their own implementations, which is why a new API is introduced.

Yes, a user-facing config to enable push-based shuffle is introduced.

Unit tests were partially added; since some of the changes in DAGScheduler depend on future changes, the DAGScheduler tests will be added along with those changes.

Lead-authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Co-authored-by: Min Shen <mshen@linkedin.com>

Closes #30164 from venkata91/upstream-SPARK-32919.

Lead-authored-by: Venkata krishnan Sowrirajan <[email protected]>
Co-authored-by: Min Shen <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
