Skip to content

Commit 55e3d36

Browse files
committed
Address
1 parent 884a789 commit 55e3d36

File tree

2 files changed

+21
-21
lines changed

2 files changed

+21
-21
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -180,11 +180,7 @@ trait ProgressReporter extends Logging {
180180
currentStatus = currentStatus.copy(isTriggerActive = false)
181181
}
182182

183-
/**
184-
* Extract statistics about stateful operators from the executed query plan.
185-
* SPARK-19378: Still report stateOperator metrics even though no data was processed while
186-
* reporting progress.
187-
*/
183+
/** Extract statistics about stateful operators from the executed query plan. */
188184
private def extractStateOperatorMetrics(hasNewData: Boolean): Seq[StateOperatorProgress] = {
189185
if (lastExecution == null) return Nil
190186
// lastExecution could belong to one of the previous triggers if `!hasNewData`.
@@ -204,32 +200,31 @@ trait ProgressReporter extends Logging {
204200
}
205201
}
206202

207-
/**
208-
* Extract statistics about event time from the executed query plan.
209-
* SPARK-19378: Still report eventTime metrics even though no data was processed while
210-
* reporting progress.
211-
*/
212-
private def extractEventTimeStats(watermarkTs: Map[String, String]): Map[String, String] = {
213-
if (lastExecution == null) return watermarkTs
203+
/** Extract statistics about event time from the executed query plan. */
204+
private def extractEventTimeStats(): Map[String, String] = {
205+
val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty
206+
val watermarkTimestamp =
207+
if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
208+
else Map.empty[String, String]
209+
210+
if (lastExecution == null || !hasEventTime) return watermarkTimestamp
211+
214212
lastExecution.executedPlan.collect {
215213
case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
216214
val stats = e.eventTimeStats.value
217215
Map(
218216
"max" -> stats.max,
219217
"min" -> stats.min,
220218
"avg" -> stats.avg).mapValues(formatTimestamp)
221-
}.headOption.getOrElse(Map.empty) ++ watermarkTs
219+
}.headOption.getOrElse(Map.empty)
222220
}
223221

224222
/** Extracts statistics from the most recent query execution. */
225223
private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
226-
val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty
227-
val watermarkTimestamp =
228-
if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
229-
else Map.empty[String, String]
230224

225+
// SPARK-19378: Still report metrics even though no data was processed while reporting progress.
231226
val stateOperators = extractStateOperatorMetrics(hasNewData)
232-
val eventTimeStats = extractEventTimeStats(watermarkTimestamp)
227+
val eventTimeStats = extractEventTimeStats()
233228

234229
if (!hasNewData) {
235230
return ExecutionStats(Map.empty, stateOperators, eventTimeStats)

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.sql.streaming
2020
import java.util.UUID
2121

2222
import scala.collection.JavaConverters._
23-
import scala.language.postfixOps._
23+
import scala.language.postfixOps
2424

2525
import org.json4s._
2626
import org.json4s.jackson.JsonMethods._
@@ -202,8 +202,13 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
202202
eventually(timeout(1 minute)) {
203203
val nextProgress = query.lastProgress
204204
assert(nextProgress.timestamp !== progress.timestamp)
205-
assert(progress.eventTime.size() > 1)
206-
assert(progress.stateOperators.length > 0)
205+
assert(nextProgress.numInputRows === 0)
206+
assert(nextProgress.eventTime.get("min") === "2017-01-26 01:00:00")
207+
assert(nextProgress.eventTime.get("avg") === "2017-01-26 01:00:01")
208+
assert(nextProgress.eventTime.get("max") === "2017-01-26 01:00:02")
209+
assert(nextProgress.stateOperators.head.numRowsTotal === 2)
210+
assert(nextProgress.stateOperators.head.numRowsTotal === 2)
211+
assert(nextProgress.stateOperators.head.numRowsUpdated === 0)
207212
}
208213
} finally {
209214
query.stop()

0 commit comments

Comments
 (0)