TaskSchedulerImpl.scala
@@ -156,6 +156,12 @@ private[spark] class TaskSchedulerImpl(

private[scheduler] var barrierCoordinator: RpcEndpoint = null

/**
* It can be overridden in a different TaskScheduler, like YARN's.
* None by default. This should be initialized before any invocation.
*/
protected val defaultRackValue: Option[String] = None

private def maybeInitBarrierCoordinator(): Unit = {
if (barrierCoordinator == null) {
barrierCoordinator = new BarrierCoordinator(barrierSyncTimeout, sc.listenerBus,
@@ -375,9 +381,10 @@ private[spark] class TaskSchedulerImpl(
executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
newExecAvail = true
}
for (rack <- getRackForHost(o.host)) {
hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
}
}
val hosts = offers.map(_.host).toSet.toSeq
for ((host, Some(rack)) <- hosts.zip(getRacksForHosts(hosts))) {
hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += host
}

// Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
@@ -811,8 +818,38 @@ private[spark] class TaskSchedulerImpl(
blacklistTrackerOpt.map(_.nodeBlacklist()).getOrElse(scala.collection.immutable.Set())
}

// Add an on-off switch to save time on rack resolving
private def skipRackResolving: Boolean = sc.conf.get(LOCALITY_WAIT_RACK) == 0L
Contributor:
I was just reviewing another patch related to delay scheduling, and I realized this optimization is a bit too aggressive. That configuration only controls whether you wait for a resource that is rack-local. Even when the wait is 0, Spark still tries to find a rack-local task for a given resource offer; it will just schedule a non-rack-local task if it can't find a rack-local one. But it won't be able to do that if it doesn't know what racks the resource offers are on.

So I think you either need to:

a) change this to use a new conf, with an extra check that you only turn off rack resolution entirely if it's also true that sc.conf.get(LOCALITY_WAIT_RACK) == 0L

b) is this optimization even needed, considering how much time the rest of this change should save? Maybe we should still always do the rack resolution, since it should be pretty fast after the rest of your change.

Contributor Author (@LantaoJin, Apr 1, 2019):
I'm aware of that. Disaggregated deployment of compute and storage is more and more popular in industry. If someone sets LOCALITY_WAIT to 0 on purpose, most of the time they don't need data locality at all (especially at the rack level). Rack resolution and the locality algorithm still spend time in this disaggregated deployment case. Could we open a new ticket to address this?

Contributor Author (@LantaoJin, Apr 1, 2019):
@squito #24175 also faces the same situation.

Contributor:
Thanks for pointing out the other PR that is similar -- yes, I agree, I can see cases where you would want to skip rack resolution entirely for the "disaggregated" clusters you are talking about. I'll comment on the other ticket as well. Would you like to follow up with that?

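As a rough illustration of option (a) above, the gate could be a dedicated flag that is only honored when the rack-locality wait is also disabled. The config key, its entry, and its placement are hypothetical sketches, not part of this change:

// Hypothetical config entry (the key name is illustrative only).
private[spark] val SKIP_RACK_RESOLUTION =
  ConfigBuilder("spark.locality.rack.resolution.skip")
    .doc("When true and spark.locality.wait.rack is 0, skip rack resolution entirely.")
    .booleanConf
    .createWithDefault(false)

// In TaskSchedulerImpl: bypass rack resolution only when both conditions hold, so
// delay scheduling still sees rack information unless the user opts out explicitly.
private def skipRackResolving: Boolean =
  sc.conf.get(SKIP_RACK_RESOLUTION) && sc.conf.get(LOCALITY_WAIT_RACK) == 0L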

/**
* Get rack info for hosts. This is the internal method of [[getRacksForHosts]].
* It should be overridden in a different TaskScheduler.
Contributor:
Remove this line.

* The returned racks must have the same length as the hosts passed in.
* Returns a sequence of [[defaultRackValue]] by default.
*/
protected def doGetRacksForHosts(hosts: Seq[String]): Seq[Option[String]] = {
hosts.map(_ => defaultRackValue)
}

// By default, rack is unknown
def getRackForHost(value: String): Option[String] = None
def getRackForHost(hosts: String): Option[String] = {
if (skipRackResolving) {
defaultRackValue
} else {
doGetRacksForHosts(Seq(hosts)).head
}
}

/**
* null in return sequences will be replaced to [[defaultRackValue]].
Contributor:
Not sure what this is explaining? The code in this method doesn't seem to have any null handling at all.

*/
def getRacksForHosts(hosts: Seq[String]): Seq[Option[String]] = {
if (skipRackResolving) {
hosts.map(_ => defaultRackValue)
} else {
doGetRacksForHosts(hosts)
}
}

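A minimal sketch of how a concrete scheduler could plug into these two hooks, in the spirit of the FakeTaskScheduler used in the test changes below; the subclass name and the in-memory host-to-rack map are hypothetical stand-ins for a real topology source, and the code assumes the org.apache.spark.scheduler package context:

// Sketch only: a TaskSchedulerImpl subclass that supplies rack information in batch.
private[spark] class MapBackedRackAwareScheduler(
    sc: SparkContext,
    hostToRack: Map[String, String])
  extends TaskSchedulerImpl(sc) {

  // Hosts whose rack is unknown fall back to a shared default rack.
  // Note: as the base class warns, this must be initialized before it is used.
  override protected val defaultRackValue: Option[String] = Some("/default-rack")

  // One call answers the whole (de-duped) batch of hosts.
  override protected def doGetRacksForHosts(hosts: Seq[String]): Seq[Option[String]] = {
    hosts.map(host => hostToRack.get(host).orElse(defaultRackValue))
  }
}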
private def waitBackendReady(): Unit = {
if (backend.isReady) {
TaskSetManager.scala
@@ -186,8 +186,23 @@ private[spark] class TaskSetManager(

// Add all our tasks to the pending lists. We do this in reverse order
// of task index so that tasks with low indices get launched first.
for (i <- (0 until numTasks).reverse) {
addPendingTask(i)
addPendingTasks()

private def addPendingTasks(): Unit = {
val (_, duration) = Utils.timeTakenMs {
val hostToIndices = new HashMap[String, ArrayBuffer[Int]]()
for (i <- (0 until numTasks).reverse) {
addPendingTask(i, initializing = true, Some(hostToIndices))
}
// Resolve the rack for each host. This can be slow, so de-dupe the list of hosts,
// and assign the rack to all relevant task indices.
for (
(Some(rack), indices) <- sched.getRacksForHosts(hostToIndices.keySet.toSeq)
.zip(hostToIndices.values)) {
pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) ++= indices
}
}
logDebug(s"Adding pending tasks take $duration ms")
Contributor:
s/take/took

}

/**
@@ -214,7 +229,10 @@ private[spark] class TaskSetManager(
private[scheduler] var emittedTaskSizeWarning = false

/** Add a task to all the pending-task lists that it should be on. */
private[spark] def addPendingTask(index: Int) {
private[spark] def addPendingTask(
index: Int,
initializing: Boolean = false,
Contributor:
I'm not sure I like passing these arguments to this function. Could this be solved with a resolveRacks: Boolean argument, and using pendingTasksForHost to resolve the racks in the addPendingTasks method?

It seems that initializingMap and pendingTasksForHost are pretty much the same thing during initialization.

Contributor:
In case it was unclear, that was exactly my suggestion earlier as well. Sorry for any confusion.

initializingMap: Option[HashMap[String, ArrayBuffer[Int]]] = None) {
for (loc <- tasks(index).preferredLocations) {
loc match {
case e: ExecutorCacheTaskLocation =>
@@ -234,8 +252,14 @@ private[spark] class TaskSetManager(
case _ =>
}
pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
for (rack <- sched.getRackForHost(loc.host)) {
pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index

if (initializing) {
// preferredLocation -> task indices; initializingMap is only used while the TaskSetManager is initializing
initializingMap.foreach(_.getOrElseUpdate(loc.host, new ArrayBuffer) += index)
} else {
for (rack <- sched.getRackForHost(loc.host)) {
pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
}
}
}

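A rough sketch of the resolveRacks: Boolean alternative suggested in the review thread above, reusing pendingTasksForHost instead of a separate initializing map. It assumes the surrounding TaskSetManager context and is an illustration of the suggestion, not the code in this diff:

// Skip per-task rack lookup during initialization; the caller resolves racks in batch.
private[spark] def addPendingTask(index: Int, resolveRacks: Boolean = true) {
  for (loc <- tasks(index).preferredLocations) {
    // ... executor and HDFS-cache handling unchanged ...
    pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
    if (resolveRacks) {
      for (rack <- sched.getRackForHost(loc.host)) {
        pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
      }
    }
  }
  // ... remaining pending-list updates unchanged ...
}

private def addPendingTasks(): Unit = {
  for (i <- (0 until numTasks).reverse) {
    addPendingTask(i, resolveRacks = false)
  }
  // pendingTasksForHost already maps host -> task indices, so resolve racks in one batch.
  val (hosts, indicesForHosts) = pendingTasksForHost.toSeq.unzip
  for ((Some(rack), indices) <- sched.getRacksForHosts(hosts).zip(indicesForHosts)) {
    pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) ++= indices
  }
}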
@@ -249,24 +273,27 @@ private[spark] class TaskSetManager(
/**
* Return the pending tasks list for a given executor ID, or an empty list if
* there is no map entry for that host
* This is visible for testing.
*/
private def getPendingTasksForExecutor(executorId: String): ArrayBuffer[Int] = {
private[scheduler] def getPendingTasksForExecutor(executorId: String): ArrayBuffer[Int] = {
pendingTasksForExecutor.getOrElse(executorId, ArrayBuffer())
}

/**
* Return the pending tasks list for a given host, or an empty list if
* there is no map entry for that host
* This is visible for testing.
*/
private def getPendingTasksForHost(host: String): ArrayBuffer[Int] = {
private[scheduler] def getPendingTasksForHost(host: String): ArrayBuffer[Int] = {
pendingTasksForHost.getOrElse(host, ArrayBuffer())
}

/**
* Return the pending rack-local task list for a given rack, or an empty list if
* there is no map entry for that rack
* This is visible for testing.
*/
private def getPendingTasksForRack(rack: String): ArrayBuffer[Int] = {
private[scheduler] def getPendingTasksForRack(rack: String): ArrayBuffer[Int] = {
pendingTasksForRack.getOrElse(rack, ArrayBuffer())
}

@@ -331,7 +358,7 @@ private[spark] class TaskSetManager(
val executors = prefs.flatMap(_ match {
case e: ExecutorCacheTaskLocation => Some(e.executorId)
case _ => None
});
})
if (executors.contains(execId)) {
speculatableTasks -= index
return Some((index, TaskLocality.PROCESS_LOCAL))
TaskSetManagerSuite.scala
@@ -22,8 +22,8 @@ import java.util.{Properties, Random}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.mockito.ArgumentMatchers.{any, anyInt, anyString}
import org.mockito.Mockito.{mock, never, spy, times, verify, when}
import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt, anyString}
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer

@@ -69,17 +69,23 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
// Get the rack for a given host
object FakeRackUtil {
private val hostToRack = new mutable.HashMap[String, String]()
var numBatchInvocation = 0

def cleanUp() {
hostToRack.clear()
numBatchInvocation = 0
}

def assignHostToRack(host: String, rack: String) {
hostToRack(host) = rack
}

def getRackForHost(host: String): Option[String] = {
hostToRack.get(host)
def getRacksForHosts(hosts: Seq[String]): Seq[Option[String]] = {
assert(hosts.toSet.size == hosts.size) // no dups in hosts
if (hosts.nonEmpty && hosts.length != 1) {
numBatchInvocation += 1
}
hosts.map(hostToRack.get(_))
}
}

@@ -99,6 +105,9 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
val speculativeTasks = new ArrayBuffer[Int]

val executors = new mutable.HashMap[String, String]

// this must be initialized before addExecutor
override val defaultRackValue: Option[String] = Some("default")
for ((execId, host) <- liveExecutors) {
addExecutor(execId, host)
}
@@ -144,8 +153,9 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
}
}


override def getRackForHost(value: String): Option[String] = FakeRackUtil.getRackForHost(value)
override def doGetRacksForHosts(hosts: Seq[String]): Seq[Option[String]] = {
FakeRackUtil.getRacksForHosts(hosts)
}
}

/**
@@ -1316,7 +1326,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
val taskDesc = taskSetManagerSpy.resourceOffer(exec, host, TaskLocality.ANY)

// Assert the task has been black listed on the executor it was last executed on.
when(taskSetManagerSpy.addPendingTask(anyInt())).thenAnswer(
when(taskSetManagerSpy.addPendingTask(anyInt(), anyBoolean(), any())).thenAnswer(
new Answer[Unit] {
override def answer(invocationOnMock: InvocationOnMock): Unit = {
val task: Int = invocationOnMock.getArgument(0)
@@ -1330,7 +1340,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
val e = new ExceptionFailure("a", "b", Array(), "c", None)
taskSetManagerSpy.handleFailedTask(taskDesc.get.taskId, TaskState.FAILED, e)

verify(taskSetManagerSpy, times(1)).addPendingTask(anyInt())
verify(taskSetManagerSpy, times(1)).addPendingTask(anyInt(), anyBoolean(), any())
}

test("SPARK-21563 context's added jars shouldn't change mid-TaskSet") {
@@ -1602,4 +1612,60 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
verify(sched.dagScheduler).taskEnded(manager.tasks(3), Success, result.value(),
result.accumUpdates, info3)
}

test("SPARK-13704 Rack Resolution is done with a batch of de-duped hosts") {
val conf = new SparkConf()
.set(config.LOCALITY_WAIT.key, "0")
.set(config.LOCALITY_WAIT_RACK.key, "1s")
sc = new SparkContext("local", "test", conf)
// Create a cluster with 20 racks, with hosts spread out among them
val execAndHost = (0 to 199).map { i =>
FakeRackUtil.assignHostToRack("host" + i, "rack" + (i % 20))
("exec" + i, "host" + i)
}
sched = new FakeTaskScheduler(sc, execAndHost: _*)
// make a taskset with preferred locations on the first 100 hosts in our cluster
val locations = new ArrayBuffer[Seq[TaskLocation]]()
for (i <- 0 to 99) {
locations += Seq(TaskLocation("host" + i))
}
val taskSet = FakeTask.createTaskSet(100, locations: _*)
val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
// with rack locality, reject an offer on a host with an unknown rack
assert(manager.resourceOffer("otherExec", "otherHost", TaskLocality.RACK_LOCAL).isEmpty)
(0 until 20).foreach { rackIdx =>
(0 until 5).foreach { offerIdx =>
// if we offer hosts which are not in preferred locations,
// we'll reject them at NODE_LOCAL level,
// but accept them at RACK_LOCAL level if they're on OK racks
val hostIdx = 100 + rackIdx
assert(manager.resourceOffer("exec" + hostIdx, "host" + hostIdx, TaskLocality.NODE_LOCAL)
.isEmpty)
assert(manager.resourceOffer("exec" + hostIdx, "host" + hostIdx, TaskLocality.RACK_LOCAL)
.isDefined)
}
}
assert(FakeRackUtil.numBatchInvocation === 1)
}

test("SPARK-13704 Rack resolving is skipped when spark.locality.wait is zero") {
val conf = new SparkConf().set(config.LOCALITY_WAIT.key, "0")
sc = new SparkContext("local", "test", conf)
for (i <- 0 to 99) {
FakeRackUtil.assignHostToRack("host" + i, "rack" + (i % 20))
}
sched = new FakeTaskScheduler(sc,
("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
val locations = new ArrayBuffer[Seq[TaskLocation]]()
for (i <- 0 to 99) {
locations += Seq(TaskLocation("host" + i))
}
val taskSet = FakeTask.createTaskSet(100, locations: _*)
val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
// verify the total number of rack-pending tasks is not changed by SPARK-13704
assert(manager.getPendingTasksForRack(sched.defaultRackValue.get).length === 100)
assert(FakeRackUtil.numBatchInvocation === 0)
}
}
LocalityPreferredContainerPlacementStrategy.scala
@@ -138,9 +138,8 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
// Only filter out the ratio which is larger than 0, which means the current host can
// still be allocated with new container request.
val hosts = preferredLocalityRatio.filter(_._2 > 0).keys.toArray
val racks = hosts.map { h =>
resolver.resolve(yarnConf, h)
}.toSet
val racks = resolver.resolve(yarnConf, hosts).map(_.getNetworkLocation)
.filter(_ != null).toSet
containerLocalityPreferences += ContainerLocalityPreferences(hosts, racks.toArray)

// Minus 1 each time when the host is used. When the current ratio is 0,
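The batch resolver.resolve(yarnConf, hosts) call used above is introduced elsewhere in this change and is not shown in this diff. As a hedged sketch, a batch rack lookup can be built on Hadoop's topology machinery by invoking the configured DNSToSwitchMapping once for the whole host list; the class and method names here are hypothetical, not the resolver in this PR:

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.CommonConfigurationKeysPublic
import org.apache.hadoop.net.{DNSToSwitchMapping, ScriptBasedMapping}
import org.apache.hadoop.util.ReflectionUtils

// Hypothetical sketch: resolve racks for many hosts with a single call to the
// cluster's configured topology mapping, instead of one lookup per host.
private[spark] class BatchRackResolver(conf: Configuration) {
  private val mapping: DNSToSwitchMapping = {
    val mappingClass = conf.getClass(
      CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
      classOf[ScriptBasedMapping], classOf[DNSToSwitchMapping])
    ReflectionUtils.newInstance(mappingClass, conf)
  }

  // Returns one network location (e.g. "/rack1") per input host; None when unknown.
  def resolveRacks(hosts: Seq[String]): Seq[Option[String]] = {
    val locations = mapping.resolve(hosts.asJava).asScala
    locations.map(Option(_))
  }
}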