diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 7817360810bde..17ffa2a517312 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -126,6 +126,12 @@ class MicroBatchExecution(
     _logicalPlan
   }
 
+  /**
+   * Signifies whether current batch (i.e. for the batch `currentBatchId`) has been constructed
+   * (i.e. written to the offsetLog) and is ready for execution.
+   */
+  private var isCurrentBatchConstructed = false
+
   /**
    * Signals to the thread executing micro-batches that it should stop running after the next
    * batch. This method blocks until the thread stops running.
@@ -154,7 +160,6 @@ class MicroBatchExecution(
 
     triggerExecutor.execute(() => {
       if (isActive) {
-        var currentBatchIsRunnable = false // Whether the current batch is runnable / has been run
        var currentBatchHasNewData = false // Whether the current batch had new data
 
        startTrigger()
@@ -175,7 +180,9 @@ class MicroBatchExecution(
          // new data to process as `constructNextBatch` may decide to run a batch for
          // state cleanup, etc. `isNewDataAvailable` will be updated to reflect whether new data
          // is available or not.
-          currentBatchIsRunnable = constructNextBatch(noDataBatchesEnabled)
+          if (!isCurrentBatchConstructed) {
+            isCurrentBatchConstructed = constructNextBatch(noDataBatchesEnabled)
+          }
 
          // Remember whether the current batch has data or not. This will be required later
          // for bookkeeping after running the batch, when `isNewDataAvailable` will have changed
@@ -183,7 +190,7 @@ class MicroBatchExecution(
          currentBatchHasNewData = isNewDataAvailable
 
          currentStatus = currentStatus.copy(isDataAvailable = isNewDataAvailable)
-          if (currentBatchIsRunnable) {
+          if (isCurrentBatchConstructed) {
            if (currentBatchHasNewData) updateStatusMessage("Processing new data")
            else updateStatusMessage("No new data but cleaning up state")
            runBatch(sparkSessionForStream)
@@ -194,9 +201,12 @@ class MicroBatchExecution(
 
        finishTrigger(currentBatchHasNewData) // Must be outside reportTimeTaken so it is recorded
 
-        // If the current batch has been executed, then increment the batch id, else there was
-        // no data to execute the batch
-        if (currentBatchIsRunnable) currentBatchId += 1 else Thread.sleep(pollingDelayMs)
+        // If the current batch has been executed, then increment the batch id and reset flag.
+        // Otherwise, there was no data to execute the batch and sleep for some time
+        if (isCurrentBatchConstructed) {
+          currentBatchId += 1
+          isCurrentBatchConstructed = false
+        } else Thread.sleep(pollingDelayMs)
      }
      updateStatusMessage("Waiting for next trigger")
      isActive
@@ -231,6 +241,7 @@ class MicroBatchExecution(
        /* First assume that we are re-executing the latest known batch
         * in the offset log */
        currentBatchId = latestBatchId
+        isCurrentBatchConstructed = true
        availableOffsets = nextOffsets.toStreamProgress(sources)
        /* Initialize committed offsets to a committed batch, which at this
         * is the second latest batch id in the offset log. */
@@ -269,6 +280,7 @@ class MicroBatchExecution(
              // here, so we do nothing here.
          }
          currentBatchId = latestCommittedBatchId + 1
+          isCurrentBatchConstructed = false
          committedOffsets ++= availableOffsets
          // Construct a new batch be recomputing availableOffsets
        } else if (latestCommittedBatchId < latestBatchId - 1) {
@@ -313,11 +325,8 @@ class MicroBatchExecution(
   *  - If either of the above is true, then construct the next batch by committing to the offset
   *    log that range of offsets that the next batch will process.
   */
-  private def constructNextBatch(noDataBatchesEnables: Boolean): Boolean = withProgressLocked {
-    // If new data is already available that means this method has already been called before
-    // and it must have already committed the offset range of next batch to the offset log.
-    // Hence do nothing, just return true.
-    if (isNewDataAvailable) return true
+  private def constructNextBatch(noDataBatchesEnabled: Boolean): Boolean = withProgressLocked {
+    if (isCurrentBatchConstructed) return true
 
    // Generate a map from each unique source to the next available offset.
    val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
@@ -348,9 +357,14 @@ class MicroBatchExecution(
      batchTimestampMs = triggerClock.getTimeMillis())
 
    // Check whether next batch should be constructed
-    val lastExecutionRequiresAnotherBatch = noDataBatchesEnables &&
+    val lastExecutionRequiresAnotherBatch = noDataBatchesEnabled &&
      Option(lastExecution).exists(_.shouldRunAnotherBatch(offsetSeqMetadata))
    val shouldConstructNextBatch = isNewDataAvailable || lastExecutionRequiresAnotherBatch
+    logTrace(
+      s"noDataBatchesEnabled = $noDataBatchesEnabled, " +
+      s"lastExecutionRequiresAnotherBatch = $lastExecutionRequiresAnotherBatch, " +
+      s"isNewDataAvailable = $isNewDataAvailable, " +
+      s"shouldConstructNextBatch = $shouldConstructNextBatch")
 
    if (shouldConstructNextBatch) {
      // Commit the next batch offset range to the offset log
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
new file mode 100644
index 0000000000000..c228740df07c8
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.functions.{count, window}
+import org.apache.spark.sql.streaming.StreamTest
+
+class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter {
+
+  import testImplicits._
+
+  after {
+    sqlContext.streams.active.foreach(_.stop())
+  }
+
+  test("SPARK-24156: do not plan a no-data batch again after it has already been planned") {
+    val inputData = MemoryStream[Int]
+    val df = inputData.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .groupBy(window($"eventTime", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    testStream(df)(
+      AddData(inputData, 10, 11, 12, 13, 14, 15), // Set watermark to 5
+      CheckAnswer(),
+      AddData(inputData, 25), // Set watermark to 15 to make MicroBatchExecution run no-data batch
+      CheckAnswer((10, 5)), // Last batch should be a no-data batch
+      StopStream,
+      Execute { q =>
+        // Delete the last committed batch from the commit log to signify that the last batch
+        // (a no-data batch) never completed
+        val commit = q.commitLog.getLatest().map(_._1).getOrElse(-1L)
+        q.commitLog.purgeAfter(commit - 1)
+      },
+      // Add data before start so that MicroBatchExecution can plan a batch. It should not,
+      // it should first re-run the incomplete no-data batch and then run a new batch to process
+      // new data.
+      AddData(inputData, 30),
+      StartStream(),
+      CheckNewAnswer((15, 1)), // This should not throw the error reported in SPARK-24156
+      StopStream,
+      Execute { q =>
+        // Delete the entire commit log
+        val commit = q.commitLog.getLatest().map(_._1).getOrElse(-1L)
+        q.commitLog.purge(commit + 1)
+      },
+      AddData(inputData, 50),
+      StartStream(),
+      CheckNewAnswer((25, 1), (30, 1)) // This should not throw the error reported in SPARK-24156
+    )
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index f348dac1319cb..4c3fd58cb2e45 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -292,7 +292,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
  /** Execute arbitrary code */
  object Execute {
    def apply(func: StreamExecution => Any): AssertOnQuery =
-      AssertOnQuery(query => { func(query); true })
+      AssertOnQuery(query => { func(query); true }, "Execute")
  }
 
  object AwaitEpoch {
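
For readers skimming the patch: the core of the fix is the construct-once / run / reset lifecycle around `isCurrentBatchConstructed`. Below is a minimal, self-contained sketch of that control flow, not Spark code; apart from the identifiers `isCurrentBatchConstructed`, `currentBatchId`, `constructNextBatch`, and `runBatch`, which echo the names in the diff, everything here is invented for illustration.

object BatchLoopSketch {
  // Mirrors the new flag in MicroBatchExecution: true once the current batch
  // has been planned (written to the offset log) but not yet completed.
  private var isCurrentBatchConstructed = false
  private var currentBatchId = 0L

  // Stand-in for constructNextBatch(): returns true if a batch (with data or
  // a no-data batch) was committed to the offset log for `currentBatchId`.
  // The cutoff at 3 is arbitrary, just to make the demo terminate.
  private def constructNextBatch(): Boolean = currentBatchId < 3

  private def runBatch(): Unit = println(s"ran batch $currentBatchId")

  def trigger(): Unit = {
    // Key invariant of the fix: a batch that is already constructed (e.g.
    // recovered from the offset log after a restart, where populateStartOffsets
    // would set the flag to true) is re-run as-is, never planned a second time.
    if (!isCurrentBatchConstructed) {
      isCurrentBatchConstructed = constructNextBatch()
    }
    if (isCurrentBatchConstructed) {
      runBatch()
      currentBatchId += 1
      isCurrentBatchConstructed = false // reset only after the batch completes
    }
  }

  def main(args: Array[String]): Unit = (1 to 5).foreach(_ => trigger())
}

Running the sketch prints "ran batch 0" through "ran batch 2" and then idles, illustrating why the buggy pre-patch behavior (re-deciding runnability from `isNewDataAvailable` on every trigger) could re-plan an already-constructed no-data batch, which is exactly what the new test above exercises.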