
Commit 8427e9b

rlodge authored and srowen committed
[SPARK-26734][STREAMING] Fix StackOverflowError with large block queue
## What changes were proposed in this pull request?

SPARK-23991 introduced a bug in `ReceivedBlockTracker#allocateBlocksToBatch`: when the queue contains more than a few thousand blocks, serializing it throws a StackOverflowError. This change copies the cloned queue into a `mutable.ArrayBuffer` before it is recorded for the batch, so the fix from SPARK-23991 is preserved but the serialized data comes from an ArrayBuffer, which doesn't have the serialization problem that mutable.Queue has.

## How was this patch tested?

A unit test was added.

Closes apache#23716 from rlodge/SPARK-26734.

Authored-by: Ross Lodge <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
1 parent 63bced9 · commit 8427e9b
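For context, the sketch below is a minimal, standalone reproduction of the failure mode described in the commit message; it is not code from this commit. It Java-serializes a mutable.Queue holding well over "a few thousand" elements, which on Scala 2.11/2.12 (where the queue is backed by a linked structure) can blow the stack. The object name QueueSerializationRepro and the element count are illustrative assumptions.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.collection.mutable

object QueueSerializationRepro {
  def main(args: Array[String]): Unit = {
    val queue = mutable.Queue[Int]()
    queue ++= (1 to 100000)  // well past "a few thousand" elements

    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      // On Scala 2.11/2.12, serializing the queue recurses through its linked
      // representation and can throw a StackOverflowError.
      out.writeObject(queue)
      println("serialized without error")
    } catch {
      case e: StackOverflowError => println(s"reproduced the SPARK-26734 symptom: $e")
    } finally {
      out.close()
    }
  }
}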

2 files changed (+28, -3 lines)

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala

Lines changed: 5 additions & 1 deletion
@@ -111,8 +111,12 @@ private[streaming] class ReceivedBlockTracker(
    */
   def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
     if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
+      // We explicitly create an ArrayBuffer here because at least as of Scala 2.11 and 2.12
+      // a mutable.Queue fails serialization with a StackOverflow error if it has more than
+      // a few thousand elements. So we explicitly allocate a collection for serialization which
+      // we know doesn't have this issue. (See SPARK-26734).
       val streamIdToBlocks = streamIds.map { streamId =>
-        (streamId, getReceivedBlockQueue(streamId).clone())
+        (streamId, mutable.ArrayBuffer(getReceivedBlockQueue(streamId).clone(): _*))
       }.toMap
       val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
       if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
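The diff above boils down to one technique: snapshot the queue into a mutable.ArrayBuffer before it is handed to anything that serializes it. Below is a minimal, self-contained sketch of that conversion, assuming nothing from the Spark code base; QueueSnapshotSketch, snapshotForSerialization, and blockQueue are made-up names.

import scala.collection.mutable

object QueueSnapshotSketch {
  // Clone the queue (so the original is untouched) and copy the clone into an
  // ArrayBuffer; the ArrayBuffer is what downstream serialization sees.
  def snapshotForSerialization[T](queue: mutable.Queue[T]): mutable.ArrayBuffer[T] =
    mutable.ArrayBuffer(queue.clone(): _*)

  def main(args: Array[String]): Unit = {
    val blockQueue = mutable.Queue[Int]()
    blockQueue ++= (1 to 100000)
    val snapshot = snapshotForSerialization(blockQueue)
    println(s"queue size = ${blockQueue.size}, snapshot size = ${snapshot.size}")
  }
}

In the actual change, the clone() call mirrors what SPARK-23991 already did to avoid exposing the live queue; wrapping the clone in ArrayBuffer(... : _*) only changes which collection ends up inside AllocatedBlocks.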

streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala

Lines changed: 23 additions & 2 deletions
@@ -96,6 +96,27 @@ class ReceivedBlockTrackerSuite
     receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
   }
 
+  test("block addition, and block to batch allocation with many blocks") {
+    val receivedBlockTracker = createTracker()
+    receivedBlockTracker.isWriteAheadLogEnabled should be (true)
+
+    val blockInfos = generateBlockInfos(100000)
+    blockInfos.map(receivedBlockTracker.addBlock)
+    receivedBlockTracker.allocateBlocksToBatch(1)
+
+    receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
+    receivedBlockTracker.hasUnallocatedReceivedBlocks should be (false)
+    receivedBlockTracker.getBlocksOfBatch(1) shouldEqual Map(streamId -> blockInfos)
+    receivedBlockTracker.getBlocksOfBatchAndStream(1, streamId) shouldEqual blockInfos
+
+    val expectedWrittenData1 = blockInfos.map(BlockAdditionEvent) :+
+      BatchAllocationEvent(1, AllocatedBlocks(Map(streamId -> blockInfos)))
+    getWrittenLogData() shouldEqual expectedWrittenData1
+    getWriteAheadLogFiles() should have size 1
+
+    receivedBlockTracker.stop()
+  }
+
   test("recovery with write ahead logs should remove only allocated blocks from received queue") {
     val manualClock = new ManualClock
     val batchTime = manualClock.getTimeMillis()

@@ -362,8 +383,8 @@ class ReceivedBlockTrackerSuite
   }
 
   /** Generate blocks infos using random ids */
-  def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
-    List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None,
+  def generateBlockInfos(blockCount: Int = 5): Seq[ReceivedBlockInfo] = {
+    List.fill(blockCount)(ReceivedBlockInfo(streamId, Some(0L), None,
       BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)), Some(0L))))
   }
