Skip to content

Commit 13affc7

Browse files
committed
Address review comments
1 parent 8b60538 commit 13affc7

File tree

2 files changed

+41
-49
lines changed

2 files changed

+41
-49
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,11 @@ class StreamExecution(
132132

133133
/**
134134
* A map of current watermarks, keyed by the position of the watermark operator in the
135-
* physical plan. The minimum watermark timestamp present here will be used and persisted as the
136-
* query's watermark when preparing each batch, so it's ok that this val isn't fault-tolerant.
135+
* physical plan.
136+
*
137+
* This state is 'soft state', which does not affect the correctness and semantics of watermarks
138+
* and is not persisted across query restarts.
139+
* The fault-tolerant watermark state is in offsetSeqMetadata.
137140
*/
138141
protected val watermarkMsMap: MutableMap[Int, Long] = MutableMap()
139142

@@ -573,17 +576,24 @@ class StreamExecution(
573576
case e: EventTimeWatermarkExec => e
574577
}.zipWithIndex.foreach {
575578
case (e, index) if e.eventTimeStats.value.count > 0 =>
576-
logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
577-
val newAttributeWatermarkMs = e.eventTimeStats.value.max - e.delayMs
578-
val mappedWatermarkMs: Option[Long] = watermarkMsMap.get(index)
579-
if (mappedWatermarkMs.isEmpty || newAttributeWatermarkMs > mappedWatermarkMs.get) {
580-
watermarkMsMap.put(index, newAttributeWatermarkMs)
579+
logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
580+
val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
581+
val prevWatermarkMs = watermarkMsMap.get(index)
582+
if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
583+
watermarkMsMap.put(index, newWatermarkMs)
581584
}
582585

583-
case _ =>
586+
// Populate 0 if we haven't seen any data yet for this watermark node.
587+
case (_, index) =>
588+
if (!watermarkMsMap.isDefinedAt(index)) {
589+
watermarkMsMap.put(index, 0)
590+
}
584591
}
585592

586-
// Update the query watermark to the minimum of all attribute watermarks.
593+
// Update the global watermark to the minimum of all watermark nodes.
594+
// This is the safest option, because only the global watermark is fault-tolerant. Making
595+
// it the minimum of all individual watermarks guarantees it will never advance past where
596+
// any individual watermark operator would be if it were in a plan by itself.
587597
if(!watermarkMsMap.isEmpty) {
588598
val newWatermarkMs = watermarkMsMap.minBy(_._2)._2
589599
if (newWatermarkMs > batchWatermarkMs) {

sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala

Lines changed: 22 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -321,62 +321,44 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
321321
.queryName("test")
322322
.start()
323323

324-
def generateAndAssertNewWatermark(
325-
stream: MemoryStream[Int],
326-
data: Seq[Int],
327-
watermark: Int): Unit = {
328-
stream.addData(data)
329-
assertWatermark(watermark)
330-
}
331-
332-
def assertWatermark(watermark: Int) {
324+
def getWatermarkAfterData(
325+
firstData: Seq[Int] = Seq.empty,
326+
secondData: Seq[Int] = Seq.empty): Long = {
327+
if (firstData.nonEmpty) first.addData(firstData)
328+
if (secondData.nonEmpty) second.addData(secondData)
333329
union.processAllAvailable()
334330
// add a dummy batch so lastExecution has the new watermark
335331
first.addData(0)
336332
union.processAllAvailable()
337-
333+
// get last watermark
338334
val lastExecution = union.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
339-
assert(lastExecution.offsetSeqMetadata.batchWatermarkMs == watermark)
335+
lastExecution.offsetSeqMetadata.batchWatermarkMs
340336
}
341337

342-
generateAndAssertNewWatermark(first, Seq(11), 1000)
338+
// Global watermark starts at 0 until we get data from both sides
339+
assert(getWatermarkAfterData(firstData = Seq(11)) == 0)
340+
assert(getWatermarkAfterData(secondData = Seq(6)) == 1000)
343341
// Global watermark stays at left watermark 1 when right watermark moves to 2
344-
generateAndAssertNewWatermark(second, Seq(8), 1000)
342+
assert(getWatermarkAfterData(secondData = Seq(8)) == 1000)
345343
// Global watermark switches to right side value 2 when left watermark goes higher
346-
generateAndAssertNewWatermark(first, Seq(21), 3000)
344+
assert(getWatermarkAfterData(firstData = Seq(21)) == 3000)
347345
// Global watermark goes back to left
348-
generateAndAssertNewWatermark(second, Seq(17, 28, 39), 11000)
346+
assert(getWatermarkAfterData(secondData = Seq(17, 28, 39)) == 11000)
349347
// Global watermark stays on left as long as it's below right
350-
generateAndAssertNewWatermark(first, Seq(31), 21000)
351-
generateAndAssertNewWatermark(first, Seq(41), 31000)
348+
assert(getWatermarkAfterData(firstData = Seq(31)) == 21000)
349+
assert(getWatermarkAfterData(firstData = Seq(41)) == 31000)
352350
// Global watermark switches back to right again
353-
generateAndAssertNewWatermark(first, Seq(51), 34000)
351+
assert(getWatermarkAfterData(firstData = Seq(51)) == 34000)
354352

355353
// Global watermark is updated correctly with simultaneous data from both sides
356-
first.addData(100)
357-
second.addData(100)
358-
assertWatermark(90000)
359-
360-
first.addData(120)
361-
second.addData(110)
362-
assertWatermark(105000)
363-
364-
first.addData(130)
365-
second.addData(125)
366-
assertWatermark(120000)
354+
assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(100)) == 90000)
355+
assert(getWatermarkAfterData(firstData = Seq(120), secondData = Seq(110)) == 105000)
356+
assert(getWatermarkAfterData(firstData = Seq(130), secondData = Seq(125)) == 120000)
367357

368358
// Global watermark doesn't decrement with simultaneous data
369-
first.addData(100)
370-
second.addData(100)
371-
assertWatermark(120000)
372-
373-
first.addData(140)
374-
second.addData(100)
375-
assertWatermark(120000)
376-
377-
first.addData(100)
378-
second.addData(135)
379-
assertWatermark(130000)
359+
assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(100)) == 120000)
360+
assert(getWatermarkAfterData(firstData = Seq(140), secondData = Seq(100)) == 120000)
361+
assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(135)) == 130000)
380362
}
381363

382364
test("complete mode") {

0 commit comments

Comments
 (0)