SPARK-26990: use the user-specified partition field names when case insensitive.

PartitioningUtils.scala
  * Adds `userSpecifiedNames`: when a user schema is supplied and `caseSensitive`
    is false, each schema field name is put into a CaseInsensitiveMap that maps
    the name to itself; otherwise the map is empty.
  * When building the partition StructFields, the column name parsed from the
    path is replaced by the entry found in `userSpecifiedNames` (falling back to
    the parsed name). The data type lookup is unchanged, only moved into a local.

FileIndexSuite.scala
  * Adds a test that creates a partition directory `a=foo`, passes a schema whose
    field is named `A` with case sensitivity disabled, and expects the resulting
    partition schema to contain exactly one field named `A`.

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index a2e08180cc50e..0625cfb772da2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -122,6 +122,13 @@ object PartitioningUtils {
       Map.empty[String, DataType]
     }
 
+    // SPARK-26990: use user specified field names if case insensitive.
+    val userSpecifiedNames = if (userSpecifiedSchema.isDefined && !caseSensitive) {
+      CaseInsensitiveMap(userSpecifiedSchema.get.fields.map(f => f.name -> f.name).toMap)
+    } else {
+      Map.empty[String, String]
+    }
+
     val dateFormatter = DateFormatter()
     val timestampFormatter = TimestampFormatter(timestampPartitionPattern, timeZone)
     // First, we need to parse every partition's path and see if we can find partition values.
@@ -170,7 +177,9 @@ object PartitioningUtils {
         columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
           // We always assume partition columns are nullable since we've no idea whether null values
           // will be appended in the future.
-          StructField(name, userSpecifiedDataTypes.getOrElse(name, dataType), nullable = true)
+          val resultName = userSpecifiedNames.getOrElse(name, name)
+          val resultDataType = userSpecifiedDataTypes.getOrElse(name, dataType)
+          StructField(resultName, resultDataType, nullable = true)
         }
       }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 6bd0a2591fc1f..e0a3641218343 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -65,6 +65,21 @@ class FileIndexSuite extends SharedSQLContext {
     }
   }
 
+  test("SPARK-26990: use user specified field names if possible") {
+    withTempDir { dir =>
+      val partitionDirectory = new File(dir, "a=foo")
+      partitionDirectory.mkdir()
+      val file = new File(partitionDirectory, "text.txt")
+      stringToFile(file, "text")
+      val path = new Path(dir.getCanonicalPath)
+      val schema = StructType(Seq(StructField("A", StringType, false)))
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+        val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, Some(schema))
+        assert(fileIndex.partitionSchema.length == 1 && fileIndex.partitionSchema.head.name == "A")
+      }
+    }
+  }
+
   test("SPARK-26230: if case sensitive, validate partitions with original column names") {
     withTempDir { dir =>
       val partitionDirectory = new File(dir, "a=1")