[SPARK-10618] [Mesos] Refactoring scheduling condition and adding test for same #10326
Changes from all commits
CoarseMesosSchedulerBackend (coarse-grained mode): the inline scheduling condition is replaced by a call to a new predicate.

```diff
@@ -254,12 +254,7 @@ private[spark] class CoarseMesosSchedulerBackend(
       val cpus = getResource(offer.getResourcesList, "cpus").toInt
       val id = offer.getId.getValue
       if (meetsConstraints) {
-        if (taskIdToSlaveId.size < executorLimit &&
-            totalCoresAcquired < maxCores &&
-            mem >= calculateTotalMemory(sc) &&
-            cpus >= 1 &&
-            failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
-            !slaveIdsWithExecutors.contains(slaveId)) {
+        if (isOfferSatisfiesRequirements(slaveId, mem, cpus, sc)) {
           // Launch an executor on the slave
           val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
           totalCoresAcquired += cpusToUse
```
```diff
@@ -308,6 +303,35 @@ private[spark] class CoarseMesosSchedulerBackend(
     }
   }
 
+  // ToDo: Abstract out each condition and log them.
+  def isOfferSatisfiesRequirements(slaveId: String, mem: Double, cpusOffered: Int,
+    sc: SparkContext): Boolean = {
+    val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
+    val meetsCPURequirements = cpusOffered >= 1
+    val needMoreCores = totalCoresAcquired < maxCores
+    val healthySlave = failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES
+    val taskOnEachSlaveLessThanExecutorLimit = taskIdToSlaveId.size < executorLimit
+    val executorNotRunningOnSlave = !slaveIdsWithExecutors.contains(slaveId)
+
+    executorNotRunningOnSlave &&
+      taskOnEachSlaveLessThanExecutorLimit &&
+      needMoreCores &&
+      meetsMemoryRequirements &&
+      meetsCPURequirements &&
+      healthySlave
+  }
+
+  def isOfferValidForScheduling(meetsConstraints: Boolean,
+    slaveId: String, mem: Double,
+    cpus: Int, sc: SparkContext): Boolean = {
+    taskIdToSlaveId.size < executorLimit &&
+      totalCoresAcquired < maxCores &&
+      meetsConstraints &&
+      mem >= calculateTotalMemory(sc) &&
+      cpus >= 1 &&
+      failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
+      !slaveIdsWithExecutors.contains(slaveId)
+  }
+
   override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
     val taskId = status.getTaskId.getValue.toInt
```

Contributor (on the method signature): style: …

Contributor: Also, the method signature is shared across both fine-grained and coarse-grained modes, so I would put this in the helper trait.
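The helper-trait extraction the reviewer suggests could look roughly like the sketch below. This is not code from the PR: it assumes `MesosSchedulerUtils`, the trait both backends already mix in (it hosts `calculateTotalMemory`), and the two predicate names are hypothetical. Only the resource checks are shared; state-dependent checks (failure counts, executor limits) would stay in each backend.

```scala
package org.apache.spark.scheduler.cluster.mesos

import org.apache.spark.SparkContext

// Sketch only: the offer checks common to both scheduling modes, hoisted
// next to the existing shared helpers. Names are illustrative.
private[mesos] trait OfferRequirementChecks { self: MesosSchedulerUtils =>

  /** Offer carries enough memory for an executor (heap plus overhead). */
  def offerMeetsMemoryRequirement(offeredMem: Double, sc: SparkContext): Boolean =
    offeredMem >= calculateTotalMemory(sc)

  /** Offer carries at least the CPUs this scheduling mode needs. */
  def offerMeetsCpuRequirement(offeredCpus: Double, requiredCpus: Double): Boolean =
    offeredCpus >= requiredCpus
}

// Each backend then composes these with its own state checks, e.g. in the
// coarse-grained backend:
//   offerMeetsMemoryRequirement(mem, sc) &&
//     offerMeetsCpuRequirement(cpus, 1) &&
//     totalCoresAcquired < maxCores && ...
```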
MesosSchedulerBackend (fine-grained mode): the offer checks move out of resourceOffers into a method of the same name.
```diff
@@ -246,14 +246,13 @@ private[spark] class MesosSchedulerBackend(
       val slaveId = o.getSlaveId.getValue
       val offerAttributes = toAttributeMap(o.getAttributesList)
 
-      // check offers for
-      // 1. Memory requirements
-      // 2. CPU requirements - need at least 1 for executor, 1 for task
-      val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
-      val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
       // check if Attribute constraints is satisfied
       val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
 
       val meetsRequirements =
-        (meetsMemoryRequirements && meetsCPURequirements) ||
-        (slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
+        isOfferSatisfiesRequirements(cpus, mem, slaveId, sc)
 
       // add some debug messaging
       val debugstr = if (meetsRequirements) "Accepting" else "Declining"
       logDebug(s"$debugstr offer: ${o.getId.getValue} with attributes: "
         + s"$offerAttributes mem: $mem cpu: $cpus")
```
```diff
@@ -328,6 +327,18 @@ private[spark] class MesosSchedulerBackend(
     }
   }
 
+  // check if all constraints are satisfied
+  // 1. Memory requirements
+  // 2. CPU requirements - need at least 1 for executor, 1 for task
+  def isOfferSatisfiesRequirements(cpusOffered: Double, memory : Double,
+    slaveId: String, sc : SparkContext): Boolean = {
+    val meetsMemoryRequirements = memory >= calculateTotalMemory(sc)
+    val meetsCPURequirements = cpusOffered >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
+
+    (meetsMemoryRequirements && meetsCPURequirements) ||
+      (slaveIdToExecutorInfo.contains(slaveId) && cpusOffered >= scheduler.CPUS_PER_TASK)
+  }
+
   /** Turn a Spark TaskDescription into a Mesos task and also resources unused by the task */
   def createMesosTask(
       task: TaskDescription,
```

Contributor: Turn this into a ScalaDoc. Would it be possible to not duplicate this method, but put the common logic between coarse and fine-grained somewhere else? Same comment regarding visibility as for the other one.

Contributor (on `memory : Double`): Please use consistent style (and similar to the other places in the code). Don't put a space between identifier and colon.
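One way those comments could be addressed — ScalaDoc instead of line comments, no space before the colons, and `private[spark]` visibility. A sketch layered on the PR's method, not the code that was eventually merged:

```scala
/**
 * Whether a resource offer can run a fine-grained task on the given slave.
 *
 * An offer qualifies when it can fund a fresh executor plus one task
 * (memory for the executor, CPUs for executor and task), or when an
 * executor already runs on the slave and the offer covers one task's CPUs.
 */
private[spark] def offerSatisfiesRequirements(
    cpusOffered: Double,
    memory: Double,
    slaveId: String,
    sc: SparkContext): Boolean = {
  val meetsMemoryRequirements = memory >= calculateTotalMemory(sc)
  val meetsCPURequirements = cpusOffered >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)

  (meetsMemoryRequirements && meetsCPURequirements) ||
    (slaveIdToExecutorInfo.contains(slaveId) && cpusOffered >= scheduler.CPUS_PER_TASK)
}
```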
CoarseMesosSchedulerBackendSuite: the test helpers gain a SparkContext parameter, and new tests cover each branch of the predicate.
```diff
@@ -58,7 +58,7 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
 
   private def createSchedulerBackend(
       taskScheduler: TaskSchedulerImpl,
-      driver: SchedulerDriver): CoarseMesosSchedulerBackend = {
+      driver: SchedulerDriver, sc: SparkContext): CoarseMesosSchedulerBackend = {
     val securityManager = mock[SecurityManager]
     val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master", securityManager) {
       override protected def createSchedulerDriver(
```

Contributor (on the new parameter): style: this needs to go on the next line.
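The wrapped-signature fix that comment asks for would presumably read as below (a sketch; body elided):

```scala
// Each parameter on its own continuation line, matching the existing style.
private def createSchedulerBackend(
    taskScheduler: TaskSchedulerImpl,
    driver: SchedulerDriver,
    sc: SparkContext): CoarseMesosSchedulerBackend = {
  // ... body unchanged ...
}
```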
```diff
@@ -77,16 +77,25 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
     backend
   }
 
+  private def createSchedulerBackendForGivenSparkConf(sc : SparkContext) = {
+    val driver = mock[SchedulerDriver]
+    when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
+    val taskScheduler = mock[TaskSchedulerImpl]
+    when(taskScheduler.sc).thenReturn(sc)
+    createSchedulerBackend(taskScheduler, driver, sc)
+  }
+
   var sparkConf: SparkConf = _
 
   before {
     sparkConf = (new SparkConf)
       .setMaster("local[*]")
       .setAppName("test-mesos-dynamic-alloc")
       .setSparkHome("/path")
+      .set("spark.cores.max", "10")
 
-    sc = new SparkContext(sparkConf)
-  }
+      sc = new SparkContext(sparkConf)
+    }
 
   test("mesos supports killing and limiting executors") {
     val driver = mock[SchedulerDriver]
```

Contributor (on `createSchedulerBackendForGivenSparkConf`): style: no space before colon, and need return type.

Contributor (on `spark.cores.max`): why do this? Add a comment to explain?

Contributor (on the closing braces): please revert this indentation change.
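A sketch of the fixes those three comments ask for — no space before the colon, an explicit return type, and a comment justifying the core cap. The justification wording is an assumption inferred from the "max core is already acquired" test below:

```scala
// Builds a backend against a mocked driver and scheduler for the given context.
private def createSchedulerBackendForGivenSparkConf(sc: SparkContext): CoarseMesosSchedulerBackend = {
  val driver = mock[SchedulerDriver]
  when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
  val taskScheduler = mock[TaskSchedulerImpl]
  when(taskScheduler.sc).thenReturn(sc)
  createSchedulerBackend(taskScheduler, driver, sc)
}

// And in before {}: cap the cores so the "max core is already acquired"
// test has a known limit to exhaust.
//   .set("spark.cores.max", "10")
```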
```diff
@@ -97,7 +106,7 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
     sparkConf.set("spark.driver.host", "driverHost")
     sparkConf.set("spark.driver.port", "1234")
 
-    val backend = createSchedulerBackend(taskScheduler, driver)
+    val backend = createSchedulerBackend(taskScheduler, driver, sc)
     val minMem = backend.calculateTotalMemory(sc)
     val minCpu = 4
```
```diff
@@ -145,15 +154,15 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
     val taskScheduler = mock[TaskSchedulerImpl]
     when(taskScheduler.sc).thenReturn(sc)
 
-    val backend = createSchedulerBackend(taskScheduler, driver)
+    val backend = createSchedulerBackend(taskScheduler, driver, sc)
     val minMem = backend.calculateTotalMemory(sc) + 1024
     val minCpu = 4
 
     val mesosOffers = new java.util.ArrayList[Offer]
     val offer1 = createOffer("o1", "s1", minMem, minCpu)
     mesosOffers.add(offer1)
 
-    val offer2 = createOffer("o2", "s1", minMem, 1);
+    val offer2 = createOffer("o2", "s1", minMem, 1)
 
     backend.resourceOffers(driver, mesosOffers)
```
```diff
@@ -184,4 +193,47 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
 
     verify(driver, times(1)).reviveOffers()
   }
+
+  test("isOfferSatisfiesRequirements return true when there is a valid offer") {
+    val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
+
+    assert(schedulerBackend.isOfferSatisfiesRequirements("Slave1", 10000, 5, sc))
+  }
+
+  test("isOfferSatisfiesRequirements return false when memory in offer is less" +
+    " than required memory") {
+    val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
+
+    assert(schedulerBackend.isOfferSatisfiesRequirements("Slave1", 1, 5, sc) === false)
+  }
+
+  test("isOfferSatisfiesRequirements return false when cpu in offer is less than required cpu") {
+    val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
+
+    assert(schedulerBackend.isOfferSatisfiesRequirements("Slave1", 10000, 0, sc) === false)
+  }
+
+  test("isOfferSatisfiesRequirements return false when offer is from slave already running" +
+    " an executor") {
+    val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
+    schedulerBackend.slaveIdsWithExecutors += "Slave2"
+
+    assert(schedulerBackend.isOfferSatisfiesRequirements("Slave2", 10000, 5, sc) === false)
+  }
+
+  test("isOfferSatisfiesRequirements return false when task is failed more than " +
+    "MAX_SLAVE_FAILURES times on the given slave") {
+    val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
+    schedulerBackend.failuresBySlaveId("Slave3") = 2
+
+    assert(schedulerBackend.isOfferSatisfiesRequirements("Slave3", 10000, 5, sc) === false)
+  }
+
+  test("isOfferSatisfiesRequirements return false when max core is already acquired") {
+    val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
+    schedulerBackend.totalCoresAcquired = 10
+
+    assert(schedulerBackend.isOfferSatisfiesRequirements("Slave1", 10000, 5, sc) === false)
+  }
 }
```

Contributor (on the new tests): these tests can probably all be grouped in one test called …

Contributor (on `=== false`): instead of …
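A sketch of the grouping the first comment suggests, reusing the suite's own fixtures (`createSchedulerBackendForGivenSparkConf`, the 10-core `spark.cores.max` cap, and the backend's mutable slave state). Writing `assert(!...)` also avoids the `=== false` comparisons the second, truncated comment appears to flag:

```scala
// Fold the negative cases into one test: each assert exercises a different
// reason for rejecting an offer. Mutations are ordered so earlier ones do
// not affect later asserts (they target different slave ids until the end).
test("isOfferSatisfiesRequirements rejects unsatisfiable offers") {
  val backend = createSchedulerBackendForGivenSparkConf(sc)

  assert(!backend.isOfferSatisfiesRequirements("Slave1", 1, 5, sc))      // too little memory
  assert(!backend.isOfferSatisfiesRequirements("Slave1", 10000, 0, sc))  // no CPUs offered

  backend.slaveIdsWithExecutors += "Slave2"                              // executor already on slave
  assert(!backend.isOfferSatisfiesRequirements("Slave2", 10000, 5, sc))

  backend.failuresBySlaveId("Slave3") = 2                                // too many slave failures
  assert(!backend.isOfferSatisfiesRequirements("Slave3", 10000, 5, sc))

  backend.totalCoresAcquired = 10                                        // spark.cores.max reached
  assert(!backend.isOfferSatisfiesRequirements("Slave1", 10000, 5, sc))
}
```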
MesosSchedulerBackendSuite:

```diff
@@ -28,7 +28,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.mesos.Protos.Value.Scalar
 import org.apache.mesos.Protos._
-import org.apache.mesos.SchedulerDriver
+import org.apache.mesos.{Protos, SchedulerDriver}
 import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.mockito.{ArgumentCaptor, Matchers}
```
```diff
@@ -344,4 +344,66 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
       r.getName.equals("cpus") && r.getScalar.getValue.equals(1.0) && r.getRole.equals("prod")
     })
   }
+
+  private def createSchedulerBackendForGivenSparkConf(sc : SparkContext) : MesosSchedulerBackend = {
+    val conf = new SparkConf
+
+    val listenerBus = mock[LiveListenerBus]
+    listenerBus.post(
+      SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
+
+    when(sc.getSparkHome()).thenReturn(Option("/spark-home"))
+    when(sc.conf).thenReturn(conf)
+    when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
+    when(sc.executorMemory).thenReturn(100)
+    when(sc.listenerBus).thenReturn(listenerBus)
+
+    val taskScheduler = mock[TaskSchedulerImpl]
+    when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
+
+    new MesosSchedulerBackend(taskScheduler, sc, "master")
+  }
+
+  test("isOfferSatisfiesRequirements return true when there offer meet cpu and" +
+    " memory requirement") {
+    val sc = mock[SparkContext]
+    val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
+
+    assert(schedulerBackend.isOfferSatisfiesRequirements( 5, 10000, "Slave1", sc))
+  }
+
+  test("isOfferSatisfiesRequirements return false when memory in offer is less " +
+    "than required memory") {
+    val sc = mock[SparkContext]
+    val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
+
+    assert(schedulerBackend.isOfferSatisfiesRequirements(5, 10, "Slave1", sc) === false)
+  }
+
+  test("isOfferSatisfiesRequirements return false when cpu in offer is less than required cpu") {
+    val sc = mock[SparkContext]
+    val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
+
+    assert(schedulerBackend.isOfferSatisfiesRequirements(0, 10000, "Slave1", sc) === false)
+  }
+
+  test("isOfferSatisfiesRequirements return true when offer is from slave already running and" +
+    " cpu is less than minimum cpu per task an executor") {
+    val sc = mock[SparkContext]
+    val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
+    schedulerBackend.slaveIdToExecutorInfo("Slave2") = null
+
+    assert(schedulerBackend.isOfferSatisfiesRequirements(2, 10000, "Slave2", sc) === true)
+  }
+
+  test("isOfferSatisfiesRequirements return false when offer is from slave already running but" +
+    " cpu is less than minimum cpu per task an executor") {
+    val sc = mock[SparkContext]
+    val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
+    schedulerBackend.slaveIdToExecutorInfo("Slave2") = null
+
+    assert(schedulerBackend.isOfferSatisfiesRequirements(1, 10000, "Slave2", sc) === false)
+  }
+
 }
```

Contributor (on `isOfferSatisfiesRequirements( 5, …)`): style, space after `(`.

Contributor: same here, can group these tests.
Contributor: The name sounds a bit funny to me. Maybe just `offerSatisfiesRequirements`? Also, this might be `private`, or `private[spark]` to allow testing. Ideally the `TODO` would be implemented as well.

Contributor: Also, a short scaladoc explaining the difference between `meetsConstraints` and `satisfiesRequirements`. To the casual reader they are the same.
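Taken together, those two comments point at something like the following for the coarse-grained predicate. Illustrative only — the rename, visibility, and ScalaDoc wording are suggestions layered on the PR's own condition, not merged code:

```scala
/**
 * Whether this offer can launch a new coarse-grained executor.
 *
 * Distinct from `meetsConstraints`, which matches the user-supplied slave
 * attribute filters (`spark.mesos.constraints`) against the offer's
 * attributes; the checks here are the scheduler's own resource and state
 * requirements.
 */
private[spark] def offerSatisfiesRequirements(
    slaveId: String,
    mem: Double,
    cpusOffered: Int,
    sc: SparkContext): Boolean = {
  !slaveIdsWithExecutors.contains(slaveId) &&                     // no executor on this slave yet
    taskIdToSlaveId.size < executorLimit &&                       // below the executor cap
    totalCoresAcquired < maxCores &&                              // still need more cores
    mem >= calculateTotalMemory(sc) &&                            // enough memory offered
    cpusOffered >= 1 &&                                           // at least one CPU offered
    failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES  // slave is healthy
}
```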