[SPARK-19501][YARN] Reduce the number of HDFS RPCs during YARN deployment #16916
Conversation
Force-pushed from 984d818 to dac61b5.
ok to test
Test build #72828 has finished for PR 16916 at commit
Test build #72832 has finished for PR 16916 at commit
Is it possible to merge the stat cache and the symlink cache? It seems both are sort of doing the same thing.
@vanzin
I see. Makes sense. Pinging @tgravescs too; he wrote the original code that introduced symlink resolution, so maybe he can comment on whether it's really necessary. If it is, the patch looks good to me; otherwise we could get rid of it.
We should not remove symlink resolution.
Let me know if any further modification is needed before committing this to master. Once it is merged, I will prepare the backports to branch-2.0 and branch-2.1. Thanks.
Merging to master.
Merged to branch-2.1 and branch-2.0 too (after running yarn tests).
Cool! Thank you :)
What changes were proposed in this pull request?

As discussed in [JIRA](https://issues.apache.org/jira/browse/SPARK-19501), this patch addresses the problem where too many HDFS RPCs are made when there are many URIs specified in `spark.yarn.jars`, potentially adding hundreds of RTTs before the application launches. This becomes significant when submitting the application to a non-local YARN cluster (where the RTT may be on the order of 100 ms, for example). For each URI specified, the current implementation makes at least two HDFS RPCs, for:

- [Calling `getFileStatus()` before uploading each file to the distributed cache in `ClientDistributedCacheManager.addResource()`](https://github.com/apache/spark/blob/v2.1.0/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala#L71).
- [Resolving any symbolic links in each file URI](https://github.com/apache/spark/blob/v2.1.0/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L377-L379), which repeatedly makes HDFS RPCs until all symlinks are resolved (see [`FileContext.resolve(Path)`](https://github.com/apache/hadoop/blob/release-2.7.1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java#L2189-L2195), [`FSLinkResolver.resolve(FileContext, Path)`](https://github.com/apache/hadoop/blob/release-2.7.1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSLinkResolver.java#L79-L112), and [`AbstractFileSystem.resolvePath()`](https://github.com/apache/hadoop/blob/release-2.7.1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java#L464-L468)).

The first `getFileStatus` RPC can be removed by using the `statCache` populated with the file statuses retrieved by [the previous `globStatus` call](https://github.com/apache/spark/blob/v2.1.0/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L531). The second can be largely reduced by caching the symlink resolution results in a `mutable.HashMap`. This patch adds such a local variable in `yarn.Client.prepareLocalResources()` and passes it as an additional parameter to `yarn.Client.copyFileToRemote`. The symlink resolution code was added in 2013 (commit a35472e) and has not changed since. I am assuming it is still required; otherwise we could remove the `symlinkCache` and symlink resolution altogether.
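For illustration only, here is a minimal, self-contained Scala sketch of the two caching ideas; it is not the actual patch. The object and helper names (`RpcCachingSketch`, `cacheGlobStatuses`, `resolveWithCache`) are hypothetical, and only the Hadoop calls (`FileSystem.globStatus`, `FileContext.resolvePath`) correspond to the APIs discussed above.

```scala
import java.net.URI

import scala.collection.mutable

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path}

object RpcCachingSketch {

  // Expand a glob pattern with a single globStatus() call and remember each
  // FileStatus, so that later distributed-cache registration can look the
  // status up locally instead of issuing a per-file getFileStatus() RPC.
  def cacheGlobStatuses(
      fs: FileSystem,
      pattern: Path,
      statCache: mutable.Map[URI, FileStatus]): Seq[FileStatus] = {
    // globStatus() may return null when nothing matches the pattern.
    val matched = Option(fs.globStatus(pattern)).getOrElse(Array.empty[FileStatus])
    matched.foreach(status => statCache(status.getPath.toUri) = status)
    matched.toSeq
  }

  // Resolve symlinks in a path, memoizing the result per URI so that repeated
  // uploads against the same destination pay the resolution RPCs only once.
  def resolveWithCache(
      path: Path,
      hadoopConf: Configuration,
      symlinkCache: mutable.Map[URI, Path]): Path = {
    symlinkCache.getOrElseUpdate(path.toUri,
      // resolvePath() follows symlinks and may issue several HDFS RPCs;
      // this cost is incurred only on a cache miss.
      FileContext.getFileContext(path.toUri, hadoopConf).resolvePath(path))
  }
}
```

Under this scheme, uploading many files to the same destination resolves symlinks at most once per distinct URI, and the per-file `getFileStatus()` RPC is skipped whenever the status is already in the cache.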
How was this patch tested?

This patch is based off 8e8afb3, currently the latest YARN patch on master. All tests except a few in spark-hive passed with `./dev/run-tests` on my machine, using JDK 1.8.0_112 on macOS 10.12.3. I also tested this modified version of Spark 2.2.0-SNAPSHOT myself; it performed a normal deployment and execution on a YARN cluster without errors.