@@ -54,6 +54,12 @@ private[spark] class MesosSchedulerBackend(

var classLoader: ClassLoader = null

/** The number of cores we currently use in the cluster. */
private[spark] var totalCoresAcquired: Double = 0

/** The maximum number of cores we can use at any one time. */
private val maxCores: Double = sc.conf.getDouble("spark.cores.max", Double.MaxValue)

// The listener bus to publish executor added/removed events.
val listenerBus = sc.listenerBus

@@ -93,8 +99,8 @@ private[spark] class MesosSchedulerBackend(
val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home")
.orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility
.getOrElse {
throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
}
val environment = Environment.newBuilder()
sc.conf.getOption("spark.executor.extraClassPath").foreach { cp =>
environment.addVariables(
@@ -210,12 +216,18 @@ private[spark] class MesosSchedulerBackend(
}

/**
* Method called by Mesos to offer resources on slaves. We respond by asking our active task sets
* for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
* tasks are balanced across the cluster.
* Return the usable Mesos offers and corresponding WorkerOffers.
*
* This method declines Mesos offers that don't meet minimum cpu, memory or attribute
* requirements.
*
* @param d Mesos SchedulerDriver to decline offers
* @param offers Mesos offers to be considered
* @return a pair of Mesos offers and corresponding WorkerOffers that can be used by the
* fine-grained scheduler.
*/
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
inClassLoader() {
private[spark] def usableWorkerOffers(d: SchedulerDriver,
Contributor:
I think we've consistently put function parameters on their own lines when the signature is longer than 100 characters; can you move d: SchedulerDriver to the next line?
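A minimal sketch of the formatting being requested, for illustration only (the body is elided; it mirrors the signature just below):

private[spark] def usableWorkerOffers(
    d: SchedulerDriver,
    offers: JList[Offer]): (Seq[Protos.Offer], Seq[WorkerOffer]) = {
  // ... method body unchanged ...
}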

offers: JList[Offer]): (Seq[Protos.Offer], Seq[WorkerOffer]) = {
// Fail first on offers with unmet constraints
val (offersMatchingConstraints, offersNotMatchingConstraints) =
offers.asScala.partition { o =>
@@ -225,8 +237,7 @@ private[spark] class MesosSchedulerBackend(

// add some debug messaging
if (!meetsConstraints) {
val id = o.getId.getValue
logDebug(s"Declining offer: $id with attributes: $offerAttributes")
logDebug(s"Declining offer: ${o.getId.getValue} with attributes: $offerAttributes")
}

meetsConstraints
@@ -239,44 +250,69 @@ private[spark] class MesosSchedulerBackend(
.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build())
}

// Of the matching constraints, see which ones give us enough memory and cores
val (usableOffers, unUsableOffers) = offersMatchingConstraints.partition { o =>
val mem = getResource(o.getResourcesList, "mem")
val cpus = getResource(o.getResourcesList, "cpus")
val slaveId = o.getSlaveId.getValue
val offerAttributes = toAttributeMap(o.getAttributesList)

// check offers for
// 1. Memory requirements
// 2. CPU requirements - need at least 1 for executor, 1 for task
val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
Contributor:
I think technically this is not correct (this exists in the code before, not your change): we can still launch new tasks on a slave that already has a running executor, without requiring mesosExecutorCores. Probably fix this in the future.
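One possible shape of that future fix, sketched here for illustration only (not part of this patch); it assumes we only require mesosExecutorCores when no executor is running on the slave yet:

val needsNewExecutor = !slaveIdToExecutorInfo.contains(slaveId)
val requiredCpus =
  (if (needsNewExecutor) mesosExecutorCores else 0D) + scheduler.CPUS_PER_TASK
val meetsCPURequirements = cpus >= requiredCpus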

val meetsRequirements =
(meetsMemoryRequirements && meetsCPURequirements) ||
(slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
val debugstr = if (meetsRequirements) "Accepting" else "Declining"
logDebug(s"$debugstr offer: ${o.getId.getValue} with attributes: "
+ s"$offerAttributes mem: $mem cpu: $cpus")

meetsRequirements
}

// Decline offers we ruled out immediately
unUsableOffers.foreach(o => d.declineOffer(o.getId))

var availableCores = Math.max(0, maxCores - totalCoresAcquired)

val workerOffers = usableOffers.map { o =>
val cpus = if (slaveIdToExecutorInfo.contains(o.getSlaveId.getValue)) {
getResource(o.getResourcesList, "cpus").toInt
} else {
// If the Mesos executor has not been started on this slave yet, set aside a few
// cores for the Mesos executor by offering fewer cores to the Spark executor
(getResource(o.getResourcesList, "cpus") - mesosExecutorCores).toInt
}
new WorkerOffer(
o.getSlaveId.getValue,
o.getHostname,
cpus)
}

val workerOffers = (for (o <- usableOffers) yield {
val coresInOffer = getResource(o.getResourcesList, "cpus").toInt
val extraCores = if (slaveIdToExecutorInfo.contains(o.getSlaveId.getValue)) {
0D
} else {
// If the Mesos executor has not been started on this slave yet, set aside a few
// cores for the Mesos executor by offering fewer cores to the Spark executor
mesosExecutorCores
}

// the cores we can offer for tasks on workers should exceed neither availableCores
// nor the cores in the current offer, after accounting for non-task cores
val taskCores = Math.min(availableCores - extraCores, coresInOffer - extraCores)

if (taskCores > 0) {
availableCores -= taskCores + extraCores
Option(new WorkerOffer(
o.getSlaveId.getValue,
o.getHostname,
taskCores.toInt))
} else {
None
}
}).flatten

(usableOffers, workerOffers)
Contributor:
What happens if we hit the limit and don't run any more tasks on the remaining usableOffers? We need to decline those too, right?
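One way this could be handled, sketched for illustration only; it assumes offers that produced no WorkerOffer can be identified by their slave id:

val offeredSlaveIds = workerOffers.map(_.executorId).toSet
usableOffers
  .filterNot(o => offeredSlaveIds.contains(o.getSlaveId.getValue))
  .foreach(o => d.declineOffer(o.getId))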

}

/**
* Method called by Mesos to offer resources on slaves. We respond by asking our active task sets
* for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
* tasks are balanced across the cluster.
*/
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
inClassLoader() {
val (usableOffers, workerOffers) = usableWorkerOffers(d, offers)

val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap
val slaveIdToWorkerOffer = workerOffers.map(o => o.executorId -> o).toMap
@@ -304,20 +340,25 @@ private[spark] class MesosSchedulerBackend(
mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
.add(mesosTask)
slaveIdToResources(slaveId) = remainingResources

totalCoresAcquired += getResource(mesosTask.getResourcesList, "cpus")
}
}

// Reply to the offers
val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?

mesosTasks.foreach { case (slaveId, tasks) =>
slaveIdToWorkerOffer.get(slaveId).foreach(o =>
listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId,
// TODO: Add support for log urls for Mesos
new ExecutorInfo(o.host, o.cores, Map.empty)))
)
logTrace(s"Launching Mesos tasks on slave '$slaveId', tasks:\n${getTasksSummary(tasks)}")
d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
mesosTasks.foreach {
case (slaveId, tasks) =>
// add the cores reserved for each Mesos executor (one per slave)
totalCoresAcquired += getResource(tasks.get(0).getExecutor.getResourcesList, "cpus")
Contributor:
We should only account for executor resources once per slave, not for every task
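A sketch of one way to do that, for illustration only; countedExecutorSlaves is a hypothetical set (not in this patch) tracking slaves whose executor cores were already counted:

// countedExecutorSlaves: hypothetical mutable.HashSet[String], not part of the patch
if (!countedExecutorSlaves.contains(slaveId)) {
  // count the executor's reserved cores only the first time we launch on this slave
  totalCoresAcquired += getResource(tasks.get(0).getExecutor.getResourcesList, "cpus")
  countedExecutorSlaves += slaveId
}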


slaveIdToWorkerOffer.get(slaveId).foreach(o =>
listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId,
// TODO: Add support for log urls for Mesos
new ExecutorInfo(o.host, o.cores, Map.empty))))
logTrace(s"Launching Mesos tasks on slave '$slaveId', tasks:\n${getTasksSummary(tasks)}")
d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
}

// Decline offers that weren't used
@@ -365,6 +406,8 @@ private[spark] class MesosSchedulerBackend(
}
if (TaskState.isFinished(state)) {
taskIdToSlaveId.remove(tid)
// here we assume that any Mesos task was allocated CPUS_PER_TASK
totalCoresAcquired -= scheduler.CPUS_PER_TASK
}
}
scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
@@ -396,7 +439,12 @@ private[spark] class MesosSchedulerBackend(
private def removeExecutor(slaveId: String, reason: String) = {
synchronized {
listenerBus.post(SparkListenerExecutorRemoved(System.currentTimeMillis(), slaveId, reason))
slaveIdToExecutorInfo -= slaveId
val executorInfo = slaveIdToExecutorInfo.remove(slaveId)
// in case we had an executor there, we need to update the total cores
executorInfo.map { info =>
// we could use mesosExecutorCores but this way we are sure we're counting correctly
totalCoresAcquired -= getResource(info.getResourcesList, "cpus")
}
}
}

@@ -412,8 +460,10 @@ private[spark] class MesosSchedulerBackend(
recordSlaveLost(d, slaveId, SlaveLost())
}

override def executorLost(d: SchedulerDriver, executorId: ExecutorID,
slaveId: SlaveID, status: Int) {
override def executorLost(
d: SchedulerDriver,
executorId: ExecutorID,
slaveId: SlaveID, status: Int) {
logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
slaveId.getValue))
recordSlaveLost(d, slaveId, ExecutorExited(status, exitCausedByApp = true))
@@ -422,8 +472,7 @@ private[spark] class MesosSchedulerBackend(
override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
mesosDriver.killTask(
TaskID.newBuilder()
.setValue(taskId.toString).build()
)
.setValue(taskId.toString).build())
}

// TODO: query Mesos for number of cores
@@ -31,7 +31,6 @@ import org.apache.mesos.protobuf.{ByteString, GeneratedMessage}
import org.apache.spark.{SparkException, SparkConf, Logging, SparkContext}
import org.apache.spark.util.Utils


/**
* Shared trait for implementing a Mesos Scheduler. This holds common state and helper
* methods that a Mesos scheduler will use.
@@ -158,6 +157,12 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
/**
* Partition the existing set of resources into two groups, those remaining to be
* scheduled and those requested to be used for a new task.
*
* @note This method assumes there are enough resources to fulfill the request. In case
* there aren't, it will return partial results. For instance, if amountToUse is
* 2 cpus, but only 1 is available, it will return a used `Resource` for
* 1 cpu.
*
* @param resources The full list of available resources
* @param resourceName The name of the resource to take from the available resources
* @param amountToUse The amount of resources to take from the available resources
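To make the partial-fulfillment note above concrete, a minimal standalone sketch of the rule using plain doubles instead of Mesos Resource protobufs (illustrative only, not the method itself):

// split an available amount against a request; "used" may be less than requested
def splitAmount(available: Double, amountToUse: Double): (Double, Double) = {
  val used = math.min(available, amountToUse)
  (used, available - used) // (used, remaining)
}
// splitAmount(1.0, 2.0) == (1.0, 0.0): only 1 cpu can be used, nothing remains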
@@ -197,7 +202,6 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
(attr.getName, attr.getText.getValue.split(',').toSet)
}


/** Build a Mesos resource protobuf object */
protected def createResource(resourceName: String, quantity: Double): Protos.Resource = {
Resource.newBuilder()
@@ -225,7 +229,6 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
}).toMap
}


/**
* Match the requirements (if any) to the offer attributes.
* If attribute requirements are not specified, return true.