diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index c5e9eae607b3..1f74fffbe6e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -180,6 +180,26 @@ trait ProgressReporter extends Logging {
     currentStatus = currentStatus.copy(isTriggerActive = false)
   }
 
+  /** Extract statistics about stateful operators from the executed query plan. */
+  private def extractStateOperatorMetrics(hasNewData: Boolean): Seq[StateOperatorProgress] = {
+    if (lastExecution == null) return Nil
+    // lastExecution could belong to one of the previous triggers if `!hasNewData`.
+    // Walking the plan again should be inexpensive.
+    val stateNodes = lastExecution.executedPlan.collect {
+      case p if p.isInstanceOf[StateStoreSaveExec] => p
+    }
+    stateNodes.map { node =>
+      val numRowsUpdated = if (hasNewData) {
+        node.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L)
+      } else {
+        0L
+      }
+      new StateOperatorProgress(
+        numRowsTotal = node.metrics.get("numTotalStateRows").map(_.value).getOrElse(0L),
+        numRowsUpdated = numRowsUpdated)
+    }
+  }
+
   /** Extracts statistics from the most recent query execution. */
   private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
     val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty
@@ -187,8 +207,11 @@ trait ProgressReporter extends Logging {
       if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
       else Map.empty[String, String]
 
+    // SPARK-19378: Still report metrics even though no data was processed while reporting progress.
+    val stateOperators = extractStateOperatorMetrics(hasNewData)
+
     if (!hasNewData) {
-      return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp)
+      return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
     }
 
     // We want to associate execution plan leaves to sources that generate them, so that we match
@@ -237,16 +260,6 @@ trait ProgressReporter extends Logging {
       Map.empty
     }
 
-    // Extract statistics about stateful operators in the query plan.
-    val stateNodes = lastExecution.executedPlan.collect {
-      case p if p.isInstanceOf[StateStoreSaveExec] => p
-    }
-    val stateOperators = stateNodes.map { node =>
-      new StateOperatorProgress(
-        numRowsTotal = node.metrics.get("numTotalStateRows").map(_.value).getOrElse(0L),
-        numRowsUpdated = node.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L))
-    }
-
     val eventTimeStats = lastExecution.executedPlan.collect {
       case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
         val stats = e.eventTimeStats.value
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 2035db562895..901cf34f289c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -20,16 +20,19 @@ package org.apache.spark.sql.streaming
 import java.util.UUID
 
 import scala.collection.JavaConverters._
+import scala.language.postfixOps
 
 import org.json4s._
 import org.json4s.jackson.JsonMethods._
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamingQueryStatusAndProgressSuite._
 
-
-class StreamingQueryStatusAndProgressSuite extends StreamTest {
+class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
   implicit class EqualsIgnoreCRLF(source: String) {
     def equalsIgnoreCRLF(target: String): Boolean = {
       source.replaceAll("\r\n|\r|\n", System.lineSeparator) ===
@@ -171,6 +174,41 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest {
       query.stop()
     }
   }
+
+  test("SPARK-19378: Continue reporting stateOp metrics even if there is no active trigger") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10") {
+      val inputData = MemoryStream[Int]
+
+      val query = inputData.toDS().toDF("value")
+        .select('value)
+        .groupBy($"value")
+        .agg(count("*"))
+        .writeStream
+        .queryName("metric_continuity")
+        .format("memory")
+        .outputMode("complete")
+        .start()
+      try {
+        inputData.addData(1, 2)
+        query.processAllAvailable()
+
+        val progress = query.lastProgress
+        assert(progress.stateOperators.length > 0)
+        // Should emit new progresses every 10 ms, but we could be facing a slow Jenkins
+        eventually(timeout(1 minute)) {
+          val nextProgress = query.lastProgress
+          assert(nextProgress.timestamp !== progress.timestamp)
+          assert(nextProgress.numInputRows === 0)
+          assert(nextProgress.stateOperators.head.numRowsTotal === 2)
+          assert(nextProgress.stateOperators.head.numRowsUpdated === 0)
+        }
+      } finally {
+        query.stop()
+      }
+    }
+  }
 }
 
 object StreamingQueryStatusAndProgressSuite {
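For context on how the restored metrics surface to users: with this change, StreamingQueryProgress.stateOperators stays populated on the "no new data" progress updates, so external monitoring keeps seeing state-store size between triggers. A minimal consumer sketch using only the public listener API; the object name, app name, and log format below are illustrative, not part of this patch:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

object StateMetricsLogger {
  def main(args: Array[String]): Unit = {
    // Assumes a local session purely for demonstration.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("state-metrics-logger")
      .getOrCreate()

    // Print state-store metrics from every progress event, including the
    // zero-input-row updates that this change keeps populated.
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = {}
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        val p = event.progress
        p.stateOperators.foreach { s =>
          // numRowsTotal remains meaningful even when numInputRows == 0.
          println(s"batch=${p.batchId} inputRows=${p.numInputRows} " +
            s"stateRowsTotal=${s.numRowsTotal} stateRowsUpdated=${s.numRowsUpdated}")
        }
      }
    })

    // ... start a stateful streaming query here, then spark.streams.awaitAnyTermination()
  }
}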