Skip to content

Conversation

@LantaoJin
Copy link
Contributor

@LantaoJin LantaoJin commented Mar 4, 2019

What changes were proposed in this pull request?

If submits a stage with abundant tasks, rack resolving takes a long time when initializing TaskSetManager caused by a mass of loops to execute rack resolving script.
Based on current implementation, it takes 30~40 seconds to resolve the racks in our 5000 nodes' cluster. After applied the patch, it decreased to less than 15 seconds. Furthermore, in another cluster which is a disaggregated storage and compute architecture, setting locality wait time to zero, there is no delay at all when launches tasks.

for (i <- (0 until numTasks).reverse) {
    addPendingTask(i, true)
}
...
private[spark] def addPendingTask(index: Int) {
    for (loc <- tasks(index).preferredLocations) {
      ...
      for (rack <- sched.getRackForHost(loc.host)) {  //<--- invoke one host per time
        ...
      }
    }
  ...
}

YARN-9332 has added an interface to handle multiple hosts in one invocation to save time. But before upgrading to the newest Hadoop, we could construct the same tool in Spark to resolve this issue.

How was this patch tested?

UT and manually testing:

Cluster Baseline PR
5000 nodes 40s 13s
isolation 40s 0s

@LantaoJin
Copy link
Contributor Author

@squito @jerryshao @dongjoon-hyun @kiszk Could you help to review when you have time?

Copy link
Contributor

@squito squito left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overall this makes sense to me, though I do have some questions & some suggested changes.

}
assert(total === 100) // verify the total number always equals 100 with/without SPARK-27038
// verify elapsed time should be less than 1s, without SPARK-27038, it should be larger 10s
assert(manager.addTaskElapsedTime < 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is going to be super flaky on jenkins. I've seen tests randomly pause for multiple seconds.

Rather than testing the timing, I'd count how many times sched.getRackForHost etc. are called.

Copy link
Contributor Author

@LantaoJin LantaoJin Mar 7, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @squito , I changed it with a simulation function of org.apache.hadoop.net.ScriptBasedMapping.runResolveCommand(). I think it could equal to time reducing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to counting how many times getRacksForHosts

if (classOf[ScriptBasedMapping].isAssignableFrom(dnsToSwitchMappingClass)) {
val numArgs = conf.getInt(CommonConfigurationKeysPublic.NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_KEY,
CommonConfigurationKeysPublic.NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_DEFAULT)
if (numArgs < ADVISORY_MINIMUM_NUMBER_SCRIPT_ARGS) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't understand the numArgs part at all, can you please explain?

Copy link
Contributor Author

@LantaoJin LantaoJin Mar 7, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the max allowed count of script arguments in one execution. In Hadoop, the length of script arguments over numArgs will be executed in next round in loop. The default value 100 means hosts over 100 couldn't be resolved in one shot. For instance, we have 201 hosts to be resolved for a job, Hadoop will launch script three times to resolve:

script.py host1 host2 ... host100
script.py host101... host200, host201
script.py host201

It launches this python script 3 times. First is for host1 to host100, the second round resolves host100 to host200, the last round only resolves host201.

For Spark, in a big cluster, we hope to reduce the script execution rounds. 100 may be too small. In a small cluster or small job, whatever this value is 100 or 10000, there is no harm. But in for a large cluster with a big job. Logging out this warning log is to remind users to increase the configuration net.topology.script.number.args to reduce the inefficient execution loops of script.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, the value 10000 I set hasn't any evidence from production testing. I just think it could be resolved in one execution shot and I think almost clusters in industry are under 10000 nodes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the warning log doesn't give user any gold value for it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, so this has no effect on how you prepare the arguments to this script in spark itself, it just controls the behavior of that script, right?

so is there a downside to running with a huge number, say 10M? Not knowing anything about that script, I figure there must be some tradeoff here. I am not sure you should be setting the value to 10K if you haven't tried anything which hits that limit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There isn't any existing "the script" in Hadoop or Spark. This needs to be provided by Hadoop administrator. If it is not set, the default value of DEFAULT_RACK is returned for all node names. But whatever the script looks like, the interface runResolveCommand in ScriptBasedMapping is fixed: It accepts a host list and executes several times based on net.topology.script.number.args we mentioned above. So I didn't take account of any execution performance of this script. Logging with a higher advisory value is based on the character of Spark jobs comparing to MR job. I will change this logging to INFO and remove the fixed value 10k.

@vanzin
Copy link
Contributor

vanzin commented Mar 5, 2019

Two things without going into details of the implementation:

  • PR title should explain the fix, not the problem.
  • Spark already has SparkRackResolver which is used to wrap YARN's code; that is not currently hooked into YarnScheduler, but I'd prefer to reuse that class instead of adding yet another one.

@LantaoJin LantaoJin changed the title [SPARK-27038][CORE][YARN] Rack resolving takes a long time when initializing TaskSetManager [SPARK-27038][CORE][YARN] Re-implement RackResolver to reduce resolving time Mar 7, 2019
@LantaoJin
Copy link
Contributor Author

LantaoJin commented Mar 7, 2019

  • SparkRackResolver

Thanks for reminding @vanzin.

}
loopCount += 1
}
loopCount
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there really any value in adding this complexity to the test?

