[SPARK-36052][K8S] Introducing a limit for pending PODs #33492
    var notRunningPodCountForRpId =
      currentPendingExecutorsForRpId.size + schedulerKnownPendingExecsForRpId.size +
      newlyCreatedExecutorsForRpId.size + schedulerKnownNewlyCreatedExecsForRpId.size
    val podCountForRpId = currentRunningCount + notRunningPodCountForRpId
This is again a rename to avoid the "known" prefix, as these are not scheduler-known PODs but the PODs for this resource profile.
        currentTime - createTime > executorIdleTimeout
      }.keys.take(excess).toList
-   val knownPendingToDelete = currentPendingExecutorsForRpId
+   val pendingToDelete = currentPendingExecutorsForRpId
Last rename, for the same reason as earlier: these are PODs unknown to the scheduler, so they are safe to remove here (no task can be scheduled on them).
Test build #141548 has finished for PR 33492 at commit
      .asScala
      .toSeq
      .sortBy(_._1)
      .flatMap { case (rpId, targetNum) =>
Instead of foreach, flatMap is used here, as we need to do the processing in two steps: first count all the not-running PODs across all the resource profiles, and only then decide how to split the remaining pending POD slots between the resource profiles.
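To illustrate, a minimal self-contained sketch of that two-step shape (the names and numbers are illustrative, not the PR's actual code): the flatMap pass drops profiles that are already at their target while the not-running PODs of all profiles get counted, and only the second pass hands out the remaining pending-POD slots.

```scala
// Hypothetical, simplified sketch of the two-pass structure (not the PR's exact code).
object TwoPassAllocationSketch {
  def main(args: Array[String]): Unit = {
    val maxPendingPods = 5               // assumed global limit on pending PODs
    val targets = Map(0 -> 10, 1 -> 4)   // rpId -> target executor count
    val running = Map(0 -> 6, 1 -> 1)    // rpId -> running PODs
    val notRunning = Map(0 -> 2, 1 -> 1) // rpId -> pending + newly created PODs

    // Pass 1 (the flatMap): keep only the profiles that still need more PODs,
    // after the not-running PODs of *all* profiles have been counted.
    val totalNotRunning = notRunning.values.sum
    val wanting = targets.toSeq.sortBy(_._1).flatMap { case (rpId, targetNum) =>
      val podCountForRpId = running(rpId) + notRunning(rpId)
      if (podCountForRpId < targetNum) Some((rpId, podCountForRpId, targetNum)) else None
    }

    // Pass 2: only now can the remaining pending-POD slots be shared out.
    var remainingSlots = math.max(maxPendingPods - totalNotRunning, 0)
    wanting.foreach { case (rpId, podCountForRpId, targetNum) =>
      val toRequest = math.min(targetNum - podCountForRpId, remainingSlots)
      remainingSlots -= toRequest
      println(s"rpId=$rpId: would request $toRequest new executor POD(s)")
    }
  }
}
```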
Test build #141549 has finished for PR 33492 at commit
Kubernetes integration test starting
Kubernetes integration test starting
Kubernetes integration test status success
Kubernetes integration test status success
The failure is a TPC-DS query run, which must be unrelated.
Thank you for pinging me, @attilapiros.
holdenk left a comment:
Thanks for working on this. I'm a little confused about the code; if you could clarify in the places where I asked questions, I'd really appreciate it.
          Some(rpId, podCountForRpId, targetNum)
        } else {
          // for this resource profile we do not request more PODs
          None
I removed my previous comment because I'm also not sure what the best way is to inform the users about this situation. Do you think we have a good way to inform the users when we hit this limitation, @attilapiros?
We could change this to logInfo (lines 323 to 324 in adc512d):

    logDebug(s"Still waiting for ${newlyCreatedExecutorsForRpId.size} executors for " +
      s"ResourceProfile Id $rpId before requesting more.")
But for a higher batch allocation size this message could be annoying, as every POD status change would generate such a log line until it reaches 0.
        pvcsInUse: Seq[String]): Unit = {
      val numExecutorsToAllocate = math.min(expected - running, podAllocationSize)
      logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes for " +
        s"ResourceProfile Id: $resourceProfileId, target: $expected running: $running.")
This message would be better placed here, inside requestNewExecutors.
My reason for moving it was to avoid passing more variables to that method.
So, starting from this (lines 343 to 346 in adc512d):

    logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes for " +
      s"ResourceProfile Id: $rpId, target: $targetNum, known: $podCountForRpId, " +
      s"sharedSlotFromPendingPods: $sharedSlotFromPendingPods.")
    requestNewExecutors(numExecutorsToAllocate, applicationId, rpId, k8sKnownPVCNames)
we would need to pass:
- targetNum
- podCountForRpId
- sharedSlotFromPendingPods

and they are only needed for the log line and to calculate numExecutorsToAllocate.
With the current solution numExecutorsToAllocate is enough, and when we later extend the logic to consider more limits for allocation, numExecutorsToAllocate will still be enough.
WDYT?
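To make the trade-off concrete, a hedged sketch with simplified, hypothetical signatures (the real method's signature differs; this is not the PR's code): with the log at the call site, the final count is essentially all that crosses the method boundary.

```scala
// Illustrative sketch only; names mirror the discussion above but are simplified.
object LogPlacementSketch {
  // With the log at the call site, the method only needs the final count:
  def requestNewExecutors(numExecutorsToAllocate: Int, appId: String, rpId: Int): Unit = {
    // ... build and submit numExecutorsToAllocate POD requests here ...
  }

  def main(args: Array[String]): Unit = {
    val applicationId = "app-1234" // placeholder values for the sketch
    val rpId = 0
    val (targetNum, podCountForRpId) = (10, 6)
    val (podAllocationSize, sharedSlotFromPendingPods) = (5, 3)

    // targetNum, podCountForRpId and sharedSlotFromPendingPods are only needed
    // here, for the log line and to compute numExecutorsToAllocate; passing them
    // into requestNewExecutors would serve nothing but the message.
    val numExecutorsToAllocate =
      Seq(targetNum - podCountForRpId, podAllocationSize, sharedSlotFromPendingPods).min
    println(s"Going to request $numExecutorsToAllocate executors from Kubernetes for " +
      s"ResourceProfile Id: $rpId, target: $targetNum, known: $podCountForRpId, " +
      s"sharedSlotFromPendingPods: $sharedSlotFromPendingPods.")
    requestNewExecutors(numExecutorsToAllocate, applicationId, rpId)
  }
}
```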
@dongjoon-hyun what is your opinion?
dongjoon-hyun left a comment:
In general, it looks like a good improvement. BTW, I'm curious if you are going to support maxPendingPods per resource profile later.
I am happy you like this. Actually, I haven't thought about supporting maxPendingPods per resource profile, but if you think that would be valuable for our users I can do it easily in a new PR.
Test build #141832 has finished for PR 33492 at commit
Kubernetes integration test starting
Kubernetes integration test status success
@holdenk, @dongjoon-hyun may I ask for one more pass?
holdenk left a comment:
LGTM. I don't have strong feelings on the logging message location, but it would be good to give dongjoon some time in case he has strong feelings about it.
dongjoon-hyun left a comment:
+1, LGTM.
Sorry for missing this, @attilapiros and @holdenk .
I thought this was already merged.
@dongjoon-hyun, @holdenk Thanks to all of you!
Hi, @attilapiros and @holdenk.
cc @gengliangwang, too.
Introducing a limit for pending PODs (newly created/requested executors included).

This limit is global for all the resource profiles. So first we have to count all the newly created and pending PODs (decreased by the ones which requested to be deleted), then we can share the remaining pending POD slots among the resource profiles.

Without this PR dynamic allocation could request too many PODs and the K8S scheduler could be overloaded and scheduling of PODs will be affected by the load.

No.

With new unit tests.

Closes #33492 from attilapiros/SPARK-36052.

Authored-by: attilapiros <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 1dced49)
Signed-off-by: Dongjoon Hyun <[email protected]>
@attilapiros Thanks for the work.
Thank you, @gengliangwang. According to your suggestion, I added
Introducing a limit for pending PODs (newly created/requested executors included).

This limit is global for all the resource profiles. So first we have to count all the newly created and pending PODs (decreased by the ones which requested to be deleted), then we can share the remaining pending POD slots among the resource profiles.

Without this PR dynamic allocation could request too many PODs and the K8S scheduler could be overloaded and scheduling of PODs will be affected by the load.

No.

With new unit tests.

Closes apache#33492 from attilapiros/SPARK-36052.

Authored-by: attilapiros <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 1dced49)
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit eb09be9)
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 25105db)
Signed-off-by: Dongjoon Hyun <[email protected]>
…Allocator#onNewSnapshots`

What changes were proposed in this pull request?
This PR just removes the unused local `val outstanding` from `ExecutorPodsAllocator#onNewSnapshots`; the `outstanding > 0` check was replaced by `newlyCreatedExecutorsForRpId.nonEmpty` after SPARK-36052 / #33492.

Why are the changes needed?
Removes an unused local val.

Does this PR introduce any user-facing change?
No.

How was this patch tested?
Pass GitHub Actions.

Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44703 from LuciferYang/minor-val-outstanding.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
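As a side note on that cleanup, a trivial self-contained snippet (not the PR's code; the map content is illustrative) showing the equivalence the commit relies on: for any Scala collection, `c.size > 0` and `c.nonEmpty` agree, with `nonEmpty` being the idiomatic form.

```scala
object NonEmptySketch {
  def main(args: Array[String]): Unit = {
    val newlyCreatedExecutorsForRpId = Map(3L -> 0, 4L -> 0) // illustrative content
    // The replaced check and its idiomatic equivalent always agree:
    assert((newlyCreatedExecutorsForRpId.size > 0) == newlyCreatedExecutorsForRpId.nonEmpty)
    println(newlyCreatedExecutorsForRpId.nonEmpty) // true
  }
}
```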
What changes were proposed in this pull request?
Introducing a limit for pending PODs (newly created/requested executors included) per resource profile. There exists a config for a global limit for all resource profiles, but here we add a limit per resource profile. #33492 does a lot of the plumbing for us already, counting newly created and pending pods, and we can just pass through the pending pods per resource profile, and limit the number of requests we were going to make for pods for that resource profile to min(previousRequest, maxPodsPerRP).

Why are the changes needed?
For multiple resource profile use cases you can set limits that apply at the resource profile level, instead of globally.

Does this PR introduce any user-facing change?
No.

How was this patch tested?
Unit tests added.

Was this patch authored or co-authored using generative AI tooling?
No.

Closes #51913 from ForVic/vsunderl/max_pending_pods_per_rpid.

Authored-by: ForVic <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
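A tiny illustrative sketch of that capping step; `maxPodsPerRP` here is just a stand-in name for the per-profile limit, not necessarily the real config key.

```scala
object PerRpCapSketch {
  // Cap what the global slot-sharing decided for one resource profile.
  def capForResourceProfile(previousRequest: Int, maxPodsPerRP: Int): Int =
    math.min(previousRequest, maxPodsPerRP)

  def main(args: Array[String]): Unit = {
    // Global sharing wanted 8 PODs for this profile, but its own limit is 5.
    println(capForResourceProfile(8, 5)) // prints 5
  }
}
```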
What changes were proposed in this pull request?
Introducing a limit for pending PODs (newly created/requested executors included).
This limit is global for all the resource profiles. So first we have to count all the newly created and pending PODs (decreased by the ones which are requested to be deleted), then we can share the remaining pending POD slots among the resource profiles.
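For users of the merged change, a hedged usage sketch: the limit appears to be exposed as the `spark.kubernetes.allocation.maxPendingPods` config (as documented for Spark 3.3.0+; verify the exact name and default in the merged Config.scala for your version).

```scala
// Hedged usage sketch; the API server address is a placeholder.
import org.apache.spark.sql.SparkSession

object MaxPendingPodsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("k8s://https://<k8s-apiserver>:6443") // placeholder API server address
      .config("spark.dynamicAllocation.enabled", "true")
      .config("spark.kubernetes.allocation.maxPendingPods", "50") // cap pending executor PODs
      .getOrCreate()
    // ... run the application; the allocator keeps at most 50 PODs pending ...
    spark.stop()
  }
}
```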
Why are the changes needed?
Without this PR, dynamic allocation could request too many PODs, the K8S scheduler could be overloaded, and the scheduling of PODs would be affected by the load.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
With new unit tests.