Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions docs/running-on-mesos.md
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,36 @@ See the [configuration page](configuration.html) for information on Spark config
in the history server.
</td>
</tr>
<tr>
<td><code>spark.mesos.dispatcher.queue</code></td>
<td><code>(none)</code></td>
<td>
Set the name of the dispatcher queue to which the application is submitted.
The specified queue must be added to the dispatcher with <code>spark.mesos.dispatcher.queue.[QueueName]</code>.
If no queue is specified, then the application is submitted to the "default" queue with 0.0 priority.
</td>
</tr>
<tr>
<td><code>spark.mesos.dispatcher.queue.[QueueName]</code></td>
<td><code>0.0</code></td>
<td>
Add a new queue for submitted drivers with the specified priority.
Higher numbers indicate higher priority.
The user can specify multiple queues to define a workload management policy for queued drivers in the dispatcher.
A driver can then be submitted to a specific queue with <code>spark.mesos.dispatcher.queue</code>.
By default, the dispatcher has a single queue named "default" with 0.0 priority; the priority of this queue cannot be overridden.
It is possible to implement a consistent and overall workload management policy throughout the lifecycle of drivers
by mapping priority queues to weighted Mesos roles, and by specifying a
<code>spark.mesos.role</code> along with a <code>spark.mesos.dispatcher.queue</code> when submitting an application.
For example, with the URGENT Mesos role:
<pre>
spark.mesos.dispatcher.queue.URGENT=1.0

