-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-18657][SPARK-18668] Make StreamingQuery.id persists across restart and not auto-generate StreamingQuery.name #16113
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Test build #69538 has finished for PR 16113 at commit
|
|
Test build #69540 has finished for PR 16113 at commit
|
|
Test build #69588 has finished for PR 16113 at commit
|
|
Test build #69589 has finished for PR 16113 at commit
|
| * Time unit: milliseconds | ||
| * @param id unique id of the [[StreamingQuery]] that needs to be persisted across restarts | ||
| */ | ||
| case class StreamExecutionMetadata( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has been renamed to OffsetSeqLog and moved to OffsetSeq.scala
marmbrus
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only minor comments. LGTM
| * @param batchTimestampMs: The current batch processing timestamp. | ||
| * Time unit: milliseconds | ||
| */ | ||
| case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we put this in its own file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. But its a small class and closely tied with OffsetSeq, so I thought its not worth having a separate file for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not worth moving these 6 lines of code in a new file.
| * Returns the unique id of this query that persists across restarts from checkpoint data. | ||
| * That is, this id is generated when a query is started for the first time, and | ||
| * will be the same every time it is restarted from checkpoint data. | ||
| * There can only be one query with the same id active in a Spark cluster. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes it sound like its okay to have more than one running as long as they aren't on the same spark cluster.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will just remove that line.
| * JSON string representation of this object. | ||
| */ | ||
| def json: String = Serialization.write(this) | ||
| case class StreamMetadata(id: String) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we move these to their own file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure.
|
Test build #69590 has finished for PR 16113 at commit
|
| StreamMetadata("d366a8bf-db79-42ca-b5a4-d9ca0a11d63e")) | ||
| } | ||
|
|
||
| private def readForResource(fileName: String): StreamMetadata = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
note to self: readFromResource
|
Test build #69593 has finished for PR 16113 at commit
|
|
Test build #69594 has finished for PR 16113 at commit
|
|
Test build #69693 has started for PR 16113 at commit |
|
Test build #69695 has finished for PR 16113 at commit
|
|
Merging to master and 2.1 |
…tart and not auto-generate StreamingQuery.name
Here are the major changes in this PR.
- Added the ability to recover `StreamingQuery.id` from checkpoint location, by writing the id to `checkpointLoc/metadata`.
- Added `StreamingQuery.runId` which is unique for every query started and does not persist across restarts. This is to identify each restart of a query separately (same as earlier behavior of `id`).
- Removed auto-generation of `StreamingQuery.name`. The purpose of name was to have the ability to define an identifier across restarts, but since id is precisely that, there is no need for a auto-generated name. This means name becomes purely cosmetic, and is null by default.
- Added `runId` to `StreamingQueryListener` events and `StreamingQueryProgress`.
Implementation details
- Renamed existing `StreamExecutionMetadata` to `OffsetSeqMetadata`, and moved it to the file `OffsetSeq.scala`, because that is what this metadata is tied to. Also did some refactoring to make the code cleaner (got rid of a lot of `.json` and `.getOrElse("{}")`).
- Added the `id` as the new `StreamMetadata`.
- When a StreamingQuery is created it gets or writes the `StreamMetadata` from `checkpointLoc/metadata`.
- All internal logging in `StreamExecution` uses `(name, id, runId)` instead of just `name`
TODO
- [x] Test handling of name=null in json generation of StreamingQueryProgress
- [x] Test handling of name=null in json generation of StreamingQueryListener events
- [x] Test python API of runId
Updated unit tests and new unit tests
Author: Tathagata Das <[email protected]>
Closes #16113 from tdas/SPARK-18657.
(cherry picked from commit bb57bfe)
Signed-off-by: Tathagata Das <[email protected]>
…tart and not auto-generate StreamingQuery.name
## What changes were proposed in this pull request?
Here are the major changes in this PR.
- Added the ability to recover `StreamingQuery.id` from checkpoint location, by writing the id to `checkpointLoc/metadata`.
- Added `StreamingQuery.runId` which is unique for every query started and does not persist across restarts. This is to identify each restart of a query separately (same as earlier behavior of `id`).
- Removed auto-generation of `StreamingQuery.name`. The purpose of name was to have the ability to define an identifier across restarts, but since id is precisely that, there is no need for a auto-generated name. This means name becomes purely cosmetic, and is null by default.
- Added `runId` to `StreamingQueryListener` events and `StreamingQueryProgress`.
Implementation details
- Renamed existing `StreamExecutionMetadata` to `OffsetSeqMetadata`, and moved it to the file `OffsetSeq.scala`, because that is what this metadata is tied to. Also did some refactoring to make the code cleaner (got rid of a lot of `.json` and `.getOrElse("{}")`).
- Added the `id` as the new `StreamMetadata`.
- When a StreamingQuery is created it gets or writes the `StreamMetadata` from `checkpointLoc/metadata`.
- All internal logging in `StreamExecution` uses `(name, id, runId)` instead of just `name`
TODO
- [x] Test handling of name=null in json generation of StreamingQueryProgress
- [x] Test handling of name=null in json generation of StreamingQueryListener events
- [x] Test python API of runId
## How was this patch tested?
Updated unit tests and new unit tests
Author: Tathagata Das <[email protected]>
Closes apache#16113 from tdas/SPARK-18657.
…tart and not auto-generate StreamingQuery.name
## What changes were proposed in this pull request?
Here are the major changes in this PR.
- Added the ability to recover `StreamingQuery.id` from checkpoint location, by writing the id to `checkpointLoc/metadata`.
- Added `StreamingQuery.runId` which is unique for every query started and does not persist across restarts. This is to identify each restart of a query separately (same as earlier behavior of `id`).
- Removed auto-generation of `StreamingQuery.name`. The purpose of name was to have the ability to define an identifier across restarts, but since id is precisely that, there is no need for a auto-generated name. This means name becomes purely cosmetic, and is null by default.
- Added `runId` to `StreamingQueryListener` events and `StreamingQueryProgress`.
Implementation details
- Renamed existing `StreamExecutionMetadata` to `OffsetSeqMetadata`, and moved it to the file `OffsetSeq.scala`, because that is what this metadata is tied to. Also did some refactoring to make the code cleaner (got rid of a lot of `.json` and `.getOrElse("{}")`).
- Added the `id` as the new `StreamMetadata`.
- When a StreamingQuery is created it gets or writes the `StreamMetadata` from `checkpointLoc/metadata`.
- All internal logging in `StreamExecution` uses `(name, id, runId)` instead of just `name`
TODO
- [x] Test handling of name=null in json generation of StreamingQueryProgress
- [x] Test handling of name=null in json generation of StreamingQueryListener events
- [x] Test python API of runId
## How was this patch tested?
Updated unit tests and new unit tests
Author: Tathagata Das <[email protected]>
Closes apache#16113 from tdas/SPARK-18657.
What changes were proposed in this pull request?
Here are the major changes in this PR.
StreamingQuery.idfrom checkpoint location, by writing the id tocheckpointLoc/metadata.StreamingQuery.runIdwhich is unique for every query started and does not persist across restarts. This is to identify each restart of a query separately (same as earlier behavior ofid).StreamingQuery.name. The purpose of name was to have the ability to define an identifier across restarts, but since id is precisely that, there is no need for a auto-generated name. This means name becomes purely cosmetic, and is null by default.runIdtoStreamingQueryListenerevents andStreamingQueryProgress.Implementation details
StreamExecutionMetadatatoOffsetSeqMetadata, and moved it to the fileOffsetSeq.scala, because that is what this metadata is tied to. Also did some refactoring to make the code cleaner (got rid of a lot of.jsonand.getOrElse("{}")).idas the newStreamMetadata.StreamMetadatafromcheckpointLoc/metadata.StreamExecutionuses(name, id, runId)instead of justnameTODO
How was this patch tested?
Updated unit tests and new unit tests