-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-22017] Take minimum of all watermark execs in StreamExecution. #19239
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| val runId: UUID, | ||
| val currentBatchId: Long, | ||
| offsetSeqMetadata: OffsetSeqMetadata) | ||
| private[sql] val offsetSeqMetadata: OffsetSeqMetadata) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just make it val. Anything inside sql.execution does not show up in docs, and therefore we just keep to val so that we can debug when we need to dig deep.
| * timestamp present here will be used as the overall query watermark in offsetSeqMetadata; | ||
| * the query watermark is what's logged and used to age out old state. | ||
| */ | ||
| protected var attributeWatermarkMsMap: AttributeMap[Long] = AttributeMap(Seq()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is needed only inside a single function right? so make it a local variable.
even better, if you dont have to make it a var, make it a val in the usual functional way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This map has to persist and get updated across batches, and I'm not sure how to do that with a local variable or a val.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. I was mistaken. However in that case, use a mutable.HashMap instead of var. That the code style we use with scala is not to use vars unless absolutely needed.
| if (hasNewData) { | ||
| var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs | ||
| // Update the eventTime watermark if we find one in the plan. | ||
| // Update the eventTime watermarks if we find any in the plan. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
its still a single watermark that is being updated. it just happens to be updated using multiple watermarks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, we're updating multiple watermarks in the map. We later update offsetSeqMetadata with the new minimum one, but that's not in this block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. I had made this comment when i was thinking that we dont need the mutable map. Ignore this.
| == watermark) | ||
| } | ||
|
|
||
| populateNewWatermarkFromData(first, 11) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if these are always used together, then these functions can be merged .. right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, I think you can use the testStream..AddData... AssertOnQuery pattern. its cleaner.
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala#L180
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem is that watermark recalculation happens at the beginning of each batch, and to sequence executions I have to call CheckData or CheckLastBatch. So that method ends up producing a test multiple times longer, since a single entry is:
AddData(realData)
CheckLastBatch
AddData(0)
CheckLastBatch
AssertOnQuery
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right. what you have is better.
|
|
||
| val second = MemoryStream[Int] | ||
|
|
||
| val secondAggregation = second.toDF() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is no aggregation here.
| union.processAllAvailable() | ||
| } | ||
|
|
||
| def assertQueryWatermark(watermark: Int): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: assertWatermark.
| } | ||
|
|
||
| def assertQueryWatermark(watermark: Int): Unit = { | ||
| assert(union.asInstanceOf[StreamingQueryWrapper].streamingQuery |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this line break is hard to read. how about you break it with an intermediate variable (e.g. val lastExecution = ... ; assert(...)
|
|
||
| val secondAggregation = second.toDF() | ||
| .withColumn("eventTime", $"value".cast("timestamp")) | ||
| .withWatermark("eventTime", "10 seconds") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you update the test to have different watermark delays, so that we test that we are choosing min delay, but the min watermark?
| populateNewWatermarkFromData(first, 11) | ||
| assertQueryWatermark(1000) | ||
|
|
||
| // Watermark stays at 1 from the left when right watermark moves to 2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Watermark to "global watermark" .. to differentiate from "right watermark" later in the sentence.
|
Test build #81797 has finished for PR 19239 at commit
|
|
addressed comments |
|
Test build #81803 has finished for PR 19239 at commit
|
| /** | ||
| * A map of current watermarks, keyed by the position of the watermark operator in the | ||
| * physical plan. The minimum watermark timestamp present here will be used and persisted as the | ||
| * query's watermark when preparing each batch, so it's ok that this val isn't fault-tolerant. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make it clear that "this state is 'soft state' that does not affect the correctness and semantic guarantees of watermarks."
| logDebug(s"Observed event time stats: ${e.eventTimeStats.value}") | ||
| e.eventTimeStats.value.max - e.delayMs | ||
| }.headOption.foreach { newWatermarkMs => | ||
| val newAttributeWatermarkMs = e.eventTimeStats.value.max - e.delayMs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add the e or index in the log debug to differentiate between different operators.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also... newAttributeWatermarkMs -> newWatermarkMs
why "attribute"?
| e.eventTimeStats.value.max - e.delayMs | ||
| }.headOption.foreach { newWatermarkMs => | ||
| val newAttributeWatermarkMs = e.eventTimeStats.value.max - e.delayMs | ||
| val mappedWatermarkMs: Option[Long] = watermarkMsMap.get(index) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Option[Long] is not needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mappedWatermarkMs -> previousWatermarkMs or prevWatermarkMs
more semantically meaningful.
| case _ => | ||
| } | ||
|
|
||
| // Update the query watermark to the minimum of all attribute watermarks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a little bit more docs saying that this is the safe thing to do as watermark guarantees .....
| assert(lastExecution.offsetSeqMetadata.batchWatermarkMs == watermark) | ||
| } | ||
|
|
||
| generateAndAssertNewWatermark(first, Seq(11), 1000) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of having two variations for handing single input and double inputs, you can do something like this.
def getWatermarkAfterData(firstData: Seq[Int] = Seq.empty, secondData: Seq[Int] = Seq.empty): Long = {
if (firstData.nonEmpty) first.add(firstData)
if (secondData.nonEmpty) second.add(secondData)
union.processAllAvailable()
// add a dummy batch so lastExecution has the new watermark
first.addData(0)
union.processAllAvailable()
// get updated watermark
val lastExecution = union.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
lastExecution.offsetSeqMetadata.batchWatermarkMs
}
assert(getWatermarkAfterData(firstData = Seq(...)) === 10000)
assert(getWatermarkAfterData(secondData = Seq(...)) === 10000)
assert(getWatermarkAfterData(firstData = Seq(...), secondData = Seq(...)) === 10000)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, why is the first watermark at 1000?
|
Test build #81814 has finished for PR 19239 at commit
|
| // Global watermark doesn't decrement with simultaneous data | ||
| assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(100)) == 120000) | ||
| assert(getWatermarkAfterData(firstData = Seq(140), secondData = Seq(100)) == 120000) | ||
| assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(135)) == 130000) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
test recovery of the minimum after a restart.
|
Test build #81832 has finished for PR 19239 at commit
|
|
Test build #81837 has finished for PR 19239 at commit
|
|
thank you @Joseph-Torres merged to master. |
What changes were proposed in this pull request?
Take the minimum of all watermark exec nodes as the "real" watermark in StreamExecution, rather than picking one arbitrarily.
How was this patch tested?
new unit test