From 5253b3134119b2a28cdaa1406d7bafb55f62cbc1 Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Thu, 30 Aug 2018 13:08:58 -0500 Subject: [PATCH 01/18] [SPARK-22148] Acquire new executors to avoid hang because of blacklisting --- .../spark/scheduler/BlacklistTracker.scala | 2 +- .../spark/scheduler/TaskSchedulerImpl.scala | 62 ++++++++++++++++++- .../spark/scheduler/TaskSetManager.scala | 35 +++++++---- 3 files changed, 82 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index 980fbbe516b9..33ee0f372146 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -146,7 +146,7 @@ private[scheduler] class BlacklistTracker ( nextExpiryTime = math.min(execMinExpiry, nodeMinExpiry) } - private def killBlacklistedExecutor(exec: String): Unit = { + private[scheduler] def killBlacklistedExecutor(exec: String): Unit = { if (conf.get(config.BLACKLIST_KILL_ENABLED)) { allocationClient match { case Some(a) => diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 8992d7e2284a..cd6e2ca6eb6a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -34,7 +34,7 @@ import org.apache.spark.rpc.RpcEndpoint import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.scheduler.TaskLocality.TaskLocality import org.apache.spark.storage.BlockManagerId -import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils} +import org.apache.spark.util.{AccumulatorV2, SystemClock, ThreadUtils, Utils} /** * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend. @@ -116,6 +116,17 @@ private[spark] class TaskSchedulerImpl( protected val executorIdToHost = new HashMap[String, String] + // Threshold above which we abort the TaskSet if a task could not be scheduled because of complete + // blacklisting. + val UNSCHEDULABLE_TASKSET_TIMEOUT_MS = + conf.getTimeAsMs("spark.scheduler.unschedulableTaskSetTimeout", "120s") + + private val abortTimer = new Timer(true) + + private val clock = new SystemClock + + protected val unschedulableTaskSetToExpiryTime = new HashMap[TaskSetManager, Long] + // Listener object to pass upcalls into var dagScheduler: DAGScheduler = null @@ -414,9 +425,54 @@ private[spark] class TaskSchedulerImpl( launchedAnyTask |= launchedTaskAtCurrentMaxLocality } while (launchedTaskAtCurrentMaxLocality) } + if (!launchedAnyTask) { - taskSet.abortIfCompletelyBlacklisted(hostToExecutors) - } + taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match { + case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable + if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) { + // If the taskSet is unschedulable we kill the existing blacklisted executor/s and + // kick off an abortTimer which after waiting will abort the taskSet if we were + // unable to get new executors and couldn't schedule a task from the taskSet. + // Note: We keep a track of schedulability on a per taskSet basis rather than on a + // per task basis. 
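+              // In short: on the first round where the taskSet is found unschedulable we kill
+              // the blacklisted executors, record the time, and arm a one-shot timer; any later
+              // round that launches a task removes the entry, turning the timer into a no-op.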
+ if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) { + hostToExecutors.valuesIterator.foreach(executors => executors.foreach({ + executor => + logDebug("Killing executor because of task unschedulability: " + executor) + blacklistTrackerOpt.foreach(blt => blt.killBlacklistedExecutor(executor)) + }) + ) + unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + abortTimer.schedule(new TimerTask() { + override def run() { + if (unschedulableTaskSetToExpiryTime.contains(taskSet) && + (unschedulableTaskSetToExpiryTime(taskSet) + + UNSCHEDULABLE_TASKSET_TIMEOUT_MS) + <= clock.getTimeMillis() + ) { + logInfo("Cannot schedule any task because of complete blacklisting. " + + "Wait time for scheduling expired. Aborting the application.") + taskSet.abortSinceCompletelyBlacklisted(taskIndex.get) + } else { + this.cancel() + } + } + }, UNSCHEDULABLE_TASKSET_TIMEOUT_MS) + } + } else { + // TODO: try acquiring new executors for static allocation before aborting. + taskSet.abortSinceCompletelyBlacklisted(taskIndex.get) + } + case _ => // Do nothing. + } + } else { + // If a task was scheduled, we clear the expiry time for the taskSet. The abort timer + // checks this entry to decide if we want to abort the taskSet. + if (unschedulableTaskSetToExpiryTime.contains(taskSet)) { + unschedulableTaskSetToExpiryTime.remove(taskSet) + } + } + if (launchedAnyTask && taskSet.isBarrier) { // Check whether the barrier tasks are partially launched. // TODO SPARK-24818 handle the assert failure case (that can happen when some locality diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 8b77641e85b7..ec07664e6a2f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -623,8 +623,9 @@ private[spark] class TaskSetManager( * * It is possible that this taskset has become impossible to schedule *anywhere* due to the * blacklist. The most common scenario would be if there are fewer executors than - * spark.task.maxFailures. We need to detect this so we can fail the task set, otherwise the job - * will hang. + * spark.task.maxFailures. We need to detect this so we can avoid the job from being hung. + * If dynamic allocation is enabled we try to acquire new executor/s by killing the existing one. + * In case of static allocation we abort the taskSet immediately to fail the job. * * There's a tradeoff here: we could make sure all tasks in the task set are schedulable, but that * would add extra time to each iteration of the scheduling loop. Here, we take the approach of @@ -635,8 +636,8 @@ private[spark] class TaskSetManager( * failures (this is because the method picks one unscheduled task, and then iterates through each * executor until it finds one that the task isn't blacklisted on). */ - private[scheduler] def abortIfCompletelyBlacklisted( - hostToExecutors: HashMap[String, HashSet[String]]): Unit = { + private[scheduler] def getCompletelyBlacklistedTaskIfAny( + hostToExecutors: HashMap[String, HashSet[String]]): Option[Int] = { taskSetBlacklistHelperOpt.foreach { taskSetBlacklist => val appBlacklist = blacklistTracker.get // Only look for unschedulable tasks when at least one executor has registered. 
Otherwise, @@ -679,20 +680,28 @@ private[spark] class TaskSetManager( } } } + if (blacklistedEverywhere) { - val partition = tasks(indexInTaskSet).partitionId - abort(s""" - |Aborting $taskSet because task $indexInTaskSet (partition $partition) - |cannot run anywhere due to node and executor blacklist. - |Most recent failure: - |${taskSetBlacklist.getLatestFailureReason} - | - |Blacklisting behavior can be configured via spark.blacklist.*. - |""".stripMargin) + return Some(indexInTaskSet) } } } } + None + } + + private[scheduler] def abortSinceCompletelyBlacklisted(indexInTaskSet: Int): Unit = { + taskSetBlacklistHelperOpt.foreach { taskSetBlacklist => + val partition = tasks(indexInTaskSet).partitionId + abort(s""" + |Aborting $taskSet because task $indexInTaskSet (partition $partition) + |cannot run anywhere due to node and executor blacklist. + |Most recent failure: + |${taskSetBlacklist.getLatestFailureReason} + | + |Blacklisting behavior can be configured via spark.blacklist.*. + |""".stripMargin) + } } /** From 87c4e57bb966078c8a78eabc5a5e4b6f60c78f28 Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Wed, 5 Sep 2018 13:37:18 -0500 Subject: [PATCH 02/18] Kill only a single executor at a time. --- .../spark/scheduler/TaskSchedulerImpl.scala | 28 ++++++++----------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index cd6e2ca6eb6a..fac332f1b5f3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -429,19 +429,17 @@ private[spark] class TaskSchedulerImpl( if (!launchedAnyTask) { taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match { case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable - if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) { - // If the taskSet is unschedulable we kill the existing blacklisted executor/s and - // kick off an abortTimer which after waiting will abort the taskSet if we were - // unable to get new executors and couldn't schedule a task from the taskSet. - // Note: We keep a track of schedulability on a per taskSet basis rather than on a - // per task basis. - if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) { - hostToExecutors.valuesIterator.foreach(executors => executors.foreach({ - executor => - logDebug("Killing executor because of task unschedulability: " + executor) - blacklistTrackerOpt.foreach(blt => blt.killBlacklistedExecutor(executor)) - }) - ) + + // If the taskSet is unschedulable we kill an existing blacklisted executor/s and + // kick off an abortTimer which after waiting will abort the taskSet if we were + // unable to schedule any task from the taskSet. + // Note: We keep a track of schedulability on a per taskSet basis rather than on a + // per task basis. 
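+          // Unlike the previous revision, only one executor (the first on an arbitrary host)
+          // is killed per unschedulable round, rather than every blacklisted executor at once.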
+ val executor = hostToExecutors.valuesIterator.next().iterator.next() + logDebug("Killing executor because of task unschedulability: " + executor) + blacklistTrackerOpt.foreach(blt => blt.killBlacklistedExecutor(executor)) + + if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) { unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() abortTimer.schedule(new TimerTask() { override def run() { @@ -459,10 +457,6 @@ private[spark] class TaskSchedulerImpl( } }, UNSCHEDULABLE_TASKSET_TIMEOUT_MS) } - } else { - // TODO: try acquiring new executors for static allocation before aborting. - taskSet.abortSinceCompletelyBlacklisted(taskIndex.get) - } case _ => // Do nothing. } } else { From ffbc9c32d14a0c82036defb90eb18167f93bad4d Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Fri, 21 Sep 2018 09:58:50 -0500 Subject: [PATCH 03/18] Kill idle executor only else abort immediately --- .../spark/internal/config/package.scala | 11 +++ .../spark/scheduler/BlacklistTracker.scala | 32 ++++--- .../spark/scheduler/TaskSchedulerImpl.scala | 70 ++++++++------ .../spark/scheduler/TaskSetManager.scala | 3 +- .../scheduler/BlacklistIntegrationSuite.scala | 7 +- .../scheduler/TaskSchedulerImplSuite.scala | 92 ++++++++++++++++++- 6 files changed, 167 insertions(+), 48 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index eb08628ce111..6ba8bb04a069 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -577,4 +577,15 @@ package object config { .timeConf(TimeUnit.SECONDS) .checkValue(v => v > 0, "The value should be a positive time value.") .createWithDefaultString("365d") + + // Threshold above which we abort the TaskSet if a task could not be scheduled because of complete + // blacklisting. 
+ private[spark] val UNSCHEDULABLE_TASKSET_TIMEOUT = + ConfigBuilder("spark.scheduler.unschedulableTaskSetTimeout") + .doc("The timeout in seconds to wait before aborting a TaskSet to acquire a new executor " + + "and schedule a task which was previously unschedulable because of being completely " + + "blacklisted.") + .timeConf(TimeUnit.SECONDS) + .checkValue(v => v >= 0, "The value should be a non negative time value.") + .createWithDefault(120) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index 33ee0f372146..45c6db2a92da 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -146,21 +146,31 @@ private[scheduler] class BlacklistTracker ( nextExpiryTime = math.min(execMinExpiry, nodeMinExpiry) } - private[scheduler] def killBlacklistedExecutor(exec: String): Unit = { + private def killExecutor(exec: String, msg: String): Unit = { + allocationClient match { + case Some(a) => + logInfo(msg) + a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false, + force = true) + case None => + logWarning(s"Not attempting to kill blacklisted executor id $exec " + + s"since allocation client is not defined.") + } + } + + private def killBlacklistedExecutor(exec: String): Unit = { if (conf.get(config.BLACKLIST_KILL_ENABLED)) { - allocationClient match { - case Some(a) => - logInfo(s"Killing blacklisted executor id $exec " + - s"since ${config.BLACKLIST_KILL_ENABLED.key} is set.") - a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false, - force = true) - case None => - logWarning(s"Not attempting to kill blacklisted executor id $exec " + - s"since allocation client is not defined.") - } + killExecutor(exec, + s"Killing blacklisted executor id $exec since ${config.BLACKLIST_KILL_ENABLED.key} is set.") } } + private[scheduler] def killBlacklistedIdleExecutor(exec: String): Unit = { + killExecutor(exec, + s"Killing blacklisted idle executor id $exec because of task unschedulability and trying " + + "to acquire a new executor.") + } + private def killExecutorsOnBlacklistedNode(node: String): Unit = { if (conf.get(config.BLACKLIST_KILL_ENABLED)) { allocationClient match { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index fac332f1b5f3..273fff480789 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -116,11 +116,6 @@ private[spark] class TaskSchedulerImpl( protected val executorIdToHost = new HashMap[String, String] - // Threshold above which we abort the TaskSet if a task could not be scheduled because of complete - // blacklisting. 
- val UNSCHEDULABLE_TASKSET_TIMEOUT_MS = - conf.getTimeAsMs("spark.scheduler.unschedulableTaskSetTimeout", "120s") - private val abortTimer = new Timer(true) private val clock = new SystemClock @@ -430,33 +425,48 @@ private[spark] class TaskSchedulerImpl( taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match { case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable - // If the taskSet is unschedulable we kill an existing blacklisted executor/s and - // kick off an abortTimer which after waiting will abort the taskSet if we were - // unable to schedule any task from the taskSet. - // Note: We keep a track of schedulability on a per taskSet basis rather than on a + // If the taskSet is unschedulable we try to find an existing idle blacklisted + // executor. If we cannot find one, we abort immediately. Else we kill the idle + // executor and kick off an abortTimer which after waiting will abort the taskSet if + // we were unable to schedule any task from the taskSet. + // Note 1: We keep a track of schedulability on a per taskSet basis rather than on a // per task basis. - val executor = hostToExecutors.valuesIterator.next().iterator.next() - logDebug("Killing executor because of task unschedulability: " + executor) - blacklistTrackerOpt.foreach(blt => blt.killBlacklistedExecutor(executor)) - - if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) { - unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() - abortTimer.schedule(new TimerTask() { - override def run() { - if (unschedulableTaskSetToExpiryTime.contains(taskSet) && - (unschedulableTaskSetToExpiryTime(taskSet) - + UNSCHEDULABLE_TASKSET_TIMEOUT_MS) - <= clock.getTimeMillis() - ) { - logInfo("Cannot schedule any task because of complete blacklisting. " + - "Wait time for scheduling expired. Aborting the application.") - taskSet.abortSinceCompletelyBlacklisted(taskIndex.get) - } else { - this.cancel() + // Note 2: The taskSet can still be aborted when there are more than one idle + // blacklisted executors and dynamic allocation is on. This is because we rely on the + // ExecutorAllocationManager to acquire a new executor based on the pending tasks and + // it won't release any blacklisted executors which idle timeout after we kill an + // executor to acquire a new one, resulting in the abort timer to expire and abort the + // taskSet. + executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match { + case Some (x) => + val executorId = x._1 + if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) { + blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId)) + + unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000 + logInfo(s"Waiting for $timeout ms for completely " + + s"blacklisted task to be schedulable again before aborting $taskSet.") + abortTimer.schedule(new TimerTask() { + override def run() { + if (unschedulableTaskSetToExpiryTime.contains(taskSet) && + (unschedulableTaskSetToExpiryTime(taskSet) + timeout) + <= clock.getTimeMillis() + ) { + logInfo("Cannot schedule any task because of complete blacklisting. " + + s"Wait time for scheduling expired. Aborting $taskSet.") + taskSet.abortSinceCompletelyBlacklisted(taskIndex.get) + } else { + this.cancel() + } } - } - }, UNSCHEDULABLE_TASKSET_TIMEOUT_MS) - } + }, timeout) + } + case _ => // Abort Immediately + logInfo("Cannot schedule any task because of complete blacklisting. No idle" + + s" executors could be found. 
Aborting $taskSet." ) + taskSet.abortSinceCompletelyBlacklisted(taskIndex.get) + } case _ => // Do nothing. } } else { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index ec07664e6a2f..5ebcad2e4108 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -624,8 +624,7 @@ private[spark] class TaskSetManager( * It is possible that this taskset has become impossible to schedule *anywhere* due to the * blacklist. The most common scenario would be if there are fewer executors than * spark.task.maxFailures. We need to detect this so we can avoid the job from being hung. - * If dynamic allocation is enabled we try to acquire new executor/s by killing the existing one. - * In case of static allocation we abort the taskSet immediately to fail the job. + * We try to acquire new executor/s by killing an existing idle blacklisted executor. * * There's a tradeoff here: we could make sure all tasks in the task set are schedulable, but that * would add extra time to each iteration of the scheduling loop. Here, we take the approach of diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala index d3bbfd11d406..84f05707b4a1 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala @@ -97,15 +97,16 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM assertDataStructuresEmpty(noFailure = true) } - // Make sure that if we've failed on all executors, but haven't hit task.maxFailures yet, the job - // doesn't hang + // Make sure that if we've failed on all executors, but haven't hit task.maxFailures yet, we try + // to acquire a new executor and if we aren't able to get one, the job doesn't hang and we abort testScheduler( "SPARK-15865 Progress with fewer executors than maxTaskFailures", extraConfs = Seq( config.BLACKLIST_ENABLED.key -> "true", "spark.testing.nHosts" -> "2", "spark.testing.nExecutorsPerHost" -> "1", - "spark.testing.nCoresPerExecutor" -> "1" + "spark.testing.nCoresPerExecutor" -> "1", + "spark.scheduler.unschedulableTaskSetTimeout" -> "0s" ) ) { def runBackend(): Unit = { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 38e26a82e750..4369eb256e67 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -81,10 +81,12 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B setupHelper() } - def setupSchedulerWithMockTaskSetBlacklist(): TaskSchedulerImpl = { + def setupSchedulerWithMockTaskSetBlacklist(confs: (String, String)*): TaskSchedulerImpl = { blacklist = mock[BlacklistTracker] val conf = new SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite") conf.set(config.BLACKLIST_ENABLED, true) + confs.foreach{ case (k, v) => conf.set(k, v) } + sc = new SparkContext(conf) taskScheduler = new TaskSchedulerImpl(sc, sc.conf.getInt("spark.task.maxFailures", 4)) { @@ -465,7 +467,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B 
} } - test("abort stage when all executors are blacklisted") { + test("abort stage when all executors are blacklisted and we cannot acquire new executor") { taskScheduler = setupSchedulerWithMockTaskSetBlacklist() val taskSet = FakeTask.createTaskSet(numTasks = 10, stageAttemptId = 0) taskScheduler.submitTasks(taskSet) @@ -502,6 +504,92 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B verify(tsm).abort(anyString(), anyObject()) } + test("SPARK-22148 abort timer should kick in when task is completely blacklisted & no new " + + "executor can be acquired") { + // set the abort timer to fail immediately + taskScheduler = setupSchedulerWithMockTaskSetBlacklist( + config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0") + + // We have only 1 task remaining with 1 executor + val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0) + taskScheduler.submitTasks(taskSet) + val tsm = stageToMockTaskSetManager(0) + + // submit an offer with one executor + val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) + )).flatten + + // Fail the running task + val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get + taskScheduler.statusUpdate( + tid = failedTask.taskId, + state = TaskState.FAILED, + serializedData = ByteBuffer.allocate(0) + ) + // Wait for the failed task to propagate. + Thread.sleep(500) + // taskScheduler.handleFailedTask(tsm, failedTask.taskId, TaskState.FAILED, TaskResultLost) + // tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, TaskResultLost) + + when(stageToMockTaskSetBlacklist(0).isExecutorBlacklistedForTask("executor0", failedTask.index)) + .thenReturn(true) + + // make an offer on the blacklisted executor. We won't schedule anything, and set the abort + // timer to kick in immediately + assert(taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) + )).flatten.size === 0) + // Wait for the abort timer to kick in. Without sleep the test exits before the timer is + // triggered. + Thread.sleep(500) + assert(tsm.isZombie) + } + + test("SPARK-22148 try to acquire a new executor when task is unschedulable with 1 executor") { + + taskScheduler = setupSchedulerWithMockTaskSetBlacklist( + config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "10") + + // We have only 1 task remaining with 1 executor + val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0) + taskScheduler.submitTasks(taskSet) + val tsm = stageToMockTaskSetManager(0) + + // submit an offer with one executor + val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) + )).flatten + + // Fail the running task + val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get + taskScheduler.statusUpdate( + tid = failedTask.taskId, + state = TaskState.FAILED, + serializedData = ByteBuffer.allocate(0) + ) + // Wait for the failed task to propagate + Thread.sleep(500) + when(stageToMockTaskSetBlacklist(0).isExecutorBlacklistedForTask("executor0", failedTask.index)) + .thenReturn(true) + + // make an offer on the blacklisted executor. We won't schedule anything, and set the abort + // timer to expire if no new executors could be acquired. We kill the existing idle blacklisted + // executor and try to acquire a new one. 
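+    // With the 10s timeout configured above, the abort timer is armed but should not fire
+    // while the test runs, so the taskSet must remain alive (not zombie) after this offer.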
+ assert(taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) + )).flatten.size === 0) + Thread.sleep(500) + assert(!tsm.isZombie) + + // Offer a new executor which should be accepted + assert(taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor1", "host0", 1) + )).flatten.size === 1) + + assert(!tsm.isZombie) + } + /** * Helper for performance tests. Takes the explicitly blacklisted nodes and executors; verifies * that the blacklists are used efficiently to ensure scheduling is not O(numPendingTasks). From 43e0af2238855dc69c93b36e40493c3eda670e90 Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Tue, 9 Oct 2018 11:55:32 -0500 Subject: [PATCH 04/18] Address review comments + update docs --- .../spark/internal/config/package.scala | 8 ++--- .../spark/scheduler/TaskSchedulerImpl.scala | 33 ++++++++----------- .../scheduler/TaskSchedulerImplSuite.scala | 3 -- docs/configuration.md | 8 +++++ 4 files changed, 26 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 30f7d334f626..e1df6124ec21 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -600,10 +600,10 @@ package object config { // Threshold above which we abort the TaskSet if a task could not be scheduled because of complete // blacklisting. private[spark] val UNSCHEDULABLE_TASKSET_TIMEOUT = - ConfigBuilder("spark.scheduler.unschedulableTaskSetTimeout") - .doc("The timeout in seconds to wait before aborting a TaskSet to acquire a new executor " + - "and schedule a task which was previously unschedulable because of being completely " + - "blacklisted.") + ConfigBuilder("spark.scheduler.blacklist.unschedulableTaskSetTimeout") + .doc("The timeout in seconds to wait to try to acquire a new executor and schedule a task " + + "before aborting a TaskSet which was previously unschedulable because of being " + + "completely blacklisted.") .timeConf(TimeUnit.SECONDS) .checkValue(v => v >= 0, "The value should be a non negative time value.") .createWithDefault(120) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index f7f93ae4af6c..63a7d8519049 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -22,10 +22,9 @@ import java.util.{Locale, Timer, TimerTask} import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.concurrent.atomic.AtomicLong -import scala.collection.Set +import scala.collection.{Set, mutable} import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.util.Random - import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.executor.ExecutorMetrics @@ -118,7 +117,6 @@ private[spark] class TaskSchedulerImpl( protected val executorIdToHost = new HashMap[String, String] private val abortTimer = new Timer(true) - private val clock = new SystemClock protected val unschedulableTaskSetToExpiryTime = new HashMap[TaskSetManager, Long] @@ -430,14 +428,13 @@ private[spark] class TaskSchedulerImpl( // executor. If we cannot find one, we abort immediately. 
Else we kill the idle // executor and kick off an abortTimer which after waiting will abort the taskSet if // we were unable to schedule any task from the taskSet. - // Note 1: We keep a track of schedulability on a per taskSet basis rather than on a - // per task basis. + // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per + // task basis. // Note 2: The taskSet can still be aborted when there are more than one idle - // blacklisted executors and dynamic allocation is on. This is because we rely on the - // ExecutorAllocationManager to acquire a new executor based on the pending tasks and - // it won't release any blacklisted executors which idle timeout after we kill an - // executor to acquire a new one, resulting in the abort timer to expire and abort the - // taskSet. + // blacklisted executors and dynamic allocation is on. This can happen when a killed + // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on + // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort + // timer to expire and abort the taskSet. executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match { case Some (x) => val executorId = x._1 @@ -465,18 +462,16 @@ private[spark] class TaskSchedulerImpl( } case _ => // Abort Immediately logInfo("Cannot schedule any task because of complete blacklisting. No idle" + - s" executors could be found. Aborting $taskSet." ) + s" executors can be found to kill. Aborting $taskSet." ) taskSet.abortSinceCompletelyBlacklisted(taskIndex.get) } - case _ => // Do nothing. - } - } else { - // If a task was scheduled, we clear the expiry time for the taskSet. The abort timer - // checks this entry to decide if we want to abort the taskSet. - if (unschedulableTaskSetToExpiryTime.contains(taskSet)) { - unschedulableTaskSetToExpiryTime.remove(taskSet) - } + case _ => // Do nothing if no tasks completely blacklisted. } + } else { + // If a task was scheduled, we clear the expiry time for the taskSet. The abort timer + // checks this entry to decide if we want to abort the taskSet. + unschedulableTaskSetToExpiryTime.remove(taskSet) + } if (launchedAnyTask && taskSet.isBarrier) { // Check whether the barrier tasks are partially launched. diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 925591022e9e..48fe8753adcf 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -530,8 +530,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B ) // Wait for the failed task to propagate. 
Thread.sleep(500) - // taskScheduler.handleFailedTask(tsm, failedTask.taskId, TaskState.FAILED, TaskResultLost) - // tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, TaskResultLost) when(stageToMockTaskSetBlacklist(0).isExecutorBlacklistedForTask("executor0", failedTask.index)) .thenReturn(true) @@ -548,7 +546,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B } test("SPARK-22148 try to acquire a new executor when task is unschedulable with 1 executor") { - taskScheduler = setupSchedulerWithMockTaskSetBlacklist( config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "10") diff --git a/docs/configuration.md b/docs/configuration.md index 782ccff66707..c36bf38f3e30 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1583,6 +1583,14 @@ Apart from these, the following properties are also available, and may be useful driver using more memory. + + spark.scheduler.blacklist.unschedulableTaskSetTimeout + 120s + + The timeout in seconds to wait to try to acquire a new executor and schedule a task before + aborting a TaskSet which was previously unschedulable because of being completely blacklisted. + + spark.blacklist.enabled From 2ac135b2def99cb8de9c525dbaacc1c32f3a680a Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Tue, 9 Oct 2018 13:21:02 -0500 Subject: [PATCH 05/18] Fix scala style checks --- .../main/scala/org/apache/spark/internal/config/package.scala | 2 +- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index e1df6124ec21..091579aa4451 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -607,7 +607,7 @@ package object config { .timeConf(TimeUnit.SECONDS) .checkValue(v => v >= 0, "The value should be a non negative time value.") .createWithDefault(120) - + private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL = ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.interval") .doc("Time in seconds to wait between a max concurrent tasks check failure and the next " + diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 63a7d8519049..b13b39a3cd16 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -22,9 +22,10 @@ import java.util.{Locale, Timer, TimerTask} import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.concurrent.atomic.AtomicLong -import scala.collection.{Set, mutable} +import scala.collection.Set import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.util.Random + import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.executor.ExecutorMetrics From a12a3fbc5a40e8182a8029d5a38b347671f3c26e Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Tue, 9 Oct 2018 13:43:11 -0500 Subject: [PATCH 06/18] Remove extra line --- .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 
b13b39a3cd16..c06c709d0b55 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -119,7 +119,6 @@ private[spark] class TaskSchedulerImpl(
 
   private val abortTimer = new Timer(true)
   private val clock = new SystemClock
-
   protected val unschedulableTaskSetToExpiryTime = new HashMap[TaskSetManager, Long]
 
   // Listener object to pass upcalls into
@@ -427,8 +426,9 @@ private[spark] class TaskSchedulerImpl(
 
             // If the taskSet is unschedulable we try to find an existing idle blacklisted
             // executor. If we cannot find one, we abort immediately. Else we kill the idle
-            // executor and kick off an abortTimer which after waiting will abort the taskSet if
-            // we were unable to schedule any task from the taskSet.
+            // executor and kick off an abortTimer which if it doesn't schedule a task within
+            // the timeout will abort the taskSet if we were unable to schedule any task from
+            // the taskSet.
             // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per
             // task basis.
             // Note 2: The taskSet can still be aborted when there are more than one idle

From 4ce7610522927ecabd0b8a75fc0a557938135b0d Mon Sep 17 00:00:00 2001
From: Dhruve Ashar 
Date: Tue, 9 Oct 2018 18:37:59 -0500
Subject: [PATCH 07/18] Update the config name

---
 .../org/apache/spark/scheduler/BlacklistIntegrationSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
index d67d612f8e25..29bb8232f44f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
@@ -105,7 +105,7 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
       "spark.testing.nHosts" -> "2",
       "spark.testing.nExecutorsPerHost" -> "1",
       "spark.testing.nCoresPerExecutor" -> "1",
-      "spark.scheduler.unschedulableTaskSetTimeout" -> "0s"
+      "spark.scheduler.blacklist.unschedulableTaskSetTimeout" -> "0s"
     )
   ) {
     def runBackend(): Unit = {

From c361693a3d08a1bea1d2919f0a8e970c03959cc8 Mon Sep 17 00:00:00 2001
From: Dhruve Ashar 
Date: Wed, 10 Oct 2018 12:28:37 -0500
Subject: [PATCH 08/18] Update comment and abort timer in stop

---
 .../scala/org/apache/spark/internal/config/package.scala    | 5 ++---
 .../scala/org/apache/spark/scheduler/BlacklistTracker.scala | 2 +-
 .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 1 +
 docs/configuration.md                                       | 4 ++--
 4 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 091579aa4451..9bf563659de7 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -601,9 +601,8 @@ package object config {
   // blacklisting.
 
private[spark] val UNSCHEDULABLE_TASKSET_TIMEOUT = ConfigBuilder("spark.scheduler.blacklist.unschedulableTaskSetTimeout") - .doc("The timeout in seconds to wait to try to acquire a new executor and schedule a task " + - "before aborting a TaskSet which was previously unschedulable because of being " + - "completely blacklisted.") + .doc("The timeout in seconds to wait to acquire a new executor and schedule a task " + + "before aborting a TaskSet which is unschedulable because of being completely blacklisted.") .timeConf(TimeUnit.SECONDS) .checkValue(v => v >= 0, "The value should be a non negative time value.") .createWithDefault(120) diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index 45c6db2a92da..ef6d02d85c27 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -153,7 +153,7 @@ private[scheduler] class BlacklistTracker ( a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false, force = true) case None => - logWarning(s"Not attempting to kill blacklisted executor id $exec " + + logInfo(s"Not attempting to kill blacklisted executor id $exec " + s"since allocation client is not defined.") } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index c06c709d0b55..84726193d4dc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -646,6 +646,7 @@ private[spark] class TaskSchedulerImpl( barrierCoordinator.stop() } starvationTimer.cancel() + abortTimer.cancel() } override def defaultParallelism(): Int = backend.defaultParallelism() diff --git a/docs/configuration.md b/docs/configuration.md index c36bf38f3e30..b3fc040e684a 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1587,8 +1587,8 @@ Apart from these, the following properties are also available, and may be useful spark.scheduler.blacklist.unschedulableTaskSetTimeout 120s - The timeout in seconds to wait to try to acquire a new executor and schedule a task before - aborting a TaskSet which was previously unschedulable because of being completely blacklisted. + The timeout in seconds to wait to acquire a new executor and schedule a task before aborting a + TaskSet which is unschedulable because of being completely blacklisted. 
From 2c5a75354d36d08199b9805a7513a4ec4a546a27 Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Tue, 16 Oct 2018 15:07:48 -0500 Subject: [PATCH 09/18] Add more unit tests --- .../spark/internal/config/package.scala | 2 - .../spark/scheduler/TaskSchedulerImpl.scala | 13 ++- .../scheduler/TaskSchedulerImplSuite.scala | 87 +++++++++++++++---- 3 files changed, 77 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 9bf563659de7..b2b6ceb9a0dc 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -597,8 +597,6 @@ package object config { .checkValue(v => v > 0, "The value should be a positive time value.") .createWithDefaultString("365d") - // Threshold above which we abort the TaskSet if a task could not be scheduled because of complete - // blacklisting. private[spark] val UNSCHEDULABLE_TASKSET_TIMEOUT = ConfigBuilder("spark.scheduler.blacklist.unschedulableTaskSetTimeout") .doc("The timeout in seconds to wait to acquire a new executor and schedule a task " + diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 84726193d4dc..131953f46eb3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -119,7 +119,8 @@ private[spark] class TaskSchedulerImpl( private val abortTimer = new Timer(true) private val clock = new SystemClock - protected val unschedulableTaskSetToExpiryTime = new HashMap[TaskSetManager, Long] + // Exposed for testing + val unschedulableTaskSetToExpiryTime = new HashMap[TaskSetManager, Long] // Listener object to pass upcalls into var dagScheduler: DAGScheduler = null @@ -469,9 +470,13 @@ private[spark] class TaskSchedulerImpl( case _ => // Do nothing if no tasks completely blacklisted. } } else { - // If a task was scheduled, we clear the expiry time for the taskSet. The abort timer - // checks this entry to decide if we want to abort the taskSet. - unschedulableTaskSetToExpiryTime.remove(taskSet) + // If a task was scheduled, we clear the expiry time for all the taskSets. This ensures + // that we have got atleast a non blacklisted executor and the job can progress. The + // abort timer checks this entry to decide if we want to abort the taskSet. + if (unschedulableTaskSetToExpiryTime.nonEmpty) { + logInfo("Clearing the expiry times for all unschedulable taskSets as") + unschedulableTaskSetToExpiryTime.clear() + } } if (launchedAnyTask && taskSet.isBarrier) { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 48fe8753adcf..64302b882346 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -523,16 +523,13 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B // Fail the running task val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get - taskScheduler.statusUpdate( - tid = failedTask.taskId, - state = TaskState.FAILED, - serializedData = ByteBuffer.allocate(0) - ) - // Wait for the failed task to propagate. 
- Thread.sleep(500) - - when(stageToMockTaskSetBlacklist(0).isExecutorBlacklistedForTask("executor0", failedTask.index)) - .thenReturn(true) + taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0)) + // we explicitly call the handleFailedTask method here to avoid adding a sleep in the test suite + // Reason being - handleFailedTask is run by an executor service and there is a momentary delay + // before it is launched and this fails the assertion check. + tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason) + when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask( + "executor0", failedTask.index)).thenReturn(true) // make an offer on the blacklisted executor. We won't schedule anything, and set the abort // timer to kick in immediately @@ -561,15 +558,13 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B // Fail the running task val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get - taskScheduler.statusUpdate( - tid = failedTask.taskId, - state = TaskState.FAILED, - serializedData = ByteBuffer.allocate(0) - ) - // Wait for the failed task to propagate - Thread.sleep(500) - when(stageToMockTaskSetBlacklist(0).isExecutorBlacklistedForTask("executor0", failedTask.index)) - .thenReturn(true) + taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0)) + // we explicitly call the handleFailedTask method here to avoid adding a sleep in the test suite + // Reason being - handleFailedTask is run by an executor service and there is a momentary delay + // before it is launched and this fails the assertion check. + tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason) + when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask( + "executor0", failedTask.index)).thenReturn(true) // make an offer on the blacklisted executor. We won't schedule anything, and set the abort // timer to expire if no new executors could be acquired. We kill the existing idle blacklisted @@ -588,6 +583,60 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(!tsm.isZombie) } + // This is to test a scenario where we have two taskSets completely blacklisted and on acquiring + // a new executor we don't want the abort timer for the second taskSet to expire and abort the job + test("SPARK-22148 abort timer should clear unschedulableTaskSetToExpiryTime for all TaskSets") { + taskScheduler = setupSchedulerWithMockTaskSetBlacklist( + config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "10") + + // We have 2 taskSets with 1 task remaining in each with 1 executor completely blacklisted + val taskSet1 = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0) + taskScheduler.submitTasks(taskSet1) + val taskSet2 = FakeTask.createTaskSet(numTasks = 1, stageId = 1, stageAttemptId = 0) + taskScheduler.submitTasks(taskSet2) + val tsm = stageToMockTaskSetManager(0) + + // submit an offer with one executor + val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) + )).flatten + + // Fail the running task + val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get + taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0)) + when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask( + "executor0", failedTask.index)).thenReturn(true) + + // make an offer. 
We will schedule the task from the second taskSet + val secondTaskAttempts = taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) + )).flatten + + val tsm2 = stageToMockTaskSetManager(1) + val failedTask2 = secondTaskAttempts.find(_.executorId == "executor0").get + taskScheduler.statusUpdate(failedTask2.taskId, TaskState.FAILED, ByteBuffer.allocate(0)) + when(tsm2.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask( + "executor0", failedTask2.index)).thenReturn(true) + + // make an offer on the blacklisted executor. We won't schedule anything, and set the abort + // timer for taskSet2 + assert(taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) + )).flatten.size === 0) + + assert(taskScheduler.unschedulableTaskSetToExpiryTime.size == 2) + + // Offer a new executor which should be accepted + assert(taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor1", "host1", 1) + )).flatten.size === 1) + + // Check if all the taskSets are cleared + assert(taskScheduler.unschedulableTaskSetToExpiryTime.size == 0) + + assert(!tsm.isZombie) + } + /** * Helper for performance tests. Takes the explicitly blacklisted nodes and executors; verifies * that the blacklists are used efficiently to ensure scheduling is not O(numPendingTasks). From 69c156b32cf573c7d3a75f63ab093022046ad856 Mon Sep 17 00:00:00 2001 From: Tom Graves Date: Wed, 17 Oct 2018 11:11:58 -0500 Subject: [PATCH 10/18] Fix spacing --- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 131953f46eb3..03fe184f2dc6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -464,7 +464,7 @@ private[spark] class TaskSchedulerImpl( } case _ => // Abort Immediately logInfo("Cannot schedule any task because of complete blacklisting. No idle" + - s" executors can be found to kill. Aborting $taskSet." ) + s" executors can be found to kill. Aborting $taskSet." ) taskSet.abortSinceCompletelyBlacklisted(taskIndex.get) } case _ => // Do nothing if no tasks completely blacklisted. From d2af73d4bd10676169f4fddc9dd262c97c0a9967 Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Thu, 18 Oct 2018 11:45:45 -0500 Subject: [PATCH 11/18] Offer resource to kick off abort timer for taskSet2 --- .../apache/spark/scheduler/TaskSchedulerImplSuite.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 64302b882346..6a52959ac011 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -607,7 +607,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask( "executor0", failedTask.index)).thenReturn(true) - // make an offer. We will schedule the task from the second taskSet + // make an offer. We will schedule the task from the second taskSet. 
Since a task was scheduled + // we do not kick off the abort timer for taskSet1 val secondTaskAttempts = taskScheduler.resourceOffers(IndexedSeq( WorkerOffer("executor0", "host0", 1) )).flatten @@ -618,6 +619,12 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B when(tsm2.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask( "executor0", failedTask2.index)).thenReturn(true) + // make an offer on the blacklisted executor. We won't schedule anything, and set the abort + // timer for taskSet1 + assert(taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) + )).flatten.size === 0) + // make an offer on the blacklisted executor. We won't schedule anything, and set the abort // timer for taskSet2 assert(taskScheduler.resourceOffers(IndexedSeq( From b2d0d40771534291bdd5a1e3ebfc2c0c227c5956 Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Fri, 19 Oct 2018 16:23:11 -0500 Subject: [PATCH 12/18] Update comment and minor refactoring --- .../spark/scheduler/TaskSchedulerImpl.scala | 45 ++++++++++--------- .../spark/scheduler/TaskSetManager.scala | 13 +++--- 2 files changed, 30 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 03fe184f2dc6..89b28e32de3a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -423,7 +423,7 @@ private[spark] class TaskSchedulerImpl( if (!launchedAnyTask) { taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match { - case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable + case Some(taskIndex) => // Returns the taskIndex which was unschedulable // If the taskSet is unschedulable we try to find an existing idle blacklisted // executor. If we cannot find one, we abort immediately. Else we kill the idle @@ -447,34 +447,23 @@ private[spark] class TaskSchedulerImpl( val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000 logInfo(s"Waiting for $timeout ms for completely " + s"blacklisted task to be schedulable again before aborting $taskSet.") - abortTimer.schedule(new TimerTask() { - override def run() { - if (unschedulableTaskSetToExpiryTime.contains(taskSet) && - (unschedulableTaskSetToExpiryTime(taskSet) + timeout) - <= clock.getTimeMillis() - ) { - logInfo("Cannot schedule any task because of complete blacklisting. " + - s"Wait time for scheduling expired. Aborting $taskSet.") - taskSet.abortSinceCompletelyBlacklisted(taskIndex.get) - } else { - this.cancel() - } - } - }, timeout) + abortTimer.schedule(getAbortTimer(taskSet, taskIndex, timeout), timeout) } case _ => // Abort Immediately logInfo("Cannot schedule any task because of complete blacklisting. No idle" + s" executors can be found to kill. Aborting $taskSet." ) - taskSet.abortSinceCompletelyBlacklisted(taskIndex.get) + taskSet.abortSinceCompletelyBlacklisted(taskIndex) } case _ => // Do nothing if no tasks completely blacklisted. } } else { - // If a task was scheduled, we clear the expiry time for all the taskSets. This ensures - // that we have got atleast a non blacklisted executor and the job can progress. The - // abort timer checks this entry to decide if we want to abort the taskSet. + // We want to differ killing any taskSets as long as we have a non blacklisted executor + // which can be used to schedule a task from any active taskSets. 
This ensures that the + // job can make progress and if we encounter a flawed taskSet it will eventually either + // fail or abort due to being completely blacklisted. if (unschedulableTaskSetToExpiryTime.nonEmpty) { - logInfo("Clearing the expiry times for all unschedulable taskSets as") + logInfo("Clearing the expiry times for all unschedulable taskSets as a task was " + + "recently scheduled.") unschedulableTaskSetToExpiryTime.clear() } } @@ -514,6 +503,22 @@ private[spark] class TaskSchedulerImpl( return tasks } + private def getAbortTimer(taskSet: TaskSetManager, taskIndex: Int, timeout: Long): TimerTask = { + new TimerTask() { + override def run() { + if (unschedulableTaskSetToExpiryTime.contains(taskSet) && + (unschedulableTaskSetToExpiryTime(taskSet) + timeout) <= clock.getTimeMillis() + ) { + logInfo("Cannot schedule any task because of complete blacklisting. " + + s"Wait time for scheduling expired. Aborting $taskSet.") + taskSet.abortSinceCompletelyBlacklisted(taskIndex) + } else { + this.cancel() + } + } + } + } + /** * Shuffle offers around to avoid always placing tasks on the same workers. Exposed to allow * overriding in tests, so it can be deterministic. diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index f86c931e1b64..6bf60dd8e9df 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -637,7 +637,7 @@ private[spark] class TaskSetManager( */ private[scheduler] def getCompletelyBlacklistedTaskIfAny( hostToExecutors: HashMap[String, HashSet[String]]): Option[Int] = { - taskSetBlacklistHelperOpt.foreach { taskSetBlacklist => + taskSetBlacklistHelperOpt.flatMap { taskSetBlacklist => val appBlacklist = blacklistTracker.get // Only look for unschedulable tasks when at least one executor has registered. Otherwise, // task sets will be (unnecessarily) aborted in cases when no executors have registered yet. @@ -658,11 +658,11 @@ private[spark] class TaskSetManager( } } - pendingTask.foreach { indexInTaskSet => + pendingTask.find { indexInTaskSet => // try to find some executor this task can run on. Its possible that some *other* // task isn't schedulable anywhere, but we will discover that in some later call, // when that unschedulable task is the last task remaining. 
- val blacklistedEverywhere = hostToExecutors.forall { case (host, execsOnHost) => + hostToExecutors.forall { case (host, execsOnHost) => // Check if the task can run on the node val nodeBlacklisted = appBlacklist.isNodeBlacklisted(host) || @@ -679,14 +679,11 @@ private[spark] class TaskSetManager( } } } - - if (blacklistedEverywhere) { - return Some(indexInTaskSet) - } } + } else { + None } } - None } private[scheduler] def abortSinceCompletelyBlacklisted(indexInTaskSet: Int): Unit = { From ec3802956d39266c73a175fe4e5dd20fd1f7570e Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Mon, 22 Oct 2018 11:36:49 -0500 Subject: [PATCH 13/18] Replace Thread.sleep with eventually --- .../apache/spark/scheduler/TaskSchedulerImpl.scala | 14 +++++++++----- .../spark/scheduler/TaskSchedulerImplSuite.scala | 10 ++++++---- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 89b28e32de3a..6118742f8b15 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -443,11 +443,12 @@ private[spark] class TaskSchedulerImpl( if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) { blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId)) - unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000 + unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout logInfo(s"Waiting for $timeout ms for completely " + s"blacklisted task to be schedulable again before aborting $taskSet.") - abortTimer.schedule(getAbortTimer(taskSet, taskIndex, timeout), timeout) + abortTimer.schedule( + createUnschedulableTaskSetAbortTimer(taskSet, taskIndex, timeout), timeout) } case _ => // Abort Immediately logInfo("Cannot schedule any task because of complete blacklisting. No idle" + @@ -457,7 +458,7 @@ private[spark] class TaskSchedulerImpl( case _ => // Do nothing if no tasks completely blacklisted. } } else { - // We want to differ killing any taskSets as long as we have a non blacklisted executor + // We want to defer killing any taskSets as long as we have a non blacklisted executor // which can be used to schedule a task from any active taskSets. This ensures that the // job can make progress and if we encounter a flawed taskSet it will eventually either // fail or abort due to being completely blacklisted. @@ -503,11 +504,14 @@ private[spark] class TaskSchedulerImpl( return tasks } - private def getAbortTimer(taskSet: TaskSetManager, taskIndex: Int, timeout: Long): TimerTask = { + private def createUnschedulableTaskSetAbortTimer( + taskSet: TaskSetManager, + taskIndex: Int, + timeout: Long): TimerTask = { new TimerTask() { override def run() { if (unschedulableTaskSetToExpiryTime.contains(taskSet) && - (unschedulableTaskSetToExpiryTime(taskSet) + timeout) <= clock.getTimeMillis() + unschedulableTaskSetToExpiryTime(taskSet) <= clock.getTimeMillis() ) { logInfo("Cannot schedule any task because of complete blacklisting. " + s"Wait time for scheduling expired. 
From 4a5ea827263b8e789f03474d931105c707c6e778 Mon Sep 17 00:00:00 2001
From: Dhruve Ashar
Date: Mon, 22 Oct 2018 13:47:28 -0500
Subject: [PATCH 14/18] Fix nits

---
 .../spark/scheduler/TaskSchedulerImpl.scala      | 11 ++++-------
 .../spark/scheduler/TaskSchedulerImplSuite.scala | 16 +++++++---------
 2 files changed, 11 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 6118742f8b15..60896e76b17d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -438,8 +438,7 @@ private[spark] class TaskSchedulerImpl(
               // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort
               // timer to expire and abort the taskSet.
               executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
-                case Some (x) =>
-                  val executorId = x._1
+                case Some ((executorId, _)) =>
                   if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
                     blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))
 
@@ -448,7 +447,7 @@ private[spark] class TaskSchedulerImpl(
                     logInfo(s"Waiting for $timeout ms for completely " +
                       s"blacklisted task to be schedulable again before aborting $taskSet.")
                     abortTimer.schedule(
-                      createUnschedulableTaskSetAbortTimer(taskSet, taskIndex, timeout), timeout)
+                      createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
                   }
                 case _ => // Abort Immediately
                   logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
@@ -506,13 +505,11 @@ private[spark] class TaskSchedulerImpl(
 
   private def createUnschedulableTaskSetAbortTimer(
       taskSet: TaskSetManager,
-      taskIndex: Int,
-      timeout: Long): TimerTask = {
+      taskIndex: Int): TimerTask = {
     new TimerTask() {
       override def run() {
         if (unschedulableTaskSetToExpiryTime.contains(taskSet) &&
-            unschedulableTaskSetToExpiryTime(taskSet) <= clock.getTimeMillis()
-        ) {
+            unschedulableTaskSetToExpiryTime(taskSet) <= clock.getTimeMillis()) {
           logInfo("Cannot schedule any task because of complete blacklisting. " +
             s"Wait time for scheduling expired. Aborting $taskSet.")
           taskSet.abortSinceCompletelyBlacklisted(taskIndex)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 6f3286de06a6..c0fa90c0882d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -88,7 +88,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     blacklist = mock[BlacklistTracker]
     val conf = new SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite")
     conf.set(config.BLACKLIST_ENABLED, true)
-    confs.foreach{ case (k, v) => conf.set(k, v) }
+    confs.foreach { case (k, v) => conf.set(k, v) }
 
     sc = new SparkContext(conf)
     taskScheduler =
@@ -508,7 +508,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
   }
 
   test("SPARK-22148 abort timer should kick in when task is completely blacklisted & no new " +
-      "executor can be acquired") {
+    "executor can be acquired") {
     // set the abort timer to fail immediately
     taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
       config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0")
@@ -603,6 +603,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       WorkerOffer("executor0", "host0", 1)
     )).flatten
 
+    assert(taskScheduler.unschedulableTaskSetToExpiryTime.size == 0)
+
     // Fail the running task
     val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get
     taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0))
@@ -615,6 +617,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       WorkerOffer("executor0", "host0", 1)
     )).flatten
 
+    assert(taskScheduler.unschedulableTaskSetToExpiryTime.size == 0)
+
     val tsm2 = stageToMockTaskSetManager(1)
     val failedTask2 = secondTaskAttempts.find(_.executorId == "executor0").get
     taskScheduler.statusUpdate(failedTask2.taskId, TaskState.FAILED, ByteBuffer.allocate(0))
@@ -622,13 +626,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       "executor0", failedTask2.index)).thenReturn(true)
 
-    // make an offer on the blacklisted executor. We won't schedule anything, and set the abort
-    // timer for taskSet1
-    assert(taskScheduler.resourceOffers(IndexedSeq(
-      WorkerOffer("executor0", "host0", 1)
-    )).flatten.size === 0)
-
-    // make an offer on the blacklisted executor. We won't schedule anything, and set the abort
-    // timer for taskSet2
+    // make an offer on the blacklisted executor. We won't schedule anything, and set the abort
+    // timer for taskSet1 and taskSet2
     assert(taskScheduler.resourceOffers(IndexedSeq(
       WorkerOffer("executor0", "host0", 1)
     )).flatten.size === 0)
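
Since patches 13 and 14 reshape createUnschedulableTaskSetAbortTimer, it may help to see the
underlying java.util.Timer idiom in isolation: schedule a one-shot task that aborts if the stored
deadline has passed, and cancels itself if the deadline was cleared or pushed back. A standalone
sketch with hypothetical state (the real code keys the deadline by TaskSetManager):

    import java.util.{Timer, TimerTask}

    object AbortTimerSketch {
      def main(args: Array[String]): Unit = {
        val abortTimer = new Timer(true) // daemon, like the scheduler's abortTimer
        @volatile var expiryTime = System.currentTimeMillis() + 100 // arm the deadline

        abortTimer.schedule(new TimerTask {
          override def run(): Unit = {
            if (expiryTime <= System.currentTimeMillis()) {
              println("deadline passed: would abort the taskSet")
            } else {
              this.cancel() // deadline moved or cleared: stand down
            }
          }
        }, 100) // fire once, 100 ms from now

        Thread.sleep(300) // keep the JVM alive long enough for the timer to fire
        abortTimer.cancel()
      }
    }

Storing the absolute expiry time (clock.getTimeMillis() + timeout), as patch 13 does, keeps the
timer's check a plain comparison instead of re-deriving the deadline from the arming time.
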
From 9b2aeaffdf6fc8d76b6c8ba2978c5dc7d6022899 Mon Sep 17 00:00:00 2001
From: Dhruve Ashar
Date: Tue, 23 Oct 2018 17:07:40 -0500
Subject: [PATCH 15/18] Add unit test for locality wait

---
 .../scheduler/TaskSchedulerImplSuite.scala | 38 +++++++++++++++++++
 1 file changed, 38 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index c0fa90c0882d..fe6f2ecbc2e1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -608,6 +608,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     // Fail the running task
     val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get
     taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0))
+    tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason)
     when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
       "executor0", failedTask.index)).thenReturn(true)
 
@@ -622,6 +623,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     val tsm2 = stageToMockTaskSetManager(1)
     val failedTask2 = secondTaskAttempts.find(_.executorId == "executor0").get
     taskScheduler.statusUpdate(failedTask2.taskId, TaskState.FAILED, ByteBuffer.allocate(0))
+    tsm2.handleFailedTask(failedTask2.taskId, TaskState.FAILED, UnknownReason)
     when(tsm2.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
       "executor0", failedTask2.index)).thenReturn(true)
 
@@ -644,6 +646,42 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!tsm.isZombie)
   }
 
+  // This test verifies that a taskSet which is not being scheduled on other executors only
+  // because it is waiting on the locality timeout does not get aborted, since it is still not
+  // completely blacklisted.
+  test("SPARK-22148 Ensure we don't abort the taskSet if we haven't been completely blacklisted") {
+    taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
+      config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0")
+
+    val preferredLocation = Seq(ExecutorCacheTaskLocation("host0", "executor0"))
+    val taskSet1 = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0,
+      preferredLocation)
+    taskScheduler.submitTasks(taskSet1)
+
+    val tsm = stageToMockTaskSetManager(0)
+
+    // submit an offer with one executor
+    var taskAttempts = taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor0", "host0", 1)
+    )).flatten
+
+    // Fail the running task
+    val failedTask = taskAttempts.find(_.executorId == "executor0").get
+    taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0))
+    tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason)
+    when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
+      "executor0", failedTask.index)).thenReturn(true)
+
+    // make an offer but we won't schedule anything yet as scheduler locality is still PROCESS_LOCAL
+    assert(taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor1", "host0", 1)
+    )).flatten.isEmpty)
+
+    assert(taskScheduler.unschedulableTaskSetToExpiryTime.size == 0)
+
+    assert(!tsm.isZombie)
+  }
+
   /**
    * Helper for performance tests. Takes the explicitly blacklisted nodes and executors; verifies
    * that the blacklists are used efficiently to ensure scheduling is not O(numPendingTasks).
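
The tsm.handleFailedTask calls added above make the failure handling synchronous in the test: in
production that callback runs on an executor service, so asserting immediately after statusUpdate
would race it. A tiny sketch of the idea with hypothetical names (not the Spark classes):

    object SynchronousCallbackSketch {
      // Stand-in for a manager whose real failure callback runs asynchronously.
      final class Manager {
        @volatile var zombie = false
        def handleFailed(): Unit = { zombie = true }
      }

      def main(args: Array[String]): Unit = {
        val m = new Manager
        // Invoking the callback directly removes the race instead of papering
        // over it with a sleep, so the assertion below is deterministic.
        m.handleFailed()
        assert(m.zombie)
        println("state observed synchronously")
      }
    }
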
+ test("SPARK-22148 Ensure we don't abort the taskSet if we haven't been completely blacklisted") { + taskScheduler = setupSchedulerWithMockTaskSetBlacklist( + config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0") + + val preferredLocation = Seq(ExecutorCacheTaskLocation("host0", "executor0")) + val taskSet1 = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0, + preferredLocation) + taskScheduler.submitTasks(taskSet1) + + val tsm = stageToMockTaskSetManager(0) + + // submit an offer with one executor + var taskAttempts = taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) + )).flatten + + // Fail the running task + val failedTask = taskAttempts.find(_.executorId == "executor0").get + taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0)) + tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason) + when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask( + "executor0", failedTask.index)).thenReturn(true) + + // make an offer but we won't schedule anything yet as scheduler locality is still PROCESS_LOCAL + assert(taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor1", "host0", 1) + )).flatten.isEmpty) + + assert(taskScheduler.unschedulableTaskSetToExpiryTime.size == 0) + + assert(!tsm.isZombie) + } + /** * Helper for performance tests. Takes the explicitly blacklisted nodes and executors; verifies * that the blacklists are used efficiently to ensure scheduling is not O(numPendingTasks). From aac1e9ee1ed5935c6370556b0ade4be3a5db4f29 Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Fri, 26 Oct 2018 16:11:12 -0500 Subject: [PATCH 16/18] Address review comments --- .../spark/scheduler/TaskSchedulerImpl.scala | 5 +--- .../scheduler/TaskSchedulerImplSuite.scala | 29 ++++++++++--------- 2 files changed, 16 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 60896e76b17d..96bd0960d33f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -422,9 +422,7 @@ private[spark] class TaskSchedulerImpl( } if (!launchedAnyTask) { - taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match { - case Some(taskIndex) => // Returns the taskIndex which was unschedulable - + taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex => // If the taskSet is unschedulable we try to find an existing idle blacklisted // executor. If we cannot find one, we abort immediately. Else we kill the idle // executor and kick off an abortTimer which if it doesn't schedule a task within the @@ -454,7 +452,6 @@ private[spark] class TaskSchedulerImpl( s" executors can be found to kill. Aborting $taskSet." ) taskSet.abortSinceCompletelyBlacklisted(taskIndex) } - case _ => // Do nothing if no tasks completely blacklisted. 
} } else { // We want to defer killing any taskSets as long as we have a non blacklisted executor diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index fe6f2ecbc2e1..5fcfd91b80cf 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -538,8 +538,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(taskScheduler.resourceOffers(IndexedSeq( WorkerOffer("executor0", "host0", 1) )).flatten.size === 0) - // Wait for the abort timer to kick in. Without sleep the test exits before the timer is - // triggered. + // Wait for the abort timer to kick in. Even though we configure the timeout to be 0, there is a + // slight delay as the abort timer is launched in a separate thread. eventually(timeout(500.milliseconds)) { assert(tsm.isZombie) } @@ -560,7 +560,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B )).flatten // Fail the running task - val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get + val failedTask = firstTaskAttempts.head taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0)) // we explicitly call the handleFailedTask method here to avoid adding a sleep in the test suite // Reason being - handleFailedTask is run by an executor service and there is a momentary delay @@ -575,21 +575,21 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(taskScheduler.resourceOffers(IndexedSeq( WorkerOffer("executor0", "host0", 1) )).flatten.size === 0) + assert(taskScheduler.unschedulableTaskSetToExpiryTime.contains(tsm)) assert(!tsm.isZombie) // Offer a new executor which should be accepted assert(taskScheduler.resourceOffers(IndexedSeq( WorkerOffer("executor1", "host0", 1) )).flatten.size === 1) - + assert(taskScheduler.unschedulableTaskSetToExpiryTime.isEmpty) assert(!tsm.isZombie) } // This is to test a scenario where we have two taskSets completely blacklisted and on acquiring // a new executor we don't want the abort timer for the second taskSet to expire and abort the job test("SPARK-22148 abort timer should clear unschedulableTaskSetToExpiryTime for all TaskSets") { - taskScheduler = setupSchedulerWithMockTaskSetBlacklist( - config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "10") + taskScheduler = setupSchedulerWithMockTaskSetBlacklist() // We have 2 taskSets with 1 task remaining in each with 1 executor completely blacklisted val taskSet1 = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0) @@ -603,10 +603,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B WorkerOffer("executor0", "host0", 1) )).flatten - assert(taskScheduler.unschedulableTaskSetToExpiryTime.size == 0) + assert(taskScheduler.unschedulableTaskSetToExpiryTime.isEmpty) // Fail the running task - val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get + val failedTask = firstTaskAttempts.head taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0)) tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason) when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask( @@ -618,10 +618,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B WorkerOffer("executor0", "host0", 1) )).flatten - 
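
The match-to-foreach change above is the standard tidy-up when an Option's None branch does
nothing: foreach runs the body only for Some and drops the empty "case _" arm. A minimal sketch
with hypothetical names (not the scheduler code):

    object OptionForeachSketch {
      def blacklistedTask(taskSet: String): Option[Int] =
        if (taskSet == "stuck") Some(7) else None

      def main(args: Array[String]): Unit = {
        // Before: match { case Some(i) => abort(i); case _ => /* do nothing */ }
        // After: the body runs only when a completely blacklisted task exists.
        blacklistedTask("stuck").foreach(i => println(s"would abort task $i"))
        blacklistedTask("healthy").foreach(i => println(s"never printed: $i"))
      }
    }
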
From 676be551f84bc1d5304a128be349fe3b017f6925 Mon Sep 17 00:00:00 2001
From: Dhruve Ashar
Date: Wed, 31 Oct 2018 11:59:31 -0500
Subject: [PATCH 17/18] Update comment

---
 .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 96bd0960d33f..61556ea64261 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -447,7 +447,7 @@ private[spark] class TaskSchedulerImpl(
                     abortTimer.schedule(
                       createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
                   }
-                case _ => // Abort Immediately
+                case None => // Abort Immediately
                   logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
                     s" executors can be found to kill. Aborting $taskSet." )
                   taskSet.abortSinceCompletelyBlacklisted(taskIndex)
@@ -456,8 +456,10 @@ private[spark] class TaskSchedulerImpl(
       } else {
         // We want to defer killing any taskSets as long as we have a non blacklisted executor
         // which can be used to schedule a task from any active taskSets. This ensures that the
-        // job can make progress and if we encounter a flawed taskSet it will eventually either
-        // fail or abort due to being completely blacklisted.
+        // job can make progress.
+        // Note: It is theoretically possible that a taskSet never gets scheduled on a
+        // non-blacklisted executor and the abort timer doesn't kick in because of a constant
+        // submission of new TaskSets. See the PR for more details.
         if (unschedulableTaskSetToExpiryTime.nonEmpty) {
           logInfo("Clearing the expiry times for all unschedulable taskSets as a task was " +
             "recently scheduled.")
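
The note added above is easiest to see as a toy simulation: because every successful launch
clears all expiry times, a steady trickle of schedulable taskSets keeps re-arming the stuck
one's deadline, which therefore may never fire. A hypothetical sketch of that bookkeeping (not
the real scheduler loop):

    import scala.collection.mutable

    object StarvationCaveatSketch {
      val expiry = mutable.HashMap[String, Long]()
      val timeoutMs = 120000L

      // One offer round: arm the abort deadline for the stuck taskSet, or clear
      // every deadline because some task from any taskSet launched.
      def offerRound(launchedAnyTask: Boolean, now: Long): Unit = {
        if (launchedAnyTask) expiry.clear()
        else expiry.getOrElseUpdate("stuckTaskSet", now + timeoutMs)
      }

      def main(args: Array[String]): Unit = {
        offerRound(launchedAnyTask = false, now = 0L)    // armed
        offerRound(launchedAnyTask = true, now = 1000L)  // cleared by unrelated progress
        offerRound(launchedAnyTask = false, now = 2000L) // re-armed with a later deadline
        println(expiry) // Map(stuckTaskSet -> 122000): the deadline keeps moving out
      }
    }
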
From a30276f3780476ac39de766deef914de8b7b7e0a Mon Sep 17 00:00:00 2001
From: Dhruve Ashar
Date: Mon, 5 Nov 2018 10:08:15 -0600
Subject: [PATCH 18/18] Increase locality timeout to avoid potential flakiness

---
 .../org/apache/spark/scheduler/TaskSchedulerImplSuite.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 5fcfd91b80cf..29172b4664e3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -652,7 +652,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
   // completely blacklisted.
   test("SPARK-22148 Ensure we don't abort the taskSet if we haven't been completely blacklisted") {
     taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
-      config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0")
+      config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0",
+      // This is to avoid any potential flakiness in the test because of large pauses in Jenkins
+      config.LOCALITY_WAIT.key -> "30s"
+    )
 
     val preferredLocation = Seq(ExecutorCacheTaskLocation("host0", "executor0"))
     val taskSet1 = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0,