From 775cacb79509d3f070f883fb7f5a222243e3a060 Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Sat, 23 May 2020 10:06:24 +0530 Subject: [PATCH 1/7] Speculate tasks which are running on decommission executors based on executor kill interval --- .../spark/internal/config/package.scala | 11 +++ .../spark/scheduler/TaskSetManager.scala | 25 ++++- .../spark/scheduler/TaskSetManagerSuite.scala | 98 +++++++++++++++++++ 3 files changed, 131 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 8ef0c3719856..e3a54e4d84fe 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1842,6 +1842,17 @@ package object config { .timeConf(TimeUnit.MILLISECONDS) .createOptional + private[spark] val EXECUTOR_DECOMMISSION_KILL_INTERVAL = + ConfigBuilder("spark.executor.decommission.killInterval") + .doc("Duration after which a decommissioned executor will be killed forcefully." + + "This config is useful for cloud environments where we know in advance when " + + "an executor is going to go down after decommissioning signal Ex- around 2 mins " + + "in aws spot nodes, 1/2 hrs in spot block nodes etc. This config is currently " + + "used to decide what tasks running on decommission executors to speculate") + .version("3.1.0") + .timeConf(TimeUnit.MILLISECONDS) + .createOptional + private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir") .doc("Staging directory used while submitting applications.") .version("2.0.0") diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index a0e84b94735e..f3237459abbd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -102,6 +102,7 @@ private[spark] class TaskSetManager( } numTasks <= slots } + val executorDecommissionKillInterval = conf.get(EXECUTOR_DECOMMISSION_KILL_INTERVAL) // For each task, tracks whether a copy of the task has succeeded. A task will also be // marked as "succeeded" if it failed with a fetch failure, in which case it should not @@ -165,6 +166,7 @@ private[spark] class TaskSetManager( // Task index, start and finish time for each task attempt (indexed by task ID) private[scheduler] val taskInfos = new HashMap[Long, TaskInfo] + private[scheduler] val tidToExecutorKillTimeMapping = new HashMap[Long, Long] // Use a MedianHeap to record durations of successful tasks so we know when to launch // speculative tasks. This is only used when speculation is enabled, to avoid the overhead @@ -933,6 +935,7 @@ private[spark] class TaskSetManager( /** If the given task ID is in the set of running tasks, removes it. */ def removeRunningTask(tid: Long): Unit = { + tidToExecutorKillTimeMapping.remove(tid) if (runningTasksSet.remove(tid) && parent != null) { parent.decreaseRunningTasks(1) } @@ -1042,7 +1045,19 @@ private[spark] class TaskSetManager( // bound based on that. 
logDebug("Task length threshold for speculation: " + threshold) for (tid <- runningTasksSet) { - foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold) + var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold) + if (!speculated && tidToExecutorKillTimeMapping.contains(tid)) { + // Check whether this task will finish before the exectorKillTime assuming + // it will take medianDuration overall. If this task cannot finish within + // executorKillInterval, then this task is a candidate for speculation + val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime + medianDuration + val canExceedDeadline = tidToExecutorKillTimeMapping(tid) < + taskEndTimeBasedOnMedianDuration + if (canExceedDeadline) { + speculated = checkAndSubmitSpeculatableTask(tid, time, 0) + } + } + foundTasks |= speculated } } else if (speculationTaskDurationThresOpt.isDefined && speculationTasksLessEqToSlots) { val time = clock.getTimeMillis() @@ -1100,8 +1115,12 @@ private[spark] class TaskSetManager( def executorDecommission(execId: String): Unit = { recomputeLocality() - // Future consideration: if an executor is decommissioned it may make sense to add the current - // tasks to the spec exec queue. + if (executorDecommissionKillInterval.nonEmpty) { + val executorKillTime = clock.getTimeMillis() + executorDecommissionKillInterval.get + runningTasksSet.filter(taskInfos(_).executorId == execId).foreach { tid => + tidToExecutorKillTimeMapping(tid) = executorKillTime + } + } } def recomputeLocality(): Unit = { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 4978be3e04c1..4fe847aabd37 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1892,6 +1892,104 @@ class TaskSetManagerSuite testSpeculationDurationThreshold(true, 2, 1) } + test("Check speculative tasks are launched when an executor is decommissioned" + + " and the tasks running on it cannot finish within EXECUTOR_DECOMMISSION_KILL_INTERVAL") { + sc = new SparkContext("local", "test") + sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3")) + val taskSet = FakeTask.createTaskSet(4) + sc.conf.set(config.SPECULATION_ENABLED, true) + sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5) + sc.conf.set(config.SPECULATION_QUANTILE, 0.5) + sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms") + val clock = new ManualClock() + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) + val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => + task.metrics.internalAccums + } + + // Offer TASK 0,1 to exec1, Task 2 to exec2 + (0 until 2).foreach { _ => + val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1 + assert(taskOption.isDefined) + assert(taskOption.get.executorId === "exec1") + } + val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1 + assert(taskOption2.isDefined) + assert(taskOption2.get.executorId === "exec2") + + clock.advance(6) // time = 6ms + // Offer TASK 3 to exec2 after some delay + val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1 + assert(taskOption3.isDefined) + assert(taskOption3.get.executorId === "exec2") + + assert(sched.startedTasks.toSet === Set(0, 1, 2, 3)) + + clock.advance(4) // time = 10ms + // Complete the first 2 tasks and leave the 
other 2 tasks in running + for (id <- Set(0, 1)) { + manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id))) + assert(sched.endedTasks(id) === Success) + } + + // checkSpeculatableTasks checks that the task runtime is greater than the threshold for + // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, So tasks need to be running for + // > 15ms for speculation + assert(!manager.checkSpeculatableTasks(0)) + assert(sched.speculativeTasks.toSet === Set()) + + // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3) will be added to + // executorDecommissionSpeculationTriggerTimeoutOpt + // (TASK2 -> 15, TASK3 -> 15) + manager.executorDecommission("exec2") + + assert(manager.checkSpeculatableTasks(0)) + // Task2 started at t=0, so it can still finish before t=15 (Median task runtime = 10) + // Task3 started at t=6, so it might not finish before t=15 + assert(sched.speculativeTasks.toSet === Set(3)) + assert(manager.copiesRunning(3) === 1) + + // Offer resource to start the speculative attempt for the running task. We offer more + // resources, and ensure that speculative tasks get scheduled appropriately -- only one extra + // copy per speculatable task + val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1 + assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty) + assert(taskOption3New.isDefined) + val speculativeTask3 = taskOption3New.get + assert(speculativeTask3.index === 3) + assert(speculativeTask3.taskId === 4) + assert(speculativeTask3.executorId === "exec3") + assert(speculativeTask3.attemptNumber === 1) + + clock.advance(1) // time = 11 ms + // Running checkSpeculatableTasks again should return false + assert(!manager.checkSpeculatableTasks(0)) + assert(manager.copiesRunning(2) === 1) + assert(manager.copiesRunning(3) === 2) + + clock.advance(5) // time = 16 ms + assert(manager.checkSpeculatableTasks(0)) + assert(sched.speculativeTasks.toSet === Set(2, 3)) + assert(manager.copiesRunning(2) === 1) + assert(manager.copiesRunning(3) === 2) + val taskOption2New = manager.resourceOffer("exec3", "host3", NO_PREF)._1 + assert(taskOption2New.isDefined) + val speculativeTask2 = taskOption2New.get + // Ensure that task index 2 is launched on exec3, host3 + assert(speculativeTask2.index === 2) + assert(speculativeTask2.taskId === 5) + assert(speculativeTask2.executorId === "exec3") + assert(speculativeTask2.attemptNumber === 1) + + assert(manager.copiesRunning(2) === 2) + assert(manager.copiesRunning(3) === 2) + + // Offering additional resources should not lead to any speculative tasks being respawned + assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty) + assert(manager.resourceOffer("exec2", "host2", ANY)._1.isEmpty) + assert(manager.resourceOffer("exec3", "host3", ANY)._1.isEmpty) + } + test("SPARK-29976 Regular speculation configs should still take effect even when a " + "threshold is provided") { val (manager, clock) = testSpeculationDurationSetup( From 7521adf19ee961bffb11e8498360e97d1b5caca2 Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Mon, 25 May 2020 19:41:19 +0530 Subject: [PATCH 2/7] add test case --- .../spark/scheduler/TaskSetManagerSuite.scala | 66 +++++++++++++++++-- 1 file changed, 59 insertions(+), 7 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 4fe847aabd37..2c4ff533508d 100644 --- 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1892,7 +1892,7 @@ class TaskSetManagerSuite testSpeculationDurationThreshold(true, 2, 1) } - test("Check speculative tasks are launched when an executor is decommissioned" + + test("SPARK-21040: Check speculative tasks are launched when an executor is decommissioned" + " and the tasks running on it cannot finish within EXECUTOR_DECOMMISSION_KILL_INTERVAL") { sc = new SparkContext("local", "test") sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3")) @@ -1907,7 +1907,7 @@ class TaskSetManagerSuite task.metrics.internalAccums } - // Offer TASK 0,1 to exec1, Task 2 to exec2 + // Start TASK 0,1 on exec1, Task 2 on exec2 (0 until 2).foreach { _ => val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1 assert(taskOption.isDefined) @@ -1918,7 +1918,7 @@ class TaskSetManagerSuite assert(taskOption2.get.executorId === "exec2") clock.advance(6) // time = 6ms - // Offer TASK 3 to exec2 after some delay + // Start TASK 3 on exec2 after some delay val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1 assert(taskOption3.isDefined) assert(taskOption3.get.executorId === "exec2") @@ -1945,16 +1945,18 @@ class TaskSetManagerSuite assert(manager.checkSpeculatableTasks(0)) // Task2 started at t=0, so it can still finish before t=15 (Median task runtime = 10) - // Task3 started at t=6, so it might not finish before t=15 + // Task3 started at t=6, so it might not finish before t=15. So Task 3 should be part + // of speculativeTasks assert(sched.speculativeTasks.toSet === Set(3)) assert(manager.copiesRunning(3) === 1) - // Offer resource to start the speculative attempt for the running task. We offer more - // resources, and ensure that speculative tasks get scheduled appropriately -- only one extra - // copy per speculatable task + // Offer resource to start the speculative attempt for the running task val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1 + // Offer more resources. Noting should get scheduled now. assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty) assert(taskOption3New.isDefined) + + // Assert info about the newly launched speculative task val speculativeTask3 = taskOption3New.get assert(speculativeTask3.index === 3) assert(speculativeTask3.taskId === 4) @@ -1968,6 +1970,11 @@ class TaskSetManagerSuite assert(manager.copiesRunning(3) === 2) clock.advance(5) // time = 16 ms + // At t=16 ms, Task 4 has completed 16 ms. It is more than the + // SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15 ms. So now Task 4 will + // be selected for speculation. 
Here we are verifying that regular speculation configs + // should still take effect even when a EXECUTOR_DECOMMISSION_KILL_INTERVAL is provided and + // corresponding executor is decommissioned assert(manager.checkSpeculatableTasks(0)) assert(sched.speculativeTasks.toSet === Set(2, 3)) assert(manager.copiesRunning(2) === 1) @@ -1990,6 +1997,51 @@ class TaskSetManagerSuite assert(manager.resourceOffer("exec3", "host3", ANY)._1.isEmpty) } + test("SPARK-21040: Check speculative tasks are not launched when an executor" + + " is decommissioned and the tasks running on it can finish within" + + " the EXECUTOR_DECOMMISSION_KILL_INTERVAL") { + sc = new SparkContext("local", "test") + sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + val taskSet = FakeTask.createTaskSet(4) + sc.conf.set(config.SPECULATION_ENABLED, true) + sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5) + sc.conf.set(config.SPECULATION_QUANTILE, 0.5) + // Set high value for EXECUTOR_DECOMMISSION_KILL_INTERVAL + sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "50ms") + val clock = new ManualClock() + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) + val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => + task.metrics.internalAccums + } + + // Offer resources for 4 tasks to start, 2 on each exec + Seq("exec1" -> "host1", "exec2" -> "host2").foreach { case (exec, host) => + (0 until 2).foreach { _ => + val taskOption = manager.resourceOffer(exec, host, NO_PREF)._1 + assert(taskOption.isDefined) + assert(taskOption.get.executorId === exec) + } + } + assert(sched.startedTasks.toSet === Set(0, 1, 2, 3)) + + clock.advance(10) // time = 10ms + // Complete the first 2 tasks and leave the other 2 tasks in running + for (id <- Set(0, 1)) { + manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id))) + assert(sched.endedTasks(id) === Success) + } + // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3) will be added to + // executorDecommissionSpeculationTriggerTimeoutOpt + // (TASK2 -> 60, TASK3 -> 60) + manager.executorDecommission("exec2") + + // Since the EXECUTOR_DECOMMISSION_KILL_INTERVAL was high, so the already running tasks + // on executor 2 still have chance to finish. So they should not be speculated. 
+ assert(!manager.checkSpeculatableTasks(0)) + clock.advance(5) // time = 15ms + assert(!manager.checkSpeculatableTasks(0)) + } + test("SPARK-29976 Regular speculation configs should still take effect even when a " + "threshold is provided") { val (manager, clock) = testSpeculationDurationSetup( From 55dc94f622570534a7e70be69ae731cd9bc86766 Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Tue, 26 May 2020 20:18:37 +0530 Subject: [PATCH 3/7] address review comments --- .../org/apache/spark/scheduler/TaskSetManager.scala | 4 ++-- .../apache/spark/scheduler/TaskSetManagerSuite.scala | 11 ++++++----- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index f3237459abbd..668c404dd66b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -1115,8 +1115,8 @@ private[spark] class TaskSetManager( def executorDecommission(execId: String): Unit = { recomputeLocality() - if (executorDecommissionKillInterval.nonEmpty) { - val executorKillTime = clock.getTimeMillis() + executorDecommissionKillInterval.get + executorDecommissionKillInterval.foreach { interval => + val executorKillTime = clock.getTimeMillis() + interval runningTasksSet.filter(taskInfos(_).executorId == execId).foreach { tid => tidToExecutorKillTimeMapping(tid) = executorKillTime } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 2c4ff533508d..a0774f3bcac4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1942,6 +1942,9 @@ class TaskSetManagerSuite // executorDecommissionSpeculationTriggerTimeoutOpt // (TASK2 -> 15, TASK3 -> 15) manager.executorDecommission("exec2") + assert(manager.tidToExecutorKillTimeMapping.keySet === Set(2, 3)) + assert(manager.tidToExecutorKillTimeMapping(2) === 15) + assert(manager.tidToExecutorKillTimeMapping(3) === 15) assert(manager.checkSpeculatableTasks(0)) // Task2 started at t=0, so it can still finish before t=15 (Median task runtime = 10) @@ -1952,7 +1955,7 @@ class TaskSetManagerSuite // Offer resource to start the speculative attempt for the running task val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1 - // Offer more resources. Noting should get scheduled now. + // Offer more resources. Nothing should get scheduled now. assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty) assert(taskOption3New.isDefined) @@ -1970,8 +1973,8 @@ class TaskSetManagerSuite assert(manager.copiesRunning(3) === 2) clock.advance(5) // time = 16 ms - // At t=16 ms, Task 4 has completed 16 ms. It is more than the - // SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15 ms. So now Task 4 will + // At t=16 ms, Task 2 has been running for 16 ms. It is more than the + // SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15 ms. So now Task 2 will // be selected for speculation. 
Here we are verifying that regular speculation configs // should still take effect even when a EXECUTOR_DECOMMISSION_KILL_INTERVAL is provided and // corresponding executor is decommissioned @@ -1993,8 +1996,6 @@ class TaskSetManagerSuite // Offering additional resources should not lead to any speculative tasks being respawned assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty) - assert(manager.resourceOffer("exec2", "host2", ANY)._1.isEmpty) - assert(manager.resourceOffer("exec3", "host3", ANY)._1.isEmpty) } test("SPARK-21040: Check speculative tasks are not launched when an executor" + From dae9cfe1e11ea4f8bf064d325a17a839719d2c9b Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Wed, 27 May 2020 20:47:44 +0530 Subject: [PATCH 4/7] remove unnecessary test --- .../spark/internal/config/package.scala | 2 +- .../spark/scheduler/TaskSetManager.scala | 5 +- .../spark/scheduler/TaskSetManagerSuite.scala | 69 ++++--------------- 3 files changed, 16 insertions(+), 60 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index e3a54e4d84fe..660e0bfca2bc 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1850,7 +1850,7 @@ package object config { "in aws spot nodes, 1/2 hrs in spot block nodes etc. This config is currently " + "used to decide what tasks running on decommission executors to speculate") .version("3.1.0") - .timeConf(TimeUnit.MILLISECONDS) + .timeConf(TimeUnit.SECONDS) .createOptional private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir") diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 668c404dd66b..883119bdb0a5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -19,7 +19,7 @@ package org.apache.spark.scheduler import java.io.NotSerializableException import java.nio.ByteBuffer -import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit} import scala.collection.immutable.Map import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} @@ -102,7 +102,8 @@ private[spark] class TaskSetManager( } numTasks <= slots } - val executorDecommissionKillInterval = conf.get(EXECUTOR_DECOMMISSION_KILL_INTERVAL) + val executorDecommissionKillInterval = conf.get(EXECUTOR_DECOMMISSION_KILL_INTERVAL).map( + TimeUnit.SECONDS.toMillis) // For each task, tracks whether a copy of the task has succeeded. 
A task will also be // marked as "succeeded" if it failed with a fetch failure, in which case it should not diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index a0774f3bcac4..583b69b7b59a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1900,7 +1900,7 @@ class TaskSetManagerSuite sc.conf.set(config.SPECULATION_ENABLED, true) sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5) sc.conf.set(config.SPECULATION_QUANTILE, 0.5) - sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5ms") + sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5s") val clock = new ManualClock() val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => @@ -1917,7 +1917,7 @@ class TaskSetManagerSuite assert(taskOption2.isDefined) assert(taskOption2.get.executorId === "exec2") - clock.advance(6) // time = 6ms + clock.advance(6*1000) // time = 6s // Start TASK 3 on exec2 after some delay val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1 assert(taskOption3.isDefined) @@ -1925,7 +1925,7 @@ class TaskSetManagerSuite assert(sched.startedTasks.toSet === Set(0, 1, 2, 3)) - clock.advance(4) // time = 10ms + clock.advance(4*1000) // time = 10s // Complete the first 2 tasks and leave the other 2 tasks in running for (id <- Set(0, 1)) { manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id))) @@ -1934,7 +1934,7 @@ class TaskSetManagerSuite // checkSpeculatableTasks checks that the task runtime is greater than the threshold for // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, So tasks need to be running for - // > 15ms for speculation + // > 15s for speculation assert(!manager.checkSpeculatableTasks(0)) assert(sched.speculativeTasks.toSet === Set()) @@ -1943,12 +1943,12 @@ class TaskSetManagerSuite // (TASK2 -> 15, TASK3 -> 15) manager.executorDecommission("exec2") assert(manager.tidToExecutorKillTimeMapping.keySet === Set(2, 3)) - assert(manager.tidToExecutorKillTimeMapping(2) === 15) - assert(manager.tidToExecutorKillTimeMapping(3) === 15) + assert(manager.tidToExecutorKillTimeMapping(2) === 15*1000) + assert(manager.tidToExecutorKillTimeMapping(3) === 15*1000) assert(manager.checkSpeculatableTasks(0)) - // Task2 started at t=0, so it can still finish before t=15 (Median task runtime = 10) - // Task3 started at t=6, so it might not finish before t=15. So Task 3 should be part + // Task2 started at t=0s, so it can still finish before t=15s (Median task runtime = 10s) + // Task3 started at t=6s, so it might not finish before t=15s. So Task 3 should be part // of speculativeTasks assert(sched.speculativeTasks.toSet === Set(3)) assert(manager.copiesRunning(3) === 1) @@ -1966,15 +1966,15 @@ class TaskSetManagerSuite assert(speculativeTask3.executorId === "exec3") assert(speculativeTask3.attemptNumber === 1) - clock.advance(1) // time = 11 ms + clock.advance(1*1000) // time = 11s // Running checkSpeculatableTasks again should return false assert(!manager.checkSpeculatableTasks(0)) assert(manager.copiesRunning(2) === 1) assert(manager.copiesRunning(3) === 2) - clock.advance(5) // time = 16 ms - // At t=16 ms, Task 2 has been running for 16 ms. It is more than the - // SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15 ms. 
So now Task 2 will + clock.advance(5*1000) // time = 16s + // At t=16s, Task 2 has been running for 16s. It is more than the + // SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15s. So now Task 2 will // be selected for speculation. Here we are verifying that regular speculation configs // should still take effect even when a EXECUTOR_DECOMMISSION_KILL_INTERVAL is provided and // corresponding executor is decommissioned @@ -1998,51 +1998,6 @@ class TaskSetManagerSuite assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty) } - test("SPARK-21040: Check speculative tasks are not launched when an executor" + - " is decommissioned and the tasks running on it can finish within" + - " the EXECUTOR_DECOMMISSION_KILL_INTERVAL") { - sc = new SparkContext("local", "test") - sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) - val taskSet = FakeTask.createTaskSet(4) - sc.conf.set(config.SPECULATION_ENABLED, true) - sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5) - sc.conf.set(config.SPECULATION_QUANTILE, 0.5) - // Set high value for EXECUTOR_DECOMMISSION_KILL_INTERVAL - sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "50ms") - val clock = new ManualClock() - val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) - val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => - task.metrics.internalAccums - } - - // Offer resources for 4 tasks to start, 2 on each exec - Seq("exec1" -> "host1", "exec2" -> "host2").foreach { case (exec, host) => - (0 until 2).foreach { _ => - val taskOption = manager.resourceOffer(exec, host, NO_PREF)._1 - assert(taskOption.isDefined) - assert(taskOption.get.executorId === exec) - } - } - assert(sched.startedTasks.toSet === Set(0, 1, 2, 3)) - - clock.advance(10) // time = 10ms - // Complete the first 2 tasks and leave the other 2 tasks in running - for (id <- Set(0, 1)) { - manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id))) - assert(sched.endedTasks(id) === Success) - } - // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3) will be added to - // executorDecommissionSpeculationTriggerTimeoutOpt - // (TASK2 -> 60, TASK3 -> 60) - manager.executorDecommission("exec2") - - // Since the EXECUTOR_DECOMMISSION_KILL_INTERVAL was high, so the already running tasks - // on executor 2 still have chance to finish. So they should not be speculated. 
- assert(!manager.checkSpeculatableTasks(0)) - clock.advance(5) // time = 15ms - assert(!manager.checkSpeculatableTasks(0)) - } - test("SPARK-29976 Regular speculation configs should still take effect even when a " + "threshold is provided") { val (manager, clock) = testSpeculationDurationSetup( From f5a7313f06bdb6a6913e2459b87dac7b17c85f72 Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Thu, 28 May 2020 22:17:54 +0530 Subject: [PATCH 5/7] empty commit to trigger build From 61f850d7d0109a02f8e51148fe946022bfdb629d Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Tue, 9 Jun 2020 10:05:11 +0530 Subject: [PATCH 6/7] use same notation in comments --- .../spark/scheduler/TaskSetManagerSuite.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 583b69b7b59a..8f343cef13fe 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1907,7 +1907,7 @@ class TaskSetManagerSuite task.metrics.internalAccums } - // Start TASK 0,1 on exec1, Task 2 on exec2 + // Start TASK 0,1 on exec1, TASK 2 on exec2 (0 until 2).foreach { _ => val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1 assert(taskOption.isDefined) @@ -1939,16 +1939,16 @@ class TaskSetManagerSuite assert(sched.speculativeTasks.toSet === Set()) // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3) will be added to - // executorDecommissionSpeculationTriggerTimeoutOpt - // (TASK2 -> 15, TASK3 -> 15) + // executorDecommissionSpeculationTriggerTimeoutOpt + // (TASK 2 -> 15, TASK 3 -> 15) manager.executorDecommission("exec2") assert(manager.tidToExecutorKillTimeMapping.keySet === Set(2, 3)) assert(manager.tidToExecutorKillTimeMapping(2) === 15*1000) assert(manager.tidToExecutorKillTimeMapping(3) === 15*1000) assert(manager.checkSpeculatableTasks(0)) - // Task2 started at t=0s, so it can still finish before t=15s (Median task runtime = 10s) - // Task3 started at t=6s, so it might not finish before t=15s. So Task 3 should be part + // TASK 2 started at t=0s, so it can still finish before t=15s (Median task runtime = 10s) + // TASK 3 started at t=6s, so it might not finish before t=15s. So TASK 3 should be part // of speculativeTasks assert(sched.speculativeTasks.toSet === Set(3)) assert(manager.copiesRunning(3) === 1) @@ -1973,8 +1973,8 @@ class TaskSetManagerSuite assert(manager.copiesRunning(3) === 2) clock.advance(5*1000) // time = 16s - // At t=16s, Task 2 has been running for 16s. It is more than the - // SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15s. So now Task 2 will + // At t=16s, TASK 2 has been running for 16s. It is more than the + // SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15s. So now TASK 2 will // be selected for speculation. 
Here we are verifying that regular speculation configs // should still take effect even when a EXECUTOR_DECOMMISSION_KILL_INTERVAL is provided and // corresponding executor is decommissioned @@ -1985,7 +1985,7 @@ class TaskSetManagerSuite val taskOption2New = manager.resourceOffer("exec3", "host3", NO_PREF)._1 assert(taskOption2New.isDefined) val speculativeTask2 = taskOption2New.get - // Ensure that task index 2 is launched on exec3, host3 + // Ensure that TASK 2 is re-launched on exec3, host3 assert(speculativeTask2.index === 2) assert(speculativeTask2.taskId === 5) assert(speculativeTask2.executorId === "exec3") From d87b311be85819ae884e2a24d94926fdd51165de Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Mon, 15 Jun 2020 19:43:44 +0530 Subject: [PATCH 7/7] doc fixes --- .../main/scala/org/apache/spark/internal/config/package.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index ddb824f535af..f4724627de6a 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1846,9 +1846,9 @@ package object config { ConfigBuilder("spark.executor.decommission.killInterval") .doc("Duration after which a decommissioned executor will be killed forcefully." + "This config is useful for cloud environments where we know in advance when " + - "an executor is going to go down after decommissioning signal Ex- around 2 mins " + + "an executor is going to go down after decommissioning signal i.e. around 2 mins " + "in aws spot nodes, 1/2 hrs in spot block nodes etc. This config is currently " + - "used to decide what tasks running on decommission executors to speculate") + "used to decide what tasks running on decommission executors to speculate.") .version("3.1.0") .timeConf(TimeUnit.SECONDS) .createOptional
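
A minimal usage sketch of the behaviour introduced by this series, assuming the final (PATCH 7/7) shape of the config. The DecommissionSpeculationSketch object and its willMissKillDeadline helper below are illustrative only and are not part of the patches: they restate the check added to TaskSetManager.checkSpeculatableTasks() in PATCH 1/7, where a task running on a decommissioned executor is speculated early if its projected finish time (launch time + median successful-task duration) falls after the executor's forced-kill time.

    import org.apache.spark.SparkConf

    object DecommissionSpeculationSketch {
      // Hypothetical helper (not from the patches): restates the kill-deadline check.
      // A running task on a decommissioned executor becomes a speculation candidate
      // when its projected finish time (launch time + median successful-task duration)
      // lands after the time at which the executor will be killed forcefully.
      def willMissKillDeadline(
          launchTimeMs: Long,
          medianDurationMs: Long,
          executorKillTimeMs: Long): Boolean = {
        val projectedEndTimeMs = launchTimeMs + medianDurationMs
        executorKillTimeMs < projectedEndTimeMs
      }

      def main(args: Array[String]): Unit = {
        // Enable speculation and declare how long a decommissioned executor survives
        // after the decommission signal (e.g. around 2 minutes for cloud spot nodes).
        val conf = new SparkConf()
          .set("spark.speculation", "true")
          .set("spark.executor.decommission.killInterval", "120s")

        // Timeline from the test kept in this series: TASK 3 launched at t=6s,
        // median task duration 10s, executor killed at t=15s. Its projected end (16s)
        // is past the kill time, so a speculative copy is launched right away.
        assert(willMissKillDeadline(6000L, 10000L, 15000L))

        // TASK 2 launched at t=0s projects to finish at t=10s, before the kill time,
        // so it is not speculated on decommission grounds alone; the regular
        // SPECULATION_MULTIPLIER / SPECULATION_QUANTILE threshold catches it later.
        assert(!willMissKillDeadline(0L, 10000L, 15000L))

        println(s"killInterval = ${conf.get("spark.executor.decommission.killInterval")}")
      }
    }

If spark.executor.decommission.killInterval is left unset, executorDecommission() records no kill times, the new branch in checkSpeculatableTasks() is never taken, and only the regular speculation thresholds apply.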