[SPARK-32896][SS] Add DataStreamWriter.saveAsTable API #29767
Conversation
checkAnswer(spark.table(tableIdentifier), Seq.empty)

withTempDir { checkpointDir =>
  val exc = intercept[AnalysisException] {
This is because a file-provider-based table is V1, which doesn't have the streaming write capability. I hope this is OK, rather than struggling to convert it into a Sink and making it work anyway.
This is a new API. I'm OK with not supporting streaming v1 sink at first.
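For readers following the thread: below is a minimal sketch (my own illustration, not the PR's exact code) of the kind of capability gate being discussed - only V2 tables that declare STREAMING_WRITE pass, which is why a V1 file-provider table is rejected.

import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability}

// Accept only V2 tables that declare the STREAMING_WRITE capability.
// V1 tables (e.g. file-provider based ones) don't expose this capability and fail here.
// The PR itself throws AnalysisException from inside the sql package; a plain
// UnsupportedOperationException keeps this sketch compilable anywhere.
def requireStreamingWritable(table: Table): SupportsWrite = table match {
  case t: SupportsWrite if t.capabilities().contains(TableCapability.STREAMING_WRITE) => t
  case other => throw new UnsupportedOperationException(
    s"Table ${other.name} doesn't support streaming write")
}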
Test build #128736 has finished for PR 29767 at commit
} else {
  extraOptions + ("path" -> path.get)
}
val queryName = extraOptions.get("queryName")
The huge diff comes from refactoring - I had to refactor because the statements for StreamingQueryManager.startQuery() were all duplicated and I was about to add one more duplication.
The actual change is only performed for source == SOURCE_NAME_TABLE.
Test build #128739 has finished for PR 29767 at commit
retest this, please

Test build #128751 has finished for PR 29767 at commit

retest this, please

Test build #128767 has finished for PR 29767 at commit
Force-pushed 2284a4d to 6444a1e.
case NonSessionCatalogAndIdentifier(catalog, ident) =>
  catalog.asTableCatalog.loadTable(ident)

case SessionCatalogAndIdentifier(catalog, ident) =>
shall we just use CatalogAndIdentifier?
Is it OK to skip the namespace length check in SessionCatalogAndIdentifier here?
It's OK. V2SessionCatalog.loadTable checks the namespace as well.
  }
}

class DataStreamWriterWithTableSuite extends StreamTest with BeforeAndAfter {
shall we move it to a new file?
  runTestWithStreamAppend(tableIdentifier)
} finally {
  spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
In the after block we clear out everything, so this is not needed.
Test build #128801 has finished for PR 29767 at commit
val tableInstance = df.sparkSession.sessionState.sqlParser
  .parseMultipartIdentifier(tableName) match {

  case NonSessionCatalogAndIdentifier(catalog, ident) =>
this is not needed anymore.
case CatalogAndIdentifier(catalog, ident) =>
  catalog.asTableCatalog.loadTable(ident)

case other =>
See DataFrameWriterV2.scala#L52; we can simply write:
val CatalogAndIdentifier(catalog, identifier) = ...parseMultipartIdentifier(tableName)
val table = catalog.asTableCatalog.loadTable(identifier)
xuanyuanking left a comment
Just some small comments.
import df.sparkSession.sessionState.analyzer.CatalogAndIdentifier

import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
val CatalogAndIdentifier(catalog, identifier) = df.sparkSession.sessionState.sqlParser
Is it possible to get a temp view here, and if so, what should the behavior be?
I just checked roughly, and it looks like a temporary view is not loaded by loadTable - it throws NoSuchTableException in V2SessionCatalog.
test("write to temporary view shouldn't be allowed") {
val tableIdentifier = "table_name"
val tempViewIdentifier = "temp_view"
spark.sql(s"CREATE TABLE $tableIdentifier (id bigint, data string) USING parquet")
checkAnswer(spark.table(tableIdentifier), Seq.empty)
spark.sql(s"SELECT id, data FROM $tableIdentifier").createOrReplaceTempView(tempViewIdentifier)
// spark.sql(s"CREATE TEMPORARY VIEW $tempViewIdentifier AS SELECT id, data FROM $tableIdentifier")
withTempDir { checkpointDir =>
val exc = intercept[AnalysisException] {
runStreamQueryAppendMode("default." + tempViewIdentifier, checkpointDir, Seq.empty, Seq.empty)
}
assert(exc.getMessage.contains("doesn't support streaming write"))
}
}
It fails with "Table default.temp_view not found;" did not contain "doesn't support streaming write".
I think this is the desired behavior, as it's a view. Even if it could load the (temp) view, its capabilities shouldn't include write-related flags.
I'm playing a bit more with views; unlike a temporary view, a regular view seems to be loaded via loadTable. Now checking its capabilities.
A temp view doesn't belong to any catalog; it belongs to a session. DataFrameWriter.insertInto can insert into a temp view as well (only if the temp view is a single data source scan node), and probably DataStreamWriter.table should support it as well.
Looks like it requires handling V1Table after loadTable (for views), as well as a pattern match with AsTableIdentifier(tableIdentifier) (for temporary views).
Either way, I see DataFrameWriter leverages UnresolvedRelation to defer resolution, but a streaming query doesn't add a writer node to the logical plan and passes the actual table instance (either SupportsWrite for V2 or Sink for V1) directly, so the situation looks a bit different. Probably another reason to add the writer node before analyzing?
(Btw, this is an interesting one to test even on a batch query. I'd probably test creating a temp view over a V2 table and trying to write to it. If that works for DataFrameWriter.insertInto, it's probably one thing DataFrameWriterV2 may not support as of now, as it doesn't fall back to the V1 path.)
For the temporary view case, this change makes the test work:
spark.table(tableIdentifier).createOrReplaceTempView(tempViewIdentifier)
// or
// spark.read.table(tableIdentifier).createOrReplaceTempView(tempViewIdentifier)
Seq((1, "a"), (2, "b"), (3, "c")).toDF().write.insertInto(tempViewIdentifier)
but I'm not sure about the coverage - it sounds to me like a temp view that is just an alias of a table is only supported for insertInto.
(only if the temp view is a single data source scan node)
As I mentioned before, the temp view must be very simple, like spark.table(name) or CREATE TEMP VIEW v USING parquet OPTIONS(...).
I believe there are tests, but I don't remember where they are. You can update ResolveRelations to drop the support for inserting into temp views and see which tests fail.
For this particular PR, I'm OK with not supporting temp views for now, as we need to refactor a little bit and have a logical plan for streaming write. But for consistency with other places that look up a table, we should still look up temp views, and just fail if a temp view is returned.
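As an aside, here is a rough, hedged sketch of the "look up temp views and fail" idea using only the public Catalog API; the PR itself uses internal analyzer helpers, and this simplification ignores the catalog-qualified name handling discussed further down the thread.

import scala.util.Try
import org.apache.spark.sql.SparkSession

// Returns true if the given (unqualified or db-qualified) name resolves to a temporary view.
// spark.catalog.getTable throws if the name cannot be found, hence the Try.
def resolvesToTempView(spark: SparkSession, name: String): Boolean =
  Try(spark.catalog.getTable(name)).toOption.exists(_.isTemporary)

// A caller could then fail fast before starting the streaming query:
// if (resolvesToTempView(spark, tableName)) {
//   throw new UnsupportedOperationException(s"Streaming into temp view $tableName is not supported")
// }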
Done in e7cd27d - now it looks up (global) temp views directly and provides a slightly clearer error message. Also added relevant tests.
That said, I can't find the fallback logic in DataFrameWriterV2. It simply looks the name up in the catalog, so a temp view will not be found. Do I understand correctly, and if so, is that the desired/expected behavior?
I think we should fix DataFrameWriterV2 as well, to fail if the table name refers to a temp view. cc @rdblue
Filed SPARK-32960 and submitted a PR (#29830). Please take a look and let me know whether it follows your suggestion properly or not. Thanks!
Test build #128804 has finished for PR 29767 at commit

Test build #128809 has finished for PR 29767 at commit

Test build #128814 has finished for PR 29767 at commit
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
val sink = tableInstance match {
  case t: SupportsWrite if t.supports(STREAMING_WRITE) => t
  case t => throw new AnalysisException("Table doesn't support streaming " +
s"Table $tableName doesn't support streaming "?
    useTempCheckpointLocation = true,
    trigger = trigger)
  startQuery(sink, extraOptions)
} else if (source == "foreachBatch") {
SOURCE_NAME_FOREACH_BATCH?
Ah, I missed this after switching to another refactoring approach. I'll fix it. Thanks!
  resultDf.createOrReplaceTempView(query.name)
  query
  startQuery(sink, extraOptions, Some(resultDf), recoverFromCheckpoint = recoverFromChkpoint)
} else if (source == "foreach") {
SOURCE_NAME_FOREACH?
  }

  startQuery(sink, extraOptions)
} else if (source == "memory") {
SOURCE_NAME_MEMORY?
override def overwrite(filters: Array[Filter]): WriteBuilder = {
  assert(writer == Append)
  writer = new Overwrite(filters)
  // streaming writer doesn't have equivalent semantic
Hmm, does this mean that in the streaming case we won't reach here?
Yes, at least for now (if I understand correctly). If we want to be sure, we could assign a dummy writer and throw an error when buildForStreaming() is called. That would probably be much clearer.
Just changed it to explicitly fail for this case.
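For context, a minimal sketch (an assumed shape, not the exact test code in the PR) of what "explicitly fail for the case" can look like: a WriteBuilder that represents an overwrite and refuses to build a streaming write.

import org.apache.spark.sql.connector.write.{BatchWrite, WriteBuilder}
import org.apache.spark.sql.connector.write.streaming.StreamingWrite

// Hypothetical builder for the overwrite path: batch writes go through as usual,
// while a streaming write request fails loudly instead of being silently ignored.
class OverwriteWriteBuilder(batch: BatchWrite) extends WriteBuilder {
  override def buildForBatch(): BatchWrite = batch
  override def buildForStreaming(): StreamingWrite =
    throw new UnsupportedOperationException(
      "Streaming write doesn't have an overwrite-by-filter equivalent")
}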
// Currently we don't create a logical streaming writer node in logical plan, so cannot rely
// on analyzer to resolve it. Directly lookup only for temp view to provide clearer message.
// TODO (SPARK-27484): we should add the writing node before the plan is analyzed.
if (isTempView(df.sparkSession, identifier.asMultipartIdentifier)) {
This is incorrect. The identifier is scoped to a specific catalog, e.g. for cat1.t1 the identifier is t1. cat1.t1 is not a temp view, but t1 might be.
We should check for a temp view using the original table name.
Please correct me if I'm missing something here. The reason I pass all parts of the identifier here is to cover global temp views, which use the global temp database. Dropping the db name (if it isn't the global temp db) is done in isTempView.
I pass all parts in identifier
But you did not... The catalog name is missing, so you may mistakenly treat a table as a temp view, e.g. cat1.t1 if t1 is the name of a temp view.
My bad, thanks for explaining. I see the failing case when a catalog "exists" for the head of the identifier; let me fix it immediately.
Test build #128855 has finished for PR 29767 at commit

Test build #128844 has finished for PR 29767 at commit
  }
}

test("write: write to temporary view isn't allowed yet") {
Thank you for adding this explicitly.
    outputMode,
    useTempCheckpointLocation = true,
    trigger = trigger)
  startQuery(sink, extraOptions)
@HeartSaVioR, for CaseInsensitiveMap, def toMap: Map[String, T] = originalMap. It seems we need to call toMap explicitly here, as we did on line 385. (cc @cloud-fan)
startQuery(sink, optionsWithPath.originalMap)
Previously, @cloud-fan and I hit case-sensitivity issues in other JIRAs due to this. Please make sure this PR doesn't re-introduce them, because the PR as-is silently switches extraOptions.toMap -> extraOptions.
If you already checked that, please add a test case for it. Otherwise, we can just use the old way, extraOptions.toMap, to avoid any side effects.
Ah OK, thanks for pointing that out. Nice find. I'll just call .toMap explicitly, as it was before.
nice catch @dongjoon-hyun !
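To illustrate the concern above (the option key and value here are made up): CaseInsensitiveMap resolves keys case-insensitively, while toMap/originalMap hands back the user's original, case-sensitive keys - so the two are not interchangeable.

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

val opts = CaseInsensitiveMap(Map("checkpointLocation" -> "/tmp/cp"))

// Lookups against the CaseInsensitiveMap itself ignore case:
assert(opts.get("CHECKPOINTLOCATION").contains("/tmp/cp"))

// toMap returns originalMap, which only knows the original spelling of the key:
assert(opts.toMap.get("checkpointLocation").contains("/tmp/cp"))
assert(opts.toMap.get("CHECKPOINTLOCATION").isEmpty)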
Thanks for reviewing. Addressed review comments. Please take a look again.
    newOptions: CaseInsensitiveMap[String],
    recoverFromCheckpoint: Boolean = true): StreamingQuery = {
  val options = newOptions.originalMap
  val queryName = options.get("queryName")
Previously it was extraOptions.get("queryName"); we should follow that and get the queryName option case-insensitively.
OK, I'll keep it as it was.
    recoverFromCheckpoint: Boolean = true): StreamingQuery = {
  val options = newOptions.originalMap
  val queryName = options.get("queryName")
  val checkpointLocation = options.get("checkpointLocation")
ditto
val checkpointLocation = options.get("checkpointLocation")
val useTempCheckpointLocation = SOURCES_ALLOW_ONE_TIME_QUERY.contains(source)

df.sparkSession.sessionState.streamingQueryManager.startQuery(
We can follow the previous code style:
...startQuery(
  newOptions.get("queryName"),
  newOptions.get("checkpointLocation"),
  df,
  newOptions.originalMap,
  ...
OK, let me keep it as it was.
Kubernetes integration test starting

Test build #129544 has finished for PR 29767 at commit

Test build #129547 has finished for PR 29767 at commit

retest this, please

Kubernetes integration test status success

Kubernetes integration test starting

Kubernetes integration test status success

Kubernetes integration test starting

Kubernetes integration test status success

Test build #129548 has finished for PR 29767 at commit
dongjoon-hyun left a comment
+1, LGTM. Thank you so much, @HeartSaVioR and @cloud-fan .
Merged to master for Apache Spark 3.1.0.
Thanks all for reviewing and merging!

We need to update the PR title; it's saveAsTable, not table.

You're right. Would you like to go with a revert and another PR, or is it just for information? Either is fine for me.

Just updating the title is good enough; it's only about the commit message.

Ah OK. Thanks for the guidance. I've updated the PR title and the description as well, as the usage is a bit different from before.

Sorry for missing that.
What changes were proposed in this pull request?
This PR proposes to add DataStreamWriter.saveAsTable to specify the output "table" to write to from the streaming query.
Why are the changes needed?
For now, there's no way to write to a table (especially a catalog table) even when the table is capable of handling streaming writes, so even with Spark 3, writing to a catalog table via Structured Streaming has to go through DataStreamWriter.format(provider) and hope the provider handles it the same way as the catalog table.
With the new API, we can directly point to a catalog table which supports streaming write. Some usages are covered by tests - simply put, end users can do something like the following:
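(The original usage example did not survive extraction; below is a rough sketch. The source, checkpoint path, and table name are placeholders, and the target table is assumed to live in a catalog whose tables support streaming writes.)

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("stream-to-table").getOrCreate()

val query = spark.readStream
  .format("rate")                                     // any streaming source
  .load()
  .writeStream
  .option("checkpointLocation", "/tmp/checkpoint")    // placeholder path
  .saveAsTable("my_catalog.default.output_table")     // table assumed to support streaming writes

query.awaitTermination()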
Does this PR introduce any user-facing change?
Yes, as this adds a new public API in DataStreamWriter. It doesn't introduce any backward-incompatible change.
How was this patch tested?
New unit tests.