From 18c3dd439f902d29fe8c7d716974640c962cd598 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 26 Feb 2019 14:47:16 +0800 Subject: [PATCH 1/2] SPARK-26990: use user specified field names if possible --- .../execution/datasources/PartitioningUtils.scala | 13 ++++++++----- .../execution/datasources/FileIndexSuite.scala | 15 +++++++++++++++ 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index a2e08180cc50..92a6ee4e0f5a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -111,15 +111,16 @@ object PartitioningUtils { caseSensitive: Boolean, validatePartitionColumns: Boolean, timeZone: TimeZone): PartitionSpec = { - val userSpecifiedDataTypes = if (userSpecifiedSchema.isDefined) { + val (userSpecifiedDataTypes, userSpecifiedNames) = if (userSpecifiedSchema.isDefined) { val nameToDataType = userSpecifiedSchema.get.fields.map(f => f.name -> f.dataType).toMap + val nameToName = userSpecifiedSchema.get.fields.map(f => f.name -> f.name).toMap if (!caseSensitive) { - CaseInsensitiveMap(nameToDataType) + (CaseInsensitiveMap(nameToDataType), CaseInsensitiveMap(nameToName)) } else { - nameToDataType + (nameToDataType, nameToName) } } else { - Map.empty[String, DataType] + (Map.empty[String, DataType], Map.empty[String, String]) } val dateFormatter = DateFormatter() @@ -170,7 +171,9 @@ object PartitioningUtils { columnNames.zip(literals).map { case (name, Literal(_, dataType)) => // We always assume partition columns are nullable since we've no idea whether null values // will be appended in the future. - StructField(name, userSpecifiedDataTypes.getOrElse(name, dataType), nullable = true) + val resultName = userSpecifiedNames.getOrElse(name, name) + val resultDataType = userSpecifiedDataTypes.getOrElse(name, dataType) + StructField(resultName, resultDataType, nullable = true) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index 6bd0a2591fc1..e0a364121834 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -65,6 +65,21 @@ class FileIndexSuite extends SharedSQLContext { } } + test("SPARK-26990: use user specified field names if possible") { + withTempDir { dir => + val partitionDirectory = new File(dir, "a=foo") + partitionDirectory.mkdir() + val file = new File(partitionDirectory, "text.txt") + stringToFile(file, "text") + val path = new Path(dir.getCanonicalPath) + val schema = StructType(Seq(StructField("A", StringType, false))) + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, Some(schema)) + assert(fileIndex.partitionSchema.length == 1 && fileIndex.partitionSchema.head.name == "A") + } + } + } + test("SPARK-26230: if case sensitive, validate partitions with original column names") { withTempDir { dir => val partitionDirectory = new File(dir, "a=1") From 17f7852510262704791308b3c8decd17eb7bf578 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 26 Feb 2019 17:56:25 +0800 Subject: [PATCH 2/2] revise code --- .../datasources/PartitioningUtils.scala | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 92a6ee4e0f5a..0625cfb772da 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -111,16 +111,22 @@ object PartitioningUtils { caseSensitive: Boolean, validatePartitionColumns: Boolean, timeZone: TimeZone): PartitionSpec = { - val (userSpecifiedDataTypes, userSpecifiedNames) = if (userSpecifiedSchema.isDefined) { + val userSpecifiedDataTypes = if (userSpecifiedSchema.isDefined) { val nameToDataType = userSpecifiedSchema.get.fields.map(f => f.name -> f.dataType).toMap - val nameToName = userSpecifiedSchema.get.fields.map(f => f.name -> f.name).toMap if (!caseSensitive) { - (CaseInsensitiveMap(nameToDataType), CaseInsensitiveMap(nameToName)) + CaseInsensitiveMap(nameToDataType) } else { - (nameToDataType, nameToName) + nameToDataType } } else { - (Map.empty[String, DataType], Map.empty[String, String]) + Map.empty[String, DataType] + } + + // SPARK-26990: use user specified field names if case insensitive. + val userSpecifiedNames = if (userSpecifiedSchema.isDefined && !caseSensitive) { + CaseInsensitiveMap(userSpecifiedSchema.get.fields.map(f => f.name -> f.name).toMap) + } else { + Map.empty[String, String] } val dateFormatter = DateFormatter()