[SPARK-27919][SQL] Add v2 session catalog #24768
Conversation
FYI @jzhuge
Test build #106076 has finished for PR 24768 at commit
Test build #106112 has finished for PR 24768 at commit
Test build #106169 has finished for PR 24768 at commit
Test build #106207 has finished for PR 24768 at commit
@dongjoon-hyun, @mccheah, and @jiangxb1987, could you take a look at this PR? This adds a v2 catalog that delegates to the session catalog. This is needed to make v2 sources function correctly when the session catalog is responsible for table identifiers without an explicit catalog.
Thank you for pinging me, @rdblue. This is v2 Catalog. Shall we remove
Test build #106211 has finished for PR 24768 at commit
## What changes were proposed in this pull request?
Move methods that implement v2 catalog operations to CatalogV2Util so they can be used in #24768.
## How was this patch tested?
Behavior is validated by existing tests.
Closes #24813 from rdblue/SPARK-27964-add-catalog-v2-util.
Authored-by: Ryan Blue <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
Thank you, @rdblue.
For this PR, let's come back after #24812 is resolved.
75ac313 to 18c084b
Test build #106273 has finished for PR 24768 at commit
@dongjoon-hyun, @gatorsmile, I rebased this PR on master with #24812 and #24813. Please have a look when you get a chance.
cc @jiangxb1987 Could you review this?
Should we pass the catalog table schema to getTable?
val tableOptions = new CaseInsensitiveStringMap(storageOptions.asJava)
tableProvider.getTable(tableOptions, catalogTable.schema)
That way, SPARK-27960 would also be resolved.
No, that is not the correct solution for SPARK-27960.
The user-supplied schema is used when interpreting data that has no schema, like CSV or JSON. That's why it is called the user-supplied schema.
In this case, the schema is stored in the metastore. The table provider can access it from the session metastore using the table and database options.
I agree that this is strange and isn't a good way to handle these tables. I think it would be a good idea to change the TableProvider interface to accept more metadata from the metastore, but we should consider those options and should not reuse the user-supplied schema for something other than what the API was intended for.
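A rough sketch of the lookup being described, not code from this PR: the option keys ("database", "table"), the helper name, and the use of an active SparkSession are assumptions for illustration only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical helper: given the options passed to a TableProvider, look up the schema
// that the session metastore already stores for the table. Option keys are assumed.
def schemaFromSessionMetastore(options: CaseInsensitiveStringMap): StructType = {
  val db = options.get("database")
  val table = options.get("table")
  // Resolve through the session catalog; a real provider would need to avoid
  // recursing back into itself when doing this.
  SparkSession.active.table(s"$db.$table").schema
}
```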
In this case, the schema is stored in the metastore. The table provider can access it from the session metastore using the table and database options.
In that case, we call V2SessionCatalog.loadTable to get the table, but the table provider has to access the metastore again to get the schema. That seems tricky.
I think it would be a good idea to change the TableProvider interface to accept more metadata from the metastore, but we should consider those options and should not reuse the user-supplied schema for something other than what the API was intended for.
I admit that reusing the user-specified schema sounds a bit ugly, but it is a simple solution. There is similar usage in FindDataSourceTable/CreateDataSourceTableCommand when constructing a DataSource.
I believe that there could be a better solution than this. (Maybe we can start another thread for this.)
@gengliangwang, we can talk about how to fix TableProvider later. It should not block this commit.
OK, we can resolve this later.
Test build #106358 has finished for PR 24768 at commit
@rdblue Sorry for asking about adding comments. I think it would be helpful for developers to understand the context in the future.
This is a concerning change in light of the class comment. If failing to apply this rule can cause wrong results, how sure are we that the rule doesn't need to be applied when children are unresolved?
If a child is not resolved, then the next line will fail because the children do not have expression IDs. If the children are never resolved, then the query will fail the resolution check. If the children are eventually resolved, then this simply runs later, when the next line will not fail.
This was a preexisting bug that was uncovered by this PR and is caused by not throwing an analysis exception immediately in ResolveRelations. There have been several updates like this to fix places that didn't check child resolution.
Ok, that makes sense.
Why is this needed when the first case is case p if !p.resolved => p? Do you mean there is a plan that is resolved but its children are not?
@cloud-fan, feel free to look into that, but this case was matching with an unresolved child that did not have expression IDs. I thought the best approach was to require resolved children, but if you'd like to suggest a different approach I can update here.
Otherwise, this solves the problem and it is clearly correct to only access expression IDs when children are resolved.
I'm not sure what problem this resolves. When we reach here, p is already resolved, which usually means p.childrenResolved is always true.
@cloud-fan, not necessarily. Some plans may not check that children are resolved. CheckAnalysis accounts for this case by using foreachUp:
plan.foreachUp {
  case o if !o.resolved =>
    failAnalysis(s"unresolved operator ${o.simpleString(SQLConf.get.maxToStringFields)}")
  case _ =>
}
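For illustration, a minimal sketch of the guard pattern under discussion (ExampleGuardedRule and the Project case are hypothetical, not code from this PR): a rule that reads child output, and therefore expression IDs, first checks childrenResolved and defers otherwise.

```scala
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule

// Illustrative rule: expression IDs on child output are only meaningful once the
// children are resolved, so unresolved plans are left alone and handled on a later
// pass of the analyzer.
object ExampleGuardedRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
    case p if !p.childrenResolved => p  // defer until children have expression IDs
    case p @ Project(_, child) =>
      logDebug(s"child expression IDs: ${child.output.map(_.exprId)}")
      p
  }
}
```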
I don't know if there's anything to do about it in this PR, and maybe I'm misunderstanding, but I'm very concerned if we're inferring which catalogs do or don't exist based only on the shape of the query plan. Ideally we should be able to interpret query plans without needing to think about which catalogs exist, and directly check in the rare situations where it does matter.
Catalog existence is independent. A catalog exists if it is defined in Spark's configuration properties, spark.sql.catalog.<name>=<implementation-class>.
As far as the ability to interpret plans without knowing which catalogs exist, that would require being able to know which part of an identifier refers to a catalog up front. That means either always requiring a catalog, or always requiring a certain number of identifier parts. Because the need was to support external catalogs with an unknown number of namespaces and to provide backward compatibility with existing tables (e.g. the identifier db.table without a catalog), we need to look up the first identifier to check whether it is a catalog.
I can give you more context if you'd like, but this decision was carefully considered and not really something we should revisit at this point.
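For example (a hedged sketch: "prod" and com.example.ProdCatalog are placeholders for a catalog name and its plugin implementation class, not anything defined in this PR):

```scala
import org.apache.spark.sql.SparkSession

// Register a v2 catalog named "prod" through configuration. The value must name a
// class implementing the v2 catalog plugin API (placeholder shown here).
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.prod", "com.example.ProdCatalog")
  .getOrCreate()

// Identifiers prefixed with the catalog name are then routed to it, e.g.:
// spark.sql("SELECT * FROM prod.db.t")
```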
I agree with the design goals you're describing, and agree that we shouldn't revisit that decision here. (Even if I had a better solution in mind, which I don't.)
I'm still confused about the "there is no default v2 catalog" part, though. How do we know that a CreateTableStatement matching this case can't exist when there's a default v2 catalog?
After this PR, the rules to determine the catalog responsible for an identifier are:
- If the identifier starts with a known catalog, use it
- If there is a configured default v2 catalog, use that catalog
- Otherwise, the session catalog is responsible for the identifier
Rules 1 and 2 are implemented in the AsTableIdentifier and CatalogObjectIdentifier extractors. So AsTableIdentifier is not going to match and convert to a v1 identifier if a v2 catalog is responsible for the table. In those cases, CatalogObjectIdentifier will always return a catalog and an Identifier.
If the session catalog is responsible for the identifier, then Spark needs to determine whether it should use the session catalog directly, or if it needs to use a v2 plan that uses the V2SessionCatalog because the source is v2. That's what the V1WriteProvider matcher determines.
So putting all of that together, this rule matches when the source is v1 (use the session catalog directly), the identifier has no catalog (rule 1 does not apply), and there is no default v2 catalog (rule 2 does not apply); a toy sketch of this decision follows.
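A self-contained toy sketch of that decision, independent of the actual Spark classes (the type and function names here are illustrative only; the real logic lives in the LookupCatalog extractors):

```scala
// Toy model of the three rules above.
sealed trait Owner
case class V2CatalogOwner(catalog: String, ident: Seq[String]) extends Owner
case class SessionCatalogOwner(ident: Seq[String]) extends Owner

def owningCatalog(
    parts: Seq[String],
    knownCatalogs: Set[String],
    defaultV2Catalog: Option[String]): Owner = parts match {
  case head +: rest if knownCatalogs.contains(head) =>
    V2CatalogOwner(head, rest)                      // rule 1: explicit catalog prefix
  case _ if defaultV2Catalog.isDefined =>
    V2CatalogOwner(defaultV2Catalog.get, parts)     // rule 2: configured default v2 catalog
  case _ =>
    SessionCatalogOwner(parts)                      // rule 3: the session catalog owns it
}

// owningCatalog(Seq("prod", "db", "t"), Set("prod"), None)  -> V2CatalogOwner("prod", Seq("db", "t"))
// owningCatalog(Seq("db", "t"), Set("prod"), Some("prod"))  -> V2CatalogOwner("prod", Seq("db", "t"))
// owningCatalog(Seq("db", "t"), Set("prod"), None)          -> SessionCatalogOwner(Seq("db", "t"))
```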
There is no temporary table in Spark; do you mean temp view?
We can update this if it is a problem. Let me know.
I went ahead and renamed this. It is a little strange that it returns a TableIdentifier but is named for views, but I guess that's okay.
Where do we register the v2 session catalog?
BTW, this should be an internal thing; I think we just need to ask LookupCatalog implementations to provide the v2 session catalog.
The v2 session catalog is configured by adding a catalog named "session" in SQL config. That needs to be added to SQLConf defaults, but I wasn't sure whether it should happen in this PR or later. I can add it in this PR if you'd prefer.
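For reference, a hedged sketch of what that could look like (the exact key follows the catalog-naming convention above, and the class name is a placeholder such as a test implementation):

```scala
// Assuming an active SparkSession `spark`: override the v2 session catalog by
// registering a catalog under the name "session".
spark.conf.set("spark.sql.catalog.session", "com.example.TestSessionCatalog")
```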
Why would we allow users to configure the session catalog? I see V2SessionCatalog as a way to fall back to the built-in Hive catalog, before we migrate the Hive catalog to the v2 catalog API. This is a completely internal thing.
We can do that if you'd like. This PR uses a different implementation to test SQL behavior because the built-in sources don't behave correctly.
I added a default config for this that uses the V2SessionCatalog. May as well add it now, but we do need to be able to override it until we have a built-in source that passes all SQL tests.
871242f to ceb58ca
Test build #107288 has finished for PR 24768 at commit
I think this PR does 2 things:
For 1, I have a different opinion about the expected behavior. The new behavior after this PR is still slightly different from what I have in mind. I opened #25077 for broader discussion. For 2, I do like the idea of a v2 session catalog. It's a pretty good way to fall back to the Hive catalog before we migrate it to the new catalog API. I'd like to merge it after we reach a consensus on 1.
@cloud-fan, could you please let us know what you think the behavior should be? I can add this as a topic to the sync this week if you'd like to propose a change.
I think #25077 outlines an alternative proposal for the expected behavior.
@mccheah, as far as I can tell, it doesn't offer a proposal for alternative behavior. It is just an alternative implementation of what is here. The desired behavior is the same as in this PR, which is why I asked. @cloud-fan, I looked at #25077 and you're proposing the same behavior as implemented in this PR, but with a lot of other changes. I think we agree on what I wrote here:
I see that it also handles some cases, like temporary tables, more correctly. What behavior are you proposing should change?
brkyvz left a comment
I generally like this PR. I think #25077 also provides some useful refactoring, but that can be done separately, as this PR is already pretty large.
.foreach { sqlIdent =>
  inside(parseMultipartIdentifier(sqlIdent)) {
    case AsTemporaryViewIdentifier(_) =>
      fail("AsTemporaryTableIdentifier should not match when " +
nit: AsTemporaryViewIdentifier should not match
  partitions += LogicalExpressions.identity(col)
}

v1Table.bucketSpec.foreach { spec =>
Just out of curiosity, does the ordering here matter? Do partitions need to be specified as Transforms first, and then bucketing? Could bucketing be specified first?
Bucketing can be specified first by using bucket(16, col) as a transform. When it is created using the BUCKET BY clause, we add it last because that's where it is effectively added in Hive and Spark implementations. But the new syntax allows users to specify the order that they choose.
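A hedged illustration of that flexibility (assuming an active SparkSession `spark` and a registered v2 catalog; "prod", the table, and the column names are placeholders):

```scala
// Bucketing written as just another transform in the v2 PARTITIONED BY clause; here it
// comes before the identity partition on ts, but the user could list it last instead.
spark.sql("""
  CREATE TABLE prod.db.demo_t (id BIGINT, ts TIMESTAMP, col STRING)
  USING parquet
  PARTITIONED BY (bucket(16, col), ts)
""")
```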
private def quote(part: String): String = {
  if (part.contains(".") || part.contains("`")) {
    s"`${part.replace("`", "``")}`"
so
`a.b`.c
will become
```a.b```.`c`
? Reminds me of the regexes with \\\\\\\\\s :P
Yes. This is from the parser, which uses doubling to escape back-ticks.
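A toy sketch of that doubling (quoteIfNeeded is a local helper named for illustration; the un-quoted branch is assumed):

```scala
// Parts containing '.' or '`' are wrapped in back-ticks, with embedded back-ticks doubled.
def quoteIfNeeded(part: String): String =
  if (part.contains(".") || part.contains("`")) s"`${part.replace("`", "``")}`" else part

quoteIfNeeded("a.b")    // `a.b`
quoteIfNeeded("a`b")    // `a``b`
quoteIfNeeded("plain")  // plain
```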
Test build #107411 has finished for PR 24768 at commit
I think I understand the behavior proposed here now. It's better to put the final rule in the PR description, instead of only the updates. IIUC, the final rule is: for a table name
I agree with it and I'm happy to merge it, except for one concern: I think it's bad UX to ask end users to configure both the default catalog and the session catalog. It's really confusing to expose these 2 configs together to end users, and I don't expect to see custom implementations of the session catalog.
I'm merging it and will send a follow-up PR according to rdblue#7, since it's arguable whether we want to make the session catalog configurable.
@cloud-fan I think it is necessary to be able to configure the session catalog. I'm just unsure about the naming of the configuration, as it may lead to conflicts. The changes you suggested in your PR with the CatalogManager can come in a follow-up. This PR is pretty large already. If we agree on the behavior and the semantics look to be well tested (to me they do), I think we can go ahead and merge this PR.
After discussing with @cloud-fan offline, my thoughts are:
@brkyvz, those two properties have different purposes. If you set the default catalog, it will be used for all identifiers with no specified catalog. That's not what we want for Spark 3.0, where we want the session catalog -- not the v2 session catalog -- to continue to be the default. The v2 session catalog property,
In short, the default catalog cannot be used to configure the v2 session catalog implementation without changing behavior and making all SQL go through the v2 code paths. And we still want a separate configuration to control the v2 session catalog implementation.
What changes were proposed in this pull request?
This fixes a problem where it is possible to create a v2 table using the default catalog that cannot be loaded with the session catalog. A session catalog should be used when the v1 catalog is responsible for tables with no catalog in the table identifier.
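A hedged sketch of the scenario described above (the identifiers are placeholders, and the default-catalog configuration itself is omitted):

```scala
// Before this change, with a default v2 catalog configured, a CREATE TABLE whose
// identifier has no catalog could be handled by the default catalog, while a later
// lookup of the same identifier could be handled by the session catalog and fail to
// find the table. The v2 session catalog is meant to keep such identifiers resolving
// against the session catalog on both paths.
spark.sql("CREATE TABLE db.t (id BIGINT) USING parquet")
spark.sql("SELECT * FROM db.t")
```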
How was this patch tested?