Stepping back a bit, I think there are 2 changes you're making here to improve performance:

  1. you use getRacksForHosts (plural) to requests the racks in bulk

  2. you are changing the way the rack resolver itself works, so that it will cache requests for the same host.

  3. If spark.locality.rack.wait == 0, you skip rack resolution entirely.

I think here you just want to test (1), and for that, you just need to count invocations for getRackForHost() vs. counts of getRacksForHosts()

Would it be worth also adding a test for (2) somehow? I'm not sure what you could do there without it being tied to the internals of the hadoop logic. You could test the instance of the created

Also I think it would be easy and useful for you to add a test for (3) here, the same way you're testing (1), just that you'd expect getRacksForHosts() to be called 0 times.

Copy link
Contributor Author

@LantaoJin LantaoJin Mar 8, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

About (1) Why I added this complexity code for UT is that after applied this patch, the initializing processing of TaskSetManager won't invoke getRackForHost() any more. It has no place to count invocations for getRackForHost() vs counts of getRacksForHosts(). So I added this simulator to count the execution count of script. If this patch reversed, this assert(FakeRackUtil.loopCount === 4) will fail.
About (3), I will add this kind of testing.

Copy link
Contributor Author

@LantaoJin LantaoJin Mar 8, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

About (2) This part has tested in Hadoop and I just create an instance of CachedDNSToSwitchMapping:

       dnsToSwitchMapping = newInstance match {
          case _: CachedDNSToSwitchMapping => newInstance
          case _ => new CachedDNSToSwitchMapping(newInstance)
        }

So I think it doesn't need to add a test for the cache.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why I added this complexity code for UT is that after applied this patch, the initializing processing of TaskSetManager won't invoke getRackForHost() any more. It has no place to count invocations for getRackForHost() vs counts of getRacksForHosts()

Just count the invocations in FakeRackUtil.getRackForHost() etc.

