core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -235,7 +235,8 @@ private[spark] class TaskSchedulerImpl(
taskIdToExecutorId(tid) = execId
activeExecutorIds += execId
executorsByHost(host) += execId
availableCpus(i) -= 1
availableCpus(i) -= taskSet.CPUS_PER_TASK
assert (availableCpus(i) >= 0)
launchedTask = true
}
}
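For context on the TaskSchedulerImpl change above, a minimal sketch of why the decrement has to match spark.task.cpus; the object, method names and values below are illustrative, not the actual scheduler loop. With CPUS_PER_TASK = 2, subtracting 1 per launched task would hand out more cores than an offer actually has.

    // Minimal sketch, not the real TaskSchedulerImpl code: with spark.task.cpus = 2,
    // decrementing by 1 per launch would overcommit cores; decrementing by cpusPerTask cannot.
    object CpuAccountingSketch {
      def launchAll(availableCpus: Array[Int], cpusPerTask: Int): Unit = {
        for (i <- availableCpus.indices) {
          while (availableCpus(i) >= cpusPerTask) {  // only launch while enough cores remain
            // ... launch one task against offer i ...
            availableCpus(i) -= cpusPerTask          // was "-= 1" before this change
            assert(availableCpus(i) >= 0)
          }
        }
      }

      def main(args: Array[String]): Unit = {
        launchAll(Array(4, 4), cpusPerTask = 2)      // e.g. two offers with 4 free cores each
      }
    }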
82 changes: 63 additions & 19 deletions core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -59,6 +59,15 @@ private[spark] class TaskSetManager(
// CPUs to request per task
val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)

/*
* Sometimes if an executor is dead or in an otherwise invalid state, the driver
* does not realize right away, leading to repeated task failures. If enabled,
* this temporarily prevents a task from re-launching on an executor where
* it just failed.
*/
private val EXECUTOR_TASK_BLACKLIST_TIMEOUT =
conf.getLong("spark.scheduler.executorTaskBlacklistTime", 0L)

// Quantile of tasks at which to start speculation
val SPECULATION_QUANTILE = conf.getDouble("spark.speculation.quantile", 0.75)
val SPECULATION_MULTIPLIER = conf.getDouble("spark.speculation.multiplier", 1.5)
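As a usage sketch, the new setting is read from SparkConf like the other scheduler knobs; the app name and the 10-second value below are placeholders, not defaults or recommendations.

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative configuration only: blacklist an executor for 10 seconds after a
    // task fails on it. The default of 0 leaves the blacklist disabled.
    val conf = new SparkConf()
      .setAppName("blacklist-example")                            // placeholder app name
      .set("spark.scheduler.executorTaskBlacklistTime", "10000")  // value in milliseconds
    val sc = new SparkContext(conf)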
@@ -71,7 +80,9 @@ private[spark] class TaskSetManager(
val numTasks = tasks.length
val copiesRunning = new Array[Int](numTasks)
val successful = new Array[Boolean](numTasks)
val numFailures = new Array[Int](numTasks)
private val numFailures = new Array[Int](numTasks)
// key is task index, value is a Map of executor id to when it failed
private val failedExecutors = new HashMap[Int, HashMap[String, Long]]()
val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
var tasksSuccessful = 0

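To make the bookkeeping around this map concrete, a self-contained sketch follows; only failedExecutors and its shape come from the patch, while the object name, helper names, and the timeout value are made up for illustration.

    import scala.collection.mutable.HashMap

    // Simplified, stand-alone sketch of the blacklist bookkeeping; not TaskSetManager itself.
    object BlacklistSketch {
      val blacklistTimeout = 10000L                                    // stand-in for EXECUTOR_TASK_BLACKLIST_TIMEOUT
      val failedExecutors = new HashMap[Int, HashMap[String, Long]]()  // task index -> (executor id -> failure time)

      // Called when a task attempt fails on an executor.
      def recordFailure(taskIndex: Int, execId: String, now: Long): Unit = {
        failedExecutors.getOrElseUpdate(taskIndex, new HashMap[String, Long]()).put(execId, now)
      }

      // Consulted before offering the task to that executor again.
      def isBlacklisted(taskIndex: Int, execId: String, now: Long): Boolean =
        failedExecutors.get(taskIndex).flatMap(_.get(execId))
          .exists(failedAt => now - failedAt < blacklistTimeout)

      // Once the task finally succeeds, its failure history is dropped.
      def clearOnSuccess(taskIndex: Int): Unit = {
        failedExecutors.remove(taskIndex)
      }
    }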
@@ -228,12 +239,18 @@
* This method also cleans up any tasks in the list that have already
* been launched, since we want that to happen lazily.
*/
private def findTaskFromList(list: ArrayBuffer[Int]): Option[Int] = {
while (!list.isEmpty) {
val index = list.last
list.trimEnd(1)
if (copiesRunning(index) == 0 && !successful(index)) {
return Some(index)
private def findTaskFromList(execId: String, list: ArrayBuffer[Int]): Option[Int] = {
var indexOffset = list.size

while (indexOffset > 0) {
indexOffset -= 1
val index = list(indexOffset)
if (!executorIsBlacklisted(execId, index)) {
// This should almost always be list.trimEnd(1) to remove tail
list.remove(indexOffset)
if (copiesRunning(index) == 0 && !successful(index)) {
return Some(index)
}
}
}
None
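The tail-scan in the new findTaskFromList can be illustrated in isolation. The sketch below keeps only the skip-blacklisted and remove-what-we-hand-out parts; the copiesRunning and successful checks are omitted, and the object name and isBlacklisted predicate are stand-ins.

    import scala.collection.mutable.ArrayBuffer

    // Stand-alone sketch of the scan pattern used above: walk from the tail, leave
    // blacklisted entries in place, and remove the entry that is actually handed out.
    object TailScanSketch {
      def pickFromTail(list: ArrayBuffer[Int], isBlacklisted: Int => Boolean): Option[Int] = {
        var i = list.size
        while (i > 0) {
          i -= 1
          if (!isBlacklisted(list(i))) {
            val index = list.remove(i)   // usually the last element, so removal stays cheap
            return Some(index)
          }
        }
        None
      }
    }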
@@ -244,6 +261,21 @@
taskAttempts(taskIndex).exists(_.host == host)
}

/**
* Is this re-execution of a failed task on an executor it already failed on, before
* EXECUTOR_TASK_BLACKLIST_TIMEOUT has elapsed?
*/
private def executorIsBlacklisted(execId: String, taskId: Int): Boolean = {
if (failedExecutors.contains(taskId)) {
val failed = failedExecutors.get(taskId).get
Contributor: You can just write this as failedExecutors(taskId)
return failed.contains(execId) &&
clock.getTime() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT
}

false
}

/**
* Return a speculative task for a given executor if any are available. The task should not have
* an attempt running on this host, in case the host is slow. In addition, the task should meet
@@ -254,10 +286,13 @@
{
speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set

def canRunOnHost(index: Int): Boolean =
!hasAttemptOnHost(index, host) && !executorIsBlacklisted(execId, index)

if (!speculatableTasks.isEmpty) {
// Check for process-local or preference-less tasks; note that tasks can be process-local
// on multiple nodes when we replicate cached blocks, as in Spark Streaming
for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
for (index <- speculatableTasks if canRunOnHost(index)) {
val prefs = tasks(index).preferredLocations
val executors = prefs.flatMap(_.executorId)
if (prefs.size == 0 || executors.contains(execId)) {
@@ -268,7 +303,7 @@

// Check for node-local tasks
if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
for (index <- speculatableTasks if canRunOnHost(index)) {
val locations = tasks(index).preferredLocations.map(_.host)
if (locations.contains(host)) {
speculatableTasks -= index
@@ -280,7 +315,7 @@
// Check for rack-local tasks
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
for (rack <- sched.getRackForHost(host)) {
for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
for (index <- speculatableTasks if canRunOnHost(index)) {
val racks = tasks(index).preferredLocations.map(_.host).map(sched.getRackForHost)
if (racks.contains(rack)) {
speculatableTasks -= index
@@ -292,7 +327,7 @@

// Check for non-local tasks
if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
for (index <- speculatableTasks if canRunOnHost(index)) {
speculatableTasks -= index
return Some((index, TaskLocality.ANY))
}
@@ -309,32 +344,32 @@
private def findTask(execId: String, host: String, locality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value)] =
{
for (index <- findTaskFromList(getPendingTasksForExecutor(execId))) {
for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
return Some((index, TaskLocality.PROCESS_LOCAL))
}

if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
for (index <- findTaskFromList(getPendingTasksForHost(host))) {
for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
return Some((index, TaskLocality.NODE_LOCAL))
}
}

if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
for {
rack <- sched.getRackForHost(host)
index <- findTaskFromList(getPendingTasksForRack(rack))
index <- findTaskFromList(execId, getPendingTasksForRack(rack))
} {
return Some((index, TaskLocality.RACK_LOCAL))
}
}

// Look for no-pref tasks after rack-local tasks since they can run anywhere.
for (index <- findTaskFromList(pendingTasksWithNoPrefs)) {
for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL))
}

if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
for (index <- findTaskFromList(allPendingTasks)) {
for (index <- findTaskFromList(execId, allPendingTasks)) {
return Some((index, TaskLocality.ANY))
}
}
@@ -460,6 +495,7 @@
logInfo("Ignorning task-finished event for TID " + tid + " because task " +
index + " has already completed successfully")
}
failedExecutors.remove(index)
maybeFinishTaskSet()
}

@@ -480,17 +516,19 @@
logWarning("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index))
}
var taskMetrics : TaskMetrics = null
var failureReason = "unknown"
var failureReason: String = null
reason match {
case fetchFailed: FetchFailed =>
logWarning("Loss was due to fetch failure from " + fetchFailed.bmAddress)
if (!successful(index)) {
successful(index) = true
tasksSuccessful += 1
}
// Not adding to failed executors for FetchFailed.
isZombie = true

case TaskKilled =>
// Not adding to failed executors for TaskKilled.
logWarning("Task %d was killed.".format(tid))

case ef: ExceptionFailure =>
@@ -504,7 +542,8 @@
return
}
val key = ef.description
failureReason = "Exception failure: %s".format(ef.description)
failureReason = "Exception failure in TID %s on host %s: %s".format(
tid, info.host, ef.description)
val now = clock.getTime()
val (printFull, dupCount) = {
if (recentExceptions.contains(key)) {
@@ -533,11 +572,16 @@
failureReason = "Lost result for TID %s on host %s".format(tid, info.host)
logWarning(failureReason)

case _ => {}
case _ =>
failureReason = "TID %s on host %s failed for unknown reason".format(tid, info.host)
Contributor: Can you move this to line 519 instead (or if you prefer, remove the default setting on line 519, which now won't ever be used)

Contributor Author: will make default null.
}
// always add to failed executors
failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
put(info.executorId, clock.getTime())
sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics)
addPendingTask(index)
if (!isZombie && state != TaskState.KILLED) {
assert (null != failureReason)
numFailures(index) += 1
if (numFailures(index) >= maxTaskFailures) {
logError("Task %s:%d failed %d times; aborting job".format(
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -298,6 +298,93 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
}
}

test("executors should be blacklisted after task failure, in spite of locality preferences") {
val rescheduleDelay = 300L
val conf = new SparkConf().
set("spark.scheduler.executorTaskBlacklistTime", rescheduleDelay.toString).
// don't wait to jump locality levels in this test
set("spark.locality.wait", "0")

sc = new SparkContext("local", "test", conf)
// two executors on the same host, one on a different host.
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
("exec1.1", "host1"), ("exec2", "host2"))
// affinity to exec1 on host1 - which we will fail.
val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "exec1")))
val clock = new FakeClock
val manager = new TaskSetManager(sched, taskSet, 4, clock)

{
val offerResult = manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL)
assert(offerResult.isDefined, "Expect resource offer to return a task")

assert(offerResult.get.index === 0)
assert(offerResult.get.executorId === "exec1")

// Cause exec1 to fail : failure 1
manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
assert(!sched.taskSetsFailed.contains(taskSet.id))

// Ensure scheduling on exec1 fails after failure 1 due to blacklist
assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.NODE_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.RACK_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.ANY).isEmpty)
}

// Run the task on exec1.1 - should work, and then fail it on exec1.1
{
val offerResult = manager.resourceOffer("exec1.1", "host1", 1, TaskLocality.NODE_LOCAL)
assert(offerResult.isDefined,
"Expect resource offer to return a task for exec1.1, offerResult = " + offerResult)

assert(offerResult.get.index === 0)
assert(offerResult.get.executorId === "exec1.1")

// Cause exec1.1 to fail : failure 2
manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
assert(!sched.taskSetsFailed.contains(taskSet.id))

// Ensure scheduling on exec1.1 fails after failure 2 due to blacklist
assert(manager.resourceOffer("exec1.1", "host1", 1, TaskLocality.NODE_LOCAL).isEmpty)
}

// Run the task on exec2 - should work, and then fail it on exec2
{
val offerResult = manager.resourceOffer("exec2", "host2", 1, TaskLocality.ANY)
assert(offerResult.isDefined, "Expect resource offer to return a task")

assert(offerResult.get.index === 0)
assert(offerResult.get.executorId === "exec2")

// Cause exec2 to fail : failure 3
manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
assert(!sched.taskSetsFailed.contains(taskSet.id))

// Ensure scheduling on exec2 fails after failure 3 due to blacklist
assert(manager.resourceOffer("exec2", "host2", 1, TaskLocality.ANY).isEmpty)
}

// After reschedule delay, scheduling on exec1 should be possible.
clock.advance(rescheduleDelay)

{
val offerResult = manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL)
assert(offerResult.isDefined, "Expect resource offer to return a task")

assert(offerResult.get.index === 0)
assert(offerResult.get.executorId === "exec1")

assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL).isEmpty)

// Cause exec1 to fail : failure 4
manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
}

// we have failed the same task 4 times now: the task set should be in taskSetsFailed
assert(sched.taskSetsFailed.contains(taskSet.id))
}

def createTaskResult(id: Int): DirectTaskResult[Int] = {
val valueSer = SparkEnv.get.serializer.newInstance()
new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics)