[WIP][SPARK-31198][CORE] Use graceful decommissioning as part of dynamic scaling #28818
Conversation
Test build #123956 has finished for PR 28818 at commit
agrawaldevesh left a comment:
Hi @holdenk. Using proper decommissioning for dynamic allocation would be great and would help unify these code paths. Thank you again for working on this.
My two comments below are mere nits, just to make sure I am following along. It reads fine as is.
Review thread on core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala (resolved)
Review thread on core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala (outdated, resolved)
Test build #123995 has finished for PR 28818 at commit
Force-pushed from ef3f523 to 7691d2d
Test build #124139 has finished for PR 28818 at commit
 * committed.
 */
event.blockUpdatedInfo.blockId match {
  case ShuffleDataBlockId(shuffleId, _, _) => exec.addShuffle(shuffleId)
Since we are touching ExecutorMonitor, when do we get the counterpart operation, exec.removeShuffle? In this PR, it seems that executorsKilled is used. Is that enough?
cc @dbtsai
Yeah, since we're only doing migrations during decommissioning, whatever shuffle files remain on the host will be cleaned up when it dies. I can't think of why we would need a delete operation here as well, but if it would be useful for your follow-on work I can add it?
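For readers following along, here is a minimal sketch of the kind of per-executor shuffle bookkeeping being discussed; the class and field names are illustrative, not the actual ExecutorMonitor internals.

import scala.collection.mutable

// Illustrative sketch only: track which shuffles have data on an executor,
// in the spirit of exec.addShuffle being called on a ShuffleDataBlockId update.
class ExecutorShuffleTracker {
  private val shuffleIds = mutable.Set[Int]()

  // Called when a shuffle block is reported as stored on this executor.
  def addShuffle(shuffleId: Int): Unit = synchronized { shuffleIds += shuffleId }

  // A hypothetical counterpart; per the discussion above, the PR instead relies
  // on cleanup happening when the decommissioned host finally goes away.
  def removeShuffle(shuffleId: Int): Unit = synchronized { shuffleIds -= shuffleId }

  def hasActiveShuffle: Boolean = synchronized { shuffleIds.nonEmpty }
}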
Test build #124154 has finished for PR 28818 at commit
Force-pushed from 9bb0293 to 81f9dbb
Test build #124179 has finished for PR 28818 at commit
Test build #124181 has finished for PR 28818 at commit
cc @tgravescs too
To be clear, this is a WIP PR and not yet ready for review. I created it to give additional context after talking with @agrawaldevesh about our respective goals. I'll try to get a "ready for review" PR out next week.
Just as a follow-up: since @HyukjinKwon requested an SPIP for this work, I won't be moving this PR out of WIP this week as originally planned.
agrawaldevesh left a comment:
One question inline, please.
I didn't quite follow why this PR is rebased on top of https://issues.apache.org/jira/browse/SPARK-31197 and https://issues.apache.org/jira/browse/SPARK-20629. They seem orthogonal to me. Can you please update the PR description to reflect the relationship between this PR and these Jira tickets? Thanks!
 */
def decommissionExecutor(executorId: String): Boolean = {
  val decommissionedExecutors = decommissionExecutors(Seq(executorId),
    adjustTargetNumExecutors = true)
Why is adjustTargetNumExecutors defaulting to true here? This would mean that all schedulers would try to replenish the executor when asked to DecommissionExecutor(...) -- for example by the Master or when an executor gets a SIGPWR.
I think it shouldn't be the default -- it should at least be configurable. It only makes sense to have adjustTargetNumExecutors=true when called from org.apache.spark.streaming.scheduler.ExecutorAllocationManager#killExecutor (i.e. when it is truly called from the dynamic allocation codepath and we have decided that we want to replenish the executor).
If you look above, there is a configurable call. This matches how killExecutor is implemented down on line 124.
Can you please point me to where the configurable call is? I don't see a config check in the code paths that call this method.
It's fine for killExecutor to unconditionally adjust the target number of executors because it is only called in the dynamic allocation codepath, but decommissionExecutor would be called from many other codepaths as well (for example when the driver gets a DecommissionExecutor message) -- and thus I think it shouldn't just assume that it should replenish the executor.
Look at line 95 of this file. I think we should match the semantics of killExecutor as much as possible. If there's a place where we don't want it, we can use decommissionExecutors.
Hmm, should we rename decommissionExecutor (singular) to decommissionAndKillExecutor to reflect its purpose better? It would be too easy to confuse it with decommissionExecutors (on line 95 of this file, which allows not replenishing the target number of executors).
Do you want to make the change to the callers of decommissionExecutor in this PR and switch them to decommissionExecutors(Seq(executorId), false) instead? The ones I am most concerned about are:
- The handling of the DecommissionExecutor message (both sync and async variants) in CoarseGrainedSchedulerBackend
- StandaloneSchedulerBackend.executorDecommissioned
In both of the above cases, I think we may not always want replenishing. For example, in the standalone case, when the Worker gets a SIGPWR, do we want to replenish the executors on the remaining workers (i.e. oversubscribe the remaining workers)? Similarly, when an executor gets a SIGPWR, do we want to put that load on the remaining executors? I think the answer to both should be NO unless we are doing dynamic allocation.
Personally I am fine with any choice of naming here as long as the semantics are not silently changed under the covers, as is the case presently.
It's a new function, what are we changing?
ExecutorAllocationClient is a base class of CoarseGrainedSchedulerBackend. We moved decommissionExecutor from the latter class to the former and as such it is not a new function. Since CoarseGrainedSchedulerBackend no longer overrides decommissionExecutor, ExecutorAllocationClient.decommissionExecutor will be called when CoarseGrainedSchedulerBackend gets a DecommissionExecutor message -- and the semantics of that codepath have been changed to unconditionally impose adjustTargetNumExecutors=true.
Cool, I'll update the previous calls to decommissionExecutor.
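To make the API shape under discussion concrete, here is a simplified sketch with assumed signatures, not the actual ExecutorAllocationClient code.

// Simplified sketch of the plural/singular split discussed above.
trait AllocationClientSketch {
  // Plural form: the caller chooses whether the scheduler's target executor
  // count should be adjusted as part of the decommission.
  def decommissionExecutors(
      executorIds: Seq[String],
      adjustTargetNumExecutors: Boolean): Seq[String]

  // Singular convenience wrapper. Hard-coding adjustTargetNumExecutors = true
  // here is exactly what the review thread questions, since callers outside
  // dynamic allocation (e.g. a SIGPWR-triggered decommission) may not want
  // the target count touched.
  def decommissionExecutor(executorId: String): Boolean = {
    decommissionExecutors(Seq(executorId), adjustTargetNumExecutors = true).nonEmpty
  }
}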
Force-pushed from 2674055 to 9565c40
@agrawaldevesh So this PR depends on the behaviour of the VM eventually exiting (https://issues.apache.org/jira/browse/SPARK-31197), since it's replacing the usage of killExecutor during dynamic allocation.
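Roughly, the executor-side behaviour this PR leans on from SPARK-31197 could be pictured as below; the polling loop and the two predicates are assumptions for illustration, not the actual implementation.

import java.util.concurrent.TimeUnit

// Sketch: once decommissioning has been signalled and outstanding block/shuffle
// migrations are done, the executor exits on its own rather than waiting for a kill.
object DecommissionExitSketch {
  def exitWhenDecommissionFinished(
      isDecommissioned: () => Boolean,
      migrationsComplete: () => Boolean): Unit = {
    val poller = new Thread(() => {
      while (!(isDecommissioned() && migrationsComplete())) {
        TimeUnit.SECONDS.sleep(1) // poll until it is safe to go away
      }
      System.exit(0) // nothing left here that would force a recompute
    })
    poller.setDaemon(true)
    poller.start()
  }
}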
/**
 * Mark a given executor as decommissioned and stop making resource offers for it.
 */
private def decommissionExecutor(executorId: String): Boolean = {
@holdenk ... to answer your question: This block of code was moved to the base class ExecutorAllocationClient. So the code in ExecutorAllocationClient is not "New". Furthermore, the semantics of this code were changed as it was moved to now unconditionally replenish the executors.
Makes sense.
Test build #126273 has finished for PR 28818 at commit
Test build #126281 has finished for PR 28818 at commit
Force-pushed from 3db60f2 to daf96dd
Test build #126292 has finished for PR 28818 at commit
Commit messages (truncated):
- Because the mock always says there is an RDD we may replicate more than once, and now that there are independent threads
- …we don't scale down too low, update the streaming ExecutorAllocationManager to also delegate to decommission Fix up executor add for resource profile
- …eanup the locks we use in decommissioning and clarify some more bits.
- …ster manager are re-launched
- …ion manager suite.
- …o that we can match the pattern for killExecutor/killExecutors
- This reverts commit daf96dd.
Force-pushed from daf96dd to f921ddd
Test build #126435 has finished for PR 28818 at commit
@holdenk can this PR be abandoned/closed now since this is finally in?
This is WIP since it is on top of SPARK-31197 (which itself is WIP on top of SPARK-20629) and should probably have more testing. We use SPARK-31197's exiting of the executor once decommissioning is finished, which allows us to replace the usage of killExecutor with decommissionExecutor when enabled during dynamic allocation.
What changes were proposed in this pull request?
If graceful decommissioning is enabled, Spark's dynamic scaling uses this instead of directly killing executors.
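As a hedged sketch of that decision point (placeholder names and a simplified Client trait, not the PR's actual code):

object ScaleDownSketch {
  trait Client {
    def decommissionExecutors(ids: Seq[String], adjustTargetNumExecutors: Boolean): Seq[String]
    def killExecutors(ids: Seq[String]): Seq[String]
  }

  // If graceful decommissioning is on, ask for a decommission and let the
  // executor drain and exit on its own; otherwise fall back to a direct kill.
  def removeExecutors(client: Client, ids: Seq[String], decommissionEnabled: Boolean): Seq[String] = {
    if (decommissionEnabled) {
      client.decommissionExecutors(ids, adjustTargetNumExecutors = true)
    } else {
      client.killExecutors(ids)
    }
  }
}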
Why are the changes needed?
When scaling down Spark, we should avoid triggering recomputes as much as possible.
Does this PR introduce any user-facing change?
Hopefully users' jobs run faster. It also enables experimental shuffle-service-free decommissioning when graceful decommissioning is enabled.
How was this patch tested?
For now I've extended the ExecutorAllocationManagerSuite to cover this.
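As an illustration of the kind of coverage described, here is a hypothetical test shape with a stub client; it is not the actual suite code.

import scala.collection.mutable

object DecommissionScaleDownTestSketch {
  // Stub that records which path was taken during scale-down.
  class RecordingClient {
    val decommissioned = mutable.Buffer[String]()
    val killed = mutable.Buffer[String]()
    def decommissionExecutors(ids: Seq[String], adjustTargetNumExecutors: Boolean): Seq[String] = {
      decommissioned ++= ids
      ids
    }
    def killExecutors(ids: Seq[String]): Seq[String] = {
      killed ++= ids
      ids
    }
  }

  // In a ScalaTest suite this would live inside a test(...) block; kept as a
  // plain method here to stay self-contained.
  def scaleDownUsesDecommission(): Unit = {
    val client = new RecordingClient
    val idle = Seq("1", "2")
    val decommissionEnabled = true // assumed flag for illustration, not a real Spark config
    if (decommissionEnabled) {
      client.decommissionExecutors(idle, adjustTargetNumExecutors = true)
    } else {
      client.killExecutors(idle)
    }
    assert(client.decommissioned.toSeq == idle)
    assert(client.killed.isEmpty)
  }
}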