object FakeRackUtil {
...
  def getRackForHost(host: String): Option[String] = {
    getRackForHostCount += 1
    hostToRack.get(host)
  }
...
test("SPARK-27038 Verify the rack resolving time has been reduced") {
  FakeRackUtil.getRackForHostCount = 0
  FakeRackUtil.getRackForHostsCount = 0
  ...
  assert(FakeRackUtil.getRackForHostCount === 0)
  assert(FakeRackUtil.getRackForHostsCount === 1)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FakeRackUtil.getRackForHostCount couldn't equals to 0 here since there are some other callers.

addPendingTask(i, true)
}
// Convert preferred location list to rack list in one invocation and zip with the origin index
private val rackWithTaskIndex = sched.getRacksForHosts(locationWithTaskIndex.map(_._1).toList)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be worth doing some de-duping here? It would not be unusual to have a taskset with 10K tasks on 100 hosts. I see that CachedDNSToSwitchMapping will cache the lookup itself, but doesn't seem necessary to create those intermediate data structures with 10K elements.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw I meant something like this for avoiding leaving around locationWithTaskIndex and doing the deduping.
squito@fc0b308

Feel free to pull in that commit to your change if you think its helpful.

The de-duping thing is minor, but I am concerned that the locationWithTaskIndex variable is going to be confusing if its left around as a private member variable, even though its only meaningful in this limited context.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be worth doing some de-duping here? It would not be unusual to have a taskset with 10K tasks on 100 hosts. I see that CachedDNSToSwitchMapping will cache the lookup itself, but doesn't seem necessary to create those intermediate data structures with 10K elements.

We can not de-duping this List, or the logic about choosing a task to launch will incorrect.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The de-duping thing is minor, but I am concerned that the locationWithTaskIndex variable is going to be confusing if its left around as a private member variable, even though its only meaningful in this limited context.

Yes, I don't want to do any de-duping here. I will refactor this part for getting more readable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On de-duping -- I know you can't do it blindly, as then you won't correctly keep track of which task goes with which host. But if you look at my proposed change, I think it keeps track of the correct mapping back to the original task indices (it certainly should be possible, anyway, even if that exact version doesn't work).

Looking at CachedDNSToSwitchMapping, if you repeat the same host 1000 times, if that host is not in the cache yet, it will repeat that lookup 1000 times. getUncachedHosts does a lookup to check if its in the cache, but if not it'll just pass each copy on to the uncachedHosts variable. Then that just gets passed through to rawMapping.resolve(). (Perhaps getUncachedHosts really should return a Set<String>.)

so after looking at that code, I'm more convinced de-duping would help you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor with the de-duping approach.

@LantaoJin
Copy link
Contributor Author

Jenkins, ok to test

