[SPARK-28565][SQL] DataFrameWriter saveAsTable support for V2 catalogs #25330
Conversation
cc @jose-torres as well

Test build #108531 has finished for PR 25330 at commit
sessionCatalogTest("saveAsTable and v2 table - table doesn't exist") { session =>
Consider moving all session catalog tests to a new suite file?
This way, the entire suite can have a Spark session with the v2 session catalog defined.
We can also add new insertInto test cases for the session catalog there.
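For example, such an insertInto test could look roughly like the following sketch. It assumes the sessionCatalogTest helper shown in the diff above, a registered "source" view, and a placeholder v2 provider name foo; the table name is illustrative.
sessionCatalogTest("insertInto: append rows to an existing v2 table") { session =>
  val t1 = "tbl"  // illustrative table name
  // 'foo' stands in for whatever v2 provider the suite registers with the session catalog
  session.sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo")
  session.table("source").write.insertInto(t1)
  checkAnswer(session.table(t1), session.table("source"))
}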
case u @ UnresolvedRelation(CatalogObjectIdentifier(maybeCatalog, ident)) =>
  // First try loading the table with a loadable catalog, then fallback to the session
  // catalog if that exists
  maybeCatalog.flatMap(loadTable(_, ident))
How about:
maybeCatalog.orElse(sessionCatalog)
  .flatMap(loadTable(_, ident))
That's the tricky thing. We need to see if we can load using the defined catalog first. If the catalog is defined, but it doesn't return a table, then we miss out on using the session catalog.
When a user tries to use table cat1.ns1.ns2.tbl, where:
- catalog cat1 exists, but does not have a table ns1.ns2.tbl
- the v2 session catalog is configured and it has a table named cat1.ns1.ns2.tbl
IMHO, Spark should throw a no-table-found exception. This is consistent with the following statement in #24768:
A session catalog should be used when the v1 catalog is responsible for tables with no catalog in the table identifier.
Maybe I missed some discussions about fallback?
That's the tricky thing. We need to see if we can load using the defined catalog first. If the catalog is defined, but it doesn't return a table, then we miss out on using the session catalog.
I disagree.
Determining which catalog is responsible for an identifier is independent of the catalog results. If maybeCatalog is None, then the session catalog is responsible for the identifier. Whether to use v1 or v2 after that point depends on the provider.
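To make the contrast concrete, here is a minimal, self-contained sketch (not the PR's code) of the two strategies being discussed; C, I, and T are stand-ins for the catalog, identifier, and table types.
// Strategy 1: try the named catalog; if it has no such table, silently retry with the session catalog.
def lookupWithFallback[C, I, T](maybeCatalog: Option[C], sessionCatalog: Option[C], ident: I)(
    loadTable: (C, I) => Option[T]): Option[T] =
  maybeCatalog.flatMap(loadTable(_, ident))
    .orElse(sessionCatalog.flatMap(loadTable(_, ident)))

// Strategy 2: decide which catalog owns the identifier up front, then do exactly one lookup.
def lookupResponsibleCatalog[C, I, T](maybeCatalog: Option[C], sessionCatalog: Option[C], ident: I)(
    loadTable: (C, I) => Option[T]): Option[T] =
  maybeCatalog.orElse(sessionCatalog).flatMap(loadTable(_, ident))
With strategy 2, a missing table in the named catalog surfaces as a table-not-found error instead of silently resolving against the session catalog, which is the behavior being argued for here.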
@rdblue Imagine the following use case. Let's say that, as the data science team, I have a database in the Hive MetaStore called prod, with tables named like prod.tbl1 and prod.tbl2. Now some other team (maybe the data engineering team) creates a v2 catalog called prod and adds it to my environment.
Won't the prod catalog hijack all my requests and start failing to resolve the tables I had declared before, which I was expecting the V2SessionCatalog to resolve? As an unaware user, I suddenly have all my jobs failing and chaos ensues.
Yes. A conflict between namespace prod and catalog prod could break queries in both directions:
- If namespaces take precedence and a new namespace is created, it will break queries that use a catalog with that name
- If catalogs take precedence and a new catalog is created, it will break queries that use a namespace with that name
When we wrote the SPIP, the choice was to make catalog take precedence because:
- Any user can create a namespace globally in the catalog, possibly breaking other users
- Users can't globally create catalogs -- that's done by administrators -- so the impact is limited to their own jobs where they can add Spark configs
- Global catalogs are created by administrators and this happens more rarely
In short, we expect fewer problems when catalogs take precedence.
I'd like to start a discussion about the v2 session catalog. I think the v2 session catalog is an extension of the builtin catalog (the Hive catalog). It allows users to add some hooks when Spark sends metadata requests to the builtin catalog. With this in mind, the table lookup logic should be:
Similar logic should be applied to table creation as well. I think this PR makes sense: if we need to go to the v2 session catalog for table creation, we should do the same for table lookup as well. But I do agree with @jzhuge about #25330 (comment)
@cloud-fan, I think we are saying the same thing. The identifier supplied by the user determines the catalog that should be used (considering the session catalog and the v2 session catalog to be the same catalog). Then the v2 or v1 session catalog is used depending on what the provider implementation needs. We already do this for
That first rule only matches if there is no v2 catalog (not set in the identifier and no default), and if the provider is a v1 provider. The second rule matches all cases where v2 will be used.
Test build #108683 has finished for PR 25330 at commit
Test build #108684 has finished for PR 25330 at commit
jzhuge left a comment:
Mostly LGTM. However, v2 session catalog changes feel like a separate PR.
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class DataSourceDFWriterV2SessionCatalogSuite
How about DataSourceV2SessionDataFrameSuite? It matches the name DataSourceV2DataFrameSuite better.
before {
  spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName)
  spark.conf.set(s"spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName)
Needed?
testQuietly("saveAsTable: with defined catalog and table doesn't exist") {
This and many other tests don't need "with defined catalog" in the test name any more, since the v2 session tests are split into a separate file.
case u @ UnresolvedRelation(CatalogObjectIdentifier(Some(catalogPlugin), ident)) =>
  loadTable(catalogPlugin, ident).map(DataSourceV2Relation.create).getOrElse(u)
case u @ UnresolvedRelation(CatalogObjectIdentifier(maybeCatalog, ident)) =>
  maybeCatalog.orElse(sessionCatalog)
This changes the SELECT path, so more unit tests for table relation are needed, e.g.:
test("Relation: basic") {
val t1 = "tbl"
withTable(t1) {
sql(s"CREATE TABLE $t1 USING $v2Format AS SELECT id, data FROM source")
checkAnswer(spark.table(t1), spark.table("source"))
checkAnswer(sql(s"TABLE $t1"), spark.table("source"))
checkAnswer(sql(s"SELECT * FROM $t1"), spark.table("source"))
}
}
IMHO this v2 session catalog support and related tests can be split into a separate PR with additional unit tests for SQL SELECT and spark.table().
Also, another PR could update ResolveInsertInto to support the v2 session catalog in SQL INSERT INTO and DFW.insertInto.
Sounds good. I can separate the session catalog things. That's a valid point about requiring more tests around reads.

Or would you mind if those are added as a follow-up when supporting INSERT INTO as well?

It'd be better to group READ/SELECT and saveAsTable together because they both are affected by the change in
case (other, _) =>
  // We have a potential race condition here in AppendMode, if the table suddenly gets
  // created between our existence check and physical execution, but this can't be helped
  // in any case.
If we treat saveAsTable as one atomic operation, I think AppendMode means "create or append table". Since "create or append table" is not a standard SQL command, maybe it's fine to ignore this race condition.
cc @jzhuge Addressed your comments. Would love to get this merged ASAP so that I can push the session catalog-related things as well.
jzhuge left a comment:
LGTM
import df.sparkSession.sessionState.analyzer.{AsTableIdentifier, CatalogObjectIdentifier}
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._

val session = df.sparkSession
Nit: the local variable is used only once.
I'll use it in the follow up :)
saveAsTable(catalog.asTableCatalog, ident, modeForDSV2)

case AsTableIdentifier(tableIdentifier) =>
  saveAsTable(tableIdentifier)
What about cases where AsTableIdentifier won't match? For example, saveAsTable("ns1.ns2.table"). For those cases, we need a case _ => that throws an exception because the table name is not supported by the session catalog.
case (SaveMode.Append, Some(table)) =>
  AppendData.byName(DataSourceV2Relation.create(table), df.logicalPlan)

case (SaveMode.Overwrite, _) =>
I think that the behavior here is to truncate and write, not replace.
We may want to change the table properties though, such as partitioning and schema, wouldn't we?
We need to match the behavior of v1 file sources. Do we know what that behavior is?
As discussed in DSV2 sync, the old behavior was to drop the old table and create a new one, therefore the Replace behavior here works.
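As a summary of this thread, here is a plain-Scala sketch of how SaveMode combines with table existence; the semantics are as discussed above and the action names are purely descriptive, not the PR's logical plans.
import org.apache.spark.sql.SaveMode

sealed trait WriteAction
case object AppendRows      extends WriteAction // append to the existing table (AppendData.byName)
case object CreateTableAs   extends WriteAction // table doesn't exist: CTAS with ignoreIfExists = false
case object ReplaceTableAs  extends WriteAction // drop-and-recreate, matching the old v1 behavior
case object FailTableExists extends WriteAction // default ErrorIfExists semantics
case object DoNothing       extends WriteAction // Ignore semantics when the table already exists

def actionFor(mode: SaveMode, tableExists: Boolean): WriteAction = (mode, tableExists) match {
  case (SaveMode.Overwrite, _)        => ReplaceTableAs
  case (SaveMode.Append, true)        => AppendRows
  case (SaveMode.ErrorIfExists, true) => FailTableExists
  case (SaveMode.Ignore, true)        => DoNothing
  case (_, false)                     => CreateTableAs
}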
Test build #108785 has finished for PR 25330 at commit
Test build #108842 has finished for PR 25330 at commit
case other =>
  // TODO(SPARK-28666): This should go through V2SessionCatalog
  throw new UnsupportedOperationException(
    s"Couldn't find a catalog to handle the identifier ${other.quoted}.")
I think this is an analysis error. The catalog was None, so it belongs to the session catalog. But the session catalog doesn't support namespaces with more than one part, so it is an invalid identifier for that catalog.
ah gotcha, that's the 4th case... I'd like to make that change along with the V2SessionCatalog support. Right now we just couldn't find a catalog for it, so... (Maybe they forgot to specify a default catalog).
+1. saveAsTable("ns1.ns2.table") means we want to save the table to the builtin catalog, but ns1.ns2 is not a valid namespace for the builtin catalog. (Or we can report namespace not found?)
That's true, but that could also mean that they forgot to configure their default catalog too, right?
I can throw a better error when the SessionCatalog code path is implemented, since we need that before we can even say that ns1.ns2.table cannot be handled by the session catalog.
I had a couple of minor comments about documenting what will be done when this supports the v2 session catalog and throwing an exception. Once those are fixed, +1.
Test build #108852 has finished for PR 25330 at commit
df.sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
  case CatalogObjectIdentifier(Some(catalog), ident) =>
    insertInto(catalog, ident)
  // TODO(SPARK-28667): Support the V2SessionCatalog
Not related to this PR, but just a note. I've been working on V2SessionCatalog improvement for a while, and one issue I found is: it's not easy to implement "if the table provider is v1, go to v1 session catalog. if the table provider is v2, go to v2 session catalog".
It's easy to do it for CREATE TABLE, because the table provider is known at the beginning. But it's hard for SELECT and INSERT, as we need to look up the table from Hive catalog to get the table provider.
I'd expect to have 2 analyzer rules to do it:
- one rule in sql/catalyst, which resolves UnresolvedRelation to UnresolvedCatalogRelation by looking up the table from the Hive catalog
- one rule in sql/core, which resolves UnresolvedCatalogRelation to DataSourceV2Relation, if the table provider is v2. This has to be in sql/core because DataSource.lookupDataSource is in sql/core. We have a rule FindDataSourceTable that resolves UnresolvedCatalogRelation to a v1 relation if the table provider is v1.
I think DataFrameWriter should produce ...Statement plans, to reuse the analyzer rules for all the things.
Well, the V2SessionCatalog already produces a CatalogTableAsV2, which gets unwrapped to an UnresolvedCatalogRelation in DataSourceResolution, and then it can get resolved through the V1 code paths. I've also had a similar problem when trying to implement V2SessionCatalog support for ALTER TABLE. In that case I decided that it should reject and fall back to V1 plans if the loaded table is a CatalogTableAsV2.
V2SessionCatalog already produces a CatalogTableAsV2
Yes, for both v1 and v2 providers. So we still need a rule to check the table provider, and either unwrap it to an UnresolvedCatalogRelation, or create the actual v2 table.
Yes, for both v1 and v2 providers. So we still need a rule to check the table provider, and either unwrap it to an UnresolvedCatalogRelation, or create the actual v2 table.
This should be done in loadTable. The original implementation called TableProvider to create the table, but this path is currently broken. I think that TableProvider needs to be improved so that it works instead of adding more rules to convert from CatalogTableAsV2.
The catalog should return the correct Table instance for all v2 tables. Spark shouldn't convert between v2 table instances.
testQuietly("saveAsTable: table doesn't exist => create table") {
Just out of curiosity, why do we need testQuietly here?
It was added before merging Gengliang's "Catalogs.load should work for inbuilt catalogs" PR. It may not be needed now, but I don't think it's a big deal.
cloud-fan left a comment:
LGTM except a few minor comments
Test build #108853 has finished for PR 25330 at commit
Thanks all for the reviews! Merging to master :)
What changes were proposed in this pull request?
Adds saveAsTable support for V2 catalogs, and for V2 tables through the V2SessionCatalog.
If the table resolves through the V2SessionCatalog, we use the datasource v1 SaveMode semantics, for backwards compatibility, to select the code path we're going to hit.
Depending on the SaveMode:
a) If the table exists: use AppendData.byName
b) If the table doesn't exist: use CTAS (ignoreIfExists = false)
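As a usage sketch, assuming an active SparkSession named spark and the same TestInMemoryTableCatalog test class and spark.sql.catalog.testcat config key used by the tests in this PR (the table name is illustrative):
// Register a v2 catalog under the name "testcat", as the tests in this PR do.
spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName)

val df = spark.range(3).toDF("id")

// Table doesn't exist yet: the default ErrorIfExists mode runs CTAS in "testcat".
df.write.saveAsTable("testcat.ns1.tbl")

// Table exists now: Append maps to AppendData.byName on the v2 table.
df.write.mode("append").saveAsTable("testcat.ns1.tbl")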
How was this patch tested?
Unit tests in DataSourceV2DataFrameSuite