[SPARK-32885][SS] Add DataStreamReader.table API #29756
Conversation
```scala
case NonSessionCatalogAndIdentifier(catalog, ident) =>
  CatalogV2Util.loadTable(catalog, ident) match {
    case Some(table) =>
      Some(StreamingRelationV2(
```
With the refactoring of StreamingRelationV2 (#29633), we can directly create it in catalyst.
Shall we just add an isStreaming flag in lookupV2Relation, to unify the code a bit more?
Sure, done in 305c316
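For context, the unification suggested above might look roughly like this self-contained sketch; it is not Spark's actual lookupV2Relation, and all the names here are illustrative:

```scala
// Sketch only: one lookup path serving both batch and streaming,
// branching on an isStreaming flag instead of duplicating the code.
sealed trait Relation
case class BatchTableRelation(name: String) extends Relation
case class StreamingTableRelation(name: String) extends Relation

def lookupRelation(name: String, isStreaming: Boolean): Relation =
  if (isStreaming) StreamingTableRelation(name) else BatchTableRelation(name)
```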
```scala
private def getStreamingRelation(
    table: CatalogTable,
    extraOptions: CaseInsensitiveStringMap): StreamingRelation = {
  val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions, table)
```
This keeps the same behavior as DataFrameReader.table with respect to honoring options (#29712).
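For illustration, the gist of "honoring options" is merging the table's own storage properties with the user-specified reader options. The following is a simplified sketch, not the actual DataSourceUtils.generateDatasourceOptions logic, which additionally detects conflicting keys:

```scala
// Simplified sketch: combine table properties with user-specified options,
// letting explicit reader options win on duplicate keys.
def mergeOptions(
    tableProperties: Map[String, String],
    userOptions: Map[String, String]): Map[String, String] =
  tableProperties ++ userOptions
```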
Test build #128699 has finished for PR 29756 at commit
Test build #128701 has finished for PR 29756 at commit
Thanks for the patch. I was also about to look into this missing piece (as I commented in #29715), and this PR looks to fulfill the need. I'll take a look soon.
HeartSaVioR left a comment:
I'll review once there are enough tests to cover the functionality, as that would save plenty of time on review.
Btw, to expand the tests you may also want the changes to InMemoryTable. Would you prefer to go through my PR first and rebase, or should I extract that part into a separate PR?
```diff
  child.collect {
    // Disallow creating permanent views based on temporary views.
-   case UnresolvedRelation(nameParts, _) if catalog.isTempView(nameParts) =>
+   case UnresolvedRelation(nameParts, _, _) if catalog.isTempView(nameParts) =>
```
isStreaming = false only?
We can create a temp view based on a streaming relation, so shouldn't it be kept as a full match?
+1
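For context, creating a temp view over a streaming relation is an ordinary, supported pattern, which is why the match must cover streaming plans too. A small example using real Spark APIs, assuming a `spark` session in scope (the view name is made up):

```scala
// A temp view can wrap a streaming plan, not only a batch one.
val streamDF = spark.readStream.format("rate").load()
streamDF.createOrReplaceTempView("rate_view")
```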
```scala
/**
 * Define a Streaming DataFrame on a Table. The DataSource corresponding to the table should
 * support streaming mode.
 */
```
If the data source doesn't support streaming, what will happen?
MicroBatchExecution/ContinuousExecution will check it and fail the query if the data source doesn't support streaming.
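In other words, the table call itself is lazy; a minimal usage sketch, assuming a `spark` session in scope (the table name is illustrative):

```scala
// Defining the streaming DataFrame does not yet validate streaming support.
val df = spark.readStream.table("events")

// The unsupported-source check kicks in when the query actually starts,
// inside MicroBatchExecution/ContinuousExecution.
val query = df.writeStream.format("console").start()
```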
```scala
  }
}

class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
```
nit: could you move this to a new file when you add more tests?
Sure, adding more tests now.
@HeartSaVioR Sure, let's go through the writer's side first. I think it's ok to rebase or resolve conflicts.
```diff
  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
-   case u @ UnresolvedRelation(ident, _) =>
+   case u @ UnresolvedRelation(ident, _, _) =>
      lookupTempView(ident).getOrElse(u)
```
shall we fail if the temp view is not a streaming plan but the isStreaming flag is true?
Probably vice versa.
I'm not sure about the other way around. We have SparkSession.table, which can read both batch and streaming temp views. We shouldn't break it.
It's a bit weird that DataFrameReader.table can read a streaming temp view, but this is the existing behavior and probably fine.
Just curious: is it the end user's responsibility to know whether the temp view comes from batch or streaming, so that they can correctly call write or writeStream? Setting SparkSession.table aside, I assumed this was clear because end users match the reader and writer sides (read/write or readStream/writeStream), so this looks a bit confusing.
If DataFrameReader.table allows a streaming temp view, then I guess a read/writeStream pair is possible, which is a bit confusing. (Or does it magically change the plan to a batch one?)
I think it's less confusing if DataFrameReader.table fails on reading a streaming temp view while SparkSession.table works. But that's a breaking change, and we at least shouldn't do it in this PR.
OK, I agree it's better to fix that in a different PR. I still think we shouldn't keep supporting confusing behavior just because we supported it before. The fix would depend on this PR's change (the isStreaming flag), so I'll wait for this PR and try to fix it afterward.
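To make the concern concrete, here is a sketch of the pairing being called confusing, assuming the existing behavior that the batch reader resolves streaming temp views (names are illustrative):

```scala
// "v" is a temp view over a streaming plan.
spark.readStream.format("rate").load().createOrReplaceTempView("v")

// Existing behavior: the batch-style reader still resolves the streaming view,
// so a read/writeStream pairing becomes possible.
val df = spark.read.table("v")
df.writeStream.format("console").start()
```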
```scala
  )
}

test("stream table API support") {
```
Can we move DataStreamTableAPISuite to a new file and move this test there as well?
Yes, done in 9004fba
Test build #128820 has finished for PR 29756 at commit
Test build #128931 has finished for PR 29756 at commit
Force-pushed: 03a91f8 to fad1976
Test build #129019 has finished for PR 29756 at commit
Test build #129020 has finished for PR 29756 at commit
cloud-fan left a comment:
LGTM except some minor comments
Test build #129062 has finished for PR 29756 at commit
Test build #129069 has finished for PR 29756 at commit
thanks, merging to master!
What changes were proposed in this pull request?
This PR aims to add a new `table` API in DataStreamReader, which is similar to the `table` API in DataFrameReader.

Why are the changes needed?
Users can directly use this API to get a Streaming DataFrame on a table. Below is a simple example:
Application 1 for initializing and starting the streaming job:
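The original snippet isn't preserved in this capture; the following is a minimal illustrative reconstruction (the table, sink format, and query name are all made up):

```scala
// Create a table, then start a streaming query that reads from it.
spark.sql("CREATE TABLE my_table (value STRING) USING parquet")

val query = spark.readStream
  .table("my_table")
  .writeStream
  .format("memory")
  .queryName("my_sink")
  .start()
```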
Application 2 for appending new data:
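Again an illustrative reconstruction rather than the original snippet, reusing the made-up table name above:

```scala
import spark.implicits._

// A second application appends new rows to the same table.
Seq("a", "b", "c").toDF("value").write.insertInto("my_table")
```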
Check result:
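Illustrative, matching the made-up names above:

```scala
// The running streaming query picks up the appended rows.
spark.sql("SELECT * FROM my_sink").show()
```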
Does this PR introduce any user-facing change?
Yes, a new API is added.
How was this patch tested?
New unit tests added, plus integration testing.