diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 3c6649b26ecd..816f5f45860c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -819,8 +819,10 @@ object JdbcUtils extends Logging { if (null != customSchema && customSchema.nonEmpty) { val userSchema = CatalystSqlParser.parseTableSchema(customSchema) - SchemaUtils.checkColumnNameDuplication( - userSchema.map(_.name), "in the customSchema option value", nameEquality) + SchemaUtils.checkSchemaColumnNameDuplication( + userSchema, + "in the customSchema option value", + nameEquality) // This is resolved by names, use the custom filed dataType to replace the default dataType. val newSchema = tableSchema.map { col => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala index 152d59b7b190..78b314272aa0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala @@ -24,6 +24,16 @@ import org.apache.spark.sql.types.{LongType, StructType} // Datasource tests for nested schemas trait NestedDataSourceSuiteBase extends QueryTest with SharedSparkSession { protected val nestedDataSources: Seq[String] = Seq("orc", "parquet", "json") + protected def readOptions(schema: StructType): Map[String, String] = Map.empty + protected def save(selectExpr: Seq[String], format: String, path: String): Unit = { + spark + .range(1L) + .selectExpr(selectExpr: _*) + .write.mode("overwrite") + .format(format) + .save(path) + } + protected val colType: String = "in the data schema" test("SPARK-32431: consistent error for nested and top-level duplicate 
columns") { Seq( @@ -44,22 +54,17 @@ trait NestedDataSourceSuiteBase extends QueryTest with SharedSparkSession { withClue(s"format = $format select = ${selectExpr.mkString(",")}") { withTempPath { dir => val path = dir.getCanonicalPath - spark - .range(1L) - .selectExpr(selectExpr: _*) - .write.mode("overwrite") - .format(format) - .save(path) + save(selectExpr, format, path) val e = intercept[AnalysisException] { spark .read + .options(readOptions(caseInsensitiveSchema)) .schema(caseInsensitiveSchema) .format(format) .load(path) .show } - assert(e.getMessage.contains( - "Found duplicate column(s) in the data schema: `camelcase`")) + assert(e.getMessage.contains(s"Found duplicate column(s) $colType: `camelcase`")) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCNestedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCNestedDataSourceSuite.scala new file mode 100644 index 000000000000..46bdb1918147 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCNestedDataSourceSuite.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.sql.jdbc
+
+import org.apache.spark.sql.NestedDataSourceSuiteBase
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
+
+class JDBCNestedDataSourceSuite extends NestedDataSourceSuiteBase {
+  override val nestedDataSources: Seq[String] = Seq("jdbc")
+  private val tempDir = Utils.createTempDir()
+  private val url = s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass"
+  override val colType: String = "in the customSchema option value"
+
+  override def afterAll(): Unit = {
+    try { Utils.deleteRecursively(tempDir) }
+    finally { super.afterAll() } // parent teardown must run even if the cleanup above throws
+  }
+
+  override def readOptions(schema: StructType): Map[String, String] = {
+    Map("url" -> url, "dbtable" -> "t1", "customSchema" -> schema.toDDL)
+  }
+
+  override def save(selectExpr: Seq[String], format: String, path: String): Unit = {
+    // `selectExpr` and `path` are intentionally ignored because:
+    // 1. H2 doesn't support nested columns.
+    // 2. The JDBC datasource checks for duplicates before comparing the user's schema
+    //    with the actual schema of `t1`.
+    spark
+      .range(1L)
+      .write.mode("overwrite")
+      .options(Map("url" -> url, "dbtable" -> "t1"))
+      .format(format)
+      .save()
+  }
+}