@@ -101,6 +101,10 @@ private[spark] class CoarseMesosSchedulerBackend(
private val slaveOfferConstraints =
parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))

// Duration (in seconds) for which to reject offers with unmet constraints
private val rejectOfferDurationForUnmetConstraints =
getRejectOfferDurationForUnmetConstraints(sc)

// A client for talking to the external shuffle service, if it is enabled
private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
if (shuffleServiceEnabled) {
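
Note: the new duration is read via SparkConf.getTimeAsSeconds (see the helper added to MesosSchedulerUtils below), so suffixed time strings work. A hypothetical override, just to illustrate the parsing:

import org.apache.spark.SparkConf

// Hypothetical: raise the refuse window from the 120s default to five minutes.
val conf = new SparkConf()
conf.set("spark.mesos.rejectOfferDurationForUnmetConstraints", "5m")
conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")  // returns 300
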
@@ -249,48 +253,56 @@ private[spark] class CoarseMesosSchedulerBackend(
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus").toInt
val id = offer.getId.getValue
if (taskIdToSlaveId.size < executorLimit &&
totalCoresAcquired < maxCores &&
meetsConstraints &&
mem >= calculateTotalMemory(sc) &&
cpus >= 1 &&
failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
!slaveIdsWithExecutors.contains(slaveId)) {
// Launch an executor on the slave
val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
totalCoresAcquired += cpusToUse
val taskId = newMesosTaskId()
taskIdToSlaveId.put(taskId, slaveId)
slaveIdsWithExecutors += slaveId
coresByTaskId(taskId) = cpusToUse
// Gather cpu resources from the available resources and use them in the task.
val (remainingResources, cpuResourcesToUse) =
partitionResources(offer.getResourcesList, "cpus", cpusToUse)
val (_, memResourcesToUse) =
partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc))
val taskBuilder = MesosTaskInfo.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
.setSlaveId(offer.getSlaveId)
.setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
.setName("Task " + taskId)
.addAllResources(cpuResourcesToUse.asJava)
.addAllResources(memResourcesToUse.asJava)

sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil
.setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder())
if (meetsConstraints) {
Contributor: It's getting a bit messy now, but I think we can refactor this later, as I'd like to introduce more fine-grained reasons around rejecting offers.

Contributor: I had already done some refactoring around this specific area in patch #8771.

if (taskIdToSlaveId.size < executorLimit &&
totalCoresAcquired < maxCores &&
mem >= calculateTotalMemory(sc) &&
cpus >= 1 &&
failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
!slaveIdsWithExecutors.contains(slaveId)) {
// Launch an executor on the slave
val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
totalCoresAcquired += cpusToUse
val taskId = newMesosTaskId()
taskIdToSlaveId.put(taskId, slaveId)
slaveIdsWithExecutors += slaveId
coresByTaskId(taskId) = cpusToUse
// Gather cpu resources from the available resources and use them in the task.
val (remainingResources, cpuResourcesToUse) =
partitionResources(offer.getResourcesList, "cpus", cpusToUse)
val (_, memResourcesToUse) =
partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc))
val taskBuilder = MesosTaskInfo.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
.setSlaveId(offer.getSlaveId)
.setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
.setName("Task " + taskId)
.addAllResources(cpuResourcesToUse.asJava)
.addAllResources(memResourcesToUse.asJava)

sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil
.setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder())
}

// Accept the offer and launch the task
logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname
d.launchTasks(
Collections.singleton(offer.getId),
Collections.singleton(taskBuilder.build()), filters)
} else {
// Decline the offer
logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
d.declineOffer(offer.getId)
}

// accept the offer and launch the task
logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname
d.launchTasks(
Collections.singleton(offer.getId),
Collections.singleton(taskBuilder.build()), filters)
} else {
// Decline the offer
logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
d.declineOffer(offer.getId)
// This offer does not meet constraints. We don't need to see it again.
// Decline the offer for a long period of time.
logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
+ s" for $rejectOfferDurationForUnmetConstraints seconds")
d.declineOffer(offer.getId, Filters.newBuilder()
.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build())
}
}
}
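
Note: SchedulerDriver.declineOffer has an overload taking a Mesos Filters message, and refuse_seconds is a double in the Mesos proto, so the Long computed above widens implicitly; declining without filters falls back to Mesos' default refuse window of 5 seconds. A minimal sketch of the long-decline path, assuming a driver d, an offer, and the patch's rejectOfferDurationForUnmetConstraints in scope:

import org.apache.mesos.Protos.Filters

// Hold these resources back for the configured window instead of Mesos' 5s default.
val filters = Filters.newBuilder()
  .setRefuseSeconds(rejectOfferDurationForUnmetConstraints)  // Long widens to Double
  .build()
d.declineOffer(offer.getId, filters)
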
@@ -63,6 +63,10 @@ private[spark] class MesosSchedulerBackend(
private[this] val slaveOfferConstraints =
parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))

// Duration (in seconds) for which to reject offers with unmet constraints
private val rejectOfferDurationForUnmetConstraints =
getRejectOfferDurationForUnmetConstraints(sc)

@volatile var appId: String = _

override def start() {
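
Note: the constraint string parsed above is a semicolon-separated list of attribute:value pairs, per the spark.mesos.constraints documentation. A hypothetical setting and its parsed form:

import org.apache.spark.SparkConf

// Example only: accept offers solely from agents advertising both attributes.
val conf = new SparkConf().set("spark.mesos.constraints", "os:centos7;zone:us-east-1a")
// parseConstraintString(conf.get("spark.mesos.constraints", "")) yields
// Map("os" -> Set("centos7"), "zone" -> Set("us-east-1a"))
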
@@ -212,29 +216,47 @@ private[spark] class MesosSchedulerBackend(
*/
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
inClassLoader() {
// Fail-fast on offers we know will be rejected
val (usableOffers, unUsableOffers) = offers.asScala.partition { o =>
// Fail first on offers with unmet constraints
val (offersMatchingConstraints, offersNotMatchingConstraints) =
offers.asScala.partition { o =>
val offerAttributes = toAttributeMap(o.getAttributesList)
val meetsConstraints =
matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)

// add some debug messaging
Contributor: nit: remove (doesn't add any value). Also remove the random blank lines around this.

if (!meetsConstraints) {
val id = o.getId.getValue
logDebug(s"Declining offer: $id with attributes: $offerAttributes")
Contributor: nit: just inline it:
logDebug(s"Declining offer: ${o.getId.getValue} with attributes: $offerAttributes")

}

meetsConstraints
}

// These offers do not meet constraints. We don't need to see them again.
// Decline the offer for a long period of time.
offersNotMatchingConstraints.foreach { o =>
d.declineOffer(o.getId, Filters.newBuilder()
.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build())
}

// Of the matching constraints, see which ones give us enough memory and cores
val (usableOffers, unUsableOffers) = offersMatchingConstraints.partition { o =>
val mem = getResource(o.getResourcesList, "mem")
val cpus = getResource(o.getResourcesList, "cpus")
val slaveId = o.getSlaveId.getValue
val offerAttributes = toAttributeMap(o.getAttributesList)

// check if all constraints are satisfied
// 1. Attribute constraints
// 2. Memory requirements
// 3. CPU requirements - need at least 1 for executor, 1 for task
val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
// check offers for
// 1. Memory requirements
// 2. CPU requirements - need at least 1 for executor, 1 for task
val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)

val meetsRequirements =
(meetsConstraints && meetsMemoryRequirements && meetsCPURequirements) ||
(meetsMemoryRequirements && meetsCPURequirements) ||
(slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)

// add some debug messaging
val debugstr = if (meetsRequirements) "Accepting" else "Declining"
val id = o.getId.getValue
logDebug(s"$debugstr offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
logDebug(s"$debugstr offer: ${o.getId.getValue} with attributes: "
+ s"$offerAttributes mem: $mem cpu: $cpus")

meetsRequirements
}
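
Note: matchesAttributeRequirements checks that every constrained attribute is present on the offer and, when values are listed, that the offer's value is among them. A hypothetical simplification over plain Maps (Spark's real version works on Mesos Attribute protos):

// Hypothetical illustration of the matching rule, not Spark's actual code.
def matches(constraints: Map[String, Set[String]],
            offerAttrs: Map[String, String]): Boolean =
  constraints.forall { case (name, allowed) =>
    offerAttrs.get(name).exists(v => allowed.isEmpty || allowed.contains(v))
  }

matches(Map("zone" -> Set("us-east-1a")), Map("zone" -> "us-east-1a"))  // true
matches(Map("zone" -> Set("us-east-1a")), Map("zone" -> "us-west-2"))   // false
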
@@ -336,4 +336,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
}
}

protected def getRejectOfferDurationForUnmetConstraints(sc: SparkContext): Long = {
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
}

}
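
Note: end to end, the patch means a job whose constraints no agent satisfies stops churning through the same offers every few seconds. A hypothetical job configuration exercising both settings together:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical: pin executors to one zone and refuse mismatched offers for 5 minutes.
val jobConf = new SparkConf()
  .setAppName("mesos-job")
  .set("spark.mesos.constraints", "zone:us-east-1a")
  .set("spark.mesos.rejectOfferDurationForUnmetConstraints", "300s")
val sc = new SparkContext(jobConf)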