From 06606d603fcd2a4ad81aa2a74ae0b4efdf89c870 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Wed, 27 Feb 2019 14:38:35 +0800
Subject: [PATCH] [SPARK-26990][SQL] FileIndex: use user specified field names if possible
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

With the following file structure:

```
/tmp/data
└── a=5
```

In the previous release:

```
scala> spark.read.schema("A int, ID long").parquet("/tmp/data/").printSchema
root
 |-- ID: long (nullable = true)
 |-- A: integer (nullable = true)
```

While in the current code:

```
scala> spark.read.schema("A int, ID long").parquet("/tmp/data/").printSchema
root
 |-- ID: long (nullable = true)
 |-- a: integer (nullable = true)
```

The partition column comes back as `a`, not `A` as the user specified. This PR fixes that case and keeps the user-specified field name, which is more user-friendly.

Tested with a new unit test.

Closes #23894 from gengliangwang/fileIndexSchema.

Authored-by: Gengliang Wang
Signed-off-by: Wenchen Fan
---
 .../execution/datasources/PartitioningUtils.scala | 11 ++++++++++-
 .../execution/datasources/FileIndexSuite.scala    | 15 +++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 9d2c9ba0c1a5b..41b0bb67e9626 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -119,6 +119,13 @@ object PartitioningUtils {
       Map.empty[String, DataType]
     }
 
+    // SPARK-26990: use user specified field names if case insensitive.
+    val userSpecifiedNames = if (userSpecifiedSchema.isDefined && !caseSensitive) {
+      CaseInsensitiveMap(userSpecifiedSchema.get.fields.map(f => f.name -> f.name).toMap)
+    } else {
+      Map.empty[String, String]
+    }
+
     // First, we need to parse every partition's path and see if we can find partition values.
     val (partitionValues, optDiscoveredBasePaths) = paths.map { path =>
       parsePartition(path, typeInference, basePaths, userSpecifiedDataTypes, timeZone)
@@ -163,7 +170,9 @@ object PartitioningUtils {
         columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
           // We always assume partition columns are nullable since we've no idea whether null values
           // will be appended in the future.
-          StructField(name, userSpecifiedDataTypes.getOrElse(name, dataType), nullable = true)
+          val resultName = userSpecifiedNames.getOrElse(name, name)
+          val resultDataType = userSpecifiedDataTypes.getOrElse(name, dataType)
+          StructField(resultName, resultDataType, nullable = true)
         }
       }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index e2ffe63763778..09c68940a6eb4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -65,6 +65,21 @@ class FileIndexSuite extends SharedSQLContext {
     }
   }
 
+  test("SPARK-26990: use user specified field names if possible") {
+    withTempDir { dir =>
+      val partitionDirectory = new File(dir, "a=foo")
+      partitionDirectory.mkdir()
+      val file = new File(partitionDirectory, "text.txt")
+      stringToFile(file, "text")
+      val path = new Path(dir.getCanonicalPath)
+      val schema = StructType(Seq(StructField("A", StringType, false)))
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+        val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, Some(schema))
+        assert(fileIndex.partitionSchema.length == 1 && fileIndex.partitionSchema.head.name == "A")
+      }
+    }
+  }
+
   test("InMemoryFileIndex: input paths are converted to qualified paths") {
     withTempDir { dir =>
       val file = new File(dir, "text.txt")
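
Note on the technique: for readers who want to see the name-resolution idea in isolation, the sketch below is a minimal standalone Scala program (not part of the patch and not a Spark API) that mimics what the change does. When the analysis is case insensitive, each discovered partition column name is looked up against the user-specified schema and the user's spelling is kept; `userSpecifiedNames` mirrors the variable in the patch, while `Field` and `resolvePartitionColumns` are purely illustrative.

```
// Standalone sketch of the fix's idea: resolve discovered partition column
// names against a user-specified schema, keeping the user's casing when the
// comparison is case insensitive. No Spark dependency; names are illustrative.
object PartitionNameResolution {
  final case class Field(name: String, dataType: String)

  def resolvePartitionColumns(
      discovered: Seq[Field],
      userSpecified: Seq[Field],
      caseSensitive: Boolean): Seq[Field] = {
    // Lower-cased lookup from discovered name to the user's exact spelling,
    // playing the role of CaseInsensitiveMap in the patch.
    val userSpecifiedNames: Map[String, String] =
      if (!caseSensitive) userSpecified.map(f => f.name.toLowerCase -> f.name).toMap
      else Map.empty

    discovered.map { f =>
      // Keep the user-specified spelling when one matches; otherwise keep the
      // name parsed from the directory layout.
      f.copy(name = userSpecifiedNames.getOrElse(f.name.toLowerCase, f.name))
    }
  }

  def main(args: Array[String]): Unit = {
    // Directory layout /tmp/data/a=5 yields a discovered column "a"; with a
    // user schema of "A int, ID long" and case-insensitive analysis, "A" wins.
    val discovered = Seq(Field("a", "int"))
    val userSchema = Seq(Field("A", "int"), Field("ID", "long"))
    println(resolvePartitionColumns(discovered, userSchema, caseSensitive = false))
    // prints: List(Field(A,int))
  }
}
```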