[SPARK-16963] [STREAMING] [SQL] Changes to Source trait and related implementation classes #14553
These changes are now ready for review. The contents of this PR pass regression tests on my machines. Can one of the committers please start a Jenkins build?
   */
  def getLatest(): Option[(Long, T)]
Extra blank line
Fixed in my local copy.
…963. Also addressed minor review comments.
@ScrapCodes, would you mind triggering a build of this PR?
ok to test
retest this please
I have tested the PR with my MQTT connector. Looks like I do not have sufficient privilege to command jenkins.
ok to test
Test build #65062 has finished for PR 14553 at commit
zsxwing
left a comment
LGTM except one nit.
offsetLog.purge(currentBatchId)
// the batch before the previous batch, and it is safe to discard the old metadata.
// Note that purge is exclusive, i.e. it purges everything before the target ID.
offsetLog.purge(currentBatchId - 1)
nit: this can be offsetLog.purge(currentBatchId), it's exclusive, then you can revert changes to StreamingQuerySuite.
I can move this change to another JIRA if you'd like, but we really should change currentBatchId to currentBatchId - 1 at some point. The call to offsetLog.purge(currentBatchId), which I introduced in my PR for SPARK-17513, contains a subtle bug. The recovery logic in populateStartOffsets() reads the last and second-to-last entries in offsetLog and uses them to populate availableOffsets and committedOffsets, respectively. Calling offsetLog.purge(currentBatchId) at line 350/366 truncates the offsetLog to one entry. As a result, committedOffsets is left empty on recovery, and the first call to getBatch() for any source receives None as its first argument. Sources that do not prune buffered data in their commit() methods will return previously committed data in response to such a getBatch() call.
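The effect of the exclusive purge on recovery can be illustrated with a small, self-contained sketch. `SimpleOffsetLog` and `RecoverySketch` are hypothetical stand-ins invented for this illustration, not the real offsetLog or populateStartOffsets(); offsets are plain `Long` values, and the only behavior assumed is what the comment above describes.

```scala
import scala.collection.immutable.SortedMap

// Hypothetical, simplified stand-in for the offset log: batch ID -> offset.
final class SimpleOffsetLog {
  private var entries = SortedMap.empty[Long, Long]

  def add(batchId: Long, offset: Long): Unit = {
    entries = entries.updated(batchId, offset)
  }

  // Exclusive purge: drops every entry whose ID is strictly below the threshold.
  def purge(thresholdBatchId: Long): Unit = {
    entries = entries.filter { case (id, _) => id >= thresholdBatchId }
  }

  // The last `n` entries, in ascending batch order.
  def getLatest(n: Int): List[(Long, Long)] = entries.toList.takeRight(n)
}

object RecoverySketch {
  // Returns (availableOffset, committedOffset). The last log entry feeds the
  // available offset; the second-to-last feeds the committed offset, which is
  // None when that entry has already been purged.
  def recover(log: SimpleOffsetLog): (Option[Long], Option[Long]) =
    log.getLatest(2) match {
      case List((_, committed), (_, available)) => (Some(available), Some(committed))
      case List((_, available)) => (Some(available), None)
      case _ => (None, None)
    }
}
```

In this sketch, purging at `currentBatchId` leaves a single entry, so recovery yields no committed offset, whereas purging at `currentBatchId - 1` keeps the two entries that recovery needs.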
I see. Thanks for clarifying.
Test build #67350 has finished for PR 14553 at commit
retest this please
zsxwing
left a comment
LGTM pending tests
Test build #67463 has finished for PR 14553 at commit
@frreiss you need to reset
  }

  override def commit(end: Offset): Unit = synchronized {
    if (end.isInstanceOf[LongOffset]) {
nit:
end match {
case newOffset: LongOffset =>
...
case _ => sys.error(...)
}
Corrected in my local copy.
      lastOffsetCommitted = newOffset
    } else {
      sys.error(s"MemoryStream.commit() received an offset ($end) that did not originate with " +
        s"an instance of this class")
nit: the `s` interpolator prefix on the second string is unnecessary.
Corrected in my local copy.
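For readers following the two nits above, here is a minimal sketch of how `commit` might look with both applied: a pattern match instead of `isInstanceOf`, and no `s` prefix on the string that has nothing to interpolate. The `Offset`, `LongOffset`, and `OtherOffset` types and the `CommitSketch` class are simplified, hypothetical stand-ins defined here so the example compiles on its own; this is not the code that was merged.

```scala
// Hypothetical, simplified stand-ins for the offset types used in the diff.
sealed trait Offset
final case class LongOffset(offset: Long) extends Offset
final case class OtherOffset(label: String) extends Offset

final class CommitSketch {
  // -1 means nothing has been committed yet (an assumption of this sketch).
  private var lastOffsetCommitted: LongOffset = LongOffset(-1L)

  def lastCommitted: Long = synchronized { lastOffsetCommitted.offset }

  def commit(end: Offset): Unit = synchronized {
    end match {
      // The typed pattern binds the value already cast to LongOffset.
      case newOffset: LongOffset =>
        lastOffsetCommitted = newOffset
      // Any other offset type did not come from this class.
      case _ =>
        sys.error(s"commit() received an offset ($end) that did not originate with " +
          "an instance of this class")
    }
  }
}
```

The match removes the need for a separate cast after the type check, and `sys.error` raises a `RuntimeException` for offsets of any other type.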
LGTM as well!
@frreiss any update?
Updated the branch and addressed new review comments. Looks like my last push missed a one-line change to memory.scala. Tests are running now.
Test build #67603 has finished for PR 14553 at commit
LGTM. Merging to master and 2.0. Thanks!
…lementation classes

## What changes were proposed in this pull request?

This PR contains changes to the Source trait such that the scheduler can notify data sources when it is safe to discard buffered data. Summary of changes:
* Added a method `commit(end: Offset)` that tells the Source that it is OK to discard all offsets up to `end`, inclusive.
* Changed the semantics of a `None` value for the `getBatch` method to mean "from the very beginning of the stream", as opposed to "all data present in the Source's buffer".
* Added notes that the upper layers of the system will never call `getBatch` with a start value less than the last value passed to `commit`.
* Added a `lastCommittedOffset` method to allow the scheduler to query the status of each Source on restart. This addition is not strictly necessary, but it seemed like a good idea -- Sources will be maintaining their own persistent state, and there may be bugs in the checkpointing code.
* The scheduler in `StreamExecution.scala` now calls `commit` on its stream sources after marking each batch as complete in its checkpoint.
* `MemoryStream` now cleans committed batches out of its internal buffer.
* `TextSocketSource` now cleans committed batches from its internal buffer.

## How was this patch tested?

Existing regression tests already exercise the new code.

Author: frreiss <[email protected]>

Closes #14553 from frreiss/fred-16963.

(cherry picked from commit 5b27598)
Signed-off-by: Shixiong Zhu <[email protected]>
…ion"

## What changes were proposed in this pull request?

A follow-up PR for #14553 to fix the flaky test. It's flaky because the file list API doesn't guarantee any order for the returned list.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <[email protected]>

Closes #15661 from zsxwing/fix-StreamingQuerySuite.

(cherry picked from commit 79fd0cc)
Signed-off-by: Shixiong Zhu <[email protected]>
What changes were proposed in this pull request?
This PR contains changes to the Source trait such that the scheduler can notify data sources when it is safe to discard buffered data. Summary of changes:
* Added a method `commit(end: Offset)` that tells the Source that it is OK to discard all offsets up to `end`, inclusive.
* Changed the semantics of a `None` value for the `getBatch` method to mean "from the very beginning of the stream", as opposed to "all data present in the Source's buffer".
* Added notes that the upper layers of the system will never call `getBatch` with a start value less than the last value passed to `commit`.
* Added a `lastCommittedOffset` method to allow the scheduler to query the status of each Source on restart. This addition is not strictly necessary, but it seemed like a good idea -- Sources will be maintaining their own persistent state, and there may be bugs in the checkpointing code.
* The scheduler in `StreamExecution.scala` now calls `commit` on its stream sources after marking each batch as complete in its checkpoint.
* `MemoryStream` now cleans committed batches out of its internal buffer.
* `TextSocketSource` now cleans committed batches from its internal buffer.

How was this patch tested?
Existing regression tests already exercise the new code.
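
The contract summarized in the list of changes can be sketched roughly as follows. This is a simplified model under stated assumptions, not the actual Source trait: `SketchSource` and `BufferedSource` are hypothetical names, offsets are plain `Long` values rather than `Offset` objects, and `getBatch` returns a `Seq` instead of a DataFrame. A batch covers the offsets after `start` up to and including `end`.

```scala
// Hypothetical, simplified model of the contract; not Spark's actual Source trait.
trait SketchSource[T] {
  // Highest offset available so far, or None if no data has arrived.
  def getOffset: Option[Long]
  // Data after `start` up to and including `end`. A start of None means
  // "from the very beginning of the stream".
  def getBatch(start: Option[Long], end: Long): Seq[T]
  // Tells the source it may discard all offsets up to `end`, inclusive.
  def commit(end: Long): Unit
  // Last offset passed to commit, or None if nothing was committed.
  def lastCommittedOffset: Option[Long]
}

final class BufferedSource[T] extends SketchSource[T] {
  // Index i of the buffer holds the item at offset lastCommitted + 1 + i.
  private var buffer = Vector.empty[T]
  private var currentOffset = -1L
  private var lastCommitted = -1L

  def add(item: T): Long = synchronized {
    buffer = buffer :+ item
    currentOffset += 1
    currentOffset
  }

  override def getOffset: Option[Long] = synchronized {
    if (currentOffset < 0) None else Some(currentOffset)
  }

  override def getBatch(start: Option[Long], end: Long): Seq[T] = synchronized {
    val startExclusive = start.getOrElse(-1L)
    // Design choice of this sketch: reject requests for already-discarded data.
    require(startExclusive >= lastCommitted,
      s"start ($startExclusive) precedes the last committed offset ($lastCommitted)")
    require(end <= currentOffset, s"end ($end) is beyond the latest offset ($currentOffset)")
    val from = (startExclusive - lastCommitted).toInt
    val until = (end - lastCommitted).toInt
    buffer.slice(from, until)
  }

  override def commit(end: Long): Unit = synchronized {
    require(end >= lastCommitted && end <= currentOffset, s"cannot commit offset $end")
    // Drop everything up to and including `end` from the buffer.
    buffer = buffer.drop((end - lastCommitted).toInt)
    lastCommitted = end
  }

  override def lastCommittedOffset: Option[Long] = synchronized {
    if (lastCommitted < 0) None else Some(lastCommitted)
  }
}
```

A scheduler following the description above would call `getBatch`, record the batch as complete in its checkpoint, and only then call `commit` with the batch's end offset. Rejecting a `getBatch` whose start precedes the last commit is a choice made for this sketch; the PR text only states that the upper layers will never issue such a call.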