
Commit ab62595

jerryshao authored and sryza committed
[SPARK-4352] [YARN] [WIP] Incorporate locality preferences in dynamic allocation requests
Currently there is no locality preference for container requests in YARN mode, which hurts performance when data has to be fetched remotely, so this change adds locality awareness to YARN dynamic allocation. Ping sryza, please help to review, thanks a lot.

Author: jerryshao <[email protected]>

Closes apache#6394 from jerryshao/SPARK-4352 and squashes the following commits:

d45fecb [jerryshao] Add documents
6c3fe5c [jerryshao] Fix bug
8db6c0e [jerryshao] Further address the comments
2e2b2cb [jerryshao] Fix rebase compiling problem
ce5f096 [jerryshao] Fix style issue
7f7df95 [jerryshao] Fix rebase issue
9ca9e07 [jerryshao] Code refactor according to comments
d3e4236 [jerryshao] Further address the comments
5e7a593 [jerryshao] Fix bug introduced code rebase
9ca7783 [jerryshao] Style changes
08317f9 [jerryshao] code and comment refines
65b2423 [jerryshao] Further address the comments
a27c587 [jerryshao] address the comment
27faabc [jerryshao] redundant code remove
9ce06a1 [jerryshao] refactor the code
f5ba27b [jerryshao] Style fix
2c6cc8a [jerryshao] Fix bug and add unit tests
0757335 [jerryshao] Consider the distribution of existed containers to recalculate the new container requests
0ad66ff [jerryshao] Fix compile bugs
1c20381 [jerryshao] Minor fix
5ef2dc8 [jerryshao] Add docs and improve the code
3359814 [jerryshao] Fix rebase and test bugs
0398539 [jerryshao] reinitialize the new implementation
67596d6 [jerryshao] Still fix the code
654e1d2 [jerryshao] Fix some bugs
45b1c89 [jerryshao] Further polish the algorithm
dea0152 [jerryshao] Enable node locality information in YarnAllocator
74bbcc6 [jerryshao] Support node locality for dynamic allocation initial commit
1 parent 2104931 commit ab62595

16 files changed, +578 -49 lines changed

core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala

Lines changed: 15 additions & 3 deletions
@@ -24,11 +24,23 @@ package org.apache.spark
 private[spark] trait ExecutorAllocationClient {
 
   /**
-   * Express a preference to the cluster manager for a given total number of executors.
-   * This can result in canceling pending requests or filing additional requests.
+   * Update the cluster manager on our scheduling needs. Three bits of information are included
+   * to help it make decisions.
+   * @param numExecutors The total number of executors we'd like to have. The cluster manager
+   *                     shouldn't kill any running executor to reach this number, but,
+   *                     if all existing executors were to die, this is the number of executors
+   *                     we'd want to be allocated.
+   * @param localityAwareTasks The number of tasks in all active stages that have a locality
+   *                           preferences. This includes running, pending, and completed tasks.
+   * @param hostToLocalTaskCount A map of hosts to the number of tasks from all active stages
+   *                             that would like to like to run on that host.
+   *                             This includes running, pending, and completed tasks.
    * @return whether the request is acknowledged by the cluster manager.
    */
-  private[spark] def requestTotalExecutors(numExecutors: Int): Boolean
+  private[spark] def requestTotalExecutors(
+      numExecutors: Int,
+      localityAwareTasks: Int,
+      hostToLocalTaskCount: Map[String, Int]): Boolean
 
   /**
    * Request an additional number of executors from the cluster manager.
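For orientation, callers that previously passed only a target executor count now pass the two placement hints alongside it. A minimal, hypothetical sketch of a call against the updated trait (the client instance and the hint values are made up for illustration):

// `client` is assumed to be some ExecutorAllocationClient implementation.
// Hypothetical hints: 8 tasks carry locality preferences, spread over two hosts.
val hostToLocalTaskCount = Map("host1" -> 5, "host2" -> 3)
val localityAwareTasks = 8

// Ask for 4 executors in total; the hints let the cluster manager place them near the data.
client.requestTotalExecutors(
  numExecutors = 4,
  localityAwareTasks = localityAwareTasks,
  hostToLocalTaskCount = hostToLocalTaskCount)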

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 60 additions & 2 deletions
@@ -161,6 +161,12 @@ private[spark] class ExecutorAllocationManager(
   // (2) an executor idle timeout has elapsed.
   @volatile private var initializing: Boolean = true
 
+  // Number of locality aware tasks, used for executor placement.
+  private var localityAwareTasks = 0
+
+  // Host to possible task running on it, used for executor placement.
+  private var hostToLocalTaskCount: Map[String, Int] = Map.empty
+
   /**
    * Verify that the settings specified through the config are valid.
    * If not, throw an appropriate exception.
@@ -295,7 +301,7 @@ private[spark] class ExecutorAllocationManager(
 
     // If the new target has not changed, avoid sending a message to the cluster manager
     if (numExecutorsTarget < oldNumExecutorsTarget) {
-      client.requestTotalExecutors(numExecutorsTarget)
+      client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
       logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
         s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
     }
@@ -349,7 +355,8 @@ private[spark] class ExecutorAllocationManager(
       return 0
     }
 
-    val addRequestAcknowledged = testing || client.requestTotalExecutors(numExecutorsTarget)
+    val addRequestAcknowledged = testing ||
+      client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
     if (addRequestAcknowledged) {
       val executorsString = "executor" + { if (delta > 1) "s" else "" }
       logInfo(s"Requesting $delta new $executorsString because tasks are backlogged" +
@@ -519,13 +526,37 @@ private[spark] class ExecutorAllocationManager(
     // Number of tasks currently running on the cluster. Should be 0 when no stages are active.
     private var numRunningTasks: Int = _
 
+    // stageId to tuple (the number of task with locality preferences, a map where each pair is a
+    // node and the number of tasks that would like to be scheduled on that node) map,
+    // maintain the executor placement hints for each stage Id used by resource framework to better
+    // place the executors.
+    private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
+
     override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
       initializing = false
       val stageId = stageSubmitted.stageInfo.stageId
       val numTasks = stageSubmitted.stageInfo.numTasks
       allocationManager.synchronized {
         stageIdToNumTasks(stageId) = numTasks
         allocationManager.onSchedulerBacklogged()
+
+        // Compute the number of tasks requested by the stage on each host
+        var numTasksPending = 0
+        val hostToLocalTaskCountPerStage = new mutable.HashMap[String, Int]()
+        stageSubmitted.stageInfo.taskLocalityPreferences.foreach { locality =>
+          if (!locality.isEmpty) {
+            numTasksPending += 1
+            locality.foreach { location =>
+              val count = hostToLocalTaskCountPerStage.getOrElse(location.host, 0) + 1
+              hostToLocalTaskCountPerStage(location.host) = count
+            }
+          }
+        }
+        stageIdToExecutorPlacementHints.put(stageId,
+          (numTasksPending, hostToLocalTaskCountPerStage.toMap))
+
+        // Update the executor placement hints
+        updateExecutorPlacementHints()
       }
     }
 
@@ -534,6 +565,10 @@ private[spark] class ExecutorAllocationManager(
       allocationManager.synchronized {
         stageIdToNumTasks -= stageId
         stageIdToTaskIndices -= stageId
+        stageIdToExecutorPlacementHints -= stageId
+
+        // Update the executor placement hints
+        updateExecutorPlacementHints()
 
         // If this is the last stage with pending tasks, mark the scheduler queue as empty
         // This is needed in case the stage is aborted for any reason
@@ -637,6 +672,29 @@ private[spark] class ExecutorAllocationManager(
     def isExecutorIdle(executorId: String): Boolean = {
       !executorIdToTaskIds.contains(executorId)
     }
+
+    /**
+     * Update the Executor placement hints (the number of tasks with locality preferences,
+     * a map where each pair is a node and the number of tasks that would like to be scheduled
+     * on that node).
+     *
+     * These hints are updated when stages arrive and complete, so are not up-to-date at task
+     * granularity within stages.
+     */
+    def updateExecutorPlacementHints(): Unit = {
+      var localityAwareTasks = 0
+      val localityToCount = new mutable.HashMap[String, Int]()
+      stageIdToExecutorPlacementHints.values.foreach { case (numTasksPending, localities) =>
+        localityAwareTasks += numTasksPending
+        localities.foreach { case (hostname, count) =>
+          val updatedCount = localityToCount.getOrElse(hostname, 0) + count
+          localityToCount(hostname) = updatedCount
+        }
+      }
+
+      allocationManager.localityAwareTasks = localityAwareTasks
+      allocationManager.hostToLocalTaskCount = localityToCount.toMap
+    }
   }
 
   /**
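The merge performed by updateExecutorPlacementHints is easy to follow in isolation: each active stage contributes its count of locality-aware tasks and a host-to-count map, and the per-host counts are summed across stages. A self-contained sketch with made-up stage data, mirroring the loop above:

import scala.collection.mutable

// Hypothetical per-stage hints: stageId -> (tasks with locality preferences, host -> task count)
val stageIdToExecutorPlacementHints = mutable.HashMap(
  1 -> (3, Map("host1" -> 2, "host2" -> 1)),
  2 -> (2, Map("host2" -> 2)))

var localityAwareTasks = 0
val localityToCount = new mutable.HashMap[String, Int]()
stageIdToExecutorPlacementHints.values.foreach { case (numTasksPending, localities) =>
  localityAwareTasks += numTasksPending
  localities.foreach { case (hostname, count) =>
    localityToCount(hostname) = localityToCount.getOrElse(hostname, 0) + count
  }
}
// Result: localityAwareTasks == 5, localityToCount == Map("host1" -> 2, "host2" -> 3)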

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 19 additions & 6 deletions
@@ -1382,16 +1382,29 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   }
 
   /**
-   * Express a preference to the cluster manager for a given total number of executors.
-   * This can result in canceling pending requests or filing additional requests.
-   * This is currently only supported in YARN mode. Return whether the request is received.
-   */
-  private[spark] override def requestTotalExecutors(numExecutors: Int): Boolean = {
+   * Update the cluster manager on our scheduling needs. Three bits of information are included
+   * to help it make decisions.
+   * @param numExecutors The total number of executors we'd like to have. The cluster manager
+   *                     shouldn't kill any running executor to reach this number, but,
+   *                     if all existing executors were to die, this is the number of executors
+   *                     we'd want to be allocated.
+   * @param localityAwareTasks The number of tasks in all active stages that have a locality
+   *                           preferences. This includes running, pending, and completed tasks.
+   * @param hostToLocalTaskCount A map of hosts to the number of tasks from all active stages
+   *                             that would like to like to run on that host.
+   *                             This includes running, pending, and completed tasks.
+   * @return whether the request is acknowledged by the cluster manager.
+   */
+  private[spark] override def requestTotalExecutors(
+      numExecutors: Int,
+      localityAwareTasks: Int,
+      hostToLocalTaskCount: scala.collection.immutable.Map[String, Int]
+    ): Boolean = {
     assert(supportDynamicAllocation,
       "Requesting executors is currently only supported in YARN and Mesos modes")
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
-        b.requestTotalExecutors(numExecutors)
+        b.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount)
       case _ =>
         logWarning("Requesting executors is only supported in coarse-grained mode")
         false

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 23 additions & 3 deletions
@@ -790,8 +790,28 @@ class DAGScheduler(
     // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
     // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
     // event.
-    stage.makeNewStageAttempt(partitionsToCompute.size)
     outputCommitCoordinator.stageStart(stage.id)
+    val taskIdToLocations = try {
+      stage match {
+        case s: ShuffleMapStage =>
+          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
+        case s: ResultStage =>
+          val job = s.resultOfJob.get
+          partitionsToCompute.map { id =>
+            val p = job.partitions(id)
+            (id, getPreferredLocs(stage.rdd, p))
+          }.toMap
+      }
+    } catch {
+      case NonFatal(e) =>
+        stage.makeNewStageAttempt(partitionsToCompute.size)
+        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
+        abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}")
+        runningStages -= stage
+        return
+    }
+
+    stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
     listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
 
     // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
@@ -830,7 +850,7 @@ class DAGScheduler(
       stage match {
         case stage: ShuffleMapStage =>
           partitionsToCompute.map { id =>
-            val locs = getPreferredLocs(stage.rdd, id)
+            val locs = taskIdToLocations(id)
             val part = stage.rdd.partitions(id)
             new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs)
           }
@@ -840,7 +860,7 @@ class DAGScheduler(
           partitionsToCompute.map { id =>
            val p: Int = job.partitions(id)
            val part = stage.rdd.partitions(p)
-            val locs = getPreferredLocs(stage.rdd, p)
+            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, id)
          }
      }
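The new taskIdToLocations map (partition id -> preferred TaskLocations for that task) is what feeds the rest of the change: its values become the Seq[Seq[TaskLocation]] handed to makeNewStageAttempt, and from there land on StageInfo for the ExecutorAllocationManager listener to read. A small sketch of the shape, with hypothetical hosts and assuming TaskLocation's companion apply(host):

// Hypothetical preferred locations for a three-partition stage.
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = Map(
  0 -> Seq(TaskLocation("host1")),
  1 -> Seq(TaskLocation("host1"), TaskLocation("host2")),
  2 -> Seq.empty)  // this partition has no preference

// This is the Seq[Seq[TaskLocation]] that reaches StageInfo.taskLocalityPreferences.
val taskLocalityPreferences: Seq[Seq[TaskLocation]] = taskIdToLocations.values.toSeq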

core/src/main/scala/org/apache/spark/scheduler/Stage.scala

Lines changed: 5 additions & 2 deletions
@@ -77,8 +77,11 @@ private[spark] abstract class Stage(
   private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)
 
   /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
-  def makeNewStageAttempt(numPartitionsToCompute: Int): Unit = {
-    _latestInfo = StageInfo.fromStage(this, nextAttemptId, Some(numPartitionsToCompute))
+  def makeNewStageAttempt(
+      numPartitionsToCompute: Int,
+      taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
+    _latestInfo = StageInfo.fromStage(
+      this, nextAttemptId, Some(numPartitionsToCompute), taskLocalityPreferences)
     nextAttemptId += 1
   }
 

core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala

Lines changed: 10 additions & 3 deletions
@@ -34,7 +34,8 @@ class StageInfo(
     val numTasks: Int,
     val rddInfos: Seq[RDDInfo],
     val parentIds: Seq[Int],
-    val details: String) {
+    val details: String,
+    private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty) {
   /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
   var submissionTime: Option[Long] = None
   /** Time when all tasks in the stage completed or when the stage was cancelled. */
@@ -70,7 +71,12 @@ private[spark] object StageInfo {
    * shuffle dependencies. Therefore, all ancestor RDDs related to this Stage's RDD through a
    * sequence of narrow dependencies should also be associated with this Stage.
    */
-  def fromStage(stage: Stage, attemptId: Int, numTasks: Option[Int] = None): StageInfo = {
+  def fromStage(
+      stage: Stage,
+      attemptId: Int,
+      numTasks: Option[Int] = None,
+      taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
+    ): StageInfo = {
     val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
     val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
     new StageInfo(
@@ -80,6 +86,7 @@ private[spark] object StageInfo {
       numTasks.getOrElse(stage.numTasks),
       rddInfos,
       stage.parents.map(_.id),
-      stage.details)
+      stage.details,
+      taskLocalityPreferences)
   }
 }

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 5 additions & 1 deletion
@@ -86,7 +86,11 @@ private[spark] object CoarseGrainedClusterMessages {
 
   // Request executors by specifying the new total number of executors desired
   // This includes executors already pending or running
-  case class RequestExecutors(requestedTotal: Int) extends CoarseGrainedClusterMessage
+  case class RequestExecutors(
+      requestedTotal: Int,
+      localityAwareTasks: Int,
+      hostToLocalTaskCount: Map[String, Int])
+    extends CoarseGrainedClusterMessage
 
   case class KillExecutors(executorIds: Seq[String]) extends CoarseGrainedClusterMessage
 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 28 additions & 4 deletions
@@ -66,6 +66,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Executors we have requested the cluster manager to kill that have not died yet
   private val executorsPendingToRemove = new HashSet[String]
 
+  // A map to store hostname with its possible task number running on it
+  protected var hostToLocalTaskCount: Map[String, Int] = Map.empty
+
+  // The number of pending tasks which is locality required
+  protected var localityAwareTasks = 0
+
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
 
@@ -339,23 +345,41 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
     logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager")
     logDebug(s"Number of pending executors is now $numPendingExecutors")
+
     numPendingExecutors += numAdditionalExecutors
     // Account for executors pending to be added or removed
     val newTotal = numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
     doRequestTotalExecutors(newTotal)
   }
 
   /**
-   * Express a preference to the cluster manager for a given total number of executors. This can
-   * result in canceling pending requests or filing additional requests.
-   * @return whether the request is acknowledged.
+   * Update the cluster manager on our scheduling needs. Three bits of information are included
+   * to help it make decisions.
+   * @param numExecutors The total number of executors we'd like to have. The cluster manager
+   *                     shouldn't kill any running executor to reach this number, but,
+   *                     if all existing executors were to die, this is the number of executors
+   *                     we'd want to be allocated.
+   * @param localityAwareTasks The number of tasks in all active stages that have a locality
+   *                           preferences. This includes running, pending, and completed tasks.
+   * @param hostToLocalTaskCount A map of hosts to the number of tasks from all active stages
+   *                             that would like to like to run on that host.
+   *                             This includes running, pending, and completed tasks.
+   * @return whether the request is acknowledged by the cluster manager.
    */
-  final override def requestTotalExecutors(numExecutors: Int): Boolean = synchronized {
+  final override def requestTotalExecutors(
+      numExecutors: Int,
+      localityAwareTasks: Int,
+      hostToLocalTaskCount: Map[String, Int]
+    ): Boolean = synchronized {
     if (numExecutors < 0) {
       throw new IllegalArgumentException(
         "Attempted to request a negative number of executor(s) " +
          s"$numExecutors from the cluster manager. Please specify a positive number!")
     }
+
+    this.localityAwareTasks = localityAwareTasks
+    this.hostToLocalTaskCount = hostToLocalTaskCount
+
     numPendingExecutors =
       math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size, 0)
     doRequestTotalExecutors(numExecutors)

core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala

Lines changed: 2 additions & 1 deletion
@@ -53,7 +53,8 @@ private[spark] abstract class YarnSchedulerBackend(
    * This includes executors already pending or running.
    */
   override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
-    yarnSchedulerEndpoint.askWithRetry[Boolean](RequestExecutors(requestedTotal))
+    yarnSchedulerEndpoint.askWithRetry[Boolean](
+      RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
   }
 
   /**
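The YarnAllocator side of the change (not included in this excerpt) is where these hints finally influence where containers are requested. Purely as an illustration of the idea, not the commit's actual placement strategy (which also weighs containers that already exist on each host), one could split a batch of new container requests across preferred hosts in proportion to their pending task counts:

// Illustrative sketch only: spread `numContainers` requests over hosts in proportion
// to how many locality-aware tasks prefer each host; an empty map means no preference.
def preferredHostCounts(
    numContainers: Int,
    hostToLocalTaskCount: Map[String, Int]): Map[String, Int] = {
  if (hostToLocalTaskCount.isEmpty || numContainers <= 0) {
    Map.empty  // fall back to locality-free container requests
  } else {
    val totalPreferred = hostToLocalTaskCount.values.sum
    hostToLocalTaskCount.mapValues { count =>
      // Round up so every preferred host gets at least one node-local request.
      math.ceil(count.toDouble / totalPreferred * numContainers).toInt
    }.toMap
  }
}

// e.g. preferredHostCounts(3, Map("host1" -> 6, "host2" -> 2)) == Map("host1" -> 3, "host2" -> 1)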
