-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-17492] [SQL] Fix Reading Cataloged Data Sources without Extending SchemaRelationProvider #15046
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-17492] [SQL] Fix Reading Cataloged Data Sources without Extending SchemaRelationProvider #15046
Changes from all commits
17c2d50
00a49fe
335e0d6
4ab1b8a
31b8724
55ee864
59d06f8
7a80738
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -327,8 +327,13 @@ case class DataSource( | |
| dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions) | ||
| case (_: SchemaRelationProvider, None) => | ||
| throw new AnalysisException(s"A schema needs to be specified when using $className.") | ||
| case (_: RelationProvider, Some(_)) => | ||
| throw new AnalysisException(s"$className does not allow user-specified schemas.") | ||
| case (dataSource: RelationProvider, Some(schema)) => | ||
| val baseRelation = | ||
| dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions) | ||
| if (baseRelation.schema != schema) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. cc @yhuai @liancheng to confirm, is it safe? |
||
| throw new AnalysisException(s"$className does not allow user-specified schemas.") | ||
| } | ||
| baseRelation | ||
|
|
||
| // We are reading from the results of a streaming query. Load files from the metadata log | ||
| // instead of listing them using HDFS APIs. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -65,6 +65,26 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { | |
| ) | ||
| } | ||
|
|
||
| test("insert into a temp view that does not point to an insertable data source") { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Hmm, is this test related to this PR?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not related. This is also to improve the test case coverage. Feel free to let me know if you want to remove it from this PR. Thanks! |
||
| import testImplicits._ | ||
| withTempView("t1", "t2") { | ||
| sql( | ||
| """ | ||
| |CREATE TEMPORARY VIEW t1 | ||
| |USING org.apache.spark.sql.sources.SimpleScanSource | ||
| |OPTIONS ( | ||
| | From '1', | ||
| | To '10') | ||
| """.stripMargin) | ||
| sparkContext.parallelize(1 to 10).toDF("a").createOrReplaceTempView("t2") | ||
|
|
||
| val message = intercept[AnalysisException] { | ||
| sql("INSERT INTO TABLE t1 SELECT a FROM t2") | ||
| }.getMessage | ||
| assert(message.contains("does not allow insertion")) | ||
| } | ||
| } | ||
|
|
||
| test("PreInsert casting and renaming") { | ||
| sql( | ||
| s""" | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -348,31 +348,51 @@ class TableScanSuite extends DataSourceTest with SharedSQLContext { | |
| test("exceptions") { | ||
| // Make sure we do throw correct exception when users use a relation provider that | ||
| // only implements the RelationProvider or the SchemaRelationProvider. | ||
| val schemaNotAllowed = intercept[Exception] { | ||
| sql( | ||
| """ | ||
| |CREATE TEMPORARY VIEW relationProvierWithSchema (i int) | ||
| |USING org.apache.spark.sql.sources.SimpleScanSource | ||
| |OPTIONS ( | ||
| | From '1', | ||
| | To '10' | ||
| |) | ||
| """.stripMargin) | ||
| Seq("TEMPORARY VIEW", "TABLE").foreach { tableType => | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. These changes are also not related to this PR, right? We are improving the test coverage here.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah |
||
| val schemaNotAllowed = intercept[Exception] { | ||
| sql( | ||
| s""" | ||
| |CREATE $tableType relationProvierWithSchema (i int) | ||
| |USING org.apache.spark.sql.sources.SimpleScanSource | ||
| |OPTIONS ( | ||
| | From '1', | ||
| | To '10' | ||
| |) | ||
| """.stripMargin) | ||
| } | ||
| assert(schemaNotAllowed.getMessage.contains("does not allow user-specified schemas")) | ||
|
|
||
| val schemaNeeded = intercept[Exception] { | ||
| sql( | ||
| s""" | ||
| |CREATE $tableType schemaRelationProvierWithoutSchema | ||
| |USING org.apache.spark.sql.sources.AllDataTypesScanSource | ||
| |OPTIONS ( | ||
| | From '1', | ||
| | To '10' | ||
| |) | ||
| """.stripMargin) | ||
| } | ||
| assert(schemaNeeded.getMessage.contains("A schema needs to be specified when using")) | ||
| } | ||
| assert(schemaNotAllowed.getMessage.contains("does not allow user-specified schemas")) | ||
| } | ||
|
|
||
| val schemaNeeded = intercept[Exception] { | ||
| sql( | ||
| """ | ||
| |CREATE TEMPORARY VIEW schemaRelationProvierWithoutSchema | ||
| |USING org.apache.spark.sql.sources.AllDataTypesScanSource | ||
| |OPTIONS ( | ||
| | From '1', | ||
| | To '10' | ||
| |) | ||
| """.stripMargin) | ||
| test("read the data source tables that do not extend SchemaRelationProvider") { | ||
| Seq("TEMPORARY VIEW", "TABLE").foreach { tableType => | ||
| val tableName = "relationProvierWithSchema" | ||
| withTable (tableName) { | ||
| sql( | ||
| s""" | ||
| |CREATE $tableType $tableName | ||
| |USING org.apache.spark.sql.sources.SimpleScanSource | ||
| |OPTIONS ( | ||
| | From '1', | ||
| | To '10' | ||
| |) | ||
| """.stripMargin) | ||
| checkAnswer(spark.table(tableName), spark.range(1, 11).toDF()) | ||
| } | ||
| } | ||
| assert(schemaNeeded.getMessage.contains("A schema needs to be specified when using")) | ||
| } | ||
|
|
||
| test("SPARK-5196 schema field with comment") { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -293,6 +293,39 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be | |
| Option(dir).map(spark.read.format("org.apache.spark.sql.test").load) | ||
| } | ||
|
|
||
| test("read a data source that does not extend SchemaRelationProvider") { | ||
| val dfReader = spark.read | ||
| .option("from", "1") | ||
| .option("TO", "10") | ||
| .format("org.apache.spark.sql.sources.SimpleScanSource") | ||
|
|
||
| // when users do not specify the schema | ||
| checkAnswer(dfReader.load(), spark.range(1, 11).toDF()) | ||
|
|
||
| // when users specify the schema | ||
| val inputSchema = new StructType().add("s", IntegerType, nullable = false) | ||
| val e = intercept[AnalysisException] { dfReader.schema(inputSchema).load() } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Was there no test for this case before?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. For |
||
| assert(e.getMessage.contains( | ||
| "org.apache.spark.sql.sources.SimpleScanSource does not allow user-specified schemas")) | ||
| } | ||
|
|
||
| test("read a data source that does not extend RelationProvider") { | ||
| val dfReader = spark.read | ||
| .option("from", "1") | ||
| .option("TO", "10") | ||
| .option("option_with_underscores", "someval") | ||
| .option("option.with.dots", "someval") | ||
| .format("org.apache.spark.sql.sources.AllDataTypesScanSource") | ||
|
|
||
| // when users do not specify the schema | ||
| val e = intercept[AnalysisException] { dfReader.load() } | ||
| assert(e.getMessage.contains("A schema needs to be specified when using")) | ||
|
|
||
| // when users specify the schema | ||
| val inputSchema = new StructType().add("s", StringType, nullable = false) | ||
| assert(dfReader.schema(inputSchema).load().count() == 10) | ||
| } | ||
|
|
||
| test("text - API and behavior regarding schema") { | ||
| // Writer | ||
| spark.createDataset(data).write.mode(SaveMode.Overwrite).text(dir) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the `RelationProvider` doesn't allow a user-specified schema, can we assume it's cheap to infer the schema for it? Then we can simply check whether the given schema matches the schema of the relation returned by `createRelation`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, this is a pretty good idea. Let me try it. Thanks!