Closed
@@ -63,7 +63,7 @@ case object MinWatermark extends MultipleWatermarkPolicy {
}

/**
- * Policy to choose the *min* of the operator watermark values as the global watermark value. So the
+ * Policy to choose the *max* of the operator watermark values as the global watermark value. So the
* global watermark will advance if any of the individual operator watermarks has advanced.
* In other words, in a streaming query with multiple input streams and watermarks defined on all
* of them, the global watermark will advance as fast as the fastest input. So if there is watermark
@@ -108,10 +108,9 @@ case class WatermarkTracker(policy: MultipleWatermarkPolicy) extends Logging {
}
}

- // Update the global watermark to the minimum of all watermark nodes.
- // This is the safest option, because only the global watermark is fault-tolerant. Making
- // it the minimum of all individual watermarks guarantees it will never advance past where
- // any individual watermark operator would be if it were in a plan by itself.
+ // Update the global watermark according to the chosen policy. To find all available policies
+ // and their semantics, please check the comments of
+ // `org.apache.spark.sql.execution.streaming.MultipleWatermarkPolicy` implementations.
val chosenGlobalWatermark = policy.chooseGlobalWatermark(operatorToWatermarkMap.values.toSeq)
if (chosenGlobalWatermark > globalWatermarkMs) {
logInfo(s"Updating event-time watermark from $globalWatermarkMs to $chosenGlobalWatermark ms")
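To make the difference between the two policies concrete, here is a minimal, self-contained sketch of the behaviour the comments above describe. It is not the Spark implementation: the names `PolicySketch`, `MinPolicySketch`, `MaxPolicySketch`, and `TrackerSketch` are hypothetical, and operator IDs are assumed to be plain `Int` keys. The only parts taken from the diff are the `chooseGlobalWatermark` call over the per-operator values and the rule that the global watermark is updated only when the chosen value is larger than the current one.

```scala
// Illustrative sketch only. All names ending in "Sketch" are hypothetical
// stand-ins, not the classes in org.apache.spark.sql.execution.streaming.
sealed trait PolicySketch {
  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long
}

// Global watermark follows the slowest input: it advances only when
// every operator watermark has advanced.
case object MinPolicySketch extends PolicySketch {
  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = {
    require(operatorWatermarks.nonEmpty, "need at least one operator watermark")
    operatorWatermarks.min
  }
}

// Global watermark follows the fastest input: it advances when
// any operator watermark has advanced.
case object MaxPolicySketch extends PolicySketch {
  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = {
    require(operatorWatermarks.nonEmpty, "need at least one operator watermark")
    operatorWatermarks.max
  }
}

final class TrackerSketch(policy: PolicySketch) {
  private var globalWatermarkMs: Long = 0L

  def currentWatermark: Long = globalWatermarkMs

  // operatorToWatermark: assumed mapping from operator ID to its watermark in ms.
  def update(operatorToWatermark: Map[Int, Long]): Unit = {
    if (operatorToWatermark.nonEmpty) {
      val chosen = policy.chooseGlobalWatermark(operatorToWatermark.values.toSeq)
      // As in the diff, the global watermark only ever moves forward.
      if (chosen > globalWatermarkMs) {
        globalWatermarkMs = chosen
      }
    }
  }
}
```

With two operators at 1000 ms and 5000 ms, a tracker built with `MinPolicySketch` would settle on 1000 ms and one built with `MaxPolicySketch` on 5000 ms. In both cases a later batch whose chosen value is lower than the current global watermark leaves it unchanged.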