From 468660f4956b3fc2d015ae9bcfd4015ddd47f61a Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Fri, 6 Dec 2019 15:23:50 +0800
Subject: [PATCH 01/12] mismatch schema

---
 .../execution/datasources/DataSource.scala    |  25 ++-
 .../datasources/jdbc/JdbcUtilsSuite.scala     | 146 +++++++++++++++++-
 .../sql/test/DataFrameReaderWriterSuite.scala |  21 ++-
 3 files changed, 185 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index e9b8fae7cd735..b77c918398309 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -339,11 +339,30 @@ case class DataSource(
         dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
       case (_: SchemaRelationProvider, None) =>
         throw new AnalysisException(s"A schema needs to be specified when using $className.")
-      case (dataSource: RelationProvider, Some(schema)) =>
+      case (dataSource: RelationProvider, Some(specifiedSchema)) =>
         val baseRelation =
           dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
-        if (baseRelation.schema != schema) {
-          throw new AnalysisException(s"$className does not allow user-specified schemas.")
+        val persistentSchema = baseRelation.schema
+        val persistentSize = persistentSchema.size
+        val specifiedSize = specifiedSchema.size
+        if (persistentSize == specifiedSize) {
+          val (persistentFields, specifiedFields) = persistentSchema.zip(specifiedSchema)
+            .filter { case (existedField, userField) => existedField != userField }
+            .unzip
+          if (persistentFields.nonEmpty) {
+            val errorMsg =
+              s"Mismatched fields detected between persistent schema and user specified schema: " +
+                s"persistentFields: ${persistentFields.map(_.toDDL).mkString(", ")}, " +
+                s"specifiedFields: ${specifiedFields.map(_.toDDL).mkString(", ")}. " +
+                s"Please either correct your specified schema or just remove the specified schema."
+            throw new AnalysisException(errorMsg)
+          }
+        } else {
+          val errorMsg =
+            s"The number of fields between persistent schema and user specified schema mismatch: " +
+              s"expect $persistentSize fields, but got $specifiedSize fields. " +
+              s"Please either correct your specified schema or just remove the specified schema."
+          throw new AnalysisException(errorMsg)
         }
         baseRelation
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala
index 7d277c1ffaffe..44ead1302cdd7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala
@@ -17,12 +17,27 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, SQLContext}
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.sources.{BaseRelation, RelationProvider}
+import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 
-class JdbcUtilsSuite extends SparkFunSuite {
+/* A test JdbcRelationProvider used to provide a persistent schema */
+class TestJdbcRelationProvider extends RelationProvider {
+  override def createRelation(sqlCtx: SQLContext, parameters: Map[String, String])
+    : BaseRelation = {
+    new BaseRelation {
+      override def sqlContext: SQLContext = sqlCtx
+      override def schema: StructType = {
+        new StructType().add(StructField("a", StringType)).add(StructField("b", IntegerType))
+      }
+    }
+  }
+}
+
+class JdbcUtilsSuite extends SharedSparkSession {
 
   val tableSchema = StructType(Seq(
     StructField("C1", StringType, false), StructField("C2", IntegerType, false)))
@@ -65,4 +80,129 @@ class JdbcUtilsSuite extends SparkFunSuite {
     }
     assert(mismatchedInput.getMessage.contains("mismatched input '.' expecting"))
   }
+
+  test("Clarify mismatched fields between persistent & specified schema - number mismatch") {
+    // persistent: (a STRING, b INT)
+    val persistentSchema =
+      DataSource(spark, classOf[TestJdbcRelationProvider].getCanonicalName)
+        .resolveRelation()
+        .schema
+    // specified: (a STRING)
+    val specifiedSchema = new StructType()
+      .add(StructField("a", StringType))
+    val msg = intercept[AnalysisException] {
+      DataSource(
+        spark,
+        classOf[TestJdbcRelationProvider].getCanonicalName,
+        userSpecifiedSchema = Some(specifiedSchema))
+        .resolveRelation()
+    }.getMessage
+    assert(msg.contains(
+      "The number of fields between persistent schema and user specified schema mismatch"))
+  }
+
+  test("Clarify mismatched fields between persistent & specified schema - wrong name") {
+    // persistent: (a STRING, b INT)
+    val persistentSchema =
+      DataSource(spark, classOf[TestJdbcRelationProvider].getCanonicalName)
+        .resolveRelation()
+        .schema
+    // specified: (a STRING, c INT)
+    val specifiedSchema = new StructType()
+      .add(StructField("a", StringType))
+      .add(StructField("c", IntegerType)) // wrong field name
+    val msg = intercept[AnalysisException] {
+      DataSource(
+        spark,
+        classOf[TestJdbcRelationProvider].getCanonicalName,
+        userSpecifiedSchema = Some(specifiedSchema))
+        .resolveRelation()
+    }.getMessage
+    assert(msg.contains(s"persistentFields: ${persistentSchema("b").toDDL}"))
+    assert(msg.contains(s"specifiedFields: ${specifiedSchema("c").toDDL}"))
+  }
+
+  test("Clarify mismatched fields between persistent & specified schema - wrong type") {
+    // persistent: (a STRING, b INT)
+    val persistentSchema =
+      DataSource(spark, classOf[TestJdbcRelationProvider].getCanonicalName)
+        .resolveRelation()
+        .schema
+    // specified: (a STRING, b STRING)
+    val specifiedSchema = new StructType()
+      .add(StructField("a", StringType))
+      .add(StructField("b", StringType)) // wrong field type
+    val msg = intercept[AnalysisException] {
+      DataSource(
+        spark,
+        classOf[TestJdbcRelationProvider].getCanonicalName,
+        userSpecifiedSchema = Some(specifiedSchema))
+        .resolveRelation()
+    }.getMessage
+    assert(msg.contains(s"persistentFields: ${persistentSchema("b").toDDL}"))
+    assert(msg.contains(s"specifiedFields: ${specifiedSchema("b").toDDL}"))
+  }
+
+  test("Clarify mismatched fields between persistent & specified schema - wrong name & type") {
+    // persistent: (a STRING, b INT)
+    val persistentSchema =
+      DataSource(spark, classOf[TestJdbcRelationProvider].getCanonicalName)
+        .resolveRelation()
+        .schema
+    // specified: (a STRING, c STRING)
+    val specifiedSchema = new StructType()
+      .add(StructField("a", StringType))
+      .add(StructField("c", StringType)) // wrong field name and type
+    val msg = intercept[AnalysisException] {
+      DataSource(
+        spark,
+        classOf[TestJdbcRelationProvider].getCanonicalName,
+        userSpecifiedSchema = Some(specifiedSchema))
+        .resolveRelation()
+    }.getMessage
+    assert(msg.contains(s"persistentFields: ${persistentSchema("b").toDDL}"))
+    assert(msg.contains(s"specifiedFields: ${specifiedSchema("c").toDDL}"))
+  }
+
+  test("Clarify mismatched fields between persistent & specified schema - wrong order") {
+    // persistent: (a STRING, b INT)
+    val persistentSchema =
+      DataSource(spark, classOf[TestJdbcRelationProvider].getCanonicalName)
+        .resolveRelation()
+        .schema
+    // specified: (b INT, a STRING)
+    val specifiedSchema = new StructType() // wrong order
+      .add(StructField("b", IntegerType))
+      .add(StructField("a", StringType))
+    val msg = intercept[AnalysisException] {
+      DataSource(
+        spark,
+        classOf[TestJdbcRelationProvider].getCanonicalName,
+        userSpecifiedSchema = Some(specifiedSchema))
+        .resolveRelation()
+    }.getMessage
+    assert(msg.contains(s"persistentFields: ${persistentSchema.map(_.toDDL).mkString(", ")}"))
+    assert(msg.contains(s"specifiedFields: ${specifiedSchema.map(_.toDDL).mkString(", ")}"))
+  }
+
+  test("Clarify mismatched fields between persistent & specified schema - complex type") {
+    // persistent: (a STRING, b INT)
+    val persistentSchema =
+      DataSource(spark, classOf[TestJdbcRelationProvider].getCanonicalName)
+        .resolveRelation()
+        .schema
+    // specified: (a STRING, b STRUCT)
+    val specifiedSchema = new StructType()
+      .add(StructField("a", StringType))
+      .add(StructField("b", StructType(StructField("c", IntegerType) :: Nil))) // complex type
+    val msg = intercept[AnalysisException] {
+      DataSource(
+        spark,
+        classOf[TestJdbcRelationProvider].getCanonicalName,
+        userSpecifiedSchema = Some(specifiedSchema))
+        .resolveRelation()
+    }.getMessage
+    assert(msg.contains(s"persistentFields: ${persistentSchema("b").toDDL}"))
+    assert(msg.contains(s"specifiedFields: ${specifiedSchema("b").toDDL}"))
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 55a60940a7750..272debd59db21 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -40,7 +40,8 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression}
 import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.execution.datasources.DataSourceUtils
+import org.apache.spark.sql.execution.datasources.{DataSource, DataSourceUtils}
+import org.apache.spark.sql.execution.datasources.jdbc.TestJdbcRelationProvider
 import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
 import org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase
 import org.apache.spark.sql.internal.SQLConf
@@ -447,6 +448,24 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
     }
   }
 
+  test("Clarify mismatched fields between persistent & specified schema") {
+    // persistent: (a STRING, b INT)
+    val persistentSchema =
+      DataSource(spark, classOf[TestJdbcRelationProvider].getCanonicalName)
+        .resolveRelation()
+        .schema
+    // specified: (a STRING, c INT)
+    val specifiedSchema = new StructType()
+      .add(StructField("a", StringType))
+      .add(StructField("c", IntegerType))
+    val msg = intercept[AnalysisException] {
+      spark.read.format(classOf[TestJdbcRelationProvider].getCanonicalName)
+        .schema(specifiedSchema).load()
+    }.getMessage
+    assert(msg.contains(s"persistentFields: ${persistentSchema("b").toDDL}"))
+    assert(msg.contains(s"specifiedFields: ${specifiedSchema("c").toDDL}"))
+  }
+
   test("prevent all column partitioning") {
     withTempDir { dir =>
       val path = dir.getCanonicalPath

From 98478472f56ed63ffa83f1d0e4ea9e8b5aadb6c9 Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Fri, 6 Dec 2019 22:19:22 +0800
Subject: [PATCH 02/12] update

---
 .../spark/sql/execution/datasources/DataSource.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index b77c918398309..d1830a361ad77 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -354,14 +354,15 @@ case class DataSource(
               s"Mismatched fields detected between persistent schema and user specified schema: " +
                 s"persistentFields: ${persistentFields.map(_.toDDL).mkString(", ")}, " +
                 s"specifiedFields: ${specifiedFields.map(_.toDDL).mkString(", ")}. " +
-                s"Please either correct your specified schema or just remove the specified schema."
+                s"This happens either when you made a mistake in the schema, or when the type " +
+                s"mapping between Spark and the external data source was updated while your " +
+                s"specified schema still uses the old one."
             throw new AnalysisException(errorMsg)
           }
         } else {
           val errorMsg =
             s"The number of fields between persistent schema and user specified schema mismatch: " +
-              s"expect $persistentSize fields, but got $specifiedSize fields. " +
-              s"Please either correct your specified schema or just remove the specified schema."
+              s"expect $persistentSize fields, but got $specifiedSize fields. "
           throw new AnalysisException(errorMsg)
         }
         baseRelation

From aab23914a16e3c8234def2dc866086534e90a8ee Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Fri, 6 Dec 2019 22:42:29 +0800
Subject: [PATCH 03/12] update again

---
 .../spark/sql/execution/datasources/DataSource.scala | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d1830a361ad77..ec25b18d3865f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -356,13 +356,15 @@ case class DataSource(
                 s"specifiedFields: ${specifiedFields.map(_.toDDL).mkString(", ")}. " +
                 s"This happens either when you made a mistake in the schema, or when the type " +
                 s"mapping between Spark and the external data source was updated while your " +
-                s"specified schema still uses the old one."
+                s"specified schema still uses the old one. Please either correct the schema or " +
+                s"just do not specify the schema."
             throw new AnalysisException(errorMsg)
           }
         } else {
           val errorMsg =
-            s"The number of fields between persistent schema and user specified schema mismatch: " +
-              s"expect $persistentSize fields, but got $specifiedSize fields. "
+            s"The number of fields between persistent schema and user specified schema " +
+              s"mismatched: expect $persistentSize fields, but got $specifiedSize fields. " +
+              s"Please either correct the schema or just do not specify the schema."
           throw new AnalysisException(errorMsg)
         }
         baseRelation

From a4f31224f40cf337c29197014cb2961959c21d16 Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Fri, 6 Dec 2019 23:08:37 +0800
Subject: [PATCH 04/12] improve error message

---
 .../apache/spark/sql/execution/datasources/DataSource.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index ec25b18d3865f..287e5c753ac54 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -357,14 +357,15 @@ case class DataSource(
                 s"This happens either when you made a mistake in the schema, or when the type " +
                 s"mapping between Spark and the external data source was updated while your " +
                 s"specified schema still uses the old one. Please either correct the schema or " +
-                s"just do not specify the schema."
+                s"just do not specify the schema since a specified schema for $className is not necessary."
             throw new AnalysisException(errorMsg)
           }
         } else {
           val errorMsg =
             s"The number of fields between persistent schema and user specified schema " +
               s"mismatched: expect $persistentSize fields, but got $specifiedSize fields. " +
-              s"Please either correct the schema or just do not specify the schema."
+              s"Please either correct the schema or just do not specify the schema since " +
+              s"a specified schema for $className is not necessary."
           throw new AnalysisException(errorMsg)
         }
         baseRelation

From dd45804e5124181e171b9963cb98468645970557 Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Fri, 6 Dec 2019 23:12:04 +0800
Subject: [PATCH 05/12] updated tests

---
 .../execution/datasources/jdbc/JdbcUtilsSuite.scala | 12 ++++++------
 .../apache/spark/sql/sources/TableScanSuite.scala   |  3 ++-
 .../spark/sql/test/DataFrameReaderWriterSuite.scala |  4 ++--
 3 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala
index 44ead1302cdd7..57665a0ba836f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala
@@ -81,7 +81,7 @@ class JdbcUtilsSuite extends SharedSparkSession {
     assert(mismatchedInput.getMessage.contains("mismatched input '.' expecting"))
   }
 
-  test("Clarify mismatched fields between persistent & specified schema - number mismatch") {
+  test("SPARK-30151: user-specified schema doesn't match relation schema - number mismatch") {
     // persistent: (a STRING, b INT)
     val persistentSchema =
       DataSource(spark, classOf[TestJdbcRelationProvider].getCanonicalName)
@@ -101,7 +101,7 @@ class JdbcUtilsSuite extends SharedSparkSession {
       "The number of fields between persistent schema and user specified schema mismatch"))
   }
 
-  test("Clarify mismatched fields between persistent & specified schema - wrong name") {
+  test("SPARK-30151: user-specified schema doesn't match relation schema - wrong name") {
     // persistent: (a STRING, b INT)
     val persistentSchema =
       DataSource(spark, classOf[TestJdbcRelationProvider].getCanonicalName)
@@ -122,7 +122,7 @@ class JdbcUtilsSuite extends SharedSparkSession {
     assert(msg.contains(s"specifiedFields: ${specifiedSchema("c").toDDL}"))
   }
 
-  test("Clarify mismatched fields between persistent & specified schema - wrong type") {
+  test("SPARK-30151: user-specified schema doesn't match relation schema - wrong type") {
     // persistent: (a STRING, b INT)
     val persistentSchema =
       DataSource(spark, classOf[TestJdbcRelationProvider].getCanonicalName)
@@ -143,7 +143,7 @@ class JdbcUtilsSuite extends SharedSparkSession {
     assert(msg.contains(s"specifiedFields: ${specifiedSchema("b").toDDL}"))
   }
 
-  test("Clarify mismatched fields between persistent & specified schema - wrong name & type") {
+  test("SPARK-30151: user-specified schema doesn't match relation schema - wrong name & type") {
     // persistent: (a STRING, b INT)
     val persistentSchema =
       DataSource(spark, classOf[TestJdbcRelationProvider].getCanonicalName)
@@ -164,7 +164,7 @@ class JdbcUtilsSuite extends SharedSparkSession {
     assert(msg.contains(s"specifiedFields: ${specifiedSchema("c").toDDL}"))
   }
 
-  test("Clarify mismatched fields between persistent & specified schema - wrong order") {
+  test("SPARK-30151: user-specified schema doesn't match relation schema - wrong order") {
     // persistent: (a STRING, b INT)
     val persistentSchema =
       DataSource(spark, classOf[TestJdbcRelationProvider].getCanonicalName)
@@ -185,7 +185,7 @@ class JdbcUtilsSuite extends SharedSparkSession {
     assert(msg.contains(s"specifiedFields: ${specifiedSchema.map(_.toDDL).mkString(", ")}"))
   }
 
-  test("Clarify mismatched fields between persistent & specified schema - complex type") {
+  test("SPARK-30151: user-specified schema doesn't match relation schema - complex type") {
     // persistent: (a STRING, b INT)
     val persistentSchema =
       DataSource(spark, classOf[TestJdbcRelationProvider].getCanonicalName)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
index d4e117953942e..9393fc342ed2e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
@@ -369,7 +369,8 @@ class TableScanSuite extends DataSourceTest with SharedSparkSession {
         |)
       """.stripMargin)
     }
-    assert(schemaNotAllowed.getMessage.contains("does not allow user-specified schemas"))
+    assert(schemaNotAllowed.getMessage.contains(
+      "a specified schema for org.apache.spark.sql.sources.SimpleScanSource is not necessary"))
 
     val schemaNeeded = intercept[Exception] {
       sql(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 272debd59db21..e930e15475fec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -448,7 +448,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
     }
   }
 
-  test("Clarify mismatched fields between persistent & specified schema") {
+  test("SPARK-30151: user-specified schema doesn't match relation schema") {
     // persistent: (a STRING, b INT)
     val persistentSchema =
       DataSource(spark, classOf[TestJdbcRelationProvider].getCanonicalName)
@@ -512,7 +512,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
     val inputSchema = new StructType().add("s", IntegerType, nullable = false)
     val e = intercept[AnalysisException] { dfReader.schema(inputSchema).load() }
     assert(e.getMessage.contains(
-      "org.apache.spark.sql.sources.SimpleScanSource does not allow user-specified schemas"))
+      "a specified schema for org.apache.spark.sql.sources.SimpleScanSource is not necessary"))
   }
 
   test("read a data source that does not extend RelationProvider") {

From de036b6d7e2e6ffa165cc2f83d487930903a941b Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Mon, 9 Dec 2019 21:33:14 +0800
Subject: [PATCH 06/12] reword error msg

---
 .../execution/datasources/DataSource.scala | 32 ++++-----------------
 1 file changed, 7 insertions(+), 25 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 287e5c753ac54..797dafcc4f312 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -339,33 +339,15 @@ case class DataSource(
         dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
       case (_: SchemaRelationProvider, None) =>
         throw new AnalysisException(s"A schema needs to be specified when using $className.")
-      case (dataSource: RelationProvider, Some(specifiedSchema)) =>
+      case (dataSource: RelationProvider, Some(schema)) =>
         val baseRelation =
           dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
-        val persistentSchema = baseRelation.schema
-        val persistentSize = persistentSchema.size
-        val specifiedSize = specifiedSchema.size
-        if (persistentSize == specifiedSize) {
-          val (persistentFields, specifiedFields) = persistentSchema.zip(specifiedSchema)
-            .filter { case (existedField, userField) => existedField != userField }
-            .unzip
-          if (persistentFields.nonEmpty) {
-            val errorMsg =
-              s"Mismatched fields detected between persistent schema and user specified schema: " +
-                s"persistentFields: ${persistentFields.map(_.toDDL).mkString(", ")}, " +
-                s"specifiedFields: ${specifiedFields.map(_.toDDL).mkString(", ")}. " +
-                s"This happens either when you made a mistake in the schema, or when the type " +
-                s"mapping between Spark and the external data source was updated while your " +
-                s"specified schema still uses the old one. Please either correct the schema or " +
-                s"just do not specify the schema since a specified schema for $className is not necessary."
-            throw new AnalysisException(errorMsg)
-          }
-        } else {
-          val errorMsg =
-            s"The number of fields between persistent schema and user specified schema " +
-              s"mismatched: expect $persistentSize fields, but got $specifiedSize fields. " +
-              s"Please either correct the schema or just do not specify the schema since " +
-              s"a specified schema for $className is not necessary."
+        if (baseRelation.schema != schema) {
+          val errorMsg = "The user-specified schema doesn't match the actual schema: " +
+            s"user-specified: ${schema.toDDL}, actual: ${baseRelation.schema.toDDL}. If " +
+            s"you're using the DataFrameReader.schema API or creating a table, please do not " +
+            s"specify the schema. Or if you're scanning an existing table, please drop " +
+            s"it and re-create it."
           throw new AnalysisException(errorMsg)
+        }
         baseRelation

From 8fac1fa6fe48263f5877e006c060b87e79936539 Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Mon, 9 Dec 2019 21:58:46 +0800
Subject: [PATCH 07/12] revert tests

---
 .../datasources/jdbc/JdbcUtilsSuite.scala | 125 ------------------
 1 file changed, 125 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala
index 57665a0ba836f..6b3eee46d9b84 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala
@@ -80,129 +80,4 @@ class JdbcUtilsSuite extends SharedSparkSession {
     }
     assert(mismatchedInput.getMessage.contains("mismatched input '.' expecting"))
   }
-
-  test("SPARK-30151: user-specified schema doesn't match relation schema - number mismatch") {
-    // persistent: (a STRING, b INT)
-    val persistentSchema =
-      DataSource(spark, classOf[TestJdbcRelationProvider].getCanonicalName)
-        .resolveRelation()
-        .schema
-    // specified: (a STRING)
-    val specifiedSchema = new StructType()
-      .add(StructField("a", StringType))
-    val msg = intercept[AnalysisException] {
-      DataSource(
-        spark,
-        classOf[TestJdbcRelationProvider].getCanonicalName,
-        userSpecifiedSchema = Some(specifiedSchema))
-        .resolveRelation()
-    }.getMessage
-    assert(msg.contains(
-      "The number of fields between persistent schema and user specified schema mismatch"))
-  }
-
-  test("SPARK-30151: user-specified schema doesn't match relation schema - wrong name") {
-    // persistent: (a STRING, b INT)
-    val persistentSchema =
-      DataSource(spark, classOf[TestJdbcRelationProvider].getCanonicalName)
-        .resolveRelation()
-        .schema
-    // specified: (a STRING, c INT)
-    val specifiedSchema = new StructType()
-      .add(StructField("a", StringType))
-      .add(StructField("c", IntegerType)) // wrong field name
-    val msg = intercept[AnalysisException] {
-      DataSource(
-        spark,
-        classOf[TestJdbcRelationProvider].getCanonicalName,
-        userSpecifiedSchema = Some(specifiedSchema))
-        .resolveRelation()
-    }.getMessage
-    assert(msg.contains(s"persistentFields: ${persistentSchema("b").toDDL}"))
-    assert(msg.contains(s"specifiedFields: ${specifiedSchema("c").toDDL}"))
-  }
-
-  test("SPARK-30151: user-specified schema doesn't match relation schema - wrong type") {
-    // persistent: (a STRING, b INT)
-    val persistentSchema =
-      DataSource(spark, classOf[TestJdbcRelationProvider].getCanonicalName)
-        .resolveRelation()
-        .schema
-    // specified: (a STRING, b STRING)
-    val specifiedSchema = new StructType()
-      .add(StructField("a", StringType))
-      .add(StructField("b", StringType)) // wrong field type
-    val msg = intercept[AnalysisException] {
-      DataSource(
-        spark,
-        classOf[TestJdbcRelationProvider].getCanonicalName,
-        userSpecifiedSchema = Some(specifiedSchema))
-        .resolveRelation()
-    }.getMessage
-    assert(msg.contains(s"persistentFields: ${persistentSchema("b").toDDL}"))
-    assert(msg.contains(s"specifiedFields: ${specifiedSchema("b").toDDL}"))
-  }
-
-  test("SPARK-30151: user-specified schema doesn't match relation schema - wrong name & type") {
-    // persistent: (a STRING, b INT)
-    val persistentSchema =
-      DataSource(spark, classOf[TestJdbcRelationProvider].getCanonicalName)
-        .resolveRelation()
-        .schema
-    // specified: (a STRING, c STRING)
-    val specifiedSchema = new StructType()
-      .add(StructField("a", StringType))
-      .add(StructField("c", StringType)) // wrong field name and type
-    val msg = intercept[AnalysisException] {
-      DataSource(
-        spark,
-        classOf[TestJdbcRelationProvider].getCanonicalName,
-        userSpecifiedSchema = Some(specifiedSchema))
-        .resolveRelation()
-    }.getMessage
-    assert(msg.contains(s"persistentFields: ${persistentSchema("b").toDDL}"))
-    assert(msg.contains(s"specifiedFields: ${specifiedSchema("c").toDDL}"))
-  }
-
-  test("SPARK-30151: user-specified schema doesn't match relation schema - wrong order") {
-    // persistent: (a STRING, b INT)
-    val persistentSchema =
-      DataSource(spark, classOf[TestJdbcRelationProvider].getCanonicalName)
-        .resolveRelation()
-        .schema
-    // specified: (b INT, a STRING)
-    val specifiedSchema = new StructType() // wrong order
-      .add(StructField("b", IntegerType))
-      .add(StructField("a", StringType))
-    val msg = intercept[AnalysisException] {
-      DataSource(
-        spark,
-        classOf[TestJdbcRelationProvider].getCanonicalName,
-        userSpecifiedSchema = Some(specifiedSchema))
-        .resolveRelation()
-    }.getMessage
-    assert(msg.contains(s"persistentFields: ${persistentSchema.map(_.toDDL).mkString(", ")}"))
-    assert(msg.contains(s"specifiedFields: ${specifiedSchema.map(_.toDDL).mkString(", ")}"))
-  }
-
-  test("SPARK-30151: user-specified schema doesn't match relation schema - complex type") {
-    // persistent: (a STRING, b INT)
-    val persistentSchema =
-      DataSource(spark, classOf[TestJdbcRelationProvider].getCanonicalName)
-        .resolveRelation()
-        .schema
-    // specified: (a STRING, b STRUCT)
-    val specifiedSchema = new StructType()
-      .add(StructField("a", StringType))
-      .add(StructField("b", StructType(StructField("c", IntegerType) :: Nil))) // complex type
-    val msg = intercept[AnalysisException] {
-      DataSource(
-        spark,
-        classOf[TestJdbcRelationProvider].getCanonicalName,
-        userSpecifiedSchema = Some(specifiedSchema))
-        .resolveRelation()
-    }.getMessage
-    assert(msg.contains(s"persistentFields: ${persistentSchema("b").toDDL}"))
-    assert(msg.contains(s"specifiedFields: ${specifiedSchema("b").toDDL}"))
-  }
 }

From 6700deef1bb04cca20b492a80c11b39c02b37e75 Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Mon, 9 Dec 2019 21:58:57 +0800
Subject: [PATCH 08/12] update tests

---
 .../spark/sql/sources/TableScanSuite.scala    |  6 ++---
 .../sql/test/DataFrameReaderWriterSuite.scala | 23 ++-----------------
 2 files changed, 5 insertions(+), 24 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
index 9393fc342ed2e..9a95bf770772e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
@@ -358,7 +358,7 @@ class TableScanSuite extends DataSourceTest with SharedSparkSession {
     // Make sure we do throw correct exception when users use a relation provider that
     // only implements the RelationProvider or the SchemaRelationProvider.
     Seq("TEMPORARY VIEW", "TABLE").foreach { tableType =>
-      val schemaNotAllowed = intercept[Exception] {
+      val schemaNotMatch = intercept[Exception] {
         sql(
           s"""
              |CREATE $tableType relationProvierWithSchema (i int)
@@ -369,8 +369,8 @@ class TableScanSuite extends DataSourceTest with SharedSparkSession {
         |)
       """.stripMargin)
     }
-    assert(schemaNotAllowed.getMessage.contains(
-      "a specified schema for org.apache.spark.sql.sources.SimpleScanSource is not necessary"))
+    assert(schemaNotMatch.getMessage.contains(
+      "The user-specified schema doesn't match the actual schema"))
 
     val schemaNeeded = intercept[Exception] {
       sql(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index e930e15475fec..ac0e323534f88 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -448,24 +448,6 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
     }
   }
 
-  test("SPARK-30151: user-specified schema doesn't match relation schema") {
-    // persistent: (a STRING, b INT)
-    val persistentSchema =
-      DataSource(spark, classOf[TestJdbcRelationProvider].getCanonicalName)
-        .resolveRelation()
-        .schema
-    // specified: (a STRING, c INT)
-    val specifiedSchema = new StructType()
-      .add(StructField("a", StringType))
-      .add(StructField("c", IntegerType))
-    val msg = intercept[AnalysisException] {
-      spark.read.format(classOf[TestJdbcRelationProvider].getCanonicalName)
-        .schema(specifiedSchema).load()
-    }.getMessage
-    assert(msg.contains(s"persistentFields: ${persistentSchema("b").toDDL}"))
-    assert(msg.contains(s"specifiedFields: ${specifiedSchema("c").toDDL}"))
-  }
-
   test("prevent all column partitioning") {
     withTempDir { dir =>
       val path = dir.getCanonicalPath
@@ -490,11 +472,10 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
     // when users do not specify the schema
     checkAnswer(dfReader.load(), spark.range(1, 11).toDF())
 
-    // when users specify the schema
+    // when users specify a wrong schema
     val inputSchema = new StructType().add("s", IntegerType, nullable = false)
     val e = intercept[AnalysisException] { dfReader.schema(inputSchema).load() }
-    assert(e.getMessage.contains(
-      "a specified schema for org.apache.spark.sql.sources.SimpleScanSource is not necessary"))
+    assert(e.getMessage.contains("The user-specified schema doesn't match the actual schema"))
   }
 
   test("read a data source that does not extend RelationProvider") {

From f54dea90780e6e7ad78c2018af2411df921b3df9 Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Mon, 9 Dec 2019 22:03:34 +0800
Subject: [PATCH 09/12] remove TestJdbcRelationProvider

---
 .../datasources/jdbc/JdbcUtilsSuite.scala | 21 +++----------------
 1 file changed, 3 insertions(+), 18 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala
index 6b3eee46d9b84..7d277c1ffaffe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala
@@ -17,27 +17,12 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
-import org.apache.spark.sql.{AnalysisException, SQLContext}
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.sources.{BaseRelation, RelationProvider}
-import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 
-/* A test JdbcRelationProvider used to provide a persistent schema */
-class TestJdbcRelationProvider extends RelationProvider {
-  override def createRelation(sqlCtx: SQLContext, parameters: Map[String, String])
-    : BaseRelation = {
-    new BaseRelation {
-      override def sqlContext: SQLContext = sqlCtx
-      override def schema: StructType = {
-        new StructType().add(StructField("a", StringType)).add(StructField("b", IntegerType))
-      }
-    }
-  }
-}
-
-class JdbcUtilsSuite extends SharedSparkSession {
+class JdbcUtilsSuite extends SparkFunSuite {
 
   val tableSchema = StructType(Seq(
     StructField("C1", StringType, false), StructField("C2", IntegerType, false)))

From ffdff16b83f9c7f893aaccad605b4607b28e30a1 Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Mon, 9 Dec 2019 22:06:55 +0800
Subject: [PATCH 10/12] revert import

---
 .../org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index ac0e323534f88..fb939007697c2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -40,8 +40,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression}
 import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.execution.datasources.{DataSource, DataSourceUtils}
-import org.apache.spark.sql.execution.datasources.jdbc.TestJdbcRelationProvider
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
 import org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase
 import org.apache.spark.sql.internal.SQLConf

From c2b5eea22bd3115254736ce8bacb3c23c7b7a613 Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Mon, 9 Dec 2019 22:10:30 +0800
Subject: [PATCH 11/12] remove unneeded string interpolator

---
 .../apache/spark/sql/execution/datasources/DataSource.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 797dafcc4f312..b7152a99d5c98 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -345,9 +345,9 @@ case class DataSource(
         if (baseRelation.schema != schema) {
           val errorMsg = "The user-specified schema doesn't match the actual schema: " +
             s"user-specified: ${schema.toDDL}, actual: ${baseRelation.schema.toDDL}. If " +
-            s"you're using the DataFrameReader.schema API or creating a table, please do not " +
-            s"specify the schema. Or if you're scanning an existing table, please drop " +
-            s"it and re-create it."
+            "you're using the DataFrameReader.schema API or creating a table, please do not " +
+            "specify the schema. Or if you're scanning an existing table, please drop " +
+            "it and re-create it."
           throw new AnalysisException(errorMsg)
         }
         baseRelation

From 814821a12b14b1726ff3959bc236f64383400051 Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Tue, 10 Dec 2019 13:31:42 +0800
Subject: [PATCH 12/12] address comment

---
 .../apache/spark/sql/execution/datasources/DataSource.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index b7152a99d5c98..3615afcf86c7a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -343,12 +343,12 @@ case class DataSource(
         val baseRelation =
           dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
         if (baseRelation.schema != schema) {
-          val errorMsg = "The user-specified schema doesn't match the actual schema: " +
+          throw new AnalysisException(
+            "The user-specified schema doesn't match the actual schema: " +
             s"user-specified: ${schema.toDDL}, actual: ${baseRelation.schema.toDDL}. If " +
             "you're using the DataFrameReader.schema API or creating a table, please do not " +
             "specify the schema. Or if you're scanning an existing table, please drop " +
-            "it and re-create it."
-          throw new AnalysisException(errorMsg)
+            "it and re-create it.")
         }
         baseRelation
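
To see end to end what the final state of this series does, here is a minimal, self-contained sketch. It is not part of the patch series: `FixedSchemaProvider` and `SchemaMismatchDemo` are illustrative stand-ins modeled on the TestJdbcRelationProvider that PATCH 01 added (and PATCH 09 removed), and it assumes a Spark build with these patches applied on the classpath.

import org.apache.spark.sql.{AnalysisException, SQLContext, SparkSession}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// A provider whose relation always reports (a STRING, b INT) as its actual schema,
// the same shape the TestJdbcRelationProvider in PATCH 01 used.
class FixedSchemaProvider extends RelationProvider {
  override def createRelation(sqlCtx: SQLContext, parameters: Map[String, String]): BaseRelation =
    new BaseRelation {
      override def sqlContext: SQLContext = sqlCtx
      override def schema: StructType =
        new StructType().add("a", StringType).add("b", IntegerType)
    }
}

object SchemaMismatchDemo extends App {
  val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()

  // (a STRING, c INT) deliberately disagrees with the actual (a STRING, b INT).
  val wrongSchema = new StructType().add("a", StringType).add("c", IntegerType)

  try {
    spark.read.format(classOf[FixedSchemaProvider].getName).schema(wrongSchema).load()
  } catch {
    // With PATCH 12 applied the message reads roughly:
    //   The user-specified schema doesn't match the actual schema:
    //   user-specified: `a` STRING,`c` INT, actual: `a` STRING,`b` INT. ...
    case e: AnalysisException => println(e.getMessage)
  } finally {
    spark.stop()
  }
}

Before this series, the same mismatch failed with the opaque "$className does not allow user-specified schemas."; the reworded message prints both schemas in DDL form, so the user can see exactly which fields disagree.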