if (classOf[ScriptBasedMapping].isAssignableFrom(dnsToSwitchMappingClass)) {
val numArgs = conf.getInt(CommonConfigurationKeysPublic.NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_KEY,
CommonConfigurationKeysPublic.NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_DEFAULT)
logInfo(s"Setting spark.hadoop.net.topology.script.number.args with a higher value " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This message seems more scary than useful.

If the user has changed this already, why are you telling them to do it again?

The default is 100, that seems like a perfectly good number already.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A simple test result in a 1000 nodes cluster looks like:

bin/spark-sql --master yarn --conf spark.executor.instances=50 --conf spark.executor.cores=3 --conf spark.dynamicAllocation.enabled=false --driver-memory 10g
spark-sql> select count(*) from test_table;

19/03/10 18:21:42 INFO YarnScheduler: Adding task set 1.0 with 19411 tasks
19/03/10 18:22:11 INFO TaskSetManager: Adding pending tasks take 29692 ms

bin/spark-sql --master yarn --conf spark.executor.instances=50 --conf spark.executor.cores=3 --conf spark.dynamicAllocation.enabled=false --driver-memory 10g --conf spark.hadoop.net.topology.script.number.args=10000
spark-sql> select count(*) from test_table;

19/03/10 18:18:56 INFO YarnScheduler: Adding task set 1.0 with 19411 tasks
19/03/10 18:19:11 INFO TaskSetManager: Adding pending tasks take 14935 ms

That's why I hope to inform user to increase the max count of script arguments. @vanzin

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without patching, whatever value of this configuration set, it takes over 30s under the same environment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont' think there is any question that the rest of the patch is useful. I think both marcelo and I are worried that this isn't really giving the user any actionable info. We're just going to get questions about what to set it to. 10K? 1M? 1B? They'll see this msg no matter what they set it to, and they still won't have any idea what else to set it to. Clearly, that hadoop code is intentionally avoiding too large an arg list.

Note that if you used the de-duping I suggest, it would already help in your use case. You have 19K tasks on 1K hosts. With 100 script args, and no de-duping, that's 190 invocations of the script. With your patch & spark.hadoop.net.topology.script.number.args=10K, its down to 2 script invocations. With your patch + de-duping, and leaving spark.hadoop.net.topology.script.number.args at the default of 100, you'd only have 10 invocations. (perhaps each invocation of the script would be faster when its got 100 args vs. 10K args?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They'll see this msg no matter what they set it to, and they still won't have any idea what else to set it to.

Agree. I will look into the de-duping approach.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done the de-duping approach. @squito could you have a look at it?

}
loopCount += 1
}
loopCount
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why I added this complexity code for UT is that after applied this patch, the initializing processing of TaskSetManager won't invoke getRackForHost() any more. It has no place to count invocations for getRackForHost() vs counts of getRacksForHosts()

Just count the invocations in FakeRackUtil.getRackForHost() etc.

object FakeRackUtil {
...
  def getRackForHost(host: String): Option[String] = {
    getRackForHostCount += 1
    hostToRack.get(host)
  }
...
test("SPARK-27038 Verify the rack resolving time has been reduced") {
  FakeRackUtil.getRackForHostCount = 0
  FakeRackUtil.getRackForHostsCount = 0
  ...
  assert(FakeRackUtil.getRackForHostCount === 0)
  assert(FakeRackUtil.getRackForHostsCount === 1)
}

val taskSet = FakeTask.createTaskSet(100, locations: _*)
val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
assert(sched.skipRackResolving === true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're not testing the actual implementation here, you're testing your re-implementation in the test mock. But I think this exposes an important point -- you shouldn't apply this optimization in YarnScheduler, you should probably do it in TaskSetManager. And then you'd test it here the same way you test above, just by counting invocations of both methods in FakeRackUtil.

Copy link
Contributor Author

@LantaoJin LantaoJin Mar 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're not testing the actual implementation here, you're testing your re-implementation in the test mock.

Yes, I know that. Because YarnScheduler in Yarn module, but TaskSetManagerSuite in Spark core

you shouldn't apply this optimization in YarnScheduler, you should probably do it in TaskSetManager.

I will try this way. But for getRackForHost(), the callers are TaskSetManager and TaskSchedulerImpl

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@squito I've refactor for above comments. Now this optimization is not in YarnScheduler, I moved it to TaskSchedulerImpl. Based on this way, the testing is a really test instead of re-implementation in the test mock. In previous implementation, I mixed the Yarn and Core code together, now they are spilled.

@squito
Copy link
Contributor

squito commented Mar 13, 2019

btw I just noticed that dynamic allocation may still suffer with rack resolution even after this fix, if YARN-9332 isn't in, because AMRMClientImpl.addContainerRequest() will resolve the racks again:

https://github.com/apache/hadoop/blob/6fa229891e06eea62cb9634efde755f40247e816/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java#L550

which is called here in Spark:

newLocalityRequests.foreach { request =>
amClient.addContainerRequest(request)
}

I'm not sure if there is anything better Spark can do about this, but wanted to mention it while I saw it.

@vanzin vanzin changed the title [SPARK-27038][CORE][YARN] Re-implement RackResolver to reduce resolving time [SPARK-13704][CORE][YARN] Re-implement RackResolver to reduce resolving time Mar 15, 2019
@LantaoJin
Copy link
Contributor Author

LantaoJin commented Mar 18, 2019

#23951 (comment) Thank you @squito , I've filed a ticket YARN-9394 for a long term solution. But currently, the invocation is too deep, we cannot fix this by re-implementation something now.

@SparkQA
Copy link

SparkQA commented Mar 20, 2019

Test build #103720 has finished for PR 23951 at commit aca97f1.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Copy link
Contributor

squito commented Mar 20, 2019

btw I happened to encounter this issue on a ~1200 node cluster, where this exact problem was causing a 3 minute slowdown (6 minutes in client mode because of the repeated rack resolution). I played around with the script a bit using different number of arguments. I found that with the changes here, it would go down to about 3 seconds (6 seconds in client mode). Furthermore, the script spent almost the entire time in initialization, and took virtually the same amount of time whether it was given 1 argument or 1200. Adjusting the number of arguments to the script would bring the runtime down to ~0.2 seconds.

So your change would bring a huge improvement, and all the other stuff I was commenting about w/ the extra resolution in AMRMClientImpl etc. isn't really so important, and I'd just ignore it for now.

case (Some(rack), indices) =>
pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) ++= indices
case _ =>
// Resolve the rack for each host. This can be somehow slow, so de-dupe the list of hosts,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry I had a typo in my suggested comment, no "somehow" :P

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will delete it

initializingHostToIndices match {
case Some(hostToIndices) =>
// when TaskSetManager initializing, preferredLocation -> task indices
hostToIndices.getOrElseUpdate(loc.host, new ArrayBuffer) += index
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be clear, you would still need to pass in another argument to addPendingTask, to control whether or not you add to pendingTasksForRack, but it could just be initializing: Boolean
(sorry for the back and forth on this)

hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
}
}
val hosts = offers.map(_.host)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could have a lot of dups too, so I'd do val hosts = offers.map(_.host).toSet

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

* Get racks info for a host list. This is the internal method of [[getRacksForHosts]].
* It should be override in different TaskScheduler. Return [[Nil]] by default.
*/
protected def doGetRacksForHosts(hosts: Seq[String]): Seq[String] = Nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't the returned list have to be the same length as the passed in hosts? you take the result of this and zip it with other collections, so I'd assume you do. You should both fix the default implementation and update the doc to mention this requirement.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

@SparkQA
Copy link

SparkQA commented Mar 21, 2019

Test build #103758 has finished for PR 23951 at commit 75d22ed.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@LantaoJin
Copy link
Contributor Author

  • This patch fails to generate documentation.

What it fails for?

@SparkQA
Copy link

SparkQA commented Mar 21, 2019

Test build #103763 has finished for PR 23951 at commit 2876ad3.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@LantaoJin
Copy link
Contributor Author

It's weird, I can build successfully.
retest this please.

@squito
Copy link
Contributor

squito commented Mar 21, 2019

Jenkins, retest this please

@SparkQA
Copy link

SparkQA commented Mar 21, 2019

Test build #103772 has started for PR 23951 at commit 2876ad3.

@shaneknapp
Copy link
Contributor

test this please

@SparkQA
Copy link

SparkQA commented Mar 22, 2019

Test build #103786 has finished for PR 23951 at commit 2876ad3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Copy link
Contributor

squito commented Mar 22, 2019

The failure (in org.apache.spark.scheduler.TaskSchedulerImplSuite"SPARK-16106 locality levels updated if executor added to existing host") is probably real (you might just need to update the test assertions).

@LantaoJin
Copy link
Contributor Author

LantaoJin commented Mar 22, 2019

The failure (in org.apache.spark.scheduler.TaskSchedulerImplSuite"SPARK-16106 locality levels updated if executor added to existing host") is probably real (you might just need to update the test assertions).

That's because I changed the behavior of getRackForHost. The original returns None by default. Now it returns string "unknown". @squito Should I keep current behavior or recover to the original behavior?

@LantaoJin
Copy link
Contributor Author

I prefer to keep the original behavior, I will fix it soon.

@LantaoJin
Copy link
Contributor Author

Jenkins, retest this please

@SparkQA
Copy link

SparkQA commented Mar 22, 2019

Test build #103803 has finished for PR 23951 at commit 92ef335.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@LantaoJin
Copy link
Contributor Author

Jenkins, retest this please

}

