From da5c1ecbe494de0e39ce4a4f182a7e66fa2ad68b Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Fri, 31 Jul 2020 14:33:43 +0300
Subject: [PATCH 1/3] Add JdbcNestedDataSourceSuite

---
 .../spark/sql/NestedDataSourceSuite.scala     | 21 +++++---
 .../sql/jdbc/JdbcNestedDataSourceSuite.scala  | 51 +++++++++++++++++++
 2 files changed, 64 insertions(+), 8 deletions(-)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/jdbc/JdbcNestedDataSourceSuite.scala

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala
index 152d59b7b190..78b314272aa0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala
@@ -24,6 +24,16 @@ import org.apache.spark.sql.types.{LongType, StructType}
 // Datasource tests for nested schemas
 trait NestedDataSourceSuiteBase extends QueryTest with SharedSparkSession {
   protected val nestedDataSources: Seq[String] = Seq("orc", "parquet", "json")
+  protected def readOptions(schema: StructType): Map[String, String] = Map.empty
+  protected def save(selectExpr: Seq[String], format: String, path: String): Unit = {
+    spark
+      .range(1L)
+      .selectExpr(selectExpr: _*)
+      .write.mode("overwrite")
+      .format(format)
+      .save(path)
+  }
+  protected val colType: String = "in the data schema"
 
   test("SPARK-32431: consistent error for nested and top-level duplicate columns") {
     Seq(
@@ -44,22 +54,17 @@ trait NestedDataSourceSuiteBase extends QueryTest with SharedSparkSession {
       withClue(s"format = $format select = ${selectExpr.mkString(",")}") {
         withTempPath { dir =>
           val path = dir.getCanonicalPath
-          spark
-            .range(1L)
-            .selectExpr(selectExpr: _*)
-            .write.mode("overwrite")
-            .format(format)
-            .save(path)
+          save(selectExpr, format, path)
           val e = intercept[AnalysisException] {
             spark
               .read
+              .options(readOptions(caseInsensitiveSchema))
               .schema(caseInsensitiveSchema)
               .format(format)
               .load(path)
               .show
           }
-          assert(e.getMessage.contains(
-            "Found duplicate column(s) in the data schema: `camelcase`"))
+          assert(e.getMessage.contains(s"Found duplicate column(s) $colType: `camelcase`"))
         }
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JdbcNestedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JdbcNestedDataSourceSuite.scala
new file mode 100644
index 000000000000..6281f88ba7c7
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JdbcNestedDataSourceSuite.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import org.apache.spark.sql.NestedDataSourceSuiteBase
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
+
+class JdbcNestedDataSourceSuite extends NestedDataSourceSuiteBase {
+  override val nestedDataSources: Seq[String] = Seq("jdbc")
+  private val tempDir = Utils.createTempDir()
+  private val url = s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass"
+  override val colType: String = "in the customSchema option value"
+
+  override def afterAll(): Unit = {
+    Utils.deleteRecursively(tempDir)
+    super.afterAll()
+  }
+
+  override def readOptions(schema: StructType): Map[String, String] = {
+    Map("url" -> url, "dbtable" -> "t1", "customSchema" -> schema.toDDL)
+  }
+
+  override def save(selectExpr: Seq[String], format: String, path: String): Unit = {
+    // We ignore `selectExpr` because:
+    // 1. H2 doesn't support nested columns.
+    // 2. The JDBC datasource checks for duplicates before comparing the
+    //    user's schema with the actual schema of `t1`.
+    spark
+      .range(1L)
+      .write.mode("overwrite")
+      .options(Map("url" -> url, "dbtable" -> "t1"))
+      .format(format)
+      .save()
+  }
+}
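Patch 1/3 turns the shared suite into a template: `save()` writes the test data, `readOptions()` supplies per-source reader options, and `colType` parameterizes the expected error text, with defaults that fit the file-based sources. As a minimal sketch of the intended reuse (the Avro suite below is hypothetical, invented purely for illustration, and not part of this change), a source whose defaults already fit only needs to name its format:

    package org.apache.spark.sql.avro

    import org.apache.spark.sql.NestedDataSourceSuiteBase

    // Hypothetical: the file-based defaults of save(), readOptions() and
    // colType already fit Avro, so naming the format is the only override.
    class AvroNestedDataSourceSuite extends NestedDataSourceSuiteBase {
      override val nestedDataSources: Seq[String] = Seq("avro")
    }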
From be2c2249cf013c52182e30ff09573ffc7f00fc3d Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Fri, 31 Jul 2020 14:34:05 +0300
Subject: [PATCH 2/3] Fix JDBC

---
 .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 3c6649b26ecd..816f5f45860c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -819,8 +819,10 @@ object JdbcUtils extends Logging {
     if (null != customSchema && customSchema.nonEmpty) {
       val userSchema = CatalystSqlParser.parseTableSchema(customSchema)
 
-      SchemaUtils.checkColumnNameDuplication(
-        userSchema.map(_.name), "in the customSchema option value", nameEquality)
+      SchemaUtils.checkSchemaColumnNameDuplication(
+        userSchema,
+        "in the customSchema option value",
+        nameEquality)
 
       // This is resolved by names, use the custom field dataType to replace the default dataType.
       val newSchema = tableSchema.map { col =>
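Patch 2/3 is the behavioral fix. The old `checkColumnNameDuplication` call only inspects the top-level names it is handed, so a duplicate hidden inside a nested struct in the `customSchema` option slipped through; `checkSchemaColumnNameDuplication` also recurses into nested fields (SPARK-32431). A minimal sketch of the difference, assuming a spark-shell session and using the built-in `caseInsensitiveResolution` in place of the `nameEquality` resolver above:

    import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
    import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
    import org.apache.spark.sql.util.SchemaUtils

    // A customSchema value whose duplicate is nested, not top-level.
    val userSchema = CatalystSqlParser.parseTableSchema(
      "id BIGINT, s STRUCT<camelCase: STRING, CamelCase: STRING>")

    // Old check: only the top-level names ("id", "s") are compared,
    // so the nested duplicate passes unnoticed.
    SchemaUtils.checkColumnNameDuplication(
      userSchema.map(_.name), "in the customSchema option value",
      caseInsensitiveResolution)

    // New check: nested fields are visited too; this throws
    // org.apache.spark.sql.AnalysisException: Found duplicate column(s)
    // in the customSchema option value: `camelcase`
    SchemaUtils.checkSchemaColumnNameDuplication(
      userSchema, "in the customSchema option value", caseInsensitiveResolution)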
From 5d1dcaf37713b500f85f594bc7d6314a9270a740 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Fri, 31 Jul 2020 19:23:04 +0300
Subject: [PATCH 3/3] Renaming: Jdbc -> JDBC

---
 ...tedDataSourceSuite.scala => JDBCNestedDataSourceSuite.scala} | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
 rename sql/core/src/test/scala/org/apache/spark/sql/jdbc/{JdbcNestedDataSourceSuite.scala => JDBCNestedDataSourceSuite.scala} (96%)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JdbcNestedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCNestedDataSourceSuite.scala
similarity index 96%
rename from sql/core/src/test/scala/org/apache/spark/sql/jdbc/JdbcNestedDataSourceSuite.scala
rename to sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCNestedDataSourceSuite.scala
index 6281f88ba7c7..46bdb1918147 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JdbcNestedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCNestedDataSourceSuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.NestedDataSourceSuiteBase
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
-class JdbcNestedDataSourceSuite extends NestedDataSourceSuiteBase {
+class JDBCNestedDataSourceSuite extends NestedDataSourceSuiteBase {
   override val nestedDataSources: Seq[String] = Seq("jdbc")
   private val tempDir = Utils.createTempDir()
   private val url = s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass"
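End to end, the JDBC source now reports duplicates in the `customSchema` option with the same error text the file-based sources use for their data schemas, and it does so before comparing against the actual table schema. A hypothetical session against the suite's H2 setup (the URL, path, and table name are illustrative only):

    // Assumes an active `spark` session with the H2 driver on the classpath
    // and a table `t1`, as created by JDBCNestedDataSourceSuite.save().
    spark.read
      .format("jdbc")
      .option("url", "jdbc:h2:/tmp/testdb;user=testUser;password=testPass")
      .option("dbtable", "t1")
      .option("customSchema", "id BIGINT, s STRUCT<camelCase: STRING, CamelCase: STRING>")
      .load()
    // With spark.sql.caseSensitive=false (the default), the nested duplicate
    // that previously slipped past the JDBC check now fails fast with:
    //   org.apache.spark.sql.AnalysisException:
    //     Found duplicate column(s) in the customSchema option value: `camelcase`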