Skip to content

Commit 9ab0f82

Browse files
Pascal Gillet (pgillet)
authored and committed
[SPARK-23499][MESOS] Support for priority queues in Mesos scheduler
### What changes were proposed in this pull request? I am opening this PR because I could not re-open the stale one, #20665. As with YARN or Kubernetes, Mesos users should be able to specify priority queues to define a workload management policy for queued drivers in the Mesos Cluster Dispatcher. This would ensure scheduling order while enqueuing Spark applications for a Mesos cluster. ### Why are the changes needed? Currently, submitted drivers are kept in order of their submission: the first driver added to the queue will be the first one to be executed (FIFO), regardless of their priority. See https://issues.apache.org/jira/projects/SPARK/issues/SPARK-23499 for more details. ### Does this PR introduce _any_ user-facing change? The MesosClusterDispatcher UI now shows Spark jobs along with the queue to which they are submitted. ### How was this patch tested? Unit tests. Also, this feature has been in production for 3 years now, as we have been running a modified Spark 2.4.0 since then. Closes #30352 from pgillet/mesos-scheduler-priority-queue. Lead-authored-by: Pascal Gillet <[email protected]> Co-authored-by: pgillet <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent b5eca18 commit 9ab0f82

File tree

6 files changed

+222
-10
lines changed

6 files changed

+222
-10
lines changed

docs/running-on-mesos.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -734,6 +734,38 @@ See the [configuration page](configuration.html) for information on Spark config
734734
</td>
735735
<td>2.1.0</td>
736736
</tr>
737+
<tr>
738+
<td><code>spark.mesos.dispatcher.queue</code></td>
739+
<td><code>(none)</code></td>
740+
<td>
741+
Set the name of the dispatcher queue to which the application is submitted.
742+
The specified queue must be added to the dispatcher with <code>spark.mesos.dispatcher.queue.[QueueName]</code>.
743+
If no queue is specified, then the application is submitted to the "default" queue with 0.0 priority.
744+
</td>
745+
<td>3.1.0</td>
746+
</tr>
747+
<tr>
748+
<td><code>spark.mesos.dispatcher.queue.[QueueName]</code></td>
749+
<td><code>0.0</code></td>
750+
<td>
751+
Add a new queue for submitted drivers with the specified priority.
752+
Higher numbers indicate higher priority.
753+
The user can specify multiple queues to define a workload management policy for queued drivers in the dispatcher.
754+
A driver can then be submitted to a specific queue with <code>spark.mesos.dispatcher.queue</code>.
755+
By default, the dispatcher has a single queue with 0.0 priority (cannot be overridden).
756+
It is possible to implement a consistent and overall workload management policy throughout the lifecycle of drivers
757+
by mapping priority queues to weighted Mesos roles, and by specifying a
758+
<code>spark.mesos.role</code> along with a <code>spark.mesos.dispatcher.queue</code> when submitting an application.
759+
For example, with the URGENT Mesos role:
760+
<pre>
761+
spark.mesos.dispatcher.queue.URGENT=1.0
762+
763+
spark.mesos.dispatcher.queue=URGENT
764+
spark.mesos.role=URGENT
765+
</pre>
766+
</td>
767+
<td>3.1.0</td>
768+
</tr>
737769
<tr>
738770
<td><code>spark.mesos.gpus.max</code></td>
739771
<td><code>0</code></td>

resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,16 @@ package object config {
113113
.stringConf
114114
.createOptional
115115

116+
private[spark] val DISPATCHER_QUEUE =
117+
ConfigBuilder("spark.mesos.dispatcher.queue")
118+
.doc("Set the name of the dispatcher queue to which the application is submitted. " +
119+
"The specified queue must be added to the dispatcher " +
120+
"with \"spark.mesos.dispatcher.queue.[QueueName]\". If no queue is specified, then " +
121+
"the application is submitted to the \"default\" queue with 0.0 priority.")
122+
.version("3.1.0")
123+
.stringConf
124+
.createWithDefaultString("default")
125+
116126
private[spark] val DRIVER_LABELS =
117127
ConfigBuilder("spark.mesos.driver.labels")
118128
.doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value " +

resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import javax.servlet.http.HttpServletRequest
2222
import scala.xml.Node
2323

2424
import org.apache.spark.deploy.Command
25-
import org.apache.spark.deploy.mesos.MesosDriverDescription
25+
import org.apache.spark.deploy.mesos.{config, MesosDriverDescription}
2626
import org.apache.spark.scheduler.cluster.mesos.{MesosClusterRetryState, MesosClusterSubmissionState}
2727
import org.apache.spark.ui.{UIUtils, WebUIPage}
2828

@@ -153,6 +153,13 @@ private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver")
153153
<tr>
154154
<td>Memory</td><td>{driver.mem}</td>
155155
</tr>
156+
<tr>
157+
<td>Queue</td>
158+
<td>
159+
{driver.conf.get(
160+
"spark.mesos.dispatcher.queue", config.DISPATCHER_QUEUE.defaultValueString)}
161+
</td>
162+
</tr>
156163
<tr>
157164
<td>Submitted</td><td>{UIUtils.formatDate(driver.submissionDate)}</td>
158165
</tr>

resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import scala.xml.Node
2323

2424
import org.apache.mesos.Protos.TaskStatus
2525

26-
import org.apache.spark.deploy.mesos.MesosDriverDescription
26+
import org.apache.spark.deploy.mesos.{config, MesosDriverDescription}
2727
import org.apache.spark.deploy.mesos.config._
2828
import org.apache.spark.scheduler.cluster.mesos.MesosClusterSubmissionState
2929
import org.apache.spark.ui.{UIUtils, WebUIPage}
@@ -36,7 +36,7 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(
3636

3737
val driverHeader = Seq("Driver ID")
3838
val historyHeader = historyServerURL.map(url => Seq("History")).getOrElse(Nil)
39-
val submissionHeader = Seq("Submit Date", "Main Class", "Driver Resources")
39+
val submissionHeader = Seq("Queue", "Submit Date", "Main Class", "Driver Resources")
4040
val sandboxHeader = Seq("Sandbox")
4141

4242
val queuedHeaders = driverHeader ++ submissionHeader
@@ -69,6 +69,10 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(
6969
val id = submission.submissionId
7070
<tr>
7171
<td><a href={s"driver?id=$id"}>{id}</a></td>
72+
<td>
73+
{submission.conf.get(
74+
"spark.mesos.dispatcher.queue", config.DISPATCHER_QUEUE.defaultValueString)}
75+
</td>
7276
<td>{UIUtils.formatDate(submission.submissionDate)}</td>
7377
<td>{submission.command.mainClass}</td>
7478
<td>cpus: {submission.cores}, mem: {submission.mem}</td>
@@ -99,6 +103,10 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(
99103
<tr>
100104
<td><a href={s"driver?id=$id"}>{id}</a></td>
101105
{historyCol}
106+
<td>
107+
{state.driverDescription.conf.get(
108+
"spark.mesos.dispatcher.queue", config.DISPATCHER_QUEUE.defaultValueString)}
109+
</td>
102110
<td>{UIUtils.formatDate(state.driverDescription.submissionDate)}</td>
103111
<td>{state.driverDescription.command.mainClass}</td>
104112
<td>cpus: {state.driverDescription.cores}, mem: {state.driverDescription.mem}</td>

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import java.io.File
2121
import java.util.{Collections, Date, List => JList}
2222

2323
import scala.collection.JavaConverters._
24+
import scala.collection.immutable
2425
import scala.collection.mutable
2526
import scala.collection.mutable.ArrayBuffer
2627

@@ -131,6 +132,8 @@ private[spark] class MesosClusterScheduler(
131132
private val queuedCapacity = conf.get(config.MAX_DRIVERS)
132133
private val retainedDrivers = conf.get(config.RETAINED_DRIVERS)
133134
private val maxRetryWaitTime = conf.get(config.CLUSTER_RETRY_WAIT_MAX_SECONDS)
135+
// Priority of each named dispatcher queue, built from every
// "spark.mesos.dispatcher.queue.[QueueName]" entry of the dispatcher configuration.
// Each value is parsed with toFloat, so a non-numeric priority fails at construction time.
private val queues: immutable.Map[String, Float] =
  conf.getAllWithPrefix("spark.mesos.dispatcher.queue.")
    .map { case (queueName, priority) => queueName -> priority.toFloat }
    .toMap
134137
private val schedulerState = engineFactory.createEngine("scheduler")
135138
private val stateLock = new Object()
136139
// Keyed by submission id
@@ -144,7 +147,19 @@ private[spark] class MesosClusterScheduler(
144147
// state of the tasks from Mesos. Keyed by task Id.
145148
private val pendingRecover = new mutable.HashMap[String, AgentID]()
146149
// Stores all the submitted drivers that haven't been launched, keyed by submission id
147-
private val queuedDrivers = new ArrayBuffer[MesosDriverDescription]()
150+
// and sorted by priority, then by submission date
151+
/**
 * Ordering of queued drivers: by queue priority first, then by submission date (an earlier
 * date ranks higher), and finally by submission id.
 *
 * `queuedDrivers` is a TreeSet built on this ordering, and a TreeSet treats two elements
 * that compare as 0 as the same element. The last tie-break on the submission id therefore
 * guarantees that two distinct drivers with the same priority and the same submission
 * timestamp are both kept instead of the second one being silently dropped.
 */
private val driverOrdering = new Ordering[MesosDriverDescription] {
  override def compare(x: MesosDriverDescription, y: MesosDriverDescription): Int = {
    val xp = getDriverPriority(x)
    val yp = getDriverPriority(y)
    // `xp != yp` is also true for two NaN priorities, for which `compare` yields 0;
    // in that case we fall through to the date / id tie-breaks below.
    val byPriority = if (xp != yp) xp compare yp else 0
    if (byPriority != 0) {
      byPriority
    } else {
      // Arguments are swapped on purpose: the earlier submission must rank higher.
      val byDate = y.submissionDate.compareTo(x.submissionDate)
      if (byDate != 0) {
        byDate
      } else {
        // Same priority and same timestamp: order by submission id so that distinct
        // submissions never compare as equal.
        y.submissionId.compareTo(x.submissionId)
      }
    }
  }
}
162+
private val queuedDrivers = new mutable.TreeSet[MesosDriverDescription]()(driverOrdering.reverse)
148163
// All supervised drivers that are waiting to retry after termination, keyed by submission id
149164
private val pendingRetryDrivers = new ArrayBuffer[MesosDriverDescription]()
150165
private val queuedDriversState = engineFactory.createEngine("driverQueue")
@@ -374,6 +389,16 @@ private[spark] class MesosClusterScheduler(
374389
s"${frameworkId}-${desc.submissionId}${retries}"
375390
}
376391

392+
/**
 * Resolves the priority of the dispatcher queue a driver was submitted to.
 *
 * The queue name is read from the driver's "spark.mesos.dispatcher.queue" setting and falls
 * back to the default queue name when unset. The default queue always has priority 0.0,
 * any other name is looked up in the queues configured on the dispatcher.
 *
 * @param desc description of the submitted driver
 * @return the priority of the queue the driver belongs to
 * @throws NoSuchElementException if the driver names a queue unknown to the dispatcher
 */
private[mesos] def getDriverPriority(desc: MesosDriverDescription): Float = {
  val defaultQueueName = config.DISPATCHER_QUEUE.defaultValueString
  val queueName = desc.conf.get("spark.mesos.dispatcher.queue", defaultQueueName)
  if (queueName == defaultQueueName) {
    0.0f
  } else {
    // Same exception type as before, but with a message that tells the user how to fix it.
    queues.getOrElse(queueName, throw new NoSuchElementException(
      s"Unknown dispatcher queue '$queueName'; declare it on the dispatcher with " +
        s"spark.mesos.dispatcher.queue.$queueName=<priority>"))
  }
}
401+
377402
private def getDriverTaskId(desc: MesosDriverDescription): String = {
378403
val sId = desc.submissionId
379404
desc.retryState.map(state => sId + s"${RETRY_SEP}${state.retries.toString}").getOrElse(sId)
@@ -710,7 +735,7 @@ private[spark] class MesosClusterScheduler(
710735
}
711736

712737
private def copyBuffer(
713-
buffer: ArrayBuffer[MesosDriverDescription]): ArrayBuffer[MesosDriverDescription] = {
738+
buffer: TraversableOnce[MesosDriverDescription]): ArrayBuffer[MesosDriverDescription] = {
714739
val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size)
715740
buffer.copyToBuffer(newBuffer)
716741
newBuffer
@@ -827,13 +852,13 @@ private[spark] class MesosClusterScheduler(
827852
status: Int): Unit = {}
828853

829854
private def removeFromQueuedDrivers(subId: String): Boolean = {
830-
val index = queuedDrivers.indexWhere(_.submissionId == subId)
831-
if (index != -1) {
832-
queuedDrivers.remove(index)
855+
val matchOption = queuedDrivers.find(_.submissionId == subId)
856+
if (matchOption.isEmpty) {
857+
false
858+
} else {
859+
queuedDrivers.remove(matchOption.get)
833860
queuedDriversState.expunge(subId)
834861
true
835-
} else {
836-
false
837862
}
838863
}
839864

resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,136 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
603603
assert(scheduler.getDriverCommandValue(driverDesc) == expectedCmd)
604604
}
605605

606+
test("SPARK-23499: Test dispatcher priority queue with non float value") {
607+
val conf = new SparkConf()
608+
conf.set("spark.mesos.dispatcher.queue.ROUTINE", "1.0")
609+
conf.set("spark.mesos.dispatcher.queue.URGENT", "abc")
610+
conf.set("spark.mesos.dispatcher.queue.EXCEPTIONAL", "3.0")
611+
assertThrows[NumberFormatException] {
612+
setScheduler(conf.getAll.toMap)
613+
}
614+
}
615+
616+
test("SPARK-23499: Get driver priority") {
617+
val conf = new SparkConf()
618+
conf.set("spark.mesos.dispatcher.queue.ROUTINE", "1.0")
619+
conf.set("spark.mesos.dispatcher.queue.URGENT", "2.0")
620+
conf.set("spark.mesos.dispatcher.queue.EXCEPTIONAL", "3.0")
621+
setScheduler(conf.getAll.toMap)
622+
623+
val mem = 1000
624+
val cpu = 1
625+
626+
// Test queue not declared in scheduler
627+
var desc = new MesosDriverDescription("d1", "jar", mem, cpu, true,
628+
command,
629+
Map("spark.mesos.dispatcher.queue" -> "dummy"),
630+
"s1",
631+
new Date())
632+
633+
assertThrows[NoSuchElementException] {
634+
scheduler.getDriverPriority(desc)
635+
}
636+
637+
// Test with no specified queue
638+
desc = new MesosDriverDescription("d1", "jar", mem, cpu, true,
639+
command,
640+
Map[String, String](),
641+
"s2",
642+
new Date())
643+
644+
assert(scheduler.getDriverPriority(desc) == 0.0f)
645+
646+
// Test with "default" queue specified
647+
desc = new MesosDriverDescription("d1", "jar", mem, cpu, true,
648+
command,
649+
Map("spark.mesos.dispatcher.queue" -> "default"),
650+
"s3",
651+
new Date())
652+
653+
assert(scheduler.getDriverPriority(desc) == 0.0f)
654+
655+
// Test queue declared in scheduler
656+
desc = new MesosDriverDescription("d1", "jar", mem, cpu, true,
657+
command,
658+
Map("spark.mesos.dispatcher.queue" -> "ROUTINE"),
659+
"s4",
660+
new Date())
661+
662+
assert(scheduler.getDriverPriority(desc) == 1.0f)
663+
664+
// Test other queue declared in scheduler
665+
desc = new MesosDriverDescription("d1", "jar", mem, cpu, true,
666+
command,
667+
Map("spark.mesos.dispatcher.queue" -> "URGENT"),
668+
"s5",
669+
new Date())
670+
671+
assert(scheduler.getDriverPriority(desc) == 2.0f)
672+
}
673+
674+
// Submits one driver per queue (ROUTINE, default, EXCEPTIONAL, URGENT) and checks that the
// scheduler state lists them by decreasing queue priority, regardless of submission order.
test("SPARK-23499: Can queue drivers with priority") {
  val conf = new SparkConf()
  conf.set("spark.mesos.dispatcher.queue.ROUTINE", "1.0")
  conf.set("spark.mesos.dispatcher.queue.URGENT", "2.0")
  conf.set("spark.mesos.dispatcher.queue.EXCEPTIONAL", "3.0")
  setScheduler(conf.getAll.toMap)

  val response0 = scheduler.submitDriver(
    new MesosDriverDescription("d1", "jar", 100, 1, true, command,
      Map("spark.mesos.dispatcher.queue" -> "ROUTINE"), "s0", new Date()))
  assert(response0.success)

  // No queue specified: the driver goes to the default queue (priority 0.0).
  val response1 = scheduler.submitDriver(
    new MesosDriverDescription("d1", "jar", 100, 1, true, command,
      Map[String, String](), "s1", new Date()))
  assert(response1.success)

  val response2 = scheduler.submitDriver(
    new MesosDriverDescription("d1", "jar", 100, 1, true, command,
      Map("spark.mesos.dispatcher.queue" -> "EXCEPTIONAL"), "s2", new Date()))
  assert(response2.success)

  val response3 = scheduler.submitDriver(
    new MesosDriverDescription("d1", "jar", 100, 1, true, command,
      Map("spark.mesos.dispatcher.queue" -> "URGENT"), "s3", new Date()))
  assert(response3.success)

  // Expected order: EXCEPTIONAL (3.0), URGENT (2.0), ROUTINE (1.0), default (0.0).
  val state = scheduler.getSchedulerState()
  val queuedDrivers = state.queuedDrivers.toList
  assert(queuedDrivers(0).submissionId == response2.submissionId)
  assert(queuedDrivers(1).submissionId == response3.submissionId)
  assert(queuedDrivers(2).submissionId == response0.submissionId)
  assert(queuedDrivers(3).submissionId == response1.submissionId)
}
711+
712+
// Checks that a queue with a negative priority ranks below the default queue (0.0).
test("SPARK-23499: Can queue drivers with negative priority") {
  val conf = new SparkConf()
  conf.set("spark.mesos.dispatcher.queue.LOWER", "-1.0")
  setScheduler(conf.getAll.toMap)

  val response0 = scheduler.submitDriver(
    new MesosDriverDescription("d1", "jar", 100, 1, true, command,
      Map("spark.mesos.dispatcher.queue" -> "LOWER"), "s0", new Date()))
  assert(response0.success)

  // No queue specified: the driver goes to the default queue (priority 0.0).
  val response1 = scheduler.submitDriver(
    new MesosDriverDescription("d1", "jar", 100, 1, true, command,
      Map[String, String](), "s1", new Date()))
  assert(response1.success)

  // Expected order: default (0.0) first, then LOWER (-1.0), although LOWER was submitted first.
  val state = scheduler.getSchedulerState()
  val queuedDrivers = state.queuedDrivers.toList
  assert(queuedDrivers(0).submissionId == response1.submissionId)
  assert(queuedDrivers(1).submissionId == response0.submissionId)
}
735+
606736
private def launchDriverTask(addlSparkConfVars: Map[String, String]): List[TaskInfo] = {
607737
setScheduler()
608738
val mem = 1000

0 commit comments

Comments
 (0)