@@ -341,7 +341,21 @@ private[spark] class CoarseMesosSchedulerBackend(
           Collections.singleton(offer.getId),
           offerTasks.asJava)
       } else if (totalCoresAcquired >= maxCores) {
-        declineOffer(d, offer, Some("reached max cores"),
+        // We already acquired the maximum number of cores, so we don't need new offers
+        // unless an executor goes down. Setting a high "refuse seconds" filter is especially
+        // important when many frameworks run in the same Mesos cluster, to avoid resource
+        // starvation. One such case of starvation happens when many small Spark apps are
+        // running (e.g. small Spark Streaming jobs): a newly submitted large Spark app gets
+        // offered only a fraction of the cores available in the cluster before Mesos stops
+        // sending it offers, because the small apps have a much smaller dominant share and
+        // therefore receive the offers first. With a small number of apps this is fine, since
+        // the default refuse_seconds value of 5 seconds gives Mesos enough time to cycle
+        // through every app and send offers to each of them. As the number of apps grows,
+        // however, it becomes more and more problematic, to the point where Mesos stops
+        // sending offers to the apps ranked lowest by DRF, i.e. the large apps. We mitigate
+        // this problem by declining offers for a long period of time once we know we don't
+        // need offers anymore, because the app has already acquired all the cores it needs.
+        declineOffer(d, offer, Some("reached spark.cores.max"),
+          Some(rejectOfferDurationForReachedMaxCores))
       } else {
         declineOffer(d, offer)
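
The hunk only shows the call site, not the body of `declineOffer`. As a rough sketch of how the decline path could attach a long refuse filter via the Mesos Java API, consider the standalone helper below. This is an assumption for illustration: the real `declineOffer` is a private method of `CoarseMesosSchedulerBackend` and uses Spark's logging, while this sketch uses a plain object and `println`. The Mesos calls themselves (`Protos.Filters.setRefuseSeconds` and `SchedulerDriver.declineOffer`) are the standard API.

import org.apache.mesos.{Protos, SchedulerDriver}

// Hypothetical standalone sketch; in Spark this logic lives inside the scheduler backend.
object OfferDeclineSketch {
  // Decline an offer, optionally attaching a Filters proto with a long refuse_seconds so
  // Mesos withholds these resources from this framework for that duration.
  def declineOffer(
      d: SchedulerDriver,
      offer: Protos.Offer,
      reason: Option[String] = None,
      refuseSeconds: Option[Long] = None): Unit = {
    reason.foreach(r => println(s"Declining offer ${offer.getId.getValue}: $r"))
    refuseSeconds match {
      case Some(seconds) =>
        // e.g. seconds = rejectOfferDurationForReachedMaxCores once spark.cores.max is reached
        val filters = Protos.Filters.newBuilder().setRefuseSeconds(seconds.toDouble).build()
        d.declineOffer(offer.getId, filters)
      case None =>
        // No explicit filter: Mesos applies its default refuse_seconds of 5 seconds
        d.declineOffer(offer.getId)
    }
  }
}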