Skip to content

Commit 081b7ad

Browse files
brkyvztdas
authored andcommitted
[SPARK-19378][SS] Ensure continuity of stateOperator and eventTime metrics even if there is no new data in trigger
## What changes were proposed in this pull request? In StructuredStreaming, if a new trigger was skipped because no new data arrived, we suddenly report nothing for the metrics `stateOperator`. We could however easily report the metrics from `lastExecution` to ensure continuity of metrics. ## How was this patch tested? Regression test in `StreamingQueryStatusAndProgressSuite` Author: Burak Yavuz <[email protected]> Closes #16716 from brkyvz/state-agg.
1 parent 57d70d2 commit 081b7ad

File tree

2 files changed

+64
-13
lines changed

2 files changed

+64
-13
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -180,15 +180,38 @@ trait ProgressReporter extends Logging {
180180
currentStatus = currentStatus.copy(isTriggerActive = false)
181181
}
182182

183+
/** Extract statistics about stateful operators from the executed query plan. */
184+
private def extractStateOperatorMetrics(hasNewData: Boolean): Seq[StateOperatorProgress] = {
185+
if (lastExecution == null) return Nil
186+
// lastExecution could belong to one of the previous triggers if `!hasNewData`.
187+
// Walking the plan again should be inexpensive.
188+
val stateNodes = lastExecution.executedPlan.collect {
189+
case p if p.isInstanceOf[StateStoreSaveExec] => p
190+
}
191+
stateNodes.map { node =>
192+
val numRowsUpdated = if (hasNewData) {
193+
node.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L)
194+
} else {
195+
0L
196+
}
197+
new StateOperatorProgress(
198+
numRowsTotal = node.metrics.get("numTotalStateRows").map(_.value).getOrElse(0L),
199+
numRowsUpdated = numRowsUpdated)
200+
}
201+
}
202+
183203
/** Extracts statistics from the most recent query execution. */
184204
private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
185205
val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty
186206
val watermarkTimestamp =
187207
if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
188208
else Map.empty[String, String]
189209

210+
// SPARK-19378: Still report metrics even though no data was processed while reporting progress.
211+
val stateOperators = extractStateOperatorMetrics(hasNewData)
212+
190213
if (!hasNewData) {
191-
return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp)
214+
return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
192215
}
193216

194217
// We want to associate execution plan leaves to sources that generate them, so that we match
@@ -237,16 +260,6 @@ trait ProgressReporter extends Logging {
237260
Map.empty
238261
}
239262

240-
// Extract statistics about stateful operators in the query plan.
241-
val stateNodes = lastExecution.executedPlan.collect {
242-
case p if p.isInstanceOf[StateStoreSaveExec] => p
243-
}
244-
val stateOperators = stateNodes.map { node =>
245-
new StateOperatorProgress(
246-
numRowsTotal = node.metrics.get("numTotalStateRows").map(_.value).getOrElse(0L),
247-
numRowsUpdated = node.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L))
248-
}
249-
250263
val eventTimeStats = lastExecution.executedPlan.collect {
251264
case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
252265
val stats = e.eventTimeStats.value

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,19 @@ package org.apache.spark.sql.streaming
2020
import java.util.UUID
2121

2222
import scala.collection.JavaConverters._
23+
import scala.language.postfixOps
2324

2425
import org.json4s._
2526
import org.json4s.jackson.JsonMethods._
27+
import org.scalatest.concurrent.Eventually
28+
import org.scalatest.time.SpanSugar._
2629

2730
import org.apache.spark.sql.execution.streaming.MemoryStream
2831
import org.apache.spark.sql.functions._
32+
import org.apache.spark.sql.internal.SQLConf
2933
import org.apache.spark.sql.streaming.StreamingQueryStatusAndProgressSuite._
3034

31-
32-
class StreamingQueryStatusAndProgressSuite extends StreamTest {
35+
class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
3336
implicit class EqualsIgnoreCRLF(source: String) {
3437
def equalsIgnoreCRLF(target: String): Boolean = {
3538
source.replaceAll("\r\n|\r|\n", System.lineSeparator) ===
@@ -171,6 +174,41 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest {
171174
query.stop()
172175
}
173176
}
177+
178+
test("SPARK-19378: Continue reporting stateOp metrics even if there is no active trigger") {
179+
import testImplicits._
180+
181+
withSQLConf(SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10") {
182+
val inputData = MemoryStream[Int]
183+
184+
val query = inputData.toDS().toDF("value")
185+
.select('value)
186+
.groupBy($"value")
187+
.agg(count("*"))
188+
.writeStream
189+
.queryName("metric_continuity")
190+
.format("memory")
191+
.outputMode("complete")
192+
.start()
193+
try {
194+
inputData.addData(1, 2)
195+
query.processAllAvailable()
196+
197+
val progress = query.lastProgress
198+
assert(progress.stateOperators.length > 0)
199+
// Should emit new progresses every 10 ms, but we could be facing a slow Jenkins
200+
eventually(timeout(1 minute)) {
201+
val nextProgress = query.lastProgress
202+
assert(nextProgress.timestamp !== progress.timestamp)
203+
assert(nextProgress.numInputRows === 0)
204+
assert(nextProgress.stateOperators.head.numRowsTotal === 2)
205+
assert(nextProgress.stateOperators.head.numRowsUpdated === 0)
206+
}
207+
} finally {
208+
query.stop()
209+
}
210+
}
211+
}
174212
}
175213

176214
object StreamingQueryStatusAndProgressSuite {

0 commit comments

Comments
 (0)