From 4a7f53fdab1e5e640e156a4a3d2ba27837195195 Mon Sep 17 00:00:00 2001
From: Jose Torres
Date: Wed, 13 Sep 2017 14:49:23 -0700
Subject: [PATCH 01/10] Support multiple watermarks in StreamExecution.

---
 .../execution/streaming/StreamExecution.scala |  24 ++++++++++++--
 .../streaming/EventTimeWatermarkSuite.scala   |  32 +++++++++++++++++++
 2 files changed, 53 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 71088ff6386b..d04ff61591c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -130,6 +130,13 @@ class StreamExecution(
   protected var offsetSeqMetadata = OffsetSeqMetadata(
     batchWatermarkMs = 0, batchTimestampMs = 0, sparkSession.conf)
 
+  /**
+   * A map from watermarked attributes to their current watermark. The minimum watermark
+   * timestamp present here will be used as the overall query watermark in offsetSeqMetadata;
+   * the query watermark is what's logged and used to age out old state.
+   */
+  protected var attributeWatermarkMsMap: AttributeMap[Long] = AttributeMap(Seq())
+
   override val id: UUID = UUID.fromString(streamMetadata.id)
 
   override val runId: UUID = UUID.randomUUID
@@ -560,13 +567,24 @@ class StreamExecution(
     }
     if (hasNewData) {
       var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
-      // Update the eventTime watermark if we find one in the plan.
+      // Update the eventTime watermarks if we find any in the plan.
       if (lastExecution != null) {
         lastExecution.executedPlan.collect {
           case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
             logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
-            e.eventTimeStats.value.max - e.delayMs
-        }.headOption.foreach { newWatermarkMs =>
+            e
+        }.foreach { e =>
+          val newAttributeWatermarkMs = e.eventTimeStats.value.max - e.delayMs
+          val mappedWatermarkMs: Option[Long] = attributeWatermarkMsMap.get(e.eventTime)
+          if (mappedWatermarkMs.isEmpty || newAttributeWatermarkMs > mappedWatermarkMs.get) {
+            attributeWatermarkMsMap = AttributeMap(
+              attributeWatermarkMsMap.toSeq ++ Seq((e.eventTime, newAttributeWatermarkMs)))
+          }
+        }
+
+        // Update the query watermark to the minimum of all attribute watermarks.
+ if(!attributeWatermarkMsMap.isEmpty) { + val newWatermarkMs = attributeWatermarkMsMap.minBy(_._2)._2 if (newWatermarkMs > batchWatermarkMs) { logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms") batchWatermarkMs = newWatermarkMs diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index 4f19fa0bb4a9..e1ace6d9399d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -300,6 +300,38 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche ) } + test("2stream") { + import org.apache.spark.sql.functions.lit + + val inputData = MemoryStream[Int] + + val windowedAggregation = inputData.toDF() + .withColumn("eventTime", $"value".cast("timestamp")) + .withWatermark("eventTime", "10 seconds") + .groupBy(window($"eventTime", "5 seconds") as 'window) + .agg(count("*") as 'count) + .select(lit("window"), $"window".getField("start").cast("long").as[Long], $"count".as[Long]) + + val secondData = MemoryStream[Int] + + val secondAggregation = secondData.toDF() + .withColumn("eventTime", $"value".cast("timestamp")) + .withWatermark("eventTime", "10 seconds") + .select(lit("secondary"), 'value.as[Long], lit(0)) + + testStream(windowedAggregation.union(secondAggregation))( + AddData(secondData, 11), // Set up right watermark at 1 + AddData(inputData, 10, 11, 12), + CheckAnswer(("secondary", 11, 0)), + AddData(inputData, 25), // Advance left watermark to 15 seconds - right still at 1 + CheckAnswer(("secondary", 11, 0)), + AddData(secondData, 31), // Advance right watermark to 21 seconds + CheckAnswer(("secondary", 11, 0), ("secondary", 31, 0)), + AddData(inputData, 25), // Trigger another batch on left stream + CheckAnswer(("secondary", 11, 0), ("secondary", 31, 0), ("window", 10, 3)) + ) + } + test("complete mode") { val inputData = MemoryStream[Int] From 9b9cd19106fae9a2de268eb2b559ca1bf159e9c2 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Thu, 14 Sep 2017 11:30:40 -0700 Subject: [PATCH 02/10] partially fix test --- .../streaming/IncrementalExecution.scala | 2 +- .../streaming/EventTimeWatermarkSuite.scala | 44 +++++++++---------- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index 258a64216136..1a5bbd1ae9e3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -39,7 +39,7 @@ class IncrementalExecution( val checkpointLocation: String, val runId: UUID, val currentBatchId: Long, - offsetSeqMetadata: OffsetSeqMetadata) + private[sql] val offsetSeqMetadata: OffsetSeqMetadata) extends QueryExecution(sparkSession, logicalPlan) with Logging { // Modified planner with stateful operations. 
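
For intuition, the bookkeeping this series introduces can be modeled in isolation: each watermark operator's value only ever advances, and the query-level watermark written to the offset log is the minimum across operators. A minimal standalone sketch, not part of the patch itself, with invented names (patch 01 keys the map by attribute; patch 07 below switches to the operator's position in the plan, modeled here as a plain Int key):

    import scala.collection.mutable

    // Illustrative model only; not the patch's code.
    class MultiWatermarkModel {
      private val perOperatorMs = mutable.Map[Int, Long]()

      // Called once per batch per watermark operator with its observed stats.
      def update(operator: Int, maxEventTimeMs: Long, delayMs: Long): Unit = {
        val candidate = maxEventTimeMs - delayMs
        if (candidate > perOperatorMs.getOrElse(operator, Long.MinValue)) {
          perOperatorMs(operator) = candidate
        }
      }

      // The value logged as the query watermark: min across operators, never regressing.
      def global(previousMs: Long): Long =
        if (perOperatorMs.isEmpty) previousMs
        else math.max(previousMs, perOperatorMs.values.min)
    }

    val m = new MultiWatermarkModel
    m.update(0, maxEventTimeMs = 11000, delayMs = 10000) // left sees 11s  -> watermark 1s
    m.update(1, maxEventTimeMs = 12000, delayMs = 10000) // right sees 12s -> watermark 2s
    assert(m.global(previousMs = 0) == 1000)             // global takes the minimum

Keeping the per-operator values monotonic while taking the minimum lets the slower input lag behind without ever un-aging state, which is exactly what the test below exercises.
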
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index e1ace6d9399d..92943eba6679 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -301,35 +301,35 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
   }
 
   test("2stream") {
-    import org.apache.spark.sql.functions.lit
+    val first = MemoryStream[Int]
 
-    val inputData = MemoryStream[Int]
-
-    val windowedAggregation = inputData.toDF()
+    val firstAggregation = first.toDF()
       .withColumn("eventTime", $"value".cast("timestamp"))
       .withWatermark("eventTime", "10 seconds")
-      .groupBy(window($"eventTime", "5 seconds") as 'window)
-      .agg(count("*") as 'count)
-      .select(lit("window"), $"window".getField("start").cast("long").as[Long], $"count".as[Long])
+      .select('value)
 
-    val secondData = MemoryStream[Int]
+    val second = MemoryStream[Int]
 
-    val secondAggregation = secondData.toDF()
+    val secondAggregation = second.toDF()
       .withColumn("eventTime", $"value".cast("timestamp"))
       .withWatermark("eventTime", "10 seconds")
-      .select(lit("secondary"), 'value.as[Long], lit(0))
-
-    testStream(windowedAggregation.union(secondAggregation))(
-      AddData(secondData, 11), // Set up right watermark at 1
-      AddData(inputData, 10, 11, 12),
-      CheckAnswer(("secondary", 11, 0)),
-      AddData(inputData, 25), // Advance left watermark to 15 seconds - right still at 1
-      CheckAnswer(("secondary", 11, 0)),
-      AddData(secondData, 31), // Advance right watermark to 21 seconds
-      CheckAnswer(("secondary", 11, 0), ("secondary", 31, 0)),
-      AddData(inputData, 25), // Trigger another batch on left stream
-      CheckAnswer(("secondary", 11, 0), ("secondary", 31, 0), ("window", 10, 3))
-    )
+      .select('value)
+
+    testStream(firstAggregation.union(secondAggregation))(
+      AddData(first, 11), // Set left watermark at 1
+      CheckLastBatch(11),
+      AssertOnQuery(_.lastExecution.offsetSeqMetadata.batchWatermarkMs == 1000),
+      AddData(second, 12), // Set right watermark at 2; overall should still be 1
+      CheckLastBatch(12),
+      AssertOnQuery(_.lastExecution.offsetSeqMetadata.batchWatermarkMs == 1000),
+      AddData(first, 21), // Left is now further so we should take right watermark
+      AssertOnQuery(_.lastExecution.offsetSeqMetadata.batchWatermarkMs == 2000),
+      AddData(second, 22, 32, 42), // Right is further, go back to taking left watermark
+      AssertOnQuery(_.lastExecution.offsetSeqMetadata.batchWatermarkMs == 11000),
+      AddData(first, 31), // Left watermark should keep being output as long as it's behind
+      AssertOnQuery(_.lastExecution.offsetSeqMetadata.batchWatermarkMs == 21000),
+      AddData(first, 41),
+      AssertOnQuery(_.lastExecution.offsetSeqMetadata.batchWatermarkMs == 31000))
   }
 
   test("complete mode") {

From 6a4c80b696f42a445c7f846fada3f823e04bd3ab Mon Sep 17 00:00:00 2001
From: Jose Torres
Date: Thu, 14 Sep 2017 14:52:16 -0700
Subject: [PATCH 03/10] Finish rewriting test

---
 .../streaming/EventTimeWatermarkSuite.scala | 61 ++++++++++++++-----
 1 file changed, 45 insertions(+), 16 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 92943eba6679..c239f552e333 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -300,7 +300,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     )
   }
 
-  test("2stream") {
+  test("watermark with 2 streams") {
     val first = MemoryStream[Int]
 
     val firstAggregation = first.toDF()
@@ -315,21 +315,50 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       .withWatermark("eventTime", "10 seconds")
       .select('value)
 
-    testStream(firstAggregation.union(secondAggregation))(
-      AddData(first, 11), // Set left watermark at 1
-      CheckLastBatch(11),
-      AssertOnQuery(_.lastExecution.offsetSeqMetadata.batchWatermarkMs == 1000),
-      AddData(second, 12), // Set right watermark at 2; overall should still be 1
-      CheckLastBatch(12),
-      AssertOnQuery(_.lastExecution.offsetSeqMetadata.batchWatermarkMs == 1000),
-      AddData(first, 21), // Left is now further so we should take right watermark
-      AssertOnQuery(_.lastExecution.offsetSeqMetadata.batchWatermarkMs == 2000),
-      AddData(second, 22, 32, 42), // Right is further, go back to taking left watermark
-      AssertOnQuery(_.lastExecution.offsetSeqMetadata.batchWatermarkMs == 11000),
-      AddData(first, 31), // Left watermark should keep being output as long as it's behind
-      AssertOnQuery(_.lastExecution.offsetSeqMetadata.batchWatermarkMs == 21000),
-      AddData(first, 41),
-      AssertOnQuery(_.lastExecution.offsetSeqMetadata.batchWatermarkMs == 31000))
+    val union = firstAggregation.union(secondAggregation)
+      .writeStream
+      .format("memory")
+      .queryName("test")
+      .start()
+
+    def populateNewWatermarkFromData(stream: MemoryStream[Int], data: Int*): Unit = {
+      stream.addData(data)
+      union.processAllAvailable()
+      // add a dummy batch so lastExecution has the new watermark
+      stream.addData(0)
+      union.processAllAvailable()
+    }
+
+    def assertQueryWatermark(watermark: Int): Unit = {
+      assert(union.asInstanceOf[StreamingQueryWrapper].streamingQuery
+        .lastExecution.offsetSeqMetadata.batchWatermarkMs
+        == watermark)
+    }
+
+    populateNewWatermarkFromData(first, 11)
+    assertQueryWatermark(1000)
+
+    // Watermark stays at 1 from the left when right watermark moves to 2
+    populateNewWatermarkFromData(second, 12)
+    assertQueryWatermark(1000)
+
+    // Watermark switches to right side value 2 when left watermark goes higher
+    populateNewWatermarkFromData(first, 21)
+    assertQueryWatermark(2000)
+
+    // Watermark goes back to left
+    populateNewWatermarkFromData(second, 22, 32, 42)
+    assertQueryWatermark(11000)
+
+    // Watermark stays on left as long as it's below right
+    populateNewWatermarkFromData(first, 31)
+    assertQueryWatermark(21000)
+
+    populateNewWatermarkFromData(first, 41)
+    assertQueryWatermark(31000)
+
+    populateNewWatermarkFromData(first, 51)
+    assertQueryWatermark(32000)
   }
 
   test("complete mode") {

From 484940e5eb4d1eac1c5ec81f475681c9241bbab2 Mon Sep 17 00:00:00 2001
From: Jose Torres
Date: Thu, 14 Sep 2017 15:24:36 -0700
Subject: [PATCH 04/10] make IncrementalExecution.offsetSeqMetadata non-private

---
 .../spark/sql/execution/streaming/IncrementalExecution.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 1a5bbd1ae9e3..19d95980d57d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -39,7 +39,7 @@ class IncrementalExecution( val checkpointLocation: String, val runId: UUID, val currentBatchId: Long, - private[sql] val offsetSeqMetadata: OffsetSeqMetadata) + val offsetSeqMetadata: OffsetSeqMetadata) extends QueryExecution(sparkSession, logicalPlan) with Logging { // Modified planner with stateful operations. From 032f55503c8d424390da1ff85054e3a01e7489eb Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Thu, 14 Sep 2017 16:22:22 -0700 Subject: [PATCH 05/10] properly name test dataframes --- .../spark/sql/streaming/EventTimeWatermarkSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index c239f552e333..b766b8e42935 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -303,19 +303,19 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche test("watermark with 2 streams") { val first = MemoryStream[Int] - val firstAggregation = first.toDF() + val firstDf = first.toDF() .withColumn("eventTime", $"value".cast("timestamp")) .withWatermark("eventTime", "10 seconds") .select('value) val second = MemoryStream[Int] - val secondAggregation = second.toDF() + val secondDf = second.toDF() .withColumn("eventTime", $"value".cast("timestamp")) .withWatermark("eventTime", "10 seconds") .select('value) - val union = firstAggregation.union(secondAggregation) + val union = firstDf.union(secondDf) .writeStream .format("memory") .queryName("test") From d7f5f60c6be5bf228c960c3549eb81ed869f0227 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Thu, 14 Sep 2017 16:39:22 -0700 Subject: [PATCH 06/10] Combine test helper functions. 
--- .../streaming/EventTimeWatermarkSuite.scala | 34 ++++++------------- 1 file changed, 11 insertions(+), 23 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index b766b8e42935..d58db3b1f5c9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -321,44 +321,32 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche .queryName("test") .start() - def populateNewWatermarkFromData(stream: MemoryStream[Int], data: Int*): Unit = { + def generateAndAssertNewWatermark( + stream: MemoryStream[Int], + data: Seq[Int], + watermark: Int): Unit = { stream.addData(data) union.processAllAvailable() // add a dummy batch so lastExecution has the new watermark stream.addData(0) union.processAllAvailable() - } - def assertQueryWatermark(watermark: Int): Unit = { assert(union.asInstanceOf[StreamingQueryWrapper].streamingQuery .lastExecution.offsetSeqMetadata.batchWatermarkMs == watermark) } - populateNewWatermarkFromData(first, 11) - assertQueryWatermark(1000) - + generateAndAssertNewWatermark(first, Seq(11), 1000) // Watermark stays at 1 from the left when right watermark moves to 2 - populateNewWatermarkFromData(second, 12) - assertQueryWatermark(1000) - + generateAndAssertNewWatermark(second, Seq(12), 1000) // Watermark switches to right side value 2 when left watermark goes higher - populateNewWatermarkFromData(first, 21) - assertQueryWatermark(2000) - + generateAndAssertNewWatermark(first, Seq(21), 2000) // Watermark goes back to left - populateNewWatermarkFromData(second, 22, 32, 42) - assertQueryWatermark(11000) - + generateAndAssertNewWatermark(second, Seq(22, 32, 42), 11000) // Watermark stays on left as long as it's below right - populateNewWatermarkFromData(first, 31) - assertQueryWatermark(21000) - - populateNewWatermarkFromData(first, 41) - assertQueryWatermark(31000) - - populateNewWatermarkFromData(first, 51) - assertQueryWatermark(32000) + generateAndAssertNewWatermark(first, Seq(31), 21000) + generateAndAssertNewWatermark(first, Seq(41), 31000) + generateAndAssertNewWatermark(first, Seq(51), 41000) } test("complete mode") { From 2f07f90423d87985322975f8ad5aef8f70f28066 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Thu, 14 Sep 2017 18:18:12 -0700 Subject: [PATCH 07/10] Key watermarks by relative position rather than attribute. --- .../execution/streaming/StreamExecution.scala | 31 ++++++++++--------- .../streaming/EventTimeWatermarkSuite.scala | 2 +- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index d04ff61591c8..26fc92364caa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -131,11 +131,11 @@ class StreamExecution( batchWatermarkMs = 0, batchTimestampMs = 0, sparkSession.conf) /** - * A map from watermarked attributes to their current watermark. The minimum watermark - * timestamp present here will be used as the overall query watermark in offsetSeqMetadata; - * the query watermark is what's logged and used to age out old state. 
+ * A map of current watermarks, keyed by the position of the watermark operator in the + * physical plan. The minimum watermark timestamp present here will be used and persisted as the + * query's watermark when preparing each batch, so it's ok that this val isn't fault-tolerant. */ - protected var attributeWatermarkMsMap: AttributeMap[Long] = AttributeMap(Seq()) + protected val watermarkMsMap: MutableMap[Int, Long] = MutableMap() override val id: UUID = UUID.fromString(streamMetadata.id) @@ -570,21 +570,22 @@ class StreamExecution( // Update the eventTime watermarks if we find any in the plan. if (lastExecution != null) { lastExecution.executedPlan.collect { - case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 => + case e: EventTimeWatermarkExec => e + }.zipWithIndex.foreach { + case (e, index) if e.eventTimeStats.value.count > 0 => logDebug(s"Observed event time stats: ${e.eventTimeStats.value}") - e - }.foreach { e => - val newAttributeWatermarkMs = e.eventTimeStats.value.max - e.delayMs - val mappedWatermarkMs: Option[Long] = attributeWatermarkMsMap.get(e.eventTime) - if (mappedWatermarkMs.isEmpty || newAttributeWatermarkMs > mappedWatermarkMs.get) { - attributeWatermarkMsMap = AttributeMap( - attributeWatermarkMsMap.toSeq ++ Seq((e.eventTime, newAttributeWatermarkMs))) - } + val newAttributeWatermarkMs = e.eventTimeStats.value.max - e.delayMs + val mappedWatermarkMs: Option[Long] = watermarkMsMap.get(index) + if (mappedWatermarkMs.isEmpty || newAttributeWatermarkMs > mappedWatermarkMs.get) { + watermarkMsMap.put(index, newAttributeWatermarkMs) + } + + case _ => } // Update the query watermark to the minimum of all attribute watermarks. - if(!attributeWatermarkMsMap.isEmpty) { - val newWatermarkMs = attributeWatermarkMsMap.minBy(_._2)._2 + if(!watermarkMsMap.isEmpty) { + val newWatermarkMs = watermarkMsMap.minBy(_._2)._2 if (newWatermarkMs > batchWatermarkMs) { logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms") batchWatermarkMs = newWatermarkMs diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index d58db3b1f5c9..65a540253420 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -346,7 +346,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche // Watermark stays on left as long as it's below right generateAndAssertNewWatermark(first, Seq(31), 21000) generateAndAssertNewWatermark(first, Seq(41), 31000) - generateAndAssertNewWatermark(first, Seq(51), 41000) + generateAndAssertNewWatermark(first, Seq(51), 32000) } test("complete mode") { From 8b605384d77fdeb63b28feabee74284a5ab1409a Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Thu, 14 Sep 2017 19:05:14 -0700 Subject: [PATCH 08/10] Address test comments. 
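
The second stream's watermark delay is now 5 seconds while the first stays at
10, so the expected global watermarks in the new simultaneous-data checks
follow from min(maxLeft - 10s, maxRight - 5s), never moving backwards:

    min(100 - 10, 100 - 5) = 90  -> 90000 ms
    min(120 - 10, 110 - 5) = 105 -> 105000 ms
    min(130 - 10, 125 - 5) = 120 -> 120000 ms
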
--- .../streaming/EventTimeWatermarkSuite.scala | 56 ++++++++++++++----- 1 file changed, 43 insertions(+), 13 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index 65a540253420..4cd378e559c6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -312,7 +312,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche val secondDf = second.toDF() .withColumn("eventTime", $"value".cast("timestamp")) - .withWatermark("eventTime", "10 seconds") + .withWatermark("eventTime", "5 seconds") .select('value) val union = firstDf.union(secondDf) @@ -326,27 +326,57 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche data: Seq[Int], watermark: Int): Unit = { stream.addData(data) + assertWatermark(watermark) + } + + def assertWatermark(watermark: Int) { union.processAllAvailable() // add a dummy batch so lastExecution has the new watermark - stream.addData(0) + first.addData(0) union.processAllAvailable() - assert(union.asInstanceOf[StreamingQueryWrapper].streamingQuery - .lastExecution.offsetSeqMetadata.batchWatermarkMs - == watermark) + val lastExecution = union.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution + assert(lastExecution.offsetSeqMetadata.batchWatermarkMs == watermark) } generateAndAssertNewWatermark(first, Seq(11), 1000) - // Watermark stays at 1 from the left when right watermark moves to 2 - generateAndAssertNewWatermark(second, Seq(12), 1000) - // Watermark switches to right side value 2 when left watermark goes higher - generateAndAssertNewWatermark(first, Seq(21), 2000) - // Watermark goes back to left - generateAndAssertNewWatermark(second, Seq(22, 32, 42), 11000) - // Watermark stays on left as long as it's below right + // Global watermark stays at left watermark 1 when right watermark moves to 2 + generateAndAssertNewWatermark(second, Seq(8), 1000) + // Global watermark switches to right side value 2 when left watermark goes higher + generateAndAssertNewWatermark(first, Seq(21), 3000) + // Global watermark goes back to left + generateAndAssertNewWatermark(second, Seq(17, 28, 39), 11000) + // Global watermark stays on left as long as it's below right generateAndAssertNewWatermark(first, Seq(31), 21000) generateAndAssertNewWatermark(first, Seq(41), 31000) - generateAndAssertNewWatermark(first, Seq(51), 32000) + // Global watermark switches back to right again + generateAndAssertNewWatermark(first, Seq(51), 34000) + + // Global watermark is updated correctly with simultaneous data from both sides + first.addData(100) + second.addData(100) + assertWatermark(90000) + + first.addData(120) + second.addData(110) + assertWatermark(105000) + + first.addData(130) + second.addData(125) + assertWatermark(120000) + + // Global watermark doesn't decrement with simultaneous data + first.addData(100) + second.addData(100) + assertWatermark(120000) + + first.addData(140) + second.addData(100) + assertWatermark(120000) + + first.addData(100) + second.addData(135) + assertWatermark(130000) } test("complete mode") { From 13affc74b6304f7e2357d392716b68a83e273f29 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Fri, 15 Sep 2017 14:05:17 -0700 Subject: [PATCH 09/10] Address review comments --- .../execution/streaming/StreamExecution.scala | 28 
++++++--- .../streaming/EventTimeWatermarkSuite.scala | 62 +++++++------------ 2 files changed, 41 insertions(+), 49 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 26fc92364caa..5f95ddd864cc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -132,8 +132,11 @@ class StreamExecution( /** * A map of current watermarks, keyed by the position of the watermark operator in the - * physical plan. The minimum watermark timestamp present here will be used and persisted as the - * query's watermark when preparing each batch, so it's ok that this val isn't fault-tolerant. + * physical plan. + * + * This state is 'soft state', which does not affect the correctness and semantics of watermarks + * and is not persisted across query restarts. + * The fault-tolerant watermark state is in offsetSeqMetadata. */ protected val watermarkMsMap: MutableMap[Int, Long] = MutableMap() @@ -573,17 +576,24 @@ class StreamExecution( case e: EventTimeWatermarkExec => e }.zipWithIndex.foreach { case (e, index) if e.eventTimeStats.value.count > 0 => - logDebug(s"Observed event time stats: ${e.eventTimeStats.value}") - val newAttributeWatermarkMs = e.eventTimeStats.value.max - e.delayMs - val mappedWatermarkMs: Option[Long] = watermarkMsMap.get(index) - if (mappedWatermarkMs.isEmpty || newAttributeWatermarkMs > mappedWatermarkMs.get) { - watermarkMsMap.put(index, newAttributeWatermarkMs) + logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}") + val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs + val prevWatermarkMs = watermarkMsMap.get(index) + if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) { + watermarkMsMap.put(index, newWatermarkMs) } - case _ => + // Populate 0 if we haven't seen any data yet for this watermark node. + case (_, index) => + if (!watermarkMsMap.isDefinedAt(index)) { + watermarkMsMap.put(index, 0) + } } - // Update the query watermark to the minimum of all attribute watermarks. + // Update the global watermark to the minimum of all watermark nodes. + // This is the safest option, because only the global watermark is fault-tolerant. Making + // it the minimum of all individual watermarks guarantees it will never advance past where + // any individual watermark operator would be if it were in a plan by itself. 
if(!watermarkMsMap.isEmpty) { val newWatermarkMs = watermarkMsMap.minBy(_._2)._2 if (newWatermarkMs > batchWatermarkMs) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index 4cd378e559c6..cd741c14e304 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -321,62 +321,44 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche .queryName("test") .start() - def generateAndAssertNewWatermark( - stream: MemoryStream[Int], - data: Seq[Int], - watermark: Int): Unit = { - stream.addData(data) - assertWatermark(watermark) - } - - def assertWatermark(watermark: Int) { + def getWatermarkAfterData( + firstData: Seq[Int] = Seq.empty, + secondData: Seq[Int] = Seq.empty): Long = { + if (firstData.nonEmpty) first.addData(firstData) + if (secondData.nonEmpty) second.addData(secondData) union.processAllAvailable() // add a dummy batch so lastExecution has the new watermark first.addData(0) union.processAllAvailable() - + // get last watermark val lastExecution = union.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution - assert(lastExecution.offsetSeqMetadata.batchWatermarkMs == watermark) + lastExecution.offsetSeqMetadata.batchWatermarkMs } - generateAndAssertNewWatermark(first, Seq(11), 1000) + // Global watermark starts at 0 until we get data from both sides + assert(getWatermarkAfterData(firstData = Seq(11)) == 0) + assert(getWatermarkAfterData(secondData = Seq(6)) == 1000) // Global watermark stays at left watermark 1 when right watermark moves to 2 - generateAndAssertNewWatermark(second, Seq(8), 1000) + assert(getWatermarkAfterData(secondData = Seq(8)) == 1000) // Global watermark switches to right side value 2 when left watermark goes higher - generateAndAssertNewWatermark(first, Seq(21), 3000) + assert(getWatermarkAfterData(firstData = Seq(21)) == 3000) // Global watermark goes back to left - generateAndAssertNewWatermark(second, Seq(17, 28, 39), 11000) + assert(getWatermarkAfterData(secondData = Seq(17, 28, 39)) == 11000) // Global watermark stays on left as long as it's below right - generateAndAssertNewWatermark(first, Seq(31), 21000) - generateAndAssertNewWatermark(first, Seq(41), 31000) + assert(getWatermarkAfterData(firstData = Seq(31)) == 21000) + assert(getWatermarkAfterData(firstData = Seq(41)) == 31000) // Global watermark switches back to right again - generateAndAssertNewWatermark(first, Seq(51), 34000) + assert(getWatermarkAfterData(firstData = Seq(51)) == 34000) // Global watermark is updated correctly with simultaneous data from both sides - first.addData(100) - second.addData(100) - assertWatermark(90000) - - first.addData(120) - second.addData(110) - assertWatermark(105000) - - first.addData(130) - second.addData(125) - assertWatermark(120000) + assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(100)) == 90000) + assert(getWatermarkAfterData(firstData = Seq(120), secondData = Seq(110)) == 105000) + assert(getWatermarkAfterData(firstData = Seq(130), secondData = Seq(125)) == 120000) // Global watermark doesn't decrement with simultaneous data - first.addData(100) - second.addData(100) - assertWatermark(120000) - - first.addData(140) - second.addData(100) - assertWatermark(120000) - - first.addData(100) - second.addData(135) - assertWatermark(130000) + 
assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(100)) == 120000) + assert(getWatermarkAfterData(firstData = Seq(140), secondData = Seq(100)) == 120000) + assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(135)) == 130000) } test("complete mode") { From cdf4361f6065e4e1d891992ebc30289957a6262f Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Fri, 15 Sep 2017 17:58:09 -0700 Subject: [PATCH 10/10] Add query restart test. --- .../streaming/EventTimeWatermarkSuite.scala | 103 ++++++++++-------- 1 file changed, 60 insertions(+), 43 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index cd741c14e304..f3e8cf950a5a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -301,6 +301,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche } test("watermark with 2 streams") { + import org.apache.spark.sql.functions.sum val first = MemoryStream[Int] val firstDf = first.toDF() @@ -315,50 +316,66 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche .withWatermark("eventTime", "5 seconds") .select('value) - val union = firstDf.union(secondDf) - .writeStream - .format("memory") - .queryName("test") - .start() - - def getWatermarkAfterData( - firstData: Seq[Int] = Seq.empty, - secondData: Seq[Int] = Seq.empty): Long = { - if (firstData.nonEmpty) first.addData(firstData) - if (secondData.nonEmpty) second.addData(secondData) - union.processAllAvailable() - // add a dummy batch so lastExecution has the new watermark - first.addData(0) - union.processAllAvailable() - // get last watermark - val lastExecution = union.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution - lastExecution.offsetSeqMetadata.batchWatermarkMs - } + withTempDir { checkpointDir => + val unionWriter = firstDf.union(secondDf).agg(sum('value)) + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .format("memory") + .outputMode("complete") + .queryName("test") + + val union = unionWriter.start() + + def getWatermarkAfterData( + firstData: Seq[Int] = Seq.empty, + secondData: Seq[Int] = Seq.empty, + query: StreamingQuery = union): Long = { + if (firstData.nonEmpty) first.addData(firstData) + if (secondData.nonEmpty) second.addData(secondData) + query.processAllAvailable() + // add a dummy batch so lastExecution has the new watermark + first.addData(0) + query.processAllAvailable() + // get last watermark + val lastExecution = query.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution + lastExecution.offsetSeqMetadata.batchWatermarkMs + } - // Global watermark starts at 0 until we get data from both sides - assert(getWatermarkAfterData(firstData = Seq(11)) == 0) - assert(getWatermarkAfterData(secondData = Seq(6)) == 1000) - // Global watermark stays at left watermark 1 when right watermark moves to 2 - assert(getWatermarkAfterData(secondData = Seq(8)) == 1000) - // Global watermark switches to right side value 2 when left watermark goes higher - assert(getWatermarkAfterData(firstData = Seq(21)) == 3000) - // Global watermark goes back to left - assert(getWatermarkAfterData(secondData = Seq(17, 28, 39)) == 11000) - // Global watermark stays on left as long as it's below right - assert(getWatermarkAfterData(firstData = 
Seq(31)) == 21000) - assert(getWatermarkAfterData(firstData = Seq(41)) == 31000) - // Global watermark switches back to right again - assert(getWatermarkAfterData(firstData = Seq(51)) == 34000) - - // Global watermark is updated correctly with simultaneous data from both sides - assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(100)) == 90000) - assert(getWatermarkAfterData(firstData = Seq(120), secondData = Seq(110)) == 105000) - assert(getWatermarkAfterData(firstData = Seq(130), secondData = Seq(125)) == 120000) - - // Global watermark doesn't decrement with simultaneous data - assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(100)) == 120000) - assert(getWatermarkAfterData(firstData = Seq(140), secondData = Seq(100)) == 120000) - assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(135)) == 130000) + // Global watermark starts at 0 until we get data from both sides + assert(getWatermarkAfterData(firstData = Seq(11)) == 0) + assert(getWatermarkAfterData(secondData = Seq(6)) == 1000) + // Global watermark stays at left watermark 1 when right watermark moves to 2 + assert(getWatermarkAfterData(secondData = Seq(8)) == 1000) + // Global watermark switches to right side value 2 when left watermark goes higher + assert(getWatermarkAfterData(firstData = Seq(21)) == 3000) + // Global watermark goes back to left + assert(getWatermarkAfterData(secondData = Seq(17, 28, 39)) == 11000) + // Global watermark stays on left as long as it's below right + assert(getWatermarkAfterData(firstData = Seq(31)) == 21000) + assert(getWatermarkAfterData(firstData = Seq(41)) == 31000) + // Global watermark switches back to right again + assert(getWatermarkAfterData(firstData = Seq(51)) == 34000) + + // Global watermark is updated correctly with simultaneous data from both sides + assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(100)) == 90000) + assert(getWatermarkAfterData(firstData = Seq(120), secondData = Seq(110)) == 105000) + assert(getWatermarkAfterData(firstData = Seq(130), secondData = Seq(125)) == 120000) + + // Global watermark doesn't decrement with simultaneous data + assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(100)) == 120000) + assert(getWatermarkAfterData(firstData = Seq(140), secondData = Seq(100)) == 120000) + assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(135)) == 130000) + + // Global watermark recovers after restart, but left side watermark ahead of it does not. + assert(getWatermarkAfterData(firstData = Seq(200), secondData = Seq(190)) == 185000) + union.stop() + val union2 = unionWriter.start() + assert(getWatermarkAfterData(query = union2) == 185000) + // Even though the left side was ahead of 185000 in the last execution, the watermark won't + // increment until it gets past it in this execution. + assert(getWatermarkAfterData(secondData = Seq(200), query = union2) == 185000) + assert(getWatermarkAfterData(firstData = Seq(200), query = union2) == 190000) + } } test("complete mode") {
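
Taken together, the series lets a single query carry several watermarks. As a condensed usage sketch (adapted from the suite above — the stream names, delays, and checkpoint path are illustrative, and it assumes spark.implicits._ and an implicit SQLContext in scope, as in the test harness):

    import org.apache.spark.sql.execution.streaming.MemoryStream
    import org.apache.spark.sql.functions.sum

    val left = MemoryStream[Int]
    val right = MemoryStream[Int]

    val leftDf = left.toDF()
      .withColumn("eventTime", $"value".cast("timestamp"))
      .withWatermark("eventTime", "10 seconds") // slower side
      .select('value)

    val rightDf = right.toDF()
      .withColumn("eventTime", $"value".cast("timestamp"))
      .withWatermark("eventTime", "5 seconds")  // faster side
      .select('value)

    // State is aged out using the minimum of the two watermarks, and that
    // minimum is what's persisted to the offset log, so it survives restarts.
    val query = leftDf.union(rightDf)
      .agg(sum('value))
      .writeStream
      .option("checkpointLocation", "/tmp/union-watermark-ckpt") // illustrative path
      .outputMode("complete")
      .format("memory")
      .queryName("unioned")
      .start()

With delays of 10 and 5 seconds, state eviction for the aggregate tracks min(maxLeft - 10s, maxRight - 5s), matching the expectations asserted in the suite above.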