[SPARK-13001] [CORE] [MESOS] Prevent getting offers when reached max cores #10924
Conversation
ok to test

Test build #50288 has finished for PR 10924 at commit

Test build #50334 has finished for PR 10924 at commit
I think the two cases where offers are rejected for a longer period should be consolidated in a simple helper function that logs the reason and declines the offer. I'd also reorganize the code to be less nested:
if (!meetsConstraints) {
  declineFor("unmet constraints", rejectOfferDurationUnmetConstraints)
} else if (totalCoresAcquired >= maxCores) {
  declineFor("reached max cores", rejectOfferDurationMaxCores)
} else {
  // ... happy path
}
I agree, and proposed this before; I'm not sure we want to do this change in this patch. I'm thinking perhaps we can get all the small changes we want in a reasonable way and then refactor all three schedulers (or remove one). What do you think?
My first implementation was actually removing all the code duplication in the decline offer path, but it seemed overkill:
def declineOffer(reason: Option[String] = None, refuseSeconds: Option[Long] = None) {
  logDebug("Declining offer" +
    reason.fold("") { r => s" ($r)" } +
    s": $id with attributes: $offerAttributes mem: $mem cpu: $cpus" +
    refuseSeconds.fold("") { r => s" for $r seconds" })
  refuseSeconds match {
    case Some(seconds) =>
      val filter = Filters.newBuilder().setRefuseSeconds(seconds).build()
      d.declineOffer(offer.getId, filter)
    case _ => d.declineOffer(offer.getId)
  }
}

Also, this cannot be reused easily in the fine-grained mode, since it relies on attributes computed locally in the loop. I opted for simplicity, thinking that the resourceOffers method would be refactored at some point, making it easier to factor out the common code. I'm happy to use the implementation above for refuseOffer if you think it's better. It can be simplified quite a bit if it's only for the 2 cases where offers are rejected for a longer period, but then we still have similar code for the default rejection.
To me it seems the only thing it depends on is the offerId, so it could go in MesosSchedulerUtils.
But if that's overkill, let's do it only for this one, and get rid of the nested if structure. It also means there's no need to use Option for the reason and duration.
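For illustration only, here is a minimal, self-contained sketch of what such a simplified, non-Option helper might look like if it lived in MesosSchedulerUtils; the object and method names are hypothetical, and this is not the code that was eventually merged:

import org.apache.mesos.Protos.{Filters, OfferID}
import org.apache.mesos.SchedulerDriver

object MesosOfferDeclineSketch {
  // Hypothetical helper: logs a reason and declines the offer for a given
  // number of seconds. Only the driver and the offer id are required.
  def declineFor(d: SchedulerDriver, offerId: OfferID, reason: String, refuseSeconds: Long): Unit = {
    val filters = Filters.newBuilder().setRefuseSeconds(refuseSeconds.toDouble).build()
    // A real scheduler backend would use logDebug; println keeps the sketch standalone.
    println(s"Declining offer ${offerId.getValue} ($reason) for $refuseSeconds seconds")
    d.declineOffer(offerId, filters)
  }
}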
It also depends on id, offerAttributes, mem, and cpus for logging. They're all derived from the offer, and some are easier than others to get, but we shouldn't compute them twice just for logging.
Okay, I'll change it to handle only the cases where offers are rejected for a longer period.
...and sorry for the delay in reviewing this.
I'm not sure we want to use only one rejection delay setting in these 2 cases. Arguably we could reject offers for a much longer period of time in one case than in the other. And for the fine-grained mode, there's no reason not to add the same logic. I'll do the change and test it. Unfortunately, the example function above can't easily be reused there.
You are right about having two different settings, makes sense. Let's go with that for the moment.
@mgummelt works on Spark full time at Mesosphere too. :-)
@sebastienrainville sorry for my confusion. Fine-grained mode does not respect …
@dragos sorry for the delay. I had also forgotten that …
Cool, looking forward to pushing this over the finish line!

ping @sebastienrainville
…ode when reached max cores
@dragos I finally did the change. Sorry for the delay.
Test build #57661 has finished for PR 10924 at commit
What starvation behavior were you seeing? With the DRF allocator, Mesos should offer rejected resources to other frameworks before re-offering to the Spark job.
}
declineUnmatchedOffers(d, unmatchedOffers)
unmatchedOffers.foreach { offer =>
Please keep this in a separate function declineUnmatchedOffers
please add a test
@mgummelt the problem we were seeing is when running many Spark apps (~20), most of them small (long-lived streaming apps): the bigger apps get allocated only a small number of cores even though the cluster still has a lot of available cores. In that scenario the big apps are not actually receiving offers from Mesos anymore, because the small apps have a much smaller "max share" and therefore get the offers first. With a low number of apps it's okay because, with the default short refuse duration, the offers come back quickly enough. The solution implemented in this PR is to refuse the offers for a long period of time when we know that we don't need offers anymore because the app already acquired all the cores it needs.
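To make the DRF starvation argument concrete, here is a toy, self-contained calculation; the cluster size and per-app core counts below are made-up numbers for illustration, not measurements from the cluster described above:

object DrfShareSketch {
  def main(args: Array[String]): Unit = {
    val clusterCores = 1000.0
    val smallAppCores = 4.0   // one of the ~20 small long-lived streaming apps
    val bigAppCores = 200.0   // a big app that still wants more cores

    // DRF offers resources to the framework with the lowest dominant share first,
    // so the many small apps keep receiving the offers ahead of the big app.
    val smallShare = smallAppCores / clusterCores
    val bigShare = bigAppCores / clusterCores
    println(f"small app share = $smallShare%.3f, big app share = $bigShare%.3f")
  }
}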
Sounds good. Also seems like something we should document, right?
…me when reached spark.cores.max
All the comments should be addressed now
Test build #57699 has finished for PR 10924 at commit
// more and more problematic, to the point where Mesos stops sending offers to the apps
// ranked the lowest by DRF, i.e. the big apps. We mitigate this problem by declining
// the offers for a long period of time when we know that we don't need offers anymore
// because the app already acquired all the cores it needs.
This is a bit verbose. I think something like "Reject an offer for a configurable amount of time to avoid starving other frameworks" is sufficient.
Also, thanks for the code docs, but I was thinking we should add this config var to the user docs as well.
@mgummelt I fixed the documentation and test description. The variable names are quite long though, so they push the content of the right columns quite a bit: https://github.com/sebastienrainville/spark/blob/master/docs/running-on-mesos.md At least the names are descriptive. WDYT?
docs/running-on-mesos.md
<td><code>spark.mesos.rejectOfferDurationForReachedMaxCores</code></td>
<td><code>120s</code></td>
<td>
  Set the amount of time for which offers are rejected when the app already acquired <code>spark.cores.max</code> cores.
Can you add "This is used to prevent starvation of other frameworks."
Done. I added that comment to spark.mesos.rejectOfferDurationForUnmetConstraints as well since it's the same idea.
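For completeness, a hedged example of how an application could override these two settings via SparkConf; the master URL and the values are placeholders, and note that (as discussed further down) the settings were ultimately left out of the user docs:

import org.apache.spark.SparkConf

object MesosDeclineConfExample {
  // Placeholder values; the doc text above lists 120s as the default for
  // spark.mesos.rejectOfferDurationForReachedMaxCores.
  val conf: SparkConf = new SparkConf()
    .setAppName("example")
    .setMaster("mesos://zk://host:2181/mesos")
    .set("spark.cores.max", "64")
    .set("spark.mesos.rejectOfferDurationForReachedMaxCores", "300s")
    .set("spark.mesos.rejectOfferDurationForUnmetConstraints", "300s")
}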
LGTM. cc @andrewor14
Test build #57774 has finished for PR 10924 at commit
docs/running-on-mesos.md
</td>
</tr>
<tr>
<td><code>spark.mesos.rejectOfferDurationForReachedMaxCores</code></td>
I would actually not document these configs. Doing so would require us to maintain backward compatibility. I can't think of any strong use case where someone would want to change these values so I don't think it's worth the maintenance burden.
Looks good. Just minor comments.

This should be ready to go now

LGTM

Test build #57786 has finished for PR 10924 at commit

Merging into master 2.0, thanks for bringing this back to life.
Similar to #8639. This change rejects offers for 120s when `spark.cores.max` is reached in coarse-grained mode, to mitigate offer starvation. This prevents Mesos from sending us offers again and again, starving other frameworks. This is especially problematic when running many small frameworks on the same Mesos cluster, e.g. many small Spark streaming jobs, and causes the bigger Spark jobs to stop receiving offers. By rejecting the offers for a long period of time, they become available to those other frameworks.

Author: Sebastien Rainville <[email protected]>

Closes #10924 from sebastienrainville/master.

(cherry picked from commit eb019af)
Signed-off-by: Andrew Or <[email protected]>
Test build #57794 has finished for PR 10924 at commit