[SPARK-3453] Netty-based BlockTransferService, extracted from Spark core #2753
Conversation
Also includes some partial support for uploading blocks.
…lockFetcherIteratorSuite
- use same pool for boss and worker
- remove ioRatio
- disable caching of byte buf allocator
- childOption sendbuf/receivebuf
- fire exception through pipeline

In addition:
- fire failure handler BlockFetchingListener at least once per block
- enabled a bunch of ignored tests
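For readers unfamiliar with these Netty knobs, here is a minimal sketch of what the bulleted settings above look like against the Netty 4.x bootstrap API. The class name, parameter names, and arena counts are illustrative, not the PR's actual code; page size and max order match Netty's defaults.

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.PooledByteBufAllocator;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;

    public class ServerBootstrapSketch {
      public static ServerBootstrap configure(int numThreads, int sendBuf, int recvBuf) {
        // Same pool for boss (accept) and worker (I/O): one shared event loop group.
        NioEventLoopGroup group = new NioEventLoopGroup(numThreads);

        // Pooled allocator with all thread-local caches disabled (SPARK-3503):
        // buffers are allocated and released on different threads, so the
        // caches only waste memory.
        PooledByteBufAllocator allocator = new PooledByteBufAllocator(
            true /* preferDirect */,
            numThreads /* nHeapArena */, numThreads /* nDirectArena */,
            8192 /* pageSize */, 11 /* maxOrder */,
            0 /* tinyCacheSize */, 0 /* smallCacheSize */, 0 /* normalCacheSize */);

        return new ServerBootstrap()
            .group(group, group)  // boss and worker share the pool
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.ALLOCATOR, allocator)
            // SO_SNDBUF/SO_RCVBUF apply to the accepted child sockets, so they
            // must be set via childOption rather than option (SPARK-3502).
            .childOption(ChannelOption.SO_SNDBUF, sendBuf)
            .childOption(ChannelOption.SO_RCVBUF, recvBuf)
            .childOption(ChannelOption.ALLOCATOR, allocator);
      }
    }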
QA tests have started for PR 2753 at commit
QA tests have finished for PR 2753 at commit
Test FAILed.
Test FAILed.
Jenkins, retest this please.
Test build #22421 has started for PR 2753 at commit
Test build #22421 has finished for PR 2753 at commit
Test PASSed.
That is a pass with Netty turned on. Now I am turning it off in preparation for the merge.
Test build #22427 has started for PR 2753 at commit
Test build #22427 has finished for PR 2753 at commit
Test FAILed.
Jenkins, retest this please.
Test build #22437 has started for PR 2753 at commit
Test build #22437 has finished for PR 2753 at commit
Test PASSed. |
nit - it might be slightly more clear to write it this way:

    if (cachedClient != null) {
      if (cachedClient.isActive()) {
        return cachedClient;
      } else {
        // Remove inactive clients so a fresh connection is created below.
        connectionPool.remove(address, cachedClient);
      }
    }

You can do it in the next PR.
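For context, a rough sketch of the createClient path this nit refers to, under the connection-pooling scheme from SPARK-3002. TransportClientStub, connect, and the surrounding class are placeholders for illustration, not the PR's actual API.

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.util.concurrent.ConcurrentHashMap;

    public class ClientPoolSketch {
      // One cached client per remote address, shared across threads (SPARK-3002).
      private final ConcurrentHashMap<InetSocketAddress, TransportClientStub> connectionPool =
          new ConcurrentHashMap<>();

      public TransportClientStub createClient(String host, int port) throws IOException {
        InetSocketAddress address = new InetSocketAddress(host, port);
        TransportClientStub cachedClient = connectionPool.get(address);
        if (cachedClient != null) {
          if (cachedClient.isActive()) {
            return cachedClient;  // Reuse the live pooled connection.
          } else {
            connectionPool.remove(address, cachedClient);  // Evict the dead client.
          }
        }
        TransportClientStub newClient = connect(address);
        // putIfAbsent keeps a race between concurrent callers benign: only one
        // client per address survives in the pool; the loser closes its connection.
        TransportClientStub existing = connectionPool.putIfAbsent(address, newClient);
        if (existing != null) {
          newClient.close();
          return existing;
        }
        return newClient;
      }

      private TransportClientStub connect(InetSocketAddress address) throws IOException {
        throw new UnsupportedOperationException("placeholder for the Netty connect logic");
      }

      // Minimal stand-in for the pooled client type.
      public interface TransportClientStub {
        boolean isActive();
        void close();
      }
    }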
Merging this into master. Thanks!
This patch introduces the tooling necessary to construct an external shuffle service which is independent of Spark executors, and then use this service inside Spark. An example (just for the sake of this PR) of the service creation can be found in Worker, and the service itself is used by plugging in the StandaloneShuffleClient as Spark's ShuffleClient (set up in BlockManager).

This PR continues the work from #2753, which extracted out the transport layer of Spark's block transfer into an independent package within Spark. A new package was created which contains the Spark business logic necessary to retrieve the actual shuffle data, which is completely independent of the transport layer introduced in the previous patch. Similar to the transport layer, this package must not depend on Spark, as we anticipate plugging this service in as a lightweight process within, say, the YARN NodeManager, and do not wish to include Spark's dependencies (including Scala itself).

There are several outstanding tasks which must be complete before this PR can be merged:
- [x] Complete unit testing of the network/shuffle package.
- [x] Performance and correctness testing on a real cluster.
- [x] Remove example service instantiation from Worker.scala.

There are even more shortcomings of this PR which should be addressed in followup patches:
- Don't use the Java serializer for the RPC layer! It is not cross-version compatible.
- Handle shuffle file cleanup for dead executors once the application terminates or the ContextCleaner triggers.
- Documentation of the feature in the Spark docs.
- Improve behavior if the shuffle service itself goes down (right now we don't blacklist it, and new executors cannot spawn on that machine).
- SSL and SASL integration.
- Nice to have: handle shuffle file consolidation (this would require changes to Spark's implementation).

Author: Aaron Davidson <[email protected]>

Closes #3001 from aarondav/shuffle-service and squashes the following commits:

4d1f8c1 [Aaron Davidson] Remove changes to Worker
705748f [Aaron Davidson] Rename Standalone* to External*
fd3928b [Aaron Davidson] Do not unregister executor outputs unduly
9883918 [Aaron Davidson] Make suggested build changes
3d62679 [Aaron Davidson] Add Spark integration test
7fe51d5 [Aaron Davidson] Fix SBT integration
56caa50 [Aaron Davidson] Address comments
c8d1ac3 [Aaron Davidson] Add unit tests
2f70c0c [Aaron Davidson] Fix unit tests
5483e96 [Aaron Davidson] Fix unit tests
46a70bf [Aaron Davidson] Whoops, bracket
5ea4df6 [Aaron Davidson] [SPARK-3796] Create external service which can serve shuffle files
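For a sense of how such a standalone service can be stood up in its own JVM, here is a rough sketch built on the transport layer from this PR. The class names mirror the "Rename Standalone* to External*" commit above, but the constructor and createServer signatures are assumptions, not verified against the merged code.

    import org.apache.spark.network.TransportContext;
    import org.apache.spark.network.server.TransportServer;
    import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
    import org.apache.spark.network.util.SystemPropertyConfigProvider;
    import org.apache.spark.network.util.TransportConf;

    public class ExternalShuffleServiceLauncher {
      public static void main(String[] args) {
        int port = Integer.parseInt(args[0]);
        // Transport settings come from a ConfigProvider rather than SparkConf,
        // keeping the process free of Spark (and Scala) dependencies.
        TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
        // The handler holds the shuffle-specific logic: it resolves (appId,
        // executorId, blockId) requests to shuffle files on local disk.
        // Constructor arguments here are an assumption.
        ExternalShuffleBlockHandler handler = new ExternalShuffleBlockHandler();
        // The transport layer extracted in this PR carries the RPCs and data.
        TransportServer server = new TransportContext(conf, handler).createServer(port);
        System.out.println("External shuffle service listening on " + server.getPort());
      }
    }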
This commit removes the netty shuffle implementation (the default shuffle implementation uses Java's NIO library). According to Patrick Wendell, this never worked well anyway (there were a bunch of corner cases it didn't support, and in the more recent Spark branch this code was completely rewritten: apache/spark#2753), and having multiple implementations of the network layer made it much more difficult to change some of the code.
This PR encapsulates #2330, which is itself a continuation of #2240. The first goal of this PR is to provide an alternate, simpler implementation of the ConnectionManager which is based on Netty.
In addition to this goal, however, we want to resolve SPARK-3796, which calls for a standalone shuffle service which can be integrated into the YARN NodeManager, the Standalone Worker, or run on its own. This PR takes the first step in this direction by ensuring that the actual Netty service is as small as possible and extracted from Spark core. Given this, we should be able to construct a standalone jar which can be included in other JVMs without incurring significant dependency or runtime issues. The actual work to ensure that such a standalone shuffle service would work in Spark will be left for a future PR, however.
In order to minimize dependencies and allow for the service to be long-running (possibly much longer-running than Spark, and possibly having to support multiple versions of Spark simultaneously), the entire service has been ported to Java, where we have full control over the binary compatibility of the components and do not depend on the Scala runtime or version.
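As an example of the pure-Java API surface this produces, the per-block callback mentioned in the commit notes near the top ("fire failure handler BlockFetchingListener at least once per block") is a plain Java interface along these lines. This is a reconstruction with the contract stated as comments, not the verbatim source.

    import java.util.EventListener;
    import org.apache.spark.network.buffer.ManagedBuffer;

    public interface BlockFetchingListener extends EventListener {
      // Called once per successfully fetched block. If `data` is used beyond
      // this call, the listener must retain() it, since the transport layer
      // may release the underlying buffer afterwards.
      void onBlockFetchSuccess(String blockId, ManagedBuffer data);

      // Per the commit notes, fired at least once for every block whose fetch
      // fails, so callers can clean up deterministically.
      void onBlockFetchFailure(String blockId, Throwable exception);
    }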
The following issues have been addressed by folding in #2330:
SPARK-3453: Refactor Netty module to use BlockTransferService interface
SPARK-3018: Release all buffers upon task completion/failure (see the buffer-lifecycle sketch after this list)
SPARK-3002: Create a connection pool and reuse clients across different threads
SPARK-3017: Integration tests and unit tests for connection failures
SPARK-3049: Make sure client doesn't block when server/connection has error(s)
SPARK-3502: SO_RCVBUF and SO_SNDBUF should be bootstrap childOption, not option
SPARK-3503: Disable thread local cache in PooledByteBufAllocator
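To illustrate the discipline behind SPARK-3018 above: fetched blocks are reference-counted ManagedBuffers, and the completion/failure path must release each one exactly once. A minimal sketch; everything except ManagedBuffer's retain()/release() is a hypothetical name.

    import org.apache.spark.network.buffer.ManagedBuffer;

    public final class BufferLifecycleSketch {
      // Hand a fetched block to a consumer while guaranteeing the buffer is
      // released exactly once, whether the task completes or fails.
      public static void consume(ManagedBuffer buffer) {
        buffer.retain();  // Take a reference before using the buffer.
        try {
          process(buffer);
        } finally {
          buffer.release();  // Runs on success and on failure alike.
        }
      }

      // Hypothetical consumer of the block's bytes.
      private static void process(ManagedBuffer buffer) { /* ... */ }
    }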
TODO before mergeable: