File tree Expand file tree Collapse file tree 2 files changed +26
-1
lines changed
main/scala/org/apache/spark/sql
test/scala/org/apache/spark/sql/streaming Expand file tree Collapse file tree 2 files changed +26
-1
lines changed Original file line number Diff line number Diff line change @@ -563,7 +563,7 @@ class Dataset[T] private[sql](
563563 * @param eventTime the name of the column that contains the event time of the row.
564564 * @param delayThreshold the minimum delay to wait to data to arrive late, relative to the latest
565565 * record that has been processed in the form of an interval
566- * (e.g. "1 minute" or "5 hours").
566+ * (e.g. "1 minute" or "5 hours"). NOTE: This should not be negative.
567567 *
568568 * @group streaming
569569 * @since 2.1.0
@@ -576,6 +576,8 @@ class Dataset[T] private[sql](
576576 val parsedDelay =
577577 Option (CalendarInterval .fromString(" interval " + delayThreshold))
578578 .getOrElse(throw new AnalysisException (s " Unable to parse time delay ' $delayThreshold' " ))
579+ require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0 ,
580+ s " delay threshold ( $delayThreshold) should not be negative. " )
579581 EventTimeWatermark (UnresolvedAttribute (eventTime), parsedDelay, logicalPlan)
580582 }
581583
Original file line number Diff line number Diff line change @@ -306,6 +306,29 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin
306306 )
307307 }
308308
309+ test(" delay threshold should not be negative." ) {
310+ val inputData = MemoryStream [Int ].toDF()
311+ var e = intercept[IllegalArgumentException ] {
312+ inputData.withWatermark(" value" , " -1 year" )
313+ }
314+ assert(e.getMessage contains " should not be negative." )
315+
316+ e = intercept[IllegalArgumentException ] {
317+ inputData.withWatermark(" value" , " 1 year -13 months" )
318+ }
319+ assert(e.getMessage contains " should not be negative." )
320+
321+ e = intercept[IllegalArgumentException ] {
322+ inputData.withWatermark(" value" , " 1 month -40 days" )
323+ }
324+ assert(e.getMessage contains " should not be negative." )
325+
326+ e = intercept[IllegalArgumentException ] {
327+ inputData.withWatermark(" value" , " -10 seconds" )
328+ }
329+ assert(e.getMessage contains " should not be negative." )
330+ }
331+
309332 test(" the new watermark should override the old one" ) {
310333 val df = MemoryStream [(Long , Long )].toDF()
311334 .withColumn(" first" , $" _1" .cast(" timestamp" ))
You can’t perform that action at this time.
0 commit comments