[SPARK-27024] Executor interface for cluster managers to support GPU and other resources #24406
Conversation
Test build #104710 has finished for PR 24406 at commit

Test failure looks unrelated, going to kick again.

Jenkins, test this please

Test build #104715 has finished for PR 24406 at commit

@squito @srowen @mengxr @jiangxb1987 if anyone has time
srowen
left a comment
Just some more style comments to start, plus a question about units.
Thanks for the comments; I'm at Spark Summit so will likely update next week.

Test build #105035 has finished for PR 24406 at commit

Test build #105056 has finished for PR 24406 at commit

Test build #105057 has finished for PR 24406 at commit
jiangxb1987
left a comment
Looks very good, only some nits.
To clarify, will the driver-side resource discovery be added in a separate follow-up PR?

The driver resource discovery will be done in a separate PR.
```scala
// check that script exists and try to execute
if (scriptFile.exists()) {
  try {
    val output = executeAndGetOutput(Seq(script.get), new File("."))
```
In standalone mode, the worker needs to run the script provided by users. Could you leave a TODO inline?
```scala
// requirements (spark.task.resource.*) and that they match the executor configs
// specified by the user (spark.executor.resource.*) to catch mismatches between what
// the user requested and what resource manager gave or what the discovery script found.
private def checkExecResourcesMeetTaskRequirements(
```
We can have a general function that works for both driver and executor. Essentially, the request is resourcePrefix -> count and the resource is resourcePrefix -> addresses. If they don't match, we just include resourcePrefix in the error message, which makes it clear to users whether it is the driver or the executor.
I don't completely follow what you are asking for here. We are comparing 3 things:
spark.task.resource.* -> count with spark.executor.resource.* -> count, and both with the actual resources found by the script or passed in, which is a Map[resourceName, ResourceInformation]. You can't have the resource prefix on the type or it won't compare properly to the Map[resourceName].
I can certainly make it more generic to handle both executor and driver, and I made some code changes to go that way, but I would prefer to wait until the JIRA that implements the driver side to finish that, to make sure we don't need anything else. This function will likely have to move somewhere anyway.
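As a rough illustration of the three-way comparison discussed here (every name below is a stand-in, not the PR's actual classes or signatures), assuming counts come from the task and executor configs and addresses come from discovery:

```scala
// Illustrative only: stand-ins for Spark's real ResourceInformation class and
// the spark.task.resource.* / spark.executor.resource.* config maps.
case class ResourceInformation(name: String, addresses: Seq[String])

// Compare the three sources per resource name: task requests, executor
// requests, and what discovery actually found (or what was passed in).
def resourceMismatches(
    taskReqs: Map[String, Int],
    execReqs: Map[String, Int],
    actual: Map[String, ResourceInformation]): Seq[String] = {
  taskReqs.keys.toSeq.flatMap { name =>
    val taskCount = taskReqs(name)
    val execCount = execReqs.getOrElse(name, 0)
    val found = actual.get(name).map(_.addresses.size).getOrElse(0)
    if (execCount < taskCount) {
      Some(s"executor config for $name gives $execCount, task needs $taskCount")
    } else if (found < execCount) {
      Some(s"discovery found $found addresses for $name, executor config needs $execCount")
    } else {
      None
    }
  }
}
```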
squito
left a comment
I just took a brief look, but it seems reasonable.
If the script fails, the user would have to get the logs of this executor to figure out what went wrong, right? That's OK, I guess; just checking on the error behavior.
Yes, if the script fails the error only shows up in the executor logs.

@mengxr I'm not sure if it's a GitHub issue or what, but your comment on standalone wouldn't let me reply from this page; I had to go into the changes page, so please go there to see my response. I can add something there, but I'm not exactly sure what you were asking for.
Upmerging to the latest to fix the merge conflict.

Test build #105323 has finished for PR 24406 at commit

Test build #105322 has finished for PR 24406 at commit

Ok to test

Test this please

Jenkins, test this please

Test build #105330 has finished for PR 24406 at commit
```scala
// check that script exists and try to execute
if (scriptFile.exists()) {
  try {
    val output = executeAndGetOutput(Seq(script.get), new File("."))
```
Never mind; I was thinking about whether users might be able to make the manager/executor run arbitrary scripts.
```scala
test("Resource discoverer multiple gpus") {
  val sparkconf = new SparkConf
  assume(!(Utils.isWindows))
```
I don't know what happens if a user tries to run the test on Windows. Does it fail or silently skip? I don't have a Windows machine to verify, but if other tests are doing this, we might just follow.
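For reference, ScalaTest's `assume` throws `TestCanceledException` when its condition is false, so the test is reported as canceled (skipped) rather than failed. A minimal self-contained model of that behavior (this is an illustration, not ScalaTest's real implementation):

```scala
// Model of ScalaTest's assume() semantics: a false assumption cancels the
// test instead of failing it.
class TestCanceledException(msg: String) extends Exception(msg)

def assumeOrCancel(condition: Boolean, clue: String = ""): Unit =
  if (!condition) throw new TestCanceledException(clue)

def runTest(body: => Unit): String =
  try { body; "passed" }
  catch {
    case _: TestCanceledException => "canceled" // reported as skipped, not failed
    case _: Throwable             => "failed"
  }
```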
LGTM. cc: @srowen @squito @jiangxb1987
Test build #105363 has finished for PR 24406 at commit
jiangxb1987
left a comment
LGTM
```scala
// requirements and that they match the configs specified by the user to catch
// mismatches between what the user requested and what resource manager gave or
// what the discovery script found.
private def checkResourcesMeetRequirements(
```
Had an offline discussion with @WeichenXu123. He suggested refactoring this check to make it easier to read. Now the arguments are:
- reqResourcesAndCounts: request per task (not per executor)
- actualResources: resources allocated per executor

That is not easy to tell from the variable names, which makes the code harder to read. Basically we need the following:
- the number allocated per executor cannot be smaller than the requested count for each resource name
- the requested count for the executor cannot be smaller than the requested count for the task for each resource name (note that this doesn't require resource discovery)
- the set of requested resource names for executors should match the set of requested resource names for tasks
It would be nice to refactor the method into those three. We can also do it in a follow-up PR.
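A sketch of what that three-way split might look like (all names and signatures below are hypothetical, not the PR's code):

```scala
// Shared helper: report each resource whose available count falls short.
def countShortfalls(have: Map[String, Int], need: Map[String, Int], what: String): Seq[String] =
  need.toSeq.collect { case (name, n) if have.getOrElse(name, 0) < n =>
    s"$what only has ${have.getOrElse(name, 0)} of resource $name, needs $n"
  }

// 1) addresses allocated per executor must cover the executor request
def checkAllocated(allocated: Map[String, Int], execReqs: Map[String, Int]): Seq[String] =
  countShortfalls(allocated, execReqs, "executor allocation")

// 2) the executor request must cover the per-task request (no discovery involved)
def checkExecVsTask(execReqs: Map[String, Int], taskReqs: Map[String, Int]): Seq[String] =
  countShortfalls(execReqs, taskReqs, "executor config")

// 3) the requested resource-name sets must match
def checkNames(execNames: Set[String], taskNames: Set[String]): Seq[String] =
  if (execNames == taskNames) Nil
  else Seq(s"executor resources $execNames do not match task resources $taskNames")
```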
I added a comment to the driver jira since it needs to be refactored for that anyway:
https://issues.apache.org/jira/browse/SPARK-27488
Test build #105372 has finished for PR 24406 at commit
I merged this, thanks for the reviews.
What changes were proposed in this pull request?
Add GPU and generic resource type allocation to the executors.
Note this is part of a bigger feature for GPU-aware scheduling; this PR covers just how the executor finds the resources.
In this PR I added configs and arguments to the executor so it can discover resources. The argument to the executor is intended to be used by standalone mode or other cluster managers that don't have isolation, so that specific resources can be assigned to specific executors when there are multiple executors on a node. The argument is a file containing a JSON array of ResourceInformation objects.
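Purely as an illustration (the exact schema is defined by the ResourceInformation class in this PR and may carry additional fields), such a resources file could look like:

```json
[
  {"name": "gpu", "addresses": ["0", "1"]}
]
```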
The discovery script is meant to be used in an isolated environment where the executor only sees the resources it should use.
Note that there will be follow-on PRs to add other parts, like the scheduler part. See the epic high-level JIRA: https://issues.apache.org/jira/browse/SPARK-24615
How was this patch tested?
Added unit tests and manually tested.
Please review http://spark.apache.org/contributing.html before opening a pull request.