[SPARK-11334][CORE] Fix bug in Executor allocation manager in running tasks calculation #19580
Conversation
Could you fix the bug number (SPARK-11334)?

Test build #83089 has finished for PR 19580 at commit
       */
-    def totalRunningTasks(): Int = numRunningTasks
+    def totalRunningTasks(): Int = {
+      stageIdToNumRunningTask.values.sum
+    }
This needs to be inside allocationManager.synchronized, no?
Nevermind, this is called from a synchronized context. Except in your unit tests, that is (which call the private totalRunningTasks you added to the manager).
It'd be nice to make the other method that calls this synchronized too, just to be paranoid.
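To make the locking pattern under discussion concrete, here is a minimal standalone sketch; the names mirror the PR's diff, but this is a simplified model under those assumptions, not the actual Spark source:

    import scala.collection.mutable

    // Simplified model of the allocation manager's bookkeeping. All reads and
    // writes of the shared map go through the same lock, so totalRunningTasks()
    // never observes a partially applied update.
    class AllocationManagerSketch {
      private val stageIdToNumRunningTask = mutable.Map.empty[Int, Int]

      def onTaskStart(stageId: Int): Unit = synchronized {
        stageIdToNumRunningTask(stageId) =
          stageIdToNumRunningTask.getOrElse(stageId, 0) + 1
      }

      // Guarded by the same lock as the writers, per the comment above.
      def totalRunningTasks(): Int = synchronized {
        stageIdToNumRunningTask.values.sum
      }
    }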
| s"when it is already pending to be removed!") | ||
| return false | ||
| } | ||
|
|
nit: no need for this change.
      assert(numExecutorsToAdd(manager) === 1)
    }

    test("Ignore task end events from completed stages") {
nit: lower case "ignore" to match other tests.
      allocationManager.synchronized {
        numRunningTasks += 1
        if (stageIdToNumRunningTask.contains(stageId)) {
          stageIdToNumRunningTask(stageId) = stageIdToNumRunningTask(stageId) + 1
nit: this can be changed to stageIdToNumRunningTask(stageId) += 1
        (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
      }
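(The expression above is integer ceiling division: for example, 10 running-or-pending tasks with 4 tasks per executor gives (10 + 4 - 1) / 4 = 3 executors.)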
      private def totalRunningTasks(): Int = synchronized {
Looks like no one invokes this method?
This is being called from the test.
I'm not sure why we need to add a method that is only used for unit tests. If we want to verify the behavior of totalRunningTasks, I think maxNumExecutorsNeeded can also be used for indirect verification.
It's okay to add a method that is used only for unit testing. I am not inclined towards using maxNumExecutorsNeeded to indirectly verify totalRunningTasks, for the following reason: the test case currently tests exactly what it is supposed to. If it checked maxNumExecutorsNeeded instead, it would be less clear what we are testing.
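To make the trade-off concrete, here is a self-contained sketch of the two verification styles; the values, helper shapes, and object name are illustrative assumptions, not the actual suite's API:

    // Hypothetical standalone illustration, runnable as a Scala script.
    object VerificationStyles extends App {
      val tasksPerExecutor = 4
      val stageIdToNumRunningTask = Map(0 -> 2, 1 -> 3)
      val pendingTasks = 5

      def totalRunningTasks: Int = stageIdToNumRunningTask.values.sum

      // Derived quantity, mirroring the ceiling-division formula in the diff.
      def maxNumExecutorsNeeded: Int =
        (totalRunningTasks + pendingTasks + tasksPerExecutor - 1) / tasksPerExecutor

      // Direct check: a failure points straight at the running-task bookkeeping.
      assert(totalRunningTasks == 5)

      // Indirect check: the same bookkeeping bug would surface only as a
      // wrong executor count, obscuring what is actually being tested.
      assert(maxNumExecutorsNeeded == 3)
    }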
      allocationManager.synchronized {
        numRunningTasks -= 1
        if (stageIdToNumRunningTask.contains(stageId)) {
          stageIdToNumRunningTask(stageId) = stageIdToNumRunningTask(stageId) - 1
ditto.
Test build #83099 has finished for PR 19580 at commit

Test build #83101 has finished for PR 19580 at commit

Test build #83107 has finished for PR 19580 at commit

jenkins, retest this please.

Jenkins, retest this please.

Test build #83257 has finished for PR 19580 at commit

Merging to master / 2.2 / 2.1.

failed to merge to 2.2...
What changes were proposed in this pull request?
We often see Spark jobs get stuck, with dynamic allocation turned on, because the Executor Allocation Manager does not ask for any executors even though there are pending tasks. Looking at the logic in the Executor Allocation Manager that calculates the number of running tasks, the calculation can go wrong and the running-task count can become negative.
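For illustration, here is a minimal standalone sketch of the failure mode and of the per-stage tracking this PR introduces; it is a simplified model whose names mirror the diff, not the actual Spark source:

    import scala.collection.mutable

    object RunningTaskCounting extends App {
      // Buggy scheme: one global counter, reset when no stages are running but
      // still decremented by task-end events that arrive after stage completion.
      var numRunningTasks = 0
      numRunningTasks += 1   // a task starts in stage 0
      numRunningTasks = 0    // stage 0 completes; counter is reset
      numRunningTasks -= 1   // a straggler task-end event for stage 0 arrives
      assert(numRunningTasks == -1)  // negative, so no executors get requested

      // Fixed scheme (the shape of this PR): count tasks per stage and ignore
      // task-end events from stages that are no longer live.
      val stageIdToNumRunningTask = mutable.Map.empty[Int, Int]
      def onTaskStart(stageId: Int): Unit =
        stageIdToNumRunningTask(stageId) =
          stageIdToNumRunningTask.getOrElse(stageId, 0) + 1
      def onStageCompleted(stageId: Int): Unit =
        stageIdToNumRunningTask -= stageId
      def onTaskEnd(stageId: Int): Unit =
        if (stageIdToNumRunningTask.contains(stageId)) {
          stageIdToNumRunningTask(stageId) -= 1
        }

      onTaskStart(0)
      onStageCompleted(0)
      onTaskEnd(0)  // ignored: stage 0 already completed
      assert(stageIdToNumRunningTask.values.sum == 0)  // never negative
    }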
How was this patch tested?
Added a unit test.