[SPARK-34939][CORE] Throw fetch failure exception when unable to deserialize broadcasted map statuses #32033
Conversation
```diff
  * explicitly destroyed later on when the ShuffleMapStage is garbage-collected.
  */
- private[this] var cachedSerializedBroadcast: Broadcast[Array[Byte]] = _
+ private[spark] var cachedSerializedBroadcast: Broadcast[Array[Byte]] = _
```
Expose this for testing.
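For context, a rough sketch of the kind of test the widened visibility enables; the suite helper `newTrackerMaster()` and the `shuffleStatuses` lookup below are assumptions based on this discussion, not code from the PR:

```scala
// Hypothetical sketch: with private[spark] visibility, a suite under the
// org.apache.spark package can reach the cached broadcast and destroy it,
// simulating the scheduler invalidating map statuses mid-flight.
test("SPARK-34939: report fetch failure when broadcasted map statuses are gone") {
  val tracker = newTrackerMaster()   // assumed suite helper
  tracker.registerShuffle(10, 2)     // shuffle 10 with 2 map tasks
  // ... register map outputs and serialize statuses so the broadcast exists ...

  // Destroy the broadcast directly through the now-exposed field
  // (shuffleStatuses is assumed to be the master's per-shuffle map).
  tracker.shuffleStatuses.get(10).cachedSerializedBroadcast.destroy()

  // Deserializing the stale serialized bytes on the worker side should now
  // surface a fetch failure instead of an unhandled IOException.
}
```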
Kubernetes integration test unable to build dist. exiting with code: 1
Test build #136839 has finished for PR 32033 at commit
```scala
try {
  fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes, conf)
} catch {
  case e: SparkException =>
```
The failure could be DIRECT; how can you ensure this only catches exceptions from the broadcast case?
Oh, never mind. I saw the code in the snippet below.
```scala
val bcast = deserializeObject(bytes, 1, bytes.length - 1).
  asInstanceOf[Broadcast[Array[Byte]]]
logInfo("Broadcast mapstatuses size = " + bytes.length +
  ", actual size = " + bcast.value.length)
```
Maybe we should move lines 964 to 967 out of the try block, like in the DIRECT case.
This is needed for the test case. In the test, if we call getStatuses, the map output tracker worker will ask the tracker master for a new broadcasted value, so we cannot reproduce the situation we need to test.
This seems to be the same issue as #27604. cc @liupc @cloud-fan
```diff
-  message: String)
+  message: String,
+  cause: Throwable = null)
   extends FetchFailedException(null, shuffleId, -1L, -1, reduceId, message)
```
I don't see cause used anywhere. Shall we convert the cause to a stack-trace string and append it to the message?
Sure, I just want to keep the original stack trace.
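For reference, one way to do what the reviewer suggests using only the JDK; the helper name `withCause` is made up for this sketch:

```scala
import java.io.{PrintWriter, StringWriter}

// Sketch: render the cause's stack trace to a string and append it to the
// message, so the original trace survives even if the Throwable is dropped.
def withCause(message: String, cause: Throwable): String = {
  if (cause == null) {
    message
  } else {
    val sw = new StringWriter()
    cause.printStackTrace(new PrintWriter(sw, /* autoFlush = */ true))
    s"$message\n${sw.toString}"
  }
}
```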
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #136863 has finished for PR 32033 at commit
```scala
  deserializeObject(bcast.value, 1, bcast.value.length - 1).asInstanceOf[Array[MapStatus]]
} catch {
  case e: IOException =>
    logError("Exception encountered during deserializing broadcasted map statuses: ", e)
```
If this is recoverable, shall we lower the log level to warning?
Okay, makes sense.
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #136881 has finished for PR 32033 at commit
Thanks @dongjoon-hyun
… deserialize broadcasted map statuses

### What changes were proposed in this pull request?

This patch catches the `IOException` that can be thrown while deserializing map statuses (e.g., when the broadcasted value has been destroyed). Once `IOException` is caught, `MetadataFetchFailedException` is thrown to let Spark handle it. This is a backport of #32033 to branch-2.4.

### Why are the changes needed?

One customer encountered an application error. From the log, it was caused by accessing a non-existing broadcasted value; the broadcasted value is the map statuses. E.g.,

```
[info] Cause: java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0
[info] at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1410)
[info] at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:226)
[info] at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:103)
[info] at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
[info] at org.apache.spark.MapOutputTracker$.$anonfun$deserializeMapStatuses$3(MapOutputTracker.scala:967)
[info] at org.apache.spark.internal.Logging.logInfo(Logging.scala:57)
[info] at org.apache.spark.internal.Logging.logInfo$(Logging.scala:56)
[info] at org.apache.spark.MapOutputTracker$.logInfo(MapOutputTracker.scala:887)
[info] at org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:967)
```

There is a race condition. After map statuses are broadcasted, the executors obtain the serialized broadcasted map statuses. If any fetch failure happens afterwards, the Spark scheduler invalidates the cached map statuses and destroys the broadcasted value. Then, when any executor tries to deserialize the serialized broadcasted map statuses and access the broadcasted value, an `IOException` is thrown. Currently we don't catch it in `MapOutputTrackerWorker`, so the exception above fails the application. Normally we should throw a fetch failure exception in such a case, and the Spark scheduler will handle it.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #32045 from viirya/fix-broadcast.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…rialize broadcasted map statuses

### What changes were proposed in this pull request?

This patch catches the `IOException` that can be thrown while deserializing map statuses (e.g., when the broadcasted value has been destroyed). Once `IOException` is caught, `MetadataFetchFailedException` is thrown to let Spark handle it.

### Why are the changes needed?

One customer encountered an application error. From the log, it was caused by accessing a non-existing broadcasted value; the broadcasted value is the map statuses. E.g.,

```
[info] Cause: java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0
[info] at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1410)
[info] at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:226)
[info] at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:103)
[info] at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
[info] at org.apache.spark.MapOutputTracker$.$anonfun$deserializeMapStatuses$3(MapOutputTracker.scala:967)
[info] at org.apache.spark.internal.Logging.logInfo(Logging.scala:57)
[info] at org.apache.spark.internal.Logging.logInfo$(Logging.scala:56)
[info] at org.apache.spark.MapOutputTracker$.logInfo(MapOutputTracker.scala:887)
[info] at org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:967)
```

There is a race condition. After map statuses are broadcasted, the executors obtain the serialized broadcasted map statuses. If any fetch failure happens afterwards, the Spark scheduler invalidates the cached map statuses and destroys the broadcasted value. Then, when any executor tries to deserialize the serialized broadcasted map statuses and access the broadcasted value, an `IOException` is thrown. Currently we don't catch it in `MapOutputTrackerWorker`, so the exception above fails the application. Normally we should throw a fetch failure exception in such a case, and the Spark scheduler will handle it.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #32033 from viirya/fix-broadcast-master.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 571acc8)
Signed-off-by: Dongjoon Hyun <[email protected]>
Merged to master/3.1/3.0.
Thanks all!
```scala
} catch {
  case e: IOException =>
    logWarning("Exception encountered during deserializing broadcasted map statuses: ", e)
    throw new SparkException("Unable to deserialize broadcasted map statuses", e)
```
Shall we throw MetadataFetchFailedException directly here?
Throw MetadataFetchFailedException here, then catch it and rethrow?
Oh, I recall why I did this: constructing a MetadataFetchFailedException needs a shuffleId.
I chose not to throw MetadataFetchFailedException because deserializeMapStatuses doesn't have the shuffleId and doesn't need it at all.
ah I see, thanks for the explanation!
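To summarize the resolution, here is a sketch of the resulting two-level handling, stitched together from the hunks quoted in this thread; the exact `MetadataFetchFailedException` message below is illustrative, not copied from the PR:

```scala
// In MapOutputTracker.deserializeMapStatuses: no shuffleId in scope, so the
// broadcast failure is wrapped in a generic SparkException.
try {
  deserializeObject(bcast.value, 1, bcast.value.length - 1).asInstanceOf[Array[MapStatus]]
} catch {
  case e: IOException =>
    logWarning("Exception encountered during deserializing broadcasted map statuses: ", e)
    throw new SparkException("Unable to deserialize broadcasted map statuses", e)
}

// In MapOutputTrackerWorker.getStatuses: the shuffleId is known here, so the
// SparkException is translated into the form the scheduler can act on.
try {
  fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes, conf)
} catch {
  case e: SparkException =>
    throw new MetadataFetchFailedException(shuffleId, -1,
      s"Unable to deserialize broadcasted map statuses for shuffle $shuffleId", e)
}
```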
Got to this a bit late, looks good to me. Nice catch @viirya!
Thanks @cloud-fan @mridulm
Late LGTM.
### What changes were proposed in this pull request?

This patch catches the `IOException` that can be thrown while deserializing map statuses (e.g., when the broadcasted value has been destroyed). Once `IOException` is caught, `MetadataFetchFailedException` is thrown to let Spark handle it.

### Why are the changes needed?

One customer encountered an application error. From the log, it was caused by accessing a non-existing broadcasted value; the broadcasted value is the map statuses. E.g., see the `java.io.IOException: ... Failed to get broadcast_0_piece0 of broadcast_0` stack trace quoted in the commit messages above.

There is a race condition. After map statuses are broadcasted, the executors obtain the serialized broadcasted map statuses. If any fetch failure happens afterwards, the Spark scheduler invalidates the cached map statuses and destroys the broadcasted value. Then, when any executor tries to deserialize the serialized broadcasted map statuses and access the broadcasted value, an `IOException` is thrown. Currently we don't catch it in `MapOutputTrackerWorker`, so the exception fails the application. Normally we should throw a fetch failure exception in such a case, and the Spark scheduler will handle it.
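To make the failure mode concrete, here is a minimal local reproduction using a plain broadcast variable; it exercises only the broadcast layer, not the map-status path. In local mode the error surfaces directly as a `SparkException`, whereas on an executor the failed block re-fetch is what gets wrapped into the `IOException` shown in the stack trace above:

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}

// Minimal repro: reading a destroyed broadcast fails. Locally this throws a
// SparkException up front; on a remote executor the failed re-fetch of the
// broadcast blocks is wrapped into an IOException instead.
object DestroyedBroadcastDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("destroyed-broadcast-demo"))
    val bc = sc.broadcast(Array(1, 2, 3))
    bc.destroy() // what the scheduler does when it invalidates cached map statuses
    try {
      bc.value   // accessing a destroyed broadcast throws
    } catch {
      case e: SparkException => println(e.getMessage)
    }
    sc.stop()
  }
}
```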
### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.