Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,38 @@ trait ProgressReporter extends Logging {
currentStatus = currentStatus.copy(isTriggerActive = false)
}

/**
 * Collects progress metrics for the stateful operators found in the executed plan of
 * `lastExecution`.
 *
 * @param hasNewData whether the current trigger processed new data. When false, the
 *                   updated-row count is reported as 0, since `lastExecution` could belong
 *                   to one of the previous triggers.
 * @return one [[StateOperatorProgress]] per `StateStoreSaveExec` node in the plan, or `Nil`
 *         when `lastExecution` is null.
 */
private def extractStateOperatorMetrics(hasNewData: Boolean): Seq[StateOperatorProgress] = {
  if (lastExecution == null) {
    Nil
  } else {
    // The plan is walked on every call, even when it comes from an earlier trigger;
    // this traversal should be inexpensive.
    val saveOperators = lastExecution.executedPlan.collect {
      case op if op.isInstanceOf[StateStoreSaveExec] => op
    }
    saveOperators.map { op =>
      // Reads a named metric from this operator, falling back to 0 when it is absent.
      def metricOrZero(name: String): Long = op.metrics.get(name).map(_.value).getOrElse(0L)

      // Without new data the "updated" metric would be stale, so report 0 instead.
      val updatedRows = if (hasNewData) metricOrZero("numUpdatedStateRows") else 0L
      val totalRows = metricOrZero("numTotalStateRows")
      new StateOperatorProgress(numRowsTotal = totalRows, numRowsUpdated = updatedRows)
    }
  }
}

/** Extracts statistics from the most recent query execution. */
private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty
val watermarkTimestamp =
if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
else Map.empty[String, String]

// SPARK-19378: Keep reporting state operator metrics even when this trigger processed no data.
val stateOperators = extractStateOperatorMetrics(hasNewData)

if (!hasNewData) {
return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp)
return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
}

// We want to associate execution plan leaves to sources that generate them, so that we match
Expand Down Expand Up @@ -237,16 +260,6 @@ trait ProgressReporter extends Logging {
Map.empty
}

// Extract statistics about stateful operators in the query plan.
val stateNodes = lastExecution.executedPlan.collect {
case p if p.isInstanceOf[StateStoreSaveExec] => p
}
val stateOperators = stateNodes.map { node =>
new StateOperatorProgress(
numRowsTotal = node.metrics.get("numTotalStateRows").map(_.value).getOrElse(0L),
numRowsUpdated = node.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L))
}

val eventTimeStats = lastExecution.executedPlan.collect {
case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
val stats = e.eventTimeStats.value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@ package org.apache.spark.sql.streaming
import java.util.UUID

import scala.collection.JavaConverters._
import scala.language.postfixOps

import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar._

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamingQueryStatusAndProgressSuite._


class StreamingQueryStatusAndProgressSuite extends StreamTest {
class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
implicit class EqualsIgnoreCRLF(source: String) {
def equalsIgnoreCRLF(target: String): Boolean = {
source.replaceAll("\r\n|\r|\n", System.lineSeparator) ===
Expand Down Expand Up @@ -171,6 +174,41 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest {
query.stop()
}
}

// Regression test for SPARK-19378: after a batch with data has been processed, a later
// progress update that carries no input rows must still report state operator metrics.
test("SPARK-19378: Continue reporting stateOp metrics even if there is no active trigger") {
import testImplicits._

// Use a very short no-data progress event interval ("10") so that progress updates keep
// arriving quickly while the query is idle.
withSQLConf(SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10") {
val inputData = MemoryStream[Int]

// Complete-mode count aggregation written to the memory sink; the assertions below expect
// this query to expose at least one state operator in its progress.
val query = inputData.toDS().toDF("value")
.select('value)
.groupBy($"value")
.agg(count("*"))
.writeStream
.queryName("metric_continuity")
.format("memory")
.outputMode("complete")
.start()
try {
// Feed two distinct values and wait until they have been fully processed.
inputData.addData(1, 2)
query.processAllAvailable()

// Baseline progress, captured after the data batch has been processed.
val progress = query.lastProgress
assert(progress.stateOperators.length > 0)
// A new progress should be emitted every 10 ms, but allow a generous timeout in case the
// test runs on a slow Jenkins machine.
eventually(timeout(1 minute)) {
val nextProgress = query.lastProgress
assert(nextProgress.timestamp !== progress.timestamp)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you explicitly verify that this progress has no data?

// The newer progress must carry no input rows, yet still report the state total
// (2, matching the two distinct values added above) with zero updated rows.
assert(nextProgress.numInputRows === 0)
assert(nextProgress.stateOperators.head.numRowsTotal === 2)
assert(nextProgress.stateOperators.head.numRowsUpdated === 0)
}
} finally {
// Always stop the query so it does not leak into other tests.
query.stop()
}
}
}
}
}

object StreamingQueryStatusAndProgressSuite {
Expand Down