Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class IncrementalExecution(
val checkpointLocation: String,
val runId: UUID,
val currentBatchId: Long,
offsetSeqMetadata: OffsetSeqMetadata)
val offsetSeqMetadata: OffsetSeqMetadata)
extends QueryExecution(sparkSession, logicalPlan) with Logging {

// Modified planner with stateful operations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ class StreamExecution(
protected var offsetSeqMetadata = OffsetSeqMetadata(
batchWatermarkMs = 0, batchTimestampMs = 0, sparkSession.conf)

/**
* A map of current watermarks, keyed by the position of the watermark operator in the
* physical plan.
*
* This state is 'soft state', which does not affect the correctness and semantics of watermarks
* and is not persisted across query restarts.
* The fault-tolerant watermark state is in offsetSeqMetadata.
*/
protected val watermarkMsMap: MutableMap[Int, Long] = MutableMap()

override val id: UUID = UUID.fromString(streamMetadata.id)

override val runId: UUID = UUID.randomUUID
Expand Down Expand Up @@ -560,13 +570,32 @@ class StreamExecution(
}
if (hasNewData) {
var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
// Update the eventTime watermark if we find one in the plan.
// Update the eventTime watermarks if we find any in the plan.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its still a single watermark that is being updated. it just happens to be updated using multiple watermarks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, we're updating multiple watermarks in the map. We later update offsetSeqMetadata with the new minimum one, but that's not in this block.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. I had made this comment when i was thinking that we dont need the mutable map. Ignore this.

if (lastExecution != null) {
lastExecution.executedPlan.collect {
case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
e.eventTimeStats.value.max - e.delayMs
}.headOption.foreach { newWatermarkMs =>
case e: EventTimeWatermarkExec => e
}.zipWithIndex.foreach {
case (e, index) if e.eventTimeStats.value.count > 0 =>
logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
val prevWatermarkMs = watermarkMsMap.get(index)
if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
watermarkMsMap.put(index, newWatermarkMs)
}

// Populate 0 if we haven't seen any data yet for this watermark node.
case (_, index) =>
if (!watermarkMsMap.isDefinedAt(index)) {
watermarkMsMap.put(index, 0)
}
}

// Update the global watermark to the minimum of all watermark nodes.
// This is the safest option, because only the global watermark is fault-tolerant. Making
// it the minimum of all individual watermarks guarantees it will never advance past where
// any individual watermark operator would be if it were in a plan by itself.
if(!watermarkMsMap.isEmpty) {
val newWatermarkMs = watermarkMsMap.minBy(_._2)._2
if (newWatermarkMs > batchWatermarkMs) {
logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
batchWatermarkMs = newWatermarkMs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,84 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
)
}

test("watermark with 2 streams") {
import org.apache.spark.sql.functions.sum
val first = MemoryStream[Int]

val firstDf = first.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withWatermark("eventTime", "10 seconds")
.select('value)

val second = MemoryStream[Int]

val secondDf = second.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withWatermark("eventTime", "5 seconds")
.select('value)

withTempDir { checkpointDir =>
val unionWriter = firstDf.union(secondDf).agg(sum('value))
.writeStream
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.format("memory")
.outputMode("complete")
.queryName("test")

val union = unionWriter.start()

def getWatermarkAfterData(
firstData: Seq[Int] = Seq.empty,
secondData: Seq[Int] = Seq.empty,
query: StreamingQuery = union): Long = {
if (firstData.nonEmpty) first.addData(firstData)
if (secondData.nonEmpty) second.addData(secondData)
query.processAllAvailable()
// add a dummy batch so lastExecution has the new watermark
first.addData(0)
query.processAllAvailable()
// get last watermark
val lastExecution = query.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
lastExecution.offsetSeqMetadata.batchWatermarkMs
}

// Global watermark starts at 0 until we get data from both sides
assert(getWatermarkAfterData(firstData = Seq(11)) == 0)
assert(getWatermarkAfterData(secondData = Seq(6)) == 1000)
// Global watermark stays at left watermark 1 when right watermark moves to 2
assert(getWatermarkAfterData(secondData = Seq(8)) == 1000)
// Global watermark switches to right side value 2 when left watermark goes higher
assert(getWatermarkAfterData(firstData = Seq(21)) == 3000)
// Global watermark goes back to left
assert(getWatermarkAfterData(secondData = Seq(17, 28, 39)) == 11000)
// Global watermark stays on left as long as it's below right
assert(getWatermarkAfterData(firstData = Seq(31)) == 21000)
assert(getWatermarkAfterData(firstData = Seq(41)) == 31000)
// Global watermark switches back to right again
assert(getWatermarkAfterData(firstData = Seq(51)) == 34000)

// Global watermark is updated correctly with simultaneous data from both sides
assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(100)) == 90000)
assert(getWatermarkAfterData(firstData = Seq(120), secondData = Seq(110)) == 105000)
assert(getWatermarkAfterData(firstData = Seq(130), secondData = Seq(125)) == 120000)

// Global watermark doesn't decrement with simultaneous data
assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(100)) == 120000)
assert(getWatermarkAfterData(firstData = Seq(140), secondData = Seq(100)) == 120000)
assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(135)) == 130000)

// Global watermark recovers after restart, but left side watermark ahead of it does not.
assert(getWatermarkAfterData(firstData = Seq(200), secondData = Seq(190)) == 185000)
union.stop()
val union2 = unionWriter.start()
assert(getWatermarkAfterData(query = union2) == 185000)
// Even though the left side was ahead of 185000 in the last execution, the watermark won't
// increment until it gets past it in this execution.
assert(getWatermarkAfterData(secondData = Seq(200), query = union2) == 185000)
assert(getWatermarkAfterData(firstData = Seq(200), query = union2) == 190000)
}
}

test("complete mode") {
val inputData = MemoryStream[Int]

Expand Down