Skip to content

Commit e6b12e6

Browse files
committed
[SPARK-24156][SS] Fix error recovering from the failure in a no-data batch
**This PR is not for merging, only for looking at the change. Consider only the changes to the file MicroBatchExecution.scala.** The error occurs when we are recovering from a failure in a no-data batch (say X) that has been planned (i.e. written to offset log) but not executed (i.e. not written to commit log). Upon recovery the following sequence of events happen. 1. `MicroBatchExecution.populateStartOffsets` sets `currentBatchId` to X. Since there was no data in the batch, the `availableOffsets` is same as `committedOffsets`, so `isNewDataAvailable` is `false`. 2. When `MicroBatchExecution.constructNextBatch` is called, ideally it should immediately return true because the next batch has already been constructed. However, the check of whether the batch has been constructed was `if (isNewDataAvailable) return true`. Since the planned batch is a no-data batch, it escaped this check and proceeded to plan the same batch X *once again*. The correct solution is to check the offset log whether the currentBatchId is the latest or not. This is the fix below. TODO Author: Tathagata Das <[email protected]> Closes #2567 from tdas/SC-11085.
1 parent 0be5aa2 commit e6b12e6

File tree

3 files changed

+99
-13
lines changed

3 files changed

+99
-13
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,12 @@ class MicroBatchExecution(
126126
_logicalPlan
127127
}
128128