spark.mesos.dispatcher.queue=URGENT
spark.mesos.role=URGENT
</pre>
</td>
</tr>
<tr>
<td><code>spark.mesos.gpus.max</code></td>
<td><code>0</code></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,17 @@ package object config {
.stringConf
.createOptional

// Optional, string-valued setting naming the dispatcher queue an application is submitted to.
// Queue priorities themselves are declared under "spark.mesos.dispatcher.queue.[QueueName]".
private[spark] val DISPATCHER_QUEUE =
ConfigBuilder("spark.mesos.dispatcher.queue")
.doc("Set the name of the dispatcher queue to which the application is submitted. " +
"The specified queue must be added to the dispatcher " +
"with \"spark.mesos.dispatcher.queue.[QueueName]\". If no queue is specified, then " +
"the application is submitted to the \"default\" queue with 0.0 priority.")
.stringConf
.createOptional

// Name of the queue used when a submission does not specify "spark.mesos.dispatcher.queue".
private[spark] val DEFAULT_QUEUE = "default"

private[spark] val DRIVER_LABELS =
ConfigBuilder("spark.mesos.driver.labels")
.doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node

import org.apache.spark.deploy.Command
import org.apache.spark.deploy.mesos.MesosDriverDescription
import org.apache.spark.deploy.mesos.{config, MesosDriverDescription}
import org.apache.spark.scheduler.cluster.mesos.{MesosClusterRetryState, MesosClusterSubmissionState}
import org.apache.spark.ui.{UIUtils, WebUIPage}

Expand Down Expand Up @@ -154,6 +154,9 @@ private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver")
<tr>
<td>Memory</td><td>{driver.mem}</td>
</tr>
<tr>
<td>Queue</td><td>{driver.conf.get("spark.mesos.dispatcher.queue", config.DEFAULT_QUEUE)}</td>
</tr>
<tr>
<td>Submitted</td><td>{UIUtils.formatDate(driver.submissionDate)}</td>
</tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.xml.Node

import org.apache.mesos.Protos.TaskStatus

import org.apache.spark.deploy.mesos.MesosDriverDescription
import org.apache.spark.deploy.mesos.{config, MesosDriverDescription}
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.scheduler.cluster.mesos.MesosClusterSubmissionState
import org.apache.spark.ui.{UIUtils, WebUIPage}
Expand All @@ -36,7 +36,7 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(

val driverHeader = Seq("Driver ID")
val historyHeader = historyServerURL.map(url => Seq("History")).getOrElse(Nil)
val submissionHeader = Seq("Submit Date", "Main Class", "Driver Resources")
val submissionHeader = Seq("Queue", "Submit Date", "Main Class", "Driver Resources")
val sandboxHeader = Seq("Sandbox")

val queuedHeaders = driverHeader ++ submissionHeader
Expand Down Expand Up @@ -69,6 +69,7 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(
val id = submission.submissionId
<tr>
<td><a href={s"driver?id=$id"}>{id}</a></td>
<td>{submission.conf.get("spark.mesos.dispatcher.queue", config.DEFAULT_QUEUE)}</td>
<td>{UIUtils.formatDate(submission.submissionDate)}</td>
<td>{submission.command.mainClass}</td>
<td>cpus: {submission.cores}, mem: {submission.mem}</td>
Expand Down Expand Up @@ -99,6 +100,9 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(
<tr>
<td><a href={s"driver?id=$id"}>{id}</a></td>
{historyCol}
<td>
{state.driverDescription.conf.get("spark.mesos.dispatcher.queue", config.DEFAULT_QUEUE)}
</td>
<td>{UIUtils.formatDate(state.driverDescription.submissionDate)}</td>
<td>{state.driverDescription.command.mainClass}</td>
<td>cpus: {state.driverDescription.cores}, mem: {state.driverDescription.mem}</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io.File
import java.util.{Collections, Date, List => JList}

import scala.collection.JavaConverters._
import scala.collection.immutable
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

Expand Down Expand Up @@ -131,6 +132,8 @@ private[spark] class MesosClusterScheduler(
private val retainedDrivers = conf.getInt("spark.mesos.retainedDrivers", 200)
private val maxRetryWaitTime = conf.getInt("spark.mesos.cluster.retry.wait.max", 60) // 1 minute
private val useFetchCache = conf.getBoolean("spark.mesos.fetchCache.enable", false)
// Priority of every declared dispatcher queue, keyed by queue name (the part of the key that
// follows "spark.mesos.dispatcher.queue."). The map is built strictly on purpose: `mapValues`
// only returns a lazy view, which would re-parse the value on every lookup and defer a
// NumberFormatException for a malformed priority until a driver is compared in the queue.
private val queues: immutable.Map[String, Float] =
  conf.getAllWithPrefix("spark.mesos.dispatcher.queue.")
    .map { case (queueName, priority) => queueName -> priority.toFloat }
    .toMap
private val schedulerState = engineFactory.createEngine("scheduler")
private val stateLock = new Object()
// Keyed by submission id
Expand All @@ -144,7 +147,19 @@ private[spark] class MesosClusterScheduler(
// state of the tasks from Mesos. Keyed by task Id.
private val pendingRecover = new mutable.HashMap[String, SlaveID]()
// Stores all the submitted drivers that hasn't been launched, keyed by submission id
private val queuedDrivers = new ArrayBuffer[MesosDriverDescription]()
// and sorted by priority, then by submission date
// Ascending order over queued drivers: lower priority first, then (for equal priority) the
// more recently submitted driver first, then the larger submission id first. The queue below
// uses the reverse of this ordering, so iteration yields the highest priority first and, within
// one priority, the oldest submission first.
//
// The final tie-break on submission id matters because the queue is a TreeSet: two elements
// that compare as 0 are treated as the same element, so without it a driver submitted with the
// same priority and the same timestamp as an already queued driver would be silently dropped.
private val driverOrdering = new Ordering[MesosDriverDescription] {
  override def compare(x: MesosDriverDescription, y: MesosDriverDescription): Int = {
    val xp = getDriverPriority(x)
    val yp = getDriverPriority(y)
    // Numerically equal priorities (including 0.0 vs -0.0) fall through to the tie-breakers.
    val byPriority = if (xp == yp) 0 else java.lang.Float.compare(xp, yp)
    if (byPriority != 0) {
      byPriority
    } else {
      val byDate = y.submissionDate.compareTo(x.submissionDate)
      if (byDate != 0) byDate else y.submissionId.compareTo(x.submissionId)
    }
  }
}
private val queuedDrivers = new mutable.TreeSet[MesosDriverDescription]()(driverOrdering.reverse)
// All supervised drivers that are waiting to retry after termination, keyed by submission id
private val pendingRetryDrivers = new ArrayBuffer[MesosDriverDescription]()
private val queuedDriversState = engineFactory.createEngine("driverQueue")
Expand Down Expand Up @@ -375,6 +390,15 @@ private[spark] class MesosClusterScheduler(
s"${frameworkId}-${desc.submissionId}${retries}"
}

/**
 * Resolves the scheduling priority of a driver from the queue named in its configuration.
 *
 * @param desc the driver whose "spark.mesos.dispatcher.queue" setting is inspected
 * @return 0.0 when no queue, or the default queue, is named; otherwise the priority
 *         registered for the named queue
 * @throws NoSuchElementException if the named queue is not registered with this scheduler
 */
private[mesos] def getDriverPriority(desc: MesosDriverDescription): Float =
  desc.conf.get("spark.mesos.dispatcher.queue", config.DEFAULT_QUEUE) match {
    case config.DEFAULT_QUEUE => 0.0f
    case declared => queues.getOrElse(declared, throw new NoSuchElementException(declared))
  }

private def getDriverTaskId(desc: MesosDriverDescription): String = {
val sId = desc.submissionId
desc.retryState.map(state => sId + s"${RETRY_SEP}${state.retries.toString}").getOrElse(sId)
Expand Down Expand Up @@ -689,7 +713,7 @@ private[spark] class MesosClusterScheduler(
}

private def copyBuffer(
buffer: ArrayBuffer[MesosDriverDescription]): ArrayBuffer[MesosDriverDescription] = {
buffer: TraversableOnce[MesosDriverDescription]): ArrayBuffer[MesosDriverDescription] = {
val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size)
buffer.copyToBuffer(newBuffer)
newBuffer
Expand Down Expand Up @@ -792,13 +816,14 @@ private[spark] class MesosClusterScheduler(
status: Int): Unit = {}

// Removes the queued driver whose submission id equals `subId`, if there is one.
// When a driver is found it is taken out of `queuedDrivers` and its entry is expunged from
// `queuedDriversState`. Returns true if a driver was removed, false if none matched.
private def removeFromQueuedDrivers(subId: String): Boolean = {
val index = queuedDrivers.indexWhere(_.submissionId.equals(subId))
if (index != -1) {
queuedDrivers.remove(index)

// Look the driver up by submission id; the queue is then asked to remove that exact element.
val matchOption = queuedDrivers.find(_.submissionId.equals(subId))
if (matchOption.isEmpty) {
false
} else {
queuedDrivers.remove(matchOption.get)
// Also drop the entry kept under the same submission id in queuedDriversState.
queuedDriversState.expunge(subId)
true
} else {
false
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,126 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
Utils.verifyFileBasedValueSecrets(launchedTasks)
}

// Checks how getDriverPriority resolves a driver's queue: undeclared queues are rejected,
// a missing or "default" queue maps to 0.0, and declared queues map to their configured value.
test("Get driver priority") {
  val queueConf = new SparkConf()
    .set("spark.mesos.dispatcher.queue.ROUTINE", "1.0")
    .set("spark.mesos.dispatcher.queue.URGENT", "2.0")
    .set("spark.mesos.dispatcher.queue.EXCEPTIONAL", "3.0")
  setScheduler(queueConf.getAll.toMap)

  val mem = 1000
  val cpu = 1

  // Builds a driver description that differs only in its scheduler properties and id.
  def describe(schedulerProps: Map[String, String], submissionId: String) =
    new MesosDriverDescription("d1", "jar", mem, cpu, true,
      command,
      schedulerProps,
      submissionId,
      new Date())

  // Test queue not declared in scheduler
  val undeclared = describe(Map("spark.mesos.dispatcher.queue" -> "dummy"), "s1")
  assertThrows[NoSuchElementException] {
    scheduler.getDriverPriority(undeclared)
  }

  // Test with no specified queue
  val unspecified = describe(Map[String, String](), "s2")
  assert(scheduler.getDriverPriority(unspecified) == 0.0f)

  // Test with "default" queue specified
  val explicitDefault = describe(Map("spark.mesos.dispatcher.queue" -> "default"), "s3")
  assert(scheduler.getDriverPriority(explicitDefault) == 0.0f)

  // Test queue declared in scheduler
  val routine = describe(Map("spark.mesos.dispatcher.queue" -> "ROUTINE"), "s4")
  assert(scheduler.getDriverPriority(routine) == 1.0f)

  // Test other queue declared in scheduler
  val urgent = describe(Map("spark.mesos.dispatcher.queue" -> "URGENT"), "s5")
  assert(scheduler.getDriverPriority(urgent) == 2.0f)
}

// Submits drivers to queues of different priority, out of priority order, and checks that the
// scheduler state lists them from highest to lowest priority (no queue counts as 0.0).
test("Can queue drivers with priority") {
  val conf = new SparkConf()
  conf.set("spark.mesos.dispatcher.queue.ROUTINE", "1.0")
  conf.set("spark.mesos.dispatcher.queue.URGENT", "2.0")
  conf.set("spark.mesos.dispatcher.queue.EXCEPTIONAL", "3.0")
  setScheduler(conf.getAll.toMap)

  // Submits one driver with the given scheduler properties, checks that the submission was
  // accepted and returns the submission id reported by the scheduler.
  def submit(schedulerProps: Map[String, String], submissionId: String): String = {
    val response = scheduler.submitDriver(
      new MesosDriverDescription("d1", "jar", 100, 1, true, command,
        schedulerProps, submissionId, new Date()))
    assert(response.success)
    response.submissionId
  }

  val routineId = submit(Map("spark.mesos.dispatcher.queue" -> "ROUTINE"), "s0")
  val unspecifiedId = submit(Map[String, String](), "s1")
  val exceptionalId = submit(Map("spark.mesos.dispatcher.queue" -> "EXCEPTIONAL"), "s2")
  val urgentId = submit(Map("spark.mesos.dispatcher.queue" -> "URGENT"), "s3")

  // Comparing the whole id list reports a wrong or short queue as a readable assertion
  // failure instead of an IndexOutOfBoundsException.
  val queuedIds = scheduler.getSchedulerState().queuedDrivers.toList.map(_.submissionId)
  assert(queuedIds == List(exceptionalId, urgentId, routineId, unspecifiedId))
}

// A queue with a negative priority must be served after drivers that name no queue at all,
// since those get the default 0.0 priority.
test("Can queue drivers with negative priority") {
  val conf = new SparkConf()
  conf.set("spark.mesos.dispatcher.queue.LOWER", "-1.0")
  setScheduler(conf.getAll.toMap)

  // Submits one driver with the given scheduler properties, checks that the submission was
  // accepted and returns the submission id reported by the scheduler.
  def submit(schedulerProps: Map[String, String], submissionId: String): String = {
    val response = scheduler.submitDriver(
      new MesosDriverDescription("d1", "jar", 100, 1, true, command,
        schedulerProps, submissionId, new Date()))
    assert(response.success)
    response.submissionId
  }

  val lowerId = submit(Map("spark.mesos.dispatcher.queue" -> "LOWER"), "s0")
  val unspecifiedId = submit(Map[String, String](), "s1")

  // Comparing the whole id list reports a wrong or short queue as a readable assertion
  // failure instead of an IndexOutOfBoundsException.
  val queuedIds = scheduler.getSchedulerState().queuedDrivers.toList.map(_.submissionId)
  assert(queuedIds == List(unspecifiedId, lowerId))
}

private def launchDriverTask(addlSparkConfVars: Map[String, String]): List[TaskInfo] = {
setScheduler()
val mem = 1000
Expand Down