Skip to content

Commit e2ddc1b

Browse files
committed
SPARK-19755 declining offers from blacklisted slave by BlacklistTracker
1 parent f09faf7 commit e2ddc1b

File tree

2 files changed

+25
-1
lines changed

2 files changed

+25
-1
lines changed

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
568568
cpus + totalCoresAcquired <= maxCores &&
569569
mem <= offerMem &&
570570
numExecutors < executorLimit &&
571+
!scheduler.nodeBlacklist().contains(offerHostname) &&
571572
meetsPortRequirements &&
572573
satisfiesLocality(offerHostname)
573574
}
@@ -792,4 +793,4 @@ object IdHelper {
792793
// Use atomic values since Spark contexts can be initialized in parallel
793794
private[mesos] val nextSCNumber = new AtomicLong(0)
794795
private[mesos] val startedBefore = new AtomicBoolean(false)
795-
}
796+
}

resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,28 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
108108
verifyTaskLaunched(driver, "o2")
109109
}
110110

111+
test("mesos declines offers from blacklisted slave") {
112+
setBackend()
113+
114+
// launch a task on a valid offer from slave s1
115+
val minMem = backend.executorMemory(sc) + 1024
116+
val minCpu = 4
117+
val offer1 = Resources(minMem, minCpu)
118+
offerResources(List(offer1))
119+
verifyTaskLaunched(driver, "o1")
120+
121+
// the executor (i.e. the Mesos task) on s1 then fails, for whatever reason
122+
val status = createTaskStatus("0", "s1", TaskState.TASK_FAILED)
123+
backend.statusUpdate(driver, status)
124+
when(taskScheduler.nodeBlacklist()).thenReturn(Set("hosts1"))
125+
126+
val offer2 = Resources(minMem, minCpu)
127+
// Offer resources from the same slave
128+
offerResources(List(offer2))
129+
// but since it's blacklisted the offer is declined
130+
verifyDeclinedOffer(driver, createOfferId("o1"))
131+
}
132+
111133
test("mesos supports spark.executor.cores") {
112134
val executorCores = 4
113135
setBackend(Map("spark.executor.cores" -> executorCores.toString))
@@ -790,6 +812,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
790812

791813
taskScheduler = mock[TaskSchedulerImpl]
792814
when(taskScheduler.sc).thenReturn(sc)
815+
when(taskScheduler.nodeBlacklist()).thenReturn(Set[String]())
793816

794817
externalShuffleClient = mock[MesosExternalShuffleClient]
795818

0 commit comments

Comments
 (0)