-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-17492] [SQL] Fix Reading Cataloged Data Sources without Extending SchemaRelationProvider #15046
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Test build #65213 has finished for PR 15046 at commit
|
|
cc @yhuai @cloud-fan |
|
ah good catch! But adding a new flag looks a little tricky, let me think if there is better way to fix it |
|
@cloud-fan JDBC is also affected by this bug. Do you have any better idea about this issue? Thanks! |
| if (isSchemaFromUsers) { | ||
| throw new AnalysisException(s"$className does not allow user-specified schemas.") | ||
| } else { | ||
| dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the RelationProvider doesn't allow user-specified schema, can we assume it's cheap to infer schema for it? Then we can simply check if the given schema matches the schema of relation returned by createRelation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, this is a pretty good idea. Let me try it. Thanks!
|
Test build #65616 has finished for PR 15046 at commit
|
| withTempView("t1", "t2") { | ||
| sql( | ||
| """ | ||
| |CREATE TEMPORARY TABLE t1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's use CREATE TEMPORARY VIEW
| (1 to 10).map(Row(_)).toSeq) | ||
| } | ||
|
|
||
| test("create a temp table that does not have a path in the option") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
temp view
| withTable(tableName) { | ||
| sql( | ||
| s""" | ||
| |CREATE $tableType $tableName |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does this test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My original thought is to provide a comprehensive test coverage of data source table creation/insertion with path. This is not related to this PR. Let me get rid of it.
|
|
||
| // when users specify the schema | ||
| val inputSchema = new StructType().add("s", IntegerType, nullable = false) | ||
| val e = intercept[AnalysisException] { dfReader.schema(inputSchema).load() } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is no test for this case before?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For DataFrameReader APIs, we do not have such a test case
| case (dataSource: RelationProvider, Some(schema)) => | ||
| val baseRelation = | ||
| dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions) | ||
| if (baseRelation.schema != schema) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @yhuai @liancheng to confirm, is it safe?
|
Test build #65684 has finished for PR 15046 at commit
|
| ) | ||
| } | ||
|
|
||
| test("insert into a temp view that does not point to an insertable data source") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm, is this test related to this PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not related. This is also to improve the test case coverage. Feel free to let me know if you want to remove it from this PR. Thanks!
| | To '10' | ||
| |) | ||
| """.stripMargin) | ||
| Seq("TEMPORARY VIEW", "TABLE").foreach { tableType => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these changes are also not related to this PR right? We are improving the test coverage here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah
|
is this a problem in 2.0? |
|
This is a new issue of Spark 2.1, after we physically store the inferred schema in the metastore. BTW, I also ran the test cases in Spark 2.0. It works well. |
|
thanks, merging to master! |
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> User-specified schema may come from the catalog if the Delta table is stored in an external catalog that syncs the table schema with the Delta log. We should allow it if it's the same as the real Delta table schema. This is already the case for batch read, see apache/spark#15046 This PR changes the Delta streaming read to allow it as well. Note: since Delta uses DS v2 (`TableProvider`) and explicitly claims that user-specified schema is not supported (`TableProvider#supportsExternalMetadata` returns false by default), end users still can't specify schema in `spark.read/readStream.schema`. This change is only for advanced Spark plugins that can construct logical plans to triggers Delta v1 source stream scan. ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> a new test ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. --> No
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> User-specified schema may come from the catalog if the Delta table is stored in an external catalog that syncs the table schema with the Delta log. We should allow it if it's the same as the real Delta table schema. This is already the case for batch read, see apache/spark#15046 This PR changes the Delta streaming read to allow it as well. Note: since Delta uses DS v2 (`TableProvider`) and explicitly claims that user-specified schema is not supported (`TableProvider#supportsExternalMetadata` returns false by default), end users still can't specify schema in `spark.read/readStream.schema`. This change is only for advanced Spark plugins that can construct logical plans to triggers Delta v1 source stream scan. ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> a new test ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. --> No
…4125) backport #3929 to 3.3 <!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> User-specified schema may come from the catalog if the Delta table is stored in an external catalog that syncs the table schema with the Delta log. We should allow it if it's the same as the real Delta table schema. This is already the case for batch read, see apache/spark#15046 This PR changes the Delta streaming read to allow it as well. Note: since Delta uses DS v2 (`TableProvider`) and explicitly claims that user-specified schema is not supported (`TableProvider#supportsExternalMetadata` returns false by default), end users still can't specify schema in `spark.read/readStream.schema`. This change is only for advanced Spark plugins that can construct logical plans to triggers Delta v1 source stream scan. ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> a new test ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. --> No
What changes were proposed in this pull request?
For data sources without extending
SchemaRelationProvider, we expect users to not specify schemas when they creating tables. If the schema is input from users, an exception is issued.Since Spark 2.1, for any data source, to avoid infer the schema every time, we store the schema in the metastore catalog. Thus, when reading a cataloged data source table, the schema could be read from metastore catalog. In this case, we also got an exception. For example,
This PR is to fix the above issue. When building a data source, we introduce a flag
isSchemaFromUsersto indicate whether the schema is really input from users. If true, we issue an exception. Otherwise, we will call thecreateRelationofRelationProviderto generate theBaseRelation, in which it contains the actual schema.How was this patch tested?
Added a few cases.