@@ -27,14 +27,13 @@ import scala.collection.mutable.{HashMap, HashSet}
import com.google.common.collect.HashBiMap
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver}

import org.apache.spark.{SecurityManager, SparkContext, SparkEnv, SparkException, TaskState}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.RpcAddress
import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
import org.apache.spark.{SecurityManager, SparkContext, SparkEnv, SparkException, TaskState}

/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -60,6 +59,11 @@ private[spark] class CoarseMesosSchedulerBackend(
// Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt

// Minimum MB of memory per core. The requested core count is reduced to meet this.
val minMBPerCore = conf.getDouble("spark.cores.mb.min", 0.0)
// Maximum MB of memory per core. The memory request is capped at this limit.
val maxMBPerCore = conf.getDouble("spark.cores.mb.max", Double.MaxValue)

// If shuffle service is enabled, the Spark driver will register with the shuffle service.
// This is for cleaning up shuffle files reliably.
private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
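For illustration, the new limits are ordinary Spark configuration keys; a minimal sketch of setting them on a SparkConf follows (the values are arbitrary examples mirroring the ones used in CoarseMesosSchedulerBackendSuite, not recommendations):

import org.apache.spark.SparkConf

// Require at least 10000 MB of offered memory per core,
// and cap the memory request at 100000 MB per core.
val conf = new SparkConf()
  .set("spark.cores.mb.min", "10000")
  .set("spark.cores.mb.max", "100000")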
@@ -254,14 +258,18 @@ private[spark] class CoarseMesosSchedulerBackend(
val cpus = getResource(offer.getResourcesList, "cpus").toInt
val id = offer.getId.getValue
if (meetsConstraints) {
// Returns None if the offer isn't sufficient
val maybeUsableResources = calculateDesiredResources(
sc, math.min(cpus, maxCores - totalCoresAcquired), mem.toInt
)
if (taskIdToSlaveId.size < executorLimit &&
totalCoresAcquired < maxCores &&
mem >= calculateTotalMemory(sc) &&
cpus >= 1 &&
maybeUsableResources.isDefined &&
failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
!slaveIdsWithExecutors.contains(slaveId)) {
// Launch an executor on the slave
val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
val cpusToUse = maybeUsableResources.get._1
val memToUse = maybeUsableResources.get._2
totalCoresAcquired += cpusToUse
val taskId = newMesosTaskId()
taskIdToSlaveId.put(taskId, slaveId)
@@ -271,7 +279,7 @@
val (remainingResources, cpuResourcesToUse) =
partitionResources(offer.getResourcesList, "cpus", cpusToUse)
val (_, memResourcesToUse) =
partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc))
partitionResources(remainingResources.asJava, "mem", memToUse)
val taskBuilder = MesosTaskInfo.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
.setSlaveId(offer.getSlaveId)
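To make the new offer path concrete with some hypothetical numbers: with maxCores = 8 and totalCoresAcquired = 4, an offer of 10 CPUs and 80000 MB is first capped to math.min(10, 8 - 4) = 4 cores, then passed to calculateDesiredResources together with the 80000 MB; whatever (cores, memoryMB) pair comes back is what gets reserved from the offer via partitionResources, instead of the fixed calculateTotalMemory(sc) amount used before.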
@@ -399,6 +407,55 @@
s"$slaveId/$taskId"
}

/**
* Try to fit the offered resources to the configured memory-per-core constraints.
* @param sc Spark context
* @param availableCpus The number of CPUs available in the offer
* @param availableMem The memory (in MB) available in the offer
* @return Some((cores, memoryMB)) to request from the offer, or None if the offer is insufficient
*/
def calculateDesiredResources(sc: SparkContext, availableCpus: Int, availableMem: Int):
Option[(Int, Int)] = {
if (availableCpus <= 0) {
// Guard against dividing by zero when computing the memory/CPU ratio
logTrace("No CPUs in offer")
return None
}
val desiredMemory = super.calculateTotalMemory(sc)
val minMB = math.max(desiredMemory, minMBPerCore).toInt
val mbCoreRatio = availableMem.toDouble / availableCpus.toDouble

if (availableMem < minMB) {
logTrace(s"Offer of $availableMem has insufficient memory. Needs at least $minMB")
None
} else if (mbCoreRatio > maxMBPerCore) {
// Ratio too high: cap the memory request
val desiredCPUMem = (maxMBPerCore * availableCpus).toInt
if (desiredCPUMem >= minMB) {
logTrace(s"Too much memory per core; truncating memory request to $desiredCPUMem MB")
Option((availableCpus, desiredCPUMem))
} else {
logTrace(s"Desired memory from number of cores $availableCpus " +
s"insufficient to meet minimum MB required $minMB")
None
}
} else if (mbCoreRatio < minMBPerCore) {
// Ratio too low: reduce the CPU request
val desiredCpus = (availableMem.toDouble / minMBPerCore).toInt
if (desiredCpus <= 0) {
logTrace("Rounded CPU count down to zero; memory/CPU ratio is too close to the minimum threshold")
None
} else {
logTrace(s"Not enough memory per cpu, lowering cpu count to $desiredCpus")
Option((desiredCpus, availableMem))
}
} else {
logTrace(s"Cpu and memory just right")
Option((availableCpus, availableMem))
}
}

override def slaveLost(d: SchedulerDriver, slaveId: SlaveID): Unit = {
logInfo(s"Mesos slave lost: ${slaveId.getValue}")
executorTerminated(d, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue)
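As a quick sketch of how calculateDesiredResources behaves, assume a backend configured with the same limits the test suite uses (spark.cores.mb.min = 10000, spark.cores.mb.max = 100000) and an executor memory requirement below those limits; the expected results below match the assertions in CoarseMesosSchedulerBackendSuite:

// Offer already satisfies both per-core limits: used as-is
backend.calculateDesiredResources(sc, 1, 80000)   // Some((1, 80000))
// Only 8000 MB per core offered, below the 10000 MB minimum: core count reduced
backend.calculateDesiredResources(sc, 10, 80000)  // Some((8, 80000))
// 800000 MB per core offered, above the 100000 MB maximum: memory request capped
backend.calculateDesiredResources(sc, 1, 800000)  // Some((1, 100000))
// 800 MB cannot satisfy the minimum for even a single core: offer rejected
backend.calculateDesiredResources(sc, 1, 800)     // None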
@@ -23,14 +23,13 @@ import java.util.Collections
import org.apache.mesos.Protos.Value.Scalar
import org.apache.mesos.Protos._
import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
import org.mockito.Matchers
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.mockito.Matchers
import org.scalatest.mock.MockitoSugar
import org.scalatest.BeforeAndAfter

import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SecurityManager, SparkFunSuite}
import org.scalatest.mock.MockitoSugar

class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
with LocalSparkContext
@@ -77,6 +76,18 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
backend
}

private def buildLimitedBackend() = {
sc.conf.set("spark.cores.mb.min", "10000")
sc.conf.set("spark.cores.mb.max", "100000")
val driver = mock[SchedulerDriver]
val taskScheduler = mock[TaskSchedulerImpl]
when(taskScheduler.sc).thenReturn(sc)
val backend = createSchedulerBackend(taskScheduler, driver)
assert(backend.maxMBPerCore == 100000.0)
assert(backend.minMBPerCore == 10000.0)
backend
}

var sparkConf: SparkConf = _

before {
@@ -88,6 +99,98 @@
sc = new SparkContext(sparkConf)
}

test("coarse mesos backend correctly keep sufficient offer") {
assert(
buildLimitedBackend().calculateDesiredResources(sc, 1, 80000)
.count(x => x._1 == 1 && x._2 == 80000) == 1
)
}
test("coarse mesos backend correctly ignores insufficient offer") {
val backend = buildLimitedBackend()
assert(backend.calculateDesiredResources(sc, 1, 800).isEmpty)
}

test("coarse mesos backend correctly truncates CPU when too high") {
assert(
buildLimitedBackend().calculateDesiredResources(sc, 10, 80000)
.count(x => x._1 == 8 && x._2 == 80000) == 1
)
}

test("coarse mesos backend correctly truncates MEM when too high") {
assert(
buildLimitedBackend().calculateDesiredResources(sc, 1, 800000)
.count(x => x._1 == 1 && x._2 == 100000) == 1
)
}

test("coarse mesos backend correctly handles zero cpu") {
assert(buildLimitedBackend().calculateDesiredResources(sc, 0, 800000).isEmpty)
}

test("coarse mesos backend correctly handles zero mem") {
assert(buildLimitedBackend().calculateDesiredResources(sc, 1, 0).isEmpty)
}

test("coarse mesos backend correctly handles zero everything") {
assert(buildLimitedBackend().calculateDesiredResources(sc, 0, 0).isEmpty)
}

test("coarse mesos backend correctly handles default mem limit") {
sc.conf.set("spark.cores.mb.min", "1")
val driver = mock[SchedulerDriver]
val taskScheduler = mock[TaskSchedulerImpl]
when(taskScheduler.sc).thenReturn(sc)
val backend = createSchedulerBackend(taskScheduler, driver)
assert(backend.maxMBPerCore == Double.MaxValue)
assert(backend.minMBPerCore == 1.0)
val minimumMem = backend.calculateTotalMemory(sc)
val minimumOffer = backend.calculateDesiredResources(sc, 1, minimumMem)
assert(minimumOffer.isDefined)
assert(minimumOffer.get._1 == 1)
assert(minimumOffer.get._2 == minimumMem)
assert(
backend.calculateDesiredResources(sc, 10, minimumMem)
.count(x => x._1 == 10 && x._2 == minimumMem) == 1
)
assert(
backend.calculateDesiredResources(sc, minimumMem, minimumMem)
.count(x => x._1 == minimumMem && x._2 == minimumMem) == 1
)
assert(
backend.calculateDesiredResources(sc, minimumMem + 1, minimumMem)
.count(x => x._1 == minimumMem && x._2 == minimumMem) == 1
)
assert(backend.calculateDesiredResources(sc, 1, minimumMem - 1).isEmpty)
}

test("coarse mesos backend correctly handles unset mem limit") {
val driver = mock[SchedulerDriver]
val taskScheduler = mock[TaskSchedulerImpl]
when(taskScheduler.sc).thenReturn(sc)
val backend = createSchedulerBackend(taskScheduler, driver)
assert(backend.maxMBPerCore == Double.MaxValue)
assert(backend.minMBPerCore == 0.0)
assert(backend.calculateDesiredResources(sc, 1, 1).isEmpty)
assert(
backend.calculateDesiredResources(sc, 1, 10000)
.count(x => x._1 == 1 && x._2 == 10000) == 1
)
assert(
backend.calculateDesiredResources(sc, 1, backend.calculateTotalMemory(sc))
.count(x => x._1 == 1 && x._2 == backend.calculateTotalMemory(sc)) == 1
)
assert(
backend.calculateDesiredResources(sc, 1, Integer.MAX_VALUE)
.count(x => x._1 == 1 && x._2 == Integer.MAX_VALUE) == 1
)
assert(
backend.calculateDesiredResources(sc, Integer.MAX_VALUE, 10000)
.count(x => x._1 == Integer.MAX_VALUE && x._2 == 10000) == 1
)
}

test("mesos supports killing and limiting executors") {
val driver = mock[SchedulerDriver]
when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)