From fa5d9f15d38f66230c2fdba7e0b82e3936d4cde9 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Sun, 16 Mar 2014 09:09:21 -0700 Subject: [PATCH 1/5] Bugfixes/improvements to scheduler : PR #517 --- .../spark/scheduler/TaskSchedulerImpl.scala | 3 +- .../spark/scheduler/TaskSetManager.scala | 81 +++++++++++++---- .../spark/scheduler/TaskSetManagerSuite.scala | 88 +++++++++++++++++++ 3 files changed, 153 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 23b06612fd7ab..abff252597e16 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -235,7 +235,8 @@ private[spark] class TaskSchedulerImpl( taskIdToExecutorId(tid) = execId activeExecutorIds += execId executorsByHost(host) += execId - availableCpus(i) -= 1 + availableCpus(i) -= taskSet.CPUS_PER_TASK + assert (availableCpus(i) >= 0) launchedTask = true } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 5ea4557bbf56a..b7de24c889cbb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -59,6 +59,15 @@ private[spark] class TaskSetManager( // CPUs to request per task val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1) + /* + * Sometimes if an executor is dead or in an otherwise invalid state, the driver + * does not realize right away leading to repeated task failures. If enabled, + * this temporarily prevents a task from re-launching on an executor where + * it just failed. 
+ */ + private[this] val EXECUTOR_TASK_BLACKLIST_TIMEOUT = + conf.getLong("spark.task.executorBlacklistTimeout", 0L) + // Quantile of tasks at which to start speculation val SPECULATION_QUANTILE = conf.getDouble("spark.speculation.quantile", 0.75) val SPECULATION_MULTIPLIER = conf.getDouble("spark.speculation.multiplier", 1.5) @@ -71,7 +80,9 @@ private[spark] class TaskSetManager( val numTasks = tasks.length val copiesRunning = new Array[Int](numTasks) val successful = new Array[Boolean](numTasks) - val numFailures = new Array[Int](numTasks) + private val numFailures = new Array[Int](numTasks) + // key is taskId, value is a Map of executor id to when it failed + private[this] val failedExecutors = new HashMap[Int, HashMap[String, Long]]() val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil) var tasksSuccessful = 0 @@ -228,12 +239,18 @@ private[spark] class TaskSetManager( * This method also cleans up any tasks in the list that have already * been launched, since we want that to happen lazily. */ - private def findTaskFromList(list: ArrayBuffer[Int]): Option[Int] = { - while (!list.isEmpty) { - val index = list.last - list.trimEnd(1) - if (copiesRunning(index) == 0 && !successful(index)) { - return Some(index) + private def findTaskFromList(execId: String, list: ArrayBuffer[Int]): Option[Int] = { + var indexOffset = list.size + + while (indexOffset > 0) { + indexOffset -= 1 + val index = list(indexOffset) + if (! executorIsBlacklisted(execId, index)) { + // This should almost always be list.trimEnd(1) to remove tail + list.remove(indexOffset) + if (copiesRunning(index) == 0 && !successful(index)) { + return Some(index) + } } } None @@ -244,6 +261,21 @@ private[spark] class TaskSetManager( taskAttempts(taskIndex).exists(_.host == host) } + /** + * Is this re-execution of a failed task on an executor it already failed in before + * EXECUTOR_TASK_BLACKLIST_TIMEOUT has elapsed ? 
+ */ + private[this] def executorIsBlacklisted(execId: String, taskId: Int): Boolean = { + if (failedExecutors.contains(taskId)) { + val failed = failedExecutors.get(taskId).get + + return failed.contains(execId) && + clock.getTime() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT + } + + false + } + /** * Return a speculative task for a given executor if any are available. The task should not have * an attempt running on this host, in case the host is slow. In addition, the task should meet @@ -254,10 +286,13 @@ private[spark] class TaskSetManager( { speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set + def canRunOnHost(index: Int): Boolean = + !hasAttemptOnHost(index, host) && !executorIsBlacklisted(execId, index) + if (!speculatableTasks.isEmpty) { // Check for process-local or preference-less tasks; note that tasks can be process-local // on multiple nodes when we replicate cached blocks, as in Spark Streaming - for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) { + for (index <- speculatableTasks if canRunOnHost(index)) { val prefs = tasks(index).preferredLocations val executors = prefs.flatMap(_.executorId) if (prefs.size == 0 || executors.contains(execId)) { @@ -268,7 +303,7 @@ private[spark] class TaskSetManager( // Check for node-local tasks if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) { - for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) { + for (index <- speculatableTasks if canRunOnHost(index)) { val locations = tasks(index).preferredLocations.map(_.host) if (locations.contains(host)) { speculatableTasks -= index @@ -280,7 +315,7 @@ private[spark] class TaskSetManager( // Check for rack-local tasks if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) { for (rack <- sched.getRackForHost(host)) { - for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) { + for (index <- speculatableTasks if canRunOnHost(index)) { val racks = 
tasks(index).preferredLocations.map(_.host).map(sched.getRackForHost) if (racks.contains(rack)) { speculatableTasks -= index @@ -292,7 +327,7 @@ private[spark] class TaskSetManager( // Check for non-local tasks if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) { - for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) { + for (index <- speculatableTasks if canRunOnHost(index)) { speculatableTasks -= index return Some((index, TaskLocality.ANY)) } @@ -309,12 +344,12 @@ private[spark] class TaskSetManager( private def findTask(execId: String, host: String, locality: TaskLocality.Value) : Option[(Int, TaskLocality.Value)] = { - for (index <- findTaskFromList(getPendingTasksForExecutor(execId))) { + for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) { return Some((index, TaskLocality.PROCESS_LOCAL)) } if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) { - for (index <- findTaskFromList(getPendingTasksForHost(host))) { + for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) { return Some((index, TaskLocality.NODE_LOCAL)) } } @@ -322,19 +357,19 @@ private[spark] class TaskSetManager( if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) { for { rack <- sched.getRackForHost(host) - index <- findTaskFromList(getPendingTasksForRack(rack)) + index <- findTaskFromList(execId, getPendingTasksForRack(rack)) } { return Some((index, TaskLocality.RACK_LOCAL)) } } // Look for no-pref tasks after rack-local tasks since they can run anywhere. 
- for (index <- findTaskFromList(pendingTasksWithNoPrefs)) { + for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) { return Some((index, TaskLocality.PROCESS_LOCAL)) } if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) { - for (index <- findTaskFromList(allPendingTasks)) { + for (index <- findTaskFromList(execId, allPendingTasks)) { return Some((index, TaskLocality.ANY)) } } @@ -460,6 +495,7 @@ private[spark] class TaskSetManager( logInfo("Ignorning task-finished event for TID " + tid + " because task " + index + " has already completed successfully") } + failedExecutors.remove(index) maybeFinishTaskSet() } @@ -481,6 +517,10 @@ private[spark] class TaskSetManager( } var taskMetrics : TaskMetrics = null var failureReason = "unknown" + val addToFailedExecutor = () => { + failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()). + put(info.executorId, clock.getTime()) + } reason match { case fetchFailed: FetchFailed => logWarning("Loss was due to fetch failure from " + fetchFailed.bmAddress) @@ -488,6 +528,7 @@ private[spark] class TaskSetManager( successful(index) = true tasksSuccessful += 1 } + // Not adding to failed executors for FetchFailed. isZombie = true case TaskKilled => @@ -504,7 +545,8 @@ private[spark] class TaskSetManager( return } val key = ef.description - failureReason = "Exception failure: %s".format(ef.description) + failureReason = "Exception failure in TID %s on host %s: %s".format( + tid, info.host, ef.description) val now = clock.getTime() val (printFull, dupCount) = { if (recentExceptions.contains(key)) { @@ -533,8 +575,11 @@ private[spark] class TaskSetManager( failureReason = "Lost result for TID %s on host %s".format(tid, info.host) logWarning(failureReason) - case _ => {} + case _ => + failureReason = "TID %s on host %s failed for unknown reason".format(tid, info.host) } + // Add to failed for everything else. 
+ addToFailedExecutor() sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics) addPendingTask(index) if (!isZombie && state != TaskState.KILLED) { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 33cc7588b919c..7b1ae658bd794 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -298,6 +298,94 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { } } + test("executors should be blacklisted after task failure, in spite of locality preferences") { + val rescheduleDelay = 300L + val conf = new SparkConf(). + set("spark.task.executorBlacklistTimeout", rescheduleDelay.toString). + // dont wait to jump locality levels in this test + set("spark.locality.wait", "0") + + sc = new SparkContext("local", "test", conf) + // two executors on same host, one on different. + val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), + ("exec1.1", "host1"), ("exec2", "host2")) + // affinity to exec1 on host1 - which we will fail. 
+ val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "exec1"))) + // we need actual time measurement + val clock = new FakeClock + val manager = new TaskSetManager(sched, taskSet, 4, clock) + + { + val offerResult = manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL) + assert(offerResult.isDefined, "Expect resource offer to return a task") + + assert(offerResult.get.index === 0) + assert(offerResult.get.executorId === "exec1") + + // Cause exec1 to fail : failure 1 + manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost) + assert(!sched.taskSetsFailed.contains(taskSet.id)) + + // Ensure scheduling on exec1 fails after failure 1 due to blacklist + assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL).isEmpty) + assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.NODE_LOCAL).isEmpty) + assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.RACK_LOCAL).isEmpty) + assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.ANY).isEmpty) + } + + // Run the task on exec1.1 - should work, and then fail it on exec1.1 + { + val offerResult = manager.resourceOffer("exec1.1", "host1", 1, TaskLocality.NODE_LOCAL) + assert(offerResult.isDefined, + "Expect resource offer to return a task for exec1.1, offerResult = " + offerResult) + + assert(offerResult.get.index === 0) + assert(offerResult.get.executorId === "exec1.1") + + // Cause exec1.1 to fail : failure 2 + manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost) + assert(!sched.taskSetsFailed.contains(taskSet.id)) + + // Ensure scheduling on exec1.1 fails after failure 2 due to blacklist + assert(manager.resourceOffer("exec1.1", "host1", 1, TaskLocality.NODE_LOCAL).isEmpty) + } + + // Run the task on exec2 - should work, and then fail it on exec2 + { + val offerResult = manager.resourceOffer("exec2", "host2", 1, TaskLocality.ANY) + assert(offerResult.isDefined, "Expect resource offer to 
return a task") + + assert(offerResult.get.index === 0) + assert(offerResult.get.executorId === "exec2") + + // Cause exec2 to fail : failure 3 + manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost) + assert(!sched.taskSetsFailed.contains(taskSet.id)) + + // Ensure scheduling on exec2 fails after failure 3 due to blacklist + assert(manager.resourceOffer("exec2", "host2", 1, TaskLocality.ANY).isEmpty) + } + + // After reschedule delay, scheduling on exec1 should be possible. + clock.advance(rescheduleDelay) + + { + val offerResult = manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL) + assert(offerResult.isDefined, "Expect resource offer to return a task") + + assert(offerResult.get.index === 0) + assert(offerResult.get.executorId === "exec1") + + assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL).isEmpty) + + // Cause exec1 to fail : failure 4 + manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost) + } + + // we have failed the same task 4 times now : task id should now be in taskSetsFailed + assert(sched.taskSetsFailed.contains(taskSet.id)) + } + def createTaskResult(id: Int): DirectTaskResult[Int] = { val valueSer = SparkEnv.get.serializer.newInstance() new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics) From 270d841609f4ee89603efee392b6242364176273 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Sun, 16 Mar 2014 13:04:31 -0700 Subject: [PATCH 2/5] Address review comments --- .../org/apache/spark/scheduler/TaskSetManager.scala | 11 +++++++---- .../apache/spark/scheduler/TaskSetManagerSuite.scala | 1 - 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index b7de24c889cbb..506f8bea872d5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -245,7 +245,7 @@ private[spark] class TaskSetManager( while (indexOffset > 0) { indexOffset -= 1 val index = list(indexOffset) - if (! executorIsBlacklisted(execId, index)) { + if (!executorIsBlacklisted(execId, index)) { // This should almost always be list.trimEnd(1) to remove tail list.remove(indexOffset) if (copiesRunning(index) == 0 && !successful(index)) { @@ -516,7 +516,7 @@ private[spark] class TaskSetManager( logWarning("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index)) } var taskMetrics : TaskMetrics = null - var failureReason = "unknown" + var failureReason: String = null val addToFailedExecutor = () => { failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()). put(info.executorId, clock.getTime()) @@ -532,6 +532,7 @@ private[spark] class TaskSetManager( isZombie = true case TaskKilled => + // Not adding to failed executors for TaskKilled. logWarning("Task %d was killed.".format(tid)) case ef: ExceptionFailure => @@ -547,6 +548,7 @@ private[spark] class TaskSetManager( val key = ef.description failureReason = "Exception failure in TID %s on host %s: %s".format( tid, info.host, ef.description) + addToFailedExecutor() val now = clock.getTime() val (printFull, dupCount) = { if (recentExceptions.contains(key)) { @@ -573,16 +575,17 @@ private[spark] class TaskSetManager( case TaskResultLost => failureReason = "Lost result for TID %s on host %s".format(tid, info.host) + addToFailedExecutor() logWarning(failureReason) case _ => failureReason = "TID %s on host %s failed for unknown reason".format(tid, info.host) + addToFailedExecutor() } - // Add to failed for everything else. 
- addToFailedExecutor() sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics) addPendingTask(index) if (!isZombie && state != TaskState.KILLED) { + assert (null != failureReason) numFailures(index) += 1 if (numFailures(index) >= maxTaskFailures) { logError("Task %s:%d failed %d times; aborting job".format( diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 7b1ae658bd794..6bd37fa122092 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -311,7 +311,6 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { ("exec1.1", "host1"), ("exec2", "host2")) // affinity to exec1 on host1 - which we will fail. val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "exec1"))) - // we need actual time measurement val clock = new FakeClock val manager = new TaskSetManager(sched, taskSet, 4, clock) From 9bda70ee3a4283426261a8c481bbc3bebc564c37 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Sun, 16 Mar 2014 16:48:35 -0700 Subject: [PATCH 3/5] Address review comments, always add to failedExecutors --- .../org/apache/spark/scheduler/TaskSetManager.scala | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 506f8bea872d5..c354b16a6595e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -517,10 +517,6 @@ private[spark] class TaskSetManager( } var taskMetrics : TaskMetrics = null var failureReason: String = null - val addToFailedExecutor = () => { - failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()). 
- put(info.executorId, clock.getTime()) - } reason match { case fetchFailed: FetchFailed => logWarning("Loss was due to fetch failure from " + fetchFailed.bmAddress) @@ -548,7 +544,6 @@ private[spark] class TaskSetManager( val key = ef.description failureReason = "Exception failure in TID %s on host %s: %s".format( tid, info.host, ef.description) - addToFailedExecutor() val now = clock.getTime() val (printFull, dupCount) = { if (recentExceptions.contains(key)) { @@ -575,13 +570,14 @@ private[spark] class TaskSetManager( case TaskResultLost => failureReason = "Lost result for TID %s on host %s".format(tid, info.host) - addToFailedExecutor() logWarning(failureReason) case _ => failureReason = "TID %s on host %s failed for unknown reason".format(tid, info.host) - addToFailedExecutor() } + // always add to failed executors + failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()). + put(info.executorId, clock.getTime()) sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics) addPendingTask(index) if (!isZombie && state != TaskState.KILLED) { From 167fad82d2ba2f7920929a697b45166479dde47f Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Mon, 17 Mar 2014 21:32:33 -0700 Subject: [PATCH 4/5] Address review comments --- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index c354b16a6595e..a73343c1c0826 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -65,8 +65,8 @@ private[spark] class TaskSetManager( * this temporarily prevents a task from re-launching on an executor where * it just failed. 
*/ - private[this] val EXECUTOR_TASK_BLACKLIST_TIMEOUT = - conf.getLong("spark.task.executorBlacklistTimeout", 0L) + private val EXECUTOR_TASK_BLACKLIST_TIMEOUT = + conf.getLong("spark.scheduler.executorTaskBlacklistTime", 0L) // Quantile of tasks at which to start speculation val SPECULATION_QUANTILE = conf.getDouble("spark.speculation.quantile", 0.75) @@ -82,7 +82,7 @@ private[spark] class TaskSetManager( val successful = new Array[Boolean](numTasks) private val numFailures = new Array[Int](numTasks) // key is taskId, value is a Map of executor id to when it failed - private[this] val failedExecutors = new HashMap[Int, HashMap[String, Long]]() + private val failedExecutors = new HashMap[Int, HashMap[String, Long]]() val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil) var tasksSuccessful = 0 @@ -265,7 +265,7 @@ private[spark] class TaskSetManager( * Is this re-execution of a failed task on an executor it already failed in before * EXECUTOR_TASK_BLACKLIST_TIMEOUT has elapsed ? */ - private[this] def executorIsBlacklisted(execId: String, taskId: Int): Boolean = { + private def executorIsBlacklisted(execId: String, taskId: Int): Boolean = { if (failedExecutors.contains(taskId)) { val failed = failedExecutors.get(taskId).get From 5ff59c27d48e0c53871d682cfeac7cac6d6f0554 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Mon, 17 Mar 2014 23:16:42 -0700 Subject: [PATCH 5/5] Change property in suite also --- .../scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 6bd37fa122092..73153d23c4698 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -301,7 +301,7 @@ class TaskSetManagerSuite extends FunSuite with 
LocalSparkContext with Logging { test("executors should be blacklisted after task failure, in spite of locality preferences") { val rescheduleDelay = 300L val conf = new SparkConf(). - set("spark.task.executorBlacklistTimeout", rescheduleDelay.toString). + set("spark.scheduler.executorTaskBlacklistTime", rescheduleDelay.toString). // dont wait to jump locality levels in this test set("spark.locality.wait", "0")