-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-26379][SS] Fix issue on adding current_timestamp/current_date to streaming query #23609
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can even leverage newBatchesPlan and remove this: this effort is to ensure going through the same path with further transformation when extracting schema.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm fine with this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@HeartSaVioR . I prefer to leverage the existing lines (501 ~ 509) as you said like the following. I assume that the following is what you mean. Eventually, it's two line changes (excluding comments). WDYT?
// Rewire the plan to use the new attributes that were returned by the source.
val newAttributePlan = newBatchesPlan transformAllExpressions {
case ct: CurrentTimestamp =>
CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
- ct.dataType)
+ ct.dataType).toLiteral
case cd: CurrentDate =>
CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
- cd.dataType, cd.timeZoneId)
+ cd.dataType, cd.timeZoneId).toLiteral
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dongjoon-hyun
Actually I meant we pass newBatchesPlan instead of newAttributePlan only for extracting schema since we actually guess the name and data type will not be changed from IncrementalExecution. But IMHO I feel safer pre-transforming it.
Transforming CurrentBatchTimestamp to Literal not in IncrementalExecution breaks the intention what CurrentBatchTimestamp javadoc describes. Please note that I used transformed plan only for extracting schema and the actual plan is not changed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
dc09c99 to
afb0dc4
Compare
|
Test build #101515 has finished for PR 23609 at commit
|
|
Test build #101516 has finished for PR 23609 at commit
|
|
Seems like it's relevant issue:
|
|
@gaborgsomogyi |
gaborgsomogyi
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tested with spark-shell as well and works fine.
| } | ||
| } | ||
|
|
||
| Seq(true, false).foreach { useV2Sink => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This structure makes it impossible to execute test independently (at least I don't know). The whole suite has to be executed which makes even small modifications time consuming.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RIght. That was to minimize the code, but I guess only some lines will be added and it will become easier to test so better to apply your suggestion. Thanks!
| } | ||
|
|
||
| Seq(true, false).foreach { useV2Sink => | ||
| import org.apache.spark.sql.functions._ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Already imported.
| row.getTimestamp(1).getTime | ||
| } | ||
|
|
||
| var lastTimestamp = -1L |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not much functional difference but maybe this could be now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm fine with this.
|
|
||
| test(newTestName) { | ||
| val input = MemoryStream[Int] | ||
| val df = input.toDS().withColumn("cur_timestamp", lit(current_timestamp())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I see this covers CurrentTimestamp, maybe worth to add test for CurrentDate as well.
gaborgsomogyi
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
| CheckLastBatch { rows: Seq[Row] => | ||
| lastTimestamp = assertBatchOutputAndUpdateLastTimestamp(rows, lastTimestamp, currentDate, 1) | ||
| }, | ||
| Execute { _ => Thread.sleep(3 * 1000) }, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No strong opinion but spending 6 seconds for 2 tests maybe overkill.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah good suggestion. 1s would be enough.
|
Test build #101525 has finished for PR 23609 at commit
|
|
retest this please |
|
Test build #101517 has finished for PR 23609 at commit
|
|
Test build #101527 has finished for PR 23609 at commit
|
|
retest this please |
|
Test build #101535 has finished for PR 23609 at commit
|
|
Test build #101537 has finished for PR 23609 at commit
|
|
Hi, @srowen . Could you review this please? |
|
I honestly don't know anything about this part of the code. Eh, would @rdblue have a comment as it sorta concerns the v2 DataSource? |
|
@HeartSaVioR , the changes looks good, but I am not very sure how the optimizer works. Did you check if the |
|
@arunmahadevan That's why I don't transform actual plan and have temporal plan which is used just for extracting schema. Actual plan is not changed. |
|
Makes sense, LGTM |
|
It sounds like this is related to DSv2 only because v2 gets the schema of a logical plan, which previously contained unresolved nodes. If the fix is to resolve the nodes, then this should be fine. If the fix were to change how v2 works and avoid some call to get the schema of a plan, then I would be concerned. |
This patch is to resolve the nodes prior to get a schema (via having temporary plan), while it doesn't touch the actual plan so that it would work as before. Btw, I agree DSv2 is a moving target, but the bug is affecting not only for master branch but also for 2.4/2.3 as well. So I hope we get it resolved first, and fix it (again) when new DSv2 is concerning. |
|
Retest this please. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM (pending Jenkins). This is a safe and clean fix to get only the schema.
Also, we had better have this test coverage during DSv2 transition.
|
Thanks all for reviewing and thanks Dongjoon for merging! |
|
@dongjoon-hyun Given the bug affects 2.3 and 2.4 version lines as well, would we be better to have backport PRs as well? |
…to streaming query ## What changes were proposed in this pull request? This patch proposes to fix issue on adding `current_timestamp` / `current_date` with streaming query. The root reason is that Spark transforms `CurrentTimestamp`/`CurrentDate` to `CurrentBatchTimestamp` in MicroBatchExecution which makes transformed attributes not-yet-resolved. They will be resolved by IncrementalExecution. (In ContinuousExecution, Spark doesn't allow using `current_timestamp` and `current_date` so it has been OK.) It's OK for DataSource V1 sink because it simply leverages transformed logical plan and don't evaluate until they're resolved, but for DataSource V2 sink, Spark tries to extract the schema of transformed logical plan in prior to IncrementalExecution, and unresolved attributes will raise errors. This patch fixes the issue via having separate pre-resolved logical plan to pass the schema to StreamingWriteSupport safely. ## How was this patch tested? Added UT. Closes #23609 from HeartSaVioR/SPARK-26379. Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
|
It's merged to branch-2.4 already. It took some time for me to test manually on |
|
For |
…to streaming query ## What changes were proposed in this pull request? This patch proposes to fix issue on adding `current_timestamp` / `current_date` with streaming query. The root reason is that Spark transforms `CurrentTimestamp`/`CurrentDate` to `CurrentBatchTimestamp` in MicroBatchExecution which makes transformed attributes not-yet-resolved. They will be resolved by IncrementalExecution. (In ContinuousExecution, Spark doesn't allow using `current_timestamp` and `current_date` so it has been OK.) It's OK for DataSource V1 sink because it simply leverages transformed logical plan and don't evaluate until they're resolved, but for DataSource V2 sink, Spark tries to extract the schema of transformed logical plan in prior to IncrementalExecution, and unresolved attributes will raise errors. This patch fixes the issue via having separate pre-resolved logical plan to pass the schema to StreamingWriteSupport safely. ## How was this patch tested? Added UT. Closes apache#23609 from HeartSaVioR/SPARK-26379. Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
|
Sure, I'll craft it quickly. Thanks @dongjoon-hyun! |
|
Ur, @HeartSaVioR . The patch resolved the original reported issue, but the test case seems to be not enough to reproduce it properly. Could you check the test case once more? |
|
Ah that's odd. I'll see whether test case is valid again and fix if not. Thanks for reporting. (Maybe later today or not today, given that I have some errands...) |
|
Sorry, guys. I misanalyzed the root cause. I'll make a followup PR. |
| val input = MemoryStream[Int] | ||
| val df = input.toDS() | ||
| .withColumn("cur_timestamp", lit(current_timestamp())) | ||
| .withColumn("cur_date", lit(current_date())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, this hides current_timestamp and makes this test case succeeds always.
| case cd: CurrentDate => | ||
| CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs, | ||
| cd.dataType, cd.timeZoneId) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The root cause is CurrentBatchTimestamp is TimeZoneAwareExpression which is unresolved without TimeZoneId.
…xception in CurrentBatchTimestamp ## What changes were proposed in this pull request? Spark replaces `CurrentTimestamp` with `CurrentBatchTimestamp`. However, `CurrentBatchTimestamp` is `TimeZoneAwareExpression` while `CurrentTimestamp` isn't. Without TimeZoneId, `CurrentBatchTimestamp` becomes unresolved and raises `UnresolvedException`. Since `CurrentDate` is `TimeZoneAwareExpression`, there is no problem with `CurrentDate`. This PR reverts the [previous patch](#23609) on `MicroBatchExecution` and fixes the root cause. ## How was this patch tested? Pass the Jenkins with the updated test cases. Closes #23660 from dongjoon-hyun/SPARK-26379. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> (cherry picked from commit 1ca6b8b) Signed-off-by: Dongjoon Hyun <[email protected]>
…xception in CurrentBatchTimestamp ## What changes were proposed in this pull request? Spark replaces `CurrentTimestamp` with `CurrentBatchTimestamp`. However, `CurrentBatchTimestamp` is `TimeZoneAwareExpression` while `CurrentTimestamp` isn't. Without TimeZoneId, `CurrentBatchTimestamp` becomes unresolved and raises `UnresolvedException`. Since `CurrentDate` is `TimeZoneAwareExpression`, there is no problem with `CurrentDate`. This PR reverts the [previous patch](apache#23609) on `MicroBatchExecution` and fixes the root cause. ## How was this patch tested? Pass the Jenkins with the updated test cases. Closes apache#23660 from dongjoon-hyun/SPARK-26379. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…to streaming query ## What changes were proposed in this pull request? This patch proposes to fix issue on adding `current_timestamp` / `current_date` with streaming query. The root reason is that Spark transforms `CurrentTimestamp`/`CurrentDate` to `CurrentBatchTimestamp` in MicroBatchExecution which makes transformed attributes not-yet-resolved. They will be resolved by IncrementalExecution. (In ContinuousExecution, Spark doesn't allow using `current_timestamp` and `current_date` so it has been OK.) It's OK for DataSource V1 sink because it simply leverages transformed logical plan and don't evaluate until they're resolved, but for DataSource V2 sink, Spark tries to extract the schema of transformed logical plan in prior to IncrementalExecution, and unresolved attributes will raise errors. This patch fixes the issue via having separate pre-resolved logical plan to pass the schema to StreamingWriteSupport safely. ## How was this patch tested? Added UT. Closes apache#23609 from HeartSaVioR/SPARK-26379. Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…xception in CurrentBatchTimestamp ## What changes were proposed in this pull request? Spark replaces `CurrentTimestamp` with `CurrentBatchTimestamp`. However, `CurrentBatchTimestamp` is `TimeZoneAwareExpression` while `CurrentTimestamp` isn't. Without TimeZoneId, `CurrentBatchTimestamp` becomes unresolved and raises `UnresolvedException`. Since `CurrentDate` is `TimeZoneAwareExpression`, there is no problem with `CurrentDate`. This PR reverts the [previous patch](apache#23609) on `MicroBatchExecution` and fixes the root cause. ## How was this patch tested? Pass the Jenkins with the updated test cases. Closes apache#23660 from dongjoon-hyun/SPARK-26379. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…to streaming query ## What changes were proposed in this pull request? This patch proposes to fix issue on adding `current_timestamp` / `current_date` with streaming query. The root reason is that Spark transforms `CurrentTimestamp`/`CurrentDate` to `CurrentBatchTimestamp` in MicroBatchExecution which makes transformed attributes not-yet-resolved. They will be resolved by IncrementalExecution. (In ContinuousExecution, Spark doesn't allow using `current_timestamp` and `current_date` so it has been OK.) It's OK for DataSource V1 sink because it simply leverages transformed logical plan and don't evaluate until they're resolved, but for DataSource V2 sink, Spark tries to extract the schema of transformed logical plan in prior to IncrementalExecution, and unresolved attributes will raise errors. This patch fixes the issue via having separate pre-resolved logical plan to pass the schema to StreamingWriteSupport safely. ## How was this patch tested? Added UT. Closes apache#23609 from HeartSaVioR/SPARK-26379. Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…xception in CurrentBatchTimestamp ## What changes were proposed in this pull request? Spark replaces `CurrentTimestamp` with `CurrentBatchTimestamp`. However, `CurrentBatchTimestamp` is `TimeZoneAwareExpression` while `CurrentTimestamp` isn't. Without TimeZoneId, `CurrentBatchTimestamp` becomes unresolved and raises `UnresolvedException`. Since `CurrentDate` is `TimeZoneAwareExpression`, there is no problem with `CurrentDate`. This PR reverts the [previous patch](apache#23609) on `MicroBatchExecution` and fixes the root cause. ## How was this patch tested? Pass the Jenkins with the updated test cases. Closes apache#23660 from dongjoon-hyun/SPARK-26379. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> (cherry picked from commit 1ca6b8b) Signed-off-by: Dongjoon Hyun <[email protected]>
…to streaming query ## What changes were proposed in this pull request? This patch proposes to fix issue on adding `current_timestamp` / `current_date` with streaming query. The root reason is that Spark transforms `CurrentTimestamp`/`CurrentDate` to `CurrentBatchTimestamp` in MicroBatchExecution which makes transformed attributes not-yet-resolved. They will be resolved by IncrementalExecution. (In ContinuousExecution, Spark doesn't allow using `current_timestamp` and `current_date` so it has been OK.) It's OK for DataSource V1 sink because it simply leverages transformed logical plan and don't evaluate until they're resolved, but for DataSource V2 sink, Spark tries to extract the schema of transformed logical plan in prior to IncrementalExecution, and unresolved attributes will raise errors. This patch fixes the issue via having separate pre-resolved logical plan to pass the schema to StreamingWriteSupport safely. ## How was this patch tested? Added UT. Closes apache#23609 from HeartSaVioR/SPARK-26379. Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…xception in CurrentBatchTimestamp ## What changes were proposed in this pull request? Spark replaces `CurrentTimestamp` with `CurrentBatchTimestamp`. However, `CurrentBatchTimestamp` is `TimeZoneAwareExpression` while `CurrentTimestamp` isn't. Without TimeZoneId, `CurrentBatchTimestamp` becomes unresolved and raises `UnresolvedException`. Since `CurrentDate` is `TimeZoneAwareExpression`, there is no problem with `CurrentDate`. This PR reverts the [previous patch](apache#23609) on `MicroBatchExecution` and fixes the root cause. ## How was this patch tested? Pass the Jenkins with the updated test cases. Closes apache#23660 from dongjoon-hyun/SPARK-26379. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> (cherry picked from commit 1ca6b8b) Signed-off-by: Dongjoon Hyun <[email protected]>
…to streaming query ## What changes were proposed in this pull request? This patch proposes to fix issue on adding `current_timestamp` / `current_date` with streaming query. The root reason is that Spark transforms `CurrentTimestamp`/`CurrentDate` to `CurrentBatchTimestamp` in MicroBatchExecution which makes transformed attributes not-yet-resolved. They will be resolved by IncrementalExecution. (In ContinuousExecution, Spark doesn't allow using `current_timestamp` and `current_date` so it has been OK.) It's OK for DataSource V1 sink because it simply leverages transformed logical plan and don't evaluate until they're resolved, but for DataSource V2 sink, Spark tries to extract the schema of transformed logical plan in prior to IncrementalExecution, and unresolved attributes will raise errors. This patch fixes the issue via having separate pre-resolved logical plan to pass the schema to StreamingWriteSupport safely. ## How was this patch tested? Added UT. Closes apache#23609 from HeartSaVioR/SPARK-26379. Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…xception in CurrentBatchTimestamp ## What changes were proposed in this pull request? Spark replaces `CurrentTimestamp` with `CurrentBatchTimestamp`. However, `CurrentBatchTimestamp` is `TimeZoneAwareExpression` while `CurrentTimestamp` isn't. Without TimeZoneId, `CurrentBatchTimestamp` becomes unresolved and raises `UnresolvedException`. Since `CurrentDate` is `TimeZoneAwareExpression`, there is no problem with `CurrentDate`. This PR reverts the [previous patch](apache#23609) on `MicroBatchExecution` and fixes the root cause. ## How was this patch tested? Pass the Jenkins with the updated test cases. Closes apache#23660 from dongjoon-hyun/SPARK-26379. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> (cherry picked from commit 1ca6b8b) Signed-off-by: Dongjoon Hyun <[email protected]>
…xception in CurrentBatchTimestamp ## What changes were proposed in this pull request? Spark replaces `CurrentTimestamp` with `CurrentBatchTimestamp`. However, `CurrentBatchTimestamp` is `TimeZoneAwareExpression` while `CurrentTimestamp` isn't. Without TimeZoneId, `CurrentBatchTimestamp` becomes unresolved and raises `UnresolvedException`. Since `CurrentDate` is `TimeZoneAwareExpression`, there is no problem with `CurrentDate`. This PR reverts the [previous patch](apache/spark#23609) on `MicroBatchExecution` and fixes the root cause. ## How was this patch tested? Pass the Jenkins with the updated test cases. Closes #23660 from dongjoon-hyun/SPARK-26379. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> (cherry picked from commit 1ca6b8b) Signed-off-by: Dongjoon Hyun <[email protected]> (cherry picked from commit 448a063)
What changes were proposed in this pull request?
This patch proposes to fix issue on adding
current_timestamp/current_datewith streaming query.The root reason is that Spark transforms
CurrentTimestamp/CurrentDatetoCurrentBatchTimestampin MicroBatchExecution which makes transformed attributes not-yet-resolved. They will be resolved by IncrementalExecution.(In ContinuousExecution, Spark doesn't allow using
current_timestampandcurrent_dateso it has been OK.)It's OK for DataSource V1 sink because it simply leverages transformed logical plan and don't evaluate until they're resolved, but for DataSource V2 sink, Spark tries to extract the schema of transformed logical plan in prior to IncrementalExecution, and unresolved attributes will raise errors.
This patch fixes the issue via having separate pre-resolved logical plan to pass the schema to StreamingWriteSupport safely.
How was this patch tested?
Added UT.