[SPARK-13704][CORE][YARN] Reduce rack resolution time #24245

Conversation
This is an update of #23951, to address the last couple of review items. All credit still to @LantaoJin. Note that I also removed the skipRackResolution option entirely, rather than moving it to a new conf, because I think this should improve time enough to make that unnecessary. But I haven't tested on a 5k node cluster :)
vanzin left a comment:
Looks ok but there's a conflict...
```diff
  require(registered, "Must register AM before creating allocator.")
  new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, appAttemptId, securityMgr,
-   localResources, new SparkRackResolver())
+   localResources, new SparkRackResolver(conf))
```
Do you want the shared instance here instead?
Oh good point, I do want the shared instance, to use a shared CachedDnsToSwitchMapping. In cluster mode that will make sure we re-use the cache.
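For context, a minimal sketch of the Hadoop API being cached here. Hadoop's `ScriptBasedMapping` extends `CachedDNSToSwitchMapping`; the hostnames are placeholders, and a topology script is assumed to be configured via `net.topology.script.file.name`:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.net.ScriptBasedMapping

val hadoopConf = new Configuration()
// ScriptBasedMapping extends CachedDNSToSwitchMapping: resolved racks are
// memoized, so repeated lookups for the same host never re-run the script.
val mapping = new ScriptBasedMapping()
mapping.setConf(hadoopConf)

// One batched call; only cache misses reach the topology script.
val racks = mapping.resolve(java.util.Arrays.asList("host1", "host2"))
```

Sharing one resolver instance means sharing this cache across components, which is the point of the suggestion above.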
```scala
  }
  override val defaultRackValue: Option[String] = Some(NetworkTopology.DEFAULT_RACK)

  private[spark] val resolver = new SparkRackResolver(sc.hadoopConfiguration)
```
Use the shared instance?
Or, if not using the shared instance, then the SparkRackResolver object can go away.
Test build #104096 has finished for PR 24245 at commit
retest this please
attilapiros left a comment:
I have found only some really minor things.
```diff
  // By default, rack is unknown
- def getRackForHost(value: String): Option[String] = None
+ def getRackForHost(hosts: String): Option[String] = {
```
Nit: rename `hosts` to `host`.
```scala
test("SPARK-13704 Rack Resolution is done with a batch of de-duped hosts") {
  val conf = new SparkConf()
    .set(config.LOCALITY_WAIT.key, "0")
```
Nit: `.key` is not needed; that way, setting the config will be type-safe.
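A minimal sketch of the type-safe form, assuming `config` is `org.apache.spark.internal.config` (where `LOCALITY_WAIT` is a `ConfigEntry[Long]`) and that this runs inside Spark's own test code, since the `ConfigEntry` overload of `set` is `private[spark]`:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config

// With the ConfigEntry overload the value is checked at compile time;
// passing a String here would be rejected by the compiler.
val conf = new SparkConf().set(config.LOCALITY_WAIT, 0L)
```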
```diff
  taskSetManagerSpy.handleFailedTask(taskDesc.get.taskId, TaskState.FAILED, e)

- verify(taskSetManagerSpy, times(1)).addPendingTask(anyInt())
+ verify(taskSetManagerSpy, times(1)).addPendingTask(anyInt(), anyBoolean())
```
We can match here for exact arguments:

```scala
verify(taskSetManagerSpy, times(1))
  .addPendingTask(argEq(0), argEq(false))
```

assuming the import:

```scala
import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt, anyString, eq => argEq}
```
Good point. While I like your naming more, the standard we've used in Spark is to rename it to `meq`, e.g. https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala#L22
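With that convention the suggestion becomes (same matcher, just renamed on import; this mirrors the change that lands in the diff further down):

```scala
import org.mockito.ArgumentMatchers.{eq => meq}
import org.mockito.Mockito.{times, verify}

verify(taskSetManagerSpy, times(1)).addPendingTask(meq(0), meq(false))
```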
```scala
      .isDefined)
  }
}
assert(FakeRackUtil.numBatchInvocation === 1)
```
As addPendingTasks() is called during the construction of the TaskSetManager instance (and that is the only place where multiple hosts can be passed to getRacksForHosts()), I did not get why numBatchInvocation === 1 is so important that it is emphasised by this assert.
I assume we are guarding against potential future code that would call getRacksForHosts() more than once (since it is an expensive call), am I right?
```scala
val taskSet = FakeTask.createTaskSet(100, locations: _*)
val clock = new ManualClock
// make sure we only do one rack resolution call, for the entire batch of hosts, as this
// can be expensive. the FakeTaskScheduler calls rack resolution more than the real one
```
Nit: "expensive. the " => "expensive. The "
Test build #104170 has finished for PR 24245 at commit

Test build #104172 has finished for PR 24245 at commit
```scala
/**
 * It will return the static resolver instance. If there is already an instance, the passed
 * conf is entirely ignored. If there is not a shared instance, it will create one with the
 * given conf.
 */
```
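A sketch of an implementation consistent with that comment; the double-checked locking and the field name are assumptions, not the exact patch:

```scala
import org.apache.hadoop.conf.Configuration

object SparkRackResolver {
  @volatile private var instance: SparkRackResolver = _

  def get(conf: Configuration): SparkRackResolver = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          // The first caller's conf wins; any conf passed later is ignored.
          instance = new SparkRackResolver(conf)
        }
      }
    }
    instance
  }
}
```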
Should we explain here how to instantiate a separate resolver with a separate config? Something like: "Instantiate a separate resolver with a separate config by `new SparkRackResolver(conf)`."
It's kinda obvious, no?
vanzin left a comment:
Looks good aside from some minor comments. The comments could use some grammar updates too, but not a big deal since you didn't write them.
```scala
}
// Resolve the rack for each host. This can be slow, so de-dupe the list of hosts,
// and assign the rack to all relevant task indices.
val racks = sched.getRacksForHosts(pendingTasksForHost.keySet.toSeq)
```
There's an implicit assumption here that map.keySet and map.values iterate in the same order. I'm not sure if that's guaranteed, and at the same time I don't see why that wouldn't be the case, but just wanted to point this out.
I had the exact same thought when I reached that line. I even thought about possible solutions:

- Creating a new val with the value `racks.entrySet` and generating the keys and values from this entry set (as each key and value are bound together in an entry, the ordering is fixed; the keys and values can even be generated in one iteration).
- Another, more elegant, solution is calling `racks.asScala.unzip`.

Both solutions have some performance cost.
Correction to the last sentence: the first solution can be done without performance cost, but the code will probably be a bit less elegant.
Great point, I have updated this to use unzip.
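The resulting shape, roughly; this sketch assumes `pendingTasksForHost` and `pendingTasksForRack` are `HashMap[String, ArrayBuffer[Int]]` as in `TaskSetManager`, and that `sched` is the enclosing scheduler reference:

```scala
import scala.collection.mutable.{ArrayBuffer, HashMap}

// unzip iterates the map once, so hosts and their pending-task indices
// stay aligned by construction; no keySet/values ordering assumption.
val (hosts, indicesForHosts) = pendingTasksForHost.toSeq.unzip
val racks: Seq[Option[String]] = sched.getRacksForHosts(hosts)
racks.zip(indicesForHosts).foreach {
  case (Some(rack), indices) =>
    pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) ++= indices
  case (None, _) => // no rack resolved for this host; nothing to index
}
```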
```diff
  taskSetManagerSpy.handleFailedTask(taskDesc.get.taskId, TaskState.FAILED, e)

- verify(taskSetManagerSpy, times(1)).addPendingTask(anyInt())
+ verify(taskSetManagerSpy, times(1)).addPendingTask(meq(0), meq(false))
```
You can actually omit the `meq` in this case.
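That is, with plain values on every argument; Mockito only requires matchers when at least one argument uses one, otherwise it falls back to equality on the values:

```scala
verify(taskSetManagerSpy, times(1)).addPendingTask(0, false)
```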
```scala
 * It will cache the rack for individual hosts to avoid
 * repeatedly performing the same expensive lookup.
 *
 * Its logic refers [[org.apache.hadoop.yarn.util.RackResolver]] and enhanced.
```
This paragraph actually refers to the code in the class, now, not the object anymore.
Test build #104218 has finished for PR 24245 at commit

retest this please
squito left a comment:
Thanks @attilapiros @vanzin. I also went through the comments and updated them a bit (I just deleted a few that I thought weren't really adding anything).
I have checked the new changes (the last commit) and it looks good to me. So pending tests, otherwise LGTM.
Test build #104257 has finished for PR 24245 at commit

Test build #4683 has started for PR 24245 at commit

retest this please

Test build #4688 has finished for PR 24245 at commit

Test build #4689 has started for PR 24245 at commit
The pyspark failures are very strange; I'm not sure if it's an infra issue or something else. I filed https://issues.apache.org/jira/browse/SPARK-27389
Looks fine. I still think that

That's because if
This updated PR LGTM now.
If only you could write code to wrap a String in an Option...

Test build #4693 has finished for PR 24245 at commit
xuanyuanking left a comment:
I want to give a +1 for this; it seems like a great performance optimization on large clusters.

Merged to master. Thanks @LantaoJin!
What changes were proposed in this pull request?
When you submit a stage on a large cluster, rack resolution takes a long time when initializing the TaskSetManager, because a script is invoked to resolve the rack of each host, one by one.
With the current implementation, it takes 30~40 seconds to resolve the racks in our 5000-node cluster. After applying the patch, it decreased to less than 15 seconds.
YARN-9332 added an interface to handle multiple hosts in one invocation to save time, but until we can upgrade to the newest Hadoop, we can build the same tool in Spark to resolve this issue, as sketched below.
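A hedged sketch of what batched resolution can look like; the class shape, constructor argument, and method name here are assumptions for illustration, not the exact patch:

```scala
import scala.collection.JavaConverters._
import org.apache.hadoop.net.DNSToSwitchMapping

class BatchedRackResolver(mapping: DNSToSwitchMapping) {
  // Resolve a whole batch of hosts with a single call to the topology
  // script (or its cache), instead of one fork+exec per host.
  def getRacksForHosts(hosts: Seq[String]): Seq[Option[String]] = {
    val distinctHosts = hosts.distinct
    val resolved = mapping.resolve(distinctHosts.asJava).asScala
    val rackByHost = distinctHosts.zip(resolved.map(Option(_))).toMap
    // Answer in the caller's order, including duplicates.
    hosts.map(rackByHost)
  }
}
```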
How was this patch tested?
Unit tests, plus manual testing on a 5000-node cluster.