-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-23626][CORE] DAGScheduler blocked due to JobSubmitted event #20770
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
For UT, i see DAGSchedulerSuite currently does not have same behaviour as DAGScheduler as its events posted to DAGSchedulerEventProcessLoopTester are running in same thread unlike DAGScheduler where posted events are processed in separate thread . Can i modify this.? or write a separate suite.? Pls suggest |
| private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]] | ||
| private[scheduler] val stageIdToStage = new HashMap[Int, Stage] | ||
| private[scheduler] val jobIdToStageIds = new TrieMap[Int, HashSet[Int]] | ||
| private[scheduler] val stageIdToStage = new TrieMap[Int, Stage] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need TrieMap for concurrence purpose, since job schedule is FIFO(even if after move createResultStage() ) ?
BTW, out of my curiosity, what's TrieMap advantages compare to ConcurrentHashMap ?
|
@AjithShetty2489 I'm not sure just changing these two maps is sufficient ? For example createResultStage could in turn create all the parent stages and the parents stages could be ShuffleMapStage which in turn means that the map |
|
@squito is the master of DAGSchedulerSuite, and can provide you the best advice on changing or adding to the existing DAGSchedulerSuite. I'll be back from skiing next week and try to find some time to look at this. Hopefully @kayousterhout can find some time too. |
|
@Ngone51 Thanks for the question. I am not quite sure of actual implementation of Triemap but i see but i measure the performance of ConcurrentHashMap vs TrieMap with basic put and get operation, it seems TrieMap is slower than ConcurrentHashmap |
|
@shivaram yes, you are right. Let me recheck |
|
took a quick look, agree with shivaram's observations, you've got to handle |
|
Can one of the admins verify this patch? |
|
@ajithme so have you got some time to recheck? |
|
got caught up with something, will relook into this |
|
Closing this as attempted a new approach, please check #24438 |
What changes were proposed in this pull request?
DAGScheduler becomes a bottleneck in cluster when multiple JobSubmitted events has to be processed as DAGSchedulerEventProcessLoop is single threaded and it will block other tasks in queue like TaskCompletion.
The JobSubmitted event is time consuming depending on the nature of the job (Example: calculating parent stage dependencies, shuffle dependencies, partitions) and thus it blocks all the events to be processed.
Similarly in my cluster some jobs partition calculation is time consuming (Similar to stack at SPARK-2647) hence it slows down the spark DAGSchedulerEventProcessLoop which results in user jobs to slowdown, even if its tasks are finished within seconds, as TaskCompletion Events are processed at a slower rate due to blockage.
Move the ResultStage creation to call site thread, which will avoid blocking of DAGScheduler thread for other events
Refer: http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-Scheduler-Spark-DAGScheduler-scheduling-performance-hindered-on-JobSubmitted-Event-td23562.html
How was this patch tested?
Manual test to verify blockage before and after applying patch.