
Commit 2a4ed10

Address some review comments:

- When a resourceOffers() call has multiple offers, force the TaskSets to consider them in
  increasing order of locality levels, so that they get a chance to launch tasks locally across
  all offers
- Simplify ClusterScheduler.prioritizeContainers
- Add docs on the new configuration options
1 parent 222c897 commit 2a4ed10

File tree: 7 files changed (+68, -23 lines)
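
For orientation before the per-file diffs, here is a simplified sketch of the offer loop described in the commit message; every name below is an illustrative stand-in, not Spark's actual API:

    // Simplified sketch, not Spark code: each TaskSet, in scheduling-policy order, is
    // stepped through locality levels from most to least local, and every offer gets a
    // chance at each level before a less-local level is tried.
    object OfferLoopSketch extends App {
      object TaskLocality extends Enumeration {
        val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
      }

      case class Offer(executorId: String, host: String, cores: Int)

      // Hypothetical stand-in for TaskSetManager.resourceOffer: returns a task to launch, if any.
      def resourceOffer(taskSet: String, offer: Offer,
                        maxLocality: TaskLocality.Value): Option[String] = None

      val taskSets = Seq("taskSet-0", "taskSet-1")
      val offers = Seq(Offer("exec-1", "host-1", 4), Offer("exec-2", "host-2", 4))

      for (taskSet <- taskSets; maxLocality <- TaskLocality.values; offer <- offers) {
        // Only tasks at locality <= maxLocality may launch here, so local placements are
        // exhausted across all offers before farther-away ones are considered.
        resourceOffer(taskSet, offer, maxLocality).foreach { task =>
          println("Would launch " + task + " on " + offer.host + " at level " + maxLocality)
        }
      }
    }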

core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala

Lines changed: 11 additions & 10 deletions
@@ -184,27 +184,29 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       }
     }
 
-    // Build a list of tasks to assign to each slave
+    // Build a list of tasks to assign to each worker
     val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
     val availableCpus = offers.map(o => o.cores).toArray
-    val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue()
-    for (manager <- sortedTaskSetQueue) {
+    val sortedTaskSets = rootPool.getSortedTaskSetQueue()
+    for (taskSet <- sortedTaskSets) {
       logDebug("parentName: %s, name: %s, runningTasks: %s".format(
-        manager.parent.name, manager.name, manager.runningTasks))
+        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
     }
 
+    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
+    // of locality levels so that it gets a chance to launch local tasks on all of them.
     var launchedTask = false
-    for (manager <- sortedTaskSetQueue; offer <- offers) {
+    for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
       do {
         launchedTask = false
         for (i <- 0 until offers.size) {
           val execId = offers(i).executorId
           val host = offers(i).host
-          for (task <- manager.resourceOffer(execId, host, availableCpus(i))) {
+          for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {
             tasks(i) += task
             val tid = task.taskId
-            taskIdToTaskSetId(tid) = manager.taskSet.id
-            taskSetTaskIds(manager.taskSet.id) += tid
+            taskIdToTaskSetId(tid) = taskSet.taskSet.id
+            taskSetTaskIds(taskSet.taskSet.id) += tid
             taskIdToExecutorId(tid) = execId
             activeExecutorIds += execId
             executorsByHost(host) += execId
@@ -402,8 +404,7 @@ object ClusterScheduler {
 
     // order keyList based on population of value in map
     val keyList = _keyList.sortWith(
-      // TODO(matei): not sure why we're using getOrElse if keyList = map.keys... see if it matters
-      (left, right) => map.get(left).getOrElse(Set()).size > map.get(right).getOrElse(Set()).size
+      (left, right) => map(left).size > map(right).size
     )
 
     val retval = new ArrayBuffer[T](keyList.size * 2)
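
The second hunk drops the getOrElse because keyList is built from the map's own keys, so a direct lookup cannot miss. A small self-contained illustration of sorting keys by the size of their value sets, using made-up rack data rather than the real Spark structures:

    // Sketch with hypothetical data: order keys by how many values they map to,
    // which is what the simplified comparator in prioritizeContainers does.
    import scala.collection.mutable.{HashMap, HashSet}

    object SortByPopulationSketch extends App {
      val map = HashMap(
        "rack1" -> HashSet("host1", "host2", "host3"),
        "rack2" -> HashSet("host4"),
        "rack3" -> HashSet("host5", "host6")
      )
      // Every key comes from the map itself, so map(key) cannot throw and the
      // old getOrElse(Set()) fallback was never needed.
      val keyList = map.keys.toList.sortWith((left, right) => map(left).size > map(right).size)
      println(keyList)  // List(rack1, rack3, rack2)
    }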

core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala

Lines changed: 12 additions & 5 deletions
@@ -43,7 +43,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
   extends TaskSetManager with Logging {
 
   // CPUs to request per task
-  val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toDouble
+  val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toInt
 
   // Maximum times a task is allowed to fail before failing the job
   val MAX_TASK_FAILURES = System.getProperty("spark.task.maxFailures", "4").toInt
@@ -325,15 +325,22 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
   /**
    * Respond to an offer of a single slave from the scheduler by finding a task
    */
-  override def resourceOffer(execId: String, host: String, availableCpus: Double)
+  override def resourceOffer(
+      execId: String,
+      host: String,
+      availableCpus: Int,
+      maxLocality: TaskLocality.TaskLocality)
     : Option[TaskDescription] =
   {
     if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
       val curTime = System.currentTimeMillis()
 
-      val locality = getAllowedLocalityLevel(curTime)
+      var allowedLocality = getAllowedLocalityLevel(curTime)
+      if (allowedLocality > maxLocality) {
+        allowedLocality = maxLocality   // We're not allowed to search for farther-away tasks
+      }
 
-      findTask(execId, host, locality) match {
+      findTask(execId, host, allowedLocality) match {
         case Some((index, taskLocality)) => {
           // Found a task; do some bookkeeping and return a task description
           val task = tasks(index)
@@ -347,7 +354,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
           taskInfos(taskId) = info
           taskAttempts(index) = info :: taskAttempts(index)
           // Update our locality level for delay scheduling
-          currentLocalityIndex = getLocalityIndex(locality)
+          currentLocalityIndex = getLocalityIndex(allowedLocality)
           lastLaunchTime = curTime
           // Serialize and return the task
           val startTime = System.currentTimeMillis()
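
The clamp above depends on TaskLocality being an ordered enumeration in which more-local levels compare as smaller. A minimal sketch of that comparison, using a stand-in enumeration rather than Spark's own definition:

    // Minimal sketch, not Spark source: Scala Enumeration values compare by
    // declaration order, so PROCESS_LOCAL < NODE_LOCAL < RACK_LOCAL < ANY.
    object LocalityClampSketch extends App {
      object TaskLocality extends Enumeration {
        val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
      }
      import TaskLocality._

      // Effective search level: the more local (smaller) of what delay scheduling
      // currently allows and the cap passed in by the cluster scheduler.
      def clamp(delayAllowed: TaskLocality.Value, maxLocality: TaskLocality.Value) =
        if (delayAllowed > maxLocality) maxLocality else delayAllowed

      println(clamp(RACK_LOCAL, NODE_LOCAL))  // NODE_LOCAL: capped by this round of offers
      println(clamp(PROCESS_LOCAL, ANY))      // PROCESS_LOCAL: delay scheduling is stricter
    }

Capping at the caller-supplied maxLocality is what lets resourceOffers() sweep every offer at one locality level before moving on to the next.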

core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala

Lines changed: 5 additions & 1 deletion
@@ -29,7 +29,11 @@ private[spark] trait TaskSetManager extends Schedulable {
 
   def taskSet: TaskSet
 
-  def resourceOffer(execId: String, hostPort: String, availableCpus: Double)
+  def resourceOffer(
+      execId: String,
+      host: String,
+      availableCpus: Int,
+      maxLocality: TaskLocality.TaskLocality)
     : Option[TaskDescription]
 
   def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer)

core/src/main/scala/spark/scheduler/local/LocalScheduler.scala

Lines changed: 1 addition & 2 deletions
@@ -141,8 +141,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
       for (manager <- sortedTaskSetQueue) {
         do {
           launchTask = false
-          // TODO(matei): don't pass null here?
-          manager.resourceOffer(null, null, freeCpuCores) match {
+          manager.resourceOffer(null, null, freeCpuCores, null) match {
            case Some(task) =>
              tasks += task
              taskIdToTaskSetId(task.taskId) = manager.taskSet.id

core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala

Lines changed: 5 additions & 1 deletion
@@ -98,7 +98,11 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
     return None
   }
 
-  override def resourceOffer(execId: String, host: String, availableCpus: Double)
+  override def resourceOffer(
+      execId: String,
+      host: String,
+      availableCpus: Int,
+      maxLocality: TaskLocality.TaskLocality)
     : Option[TaskDescription] =
   {
     SparkEnv.set(sched.env)

core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala

Lines changed: 6 additions & 2 deletions
@@ -72,7 +72,11 @@ class DummyTaskSetManager(
   override def executorLost(executorId: String, host: String): Unit = {
   }
 
-  override def resourceOffer(execId: String, host: String, availableCpus: Double)
+  override def resourceOffer(
+      execId: String,
+      host: String,
+      availableCpus: Int,
+      maxLocality: TaskLocality.TaskLocality)
     : Option[TaskDescription] =
   {
     if (tasksFinished + runningTasks < numTasks) {
@@ -120,7 +124,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
       logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
     }
     for (taskSet <- taskSetQueue) {
-      taskSet.resourceOffer("execId_1", "hostname_1", 1) match {
+      taskSet.resourceOffer("execId_1", "hostname_1", 1, TaskLocality.ANY) match {
        case Some(task) =>
          return taskSet.stageId
        case None => {}

docs/configuration.md

Lines changed: 28 additions & 2 deletions
@@ -243,8 +243,34 @@ Apart from these, the following properties are also available, and may be useful
   <td>3000</td>
   <td>
     Number of milliseconds to wait to launch a data-local task before giving up and launching it
-    in a non-data-local location. You should increase this if your tasks are long and you are seeing
-    poor data locality, but the default generally works well.
+    on a less-local node. The same wait will be used to step through multiple locality levels
+    (process-local, node-local, rack-local and then any). It is also possible to customize the
+    waiting time for each level by setting <code>spark.locality.wait.node</code>, etc.
+    You should increase this setting if your tasks are long and see poor locality, but the
+    default usually works well.
+  </td>
+</tr>
+<tr>
+  <td>spark.locality.wait.process</td>
+  <td>spark.locality.wait</td>
+  <td>
+    Customize the locality wait for process locality. This affects tasks that attempt to access
+    cached data in a particular executor process.
+  </td>
+</tr>
+<tr>
+  <td>spark.locality.wait.node</td>
+  <td>spark.locality.wait</td>
+  <td>
+    Customize the locality wait for node locality. For example, you can set this to 0 to skip
+    node locality and search immediately for rack locality (if your cluster has rack information).
+  </td>
+</tr>
+<tr>
+  <td>spark.locality.wait.rack</td>
+  <td>spark.locality.wait</td>
+  <td>
+    Customize the locality wait for rack locality.
   </td>
 </tr>
 <tr>
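
A sketch of one way these settings could be applied in this version of Spark, which reads its configuration from Java system properties; the property names come from the table above, while the master URL and application name are purely illustrative:

    // Set the locality waits before creating the SparkContext so the scheduler picks them up.
    import spark.SparkContext

    object LocalityWaitExample {
      def main(args: Array[String]): Unit = {
        System.setProperty("spark.locality.wait", "5000")    // base wait of 5 seconds per level
        System.setProperty("spark.locality.wait.node", "0")  // skip node locality, go straight to rack
        val sc = new SparkContext("local[4]", "LocalityWaitExample")
        // ... run jobs ...
        sc.stop()
      }
    }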
