Skip to content

Commit caceaec

Browse files
attilapirossquito
authored andcommitted
[SPARK-26688][YARN] Provide configuration of initially blacklisted YARN nodes
## What changes were proposed in this pull request? Introducing new config for initially blacklisted YARN nodes. ## How was this patch tested? With existing and a new unit test. Closes #23616 from attilapiros/SPARK-26688. Lead-authored-by: “attilapiros” <[email protected]> Co-authored-by: Attila Zsolt Piros <[email protected]> Signed-off-by: Imran Rashid <[email protected]>
1 parent 5fd4d74 commit caceaec

File tree

4 files changed

+41
-9
lines changed

4 files changed

+41
-9
lines changed

docs/running-on-yarn.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,13 @@ To use a custom metrics.properties for the application master and executors, upd
462462
<code>spark.blacklist.application.maxFailedExecutorsPerNode</code>.
463463
</td>
464464
</tr>
465+
<tr>
466+
<td><code>spark.yarn.exclude.nodes</code></td>
467+
<td>(none)</td>
468+
<td>
469+
Comma-separated list of YARN node names which are excluded from resource allocation.
470+
</td>
471+
</tr>
465472
<tr>
466473
<td><code>spark.yarn.metrics.namespace</code></td>
467474
<td>(none)</td>

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ private[spark] class YarnAllocatorBlacklistTracker(
5656

5757
private val maxFailuresPerHost = sparkConf.get(MAX_FAILED_EXEC_PER_NODE)
5858

59+
private val excludeNodes = sparkConf.get(YARN_EXCLUDE_NODES).toSet
60+
5961
private val allocatorBlacklist = new HashMap[String, Long]()
6062

6163
private var currentBlacklistedYarnNodes = Set.empty[String]
@@ -105,7 +107,7 @@ private[spark] class YarnAllocatorBlacklistTracker(
105107

106108
private def refreshBlacklistedNodes(): Unit = {
107109
removeExpiredYarnBlacklistedNodes()
108-
val allBlacklistedNodes = schedulerBlacklist ++ allocatorBlacklist.keySet
110+
val allBlacklistedNodes = excludeNodes ++ schedulerBlacklist ++ allocatorBlacklist.keySet
109111
synchronizeBlacklistedNodeWithYarn(allBlacklistedNodes)
110112
}
111113

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,12 @@ package object config {
310310
.booleanConf
311311
.createWithDefault(false)
312312

313+
/* Initially blacklisted YARN nodes. */
314+
private[spark] val YARN_EXCLUDE_NODES = ConfigBuilder("spark.yarn.exclude.nodes")
315+
.stringConf
316+
.toSequence
317+
.createWithDefault(Nil)
318+
313319
private[yarn] val YARN_EXECUTOR_RESOURCE_TYPES_PREFIX = "spark.yarn.executor.resource."
314320
private[yarn] val YARN_DRIVER_RESOURCE_TYPES_PREFIX = "spark.yarn.driver.resource."
315321
private[yarn] val YARN_AM_RESOURCE_TYPES_PREFIX = "spark.yarn.am.resource."

resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.mockito.Mockito._
2525
import org.scalatest.{BeforeAndAfterEach, Matchers}
2626

2727
import org.apache.spark.{SparkConf, SparkFunSuite}
28-
import org.apache.spark.deploy.yarn.config.YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED
28+
import org.apache.spark.deploy.yarn.config.{YARN_EXCLUDE_NODES, YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED}
2929
import org.apache.spark.internal.config.{BLACKLIST_TIMEOUT_CONF, MAX_FAILED_EXEC_PER_NODE}
3030
import org.apache.spark.util.ManualClock
3131

@@ -35,27 +35,31 @@ class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers
3535
val BLACKLIST_TIMEOUT = 100L
3636
val MAX_FAILED_EXEC_PER_NODE_VALUE = 2
3737

38+
var sparkConf: SparkConf = _
3839
var amClientMock: AMRMClient[ContainerRequest] = _
39-
var yarnBlacklistTracker: YarnAllocatorBlacklistTracker = _
40-
var failureTracker: FailureTracker = _
4140
var clock: ManualClock = _
4241

4342
override def beforeEach(): Unit = {
44-
val sparkConf = new SparkConf()
43+
sparkConf = new SparkConf()
4544
sparkConf.set(BLACKLIST_TIMEOUT_CONF, BLACKLIST_TIMEOUT)
4645
sparkConf.set(YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED, true)
4746
sparkConf.set(MAX_FAILED_EXEC_PER_NODE, MAX_FAILED_EXEC_PER_NODE_VALUE)
4847
clock = new ManualClock()
49-
5048
amClientMock = mock(classOf[AMRMClient[ContainerRequest]])
51-
failureTracker = new FailureTracker(sparkConf, clock)
52-
yarnBlacklistTracker =
49+
super.beforeEach()
50+
}
51+
52+
private def createYarnAllocatorBlacklistTracker(
53+
sparkConf: SparkConf = sparkConf): YarnAllocatorBlacklistTracker = {
54+
val failureTracker = new FailureTracker(sparkConf, clock)
55+
val yarnBlacklistTracker =
5356
new YarnAllocatorBlacklistTracker(sparkConf, amClientMock, failureTracker)
5457
yarnBlacklistTracker.setNumClusterNodes(4)
55-
super.beforeEach()
58+
yarnBlacklistTracker
5659
}
5760

5861
test("expiring its own blacklisted nodes") {
62+
val yarnBlacklistTracker = createYarnAllocatorBlacklistTracker()
5963
(1 to MAX_FAILED_EXEC_PER_NODE_VALUE).foreach {
6064
_ => {
6165
yarnBlacklistTracker.handleResourceAllocationFailure(Some("host"))
@@ -77,6 +81,8 @@ class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers
7781
}
7882

7983
test("not handling the expiry of scheduler blacklisted nodes") {
84+
val yarnBlacklistTracker = createYarnAllocatorBlacklistTracker()
85+
8086
yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host1", "host2"))
8187
verify(amClientMock)
8288
.updateBlacklist(Arrays.asList("host1", "host2"), Collections.emptyList())
@@ -91,6 +97,15 @@ class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers
9197
}
9298

9399
test("combining scheduler and allocation blacklist") {
100+
sparkConf.set(YARN_EXCLUDE_NODES, Seq("initial1", "initial2"))
101+
val yarnBlacklistTracker = createYarnAllocatorBlacklistTracker(sparkConf)
102+
yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set())
103+
104+
// initial1 and initial2 is added as blacklisted nodes at the very first updateBlacklist call
105+
// and they are never removed
106+
verify(amClientMock)
107+
.updateBlacklist(Arrays.asList("initial1", "initial2"), Collections.emptyList())
108+
94109
(1 to MAX_FAILED_EXEC_PER_NODE_VALUE).foreach {
95110
_ => {
96111
yarnBlacklistTracker.handleResourceAllocationFailure(Some("host1"))
@@ -117,6 +132,7 @@ class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers
117132
}
118133

119134
test("blacklist all available nodes") {
135+
val yarnBlacklistTracker = createYarnAllocatorBlacklistTracker()
120136
yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host1", "host2", "host3"))
121137
verify(amClientMock)
122138
.updateBlacklist(Arrays.asList("host1", "host2", "host3"), Collections.emptyList())
@@ -137,4 +153,5 @@ class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers
137153
verify(amClientMock).updateBlacklist(Arrays.asList("host4"), Collections.emptyList())
138154
assert(yarnBlacklistTracker.isAllNodeBlacklisted)
139155
}
156+
140157
}

0 commit comments

Comments
 (0)