Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ package object config {
ConfigBuilder("spark.blacklist.application.fetchFailure.enabled")
.booleanConf
.createWithDefault(false)

private[spark] val BLACKLIST_ALWAYSBLACKLISTEDNODES_CONF =
ConfigBuilder("spark.blacklist.alwaysBlacklistedNodes")
.stringConf
.createOptional
// End blacklist confs

private[spark] val UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,21 @@ private[scheduler] class BlacklistTracker (
*/
val nodeToBlacklistedExecs = new HashMap[String, HashSet[String]]()

/**
* Blacklists permanently the nodes listed in spark.blacklist.alwaysBlacklistedNodes
* The blacklist timeout is set to a large value, effectively never expiring.
*/
private val permanentlyBlacklistedNodes = BlacklistTracker.getBlacklistedNodes(conf)
if (permanentlyBlacklistedNodes.nonEmpty) {
val now = clock.getTimeMillis()
for (nodeName <- permanentlyBlacklistedNodes) {
nodeIdToBlacklistExpiryTime.put(nodeName, Long.MaxValue)
listenerBus.post(SparkListenerNodeBlacklisted(now, nodeName, 0))
logWarning(s"Permanently blacklisted node $nodeName")
}
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
}

/**
* Un-blacklists executors and nodes that have been blacklisted for at least
* BLACKLIST_TIMEOUT_MILLIS
Expand Down Expand Up @@ -409,6 +424,15 @@ private[scheduler] object BlacklistTracker extends Logging {
}
}

// Return a set of node names from the config spark.blacklist.alwaysBlacklistedNodes
def getBlacklistedNodes(conf: SparkConf): Set[String] = {
val listNodes = conf.get(config.BLACKLIST_ALWAYSBLACKLISTEDNODES_CONF)
listNodes match {
case Some(nodes) => (nodes + ",").split(',').map(_.trim).toSet
case None => Set()
}
}

/**
* Verify that blacklist configurations are consistent; if not, throw an exception. Should only
* be called if blacklisting is enabled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,4 +585,16 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
2000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
assert(blacklist.nextExpiryTime === 1000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
}

test("Nodes can be permanently blacklisted, SPARK-21829") {
val blacklistedNodes = "hostA, hostB"
conf.set("spark.blacklist.alwaysBlacklistedNodes", blacklistedNodes)

val allocationClientMock = mock[ExecutorAllocationClient]
blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
for (nodeName <- blacklistedNodes.split(',').map(_.trim)) {
assert(blacklist.nodeIdToBlacklistExpiryTime.contains(nodeName))
(blacklist.nodeIdToBlacklistExpiryTime.get(nodeName) === Long.MaxValue)
}
}
}
11 changes: 11 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1528,6 +1528,17 @@ Apart from these, the following properties are also available, and may be useful
blacklisted.
</td>
</tr>
<tr>
<td><code>spark.blacklist.alwaysBlacklistedNodes</code></td>
<td>(none)</td>
<td>
A comma-separated list of cluster nodes that will be put in the scheduler blacklist at the start of the Spark Context.
These nodes are permanently blacklisted and are exempt from the spark.blacklist.timeout mechanism.
If the cluster manager allocates executors on nodes in the blacklist, they will be rejected by the scheduler.
This feature can be used to prevent running executors/tasks on a user-specified list of cluster nodes.
Dependency: requires spark.blacklist.enabled=true
</td>
</tr>
<tr>
<td><code>spark.speculation</code></td>
<td>false</td>
Expand Down