# [SPARK-26734] [STREAMING] Fix StackOverflowError with large block queue #23716
Conversation
```diff
 if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
   val streamIdToBlocks = streamIds.map { streamId =>
-    (streamId, getReceivedBlockQueue(streamId).clone())
+    (streamId, getReceivedBlockQueue(streamId).clone().dequeueAll(x => true))
```
---
This basically just copies the content, right? Would it be more robust to construct a new mutable.Seq from the queue, to be sure about what the type is? I know it won't be a Queue here, but it sounds like the representation does kind of matter.
Was https://issues.apache.org/jira/browse/SPARK-23358 / https://issues.apache.org/jira/browse/SPARK-23391 really the cause or did it mostly uncover this?
---
Sure, I could do:

```scala
mutable.ArrayBuffer.empty[ReceivedBlockInfo] ++= getReceivedBlockQueue(streamId).clone()
```

which ensures that we know exactly the sequence type being serialized; would you prefer I did that?
https://issues.apache.org/jira/browse/SPARK-23991 caused it in the sense that it moved from using dequeueAll to just serializing a clone of the queue, and the queue simply doesn't serialize correctly after some number of entries (a Scala bug, IMO). You could say it "uncovered" a Scala bug that was sitting there, but prior to that change the code wouldn't error with large numbers of entries, because dequeueAll constructs an ArrayBuffer and that's what was being serialized.
---
Oh, I was looking at the wrong JIRA (number typo). Yeah, I see it. @gaborgsomogyi, does that make sense?
How about `ArrayBuffer(...: _*)`? It might be better, as it could allocate exactly the right size up front.
---
That's not a Scala but a Java limitation. Serialization is vulnerable to stack overflow for certain kinds of structures; for example, a long linked list with no special writeObject() method will be serialized by recursively writing each link. If you've got 100k links, you're going to try to use 100k stack frames, and quite likely fail with a StackOverflowError. The main thing here is to use something which is not linked.
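A minimal sketch of that failure mode, assuming Scala 2.11/2.12 (where `mutable.Queue` is backed by a linked `MutableList`); the element count and the `serialize` helper are illustrative:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.collection.mutable

// Java-serialize an object, discarding the bytes.
def serialize(obj: AnyRef): Unit = {
  val oos = new ObjectOutputStream(new ByteArrayOutputStream())
  try oos.writeObject(obj) finally oos.close()
}

val queue = mutable.Queue(1 to 100000: _*)

// Flat, array-backed representation: serialized iteratively, succeeds.
serialize(mutable.ArrayBuffer(queue: _*))

// Linked cells: default writeObject recurses once per element,
// so a large queue is likely to blow the stack.
try serialize(queue)
catch { case e: StackOverflowError => println(s"queue failed: $e") }
```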
---
I would also choose a solution which allocates the buffer in one step, so we avoid the amortized-growth cost of appending element by element.
Additionally, it's a weird construct that later developers may be tempted to simplify. I would add a comment here describing the issue, so people aren't forced to analyze this again.
---
Yes, I think that's the point here indeed, but a Queue apparently has a linked-list-like representation; ArrayBuffer won't. Are you saying that's the right change, or no? My point was that we're not actually guaranteed what dequeueAll returns.
A comment would be good, sure.
---
I think `.dequeueAll(x => true)` works, but it's not the right change; I would propose something similar to what you've mentioned, for example `ArrayBuffer...`. The main point is to allocate the buffer in one step, which is not the case with dequeueAll.
---
Sure, I can add a comment. Is

```scala
ArrayBuffer(queue.clone(): _*)
```

sufficient, or do we need something more explicit, like:

```scala
val clonedQueue = getReceivedBlockQueue(streamId).clone()
val bufferToWrite = new mutable.ArrayBuffer[ReceivedBlockInfo](clonedQueue.size)
bufferToWrite ++= clonedQueue
(streamId, bufferToWrite)
```
---
The first one looks good to me. It definitely makes an ArrayBuffer, and I can only imagine the ordering produced is the order of dequeueing the elements; it was so when I tried it locally.
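For what it's worth, a quick REPL check of that ordering claim (illustrative values):

```scala
import scala.collection.mutable

val q = mutable.Queue("a", "b", "c")
mutable.ArrayBuffer(q.clone(): _*)  // ArrayBuffer(a, b, c): dequeue (FIFO) order preserved
```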
---
Ok, I pushed those changes.
---
Thanks for the efforts! I would write down the root cause of this issue in the description as well. It's not obvious why it solves the problem.
```scala
val receivedBlockTracker = createTracker(setCheckpointDir = true)
val blockInfos = generateBlockInfos(100000)
blockInfos.map(receivedBlockTracker.addBlock)
receivedBlockTracker.allocateBlocksToBatch(1)
```
---
I would add assertions here as well. Not throwing an exception doesn't mean the same blocks were deserialized.
---
sure
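Something along these lines, presumably (a hypothetical sketch; `getBlocksOfBatchAndStream`, `streamId`, and the implicit `Long`-to-`Time` conversion are assumed to come from the surrounding suite):

```scala
// Not just "no exception": check the allocated blocks match what was added.
receivedBlockTracker.getBlocksOfBatchAndStream(1, streamId) shouldEqual blockInfos
```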
```scala
  }

  test("block addition, and block to batch allocation with many blocks") {
    val receivedBlockTracker = createTracker(setCheckpointDir = true)
```
---
`setCheckpointDir` is already true.
---
ok
---
Apart from the minor things, looks good.
```diff
   val streamIdToBlocks = streamIds.map { streamId =>
-    (streamId, getReceivedBlockQueue(streamId).clone())
+    (streamId,
+      mutable.ArrayBuffer(getReceivedBlockQueue(streamId).clone(): _*))
```
---
Nit: no line break required.
---
OK, sure.
```scala
  test("block addition, and block to batch allocation with many blocks") {
    val receivedBlockTracker = createTracker()
    receivedBlockTracker.isWriteAheadLogEnabled should be (true)
```
---
Would be good to close the tracker. I know not all tests do this (which is a bug), but it would be good to make it clean here.
---
Sure, done.
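Putting the thread together, the final test plausibly looks like this sketch (`createTracker`, `generateBlockInfos`, and `streamId` are assumed helpers from `ReceivedBlockTrackerSuite`; the assertion and cleanup follow the review comments above):

```scala
test("block addition, and block to batch allocation with many blocks") {
  val receivedBlockTracker = createTracker()
  receivedBlockTracker.isWriteAheadLogEnabled should be (true)

  // Enough blocks that serializing a linked queue would overflow the stack.
  val blockInfos = generateBlockInfos(100000)
  blockInfos.map(receivedBlockTracker.addBlock)
  receivedBlockTracker.allocateBlocksToBatch(1)

  // Verify the blocks round-tripped intact, not just that nothing threw.
  receivedBlockTracker.getBlocksOfBatchAndStream(1, streamId) shouldEqual blockInfos

  receivedBlockTracker.stop()
}
```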
---
LGTM.
Test build #4550 has finished for PR 23716 at commit
Merged to master/2.4/2.3
## What changes were proposed in this pull request?

SPARK-23991 introduced a bug in `ReceivedBlockTracker#allocateBlocksToBatch`: when more than a few thousand blocks are in the queue, serializing the queue throws a StackOverflowError. This change just adds `dequeueAll` to the new `clone` operation on the queue, so that the fix in 23991 is preserved but the serialized data comes from an ArrayBuffer, which doesn't have the serialization problems that mutable.Queue has.

## How was this patch tested?

A unit test was added.

Closes #23716 from rlodge/SPARK-26734.

Authored-by: Ross Lodge <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
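For reference, a simplified sketch of the change as finally merged (per the last diff in the review): copying the queue into a flat `ArrayBuffer` in one step avoids the recursive, cell-by-cell serialization of the linked queue.

```scala
// ReceivedBlockTracker#allocateBlocksToBatch (simplified excerpt).
val streamIdToBlocks = streamIds.map { streamId =>
  // mutable.Queue serializes recursively, one stack frame per element;
  // an ArrayBuffer is array-backed and serializes iteratively (SPARK-26734).
  (streamId,
    mutable.ArrayBuffer(getReceivedBlockQueue(streamId).clone(): _*))
}
```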