Skip to content

Commit 8e7eed8

Browse files
author
Paul Mackles
committed
[SPARK-19606] Support constraints in spark-dispatcher; updates to docs and tests
1 parent 92508ce commit 8e7eed8

File tree

2 files changed

+39
-17
lines changed

2 files changed

+39
-17
lines changed

docs/running-on-mesos.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,8 @@ See the [configuration page](configuration.html) for information on Spark config
464464
<td><code>spark.mesos.driver.constraints</code></td>
465465
<td>(none)</td>
466466
<td>
467-
Same as <code>spark.mesos.constraints</code> except applied to drivers.
467+
Same as <code>spark.mesos.constraints</code> except applied to drivers when launched through the dispatcher. By default,
468+
all resource offers will be accepted.
468469
</td>
469470
</tr>
470471
<tr>

resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -254,30 +254,51 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
254254
assert(networkInfos.get(0).getLabels.getLabels(1).getValue == "val2")
255255
}
256256

257-
test("declines offers that violate driver constraints") {
257+
test("accept/decline offers with driver constraints") {
258258
setScheduler()
259259

260260
val mem = 1000
261261
val cpu = 1
262-
val s2Attributes = List(Utils.createTextAttribute("c", "u"))
262+
val s2Attributes = List(Utils.createTextAttribute("c1", "a"))
263+
val s3Attributes = List(
264+
Utils.createTextAttribute("c1", "a"),
265+
Utils.createTextAttribute("c2", "b"))
266+
val offers = List(
267+
Utils.createOffer("o1", "s1", mem, cpu, None, 0),
268+
Utils.createOffer("o2", "s2", mem, cpu, None, 0, s2Attributes),
269+
Utils.createOffer("o3", "s3", mem, cpu, None, 0, s3Attributes))
270+
271+
def submitDriver(driverConstraints: String): Unit = {
272+
val response = scheduler.submitDriver(
273+
new MesosDriverDescription("d1", "jar", mem, cpu, true,
274+
command,
275+
Map("spark.mesos.executor.home" -> "test",
276+
"spark.app.name" -> "test",
277+
"spark.mesos.driver.constraints" -> driverConstraints),
278+
"s1",
279+
new Date()))
280+
assert(response.success)
281+
}
263282

264-
val response = scheduler.submitDriver(
265-
new MesosDriverDescription("d1", "jar", mem, cpu, true,
266-
command,
267-
Map("spark.mesos.executor.home" -> "test",
268-
"spark.app.name" -> "test",
269-
"spark.mesos.driver.constraints" -> "c:v"),
270-
"s1",
271-
new Date()))
283+
submitDriver("c1:x")
284+
scheduler.resourceOffers(driver, offers.asJava)
285+
offers.foreach(o => Utils.verifyTaskNotLaunched(driver, o.getId.getValue))
272286

273-
assert(response.success)
287+
submitDriver("c1:y;c2:z")
288+
scheduler.resourceOffers(driver, offers.asJava)
289+
offers.foreach(o => Utils.verifyTaskNotLaunched(driver, o.getId.getValue))
290+
291+
submitDriver("")
292+
scheduler.resourceOffers(driver, offers.asJava)
293+
Utils.verifyTaskLaunched(driver, "o1")
274294

275-
val offer1 = Utils.createOffer("o1", "s1", mem, cpu, None, 0)
276-
val offer2 = Utils.createOffer("o2", "s2", mem, cpu, None, 0, s2Attributes)
277-
scheduler.resourceOffers(driver, List(offer1, offer2).asJava)
295+
submitDriver("c1:a")
296+
scheduler.resourceOffers(driver, offers.asJava)
297+
Utils.verifyTaskLaunched(driver, "o2")
278298

279-
Utils.verifyTaskNotLaunched(driver, "o1")
280-
Utils.verifyTaskNotLaunched(driver, "o2")
299+
submitDriver("c1:a;c2:b")
300+
scheduler.resourceOffers(driver, offers.asJava)
301+
Utils.verifyTaskLaunched(driver, "o3")
281302
}
282303

283304
test("supports spark.mesos.driver.labels") {

0 commit comments

Comments
 (0)