MicroBatchExecution.scala
@@ -126,6 +126,12 @@ class MicroBatchExecution(
     _logicalPlan
   }
 
+  /**
+   * Signifies whether the current batch (i.e. the batch for `currentBatchId`) has been
+   * constructed (i.e. written to the offsetLog) and is ready for execution.
+   */
+  private var isCurrentBatchConstructed = false
+
   /**
    * Signals to the thread executing micro-batches that it should stop running after the next
    * batch. This method blocks until the thread stops running.
@@ -154,7 +160,6 @@ class MicroBatchExecution(
 
     triggerExecutor.execute(() => {
       if (isActive) {
-        var currentBatchIsRunnable = false // Whether the current batch is runnable / has been run
         var currentBatchHasNewData = false // Whether the current batch had new data
 
         startTrigger()
@@ -175,15 +180,17 @@ class MicroBatchExecution(
           // new data to process as `constructNextBatch` may decide to run a batch for
           // state cleanup, etc. `isNewDataAvailable` will be updated to reflect whether new data
           // is available or not.
-          currentBatchIsRunnable = constructNextBatch(noDataBatchesEnabled)
+          if (!isCurrentBatchConstructed) {
+            isCurrentBatchConstructed = constructNextBatch(noDataBatchesEnabled)
+          }
 
           // Remember whether the current batch has data or not. This will be required later
           // for bookkeeping after running the batch, when `isNewDataAvailable` will have changed
           // to false as the batch would have already processed the available data.
           currentBatchHasNewData = isNewDataAvailable
 
           currentStatus = currentStatus.copy(isDataAvailable = isNewDataAvailable)
-          if (currentBatchIsRunnable) {
+          if (isCurrentBatchConstructed) {
             if (currentBatchHasNewData) updateStatusMessage("Processing new data")
             else updateStatusMessage("No new data but cleaning up state")
             runBatch(sparkSessionForStream)
@@ -194,9 +201,12 @@ class MicroBatchExecution(
 
         finishTrigger(currentBatchHasNewData) // Must be outside reportTimeTaken so it is recorded
 
-        // If the current batch has been executed, then increment the batch id, else there was
-        // no data to execute the batch
-        if (currentBatchIsRunnable) currentBatchId += 1 else Thread.sleep(pollingDelayMs)
+        // If the current batch has been executed, then increment the batch id and reset the flag.
+        // Otherwise, there was no data to execute the batch, so sleep for some time.
+        if (isCurrentBatchConstructed) {
+          currentBatchId += 1
+          isCurrentBatchConstructed = false
+        } else Thread.sleep(pollingDelayMs)
       }
       updateStatusMessage("Waiting for next trigger")
       isActive
@@ -231,6 +241,7 @@ class MicroBatchExecution(
         /* First assume that we are re-executing the latest known batch
          * in the offset log */
         currentBatchId = latestBatchId
+        isCurrentBatchConstructed = true
         availableOffsets = nextOffsets.toStreamProgress(sources)
         /* Initialize committed offsets to a committed batch, which at this point
          * is the second latest batch id in the offset log. */
@@ -269,6 +280,7 @@ class MicroBatchExecution(
           // here, so we do nothing here.
         }
         currentBatchId = latestCommittedBatchId + 1
+        isCurrentBatchConstructed = false
         committedOffsets ++= availableOffsets
         // Construct a new batch by recomputing availableOffsets
       } else if (latestCommittedBatchId < latestBatchId - 1) {
@@ -313,11 +325,8 @@ class MicroBatchExecution(
    * - If either of the above is true, then construct the next batch by committing to the offset
    *   log that range of offsets that the next batch will process.
    */
-  private def constructNextBatch(noDataBatchesEnables: Boolean): Boolean = withProgressLocked {
-    // If new data is already available that means this method has already been called before
-    // and it must have already committed the offset range of next batch to the offset log.
-    // Hence do nothing, just return true.
-    if (isNewDataAvailable) return true
+  private def constructNextBatch(noDataBatchesEnabled: Boolean): Boolean = withProgressLocked {
+    if (isCurrentBatchConstructed) return true
 
     // Generate a map from each unique source to the next available offset.
     val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
@@ -348,9 +357,14 @@ class MicroBatchExecution(
         batchTimestampMs = triggerClock.getTimeMillis())
 
     // Check whether next batch should be constructed
-    val lastExecutionRequiresAnotherBatch = noDataBatchesEnables &&
+    val lastExecutionRequiresAnotherBatch = noDataBatchesEnabled &&
       Option(lastExecution).exists(_.shouldRunAnotherBatch(offsetSeqMetadata))
     val shouldConstructNextBatch = isNewDataAvailable || lastExecutionRequiresAnotherBatch
+    logTrace(
+      s"noDataBatchesEnabled = $noDataBatchesEnabled, " +
+      s"lastExecutionRequiresAnotherBatch = $lastExecutionRequiresAnotherBatch, " +
+      s"isNewDataAvailable = $isNewDataAvailable, " +
+      s"shouldConstructNextBatch = $shouldConstructNextBatch")
 
     if (shouldConstructNextBatch) {
       // Commit the next batch offset range to the offset log
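The essence of the change above: planning a batch (writing its offset range to the offset log) and executing it are now tracked by a flag that is reset only after a successful run, so a restart between the two steps re-executes the already-planned batch instead of planning it twice. Below is a minimal, self-contained sketch of that lifecycle; BatchFlagSketch, its mutable sets, and shouldRun are illustrative stand-ins, not Spark internals.

// Illustrative sketch (not Spark code) of the construct-once / run / reset lifecycle.
object BatchFlagSketch {
  private var isCurrentBatchConstructed = false
  private var currentBatchId = 0L

  // Stand-ins for Spark's durable offset and commit logs.
  private val offsetLog = scala.collection.mutable.Set.empty[Long]
  private val commitLog = scala.collection.mutable.Set.empty[Long]

  // Plan the batch at most once per batch id; a no-op if already planned.
  private def constructNextBatch(shouldRun: Boolean): Boolean = {
    if (isCurrentBatchConstructed) return true
    if (shouldRun) offsetLog += currentBatchId
    shouldRun
  }

  def runOneTrigger(shouldRun: Boolean): Unit = {
    if (!isCurrentBatchConstructed) {
      isCurrentBatchConstructed = constructNextBatch(shouldRun)
    }
    if (isCurrentBatchConstructed) {
      commitLog += currentBatchId       // "runBatch": execute and commit
      currentBatchId += 1               // advance only after the batch completes
      isCurrentBatchConstructed = false // reset so the next batch gets planned
    }
  }
}

Because the flag is only cleared after the commit, a crash between planning and committing leaves it recoverable as "constructed", which is exactly the state the recovery code below restores.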
MicroBatchExecutionSuite.scala (new file; all lines added)
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming

import org.scalatest.BeforeAndAfter

import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.streaming.StreamTest

class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter {

import testImplicits._

after {
sqlContext.streams.active.foreach(_.stop())
}

test("SPARK-24156: do not plan a no-data batch again after it has already been planned") {
val inputData = MemoryStream[Int]
val df = inputData.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withWatermark("eventTime", "10 seconds")
.groupBy(window($"eventTime", "5 seconds") as 'window)
.agg(count("*") as 'count)
.select($"window".getField("start").cast("long").as[Long], $"count".as[Long])

testStream(df)(
AddData(inputData, 10, 11, 12, 13, 14, 15), // Set watermark to 5
CheckAnswer(),
AddData(inputData, 25), // Set watermark to 15 to make MicroBatchExecution run a no-data batch
CheckAnswer((10, 5)), // Last batch should be a no-data batch
StopStream,
Execute { q =>
// Delete the last committed batch from the commit log to signify that the last batch
// (a no-data batch) never completed
val commit = q.commitLog.getLatest().map(_._1).getOrElse(-1L)
q.commitLog.purgeAfter(commit - 1)
},
// Add data before restarting so that MicroBatchExecution can plan a new-data batch. It
// should not do so immediately: it should first re-run the incomplete no-data batch, and
// only then run a new batch to process the new data.
AddData(inputData, 30),
StartStream(),
CheckNewAnswer((15, 1)), // This should not throw the error reported in SPARK-24156
StopStream,
Execute { q =>
// Delete the entire commit log
val commit = q.commitLog.getLatest().map(_._1).getOrElse(-1L)
q.commitLog.purge(commit + 1)
},
AddData(inputData, 50),
StartStream(),
CheckNewAnswer((25, 1), (30, 1)) // This should not throw the error reported in SPARK-24156
)
}
}
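The recovery paths patched above reduce to a rule that this test exercises: if the latest planned batch in the offset log has no matching entry in the commit log, resume at that batch id with the constructed flag already set, so the batch is re-executed rather than re-planned. A hedged sketch of that rule, with hypothetical parameters (-1 meaning an empty log; names are not Spark's):

// Returns (batch id to resume at, isCurrentBatchConstructed).
def resumeState(latestPlanned: Long, latestCommitted: Long): (Long, Boolean) = {
  if (latestPlanned < 0) {
    (0L, false)                // nothing planned yet: start from scratch
  } else if (latestCommitted == latestPlanned) {
    (latestPlanned + 1, false) // last planned batch completed: plan a new one
  } else {
    (latestPlanned, true)      // planned but uncommitted: re-execute as-is
  }
}

Under this rule, the first Execute step (which purges the last commit) resumes as (latestPlanned, true), which is why the incomplete no-data batch is re-run before the new data is processed.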
StreamTest.scala
@@ -292,7 +292,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
   /** Execute arbitrary code */
   object Execute {
     def apply(func: StreamExecution => Any): AssertOnQuery =
-      AssertOnQuery(query => { func(query); true })
+      AssertOnQuery(query => { func(query); true }, "Execute")
   }
 
   object AwaitEpoch {
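The added second argument names the assertion, so a failing step in a testStream(...) sequence is reported as "Execute" instead of as an anonymous AssertOnQuery. Typical usage, as in the suite above:

// The closure receives the running StreamExecution for ad-hoc inspection.
Execute { q =>
  q.commitLog.purgeAfter(q.commitLog.getLatest().map(_._1).getOrElse(-1L) - 1)
}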