Skip to content

Commit b3f9dbf

Browse files
Paul MacklesFelix Cheung
authored andcommitted
[SPARK-19606][MESOS] Support constraints in spark-dispatcher
## What changes were proposed in this pull request? A discussed in SPARK-19606, the addition of a new config property named "spark.mesos.constraints.driver" for constraining drivers running on a Mesos cluster ## How was this patch tested? Corresponding unit test added also tested locally on a Mesos cluster Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Paul Mackles <[email protected]> Closes apache#19543 from pmackles/SPARK-19606.
1 parent 21a7bfd commit b3f9dbf

File tree

5 files changed

+100
-17
lines changed

5 files changed

+100
-17
lines changed

docs/running-on-mesos.md

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,10 @@ resource offers will be accepted.
263263
conf.set("spark.mesos.constraints", "os:centos7;us-east-1:false")
264264
{% endhighlight %}
265265

266-
For example, Let's say `spark.mesos.constraints` is set to `os:centos7;us-east-1:false`, then the resource offers will be checked to see if they meet both these constraints and only then will be accepted to start new executors.
266+
For example, Let's say `spark.mesos.constraints` is set to `os:centos7;us-east-1:false`, then the resource offers will
267+
be checked to see if they meet both these constraints and only then will be accepted to start new executors.
268+
269+
To constrain where driver tasks are run, use `spark.mesos.driver.constraints`
267270

268271
# Mesos Docker Support
269272

@@ -447,7 +450,9 @@ See the [configuration page](configuration.html) for information on Spark config
447450
<td><code>spark.mesos.constraints</code></td>
448451
<td>(none)</td>
449452
<td>
450-
Attribute based constraints on mesos resource offers. By default, all resource offers will be accepted. Refer to <a href="http://mesos.apache.org/documentation/attributes-resources/">Mesos Attributes & Resources</a> for more information on attributes.
453+
Attribute based constraints on mesos resource offers. By default, all resource offers will be accepted. This setting
454+
applies only to executors. Refer to <a href="http://mesos.apache.org/documentation/attributes-resources/">Mesos
455+
Attributes & Resources</a> for more information on attributes.
451456
<ul>
452457
<li>Scalar constraints are matched with "less than equal" semantics i.e. value in the constraint must be less than or equal to the value in the resource offer.</li>
453458
<li>Range constraints are matched with "contains" semantics i.e. value in the constraint must be within the resource offer's value.</li>
@@ -457,6 +462,14 @@ See the [configuration page](configuration.html) for information on Spark config
457462
</ul>
458463
</td>
459464
</tr>
465+
<tr>
466+
<td><code>spark.mesos.driver.constraints</code></td>
467+
<td>(none)</td>
468+
<td>
469+
Same as <code>spark.mesos.constraints</code> except applied to drivers when launched through the dispatcher. By default,
470+
all offers with sufficient resources will be accepted.
471+
</td>
472+
</tr>
460473
<tr>
461474
<td><code>spark.mesos.containerizer</code></td>
462475
<td><code>docker</code></td>

resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,4 +122,11 @@ package object config {
122122
"Example: key1:val1,key2:val2")
123123
.stringConf
124124
.createOptional
125+
126+
private[spark] val DRIVER_CONSTRAINTS =
127+
ConfigBuilder("spark.mesos.driver.constraints")
128+
.doc("Attribute based constraints on mesos resource offers. Applied by the dispatcher " +
129+
"when launching drivers. Default is to accept all offers with sufficient resources.")
130+
.stringConf
131+
.createWithDefault("")
125132
}

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -556,9 +556,10 @@ private[spark] class MesosClusterScheduler(
556556

557557
private class ResourceOffer(
558558
val offer: Offer,
559-
var remainingResources: JList[Resource]) {
559+
var remainingResources: JList[Resource],
560+
var attributes: JList[Attribute]) {
560561
override def toString(): String = {
561-
s"Offer id: ${offer.getId}, resources: ${remainingResources}"
562+
s"Offer id: ${offer.getId}, resources: ${remainingResources}, attributes: ${attributes}"
562563
}
563564
}
564565

@@ -601,10 +602,14 @@ private[spark] class MesosClusterScheduler(
601602
for (submission <- candidates) {
602603
val driverCpu = submission.cores
603604
val driverMem = submission.mem
604-
logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem")
605+
val driverConstraints =
606+
parseConstraintString(submission.conf.get(config.DRIVER_CONSTRAINTS))
607+
logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem, " +
608+
s"driverConstraints: $driverConstraints")
605609
val offerOption = currentOffers.find { offer =>
606610
getResource(offer.remainingResources, "cpus") >= driverCpu &&
607-
getResource(offer.remainingResources, "mem") >= driverMem
611+
getResource(offer.remainingResources, "mem") >= driverMem &&
612+
matchesAttributeRequirements(driverConstraints, toAttributeMap(offer.attributes))
608613
}
609614
if (offerOption.isEmpty) {
610615
logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId}, " +
@@ -652,7 +657,7 @@ private[spark] class MesosClusterScheduler(
652657
val currentTime = new Date()
653658

654659
val currentOffers = offers.asScala.map {
655-
offer => new ResourceOffer(offer, offer.getResourcesList)
660+
offer => new ResourceOffer(offer, offer.getResourcesList, offer.getAttributesList)
656661
}.toList
657662

658663
stateLock.synchronized {

resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,53 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
254254
assert(networkInfos.get(0).getLabels.getLabels(1).getValue == "val2")
255255
}
256256

257+
test("accept/decline offers with driver constraints") {
258+
setScheduler()
259+
260+
val mem = 1000
261+
val cpu = 1
262+
val s2Attributes = List(Utils.createTextAttribute("c1", "a"))
263+
val s3Attributes = List(
264+
Utils.createTextAttribute("c1", "a"),
265+
Utils.createTextAttribute("c2", "b"))
266+
val offers = List(
267+
Utils.createOffer("o1", "s1", mem, cpu, None, 0),
268+
Utils.createOffer("o2", "s2", mem, cpu, None, 0, s2Attributes),
269+
Utils.createOffer("o3", "s3", mem, cpu, None, 0, s3Attributes))
270+
271+
def submitDriver(driverConstraints: String): Unit = {
272+
val response = scheduler.submitDriver(
273+
new MesosDriverDescription("d1", "jar", mem, cpu, true,
274+
command,
275+
Map("spark.mesos.executor.home" -> "test",
276+
"spark.app.name" -> "test",
277+
config.DRIVER_CONSTRAINTS.key -> driverConstraints),
278+
"s1",
279+
new Date()))
280+
assert(response.success)
281+
}
282+
283+
submitDriver("c1:x")
284+
scheduler.resourceOffers(driver, offers.asJava)
285+
offers.foreach(o => Utils.verifyTaskNotLaunched(driver, o.getId.getValue))
286+
287+
submitDriver("c1:y;c2:z")
288+
scheduler.resourceOffers(driver, offers.asJava)
289+
offers.foreach(o => Utils.verifyTaskNotLaunched(driver, o.getId.getValue))
290+
291+
submitDriver("")
292+
scheduler.resourceOffers(driver, offers.asJava)
293+
Utils.verifyTaskLaunched(driver, "o1")
294+
295+
submitDriver("c1:a")
296+
scheduler.resourceOffers(driver, offers.asJava)
297+
Utils.verifyTaskLaunched(driver, "o2")
298+
299+
submitDriver("c1:a;c2:b")
300+
scheduler.resourceOffers(driver, offers.asJava)
301+
Utils.verifyTaskLaunched(driver, "o3")
302+
}
303+
257304
test("supports spark.mesos.driver.labels") {
258305
setScheduler()
259306

resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,13 @@ object Utils {
4343
.build()
4444

4545
def createOffer(
46-
offerId: String,
47-
slaveId: String,
48-
mem: Int,
49-
cpus: Int,
50-
ports: Option[(Long, Long)] = None,
51-
gpus: Int = 0): Offer = {
46+
offerId: String,
47+
slaveId: String,
48+
mem: Int,
49+
cpus: Int,
50+
ports: Option[(Long, Long)] = None,
51+
gpus: Int = 0,
52+
attributes: List[Attribute] = List.empty): Offer = {
5253
val builder = Offer.newBuilder()
5354
builder.addResourcesBuilder()
5455
.setName("mem")
@@ -63,7 +64,7 @@ object Utils {
6364
.setName("ports")
6465
.setType(Value.Type.RANGES)
6566
.setRanges(Ranges.newBuilder().addRange(MesosRange.newBuilder()
66-
.setBegin(resourcePorts._1).setEnd(resourcePorts._2).build()))
67+
.setBegin(resourcePorts._1).setEnd(resourcePorts._2).build()))
6768
}
6869
if (gpus > 0) {
6970
builder.addResourcesBuilder()
@@ -73,9 +74,10 @@ object Utils {
7374
}
7475
builder.setId(createOfferId(offerId))
7576
.setFrameworkId(FrameworkID.newBuilder()
76-
.setValue("f1"))
77+
.setValue("f1"))
7778
.setSlaveId(SlaveID.newBuilder().setValue(slaveId))
7879
.setHostname(s"host${slaveId}")
80+
.addAllAttributes(attributes.asJava)
7981
.build()
8082
}
8183

@@ -125,7 +127,7 @@ object Utils {
125127
.getVariablesList
126128
.asScala
127129
assert(envVars
128-
.count(!_.getName.startsWith("SPARK_")) == 2) // user-defined secret env vars
130+
.count(!_.getName.startsWith("SPARK_")) == 2) // user-defined secret env vars
129131
val variableOne = envVars.filter(_.getName == "SECRET_ENV_KEY").head
130132
assert(variableOne.getSecret.isInitialized)
131133
assert(variableOne.getSecret.getType == Secret.Type.REFERENCE)
@@ -154,7 +156,7 @@ object Utils {
154156
.getVariablesList
155157
.asScala
156158
assert(envVars
157-
.count(!_.getName.startsWith("SPARK_")) == 2) // user-defined secret env vars
159+
.count(!_.getName.startsWith("SPARK_")) == 2) // user-defined secret env vars
158160
val variableOne = envVars.filter(_.getName == "USER").head
159161
assert(variableOne.getSecret.isInitialized)
160162
assert(variableOne.getSecret.getType == Secret.Type.VALUE)
@@ -212,4 +214,13 @@ object Utils {
212214
assert(secretVolTwo.getSource.getSecret.getValue.getData ==
213215
ByteString.copyFrom("password".getBytes))
214216
}
217+
218+
def createTextAttribute(name: String, value: String): Attribute = {
219+
Attribute.newBuilder()
220+
.setName(name)
221+
.setType(Value.Type.TEXT)
222+
.setText(Value.Text.newBuilder().setValue(value))
223+
.build()
224+
}
215225
}
226+

0 commit comments

Comments
 (0)