From a225ac2a1eda66bae767d111eb63e9e35b6491d2 Mon Sep 17 00:00:00 2001 From: Rui Li Date: Tue, 27 May 2014 15:14:35 +0800 Subject: [PATCH 01/15] SPARK-1937: fix issue with task locality --- .../org/apache/spark/scheduler/TaskSetManager.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index f3bd0797aa035..ed38ba755cc91 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -148,8 +148,10 @@ private[spark] class TaskSetManager( // Add all our tasks to the pending lists. We do this in reverse order // of task index so that tasks with low indices get launched first. + val delaySchedule = conf.getBoolean("spark.schedule.delaySchedule", true) for (i <- (0 until numTasks).reverse) { - addPendingTask(i) + //if delay schedule is set, we shouldn't enforce check since executors may haven't registered yet + addPendingTask(i, enforceCheck = !delaySchedule) } // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling @@ -169,8 +171,10 @@ private[spark] class TaskSetManager( /** * Add a task to all the pending-task lists that it should be on. If readding is set, we are * re-adding the task so only include it in each list if it's not already there. + * If enforceCheck is set, we'll check the availability of executors/hosts before adding a task + * to the pending list, otherwise, we simply add the task according to its preference. */ - private def addPendingTask(index: Int, readding: Boolean = false) { + private def addPendingTask(index: Int, readding: Boolean = false, enforceCheck: Boolean = true) { // Utility method that adds `index` to a list only if readding=false or it's not already there def addTo(list: ArrayBuffer[Int]) { if (!readding || !list.contains(index)) { @@ -181,12 +185,12 @@ private[spark] class TaskSetManager( var hadAliveLocations = false for (loc <- tasks(index).preferredLocations) { for (execId <- loc.executorId) { - if (sched.isExecutorAlive(execId)) { + if (!enforceCheck || sched.isExecutorAlive(execId)) { addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer)) hadAliveLocations = true } } - if (sched.hasExecutorsAliveOnHost(loc.host)) { + if (!enforceCheck || sched.hasExecutorsAliveOnHost(loc.host)) { addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer)) for (rack <- sched.getRackForHost(loc.host)) { addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer)) From 3dfae86eb492f8b39b34fabf9fb80e7382ba7486 Mon Sep 17 00:00:00 2001 From: Rui Li Date: Wed, 28 May 2014 19:21:27 +0800 Subject: [PATCH 02/15] re-compute pending tasks when new host is added --- .../spark/scheduler/TaskSchedulerImpl.scala | 8 +++++++ .../spark/scheduler/TaskSetManager.scala | 22 ++++++++++++------- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 17292b4c15b8b..eae315ea5c1f6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -111,6 +111,8 @@ private[spark] class TaskSchedulerImpl( // This is a var so that we can reset it for testing purposes. 
private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this) + private val delaySchedule = conf.getBoolean("spark.schedule.delaySchedule", true) + override def setDAGScheduler(dagScheduler: DAGScheduler) { this.dagScheduler = dagScheduler } @@ -210,11 +212,14 @@ private[spark] class TaskSchedulerImpl( SparkEnv.set(sc.env) // Mark each slave as alive and remember its hostname + //also track if new executor is added + var newExecAvail = false for (o <- offers) { executorIdToHost(o.executorId) = o.host if (!executorsByHost.contains(o.host)) { executorsByHost(o.host) = new HashSet[String]() executorAdded(o.executorId, o.host) + newExecAvail = true } } @@ -233,6 +238,9 @@ private[spark] class TaskSchedulerImpl( // of locality levels so that it gets a chance to launch local tasks on all of them. var launchedTask = false for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) { + if (delaySchedule && newExecAvail) { + taskSet.reAddPendingTasks() + } do { launchedTask = false for (i <- 0 until shuffledOffers.size) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index ed38ba755cc91..a16b1a44fd61d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -150,8 +150,7 @@ private[spark] class TaskSetManager( // of task index so that tasks with low indices get launched first. val delaySchedule = conf.getBoolean("spark.schedule.delaySchedule", true) for (i <- (0 until numTasks).reverse) { - //if delay schedule is set, we shouldn't enforce check since executors may haven't registered yet - addPendingTask(i, enforceCheck = !delaySchedule) + addPendingTask(i) } // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling @@ -171,10 +170,8 @@ private[spark] class TaskSetManager( /** * Add a task to all the pending-task lists that it should be on. If readding is set, we are * re-adding the task so only include it in each list if it's not already there. - * If enforceCheck is set, we'll check the availability of executors/hosts before adding a task - * to the pending list, otherwise, we simply add the task according to its preference. 
*/ - private def addPendingTask(index: Int, readding: Boolean = false, enforceCheck: Boolean = true) { + private def addPendingTask(index: Int, readding: Boolean = false) { // Utility method that adds `index` to a list only if readding=false or it's not already there def addTo(list: ArrayBuffer[Int]) { if (!readding || !list.contains(index)) { @@ -185,12 +182,12 @@ private[spark] class TaskSetManager( var hadAliveLocations = false for (loc <- tasks(index).preferredLocations) { for (execId <- loc.executorId) { - if (!enforceCheck || sched.isExecutorAlive(execId)) { + if (sched.isExecutorAlive(execId)) { addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer)) hadAliveLocations = true } } - if (!enforceCheck || sched.hasExecutorsAliveOnHost(loc.host)) { + if (sched.hasExecutorsAliveOnHost(loc.host)) { addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer)) for (rack <- sched.getRackForHost(loc.host)) { addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer)) @@ -199,7 +196,8 @@ private[spark] class TaskSetManager( } } - if (!hadAliveLocations) { + if (tasks(index).preferredLocations.isEmpty || + (!delaySchedule && !hadAliveLocations)) { // Even though the task might've had preferred locations, all of those hosts or executors // are dead; put it in the no-prefs list so we can schedule it elsewhere right away. addTo(pendingTasksWithNoPrefs) @@ -742,4 +740,12 @@ private[spark] class TaskSetManager( logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", ")) levels.toArray } + + //Re-compute the pending lists. This should be called when new executor is added + def reAddPendingTasks() { + logInfo("Re-computing pending task lists.") + for (i <- (0 until numTasks).reverse.filter(index => copiesRunning(index) == 0 && !successful(index))) { + addPendingTask(i, readding = true) + } + } } From cf0d6acc86754179e88bed9bf663e85c9b962317 Mon Sep 17 00:00:00 2001 From: Rui Li Date: Wed, 4 Jun 2014 10:13:50 +0800 Subject: [PATCH 03/15] fix code style --- .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 8 ++++---- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index eae315ea5c1f6..309db8263075f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -212,7 +212,7 @@ private[spark] class TaskSchedulerImpl( SparkEnv.set(sc.env) // Mark each slave as alive and remember its hostname - //also track if new executor is added + // Also track if new executor is added var newExecAvail = false for (o <- offers) { executorIdToHost(o.executorId) = o.host @@ -232,15 +232,15 @@ private[spark] class TaskSchedulerImpl( for (taskSet <- sortedTaskSets) { logDebug("parentName: %s, name: %s, runningTasks: %s".format( taskSet.parent.name, taskSet.name, taskSet.runningTasks)) + if (delaySchedule && newExecAvail) { + taskSet.reAddPendingTasks() + } } // Take each TaskSet in our scheduling order, and then offer it each node in increasing order // of locality levels so that it gets a chance to launch local tasks on all of them. 
var launchedTask = false for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) { - if (delaySchedule && newExecAvail) { - taskSet.reAddPendingTasks() - } do { launchedTask = false for (i <- 0 until shuffledOffers.size) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index a16b1a44fd61d..8b9f46261bf54 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -196,8 +196,7 @@ private[spark] class TaskSetManager( } } - if (tasks(index).preferredLocations.isEmpty || - (!delaySchedule && !hadAliveLocations)) { + if (tasks(index).preferredLocations.isEmpty || (!delaySchedule && !hadAliveLocations)) { // Even though the task might've had preferred locations, all of those hosts or executors // are dead; put it in the no-prefs list so we can schedule it elsewhere right away. addTo(pendingTasksWithNoPrefs) @@ -744,7 +743,8 @@ private[spark] class TaskSetManager( //Re-compute the pending lists. This should be called when new executor is added def reAddPendingTasks() { logInfo("Re-computing pending task lists.") - for (i <- (0 until numTasks).reverse.filter(index => copiesRunning(index) == 0 && !successful(index))) { + for (i <- (0 until numTasks).reverse.filter(index => copiesRunning(index) == 0 + && !successful(index))) { addPendingTask(i, readding = true) } } From 539a578b3648b2c59978675628230e1dc8284bfc Mon Sep 17 00:00:00 2001 From: Rui Li Date: Wed, 4 Jun 2014 10:17:10 +0800 Subject: [PATCH 04/15] fix code style --- .../main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 8b9f46261bf54..5071783d8d0c5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -740,7 +740,7 @@ private[spark] class TaskSetManager( levels.toArray } - //Re-compute the pending lists. This should be called when new executor is added + // Re-compute the pending lists. This should be called when new executor is added def reAddPendingTasks() { logInfo("Re-computing pending task lists.") for (i <- (0 until numTasks).reverse.filter(index => copiesRunning(index) == 0 From cab4c7103c0ad5cd233c26e9314baed854d62419 Mon Sep 17 00:00:00 2001 From: Rui Li Date: Thu, 5 Jun 2014 18:28:28 +0800 Subject: [PATCH 05/15] revised patch --- .../spark/scheduler/TaskSchedulerImpl.scala | 12 ++--- .../spark/scheduler/TaskSetManager.scala | 51 +++++++++++++------ 2 files changed, 40 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 309db8263075f..e5ba4c76c353a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -111,7 +111,6 @@ private[spark] class TaskSchedulerImpl( // This is a var so that we can reset it for testing purposes. 
private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this) - private val delaySchedule = conf.getBoolean("spark.schedule.delaySchedule", true) override def setDAGScheduler(dagScheduler: DAGScheduler) { this.dagScheduler = dagScheduler @@ -211,15 +210,16 @@ private[spark] class TaskSchedulerImpl( def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized { SparkEnv.set(sc.env) + val sortedTaskSets = rootPool.getSortedTaskSetQueue // Mark each slave as alive and remember its hostname - // Also track if new executor is added - var newExecAvail = false for (o <- offers) { executorIdToHost(o.executorId) = o.host if (!executorsByHost.contains(o.host)) { executorsByHost(o.host) = new HashSet[String]() executorAdded(o.executorId, o.host) - newExecAvail = true + for (taskSet <- sortedTaskSets) { + taskSet.executorAdded(o.executorId, o.host) + } } } @@ -228,13 +228,9 @@ private[spark] class TaskSchedulerImpl( // Build a list of tasks to assign to each worker. val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores)) val availableCpus = shuffledOffers.map(o => o.cores).toArray - val sortedTaskSets = rootPool.getSortedTaskSetQueue for (taskSet <- sortedTaskSets) { logDebug("parentName: %s, name: %s, runningTasks: %s".format( taskSet.parent.name, taskSet.name, taskSet.runningTasks)) - if (delaySchedule && newExecAvail) { - taskSet.reAddPendingTasks() - } } // Take each TaskSet in our scheduling order, and then offer it each node in increasing order diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 5071783d8d0c5..4346a44e071fd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -54,8 +54,15 @@ private[spark] class TaskSetManager( clock: Clock = SystemClock) extends Schedulable with Logging { + // Remember when this TaskSetManager is created + val creationTime = clock.getTime() val conf = sched.sc.conf + // The period we wait for new executors to come up + // After this period, tasks in pendingTasksWithNoPrefs will be considered as PROCESS_LOCAL + private val WAIT_NEW_EXEC_TIMEOUT = conf.getLong("spark.scheduler.waitNewExecutorTime", 3000L) + private var waitingNewExec = true + /* * Sometimes if an executor is dead or in an otherwise invalid state, the driver * does not realize right away leading to repeated task failures. If enabled, @@ -118,7 +125,7 @@ private[spark] class TaskSetManager( private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]] // Set containing pending tasks with no locality preferences. - val pendingTasksWithNoPrefs = new ArrayBuffer[Int] + var pendingTasksWithNoPrefs = new ArrayBuffer[Int] // Set containing all pending tasks (also used as a stack, as above). val allPendingTasks = new ArrayBuffer[Int] @@ -148,7 +155,6 @@ private[spark] class TaskSetManager( // Add all our tasks to the pending lists. We do this in reverse order // of task index so that tasks with low indices get launched first. 
- val delaySchedule = conf.getBoolean("spark.schedule.delaySchedule", true) for (i <- (0 until numTasks).reverse) { addPendingTask(i) } @@ -183,20 +189,21 @@ private[spark] class TaskSetManager( for (loc <- tasks(index).preferredLocations) { for (execId <- loc.executorId) { if (sched.isExecutorAlive(execId)) { - addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer)) hadAliveLocations = true } + addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer)) } if (sched.hasExecutorsAliveOnHost(loc.host)) { - addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer)) - for (rack <- sched.getRackForHost(loc.host)) { - addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer)) - } + hadAliveLocations = true + } + addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer)) + for (rack <- sched.getRackForHost(loc.host)) { + addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer)) hadAliveLocations = true } } - if (tasks(index).preferredLocations.isEmpty || (!delaySchedule && !hadAliveLocations)) { + if (!hadAliveLocations) { // Even though the task might've had preferred locations, all of those hosts or executors // are dead; put it in the no-prefs list so we can schedule it elsewhere right away. addTo(pendingTasksWithNoPrefs) @@ -362,7 +369,8 @@ private[spark] class TaskSetManager( } // Look for no-pref tasks after rack-local tasks since they can run anywhere. - for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) { + for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs) + if (!waitingNewExec || tasks(index).preferredLocations.isEmpty)) { return Some((index, TaskLocality.PROCESS_LOCAL)) } @@ -392,6 +400,9 @@ private[spark] class TaskSetManager( if (allowedLocality > maxLocality) { allowedLocality = maxLocality // We're not allowed to search for farther-away tasks } + if (waitingNewExec && curTime - creationTime > WAIT_NEW_EXEC_TIMEOUT) { + waitingNewExec = false + } findTask(execId, host, allowedLocality) match { case Some((index, taskLocality)) => { @@ -740,12 +751,22 @@ private[spark] class TaskSetManager( levels.toArray } - // Re-compute the pending lists. 
This should be called when new executor is added - def reAddPendingTasks() { - logInfo("Re-computing pending task lists.") - for (i <- (0 until numTasks).reverse.filter(index => copiesRunning(index) == 0 - && !successful(index))) { - addPendingTask(i, readding = true) + // Re-compute pendingTasksWithNoPrefs since new preferred locations may become available + def executorAdded(execId: String, host: String) { + def newLocAvail(index: Int): Boolean = { + for (loc <- tasks(index).preferredLocations) { + if (execId.equals(loc.executorId.getOrElse(null)) || host.equals(loc.host)) { + return true + } + val availRack = sched.getRackForHost(host) + val prefRack = sched.getRackForHost(loc.host) + if (prefRack.isDefined && prefRack.get.equals(availRack.getOrElse(null))) { + return true + } + } + false } + logInfo("Re-computing pending task lists.") + pendingTasksWithNoPrefs = pendingTasksWithNoPrefs.filter(!newLocAvail(_)) } } From 3d7da0268107eb3650d715256949fd6759cd81b3 Mon Sep 17 00:00:00 2001 From: lirui-intel Date: Thu, 5 Jun 2014 19:07:31 +0800 Subject: [PATCH 06/15] Update TaskSchedulerImpl.scala --- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index e5ba4c76c353a..2862542c38c4a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -111,7 +111,6 @@ private[spark] class TaskSchedulerImpl( // This is a var so that we can reset it for testing purposes. private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this) - override def setDAGScheduler(dagScheduler: DAGScheduler) { this.dagScheduler = dagScheduler } From c7b93b5dce7d92ddd10e4b4c92fc5b7683cc0131 Mon Sep 17 00:00:00 2001 From: Rui Li Date: Thu, 5 Jun 2014 22:25:12 +0800 Subject: [PATCH 07/15] revise patch --- .../apache/spark/scheduler/TaskSchedulerImpl.scala | 11 +++++++---- .../org/apache/spark/scheduler/TaskSetManager.scala | 11 ++++------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 2862542c38c4a..dc7a6aedd989b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -209,16 +209,15 @@ private[spark] class TaskSchedulerImpl( def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized { SparkEnv.set(sc.env) - val sortedTaskSets = rootPool.getSortedTaskSetQueue // Mark each slave as alive and remember its hostname + // Also track if new executor is added + var newExecAvail = false for (o <- offers) { executorIdToHost(o.executorId) = o.host if (!executorsByHost.contains(o.host)) { executorsByHost(o.host) = new HashSet[String]() executorAdded(o.executorId, o.host) - for (taskSet <- sortedTaskSets) { - taskSet.executorAdded(o.executorId, o.host) - } + newExecAvail = true } } @@ -227,9 +226,13 @@ private[spark] class TaskSchedulerImpl( // Build a list of tasks to assign to each worker. 
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores)) val availableCpus = shuffledOffers.map(o => o.cores).toArray + val sortedTaskSets = rootPool.getSortedTaskSetQueue for (taskSet <- sortedTaskSets) { logDebug("parentName: %s, name: %s, runningTasks: %s".format( taskSet.parent.name, taskSet.name, taskSet.runningTasks)) + if (newExecAvail) { + taskSet.executorAdded() + } } // Take each TaskSet in our scheduling order, and then offer it each node in increasing order diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 4346a44e071fd..7ce5dcbf415e7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -752,15 +752,12 @@ private[spark] class TaskSetManager( } // Re-compute pendingTasksWithNoPrefs since new preferred locations may become available - def executorAdded(execId: String, host: String) { + def executorAdded() { def newLocAvail(index: Int): Boolean = { for (loc <- tasks(index).preferredLocations) { - if (execId.equals(loc.executorId.getOrElse(null)) || host.equals(loc.host)) { - return true - } - val availRack = sched.getRackForHost(host) - val prefRack = sched.getRackForHost(loc.host) - if (prefRack.isDefined && prefRack.get.equals(availRack.getOrElse(null))) { + if (sched.hasExecutorsAliveOnHost(loc.host) || + (loc.executorId.isDefined && sched.isExecutorAlive(loc.executorId.get)) || + sched.getRackForHost(loc.host).isDefined) { return true } } From 7b0177aaf9a0e75a848c6f94d9104ade44643697 Mon Sep 17 00:00:00 2001 From: Rui Li Date: Fri, 6 Jun 2014 10:59:37 +0800 Subject: [PATCH 08/15] remove redundant code --- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 7ce5dcbf415e7..fa54292f7e3bd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -188,9 +188,6 @@ private[spark] class TaskSetManager( var hadAliveLocations = false for (loc <- tasks(index).preferredLocations) { for (execId <- loc.executorId) { - if (sched.isExecutorAlive(execId)) { - hadAliveLocations = true - } addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer)) } if (sched.hasExecutorsAliveOnHost(loc.host)) { @@ -756,7 +753,6 @@ private[spark] class TaskSetManager( def newLocAvail(index: Int): Boolean = { for (loc <- tasks(index).preferredLocations) { if (sched.hasExecutorsAliveOnHost(loc.host) || - (loc.executorId.isDefined && sched.isExecutorAlive(loc.executorId.get)) || sched.getRackForHost(loc.host).isDefined) { return true } From 685ed3d3a8e63a23b06f994ebfd4f312cc2a8288 Mon Sep 17 00:00:00 2001 From: Rui Li Date: Sat, 7 Jun 2014 09:36:02 +0800 Subject: [PATCH 09/15] remove delay shedule for pendingTasksWithNoPrefs --- .../org/apache/spark/scheduler/TaskSetManager.scala | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index fa54292f7e3bd..a8b66708e840d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -54,15 +54,8 @@ private[spark] class TaskSetManager( clock: Clock = SystemClock) extends Schedulable with Logging { - // Remember when this TaskSetManager is created - val creationTime = clock.getTime() val conf = sched.sc.conf - // The period we wait for new executors to come up - // After this period, tasks in pendingTasksWithNoPrefs will be considered as PROCESS_LOCAL - private val WAIT_NEW_EXEC_TIMEOUT = conf.getLong("spark.scheduler.waitNewExecutorTime", 3000L) - private var waitingNewExec = true - /* * Sometimes if an executor is dead or in an otherwise invalid state, the driver * does not realize right away leading to repeated task failures. If enabled, @@ -366,8 +359,7 @@ private[spark] class TaskSetManager( } // Look for no-pref tasks after rack-local tasks since they can run anywhere. - for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs) - if (!waitingNewExec || tasks(index).preferredLocations.isEmpty)) { + for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) { return Some((index, TaskLocality.PROCESS_LOCAL)) } @@ -397,9 +389,6 @@ private[spark] class TaskSetManager( if (allowedLocality > maxLocality) { allowedLocality = maxLocality // We're not allowed to search for farther-away tasks } - if (waitingNewExec && curTime - creationTime > WAIT_NEW_EXEC_TIMEOUT) { - waitingNewExec = false - } findTask(execId, host, allowedLocality) match { case Some((index, taskLocality)) => { From fff41235dc522a086cf3692f3758d2734e6772ad Mon Sep 17 00:00:00 2001 From: Rui Li Date: Tue, 10 Jun 2014 16:03:28 +0800 Subject: [PATCH 10/15] fix computing valid locality levels --- .../org/apache/spark/scheduler/TaskSetManager.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index a8b66708e840d..1df616144743d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -153,7 +153,7 @@ private[spark] class TaskSetManager( } // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling - val myLocalityLevels = computeValidLocalityLevels() + var myLocalityLevels = computeValidLocalityLevels() val localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level // Delay scheduling variables: we keep track of our current locality level and the time we @@ -386,7 +386,7 @@ private[spark] class TaskSetManager( val curTime = clock.getTime() var allowedLocality = getAllowedLocalityLevel(curTime) - if (allowedLocality > maxLocality) { + if (allowedLocality > maxLocality && myLocalityLevels.contains(maxLocality)) { allowedLocality = maxLocality // We're not allowed to search for farther-away tasks } @@ -723,10 +723,12 @@ private[spark] class TaskSetManager( private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = { import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY} val levels = new ArrayBuffer[TaskLocality.TaskLocality] - if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0) { + if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 && + pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) { levels += PROCESS_LOCAL } - if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0) { + if (!pendingTasksForHost.isEmpty 
&& getLocalityWait(NODE_LOCAL) != 0 && + pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) { levels += NODE_LOCAL } if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0) { @@ -750,5 +752,6 @@ private[spark] class TaskSetManager( } logInfo("Re-computing pending task lists.") pendingTasksWithNoPrefs = pendingTasksWithNoPrefs.filter(!newLocAvail(_)) + myLocalityLevels = computeValidLocalityLevels() } } From 99f843e0153357f7b98887cd0b5d53cdff87af76 Mon Sep 17 00:00:00 2001 From: Rui Li Date: Tue, 10 Jun 2014 17:05:19 +0800 Subject: [PATCH 11/15] add unit test and fix bug --- .../spark/scheduler/TaskSetManager.scala | 3 +- .../spark/scheduler/TaskSetManagerSuite.scala | 41 +++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 1df616144743d..7c746faa386df 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -154,7 +154,7 @@ private[spark] class TaskSetManager( // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling var myLocalityLevels = computeValidLocalityLevels() - val localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level + var localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level // Delay scheduling variables: we keep track of our current locality level and the time we // last launched a task at that level, and move up a level when localityWaits[curLevel] expires. @@ -753,5 +753,6 @@ private[spark] class TaskSetManager( logInfo("Re-computing pending task lists.") pendingTasksWithNoPrefs = pendingTasksWithNoPrefs.filter(!newLocAvail(_)) myLocalityLevels = computeValidLocalityLevels() + localityWaits = myLocalityLevels.map(getLocalityWait) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index c92b6dc96c8eb..2320b84653e3b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -77,6 +77,10 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex override def isExecutorAlive(execId: String): Boolean = executors.contains(execId) override def hasExecutorsAliveOnHost(host: String): Boolean = executors.values.exists(_ == host) + + def addExecutor(newExecutors: (String, String)*) { + executors ++= newExecutors + } } class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { @@ -384,6 +388,43 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { assert(sched.taskSetsFailed.contains(taskSet.id)) } + test("new executors get added") { + sc = new SparkContext("local", "test") + val sched = new FakeTaskScheduler(sc) + val taskSet = FakeTask.createTaskSet(4, + Seq(TaskLocation("host1", "execA")), + Seq(TaskLocation("host1", "execB")), + Seq(TaskLocation("host2", "execC")), + Seq()) + val clock = new FakeClock + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) + // All tasks added to no-pref list since no preferred location is available + assert(manager.pendingTasksWithNoPrefs.size === 4) + // Only ANY is valid + assert(manager.myLocalityLevels.size === 1) + // Add a new executor + 
sched.addExecutor(("execD", "host1")) + manager.executorAdded() + // Task 0 and 1 should be removed from no-pref list + assert(manager.pendingTasksWithNoPrefs.size === 2) + // Valid locality should contain NODE_LOCAL and ANY + assert(manager.myLocalityLevels.size === 2) + // Offer host1, execD, at PROCESS_LOCAL level: task 0 should be chosen + // because PROCESS_LOCAL is not valid at the moment + assert(manager.resourceOffer("execD", "host1", PROCESS_LOCAL).get.index === 0) + // Add another executor + sched.addExecutor(("execC", "host2")) + manager.executorAdded() + // No-pref list now only contains task 3 + assert(manager.pendingTasksWithNoPrefs.size === 1) + // Valid locality should contain PROCESS_LOCAL, NODE_LOCAL and ANY + assert(manager.myLocalityLevels.size === 3) + // Offer host2, execC, at PROCESS_LOCAL level: task 2 should be chosen + assert(manager.resourceOffer("execC", "host2", PROCESS_LOCAL).get.index === 2) + // Offer host1, execD again at PROCESS_LOCAL level: task 3 should be chosen + assert(manager.resourceOffer("execD", "host1", PROCESS_LOCAL).get.index === 3) + } + def createTaskResult(id: Int): DirectTaskResult[Int] = { val valueSer = SparkEnv.get.serializer.newInstance() new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics) From 5b3fb2f95ebb7ed8a0479da1bc8a566679871c36 Mon Sep 17 00:00:00 2001 From: Rui Li Date: Tue, 10 Jun 2014 19:07:44 +0800 Subject: [PATCH 12/15] refine UT --- .../org/apache/spark/scheduler/TaskSetManagerSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 2320b84653e3b..ceeb1357503ad 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -401,14 +401,14 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { // All tasks added to no-pref list since no preferred location is available assert(manager.pendingTasksWithNoPrefs.size === 4) // Only ANY is valid - assert(manager.myLocalityLevels.size === 1) + assert(manager.myLocalityLevels.sameElements(Array(ANY))) // Add a new executor sched.addExecutor(("execD", "host1")) manager.executorAdded() // Task 0 and 1 should be removed from no-pref list assert(manager.pendingTasksWithNoPrefs.size === 2) // Valid locality should contain NODE_LOCAL and ANY - assert(manager.myLocalityLevels.size === 2) + assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, ANY))) // Offer host1, execD, at PROCESS_LOCAL level: task 0 should be chosen // because PROCESS_LOCAL is not valid at the moment assert(manager.resourceOffer("execD", "host1", PROCESS_LOCAL).get.index === 0) @@ -418,7 +418,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { // No-pref list now only contains task 3 assert(manager.pendingTasksWithNoPrefs.size === 1) // Valid locality should contain PROCESS_LOCAL, NODE_LOCAL and ANY - assert(manager.myLocalityLevels.size === 3) + assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY))) // Offer host2, execC, at PROCESS_LOCAL level: task 2 should be chosen assert(manager.resourceOffer("execC", "host2", PROCESS_LOCAL).get.index === 2) // Offer host1, execD again at PROCESS_LOCAL level: task 3 should be chosen From 18f9e053d35b7839d94f6feb7560bdaf92ceb015 Mon Sep 17 00:00:00 2001 From: Rui Li 
Date: Wed, 11 Jun 2014 10:21:01 +0800 Subject: [PATCH 13/15] restrict allowed locality --- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 2 +- .../org/apache/spark/scheduler/TaskSetManagerSuite.scala | 7 ------- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 7c746faa386df..dbf9a309b89ac 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -386,7 +386,7 @@ private[spark] class TaskSetManager( val curTime = clock.getTime() var allowedLocality = getAllowedLocalityLevel(curTime) - if (allowedLocality > maxLocality && myLocalityLevels.contains(maxLocality)) { + if (allowedLocality > maxLocality) { allowedLocality = maxLocality // We're not allowed to search for farther-away tasks } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index ceeb1357503ad..bf9dcee98ac69 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -409,9 +409,6 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { assert(manager.pendingTasksWithNoPrefs.size === 2) // Valid locality should contain NODE_LOCAL and ANY assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, ANY))) - // Offer host1, execD, at PROCESS_LOCAL level: task 0 should be chosen - // because PROCESS_LOCAL is not valid at the moment - assert(manager.resourceOffer("execD", "host1", PROCESS_LOCAL).get.index === 0) // Add another executor sched.addExecutor(("execC", "host2")) manager.executorAdded() @@ -419,10 +416,6 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { assert(manager.pendingTasksWithNoPrefs.size === 1) // Valid locality should contain PROCESS_LOCAL, NODE_LOCAL and ANY assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY))) - // Offer host2, execC, at PROCESS_LOCAL level: task 2 should be chosen - assert(manager.resourceOffer("execC", "host2", PROCESS_LOCAL).get.index === 2) - // Offer host1, execD again at PROCESS_LOCAL level: task 3 should be chosen - assert(manager.resourceOffer("execD", "host1", PROCESS_LOCAL).get.index === 3) } def createTaskResult(id: Int): DirectTaskResult[Int] = { From fafd57f483cead8b52a13dc36a1ddb60c7bd3a14 Mon Sep 17 00:00:00 2001 From: Rui Li Date: Thu, 12 Jun 2014 20:50:12 +0800 Subject: [PATCH 14/15] keep locality constraints within the valid levels --- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index dc7a6aedd989b..5ed2803d76afc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -238,7 +238,7 @@ private[spark] class TaskSchedulerImpl( // Take each TaskSet in our scheduling order, and then offer it each node in increasing order // of locality levels so that it gets a chance to launch local tasks on all of them. 
var launchedTask = false - for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) { + for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) { do { launchedTask = false for (i <- 0 until shuffledOffers.size) { From 8444d7ce49e982b59cfbcb826500da26d8607632 Mon Sep 17 00:00:00 2001 From: Rui Li Date: Mon, 23 Jun 2014 10:36:17 +0800 Subject: [PATCH 15/15] fix code style --- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 6 +++--- .../org/apache/spark/scheduler/TaskSetManagerSuite.scala | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index dbf9a309b89ac..b5bcdd7e99c58 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -724,11 +724,11 @@ private[spark] class TaskSetManager( import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY} val levels = new ArrayBuffer[TaskLocality.TaskLocality] if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 && - pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) { + pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) { levels += PROCESS_LOCAL } if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 && - pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) { + pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) { levels += NODE_LOCAL } if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0) { @@ -744,7 +744,7 @@ private[spark] class TaskSetManager( def newLocAvail(index: Int): Boolean = { for (loc <- tasks(index).preferredLocations) { if (sched.hasExecutorsAliveOnHost(loc.host) || - sched.getRackForHost(loc.host).isDefined) { + sched.getRackForHost(loc.host).isDefined) { return true } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index bf9dcee98ac69..1cabcbe89f592 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -78,8 +78,8 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex override def hasExecutorsAliveOnHost(host: String): Boolean = executors.values.exists(_ == host) - def addExecutor(newExecutors: (String, String)*) { - executors ++= newExecutors + def addExecutor(execId: String, host: String) { + executors.put(execId, host) } } @@ -403,14 +403,14 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { // Only ANY is valid assert(manager.myLocalityLevels.sameElements(Array(ANY))) // Add a new executor - sched.addExecutor(("execD", "host1")) + sched.addExecutor("execD", "host1") manager.executorAdded() // Task 0 and 1 should be removed from no-pref list assert(manager.pendingTasksWithNoPrefs.size === 2) // Valid locality should contain NODE_LOCAL and ANY assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, ANY))) // Add another executor - sched.addExecutor(("execC", "host2")) + sched.addExecutor("execC", "host2") manager.executorAdded() // No-pref list now only contains task 3 assert(manager.pendingTasksWithNoPrefs.size === 1)
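
Taken end to end, the series settles on this shape: a TaskSetManager created before its executors have registered no longer stays stuck with every task in pendingTasksWithNoPrefs and ANY as the only locality level; instead, TaskSchedulerImpl.resourceOffers notices newly registered executors and calls executorAdded() on each TaskSetManager, which re-filters the no-prefs list and recomputes myLocalityLevels and localityWaits. The condensed sketch below walks through that behaviour in the style of the "new executors get added" test from patches 11-15. It is illustrative only, not part of any patch above, and it assumes it runs inside a TaskSetManagerSuite-style test so the helpers shown earlier (FakeTaskScheduler, FakeTask.createTaskSet, FakeClock, MAX_TASK_FAILURES) and the suite's local SparkContext variable `sc` are available.

    // Sketch only: exercises executorAdded() the way the suite's test does.
    sc = new SparkContext("local", "test")
    val sched = new FakeTaskScheduler(sc)          // no live executors yet
    val taskSet = FakeTask.createTaskSet(2,
      Seq(TaskLocation("host1", "execA")),         // task 0 prefers host1/execA
      Seq())                                       // task 1 has no preference
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, new FakeClock)

    // Before any executor registers, both tasks sit in the no-prefs list and
    // ANY is the only valid locality level.
    assert(manager.pendingTasksWithNoPrefs.size === 2)
    assert(manager.myLocalityLevels.sameElements(Array(TaskLocality.ANY)))

    // resourceOffers would call executorAdded() when it sees a new executor;
    // simulate that by registering execA on host1 directly.
    sched.addExecutor("execA", "host1")
    manager.executorAdded()

    // Task 0 now has a live preferred executor and host, so it leaves the
    // no-prefs list and PROCESS_LOCAL/NODE_LOCAL become valid levels again.
    assert(manager.pendingTasksWithNoPrefs.size === 1)
    assert(manager.myLocalityLevels.sameElements(
      Array(TaskLocality.PROCESS_LOCAL, TaskLocality.NODE_LOCAL, TaskLocality.ANY)))

Compared with the first attempt in patch 01, which threaded an enforceCheck flag through addPendingTask, the final design keeps addPendingTask unconditional and reacts to executor registration instead, so delay scheduling still sees accurate locality levels once executors actually come up.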