From 7fc341195f6cd348d7745646b06e381dc79fe081 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 15 Dec 2016 18:45:51 -0800 Subject: [PATCH 01/10] No event time stats when no watermark is set --- .../streaming/ProgressReporter.scala | 8 +++- ...te.scala => EventTimeWatermarkSuite.scala} | 44 +++++++++++++------ 2 files changed, 37 insertions(+), 15 deletions(-) rename sql/core/src/test/scala/org/apache/spark/sql/streaming/{WatermarkSuite.scala => EventTimeWatermarkSuite.scala} (87%) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index e40135fdd7a55..449aa3fbb0e61 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -25,7 +25,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, SparkSession} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan} import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.streaming._ import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent @@ -84,6 +84,8 @@ trait ProgressReporter extends Logging { private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC")) + private val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty + @volatile protected var currentStatus: StreamingQueryStatus = { new StreamingQueryStatus( @@ -182,7 +184,9 @@ trait ProgressReporter extends Logging { /** Extracts statistics from the most recent query execution. 
*/ private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = { - val watermarkTimestamp = Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs)) + val watermarkTimestamp = + if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs)) + else Map.empty[String, String] if (!hasNewData) { return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala similarity index 87% rename from sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index f1cc19c6e235d..59f67dac6f231 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -26,8 +26,9 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, Row} import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.functions.{count, window} +import org.apache.spark.sql.InternalOutputModes.Complete -class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { +class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Logging { import testImplicits._ @@ -54,22 +55,39 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { test("event time and watermark metrics") { - val inputData = MemoryStream[Int] + def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = AssertOnQuery { q => + body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime) + true + } - val windowedAggregation = inputData.toDF() + // No event time metrics when there is no watermarking + val inputData1 = MemoryStream[Int] + val aggWithoutWatermark = 
inputData1.toDF() + .withColumn("eventTime", $"value".cast("timestamp")) + .groupBy(window($"eventTime", "5 seconds") as 'window) + .agg(count("*") as 'count) + .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) + + testStream(aggWithoutWatermark, outputMode = Complete)( + AddData(inputData1, 15), + CheckAnswer((15, 1)), + assertEventStats { e => assert(e.isEmpty) }, + AddData(inputData1, 10, 12, 14), + CheckAnswer((10, 3), (15, 1)), + assertEventStats { e => assert(e.isEmpty) } + ) + + // All event time metrics where watermarking is set + val inputData2 = MemoryStream[Int] + val aggWithWatermark = inputData2.toDF() .withColumn("eventTime", $"value".cast("timestamp")) .withWatermark("eventTime", "10 seconds") .groupBy(window($"eventTime", "5 seconds") as 'window) .agg(count("*") as 'count) .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) - def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = AssertOnQuery { q => - body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime) - true - } - - testStream(windowedAggregation)( - AddData(inputData, 15), + testStream(aggWithWatermark)( + AddData(inputData2, 15), CheckAnswer(), assertEventStats { e => assert(e.get("max") === formatTimestamp(15)) @@ -77,7 +95,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { assert(e.get("avg") === formatTimestamp(15)) assert(e.get("watermark") === formatTimestamp(0)) }, - AddData(inputData, 10, 12, 14), + AddData(inputData2, 10, 12, 14), CheckAnswer(), assertEventStats { e => assert(e.get("max") === formatTimestamp(14)) @@ -85,7 +103,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { assert(e.get("avg") === formatTimestamp(12)) assert(e.get("watermark") === formatTimestamp(5)) }, - AddData(inputData, 25), + AddData(inputData2, 25), CheckAnswer(), assertEventStats { e => assert(e.get("max") === formatTimestamp(25)) @@ -93,7 +111,7 @@ class 
WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { assert(e.get("avg") === formatTimestamp(25)) assert(e.get("watermark") === formatTimestamp(5)) }, - AddData(inputData, 25), + AddData(inputData2, 25), CheckAnswer((10, 3)), assertEventStats { e => assert(e.get("max") === formatTimestamp(25)) From 33a7d1b84266b0d50f7bd1cc4e5bb47226d043cd Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 15 Dec 2016 19:18:18 -0800 Subject: [PATCH 02/10] Disallow delay threshold in months or years --- .../apache/spark/unsafe/types/CalendarInterval.java | 3 +++ .../src/main/scala/org/apache/spark/sql/Dataset.scala | 4 ++++ .../spark/sql/streaming/EventTimeWatermarkSuite.scala | 11 +++++++++++ 3 files changed, 18 insertions(+) diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java index a7b0e6f80c2b6..f90cf4461d997 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java @@ -252,6 +252,9 @@ public static long parseSecondNano(String secondNano) throws IllegalArgumentExce public final int months; public final long microseconds; + /** + * Return the interval in miliseconds, not including the months in interval. 
+ */ public final long milliseconds() { return this.microseconds / MICROS_PER_MILLI; } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 29397b134029b..3e48cfeb60e41 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -572,6 +572,10 @@ class Dataset[T] private[sql]( val parsedDelay = Option(CalendarInterval.fromString("interval " + delayThreshold)) .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'")) + // Threshold specified in months/years is non-deterministic + if (parsedDelay.months > 0) { + throw new AnalysisException(s"Cannot specify time delay in months or years, use days instead") + } EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index 59f67dac6f231..3d7a2eee245a7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -53,6 +53,17 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin assert(e.getMessage contains "int") } + test("error on threshold in months/years") { + val inputData = MemoryStream[Int].toDF() + + def check(delayThreshold: String): Unit = { + val e = intercept[AnalysisException] { + inputData.withWatermark("value", delayThreshold) + } + } + check("1 year") + check("2 months") + } test("event time and watermark metrics") { def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = AssertOnQuery { q => From 8c603f46deb7021c9c2b6297be532342d2d8bfab Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 15 Dec 2016 19:24:27 -0800 Subject: 
[PATCH 03/10] Minor --- .../apache/spark/sql/streaming/EventTimeWatermarkSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index 3d7a2eee245a7..a64f459217954 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -53,13 +53,15 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin assert(e.getMessage contains "int") } - test("error on threshold in months/years") { + test("error on delay in months/years") { val inputData = MemoryStream[Int].toDF() def check(delayThreshold: String): Unit = { val e = intercept[AnalysisException] { inputData.withWatermark("value", delayThreshold) } + assert(e.getMessage.contains("month")) + assert(e.getMessage.contains("year")) } check("1 year") check("2 months") From 67ad327987893c94d58d95eb452065114a41b644 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 16 Dec 2016 15:00:13 -0800 Subject: [PATCH 04/10] Changed fix --- .../scala/org/apache/spark/sql/Dataset.scala | 4 -- .../streaming/EventTimeWatermarkExec.scala | 7 ++- .../execution/streaming/StreamExecution.scala | 2 +- .../streaming/EventTimeWatermarkSuite.scala | 50 ++++++++++++------- 4 files changed, 38 insertions(+), 25 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 3e48cfeb60e41..29397b134029b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -572,10 +572,6 @@ class Dataset[T] private[sql]( val parsedDelay = Option(CalendarInterval.fromString("interval " + delayThreshold)) .getOrElse(throw new 
AnalysisException(s"Unable to parse time delay '$delayThreshold'")) - // Threshold specified in months/years is non-deterministic - if (parsedDelay.months > 0) { - throw new AnalysisException(s"Cannot specify time delay in months or years, use days instead") - } EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala index e8570d040dbe4..5a9a99e11188e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala @@ -84,6 +84,11 @@ case class EventTimeWatermarkExec( child: SparkPlan) extends SparkPlan { val eventTimeStats = new EventTimeStatsAccum() + val delayMs = { + val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31 + delay.milliseconds + delay.months * millisPerMonth + } + sparkContext.register(eventTimeStats) override protected def doExecute(): RDD[InternalRow] = { @@ -101,7 +106,7 @@ case class EventTimeWatermarkExec( if (a semanticEquals eventTime) { val updatedMetadata = new MetadataBuilder() .withMetadata(a.metadata) - .putLong(EventTimeWatermark.delayKey, delay.milliseconds) + .putLong(EventTimeWatermark.delayKey, delayMs) .build() a.withMetadata(updatedMetadata) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 8f97d9570eaa6..c82af3b343e74 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -387,7 +387,7 @@ class StreamExecution( lastExecution.executedPlan.collect { case e: 
EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 => logDebug(s"Observed event time stats: ${e.eventTimeStats.value}") - e.eventTimeStats.value.max - e.delay.milliseconds + math.max(0, e.eventTimeStats.value.max - e.delayMs) }.headOption.foreach { newWatermarkMs => if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) { logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index a64f459217954..1652d4802b14b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.streaming import java.{util => ju} import java.text.SimpleDateFormat +import java.util.{Calendar, Date} import org.scalatest.BeforeAndAfter @@ -53,26 +54,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin assert(e.getMessage contains "int") } - test("error on delay in months/years") { - val inputData = MemoryStream[Int].toDF() - - def check(delayThreshold: String): Unit = { - val e = intercept[AnalysisException] { - inputData.withWatermark("value", delayThreshold) - } - assert(e.getMessage.contains("month")) - assert(e.getMessage.contains("year")) - } - check("1 year") - check("2 months") - } - test("event time and watermark metrics") { - def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = AssertOnQuery { q => - body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime) - true - } - // No event time metrics when there is no watermarking val inputData1 = MemoryStream[Int] val aggWithoutWatermark = inputData1.toDF() @@ -155,6 +137,29 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin ) } + test("delay in years handled 
correctly") { + val input = MemoryStream[Long] + val aggWithWatermark = input.toDF() + .withColumn("eventTime", $"value".cast("timestamp")) + .withWatermark("eventTime", "1 month") + .groupBy(window($"eventTime", "5 seconds") as 'window) + .agg(count("*") as 'count) + .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) + + val currentTimeMs = System.currentTimeMillis + val millisPerYear = 1000L * 60 * 60 * 24 * 366 // assume leap year + + testStream(aggWithWatermark)( + AddData(input, currentTimeMs / 1000), + CheckAnswer(), + assertEventStats { e => + assert(timestampFormat.parse(e.get("max")).getTime === (currentTimeMs / 1000) * 1000) + val watermarkTime = timestampFormat.parse(e.get("watermark")) + assert(currentTimeMs - watermarkTime.getTime >= 2 * millisPerYear) + } + ) + } + test("recovery") { val inputData = MemoryStream[Int] val df = inputData.toDF() @@ -262,6 +267,13 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin ) } + private def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = { + AssertOnQuery { q => + body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime) + true + } + } + private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 timestampFormat.setTimeZone(ju.TimeZone.getTimeZone("UTC")) From 736c9038f67941b438b14e052a8aa232c1ec7755 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 16 Dec 2016 15:06:18 -0800 Subject: [PATCH 05/10] Undo comments --- .../java/org/apache/spark/unsafe/types/CalendarInterval.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java index f90cf4461d997..a7b0e6f80c2b6 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java +++ 
b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java @@ -252,9 +252,6 @@ public static long parseSecondNano(String secondNano) throws IllegalArgumentExce public final int months; public final long microseconds; - /** - * Return the interval in miliseconds, not including the months in interval. - */ public final long milliseconds() { return this.microseconds / MICROS_PER_MILLI; } From 0a42fab402264b1df2e1830af4adf8bef554fcba Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 16 Dec 2016 15:29:16 -0800 Subject: [PATCH 06/10] Updated test --- .../sql/streaming/EventTimeWatermarkSuite.scala | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index 1652d4802b14b..f930c0be8b106 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -138,24 +138,29 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin } test("delay in years handled correctly") { + + val currentTimeMs = System.currentTimeMillis + val currentTime = new Date(currentTimeMs) + + def monthsSinceEpoch(date: Date): Int = { date.getYear * 12 + date.getMonth } + val input = MemoryStream[Long] val aggWithWatermark = input.toDF() .withColumn("eventTime", $"value".cast("timestamp")) - .withWatermark("eventTime", "1 month") + .withWatermark("eventTime", "2 years 5 months") .groupBy(window($"eventTime", "5 seconds") as 'window) .agg(count("*") as 'count) .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) - val currentTimeMs = System.currentTimeMillis - val millisPerYear = 1000L * 60 * 60 * 24 * 366 // assume leap year - testStream(aggWithWatermark)( + AddData(input, currentTimeMs / 1000), + 
CheckAnswer(), AddData(input, currentTimeMs / 1000), CheckAnswer(), assertEventStats { e => assert(timestampFormat.parse(e.get("max")).getTime === (currentTimeMs / 1000) * 1000) val watermarkTime = timestampFormat.parse(e.get("watermark")) - assert(currentTimeMs - watermarkTime.getTime >= 2 * millisPerYear) + assert(monthsSinceEpoch(currentTime) - monthsSinceEpoch(watermarkTime) === 29) } ) } From 9d3175a62cd0cac56c41be7cfb578271d7241664 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 16 Dec 2016 15:30:24 -0800 Subject: [PATCH 07/10] Minor --- .../apache/spark/sql/streaming/EventTimeWatermarkSuite.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index f930c0be8b106..43262968227ec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -138,12 +138,9 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin } test("delay in years handled correctly") { - val currentTimeMs = System.currentTimeMillis val currentTime = new Date(currentTimeMs) - def monthsSinceEpoch(date: Date): Int = { date.getYear * 12 + date.getMonth } - val input = MemoryStream[Long] val aggWithWatermark = input.toDF() .withColumn("eventTime", $"value".cast("timestamp")) @@ -152,6 +149,8 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin .agg(count("*") as 'count) .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) + def monthsSinceEpoch(date: Date): Int = { date.getYear * 12 + date.getMonth } + testStream(aggWithWatermark)( AddData(input, currentTimeMs / 1000), CheckAnswer(), From 40c200fe12aab4d5f7b1f42f2164e0373988ff63 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 16 
Dec 2016 15:31:09 -0800 Subject: [PATCH 08/10] Minor --- .../apache/spark/sql/streaming/EventTimeWatermarkSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index 43262968227ec..bdfba9590b0a0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -137,7 +137,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin ) } - test("delay in years handled correctly") { + test("delay in months and years handled correctly") { val currentTimeMs = System.currentTimeMillis val currentTime = new Date(currentTimeMs) From debc748ceac73fcb1e235a1952b0832df658e95d Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 20 Dec 2016 13:03:33 -0800 Subject: [PATCH 09/10] Fixed deadlock --- .../spark/sql/execution/streaming/ProgressReporter.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 37bb6af800866..c5e9eae607b39 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -84,8 +84,6 @@ trait ProgressReporter extends Logging { private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC")) - private val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty - @volatile protected var currentStatus: StreamingQueryStatus = { new StreamingQueryStatus( @@ -184,6 +182,7 @@ trait 
ProgressReporter extends Logging { /** Extracts statistics from the most recent query execution. */ private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = { + val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty val watermarkTimestamp = if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs)) else Map.empty[String, String] From 29f0037631399bf2226ff3c2e630e4927e177eb4 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 20 Dec 2016 18:29:54 -0800 Subject: [PATCH 10/10] Allow negative watermark --- .../apache/spark/sql/execution/streaming/StreamExecution.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index c82af3b343e74..e05200df50849 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -387,7 +387,7 @@ class StreamExecution( lastExecution.executedPlan.collect { case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 => logDebug(s"Observed event time stats: ${e.eventTimeStats.value}") - math.max(0, e.eventTimeStats.value.max - e.delayMs) + e.eventTimeStats.value.max - e.delayMs }.headOption.foreach { newWatermarkMs => if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) { logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")