129+
/**
130+
* Signifies whether current batch (i.e. for the batch `currentBatchId`) has been constructed
131+
* (i.e. written to the offsetLog) and is ready for execution.
132+
*/
133+
private var isCurrentBatchConstructed = false
134+
129135
/**
130136
* Signals to the thread executing micro-batches that it should stop running after the next
131137
* batch. This method blocks until the thread stops running.
@@ -154,7 +160,6 @@ class MicroBatchExecution(
154160

155161
triggerExecutor.execute(() => {
156162
if (isActive) {
157-
var currentBatchIsRunnable = false // Whether the current batch is runnable / has been run
158163
var currentBatchHasNewData = false // Whether the current batch had new data
159164

160165
startTrigger()
@@ -175,15 +180,17 @@ class MicroBatchExecution(
175180
// new data to process as `constructNextBatch` may decide to run a batch for
176181
// state cleanup, etc. `isNewDataAvailable` will be updated to reflect whether new data
177182
// is available or not.
178-
currentBatchIsRunnable = constructNextBatch(noDataBatchesEnabled)
183+
if (!isCurrentBatchConstructed) {
184+
isCurrentBatchConstructed = constructNextBatch(noDataBatchesEnabled)
185+
}
179186

180187
// Remember whether the current batch has data or not. This will be required later
181188
// for bookkeeping after running the batch, when `isNewDataAvailable` will have changed
182189
// to false as the batch would have already processed the available data.
183190
currentBatchHasNewData = isNewDataAvailable
184191

185192
currentStatus = currentStatus.copy(isDataAvailable = isNewDataAvailable)
186-
if (currentBatchIsRunnable) {
193+
if (isCurrentBatchConstructed) {
187194
if (currentBatchHasNewData) updateStatusMessage("Processing new data")
188195
else updateStatusMessage("No new data but cleaning up state")
189196
runBatch(sparkSessionForStream)
@@ -194,9 +201,12 @@ class MicroBatchExecution(
194201

195202
finishTrigger(currentBatchHasNewData) // Must be outside reportTimeTaken so it is recorded
196203

197-
// If the current batch has been executed, then increment the batch id, else there was
198-
// no data to execute the batch
199-
if (currentBatchIsRunnable) currentBatchId += 1 else Thread.sleep(pollingDelayMs)
204+
// If the current batch has been executed, then increment the batch id and reset flag.
205+
// Otherwise, there was no data to execute the batch and sleep for some time
206+
if (isCurrentBatchConstructed) {
207+
currentBatchId += 1
208+
isCurrentBatchConstructed = false
209+
} else Thread.sleep(pollingDelayMs)
200210
}
201211
updateStatusMessage("Waiting for next trigger")
202212
isActive
@@ -231,6 +241,7 @@ class MicroBatchExecution(
231241
/* First assume that we are re-executing the latest known batch
232242
* in the offset log */
233243
currentBatchId = latestBatchId
244+
isCurrentBatchConstructed = true
234245
availableOffsets = nextOffsets.toStreamProgress(sources)
235246
/* Initialize committed offsets to a committed batch, which at this
236247
* is the second latest batch id in the offset log. */
@@ -269,6 +280,7 @@ class MicroBatchExecution(
269280
// here, so we do nothing here.
270281
}
271282
currentBatchId = latestCommittedBatchId + 1
283+
isCurrentBatchConstructed = false
272284
committedOffsets ++= availableOffsets
273285
// Construct a new batch be recomputing availableOffsets
274286
} else if (latestCommittedBatchId < latestBatchId - 1) {
@@ -313,11 +325,8 @@ class MicroBatchExecution(
313325
* - If either of the above is true, then construct the next batch by committing to the offset
314326
* log that range of offsets that the next batch will process.
315327
*/
316-
private def constructNextBatch(noDataBatchesEnables: Boolean): Boolean = withProgressLocked {
317-
// If new data is already available that means this method has already been called before
318-
// and it must have already committed the offset range of next batch to the offset log.
319-
// Hence do nothing, just return true.
320-
if (isNewDataAvailable) return true
328+
private def constructNextBatch(noDataBatchesEnabled: Boolean): Boolean = withProgressLocked {
329+
if (isCurrentBatchConstructed) return true
321330

322331
// Generate a map from each unique source to the next available offset.
323332
val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
@@ -348,9 +357,14 @@ class MicroBatchExecution(
348357
batchTimestampMs = triggerClock.getTimeMillis())
349358

350359
// Check whether next batch should be constructed
351-
val lastExecutionRequiresAnotherBatch = noDataBatchesEnables &&
360+
val lastExecutionRequiresAnotherBatch = noDataBatchesEnabled &&
352361
Option(lastExecution).exists(_.shouldRunAnotherBatch(offsetSeqMetadata))
353362
val shouldConstructNextBatch = isNewDataAvailable || lastExecutionRequiresAnotherBatch
363+
logTrace(
364+
s"noDataBatchesEnabled = $noDataBatchesEnabled, " +
365+
s"lastExecutionRequiresAnotherBatch = $lastExecutionRequiresAnotherBatch, " +
366+
s"isNewDataAvailable = $isNewDataAvailable, " +
367+
s"shouldConstructNextBatch = $shouldConstructNextBatch")
354368

355369
if (shouldConstructNextBatch) {
356370
// Commit the next batch offset range to the offset log
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.streaming
19+
20+
import org.scalatest.BeforeAndAfter
21+
22+
import org.apache.spark.sql.functions.{count, window}
23+
import org.apache.spark.sql.streaming.StreamTest
24+
25+
class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter {
26+
27+
28+
import testImplicits._
29+
30+
after {
31+
sqlContext.streams.active.foreach(_.stop())
32+
}
33+
34+
test("SPARK-24156: do not plan a no-data batch again after it has already been planned") {
35+
val inputData = MemoryStream[Int]
36+
val df = inputData.toDF()
37+
.withColumn("eventTime", $"value".cast("timestamp"))
38+
.withWatermark("eventTime", "10 seconds")
39+
.groupBy(window($"eventTime", "5 seconds") as 'window)
40+
.agg(count("*") as 'count)
41+
.select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
42+
43+
testStream(df)(
44+
AddData(inputData, 10, 11, 12, 13, 14, 15), // Set watermark to 5
45+
CheckAnswer(),
46+
AddData(inputData, 25), // Set watermark to 15 to make MicroBatchExecution run no-data batch
47+
CheckAnswer((10, 5)), // Last batch should be a no-data batch
48+
StopStream,
49+
Execute { q =>
50+
// Delete the last committed batch from the commit log to signify that the last batch
51+
// (a no-data batch) never completed
52+
val commit = q.commitLog.getLatest().map(_._1).getOrElse(-1L)
53+
q.commitLog.purgeAfter(commit - 1)
54+
},
55+
// Add data before start so that MicroBatchExecution can plan a batch. It should not,
56+
// it should first re-run the incomplete no-data batch and then run a new batch to process
57+
// new data.
58+
AddData(inputData, 30),
59+
StartStream(),
60+
CheckNewAnswer((15, 1)), // This should not throw the error reported in SPARK-24156
61+
StopStream,
62+
Execute { q =>
63+
// Delete the entire commit log
64+
val commit = q.commitLog.getLatest().map(_._1).getOrElse(-1L)
65+
q.commitLog.purge(commit + 1)
66+
},
67+
AddData(inputData, 50),
68+
StartStream(),
69+
CheckNewAnswer((25, 1), (30, 1)) // This should not throw the error reported in SPARK-24156
70+
)
71+
}
72+
}

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
292292
/** Execute arbitrary code */
293293
object Execute {
294294
def apply(func: StreamExecution => Any): AssertOnQuery =
295-
AssertOnQuery(query => { func(query); true })
295+
AssertOnQuery(query => { func(query); true }, "Execute")
296296
}
297297

298298
object AwaitEpoch {

0 commit comments

Comments
 (0)