[SPARK-27991][CORE] Defer the fetch request on Netty OOM #32287
Conversation
cc @mridulm @tgravescs @attilapiros could you take a look? thanks!
+CC @otterc
mridulm
left a comment
Took an initial pass, looks like an interesting issue @Ngone51 !
Thanks for working on it.
Test build #137776 has finished for PR 32287 at commit
Kubernetes integration test starting
Kubernetes integration test status failure
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #137854 has finished for PR 32287 at commit
In the latest update, there are two major changes:
Kubernetes integration test unable to build dist. exiting with code: 1
Test build #137997 has finished for PR 32287 at commit
Kubernetes integration test unable to build dist. exiting with code: 1
Test build #138062 has finished for PR 32287 at commit
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #138066 has finished for PR 32287 at commit
Kubernetes integration test starting
Kubernetes integration test unable to build dist. exiting with code: 1
Test build #138706 has finished for PR 32287 at commit
@mridulm I already rebased against that PR :)
thanks, merging to master!
Sorry for missing ping here. +1, late LGTM.
Thank you, everyone!!
Closes apache#32287 from Ngone51/SPARK-27991. Authored-by: yi.wu <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
I'm also having this issue, can I now configure to increase the netty memory?
What changes were proposed in this pull request?
This PR proposes a workaround to address the Netty OOM issue (SPARK-24989, SPARK-27991):
Basically, `ShuffleBlockFetcherIterator` catches the `OutOfDirectMemoryError` thrown by Netty and then sets a global flag for the shuffle module. Any pending fetch requests are deferred, as long as there are still in-flight requests, until the flag is unset; the flag is unset once a fetch request succeeds.
Note that catching the Netty OOM rather than aborting the application is feasible because Netty manages its own memory region (off-heap by default) separately, so a Netty OOM does not indicate a memory shortage in Spark itself.
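For illustration only, here is a minimal Scala sketch of the flag-and-defer idea described above. It is not the PR's actual code; the names (`NettyOOMSketch`, `nettyOOMOccurred`, `onFetchFailure`, `deferredBlocks`) are made up for this example, and bookkeeping is simplified.

```scala
// Minimal sketch of the defer-on-Netty-OOM idea (illustrative only, not the PR's code).
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable

import io.netty.util.internal.OutOfDirectMemoryError

object NettyOOMSketch {
  // Global flag shared by all shuffle fetchers on the executor: set when Netty
  // reports it has run out of its own (off-heap) direct memory.
  val nettyOOMOccurred = new AtomicBoolean(false)
}

class FetcherSketch {
  private val deferredBlocks = mutable.Queue.empty[String]
  private var reqsInFlight = 0

  /** Called when a fetch request fails. */
  def onFetchFailure(blockId: String, cause: Throwable): Unit = cause match {
    case _: OutOfDirectMemoryError if reqsInFlight > 0 =>
      // Netty's own memory pool is exhausted, not Spark's: don't fail the task.
      // Raise the global flag and defer this block until memory is released.
      NettyOOMSketch.nettyOOMOccurred.set(true)
      deferredBlocks.enqueue(blockId)
    case other =>
      // A genuine fetch failure (or an OOM with no requests in flight) still propagates.
      throw other
  }

  /** Called when a fetch request succeeds. */
  def onFetchSuccess(): Unit = {
    reqsInFlight -= 1
    // A successful fetch implies Netty has freed memory, so clear the flag and
    // let deferred requests be re-issued.
    NettyOOMSketch.nettyOOMOccurred.compareAndSet(true, false)
  }

  /** Whether new fetch requests may be sent right now. */
  def canSendRequests: Boolean = !NettyOOMSketch.nettyOOMOccurred.get()
}
```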
Why are the changes needed?
The Netty OOM issue is a rare corner case. It usually happens in large-scale clusters, where a reduce task may fetch shuffle blocks from hundreds of nodes concurrently within a short time. Internally, we observed a cluster create 260+ clients within 6 seconds before throwing the Netty OOM.
Although Spark has configurations such as `spark.reducer.maxReqsInFlight` to tune the number of concurrent requests, it is usually not easy for the user to pick a reasonable value given the workloads, machine resources, etc. With this fix, Spark heals the Netty memory issue by itself without any specific configuration.
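For context, the pre-existing manual mitigation looks roughly like the snippet below; the values are arbitrary examples, and reasonable settings depend entirely on the workload and cluster.

```scala
// Illustrative only: capping concurrent shuffle fetches by hand, the tuning this
// fix aims to make unnecessary. The values below are arbitrary examples.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("shuffle-fetch-tuning-example")
  // Limit the number of remote fetch requests a reducer keeps in flight.
  .set("spark.reducer.maxReqsInFlight", "64")
  // Limit how many blocks are fetched from a single remote host at once.
  .set("spark.reducer.maxBlocksInFlightPerAddress", "100")
val sc = new SparkContext(conf)
```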
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Added unit tests.