/**
* null in return sequences will be replaced to [[defaultRackValue]].
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what this is explaining? The code in this method doesn't seem to have any null handling at all.


/**
* Get racks info for hosts. This is the internal method of [[getRacksForHosts]].
* It should be override in different TaskScheduler.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this line.

pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) ++= indices
}
}
logDebug(s"Adding pending tasks take $duration ms")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/take/took

private[spark] def addPendingTask(index: Int) {
private[spark] def addPendingTask(
index: Int,
initializing: Boolean = false,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I like passing these arguments to this function. Could this be solved with a resolveRacks: Boolean argument, and using pendingTasksForHost to resolve the racks in the addPendingTasks method?

It seems that initializingMap and pendingTasksForHost are pretty much the same thing during initialization.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in case it was unclear, that was exactly my suggestion earlier as well. Sorry for any confusion.

conf.getClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
classOf[ScriptBasedMapping], classOf[DNSToSwitchMapping])
ReflectionUtils.newInstance(dnsToSwitchMappingClass, conf)
.asInstanceOf[DNSToSwitchMapping] match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: indent more

* in hadoop releases with YARN-9332.
*/
def resolve(conf: Configuration, hostNames: Seq[String]): Seq[Node] = {
SparkRackResolver(conf).coreResolve(hostNames)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question, why is the class calling the singleton?

The point of having a class is so that you can have more than one instance of it. You're defeating that by doing this.

* In the meantime, this is a re-implementation for spark's use.
*/
object SparkRackResolver extends Logging {
private var instance: SparkRackResolver = _
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@volatile

/**
* Instantiate a separate resolver with a separate config.
*/
def get(conf: Configuration): SparkRackResolver = new SparkRackResolver(conf)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need. You can call the constructor directly.

In fact I'd not use the apply method and have get here return the shared instance. SparkRackResolver(conf) looks too much like you're instantiating something.

}
}

