[SPARK-24379] BroadcastExchangeExec should catch SparkOutOfMemory and re-throw SparkFatalException, which wraps SparkOutOfMemory inside. #21424
Conversation
… re-throw SparkFatalException, which wraps SparkOutOfMemory inside.
cc @cloud-fan @JoshRosen
```diff
  case oe: SparkOutOfMemoryError =>
    throw new SparkFatalException(
-     new OutOfMemoryError(s"Not enough memory to build and broadcast the table to " +
+     new SparkOutOfMemoryError(s"Not enough memory to build and broadcast the table to " +
```
since we fully control the creation of SparkOutOfMemoryError, can we move the error message to where we throw SparkOutOfMemoryError when building hash relation?
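(A minimal sketch of that suggestion, purely illustrative: attach the user-facing message where the SparkOutOfMemoryError is first thrown during hash-relation building, rather than at the catch site in BroadcastExchangeExec. The method and parameter names below are made up, and it assumes the SparkOutOfMemoryError(String) constructor available at the time.)

```scala
import org.apache.spark.memory.SparkOutOfMemoryError

// Hypothetical helper inside the hash-relation build path: if the needed memory
// cannot be acquired, throw the error with the descriptive message right away,
// so no later catch-and-rewrap is needed just to improve the message.
def acquireMemoryForHashedRelation(acquiredBytes: Long, requiredBytes: Long): Unit = {
  if (acquiredBytes < requiredBytes) {
    throw new SparkOutOfMemoryError(
      "Not enough memory to build and broadcast the table to all worker nodes. " +
        "As a workaround, you can either disable broadcast by setting " +
        "spark.sql.autoBroadcastJoinThreshold to -1, or increase the driver memory " +
        "by setting spark.driver.memory to a higher value.")
  }
}
```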
Test build #91105 has finished for PR 21424 at commit

Test build #91204 has finished for PR 21424 at commit

Test build #91203 has finished for PR 21424 at commit
```scala
    }
  } catch {
    case oe: SparkOutOfMemoryError =>
      throw new SparkOutOfMemoryError(s"If this SparkOutOfMemoryError happens in Spark driver," +
```
Ah I see. So the SparkOutOfMemoryError is thrown by BytesToBytesMap; we need to catch and rethrow it to attach the error message anyway.
I also found that we may throw OOM when calling child.executeCollectIterator, which calls RDD#collect, so it seems the previous code is correct.
So, should I change it back?
it seems we don't need to change anything, maybe just add some comments to say where OOM can occur, i.e. RDD#collect and BroadcastMode#transform
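(For context, the structure being discussed looks roughly like the sketch below, with comments of the kind suggested above. This is a simplified paraphrase of the BroadcastExchangeExec broadcast future of that era, not the exact code; child, mode, and sparkContext stand for members of that operator.)

```scala
try {
  // OutOfMemoryError may be thrown here: collecting the child's rows on the
  // driver goes through RDD#collect.
  val (numRows, input) = child.executeCollectIterator()

  // ... and here: BroadcastMode#transform builds the hash relation, and the
  // underlying BytesToBytesMap may throw SparkOutOfMemoryError.
  val relation = mode.transform(input, Some(numRows))

  sparkContext.broadcast(relation)
} catch {
  case oe: OutOfMemoryError =>
    // Wrapping in SparkFatalException lets ThreadUtils.awaitResult unwrap it,
    // keeping the error away from SparkUncaughtExceptionHandler.
    throw new SparkFatalException(
      new OutOfMemoryError("Not enough memory to build and broadcast the table to " +
        "all worker nodes. ...").initCause(oe.getCause))
}
```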
My previous understanding is that Spark throws … If …
That's a good point! According to the document of …

That said, I think it's fine to catch …

One thing we can do for future safety: do not change the exception type when enhancing the error message.
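(To make that last point concrete, here is a minimal generic sketch, not Spark code and not tied to any class named above: enrich the message but re-throw the same exception type, keeping the original as the cause, so existing catch clauses and fatal-error handling still match.)

```scala
// Illustrative helper: enrich a failure's message without changing its type.
def withHint[T](hint: String)(body: => T): T = {
  try {
    body
  } catch {
    case e: IllegalStateException =>
      // Same exception type, richer message, original kept as the cause.
      throw new IllegalStateException(s"$hint: ${e.getMessage}", e)
  }
}

// Usage sketch:
// withHint("while building the broadcast relation") { buildRelation() }
```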
Thanks a lot for the deep explanation, I will close this PR and leave it unchanged :)
I left the JIRA resolved as "Won't fix" given the discussion above.
What changes were proposed in this pull request?
After #20014, Spark won't fail the entire executor but only fail the task suffering a SparkOutOfMemoryError. After #21342, BroadcastExchangeExec try-catches OutOfMemoryError. Think about the scenario below:

1. A SparkOutOfMemoryError (subclass of OutOfMemoryError) is thrown in scala.concurrent.Future;
2. The SparkOutOfMemoryError is caught, and an OutOfMemoryError is wrapped in SparkFatalException and re-thrown;
3. ThreadUtils.awaitResult catches the SparkFatalException, and an OutOfMemoryError is thrown;
4. The OutOfMemoryError goes to SparkUncaughtExceptionHandler.uncaughtException and the executor fails.

So it makes more sense to catch SparkOutOfMemoryError and re-throw SparkFatalException, which wraps the SparkOutOfMemoryError inside.
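For reference, the proposed change boils down to something like the sketch below. This is a simplified illustration of the catch clause inside BroadcastExchangeExec's broadcast future, not the exact patch: buildAndBroadcast() is a made-up placeholder for the collect/transform/broadcast steps, and it assumes the Spark 2.4-era SparkFatalException and SparkOutOfMemoryError classes. The PR was ultimately closed without this change.

```scala
import org.apache.spark.memory.SparkOutOfMemoryError
import org.apache.spark.util.SparkFatalException

// Placeholder for the real work (executeCollectIterator + BroadcastMode#transform
// + sparkContext.broadcast); defined only so the sketch reads as a whole.
def buildAndBroadcast(): Unit = ???

try {
  buildAndBroadcast()
} catch {
  case oe: SparkOutOfMemoryError =>
    // Wrap the SparkOutOfMemoryError itself: after ThreadUtils.awaitResult unwraps
    // the SparkFatalException, the re-thrown error is still a SparkOutOfMemoryError,
    // so (per #20014) only the task fails rather than the whole process.
    throw new SparkFatalException(oe)
}
```

The only difference from the code after #21342 is what gets wrapped: the caught SparkOutOfMemoryError itself rather than a freshly constructed OutOfMemoryError.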