Skip to content

Commit cec8296

Browse files
committed
adapt comment to the reality
previous comment was true for Apache Spark prior 2.4.0. The 2.4.0 release brought multiple watermark policy and therefore stating that the 'min' is always chosen is misleading.
1 parent 8c0e961 commit cec8296

File tree

1 file changed

+4
-5
lines changed

1 file changed

+4
-5
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ case object MinWatermark extends MultipleWatermarkPolicy {
6363
}
6464

6565
/**
66-
* Policy to choose the *min* of the operator watermark values as the global watermark value. So the
66+
* Policy to choose the *max* of the operator watermark values as the global watermark value. So the
6767
* global watermark will advance if any of the individual operator watermarks has advanced.
6868
* In other words, in a streaming query with multiple input streams and watermarks defined on all
6969
* of them, the global watermark will advance as fast as the fastest input. So if there is watermark
@@ -108,10 +108,9 @@ case class WatermarkTracker(policy: MultipleWatermarkPolicy) extends Logging {
108108
}
109109
}
110110

111-
// Update the global watermark to the minimum of all watermark nodes.
112-
// This is the safest option, because only the global watermark is fault-tolerant. Making
113-
// it the minimum of all individual watermarks guarantees it will never advance past where
114-
// any individual watermark operator would be if it were in a plan by itself.
111+
// Update the global watermark accordingly to the chosen policy. To find all available policies
112+
// and their semantics, please check the comments of
113+
// `org.apache.spark.sql.execution.streaming.MultipleWatermarkPolicy` implementations.
115114
val chosenGlobalWatermark = policy.chooseGlobalWatermark(operatorToWatermarkMap.values.toSeq)
116115
if (chosenGlobalWatermark > globalWatermarkMs) {
117116
logInfo(s"Updating event-time watermark from $globalWatermarkMs to $chosenGlobalWatermark ms")

0 commit comments

Comments
 (0)