def resolve(conf: Configuration, hostName: String): String = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These methods could lose the conf parameter since now the class already knows it.

import org.apache.spark.util.ManualClock

class MockResolver extends SparkRackResolver {
class MockResolver() extends SparkRackResolver(SparkHadoopUtil.get.conf) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

() not necessary.

}

// Add a on-off switch to save time for rack resolving
private def skipRackResolving: Boolean = sc.conf.get(LOCALITY_WAIT_RACK) == 0L
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just reviewing another patch related to delay scheduling, and I realized this optimization is a bit too aggressive. That configuration only controls whether you wait for a resource that is rack-local. Even when the wait is 0, spark still tries to to find a rack-local task for a given resource offer; it just will schedule a non-rack-local task even if it can't find a rack-local one. But it won't be able to do that if it doesn't know what racks the resource offers are on.

So I think you either need to:

a) change this to use a new conf, with an extra check that you only turn off rack resolution entirely if its also true that sc.conf.get(LOCALITY_WAIT_RACK) == 0L

b) is this optimization even needed, considering how much time the rest of this change should save? Maybe we should still always do the rack resolution, since it should be pretty fast after the rest of your change.

Copy link
Contributor Author

@LantaoJin LantaoJin Apr 1, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I knew that. Isolation deployment of computing and storage is more and more popular in industry. If someone set LOCALITY_WAIT to 0 on purpose, almost time it is no need data locality at all (especially rack level). Rack resolving and locality algorithm still spends time on this isolation deployment case. Could we open a new ticket to address this?

Copy link
Contributor Author

@LantaoJin LantaoJin Apr 1, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@squito #24175 also faces same situation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for pointing out the other pr that is similar -- yes I agree, I can see cases where you would want to skip rack resolution entirely for the "disaggregated" clusters you are talking about. I'll comment on the other ticket as well. Would you like to follow up with that?

@squito
Copy link
Contributor

squito commented Mar 27, 2019

hi @LantaoJin just checking if you are still working on this. This is a pretty huge improvement on big clusters so we're anxious to get it in. If you're too busy I can finish up the last few things (credit still to you, of course)

@LantaoJin
Copy link
Contributor Author

LantaoJin commented Mar 29, 2019

@squito I could look at this from Apr 1st. Before that, I have to work for my job to meet a deadline.

@LantaoJin
Copy link
Contributor Author

Merged in #24245 , close this.

@LantaoJin LantaoJin closed this Apr 9, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants