@@ -109,10 +109,14 @@ private[spark] class CoarseMesosSchedulerBackend(
109109 private val slaveOfferConstraints =
110110 parseConstraintString(sc.conf.get(" spark.mesos.constraints" , " " ))
111111
112- // reject offers with mismatched constraints in seconds
112+ // Reject offers with mismatched constraints in seconds
113113 private val rejectOfferDurationForUnmetConstraints =
114114 getRejectOfferDurationForUnmetConstraints(sc)
115115
116+ // Reject offers when we reached the maximum number of cores for this framework
117+ private val rejectOfferDurationForReachedMaxCores =
118+ getRejectOfferDurationForReachedMaxCores(sc)
119+
116120 // A client for talking to the external shuffle service
117121 private val mesosExternalShuffleClient : Option [MesosExternalShuffleClient ] = {
118122 if (shuffleServiceEnabled) {
@@ -273,25 +277,25 @@ private[spark] class CoarseMesosSchedulerBackend(
273277 matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
274278 }
275279
276- declineUnmatchedOffers(d, unmatchedOffers)
280+ unmatchedOffers.foreach { offer =>
281+ declineOffer(d, offer, " unmet constraints" , rejectOfferDurationForUnmetConstraints)
282+ }
283+
277284 handleMatchedOffers(d, matchedOffers)
278285 }
279286 }
280287
281- private def declineUnmatchedOffers (d : SchedulerDriver , offers : Buffer [Offer ]): Unit = {
282- for (offer <- offers) {
283- val id = offer.getId.getValue
284- val offerAttributes = toAttributeMap(offer.getAttributesList)
285- val mem = getResource(offer.getResourcesList, " mem" )
286- val cpus = getResource(offer.getResourcesList, " cpus" )
287- val filters = Filters .newBuilder()
288- .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
288+ private def declineOffer (d : SchedulerDriver , offer : Offer , reason : String , refuseSeconds : Long ) {
289+ val id = offer.getId.getValue
290+ val offerAttributes = toAttributeMap(offer.getAttributesList)
291+ val mem = getResource(offer.getResourcesList, " mem" )
292+ val cpus = getResource(offer.getResourcesList, " cpus" )
289293
290- logDebug(s " Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus "
291- + s " for $rejectOfferDurationForUnmetConstraints seconds " )
294+ logDebug(s " Declining offer ( $reason ) : $id with attributes: $offerAttributes mem: $mem"
295+ + s " cpu: $cpus for $rejectOfferDurationForUnmetConstraints seconds " )
292296
293- d.declineOffer(offer.getId, filters )
294- }
297+ val filters = Filters .newBuilder().setRefuseSeconds(refuseSeconds).build( )
298+ d.declineOffer(offer.getId, filters)
295299 }
296300
297301 /**
@@ -326,7 +330,9 @@ private[spark] class CoarseMesosSchedulerBackend(
326330 d.launchTasks(
327331 Collections .singleton(offer.getId),
328332 offerTasks.asJava)
329- } else { // decline
333+ } else if (totalCoresAcquired >= maxCores) {
334+ declineOffer(d, offer, " reached max cores" , rejectOfferDurationForReachedMaxCores)
335+ } else {
330336 logDebug(s " Declining offer: $id with attributes: $offerAttributes " +
331337 s " mem: $offerMem cpu: $offerCpus" )
332338
0 commit comments