-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-23533][SS] Add support for changing ContinuousDataReader's startOffset #20689
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Test build #87758 has finished for PR 20689 at commit
|
|
retest this please |
|
Test build #87767 has finished for PR 20689 at commit
|
| * @param offset offset want to set as the DataReader's startOffset. | ||
| */ | ||
| default DataReader<T> createDataReaderWithOffset(PartitionOffset offset) { | ||
| throw new IllegalStateException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know if we want a default here - it seems like subclasses should always be able to provide an implementation, and thus that we should always require them to.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to make this one just abstract.
|
@rdblue - this is a streaming DataSourceV2 API change specific to continuous processing (SPIP SPARK-20928). We're still iterating towards a solution that makes continuous processing compatible with all the existing Spark operations, so we don't have a full formal description of the API surface yet. |
|
Test build #87811 has finished for PR 20689 at commit
|
zsxwing
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some comments. Otherwise LGTM
|
|
||
| override def createDataReaderWithOffset(offset: PartitionOffset): DataReader[UnsafeRow] = { | ||
| val kafkaOffset = offset.asInstanceOf[KafkaSourcePartitionOffset] | ||
| assert(kafkaOffset.topicPartition == topicPartition) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This may happen. I prefer to use require like this:
require(kafkaOffset.topicPartition == topicPartition, s"expected: $topicPartition actual: ${kafkaOffset.topicPartition}")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it.
|
|
||
| override def createDataReaderWithOffset(offset: PartitionOffset): DataReader[Row] = { | ||
| val rateStreamOffset = offset.asInstanceOf[RateStreamPartitionOffset] | ||
| assert(rateStreamOffset.partition == partitionIndex) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
| * @param offset offset want to set as the DataReader's startOffset. | ||
| */ | ||
| default DataReader<T> createDataReaderWithOffset(PartitionOffset offset) { | ||
| throw new IllegalStateException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to make this one just abstract.
|
LGTM |
|
Test build #88253 has finished for PR 20689 at commit
|
|
Thanks! Merging to master. |
…rtOffset ## What changes were proposed in this pull request? As discussion in apache#20675, we need add a new interface `ContinuousDataReaderFactory` to support the requirements of setting start offset in Continuous Processing. ## How was this patch tested? Existing UT. Author: Yuanjian Li <[email protected]> Closes apache#20689 from xuanyuanking/SPARK-23533.
…rtOffset ## What changes were proposed in this pull request? As discussion in apache#20675, we need add a new interface `ContinuousDataReaderFactory` to support the requirements of setting start offset in Continuous Processing. ## How was this patch tested? Existing UT. Author: Yuanjian Li <[email protected]> Closes apache#20689 from xuanyuanking/SPARK-23533. RB=1844647 G=superfriends-reviewers R=mshen,fli,zolin,yezhou,latang A=
What changes were proposed in this pull request?
As discussion in #20675, we need add a new interface
ContinuousDataReaderFactoryto support the requirements of setting start offset in Continuous Processing.How was this patch tested?
Existing UT.