
Commit 1f180cd

gaborgsomogyi authored and Marcelo Vanzin committed
[SPARK-23438][DSTREAMS] Fix DStreams data loss with WAL when driver crashes
There is a race condition, introduced in SPARK-11141, which could cause data loss. The problem is that ReceivedBlockTracker.insertAllocatedBatch assumes that all blocks in streamIdToUnallocatedBlockQueues were allocated to the batch, and therefore clears the queues wholesale. With this PR, only the blocks actually allocated to the batch are removed from the queues, which prevents the data loss.

Tested with an additional unit test and manually.

Author: Gabor Somogyi <[email protected]>

Closes #20620 from gaborgsomogyi/SPARK-23438.

(cherry picked from commit b308182)
Signed-off-by: Marcelo Vanzin <[email protected]>
1 parent 578607b commit 1f180cd
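To make the race concrete, here is a minimal standalone sketch (not Spark code) of the bug this commit fixes: on WAL recovery, the old code cleared every unallocated-block queue when replaying a batch allocation, dropping blocks that had been received but not yet allocated to any batch. The Block class, queue map, and stream id below are simplified stand-ins for ReceivedBlockInfo and streamIdToUnallocatedBlockQueues.

import scala.collection.mutable

object WalRecoverySketch {
  // Simplified stand-in for ReceivedBlockInfo.
  case class Block(id: Int)

  def main(args: Array[String]): Unit = {
    // Stream 0 has three received blocks; only blocks 1 and 2 were allocated
    // to the batch being replayed, block 3 is still waiting for the next batch.
    val unallocatedByStream =
      mutable.Map(0 -> mutable.Queue(Block(1), Block(2), Block(3)))
    val allocatedToBatch = Set(Block(1), Block(2))

    // Old behavior (data loss): clear every queue, silently dropping Block(3).
    //   unallocatedByStream.values.foreach(_.clear())

    // Fixed behavior: remove only the blocks that this batch actually owns.
    unallocatedByStream(0).dequeueAll(allocatedToBatch)

    println(unallocatedByStream(0)) // Queue(Block(3)) survives for the next batch
  }
}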

2 files changed: 29 additions, 5 deletions


streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
(7 additions, 4 deletions)

@@ -193,12 +193,15 @@ private[streaming] class ReceivedBlockTracker(
       getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
     }

-    // Insert the recovered block-to-batch allocations and clear the queue of received blocks
-    // (when the blocks were originally allocated to the batch, the queue must have been cleared).
+    // Insert the recovered block-to-batch allocations and removes them from queue of
+    // received blocks.
     def insertAllocatedBatch(batchTime: Time, allocatedBlocks: AllocatedBlocks) {
       logTrace(s"Recovery: Inserting allocated batch for time $batchTime to " +
         s"${allocatedBlocks.streamIdToAllocatedBlocks}")
-      streamIdToUnallocatedBlockQueues.values.foreach { _.clear() }
+      allocatedBlocks.streamIdToAllocatedBlocks.foreach {
+        case (streamId, allocatedBlocksInStream) =>
+          getReceivedBlockQueue(streamId).dequeueAll(allocatedBlocksInStream.toSet)
+      }
       timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
       lastAllocatedBatchTime = batchTime
     }

@@ -227,7 +230,7 @@ private[streaming] class ReceivedBlockTracker(
   }

   /** Write an update to the tracker to the write ahead log */
-  private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
+  private[streaming] def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
     if (isWriteAheadLogEnabled) {
       logTrace(s"Writing record: $record")
       try {
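The new loop leans on a small Scala idiom: a Set[A] is itself an A => Boolean, so allocatedBlocksInStream.toSet can be passed directly as the predicate of mutable.Queue.dequeueAll, which removes exactly the matching elements while preserving the order of the rest. A quick self-contained illustration:

import scala.collection.mutable

val queue = mutable.Queue("a", "b", "c", "d")
queue.dequeueAll(Set("b", "d")) // removes and returns b and d
// queue is now Queue(a, c): matching elements are gone, order is preserved

The visibility change on writeToLog (private widened to private[streaming]) exists for the test below: it lets the suite, which lives in package org.apache.spark.streaming, write BlockAdditionEvent and BatchAllocationEvent records to the WAL directly instead of going through the public tracker API.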

streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
(22 additions, 1 deletion)

@@ -33,7 +33,7 @@ import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
 import org.apache.spark.internal.Logging
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
-import org.apache.spark.streaming.scheduler._
+import org.apache.spark.streaming.scheduler.{AllocatedBlocks, _}
 import org.apache.spark.streaming.util._
 import org.apache.spark.streaming.util.WriteAheadLogSuite._
 import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}

@@ -94,6 +94,27 @@ class ReceivedBlockTrackerSuite
     receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
   }

+  test("recovery with write ahead logs should remove only allocated blocks from received queue") {
+    val manualClock = new ManualClock
+    val batchTime = manualClock.getTimeMillis()
+
+    val tracker1 = createTracker(clock = manualClock)
+    tracker1.isWriteAheadLogEnabled should be (true)
+
+    val allocatedBlockInfos = generateBlockInfos()
+    val unallocatedBlockInfos = generateBlockInfos()
+    val receivedBlockInfos = allocatedBlockInfos ++ unallocatedBlockInfos
+    receivedBlockInfos.foreach { b => tracker1.writeToLog(BlockAdditionEvent(b)) }
+    val allocatedBlocks = AllocatedBlocks(Map(streamId -> allocatedBlockInfos))
+    tracker1.writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))
+    tracker1.stop()
+
+    val tracker2 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true)
+    tracker2.getBlocksOfBatch(batchTime) shouldEqual allocatedBlocks.streamIdToAllocatedBlocks
+    tracker2.getUnallocatedBlocks(streamId) shouldEqual unallocatedBlockInfos
+    tracker2.stop()
+  }
+
   test("recovery and cleanup with write ahead logs") {
     val manualClock = new ManualClock
     // Set the time increment level to twice the rotation interval so that every increment creates
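The key assertion is the last shouldEqual: with the old clear-everything behavior, recovery would have emptied the queue and getUnallocatedBlocks(streamId) would have returned an empty sequence instead of unallocatedBlockInfos. To run just this suite from a Spark checkout (the usual sbt invocation, assuming a standard build setup):

build/sbt "streaming/testOnly *ReceivedBlockTrackerSuite"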
