-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-19378][SS] Ensure continuity of stateOperator and eventTime metrics even if there is no new data in trigger #16716
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| // Should emit new progresses every 10 ms, but we could be facing a slow Jenkins | ||
| eventually(timeout(1 minute)) { | ||
| val nextProgress = query.lastProgress | ||
| assert(nextProgress.timestamp !== progress.timestamp) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you explicitly verify that this progress has no data?
|
|
||
| /** | ||
| * Extract statistics about stateful operators from the executed query plan. | ||
| * SPARK-19378: Still report stateOperator metrics even though no data was processed while |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does not make sense to have jira numbers in a methods scala docs. Just state what it does.
tdas
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approach looks good, but need some cleanup and better test.
|
|
||
| /** | ||
| * Extract statistics about event time from the executed query plan. | ||
| * SPARK-19378: Still report eventTime metrics even though no data was processed while |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here.
| val nextProgress = query.lastProgress | ||
| assert(nextProgress.timestamp !== progress.timestamp) | ||
| assert(progress.eventTime.size() > 1) | ||
| assert(progress.stateOperators.length > 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you are not verifying that that the metric values are as expected.
| * SPARK-19378: Still report eventTime metrics even though no data was processed while | ||
| * reporting progress. | ||
| */ | ||
| private def extractEventTimeStats(watermarkTs: Map[String, String]): Map[String, String] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it does not make sense for this method to take this watermarkTs as a param. its not extracting event time states from watermark ts, its just appending it. Then why not just return empty map, and do the appending outside? Or do the extraction of watermark inside the function as well.
|
@tdas Addressed |
|
Test build #72061 has finished for PR 16716 at commit
|
|
Test build #72059 has finished for PR 16716 at commit
|
| val nextProgress = query.lastProgress | ||
| assert(nextProgress.timestamp !== progress.timestamp) | ||
| assert(nextProgress.numInputRows === 0) | ||
| assert(nextProgress.eventTime.get("min") === "2017-01-26 01:00:00") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This does not make sense. if there is no data in the last trigger, the min, max, avg timestamps cannot be different.
and what about the watermark?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh shoot. I should definitely leave those out because they are trigger specific right?
I should only keep the stateOperator part
|
Test build #72064 has finished for PR 16716 at commit
|
| assert(nextProgress.timestamp !== progress.timestamp) | ||
| assert(nextProgress.numInputRows === 0) | ||
| assert(nextProgress.stateOperators.head.numRowsTotal === 2) | ||
| assert(nextProgress.stateOperators.head.numRowsTotal === 2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this line twice?
|
@tdas addressed |
|
Test build #72168 has finished for PR 16716 at commit
|
…trics even if there is no new data in trigger In StructuredStreaming, if a new trigger was skipped because no new data arrived, we suddenly report nothing for the metrics `stateOperator`. We could however easily report the metrics from `lastExecution` to ensure continuity of metrics. Regression test in `StreamingQueryStatusAndProgressSuite` Author: Burak Yavuz <[email protected]> Closes #16716 from brkyvz/state-agg. (cherry picked from commit 081b7ad) Signed-off-by: Tathagata Das <[email protected]>
…trics even if there is no new data in trigger ## What changes were proposed in this pull request? In StructuredStreaming, if a new trigger was skipped because no new data arrived, we suddenly report nothing for the metrics `stateOperator`. We could however easily report the metrics from `lastExecution` to ensure continuity of metrics. ## How was this patch tested? Regression test in `StreamingQueryStatusAndProgressSuite` Author: Burak Yavuz <[email protected]> Closes apache#16716 from brkyvz/state-agg.
What changes were proposed in this pull request?
In StructuredStreaming, if a new trigger was skipped because no new data arrived, we suddenly report nothing for the metrics
stateOperator. We could however easily report the metrics fromlastExecutionto ensure continuity of metrics.How was this patch tested?
Regression test in
StreamingQueryStatusAndProgressSuite