From e0c09201382ffa005b25e189d4a3fe0d0d699bf0 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sat, 25 Jul 2020 11:58:36 +0300 Subject: [PATCH 01/15] Add a test to FileBasedDataSourceSuite --- .../spark/sql/FileBasedDataSourceSuite.scala | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index e9bff64d72fc..b23e82ed7663 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -902,6 +902,40 @@ class FileBasedDataSourceSuite extends QueryTest } } } + + test("SPARK-32431: consistent error for nested and top-level duplicate columns") { + val caseInsensitiveSchema = new StructType() + .add("LowerCase", LongType) + .add("camelcase", LongType) + .add("CamelCase", LongType) + Seq( + "id AS lowercase", "id + 1 AS camelCase", + "NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn").foreach { selectExpr => + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + Seq("parquet", "orc", "json").map { format => + withTempPath { dir => + val path = dir.getCanonicalPath + spark + .range(1L) + .selectExpr(selectExpr) + .write.mode("overwrite") + .format(format) + .save(path) + val e = intercept[AnalysisException] { + spark + .read + .schema(caseInsensitiveSchema) + .format(format) + .load(path) + .show + } + assert(e.getMessage.contains( + "Found duplicate column(s) in the data schema: `camelcase`")) + } + } + } + } + } } object TestingUDT { From cc971f42df4ca2f5e7c7466303940dfab943aba9 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sat, 25 Jul 2020 12:06:34 +0300 Subject: [PATCH 02/15] Add a test to AvroSuite --- .../org/apache/spark/sql/avro/AvroSuite.scala | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git 
a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 83a7ef0061fb..f5088103d225 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -1800,6 +1800,38 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession { assert(version === SPARK_VERSION_SHORT) } } + + test("SPARK-32431: consistent error for nested and top-level duplicate columns") { + val caseInsensitiveSchema = new StructType() + .add("LowerCase", LongType) + .add("camelcase", LongType) + .add("CamelCase", LongType) + Seq( + "id AS lowercase", "id + 1 AS camelCase", + "NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn").foreach { selectExpr => + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + withTempPath { dir => + val path = dir.getCanonicalPath + spark + .range(1L) + .selectExpr(selectExpr) + .write.mode("overwrite") + .format("avro") + .save(path) + val e = intercept[AnalysisException] { + spark + .read + .schema(caseInsensitiveSchema) + .format("avro") + .load(path) + .show + } + assert(e.getMessage.contains( + "Found duplicate column(s) in the data schema: `camelcase`")) + } + } + } + } } class AvroV1Suite extends AvroSuite { From d0764d2f15cf8e19d802b46142ec2b0e8b4206a7 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Mon, 27 Jul 2020 19:50:43 +0300 Subject: [PATCH 03/15] Fix tests --- .../org/apache/spark/sql/avro/AvroSuite.scala | 20 +++++--- .../spark/sql/FileBasedDataSourceSuite.scala | 50 +++++++++++-------- 2 files changed, 42 insertions(+), 28 deletions(-) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index f5088103d225..dc1ce2183e82 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ 
b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -1802,19 +1802,25 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession { } test("SPARK-32431: consistent error for nested and top-level duplicate columns") { - val caseInsensitiveSchema = new StructType() - .add("LowerCase", LongType) - .add("camelcase", LongType) - .add("CamelCase", LongType) Seq( - "id AS lowercase", "id + 1 AS camelCase", - "NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn").foreach { selectExpr => + Seq("id AS lowercase", "id + 1 AS camelCase") -> + new StructType() + .add("LowerCase", LongType) + .add("camelcase", LongType) + .add("CamelCase", LongType), + Seq("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn") -> + new StructType().add("StructColumn", + new StructType() + .add("LowerCase", LongType) + .add("camelcase", LongType) + .add("CamelCase", LongType)) + ).foreach { case (selectExpr: Seq[String], caseInsensitiveSchema: StructType) => withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { withTempPath { dir => val path = dir.getCanonicalPath spark .range(1L) - .selectExpr(selectExpr) + .selectExpr(selectExpr: _*) .write.mode("overwrite") .format("avro") .save(path) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index b23e82ed7663..39e69e1e8e44 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -904,33 +904,41 @@ class FileBasedDataSourceSuite extends QueryTest } test("SPARK-32431: consistent error for nested and top-level duplicate columns") { - val caseInsensitiveSchema = new StructType() - .add("LowerCase", LongType) - .add("camelcase", LongType) - .add("CamelCase", LongType) Seq( - "id AS lowercase", "id + 1 AS camelCase", - "NAMED_STRUCT('lowercase', id, 'camelCase', id 
+ 1) AS StructColumn").foreach { selectExpr => + Seq("id AS lowercase", "id + 1 AS camelCase") -> + new StructType() + .add("LowerCase", LongType) + .add("camelcase", LongType) + .add("CamelCase", LongType), + Seq("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn") -> + new StructType().add("StructColumn", + new StructType() + .add("LowerCase", LongType) + .add("camelcase", LongType) + .add("CamelCase", LongType)) + ).foreach { case (selectExpr: Seq[String], caseInsensitiveSchema: StructType) => withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { Seq("parquet", "orc", "json").map { format => - withTempPath { dir => - val path = dir.getCanonicalPath - spark - .range(1L) - .selectExpr(selectExpr) - .write.mode("overwrite") - .format(format) - .save(path) - val e = intercept[AnalysisException] { + withClue(s"format = $format select = ${selectExpr.mkString(",")}") { + withTempPath { dir => + val path = dir.getCanonicalPath spark - .read - .schema(caseInsensitiveSchema) + .range(1L) + .selectExpr(selectExpr: _*) + .write.mode("overwrite") .format(format) - .load(path) - .show + .save(path) + val e = intercept[AnalysisException] { + spark + .read + .schema(caseInsensitiveSchema) + .format(format) + .load(path) + .show + } + assert(e.getMessage.contains( + "Found duplicate column(s) in the data schema: `camelcase`")) } - assert(e.getMessage.contains( - "Found duplicate column(s) in the data schema: `camelcase`")) } } } From 61a9e94b3cd1fbb92dbe2a730626d3a9cb7acc26 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Mon, 27 Jul 2020 21:59:17 +0300 Subject: [PATCH 04/15] Check nested structs --- .../org/apache/spark/sql/util/SchemaUtils.scala | 11 ++++++++++- .../apache/spark/sql/util/SchemaUtilsSuite.scala | 15 ++++++++++++++- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala index 
27b5eec27281..ca4982e3c392 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.util import java.util.Locale +import scala.collection.mutable.Queue + import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, NamedTransform, Transform} @@ -42,7 +44,14 @@ private[spark] object SchemaUtils { */ def checkSchemaColumnNameDuplication( schema: StructType, colType: String, caseSensitiveAnalysis: Boolean = false): Unit = { - checkColumnNameDuplication(schema.map(_.name), colType, caseSensitiveAnalysis) + val queue = new Queue[StructType]() + queue.enqueue(schema) + do { + val st = queue.dequeue() + checkColumnNameDuplication(st.map(_.name), colType, caseSensitiveAnalysis) + val nestedStructs = st.map(_.dataType).collect { case st: StructType => st } + queue.enqueue(nestedStructs: _*) + } while (queue.nonEmpty) } // Returns true if a given resolver is case-sensitive diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala index 2f576a4031e9..8afd2d123979 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala @@ -22,7 +22,7 @@ import java.util.Locale import org.apache.spark.SparkFunSuite import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{LongType, StructType} class SchemaUtilsSuite extends SparkFunSuite { @@ -82,4 +82,17 @@ class SchemaUtilsSuite extends SparkFunSuite { checkNoExceptionCases("a INT, b INT, c INT", caseSensitive = false) } + + 
test("SPARK-32431: duplicated fields in nested schemas") { + val schema = new StructType().add("StructColumn", + new StructType() + .add("LowerCase", LongType) + .add("camelcase", LongType) + .add("CamelCase", LongType)) + val msg = intercept[AnalysisException] { + SchemaUtils.checkSchemaColumnNameDuplication( + schema, "in SchemaUtilsSuite", caseSensitiveAnalysis = false) + }.getMessage + assert(msg.contains("Found duplicate column(s) in SchemaUtilsSuite: `camelcase`")) + } } From 378a3978f50dd3dce5386e3a564314b2f1eb6e85 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Mon, 27 Jul 2020 22:08:29 +0300 Subject: [PATCH 05/15] Refactoring --- .../apache/spark/sql/util/SchemaUtils.scala | 6 ++--- .../spark/sql/util/SchemaUtilsSuite.scala | 27 ++++++++++++------- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala index ca4982e3c392..ef6794a3690e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala @@ -47,9 +47,9 @@ private[spark] object SchemaUtils { val queue = new Queue[StructType]() queue.enqueue(schema) do { - val st = queue.dequeue() - checkColumnNameDuplication(st.map(_.name), colType, caseSensitiveAnalysis) - val nestedStructs = st.map(_.dataType).collect { case st: StructType => st } + val struct = queue.dequeue() + checkColumnNameDuplication(struct.map(_.name), colType, caseSensitiveAnalysis) + val nestedStructs = struct.map(_.dataType).collect { case st: StructType => st } queue.enqueue(nestedStructs: _*) } while (queue.nonEmpty) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala index 8afd2d123979..301270c23b8b 100644 --- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala @@ -84,15 +84,22 @@ class SchemaUtilsSuite extends SparkFunSuite { } test("SPARK-32431: duplicated fields in nested schemas") { - val schema = new StructType().add("StructColumn", - new StructType() - .add("LowerCase", LongType) - .add("camelcase", LongType) - .add("CamelCase", LongType)) - val msg = intercept[AnalysisException] { - SchemaUtils.checkSchemaColumnNameDuplication( - schema, "in SchemaUtilsSuite", caseSensitiveAnalysis = false) - }.getMessage - assert(msg.contains("Found duplicate column(s) in SchemaUtilsSuite: `camelcase`")) + val schemaA = new StructType() + .add("LowerCase", LongType) + .add("camelcase", LongType) + .add("CamelCase", LongType) + val schemaB = new StructType() + .add("f1", LongType) + .add("StructColumn1", schemaA) + val schemaC = new StructType() + .add("f2", LongType) + .add("StructColumn2", schemaB) + Seq(schemaA, schemaB, schemaC).foreach { schema => + val msg = intercept[AnalysisException] { + SchemaUtils.checkSchemaColumnNameDuplication( + schema, "in SchemaUtilsSuite", caseSensitiveAnalysis = false) + }.getMessage + assert(msg.contains("Found duplicate column(s) in SchemaUtilsSuite: `camelcase`")) + } } } From 89067327d3535d00e2f13122e6997f70c86f114c Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Mon, 27 Jul 2020 22:10:37 +0300 Subject: [PATCH 06/15] Ignore tests --- .../src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala | 2 +- .../scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index dc1ce2183e82..c496c5634337 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ 
b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -1801,7 +1801,7 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession { } } - test("SPARK-32431: consistent error for nested and top-level duplicate columns") { + ignore("SPARK-32431: consistent error for nested and top-level duplicate columns") { Seq( Seq("id AS lowercase", "id + 1 AS camelCase") -> new StructType() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 39e69e1e8e44..424e51e68f8c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -903,7 +903,7 @@ class FileBasedDataSourceSuite extends QueryTest } } - test("SPARK-32431: consistent error for nested and top-level duplicate columns") { + ignore("SPARK-32431: consistent error for nested and top-level duplicate columns") { Seq( Seq("id AS lowercase", "id + 1 AS camelCase") -> new StructType() From e66b03cd171e22084aa2d15fed79ae0610cc0a22 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Tue, 28 Jul 2020 10:04:21 +0300 Subject: [PATCH 07/15] Enable tests in AvroSuite and in FileBasedDataSourceSuite --- .../scala/org/apache/spark/sql/avro/AvroSuite.scala | 2 +- .../org/apache/spark/sql/util/SchemaUtils.scala | 13 +++++++++++++ .../sql/execution/datasources/DataSource.scala | 12 ++++++------ .../sql/execution/datasources/v2/FileTable.scala | 4 ++-- .../apache/spark/sql/FileBasedDataSourceSuite.scala | 2 +- 5 files changed, 23 insertions(+), 10 deletions(-) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index c496c5634337..dc1ce2183e82 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ 
b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -1801,7 +1801,7 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession { } } - ignore("SPARK-32431: consistent error for nested and top-level duplicate columns") { + test("SPARK-32431: consistent error for nested and top-level duplicate columns") { Seq( Seq("id AS lowercase", "id + 1 AS camelCase") -> new StructType() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala index ef6794a3690e..2d59c973c5cc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala @@ -54,6 +54,19 @@ private[spark] object SchemaUtils { } while (queue.nonEmpty) } + /** + * Checks if an input schema has duplicate column names. This throws an exception if the + * duplication exists. + * + * @param schema schema to check + * @param colType column type name, used in an exception message + * @param resolver resolver used to determine if two identifiers are equal + */ + def checkSchemaColumnNameDuplication( + schema: StructType, colType: String, resolver: Resolver): Unit = { + checkSchemaColumnNameDuplication(schema, colType, isCaseSensitiveAnalysis(resolver)) + } + // Returns true if a given resolver is case-sensitive private def isCaseSensitiveAnalysis(resolver: Resolver): Boolean = { if (resolver == caseSensitiveResolution) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index db564485be88..36e5eb33e1ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -421,18 +421,18 @@ case class DataSource( relation match { case hs: 
HadoopFsRelation => - SchemaUtils.checkColumnNameDuplication( - hs.dataSchema.map(_.name), + SchemaUtils.checkSchemaColumnNameDuplication( + hs.dataSchema, "in the data schema", equality) - SchemaUtils.checkColumnNameDuplication( - hs.partitionSchema.map(_.name), + SchemaUtils.checkSchemaColumnNameDuplication( + hs.partitionSchema, "in the partition schema", equality) DataSourceUtils.verifySchema(hs.fileFormat, hs.dataSchema) case _ => - SchemaUtils.checkColumnNameDuplication( - relation.schema.map(_.name), + SchemaUtils.checkSchemaColumnNameDuplication( + relation.schema, "in the data schema", equality) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala index 59dc3ae56bf2..7bd05f12873f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala @@ -79,7 +79,7 @@ abstract class FileTable( override lazy val schema: StructType = { val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis - SchemaUtils.checkColumnNameDuplication(dataSchema.fieldNames, + SchemaUtils.checkSchemaColumnNameDuplication(dataSchema, "in the data schema", caseSensitive) dataSchema.foreach { field => if (!supportsDataType(field.dataType)) { @@ -88,7 +88,7 @@ abstract class FileTable( } } val partitionSchema = fileIndex.partitionSchema - SchemaUtils.checkColumnNameDuplication(partitionSchema.fieldNames, + SchemaUtils.checkSchemaColumnNameDuplication(partitionSchema, "in the partition schema", caseSensitive) val partitionNameSet: Set[String] = partitionSchema.fields.map(PartitioningUtils.getColName(_, caseSensitive)).toSet diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 
424e51e68f8c..39e69e1e8e44 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -903,7 +903,7 @@ class FileBasedDataSourceSuite extends QueryTest } } - ignore("SPARK-32431: consistent error for nested and top-level duplicate columns") { + test("SPARK-32431: consistent error for nested and top-level duplicate columns") { Seq( Seq("id AS lowercase", "id + 1 AS camelCase") -> new StructType() From bd03de50da61bcfb1b638aa76b05c31c19527f06 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Tue, 28 Jul 2020 18:57:24 +0300 Subject: [PATCH 08/15] Put common code to NestedDataSourceSuiteBase --- .../org/apache/spark/sql/avro/AvroSuite.scala | 41 +-------- .../spark/sql/FileBasedDataSourceSuite.scala | 91 ++++++++++--------- 2 files changed, 50 insertions(+), 82 deletions(-) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index dc1ce2183e82..76f5de5724c0 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -50,9 +50,10 @@ import org.apache.spark.sql.types._ import org.apache.spark.sql.v2.avro.AvroScan import org.apache.spark.util.Utils -abstract class AvroSuite extends QueryTest with SharedSparkSession { +abstract class AvroSuite extends QueryTest with SharedSparkSession with NestedDataSourceSuiteBase { import testImplicits._ + override val nestedDataSources = Seq("avro") val episodesAvro = testFile("episodes.avro") val testAvro = testFile("test.avro") @@ -1800,44 +1801,6 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession { assert(version === SPARK_VERSION_SHORT) } } - - test("SPARK-32431: consistent error for nested and top-level duplicate columns") { - Seq( - Seq("id AS lowercase", "id + 1 AS camelCase") -> - new 
StructType() - .add("LowerCase", LongType) - .add("camelcase", LongType) - .add("CamelCase", LongType), - Seq("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn") -> - new StructType().add("StructColumn", - new StructType() - .add("LowerCase", LongType) - .add("camelcase", LongType) - .add("CamelCase", LongType)) - ).foreach { case (selectExpr: Seq[String], caseInsensitiveSchema: StructType) => - withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { - withTempPath { dir => - val path = dir.getCanonicalPath - spark - .range(1L) - .selectExpr(selectExpr: _*) - .write.mode("overwrite") - .format("avro") - .save(path) - val e = intercept[AnalysisException] { - spark - .read - .schema(caseInsensitiveSchema) - .format("avro") - .load(path) - .show - } - assert(e.getMessage.contains( - "Found duplicate column(s) in the data schema: `camelcase`")) - } - } - } - } } class AvroV1Suite extends AvroSuite { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 39e69e1e8e44..68a6a9c7e7b1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -43,10 +43,56 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ +trait NestedDataSourceSuiteBase extends QueryTest with SharedSparkSession{ + protected val nestedDataSources: Seq[String] + + test("SPARK-32431: consistent error for nested and top-level duplicate columns") { + Seq( + Seq("id AS lowercase", "id + 1 AS camelCase") -> + new StructType() + .add("LowerCase", LongType) + .add("camelcase", LongType) + .add("CamelCase", LongType), + Seq("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn") -> + new StructType().add("StructColumn", + new StructType() + .add("LowerCase", LongType) + 
.add("camelcase", LongType) + .add("CamelCase", LongType)) + ).foreach { case (selectExpr: Seq[String], caseInsensitiveSchema: StructType) => + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + nestedDataSources.map { format => + withClue(s"format = $format select = ${selectExpr.mkString(",")}") { + withTempPath { dir => + val path = dir.getCanonicalPath + spark + .range(1L) + .selectExpr(selectExpr: _*) + .write.mode("overwrite") + .format(format) + .save(path) + val e = intercept[AnalysisException] { + spark + .read + .schema(caseInsensitiveSchema) + .format(format) + .load(path) + .show + } + assert(e.getMessage.contains( + "Found duplicate column(s) in the data schema: `camelcase`")) + } + } + } + } + } + } +} class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession - with AdaptiveSparkPlanHelper { + with AdaptiveSparkPlanHelper + with NestedDataSourceSuiteBase { import testImplicits._ override def beforeAll(): Unit = { @@ -62,6 +108,7 @@ class FileBasedDataSourceSuite extends QueryTest } } + override val nestedDataSources = Seq("orc", "parquet", "json") private val allFileBasedDataSources = Seq("orc", "parquet", "csv", "json", "text") private val nameWithSpecialChars = "sp&cial%c hars" @@ -902,48 +949,6 @@ class FileBasedDataSourceSuite extends QueryTest } } } - - test("SPARK-32431: consistent error for nested and top-level duplicate columns") { - Seq( - Seq("id AS lowercase", "id + 1 AS camelCase") -> - new StructType() - .add("LowerCase", LongType) - .add("camelcase", LongType) - .add("CamelCase", LongType), - Seq("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn") -> - new StructType().add("StructColumn", - new StructType() - .add("LowerCase", LongType) - .add("camelcase", LongType) - .add("CamelCase", LongType)) - ).foreach { case (selectExpr: Seq[String], caseInsensitiveSchema: StructType) => - withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { - Seq("parquet", "orc", "json").map { format => - withClue(s"format = 
$format select = ${selectExpr.mkString(",")}") { - withTempPath { dir => - val path = dir.getCanonicalPath - spark - .range(1L) - .selectExpr(selectExpr: _*) - .write.mode("overwrite") - .format(format) - .save(path) - val e = intercept[AnalysisException] { - spark - .read - .schema(caseInsensitiveSchema) - .format(format) - .load(path) - .show - } - assert(e.getMessage.contains( - "Found duplicate column(s) in the data schema: `camelcase`")) - } - } - } - } - } - } } object TestingUDT { From 078bf504b657635bdfce9bbff9c74ba87f2449aa Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 29 Jul 2020 09:47:38 +0300 Subject: [PATCH 09/15] Replace checkSchemaColumnNameDuplication by rec impl --- .../org/apache/spark/sql/util/SchemaUtils.scala | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala index 2d59c973c5cc..e60f244c87e5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala @@ -44,14 +44,12 @@ private[spark] object SchemaUtils { */ def checkSchemaColumnNameDuplication( schema: StructType, colType: String, caseSensitiveAnalysis: Boolean = false): Unit = { - val queue = new Queue[StructType]() - queue.enqueue(schema) - do { - val struct = queue.dequeue() - checkColumnNameDuplication(struct.map(_.name), colType, caseSensitiveAnalysis) - val nestedStructs = struct.map(_.dataType).collect { case st: StructType => st } - queue.enqueue(nestedStructs: _*) - } while (queue.nonEmpty) + val fields = schema.fields + checkColumnNameDuplication(fields.map(_.name), colType, caseSensitiveAnalysis) + fields.map(_.dataType).foreach { + case st: StructType => checkSchemaColumnNameDuplication(st, colType, caseSensitiveAnalysis) + case _ => + } } /** From a746cdf56d4b302e455ef2cb679a1c7a6f4fe4f7 Mon Sep 17 
00:00:00 2001 From: Max Gekk Date: Wed, 29 Jul 2020 11:01:24 +0300 Subject: [PATCH 10/15] Move tests to NestedDataSourceSuite --- .../spark/sql/FileBasedDataSourceSuite.scala | 49 +---------- .../spark/sql/NestedDataSourceSuite.scala | 83 +++++++++++++++++++ 2 files changed, 84 insertions(+), 48 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 68a6a9c7e7b1..e9bff64d72fc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -43,56 +43,10 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ -trait NestedDataSourceSuiteBase extends QueryTest with SharedSparkSession{ - protected val nestedDataSources: Seq[String] - - test("SPARK-32431: consistent error for nested and top-level duplicate columns") { - Seq( - Seq("id AS lowercase", "id + 1 AS camelCase") -> - new StructType() - .add("LowerCase", LongType) - .add("camelcase", LongType) - .add("CamelCase", LongType), - Seq("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn") -> - new StructType().add("StructColumn", - new StructType() - .add("LowerCase", LongType) - .add("camelcase", LongType) - .add("CamelCase", LongType)) - ).foreach { case (selectExpr: Seq[String], caseInsensitiveSchema: StructType) => - withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { - nestedDataSources.map { format => - withClue(s"format = $format select = ${selectExpr.mkString(",")}") { - withTempPath { dir => - val path = dir.getCanonicalPath - spark - .range(1L) - .selectExpr(selectExpr: _*) - .write.mode("overwrite") - .format(format) - .save(path) - val e = intercept[AnalysisException] { - spark - 
.read - .schema(caseInsensitiveSchema) - .format(format) - .load(path) - .show - } - assert(e.getMessage.contains( - "Found duplicate column(s) in the data schema: `camelcase`")) - } - } - } - } - } - } -} class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession - with AdaptiveSparkPlanHelper - with NestedDataSourceSuiteBase { + with AdaptiveSparkPlanHelper { import testImplicits._ override def beforeAll(): Unit = { @@ -108,7 +62,6 @@ class FileBasedDataSourceSuite extends QueryTest } } - override val nestedDataSources = Seq("orc", "parquet", "json") private val allFileBasedDataSources = Seq("orc", "parquet", "csv", "json", "text") private val nameWithSpecialChars = "sp&cial%c hars" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala new file mode 100644 index 000000000000..0066c401e1d6 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql + +import org.apache.spark.SparkConf +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.{LongType, StructType} + +// Datasource tests for nested schemas +trait NestedDataSourceSuiteBase extends QueryTest with SharedSparkSession{ + protected val nestedDataSources: Seq[String] = Seq("orc", "parquet", "json") + + test("SPARK-32431: consistent error for nested and top-level duplicate columns") { + Seq( + Seq("id AS lowercase", "id + 1 AS camelCase") -> + new StructType() + .add("LowerCase", LongType) + .add("camelcase", LongType) + .add("CamelCase", LongType), + Seq("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn") -> + new StructType().add("StructColumn", + new StructType() + .add("LowerCase", LongType) + .add("camelcase", LongType) + .add("CamelCase", LongType)) + ).foreach { case (selectExpr: Seq[String], caseInsensitiveSchema: StructType) => + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + nestedDataSources.map { format => + withClue(s"format = $format select = ${selectExpr.mkString(",")}") { + withTempPath { dir => + val path = dir.getCanonicalPath + spark + .range(1L) + .selectExpr(selectExpr: _*) + .write.mode("overwrite") + .format(format) + .save(path) + val e = intercept[AnalysisException] { + spark + .read + .schema(caseInsensitiveSchema) + .format(format) + .load(path) + .show + } + assert(e.getMessage.contains( + "Found duplicate column(s) in the data schema: `camelcase`")) + } + } + } + } + } + } +} + +class NestedDataSourceV1Suite extends NestedDataSourceSuiteBase { + override protected def sparkConf: SparkConf = + super + .sparkConf + .set(SQLConf.USE_V1_SOURCE_LIST, nestedDataSources.mkString(",")) +} + +class NestedDataSourceV2Suite extends NestedDataSourceSuiteBase { + override protected def sparkConf: SparkConf = + super + .sparkConf + .set(SQLConf.USE_V1_SOURCE_LIST, "") +} From 
ae7268c8f35d4d7060586a57c15833921fe15fc9 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 29 Jul 2020 11:29:44 +0300 Subject: [PATCH 11/15] Update the SQL migration guide. --- docs/sql-migration-guide.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md index 46d330e09586..7c9c0a726c88 100644 --- a/docs/sql-migration-guide.md +++ b/docs/sql-migration-guide.md @@ -29,6 +29,8 @@ license: | - In Spark 3.1, SQL UI data adopts the `formatted` mode for the query plan explain results. To restore the behavior before Spark 3.0, you can set `spark.sql.ui.explainMode` to `extended`. - In Spark 3.1, `from_unixtime`, `unix_timestamp`,`to_unix_timestamp`, `to_timestamp` and `to_date` will fail if the specified datetime pattern is invalid. In Spark 3.0 or earlier, they result `NULL`. + + - In Spark 3.1, the Parquet, ORC, Avro and JSON datasources throw the exception `org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema` in read if they detect duplicate names in top-level columns as well as in nested structures. The datasources take into account the SQL config `spark.sql.caseSensitive` while detecting column name duplicates.
## Upgrading from Spark SQL 3.0 to 3.0.1 From c5616b3666662398f3d3265c190614c63ea959bf Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 29 Jul 2020 14:25:53 +0300 Subject: [PATCH 12/15] Add a gap --- .../test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala index 0066c401e1d6..152d59b7b190 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{LongType, StructType} // Datasource tests for nested schemas -trait NestedDataSourceSuiteBase extends QueryTest with SharedSparkSession{ +trait NestedDataSourceSuiteBase extends QueryTest with SharedSparkSession { protected val nestedDataSources: Seq[String] = Seq("orc", "parquet", "json") test("SPARK-32431: consistent error for nested and top-level duplicate columns") { From 12338f2725f8861d91ff1d6c984f352130d6d49c Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 29 Jul 2020 14:30:11 +0300 Subject: [PATCH 13/15] Remove an unused import --- .../src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala index e60f244c87e5..fcc1967ae678 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql.util import java.util.Locale -import scala.collection.mutable.Queue - import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis._ 
import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, NamedTransform, Transform} From 9d7246750aa8d5d4c0736c0d7e875fa7a862f0de Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 29 Jul 2020 14:31:40 +0300 Subject: [PATCH 14/15] Fix coding style --- .../main/scala/org/apache/spark/sql/util/SchemaUtils.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala index fcc1967ae678..7b5ba3d8c75b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala @@ -59,7 +59,9 @@ private[spark] object SchemaUtils { * @param resolver resolver used to determine if two identifiers are equal */ def checkSchemaColumnNameDuplication( - schema: StructType, colType: String, resolver: Resolver): Unit = { + schema: StructType, + colType: String, + resolver: Resolver): Unit = { checkSchemaColumnNameDuplication(schema, colType, isCaseSensitiveAnalysis(resolver)) } From 918c77c996d632f76d9594724944550a32ce42a3 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 30 Jul 2020 00:30:01 +0300 Subject: [PATCH 15/15] Look into maps and arrays --- .../apache/spark/sql/util/SchemaUtils.scala | 20 ++++++++++++++----- .../spark/sql/util/SchemaUtilsSuite.scala | 8 ++++++-- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala index 7b5ba3d8c75b..c83cd5225070 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala @@ -41,11 +41,21 @@ private[spark] object SchemaUtils { * @param caseSensitiveAnalysis whether duplication checks should be case sensitive or not 
*/ def checkSchemaColumnNameDuplication( - schema: StructType, colType: String, caseSensitiveAnalysis: Boolean = false): Unit = { - val fields = schema.fields - checkColumnNameDuplication(fields.map(_.name), colType, caseSensitiveAnalysis) - fields.map(_.dataType).foreach { - case st: StructType => checkSchemaColumnNameDuplication(st, colType, caseSensitiveAnalysis) + schema: DataType, + colType: String, + caseSensitiveAnalysis: Boolean = false): Unit = { + schema match { + case ArrayType(elementType, _) => + checkSchemaColumnNameDuplication(elementType, colType, caseSensitiveAnalysis) + case MapType(keyType, valueType, _) => + checkSchemaColumnNameDuplication(keyType, colType, caseSensitiveAnalysis) + checkSchemaColumnNameDuplication(valueType, colType, caseSensitiveAnalysis) + case structType: StructType => + val fields = structType.fields + checkColumnNameDuplication(fields.map(_.name), colType, caseSensitiveAnalysis) + fields.foreach { field => + checkSchemaColumnNameDuplication(field.dataType, colType, caseSensitiveAnalysis) + } case _ => } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala index 301270c23b8b..02ee634dba1b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala @@ -22,7 +22,7 @@ import java.util.Locale import org.apache.spark.SparkFunSuite import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.types.{LongType, StructType} +import org.apache.spark.sql.types.{ArrayType, LongType, MapType, StructType} class SchemaUtilsSuite extends SparkFunSuite { @@ -94,7 +94,11 @@ class SchemaUtilsSuite extends SparkFunSuite { val schemaC = new StructType() .add("f2", LongType) .add("StructColumn2", schemaB) - Seq(schemaA, schemaB, schemaC).foreach { schema => + 
val schemaD = new StructType() + .add("f3", ArrayType(schemaC)) + val schemaE = MapType(LongType, schemaD) + val schemaF = MapType(schemaD, LongType) + Seq(schemaA, schemaB, schemaC, schemaD, schemaE, schemaF).foreach { schema => val msg = intercept[AnalysisException] { SchemaUtils.checkSchemaColumnNameDuplication( schema, "in SchemaUtilsSuite", caseSensitiveAnalysis = false)