Skip to content

Commit e23de4a

Browse files
committed
remove eventTime changes
1 parent 55e3d36 commit e23de4a

File tree

2 files changed

+17
-29
lines changed

2 files changed

+17
-29
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -200,34 +200,18 @@ trait ProgressReporter extends Logging {
200200
}
201201
}
202202

203-
/** Extract statistics about event time from the executed query plan. */
204-
private def extractEventTimeStats(): Map[String, String] = {
203+
/** Extracts statistics from the most recent query execution. */
204+
private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
205205
val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty
206206
val watermarkTimestamp =
207207
if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
208208
else Map.empty[String, String]
209209

210-
if (lastExecution == null || !hasEventTime) return watermarkTimestamp
211-
212-
lastExecution.executedPlan.collect {
213-
case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
214-
val stats = e.eventTimeStats.value
215-
Map(
216-
"max" -> stats.max,
217-
"min" -> stats.min,
218-
"avg" -> stats.avg).mapValues(formatTimestamp)
219-
}.headOption.getOrElse(Map.empty)
220-
}
221-
222-
/** Extracts statistics from the most recent query execution. */
223-
private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
224-
225210
// SPARK-19378: Still report metrics even though no data was processed while reporting progress.
226211
val stateOperators = extractStateOperatorMetrics(hasNewData)
227-
val eventTimeStats = extractEventTimeStats()
228212

229213
if (!hasNewData) {
230-
return ExecutionStats(Map.empty, stateOperators, eventTimeStats)
214+
return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
231215
}
232216

233217
// We want to associate execution plan leaves to sources that generate them, so that we match
@@ -276,6 +260,15 @@ trait ProgressReporter extends Logging {
276260
Map.empty
277261
}
278262

263+
val eventTimeStats = lastExecution.executedPlan.collect {
264+
case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
265+
val stats = e.eventTimeStats.value
266+
Map(
267+
"max" -> stats.max,
268+
"min" -> stats.min,
269+
"avg" -> stats.avg).mapValues(formatTimestamp)
270+
}.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
271+
279272
ExecutionStats(numInputRows, stateOperators, eventTimeStats)
280273
}
281274

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -175,15 +175,14 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
175175
}
176176
}
177177

178-
test("SPARK-19378: Continue reporting stateOp and eventTime metrics even if there is no data") {
178+
test("SPARK-19378: Continue reporting stateOp metrics even if there is no active trigger") {
179179
import testImplicits._
180180

181181
withSQLConf(SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10") {
182-
val inputData = MemoryStream[(Int, String)]
182+
val inputData = MemoryStream[Int]
183183

184-
val query = inputData.toDS().toDF("value", "time")
185-
.select('value, 'time.cast("timestamp"))
186-
.withWatermark("time", "10 seconds")
184+
val query = inputData.toDS().toDF("value")
185+
.select('value)
187186
.groupBy($"value")
188187
.agg(count("*"))
189188
.writeStream
@@ -192,20 +191,16 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
192191
.outputMode("complete")
193192
.start()
194193
try {
195-
inputData.addData((1, "2017-01-26 01:00:00"), (2, "2017-01-26 01:00:02"))
194+
inputData.addData(1, 2)
196195
query.processAllAvailable()
197196

198197
val progress = query.lastProgress
199-
assert(progress.eventTime.size() > 1)
200198
assert(progress.stateOperators.length > 0)
201199
// Should emit new progresses every 10 ms, but we could be facing a slow Jenkins
202200
eventually(timeout(1 minute)) {
203201
val nextProgress = query.lastProgress
204202
assert(nextProgress.timestamp !== progress.timestamp)
205203
assert(nextProgress.numInputRows === 0)
206-
assert(nextProgress.eventTime.get("min") === "2017-01-26 01:00:00")
207-
assert(nextProgress.eventTime.get("avg") === "2017-01-26 01:00:01")
208-
assert(nextProgress.eventTime.get("max") === "2017-01-26 01:00:02")
209204
assert(nextProgress.stateOperators.head.numRowsTotal === 2)
210205
assert(nextProgress.stateOperators.head.numRowsTotal === 2)
211206
assert(nextProgress.stateOperators.head.numRowsUpdated === 0)

0 commit comments

Comments
 (0)