docs/sql-migration-guide.md (2 additions, 0 deletions)
@@ -29,6 +29,8 @@ license: |
- In Spark 3.1, SQL UI data adopts the `formatted` mode for the query plan explain results. To restore the behavior before Spark 3.0, you can set `spark.sql.ui.explainMode` to `extended`.

- In Spark 3.1, `from_unixtime`, `unix_timestamp`, `to_unix_timestamp`, `to_timestamp` and `to_date` will fail if the specified datetime pattern is invalid. In Spark 3.0 or earlier, they result in `NULL`.

+ - In Spark 3.1, the Parquet, ORC, Avro and JSON datasources throw the exception `org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema` on read if they detect duplicate names in top-level columns as well as in nested structures. The datasources take into account the SQL config `spark.sql.caseSensitive` while detecting column name duplicates.
Review comment:

@MaxGekk Also update this for the changes made in #29317?
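To make the behavior described in the note above concrete, here is a minimal spark-shell sketch (not part of the patch), assuming Spark 3.1 with `spark.sql.caseSensitive` left at its default of `false`; the `/tmp/nested` path is illustrative:

```scala
// Write a file whose nested struct is fine on disk...
spark.range(1)
  .selectExpr("named_struct('lowercase', id, 'camelCase', id + 1) AS s")
  .write.mode("overwrite").parquet("/tmp/nested")

// ...then read it back with a user schema whose nested field names
// collide under case-insensitive resolution.
import org.apache.spark.sql.types.{LongType, StructType}
val readSchema = new StructType().add("s",
  new StructType()
    .add("camelcase", LongType)
    .add("CamelCase", LongType))

// Spark 3.1 fails fast with:
//   org.apache.spark.sql.AnalysisException:
//   Found duplicate column(s) in the data schema: `camelcase`
spark.read.schema(readSchema).parquet("/tmp/nested").show()
```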


## Upgrading from Spark SQL 3.0 to 3.0.1

AvroSuite.scala

@@ -50,9 +50,10 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.v2.avro.AvroScan
import org.apache.spark.util.Utils

-abstract class AvroSuite extends QueryTest with SharedSparkSession {
+abstract class AvroSuite extends QueryTest with SharedSparkSession with NestedDataSourceSuiteBase {
  import testImplicits._

+  override val nestedDataSources = Seq("avro")
  val episodesAvro = testFile("episodes.avro")
  val testAvro = testFile("test.avro")

SchemaUtils.scala

@@ -41,8 +41,38 @@ private[spark] object SchemaUtils {
   * @param caseSensitiveAnalysis whether duplication checks should be case sensitive or not
   */
  def checkSchemaColumnNameDuplication(
-      schema: StructType, colType: String, caseSensitiveAnalysis: Boolean = false): Unit = {
-    checkColumnNameDuplication(schema.map(_.name), colType, caseSensitiveAnalysis)
+      schema: DataType,
+      colType: String,
+      caseSensitiveAnalysis: Boolean = false): Unit = {
+    schema match {
+      case ArrayType(elementType, _) =>
+        checkSchemaColumnNameDuplication(elementType, colType, caseSensitiveAnalysis)
+      case MapType(keyType, valueType, _) =>
+        checkSchemaColumnNameDuplication(keyType, colType, caseSensitiveAnalysis)
+        checkSchemaColumnNameDuplication(valueType, colType, caseSensitiveAnalysis)
+      case structType: StructType =>
+        val fields = structType.fields
+        checkColumnNameDuplication(fields.map(_.name), colType, caseSensitiveAnalysis)
+        fields.foreach { field =>
+          checkSchemaColumnNameDuplication(field.dataType, colType, caseSensitiveAnalysis)
+        }
+      case _ =>
+    }
  }

+  /**
+   * Checks if an input schema has duplicate column names. This throws an exception if the
+   * duplication exists.
+   *
+   * @param schema schema to check
+   * @param colType column type name, used in an exception message
+   * @param resolver resolver used to determine if two identifiers are equal
+   */
+  def checkSchemaColumnNameDuplication(
+      schema: StructType,
+      colType: String,
+      resolver: Resolver): Unit = {
+    checkSchemaColumnNameDuplication(schema, colType, isCaseSensitiveAnalysis(resolver))
+  }

  // Returns true if a given resolver is case-sensitive
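Taken together, the two overloads give callers a Boolean-based and a Resolver-based entry point to the same recursive walk. A minimal sketch of how they behave on a nested schema (note that `SchemaUtils` is `private[spark]`, so code like this only compiles inside Spark's own packages or test suites):

```scala
import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
import org.apache.spark.sql.types.{ArrayType, LongType, StructType}
import org.apache.spark.sql.util.SchemaUtils

// Inner struct whose field names collide only under case-insensitive
// resolution; it is buried inside an array to exercise the recursion.
val inner = new StructType()
  .add("camelCase", LongType)
  .add("CamelCase", LongType)
val schema = new StructType().add("arr", ArrayType(inner))

// Throws AnalysisException("Found duplicate column(s) in the data schema:
// `camelcase`"): the check now recurses through arrays, maps and nested
// structs instead of inspecting top-level field names only.
SchemaUtils.checkSchemaColumnNameDuplication(
  schema, "in the data schema", caseSensitiveAnalysis = false)

// Same outcome via the new Resolver overload.
SchemaUtils.checkSchemaColumnNameDuplication(
  schema, "in the data schema", caseInsensitiveResolution)

// Passes: under case-sensitive analysis the two names are distinct.
SchemaUtils.checkSchemaColumnNameDuplication(
  schema, "in the data schema", caseSensitiveAnalysis = true)
```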
SchemaUtilsSuite.scala

@@ -22,7 +22,7 @@ import java.util.Locale
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{ArrayType, LongType, MapType, StructType}

class SchemaUtilsSuite extends SparkFunSuite {

@@ -82,4 +82,28 @@ class SchemaUtilsSuite extends SparkFunSuite {

    checkNoExceptionCases("a INT, b INT, c INT", caseSensitive = false)
  }

test("SPARK-32431: duplicated fields in nested schemas") {
val schemaA = new StructType()
.add("LowerCase", LongType)
.add("camelcase", LongType)
.add("CamelCase", LongType)
val schemaB = new StructType()
.add("f1", LongType)
.add("StructColumn1", schemaA)
val schemaC = new StructType()
.add("f2", LongType)
.add("StructColumn2", schemaB)
val schemaD = new StructType()
.add("f3", ArrayType(schemaC))
val schemaE = MapType(LongType, schemaD)
val schemaF = MapType(schemaD, LongType)
Seq(schemaA, schemaB, schemaC, schemaD, schemaE, schemaF).foreach { schema =>
val msg = intercept[AnalysisException] {
SchemaUtils.checkSchemaColumnNameDuplication(
schema, "in SchemaUtilsSuite", caseSensitiveAnalysis = false)
}.getMessage
assert(msg.contains("Found duplicate column(s) in SchemaUtilsSuite: `camelcase`"))
}
}
}
DataSource.scala

@@ -421,18 +421,18 @@

    relation match {
      case hs: HadoopFsRelation =>
-        SchemaUtils.checkColumnNameDuplication(
-          hs.dataSchema.map(_.name),
+        SchemaUtils.checkSchemaColumnNameDuplication(
+          hs.dataSchema,
          "in the data schema",
          equality)
-        SchemaUtils.checkColumnNameDuplication(
-          hs.partitionSchema.map(_.name),
+        SchemaUtils.checkSchemaColumnNameDuplication(
+          hs.partitionSchema,
          "in the partition schema",
          equality)
        DataSourceUtils.verifySchema(hs.fileFormat, hs.dataSchema)
      case _ =>
-        SchemaUtils.checkColumnNameDuplication(
-          relation.schema.map(_.name),
+        SchemaUtils.checkSchemaColumnNameDuplication(
+          relation.schema,
          "in the data schema",
          equality)
    }
FileTable.scala

@@ -79,7 +79,7 @@ abstract class FileTable(

  override lazy val schema: StructType = {
    val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
-    SchemaUtils.checkColumnNameDuplication(dataSchema.fieldNames,
+    SchemaUtils.checkSchemaColumnNameDuplication(dataSchema,
      "in the data schema", caseSensitive)
    dataSchema.foreach { field =>
      if (!supportsDataType(field.dataType)) {
@@ -88,7 +88,7 @@
      }
    }
    val partitionSchema = fileIndex.partitionSchema
-    SchemaUtils.checkColumnNameDuplication(partitionSchema.fieldNames,
+    SchemaUtils.checkSchemaColumnNameDuplication(partitionSchema,
      "in the partition schema", caseSensitive)
    val partitionNameSet: Set[String] =
      partitionSchema.fields.map(PartitioningUtils.getColName(_, caseSensitive)).toSet
NestedDataSourceSuite.scala (new file)

@@ -0,0 +1,83 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{LongType, StructType}

// Datasource tests for nested schemas
trait NestedDataSourceSuiteBase extends QueryTest with SharedSparkSession {
  protected val nestedDataSources: Seq[String] = Seq("orc", "parquet", "json")

  test("SPARK-32431: consistent error for nested and top-level duplicate columns") {
    Seq(
      Seq("id AS lowercase", "id + 1 AS camelCase") ->
        new StructType()
          .add("LowerCase", LongType)
          .add("camelcase", LongType)
          .add("CamelCase", LongType),
      Seq("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn") ->
        new StructType().add("StructColumn",
          new StructType()
            .add("LowerCase", LongType)
            .add("camelcase", LongType)
            .add("CamelCase", LongType))
    ).foreach { case (selectExpr: Seq[String], caseInsensitiveSchema: StructType) =>
      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
        nestedDataSources.foreach { format =>
          withClue(s"format = $format select = ${selectExpr.mkString(",")}") {
            withTempPath { dir =>
              val path = dir.getCanonicalPath
              spark
                .range(1L)
                .selectExpr(selectExpr: _*)
                .write.mode("overwrite")
                .format(format)
                .save(path)
              val e = intercept[AnalysisException] {
                spark
                  .read
                  .schema(caseInsensitiveSchema)
                  .format(format)
                  .load(path)
                  .show()
              }
              assert(e.getMessage.contains(
                "Found duplicate column(s) in the data schema: `camelcase`"))
            }
          }
        }
      }
    }
  }
}

class NestedDataSourceV1Suite extends NestedDataSourceSuiteBase {
  override protected def sparkConf: SparkConf =
    super
      .sparkConf
      .set(SQLConf.USE_V1_SOURCE_LIST, nestedDataSources.mkString(","))
}

class NestedDataSourceV2Suite extends NestedDataSourceSuiteBase {
  override protected def sparkConf: SparkConf =
    super
      .sparkConf
      .set(SQLConf.USE_V1_SOURCE_LIST, "")
}