@@ -32,7 +32,6 @@ import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.util.Utils


/**
* A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
* separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
@@ -55,6 +54,12 @@ private[spark] class MesosSchedulerBackend(

var classLoader: ClassLoader = null

/** The number of cores we currently use in the cluster. */
private[spark] var totalCoresAcquired: Double = 0

/** The maximum number of cores we can use at any one time. */
private val maxCores: Double = sc.conf.getDouble("spark.cores.max", Double.MaxValue)

// The listener bus to publish executor added/removed events.
val listenerBus = sc.listenerBus

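For reference, the maxCores cap added above reads the standard spark.cores.max setting. A minimal sketch of how a user would set it (app name and master URL are made up for illustration):

    import org.apache.spark.{SparkConf, SparkContext}

    // Cap the fine-grained Mesos backend at 8 cores across the cluster (illustrative value).
    val conf = new SparkConf()
      .setAppName("example")             // hypothetical app name
      .setMaster("mesos://host:5050")    // hypothetical Mesos master
      .set("spark.cores.max", "8")       // read above via sc.conf.getDouble("spark.cores.max", ...)
    val sc = new SparkContext(conf)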
@@ -85,8 +90,8 @@ private[spark] class MesosSchedulerBackend(
val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home")
.orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility
.getOrElse {
throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
}
throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
}
val environment = Environment.newBuilder()
sc.conf.getOption("spark.executor.extraClassPath").foreach { cp =>
environment.addVariables(
@@ -202,55 +207,86 @@ private[spark] class MesosSchedulerBackend(
}

/**
* Method called by Mesos to offer resources on slaves. We respond by asking our active task sets
* for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
* tasks are balanced across the cluster.
* Return the usable Mesos offers and corresponding WorkerOffers.
*
* This method declines Mesos offers that don't meet minimum cpu, memory or attribute
* requirements.
*
* @param d Mesos SchedulerDriver to decline offers
* @param offers Mesos offers to be considered
* @return a pair of Mesos offers and corresponding WorkerOffer that can be used by the
* fine-grained scheduler.
*/
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
inClassLoader() {
// Fail-fast on offers we know will be rejected
val (usableOffers, unUsableOffers) = offers.asScala.partition { o =>
val mem = getResource(o.getResourcesList, "mem")
val cpus = getResource(o.getResourcesList, "cpus")
val slaveId = o.getSlaveId.getValue
val offerAttributes = toAttributeMap(o.getAttributesList)

// check if all constraints are satisfied
// 1. Attribute constraints
// 2. Memory requirements
// 3. CPU requirements - need at least 1 for executor, 1 for task
val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)

val meetsRequirements =
(meetsConstraints && meetsMemoryRequirements && meetsCPURequirements) ||
private[spark] def usableWorkerOffers(d: SchedulerDriver,
offers: JList[Offer]): (Seq[Protos.Offer], Seq[WorkerOffer]) = {
// Fail-fast on offers we know will be rejected
val (usableOffers, unUsableOffers) = offers.asScala.partition { o =>
val mem = getResource(o.getResourcesList, "mem")
val cpus = getResource(o.getResourcesList, "cpus")
val slaveId = o.getSlaveId.getValue
val offerAttributes = toAttributeMap(o.getAttributesList)

// check if all constraints are satisfied
// 1. Attribute constraints
// 2. Memory requirements
// 3. CPU requirements - need at least 1 for executor, 1 for task
val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)

val meetsRequirements =
(meetsConstraints && meetsMemoryRequirements && meetsCPURequirements) ||
(slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)

// add some debug messaging
val debugstr = if (meetsRequirements) "Accepting" else "Declining"
val id = o.getId.getValue
logDebug(s"$debugstr offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
// add some debug messaging
val debugstr = if (meetsRequirements) "Accepting" else "Declining"
val id = o.getId.getValue
logDebug(s"$debugstr offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")

meetsRequirements
}

// Decline offers we ruled out immediately
unUsableOffers.foreach(o => d.declineOffer(o.getId))

var availableCores = Math.max(0, maxCores - totalCoresAcquired)

meetsRequirements
val workerOffers = (for (o <- usableOffers) yield {
val coresInOffer = getResource(o.getResourcesList, "cpus")
val cores = if (slaveIdToExecutorInfo.contains(o.getSlaveId.getValue)) {
coresInOffer.toInt
} else {
// If the Mesos executor has not been started on this slave yet, set aside a few
// cores for the Mesos executor by offering fewer cores to the Spark executor
availableCores -= mesosExecutorCores
(coresInOffer - mesosExecutorCores).toInt
}

// Decline offers we ruled out immediately
unUsableOffers.foreach(o => d.declineOffer(o.getId))
// check that we can still acquire cpus
val actualCores = Math.min(availableCores, cores).toInt
Contributor: Will availableCores become negative and just start adding actualCores? I think it's probably safer to check if availableCores < cores and then just return None.

Contributor Author: Ok, I'll refactor this condition.
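A possible shape for that refactor, sketched against the surrounding names (cores, availableCores, WorkerOffer); a suggestion only, not necessarily the final change:

    // Sketch only: skip this offer entirely when the remaining core budget cannot cover it.
    if (cores <= 0 || cores > availableCores) {
      None
    } else {
      availableCores -= cores
      Some(new WorkerOffer(o.getSlaveId.getValue, o.getHostname, cores))
    }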

availableCores -= actualCores

val workerOffers = usableOffers.map { o =>
val cpus = if (slaveIdToExecutorInfo.contains(o.getSlaveId.getValue)) {
getResource(o.getResourcesList, "cpus").toInt
} else {
// If the Mesos executor has not been started on this slave yet, set aside a few
// cores for the Mesos executor by offering fewer cores to the Spark executor
(getResource(o.getResourcesList, "cpus") - mesosExecutorCores).toInt
}
new WorkerOffer(
if (actualCores > 0) {
Option(new WorkerOffer(
o.getSlaveId.getValue,
o.getHostname,
cpus)
actualCores))
} else {
None
}
}).flatten

(usableOffers, workerOffers)
}

/**
* Method called by Mesos to offer resources on slaves. We respond by asking our active task sets
* for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
* tasks are balanced across the cluster.
*/
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
inClassLoader() {
val (usableOffers, workerOffers) = usableWorkerOffers(d, offers)

val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap
val slaveIdToWorkerOffer = workerOffers.map(o => o.executorId -> o).toMap
@@ -278,20 +314,25 @@ private[spark] class MesosSchedulerBackend(
mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
.add(mesosTask)
slaveIdToResources(slaveId) = remainingResources

totalCoresAcquired += getResource(mesosTask.getResourcesList, "cpus")
}
}

// Reply to the offers
val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?

mesosTasks.foreach { case (slaveId, tasks) =>
slaveIdToWorkerOffer.get(slaveId).foreach(o =>
listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId,
// TODO: Add support for log urls for Mesos
new ExecutorInfo(o.host, o.cores, Map.empty)))
)
logTrace(s"Launching Mesos tasks on slave '$slaveId', tasks:\n${getTasksSummary(tasks)}")
d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
mesosTasks.foreach {
case (slaveId, tasks) =>
// add the cores reserved for each Mesos executor (one per slave)
totalCoresAcquired += getResource(tasks.get(0).getExecutor.getResourcesList, "cpus")

slaveIdToWorkerOffer.get(slaveId).foreach(o =>
listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId,
// TODO: Add support for log urls for Mesos
new ExecutorInfo(o.host, o.cores, Map.empty))))
logTrace(s"Launching Mesos tasks on slave '$slaveId', tasks:\n${getTasksSummary(tasks)}")
d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
}

// Decline offers that weren't used
@@ -339,6 +380,8 @@ private[spark] class MesosSchedulerBackend(
}
if (TaskState.isFinished(state)) {
taskIdToSlaveId.remove(tid)
// here we assume that any Mesos task was allocated CPUS_PER_TASK
totalCoresAcquired -= scheduler.CPUS_PER_TASK
}
}
scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
@@ -370,7 +413,12 @@ private[spark] class MesosSchedulerBackend(
private def removeExecutor(slaveId: String, reason: String) = {
synchronized {
listenerBus.post(SparkListenerExecutorRemoved(System.currentTimeMillis(), slaveId, reason))
slaveIdToExecutorInfo -= slaveId
val executorInfo = slaveIdToExecutorInfo.remove(slaveId)
// in case we had an executor there, we need to update the total cores
executorInfo.map { info =>
// we could use mesosExecutorCores but this way we are sure we're counting correctly
totalCoresAcquired -= getResource(info.getResourcesList, "cpus")
}
}
}

@@ -386,18 +434,19 @@ private[spark] class MesosSchedulerBackend(
recordSlaveLost(d, slaveId, SlaveLost())
}

override def executorLost(d: SchedulerDriver, executorId: ExecutorID,
slaveId: SlaveID, status: Int) {
override def executorLost(
d: SchedulerDriver,
executorId: ExecutorID,
slaveId: SlaveID, status: Int) {
logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
slaveId.getValue))
slaveId.getValue))
recordSlaveLost(d, slaveId, ExecutorExited(status))
}

override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
mesosDriver.killTask(
TaskID.newBuilder()
.setValue(taskId.toString).build()
)
.setValue(taskId.toString).build())
}

// TODO: query Mesos for number of cores
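Taken together, the backend changes above keep a running core budget: launched tasks and newly started executors add to it, finished tasks and removed executors subtract from it, and new offers are accepted only while the budget stays under spark.cores.max. A simplified model of that bookkeeping, as a sketch only (plain doubles instead of Mesos protobuf resources), before moving on to the MesosSchedulerUtils changes:

    // Illustration of the core accounting introduced by this change; not the actual backend code.
    class CoreBudget(maxCores: Double, mesosExecutorCores: Double, cpusPerTask: Double) {
      private var totalCoresAcquired: Double = 0

      // resourceOffers: cores granted to launched tasks, plus the executor's own cores
      // the first time a slave gets a Mesos executor.
      def onTasksLaunched(taskCpus: Double, newExecutorOnSlave: Boolean): Unit = {
        totalCoresAcquired += taskCpus
        if (newExecutorOnSlave) totalCoresAcquired += mesosExecutorCores
      }

      // statusUpdate: a finished task is assumed to have held CPUS_PER_TASK cores.
      def onTaskFinished(): Unit = { totalCoresAcquired -= cpusPerTask }

      // removeExecutor: release whatever the lost executor had reserved.
      def onExecutorRemoved(executorCpus: Double): Unit = { totalCoresAcquired -= executorCpus }

      // usableWorkerOffers: never hand out more than spark.cores.max in total.
      def availableCores: Double = math.max(0, maxCores - totalCoresAcquired)
    }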
@@ -31,7 +31,6 @@ import org.apache.mesos.protobuf.{ByteString, GeneratedMessage}
import org.apache.spark.{SparkException, SparkConf, Logging, SparkContext}
import org.apache.spark.util.Utils


/**
* Shared trait for implementing a Mesos Scheduler. This holds common state and helper
 * methods that a Mesos scheduler will use.
@@ -158,6 +157,12 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
/**
* Partition the existing set of resources into two groups, those remaining to be
* scheduled and those requested to be used for a new task.
*
 * @note This method assumes there are enough resources to fulfill the request. If there
 * aren't, it returns partial results. For instance, if amountToUse is 2 cpus but only
 * 1 is available, it will return a used `Resource` for 1 cpu.
*
* @param resources The full list of available resources
* @param resourceName The name of the resource to take from the available resources
* @param amountToUse The amount of resources to take from the available resources
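To make the note above concrete, a tiny standalone illustration of the partial-result behavior (a sketch only; the trait's real method operates on Mesos Resource protobufs, not plain numbers):

    // Hypothetical helper mirroring the @note: asking for 2 cpus when only 1 is available
    // yields a used amount of 1 and nothing remaining.
    def splitAmount(available: Double, amountToUse: Double): (Double, Double) = {
      val used = math.min(available, amountToUse)
      (available - used, used) // (remaining, used); e.g. splitAmount(1.0, 2.0) == (0.0, 1.0)
    }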
@@ -197,7 +202,6 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
(attr.getName, attr.getText.getValue.split(',').toSet)
}


/** Build a Mesos resource protobuf object */
protected def createResource(resourceName: String, quantity: Double): Protos.Resource = {
Resource.newBuilder()
@@ -225,7 +229,6 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
}).toMap
}


/**
* Match the requirements (if any) to the offer attributes.
 * If attribute requirements are not specified, return true.