Skip to content

Commit 74f5027

Browse files
FelixBechstein0000Andrew Or
authored and committed
[SPARK-10471][CORE][MESOS] prevent getting offers for unmet constraints
This change rejects offers for slaves with unmet constraints for 120s by default, to mitigate offer starvation. This prevents Mesos from sending us these offers again and again. In return, we get more offers for slaves which might meet our constraints, and it enables Mesos to send the rejected offers to other frameworks. Author: Felix Bechstein <[email protected]> Closes #8639 from felixb/decline_offers_constraint_mismatch. (cherry picked from commit 5039a49) Signed-off-by: Andrew Or <[email protected]>
1 parent 2459b34 commit 74f5027

File tree

3 files changed

+91
-53
lines changed

3 files changed

+91
-53
lines changed

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala

Lines changed: 52 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ private[spark] class CoarseMesosSchedulerBackend(
101101
private val slaveOfferConstraints =
102102
parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
103103

104+
// Duration, in seconds, for which offers with unmet constraints are rejected
105+
private val rejectOfferDurationForUnmetConstraints =
106+
getRejectOfferDurationForUnmetConstraints(sc)
107+
104108
// A client for talking to the external shuffle service, if it is a
105109
private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
106110
if (shuffleServiceEnabled) {
@@ -249,48 +253,56 @@ private[spark] class CoarseMesosSchedulerBackend(
249253
val mem = getResource(offer.getResourcesList, "mem")
250254
val cpus = getResource(offer.getResourcesList, "cpus").toInt
251255
val id = offer.getId.getValue
252-
if (taskIdToSlaveId.size < executorLimit &&
253-
totalCoresAcquired < maxCores &&
254-
meetsConstraints &&
255-
mem >= calculateTotalMemory(sc) &&
256-
cpus >= 1 &&
257-
failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
258-
!slaveIdsWithExecutors.contains(slaveId)) {
259-
// Launch an executor on the slave
260-
val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
261-
totalCoresAcquired += cpusToUse
262-
val taskId = newMesosTaskId()
263-
taskIdToSlaveId.put(taskId, slaveId)
264-
slaveIdsWithExecutors += slaveId
265-
coresByTaskId(taskId) = cpusToUse
266-
// Gather cpu resources from the available resources and use them in the task.
267-
val (remainingResources, cpuResourcesToUse) =
268-
partitionResources(offer.getResourcesList, "cpus", cpusToUse)
269-
val (_, memResourcesToUse) =
270-
partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc))
271-
val taskBuilder = MesosTaskInfo.newBuilder()
272-
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
273-
.setSlaveId(offer.getSlaveId)
274-
.setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
275-
.setName("Task " + taskId)
276-
.addAllResources(cpuResourcesToUse.asJava)
277-
.addAllResources(memResourcesToUse.asJava)
278-
279-
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
280-
MesosSchedulerBackendUtil
281-
.setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder())
256+
if (meetsConstraints) {
257+
if (taskIdToSlaveId.size < executorLimit &&
258+
totalCoresAcquired < maxCores &&
259+
mem >= calculateTotalMemory(sc) &&
260+
cpus >= 1 &&
261+
failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
262+
!slaveIdsWithExecutors.contains(slaveId)) {
263+
// Launch an executor on the slave
264+
val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
265+
totalCoresAcquired += cpusToUse
266+
val taskId = newMesosTaskId()
267+
taskIdToSlaveId.put(taskId, slaveId)
268+
slaveIdsWithExecutors += slaveId
269+
coresByTaskId(taskId) = cpusToUse
270+
// Gather cpu resources from the available resources and use them in the task.
271+
val (remainingResources, cpuResourcesToUse) =
272+
partitionResources(offer.getResourcesList, "cpus", cpusToUse)
273+
val (_, memResourcesToUse) =
274+
partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc))
275+
val taskBuilder = MesosTaskInfo.newBuilder()
276+
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
277+
.setSlaveId(offer.getSlaveId)
278+
.setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
279+
.setName("Task " + taskId)
280+
.addAllResources(cpuResourcesToUse.asJava)
281+
.addAllResources(memResourcesToUse.asJava)
282+
283+
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
284+
MesosSchedulerBackendUtil
285+
.setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder())
286+
}
287+
288+
// Accept the offer and launch the task
289+
logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
290+
slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname
291+
d.launchTasks(
292+
Collections.singleton(offer.getId),
293+
Collections.singleton(taskBuilder.build()), filters)
294+
} else {
295+
// Decline the offer
296+
logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
297+
d.declineOffer(offer.getId)
282298
}
283-
284-
// accept the offer and launch the task
285-
logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
286-
slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname
287-
d.launchTasks(
288-
Collections.singleton(offer.getId),
289-
Collections.singleton(taskBuilder.build()), filters)
290299
} else {
291-
// Decline the offer
292-
logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
293-
d.declineOffer(offer.getId)
300+
// This offer does not meet constraints. We don't need to see it again.
301+
// Decline the offer for a long period of time.
302+
logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
303+
+ s" for $rejectOfferDurationForUnmetConstraints seconds")
304+
d.declineOffer(offer.getId, Filters.newBuilder()
305+
.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build())
294306
}
295307
}
296308
}

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ private[spark] class MesosSchedulerBackend(
6363
private[this] val slaveOfferConstraints =
6464
parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
6565

66+
// Duration, in seconds, for which offers with unmet constraints are rejected
67+
private val rejectOfferDurationForUnmetConstraints =
68+
getRejectOfferDurationForUnmetConstraints(sc)
69+
6670
@volatile var appId: String = _
6771

6872
override def start() {
@@ -212,29 +216,47 @@ private[spark] class MesosSchedulerBackend(
212216
*/
213217
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
214218
inClassLoader() {
215-
// Fail-fast on offers we know will be rejected
216-
val (usableOffers, unUsableOffers) = offers.asScala.partition { o =>
219+
// Fail first on offers with unmet constraints
220+
val (offersMatchingConstraints, offersNotMatchingConstraints) =
221+
offers.asScala.partition { o =>
222+
val offerAttributes = toAttributeMap(o.getAttributesList)
223+
val meetsConstraints =
224+
matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
225+
226+
// add some debug messaging
227+
if (!meetsConstraints) {
228+
val id = o.getId.getValue
229+
logDebug(s"Declining offer: $id with attributes: $offerAttributes")
230+
}
231+
232+
meetsConstraints
233+
}
234+
235+
// These offers do not meet constraints. We don't need to see them again.
236+
// Decline the offer for a long period of time.
237+
offersNotMatchingConstraints.foreach { o =>
238+
d.declineOffer(o.getId, Filters.newBuilder()
239+
.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build())
240+
}
241+
242+
// Of the offers matching the constraints, see which ones give us enough memory and cores
243+
val (usableOffers, unUsableOffers) = offersMatchingConstraints.partition { o =>
217244
val mem = getResource(o.getResourcesList, "mem")
218245
val cpus = getResource(o.getResourcesList, "cpus")
219246
val slaveId = o.getSlaveId.getValue
220247
val offerAttributes = toAttributeMap(o.getAttributesList)
221248

222-
// check if all constraints are satisfield
223-
// 1. Attribute constraints
224-
// 2. Memory requirements
225-
// 3. CPU requirements - need at least 1 for executor, 1 for task
226-
val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
249+
// check offers for
250+
// 1. Memory requirements
251+
// 2. CPU requirements - need at least 1 for executor, 1 for task
227252
val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
228253
val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
229-
230254
val meetsRequirements =
231-
(meetsConstraints && meetsMemoryRequirements && meetsCPURequirements) ||
255+
(meetsMemoryRequirements && meetsCPURequirements) ||
232256
(slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
233-
234-
// add some debug messaging
235257
val debugstr = if (meetsRequirements) "Accepting" else "Declining"
236-
val id = o.getId.getValue
237-
logDebug(s"$debugstr offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
258+
logDebug(s"$debugstr offer: ${o.getId.getValue} with attributes: "
259+
+ s"$offerAttributes mem: $mem cpu: $cpus")
238260

239261
meetsRequirements
240262
}

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,4 +336,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
336336
}
337337
}
338338

339+
protected def getRejectOfferDurationForUnmetConstraints(sc: SparkContext): Long = {
340+
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
341+
}
342+
339343
}

0 commit comments

Comments
 (0)