[SPARK-46043][SQL] Support create table using DSv2 sources #43949
Conversation
Force-pushed from 03bf5a4 to b77c505.
cc @cloud-fan
how does this work? empty table schema?
Force-pushed from c2bbb4a to 81fff6a.
```
case p @ Some(v) if !v.isInstanceOf[FileDataSourceV2] => p
case _ => None
```
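For context, a self-contained sketch of the pattern under discussion, assuming the DSv2 TableProvider and FileDataSourceV2 types; the function name is illustrative:

```scala
import org.apache.spark.sql.connector.catalog.TableProvider
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2

// Keep the resolved provider only when it is a non-file-based DSv2 source;
// file sources continue through the existing paths.
def nonFileV2Provider(provider: Option[TableProvider]): Option[TableProvider] =
  provider match {
    case p @ Some(v) if !v.isInstanceOf[FileDataSourceV2] => p
    case _ => None
  }
```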
Because isV2Provider is only used by ResolveSessionCatalog, move this back to ResolveSessionCatalog.
Good catch!
We can pass in the catalogManager and skip this check. And the user cannot provide a schema here.
@cloud-fan Actually no. The CatalogManager constructor takes in a v2SessionCatalog, so here we can't pass the catalog manager into the constructor of the v2 session catalog (circular dependency):
spark/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
Lines 174 to 176 in 7a0d041:

```
protected lazy val v2SessionCatalog = new V2SessionCatalog(catalog)
protected lazy val catalogManager = new CatalogManager(v2SessionCatalog, catalog)
```
Force-pushed from a3d687e to 6fdbe6c.
beliefer left a comment:
Looks good!
Please revert this line.
```
-val tableProperties = table.properties
-val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
-val properties = tableProperties ++ pathOption
+val properties = table.properties ++
+  table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
```
Force-pushed from 51b6581 to 572fea0.
beliefer left a comment:
LGTM. cc @cloud-fan @huaxingao
```
},
"CANNOT_CREATE_DATA_SOURCE_V2_TABLE" : {
  "message" : [
    "Failed to create data source V2 table:"
```
shall we include the table name?
```
  ],
  "sqlState" : "42846"
},
"CANNOT_CREATE_DATA_SOURCE_V2_TABLE" : {
```
I can't find other errors that mention data source v2. I think it's a developer thing and we should not expose it to end users via error message. How about just CANNOT_CREATE_TABLE?
```
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
(catalog, identifier) match {
  case (Some(cat), Some(ident)) => s"${quoteIdentifier(cat.name())}.${ident.quoted}"
  case (None, Some(ident)) => ident.quoted
```
I don't think this can happen. We can add an assert.
```
}

case _ =>
  (schema, partitions)
```
shall we fail here if it's not a valid data source?
Maybe we can do it later. The current behavior allows any table provider.
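For reference, a hedged sketch of the stricter alternative discussed above (not what this PR does), assuming DataSource.lookupDataSourceV2 is the lookup in scope; the message text is illustrative:

```scala
import org.apache.spark.sql.execution.datasources.DataSource

// Fail fast when the provider does not resolve to a DSv2 TableProvider,
// instead of silently accepting any table provider string.
val tableProvider = DataSource.lookupDataSourceV2(provider, conf).getOrElse {
  throw new IllegalArgumentException(s"$provider is not a valid data source")
}
```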
```
 import org.scalatest.BeforeAndAfter

-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
```
unnecessary change?
The test failure seems unrelated.
```
case Some(tableProvider) =>
  assert(tableProvider.supportsExternalMetadata())
  lazy val dsOptions = new CaseInsensitiveStringMap(properties)
```
Do we need to put in the path option?
I think we can add a new method that creates ds options from a CatalogTable, to avoid duplicated code.
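A minimal sketch of such a helper, reusing the properties-plus-path construction quoted earlier in this review; the method name toV2Options is illustrative:

```scala
import scala.jdk.CollectionConverters._

import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Build DSv2 options from a CatalogTable: the table properties plus a
// "path" option derived from the storage location, when one is set.
def toV2Options(table: CatalogTable): CaseInsensitiveStringMap = {
  val properties = table.properties ++
    table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
  new CaseInsensitiveStringMap(properties.asJava)
}
```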
```
}

/** Used as a V2 DataSource for V2SessionCatalog DDL */
class FakeV2Provider extends SimpleTableProvider
```
Can we avoid extending SimpleTableProvider here? I think it's not meant to support external metadata.
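One way to do that, sketched under the assumption that the test only needs a fixed schema (the class name, table name, and schema here are illustrative): implement TableProvider directly and opt in to external metadata.

```scala
import java.util

import org.apache.spark.sql.connector.catalog.{Table, TableCapability, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class FakeV2Provider extends TableProvider {
  // Accept a user-specified schema/partitioning from the catalog.
  override def supportsExternalMetadata(): Boolean = true

  override def inferSchema(options: CaseInsensitiveStringMap): StructType =
    new StructType().add("i", "int")

  override def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]): Table = {
    val tableSchema = schema // capture to avoid shadowing inside the anonymous class
    new Table {
      override def name(): String = "fake"
      override def schema(): StructType = tableSchema
      override def capabilities(): util.Set[TableCapability] = util.Collections.emptySet()
    }
  }
}
```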
```
if (schema.nonEmpty) {
  throw new SparkUnsupportedOperationException(
    errorClass = "CANNOT_CREATE_DATA_SOURCE_TABLE.EXTERNAL_METADATA_UNSUPPORTED",
    messageParameters = Map("tableName" -> ident.quoted, "provider" -> provider))
```
ident.quoted only quotes when necessary, but in error messages we require fully quoted identifiers.
You can call toSQLId(ident.asMultipartIdentifier), but maybe it's better to add a def fullyQuoted to the implicit class IdentifierHelper and use it here.
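A sketch of the suggested helper, assuming quoteIdentifier (which always adds backticks, unlike quoteIfNeeded) is in scope; placing it in IdentifierHelper follows the suggestion above:

```scala
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.connector.catalog.Identifier

implicit class IdentifierHelper(ident: Identifier) {
  // Quote every name part unconditionally, as error messages require.
  def fullyQuoted: String =
    (ident.namespace :+ ident.name).map(quoteIdentifier).mkString(".")
}
```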
```
test("SPARK-46043: create table in SQL with schema required data source") {
  val cls = classOf[SchemaRequiredDataSource]
  val e = intercept[IllegalArgumentException] {
```
oh, it doesn't have an error class?
| test("SPARK-46043: create table in SQL with partitioning required data source") { | ||
| val cls = classOf[PartitionsRequiredDataSource] | ||
| val e = intercept[IllegalArgumentException]( |
ditto
oh it's thrown directly from the data source?
```
 verifyTable(t1, Seq.empty[(Long, String, String)].toDF("id", "data", "missing"))
-val tableName = if (catalogAndNamespace.isEmpty) toSQLId(s"default.$t1") else toSQLId(t1)
+val tableName = if (catalogAndNamespace.isEmpty) {
+  toSQLId(s"spark_catalog.default.$t1")
```
Not related to your PR, but this seems to indicate a bug. So the error message points to table "`spark_catalog.default.t1`"? cc @MaxGekk
```
case (Some(cat), Some(ident)) => s"${quoteIfNeeded(cat.name())}.${ident.quoted}"
case (None, None) => table.name()
case _ =>
  throw new IllegalArgumentException(
```
this should be SparkException.internalError
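A sketch of the suggested change, mirroring the quoted match (the message text is illustrative):

```scala
import org.apache.spark.SparkException

(catalog, identifier) match {
  case (Some(cat), Some(ident)) => s"${quoteIfNeeded(cat.name())}.${ident.quoted}"
  case (None, None) => table.name()
  case _ =>
    // An unreachable catalog/identifier combination indicates a Spark bug,
    // so raise an internal error rather than an IllegalArgumentException.
    throw SparkException.internalError(
      s"Invalid catalog and identifier combination for table: ${table.name()}")
}
```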
thanks, merging to master!
### What changes were proposed in this pull request?

This PR supports `CREATE TABLE ... USING source` for DSv2 sources.

### Why are the changes needed?

To support creating DSv2 tables in SQL. Currently the table creation can work, but when you select a DSv2 table created in SQL, it fails with this error:

```
org.apache.spark.sql.AnalysisException: org.apache.spark.sql.connector.SimpleDataSourceV2 is not a valid Spark SQL Data Source.
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

New unit tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#43949 from allisonwang-db/spark-46043-dsv2-create-table.

Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
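A sketch of the user-facing behavior the merged change enables, assuming the test source named in the error above is on the classpath:

```scala
// Create a DSv2 table via SQL, then query it; before this PR the SELECT
// failed with "... is not a valid Spark SQL Data Source."
spark.sql("CREATE TABLE t USING org.apache.spark.sql.connector.SimpleDataSourceV2")
spark.sql("SELECT * FROM t").show()
```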
…in DataSourceV2Relation

### What changes were proposed in this pull request?

#43949 added a check in the `name` method of `DataSourceV2Relation`, which can be overly strict. This PR removes the check and reverts to using `table.name()` when either catalog or identifier is empty.

### Why are the changes needed?

To reduce the chance of having breaking changes.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #44348 from allisonwang-db/spark-46043-followup.

Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request?

#43949 supports CREATE TABLE using DSv2 sources. This PR supports CREATE TABLE AS SELECT (CTAS) using DSv2 sources. It turns out that we don't need additional code changes. This PR simply adds more test cases for CTAS queries.

### Why are the changes needed?

To add tests for CTAS for DSv2 sources.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

New tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #44190 from allisonwang-db/spark-46272-ctas.

Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
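A sketch of the CTAS shape these tests cover (the table and provider names are illustrative):

```scala
// CTAS against a DSv2 source: the schema comes from the query,
// so no column list is specified.
spark.sql(
  """CREATE TABLE t2
    |USING org.apache.spark.sql.connector.SimpleDataSourceV2
    |AS SELECT 1 AS i""".stripMargin)
```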
…stom session catalog

### What changes were proposed in this pull request?

This is a follow-up of #43949 to fix a breaking change. Spark allows people to provide a custom session catalog, which may return custom v2 tables based on the table provider. #43949 resolves the table provider earlier than the custom session catalog, which may break custom session catalogs. This PR fixes it by not resolving the table provider if a custom session catalog is present.

### Why are the changes needed?

To avoid breaking custom session catalogs.

### Does this PR introduce _any_ user-facing change?

No, #43949 is not released yet.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #45440 from cloud-fan/fix.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Kent Yao <[email protected]>