@@ -597,6 +597,14 @@ package object config {
.checkValue(v => v > 0, "The value should be a positive time value.")
.createWithDefaultString("365d")

private[spark] val UNSCHEDULABLE_TASKSET_TIMEOUT =
ConfigBuilder("spark.scheduler.blacklist.unschedulableTaskSetTimeout")
.doc("The timeout in seconds to wait to acquire a new executor and schedule a task " +
"before aborting a TaskSet which is unschedulable because of being completely blacklisted.")
.timeConf(TimeUnit.SECONDS)
.checkValue(v => v >= 0, "The value should be a non negative time value.")
.createWithDefault(120)

private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL =
ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.interval")
.doc("Time in seconds to wait between a max concurrent tasks check failure and the next " +
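As a usage aside: a minimal sketch of how an application might raise the new timeout defined above. The config key is taken from the definition in this diff; enabling blacklisting via spark.blacklist.enabled and the 600s value are illustrative assumptions, not part of the PR.

import org.apache.spark.SparkConf

// Sketch only: the timeout key comes from the config added above; the 600s value is
// an illustrative choice (the PR's default is 120 seconds).
val conf = new SparkConf()
  .set("spark.blacklist.enabled", "true")
  .set("spark.scheduler.blacklist.unschedulableTaskSetTimeout", "600s")

Since the entry is declared with timeConf(TimeUnit.SECONDS), any value format accepted by Spark's time parsing (for example 120s or 2min) should work here.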
@@ -146,21 +146,31 @@ private[scheduler] class BlacklistTracker (
nextExpiryTime = math.min(execMinExpiry, nodeMinExpiry)
}

private def killExecutor(exec: String, msg: String): Unit = {
allocationClient match {
case Some(a) =>
logInfo(msg)
a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false,
force = true)
case None =>
logInfo(s"Not attempting to kill blacklisted executor id $exec " +
s"since allocation client is not defined.")
}
}

private def killBlacklistedExecutor(exec: String): Unit = {
if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
allocationClient match {
case Some(a) =>
logInfo(s"Killing blacklisted executor id $exec " +
s"since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false,
force = true)
case None =>
logWarning(s"Not attempting to kill blacklisted executor id $exec " +
s"since allocation client is not defined.")
}
killExecutor(exec,
s"Killing blacklisted executor id $exec since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
}
}

private[scheduler] def killBlacklistedIdleExecutor(exec: String): Unit = {
killExecutor(exec,
s"Killing blacklisted idle executor id $exec because of task unschedulability and trying " +
"to acquire a new executor.")
}

private def killExecutorsOnBlacklistedNode(node: String): Unit = {
if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
allocationClient match {
@@ -35,7 +35,7 @@ import org.apache.spark.rpc.RpcEndpoint
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.TaskLocality.TaskLocality
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}
import org.apache.spark.util.{AccumulatorV2, SystemClock, ThreadUtils, Utils}

/**
* Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
@@ -117,6 +117,11 @@ private[spark] class TaskSchedulerImpl(

protected val executorIdToHost = new HashMap[String, String]

private val abortTimer = new Timer(true)
private val clock = new SystemClock
// Exposed for testing
val unschedulableTaskSetToExpiryTime = new HashMap[TaskSetManager, Long]

// Listener object to pass upcalls into
var dagScheduler: DAGScheduler = null

@@ -415,9 +420,53 @@ private[spark] class TaskSchedulerImpl(
launchedAnyTask |= launchedTaskAtCurrentMaxLocality
} while (launchedTaskAtCurrentMaxLocality)
}

if (!launchedAnyTask) {
taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =>
// If the taskSet is unschedulable we try to find an existing idle blacklisted
// executor. If we cannot find one, we abort immediately. Else we kill the idle
// executor and kick off an abortTimer which, if it doesn't schedule a task within
// the timeout, will abort the taskSet if we were unable to schedule any task from
// the taskSet.
// Note 1: We keep track of schedulability on a per taskSet basis rather than on a
// per task basis.
// Note 2: The taskSet can still be aborted when there are more than one idle
// blacklisted executors and dynamic allocation is on. This can happen when a killed
// idle executor isn't replaced in time by ExecutorAllocationManager as it relies on
// pending tasks and doesn't kill executors on idle timeouts, resulting in the abort
// timer expiring and aborting the taskSet.

Contributor: I'm a little worried that the idle condition will be too strict in some scenarios: if there is a large backlog of tasks from another taskset, or whatever the error is, the tasks take a while to fail (e.g., you've really got a bad executor, but it's not apparent until after network timeouts or something). That could happen if you're doing a big join and, while preparing the input on the map side, one side just has one straggler left but the other side still has a big backlog of tasks. Or, in a jobserver-style situation, there are always other tasksets coming in. That said, I don't have any better ideas at the moment, and I still think this is an improvement.

Contributor Author: By clearing the abort timer as soon as a task is launched we are relaxing this situation. If there is a large backlog of tasks:

  • If we acquire new executors or launch new tasks, we will defer the check.
  • If we cannot acquire new executors, and we are running long-running tasks such that no new tasks can be launched, and we have fewer executors than max failures, then this will end up being harsh. This can happen, but it seems like a very specific edge case.

Contributor: I don't think this is true -- if there is no idle executor here, you abort the taskset immediately; you're not starting any timer (from the case lower down: case _ => // Abort Immediately). To do what you described, you would instead need to do something different in that case, like start the same abortTimer and also set a flag needToKillIdleExecutor, and then on every call to resourceOffer, check that flag and potentially find an executor to kill. (However, I haven't totally thought that through; not sure if it would really work. Again, I'm not saying this has to be addressed now, just thinking this through.)
executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
Contributor: isExecutorBusy is going to probe the same hashmap again; you could just do executorIdToRunningTaskIds.find(_._2.isEmpty).

Contributor Author: I was preferring the code to be more readable. As this isn't a frequently running scenario, maybe we could keep it. Thoughts?

Contributor: Sure, I thought the name executorIdToRunningTaskIds makes the other version clear enough, but I don't really feel strongly.
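For clarity, a tiny standalone sketch of the two equivalent lookups discussed above, assuming (as the reviewer notes) that isExecutorBusy reads the same executorIdToRunningTaskIds map; the names and values below are made up for illustration.

import scala.collection.mutable.{HashMap, HashSet}

// Mirrors the scheduler's executorIdToRunningTaskIds map and its isExecutorBusy helper.
val running = HashMap("exec-1" -> HashSet(3L), "exec-2" -> HashSet.empty[Long])
def busy(id: String): Boolean = running.get(id).exists(_.nonEmpty)

running.find(x => !busy(x._1))                        // via the helper, as the PR does
running.find { case (_, taskIds) => taskIds.isEmpty } // direct form the reviewer suggests

Both pick an arbitrary executor with no running tasks; the PR keeps the helper-based form for readability, which the thread above settles on.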

case Some ((executorId, _)) =>
if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))

val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
logInfo(s"Waiting for $timeout ms for completely "
+ s"blacklisted task to be schedulable again before aborting $taskSet.")
abortTimer.schedule(
createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
}
case None => // Abort Immediately
logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
s" executors can be found to kill. Aborting $taskSet." )
taskSet.abortSinceCompletelyBlacklisted(taskIndex)
}
}
} else {
// We want to defer killing any taskSets as long as we have a non blacklisted executor
// which can be used to schedule a task from any active taskSets. This ensures that the
// job can make progress.
// Note: It is theoretically possible that a taskSet never gets scheduled on a
// non-blacklisted executor and the abort timer doesn't kick in because of a constant
// submission of new TaskSets. See the PR for more details.
if (unschedulableTaskSetToExpiryTime.nonEmpty) {
logInfo("Clearing the expiry times for all unschedulable taskSets as a task was " +
"recently scheduled.")
unschedulableTaskSetToExpiryTime.clear()
}
}

if (launchedAnyTask && taskSet.isBarrier) {
// Check whether the barrier tasks are partially launched.
// TODO SPARK-24818 handle the assert failure case (that can happen when some locality
@@ -453,6 +502,23 @@ private[spark] class TaskSchedulerImpl(
return tasks
}

private def createUnschedulableTaskSetAbortTimer(
taskSet: TaskSetManager,
taskIndex: Int): TimerTask = {
new TimerTask() {
override def run() {
if (unschedulableTaskSetToExpiryTime.contains(taskSet) &&
unschedulableTaskSetToExpiryTime(taskSet) <= clock.getTimeMillis()) {
logInfo("Cannot schedule any task because of complete blacklisting. " +
s"Wait time for scheduling expired. Aborting $taskSet.")
taskSet.abortSinceCompletelyBlacklisted(taskIndex)
} else {
this.cancel()
}
}
}
}

/**
* Shuffle offers around to avoid always placing tasks on the same workers. Exposed to allow
* overriding in tests, so it can be deterministic.
@@ -590,6 +656,7 @@ private[spark] class TaskSchedulerImpl(
barrierCoordinator.stop()
}
starvationTimer.cancel()
abortTimer.cancel()
}

override def defaultParallelism(): Int = backend.defaultParallelism()
41 changes: 23 additions & 18 deletions core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -623,8 +623,8 @@ private[spark] class TaskSetManager(
*
* It is possible that this taskset has become impossible to schedule *anywhere* due to the
* blacklist. The most common scenario would be if there are fewer executors than
* spark.task.maxFailures. We need to detect this so we can fail the task set, otherwise the job
* will hang.
spark.task.maxFailures. We need to detect this so we can prevent the job from hanging.
We try to acquire a new executor by killing an existing idle blacklisted executor.
*
* There's a tradeoff here: we could make sure all tasks in the task set are schedulable, but that
* would add extra time to each iteration of the scheduling loop. Here, we take the approach of
@@ -635,9 +635,9 @@
* failures (this is because the method picks one unscheduled task, and then iterates through each
* executor until it finds one that the task isn't blacklisted on).
*/
private[scheduler] def abortIfCompletelyBlacklisted(
hostToExecutors: HashMap[String, HashSet[String]]): Unit = {
taskSetBlacklistHelperOpt.foreach { taskSetBlacklist =>
private[scheduler] def getCompletelyBlacklistedTaskIfAny(
hostToExecutors: HashMap[String, HashSet[String]]): Option[Int] = {
taskSetBlacklistHelperOpt.flatMap { taskSetBlacklist =>
val appBlacklist = blacklistTracker.get
// Only look for unschedulable tasks when at least one executor has registered. Otherwise,
// task sets will be (unnecessarily) aborted in cases when no executors have registered yet.
Expand All @@ -658,11 +658,11 @@ private[spark] class TaskSetManager(
}
}

pendingTask.foreach { indexInTaskSet =>
pendingTask.find { indexInTaskSet =>
// try to find some executor this task can run on. Its possible that some *other*
// task isn't schedulable anywhere, but we will discover that in some later call,
// when that unschedulable task is the last task remaining.
val blacklistedEverywhere = hostToExecutors.forall { case (host, execsOnHost) =>
hostToExecutors.forall { case (host, execsOnHost) =>
// Check if the task can run on the node
val nodeBlacklisted =
appBlacklist.isNodeBlacklisted(host) ||
Expand All @@ -679,22 +679,27 @@ private[spark] class TaskSetManager(
}
}
}
if (blacklistedEverywhere) {
val partition = tasks(indexInTaskSet).partitionId
abort(s"""
|Aborting $taskSet because task $indexInTaskSet (partition $partition)
|cannot run anywhere due to node and executor blacklist.
|Most recent failure:
|${taskSetBlacklist.getLatestFailureReason}
|
|Blacklisting behavior can be configured via spark.blacklist.*.
|""".stripMargin)
}
}
} else {
None
}
}
}

private[scheduler] def abortSinceCompletelyBlacklisted(indexInTaskSet: Int): Unit = {
taskSetBlacklistHelperOpt.foreach { taskSetBlacklist =>
val partition = tasks(indexInTaskSet).partitionId
abort(s"""
|Aborting $taskSet because task $indexInTaskSet (partition $partition)
|cannot run anywhere due to node and executor blacklist.
|Most recent failure:
|${taskSetBlacklist.getLatestFailureReason}
|
|Blacklisting behavior can be configured via spark.blacklist.*.
|""".stripMargin)
}
}

/**
* Marks the task as getting result and notifies the DAG Scheduler
*/
@@ -96,15 +96,16 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
assertDataStructuresEmpty(noFailure = true)
}

// Make sure that if we've failed on all executors, but haven't hit task.maxFailures yet, the job
// doesn't hang
// Make sure that if we've failed on all executors, but haven't hit task.maxFailures yet, we try
// to acquire a new executor; if we aren't able to get one, the job doesn't hang and we abort.
testScheduler(
"SPARK-15865 Progress with fewer executors than maxTaskFailures",
extraConfs = Seq(
config.BLACKLIST_ENABLED.key -> "true",
"spark.testing.nHosts" -> "2",
"spark.testing.nExecutorsPerHost" -> "1",
"spark.testing.nCoresPerExecutor" -> "1"
"spark.testing.nCoresPerExecutor" -> "1",
"spark.scheduler.blacklist.unschedulableTaskSetTimeout" -> "0s"
)
) {
def runBackend(): Unit = {