From 2866a9e1c1a7d42e6cf53474733c6f39e812c680 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Thu, 29 Nov 2018 00:11:22 +0800
Subject: [PATCH 1/5] fix

---
 .../PartitioningAwareFileIndex.scala               | 17 +++++++++++------
 .../execution/datasources/FileIndexSuite.scala     | 16 ++++++++++++++++
 2 files changed, 27 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index cc8af7b92c454..34d5e11a3b4eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -126,13 +126,14 @@ abstract class PartitioningAwareFileIndex(
     val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
     val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
       .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
-    val inferredPartitionSpec = PartitioningUtils.parsePartitions(
-      leafDirs,
-      typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
-      basePaths = basePaths,
-      timeZoneId = timeZoneId)
+
     userSpecifiedSchema match {
       case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
+        val inferredPartitionSpec = PartitioningUtils.parsePartitions(
+          leafDirs,
+          typeInference = false,
+          basePaths = basePaths,
+          timeZoneId = timeZoneId)
         val userPartitionSchema =
           combineInferredAndUserSpecifiedPartitionSchema(inferredPartitionSpec)
 
@@ -151,7 +152,11 @@ abstract class PartitioningAwareFileIndex(
           part.copy(values = castPartitionValuesToUserSchema(part.values))
         })
       case _ =>
-        inferredPartitionSpec
+        PartitioningUtils.parsePartitions(
+          leafDirs,
+          typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
+          basePaths = basePaths,
+          timeZoneId = timeZoneId)
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 49e7af4a9896b..e3ac0fb9d0276 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.util.{KnownSizeEstimation, SizeEstimator}
 
 class FileIndexSuite extends SharedSQLContext {
@@ -49,6 +50,21 @@ class FileIndexSuite extends SharedSQLContext {
     }
   }
 
+  test("SPARK-26188: don't infer data types of partition columns if user specifies schema") {
+    withTempDir { dir =>
+      val partitionDirectory = new File(dir, s"a=4d")
+      partitionDirectory.mkdir()
+      val file = new File(partitionDirectory, "text.txt")
+      stringToFile(file, "text")
+      val path = new Path(dir.getCanonicalPath)
+      val schema = StructType(Seq(StructField("a", StringType, false)))
+      val catalog = new InMemoryFileIndex(spark, Seq(path), Map.empty, Some(schema))
+      val partitionValues = catalog.partitionSpec().partitions.map(_.values)
+      assert(partitionValues.length == 1 && partitionValues(0).numFields == 1 &&
+        partitionValues(0).getString(0) == "4d")
+    }
+  }
+
   test("InMemoryFileIndex: input paths are converted to qualified paths") {
     withTempDir { dir =>
       val file = new File(dir, "text.txt")
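To make the intent of this first patch concrete: with a partition directory `a=4d` and a user-supplied `StringType` column, the old inference path parsed `4d` as a double (`java.lang.Double.parseDouble("4d")` accepts the `d` suffix and returns `4.0`), and the cast back to the user's string type then produced `"4.0"`. A minimal reader-level sketch of the scenario, assuming a hypothetical `/tmp/table/a=4d/text.txt` layout:

```scala
// Sketch of the SPARK-26188 bug; the path and data are illustrative.
val df = spark.read
  .schema("a STRING, value STRING")  // user-specified schema for the partition column
  .text("/tmp/table")                // directory containing a=4d/text.txt
df.select("a").show()
// Expected: "4d". Before this fix, inference produced double 4.0 and the
// cast back to the user-specified StringType displayed "4.0".
```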
test("InMemoryFileIndex: input paths are converted to qualified paths") { withTempDir { dir => val file = new File(dir, "text.txt") From f25730e6d73391a78c269a912a3cef49e94ed7d5 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 29 Nov 2018 21:32:34 +0800 Subject: [PATCH 2/5] fix --- .../PartitioningAwareFileIndex.scala | 39 ++++----------- .../datasources/PartitioningUtils.scala | 47 +++++++++++++++---- .../ParquetPartitionDiscoverySuite.scala | 22 +++++++-- 3 files changed, 65 insertions(+), 43 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index 34d5e11a3b4eb..ab065177e979c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -127,37 +127,14 @@ abstract class PartitioningAwareFileIndex( val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION) .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone) - userSpecifiedSchema match { - case Some(userProvidedSchema) if userProvidedSchema.nonEmpty => - val inferredPartitionSpec = PartitioningUtils.parsePartitions( - leafDirs, - typeInference = false, - basePaths = basePaths, - timeZoneId = timeZoneId) - val userPartitionSchema = - combineInferredAndUserSpecifiedPartitionSchema(inferredPartitionSpec) - - // we need to cast into the data type that user specified. - def castPartitionValuesToUserSchema(row: InternalRow) = { - InternalRow((0 until row.numFields).map { i => - val dt = inferredPartitionSpec.partitionColumns.fields(i).dataType - Cast( - Literal.create(row.get(i, dt), dt), - userPartitionSchema.fields(i).dataType, - Option(timeZoneId)).eval() - }: _*) - } - - PartitionSpec(userPartitionSchema, inferredPartitionSpec.partitions.map { part => - part.copy(values = castPartitionValuesToUserSchema(part.values)) - }) - case _ => - PartitioningUtils.parsePartitions( - leafDirs, - typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled, - basePaths = basePaths, - timeZoneId = timeZoneId) - } + val caseSensitive = sparkSession.sqlContext.conf.caseSensitiveAnalysis + PartitioningUtils.parsePartitions( + leafDirs, + typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled, + basePaths = basePaths, + userSpecifiedSchema = userSpecifiedSchema, + caseSensitive = caseSensitive, + timeZoneId = timeZoneId) } private def prunePartitions( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 3183fd30e5e0d..600025abefad6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCoercion} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal} -import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.types._ import 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 3183fd30e5e0d..600025abefad6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCoercion}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 
@@ -94,18 +94,34 @@ object PartitioningUtils {
       paths: Seq[Path],
       typeInference: Boolean,
       basePaths: Set[Path],
+      userSpecifiedSchema: Option[StructType],
+      caseSensitive: Boolean,
       timeZoneId: String): PartitionSpec = {
-    parsePartitions(paths, typeInference, basePaths, DateTimeUtils.getTimeZone(timeZoneId))
+    parsePartitions(paths, typeInference, basePaths, userSpecifiedSchema,
+      caseSensitive, DateTimeUtils.getTimeZone(timeZoneId))
   }
 
   private[datasources] def parsePartitions(
       paths: Seq[Path],
       typeInference: Boolean,
       basePaths: Set[Path],
+      userSpecifiedSchema: Option[StructType],
+      caseSensitive: Boolean,
       timeZone: TimeZone): PartitionSpec = {
+    val userSpecifiedDataTypes = if (userSpecifiedSchema.isDefined) {
+      val nameToDataType = userSpecifiedSchema.get.fields.map(f => f.name -> f.dataType).toMap
+      if (caseSensitive) {
+        CaseInsensitiveMap(nameToDataType)
+      } else {
+        nameToDataType
+      }
+    } else {
+      Map.empty[String, DataType]
+    }
+
     // First, we need to parse every partition's path and see if we can find partition values.
     val (partitionValues, optDiscoveredBasePaths) = paths.map { path =>
-      parsePartition(path, typeInference, basePaths, timeZone)
+      parsePartition(path, typeInference, basePaths, userSpecifiedDataTypes, timeZone)
     }.unzip
 
     // We create pairs of (path -> path's partition value) here
@@ -147,14 +163,22 @@ object PartitioningUtils {
        columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
          // We always assume partition columns are nullable since we've no idea whether null values
          // will be appended in the future.
-         StructField(name, dataType, nullable = true)
+         StructField(name, userSpecifiedDataTypes.getOrElse(name, dataType), nullable = true)
        }
      }
 
      // Finally, we create `Partition`s based on paths and resolved partition values.
      val partitions = resolvedPartitionValues.zip(pathsWithPartitionValues).map {
-       case (PartitionValues(_, literals), (path, _)) =>
-         PartitionPath(InternalRow.fromSeq(literals.map(_.value)), path)
+       case (PartitionValues(columnNames, literals), (path, _)) =>
+         val values = columnNames.zip(literals).map {
+           case (name, literal) =>
+             if (userSpecifiedDataTypes.contains(name)) {
+               Cast(literal, userSpecifiedDataTypes(name), Option(timeZone.getID)).eval()
+             } else {
+               literal.value
+             }
+         }
+         PartitionPath(InternalRow.fromSeq(values), path)
      }
 
      PartitionSpec(StructType(fields), partitions)
@@ -185,6 +209,7 @@ object PartitioningUtils {
       path: Path,
       typeInference: Boolean,
       basePaths: Set[Path],
+      userSpecifiedDataTypes: Map[String, DataType],
       timeZone: TimeZone): (Option[PartitionValues], Option[Path]) = {
     val columns = ArrayBuffer.empty[(String, Literal)]
     // Old Hadoop versions don't have `Path.isRoot`
@@ -206,7 +231,7 @@ object PartitioningUtils {
       // Let's say currentPath is a path of "/table/a=1/", currentPath.getName will give us a=1.
       // Once we get the string, we try to parse it and find the partition column and value.
       val maybeColumn =
-        parsePartitionColumn(currentPath.getName, typeInference, timeZone)
+        parsePartitionColumn(currentPath.getName, typeInference, userSpecifiedDataTypes, timeZone)
       maybeColumn.foreach(columns += _)
 
       // Now, we determine if we should stop.
@@ -239,6 +264,7 @@ object PartitioningUtils {
   private def parsePartitionColumn(
       columnSpec: String,
       typeInference: Boolean,
+      userSpecifiedDataTypes: Map[String, DataType],
       timeZone: TimeZone): Option[(String, Literal)] = {
     val equalSignIndex = columnSpec.indexOf('=')
     if (equalSignIndex == -1) {
@@ -250,7 +276,12 @@ object PartitioningUtils {
       val rawColumnValue = columnSpec.drop(equalSignIndex + 1)
       assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'")
 
-      val literal = inferPartitionColumnValue(rawColumnValue, typeInference, timeZone)
+      val literal = if (userSpecifiedDataTypes.contains(columnName)) {
+        // SPARK-26188: don't infer data types of partition columns if user specifies schema
+        inferPartitionColumnValue(rawColumnValue, false, timeZone)
+      } else {
+        inferPartitionColumnValue(rawColumnValue, typeInference, timeZone)
+      }
       Some(columnName -> literal)
     }
   }
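One detail worth noting in the hunks above: the name-to-type lookup is meant to be wrapped in `CaseInsensitiveMap` so that, under case-insensitive analysis, a directory `A=4d` still finds the user-specified type of column `a`. As written here the condition is inverted (`if (caseSensitive)` wraps the map); patch 4 below flips it to `!caseSensitive`. A sketch of the intended lookup semantics:

```scala
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.types.{DataType, StringType}

val byName: Map[String, DataType] = Map("a" -> StringType)
CaseInsensitiveMap(byName).contains("A")  // true: lookup ignores case
byName.contains("A")                      // false: a plain Map is case-sensitive
```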
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 9966ed94a8392..f808ca458aaa7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -101,7 +101,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       "hdfs://host:9000/path/a=10.5/b=hello")
 
     var exception = intercept[AssertionError] {
-      parsePartitions(paths.map(new Path(_)), true, Set.empty[Path], timeZoneId)
+      parsePartitions(paths.map(new Path(_)), true, Set.empty[Path], None, true, timeZoneId)
     }
     assert(exception.getMessage().contains("Conflicting directory structures detected"))
 
@@ -115,6 +115,8 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       paths.map(new Path(_)),
       true,
       Set(new Path("hdfs://host:9000/path/")),
+      None,
+      true,
       timeZoneId)
 
     // Valid
@@ -128,6 +130,8 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       paths.map(new Path(_)),
       true,
       Set(new Path("hdfs://host:9000/path/something=true/table")),
+      None,
+      true,
       timeZoneId)
 
     // Valid
@@ -141,6 +145,8 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       paths.map(new Path(_)),
       true,
       Set(new Path("hdfs://host:9000/path/table=true")),
+      None,
+      true,
       timeZoneId)
 
     // Invalid
@@ -154,6 +160,8 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
         paths.map(new Path(_)),
         true,
         Set(new Path("hdfs://host:9000/path/")),
+        None,
+        true,
         timeZoneId)
     }
     assert(exception.getMessage().contains("Conflicting directory structures detected"))
@@ -174,6 +182,8 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
         paths.map(new Path(_)),
         true,
         Set(new Path("hdfs://host:9000/tmp/tables/")),
+        None,
+        true,
         timeZoneId)
     }
     assert(exception.getMessage().contains("Conflicting directory structures detected"))
@@ -181,13 +191,13 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
 
   test("parse partition") {
     def check(path: String, expected: Option[PartitionValues]): Unit = {
-      val actual = parsePartition(new Path(path), true, Set.empty[Path], timeZone)._1
+      val actual = parsePartition(new Path(path), true, Set.empty[Path], Map.empty, timeZone)._1
       assert(expected === actual)
     }
 
     def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
       val message = intercept[T] {
-        parsePartition(new Path(path), true, Set.empty[Path], timeZone)
+        parsePartition(new Path(path), true, Set.empty[Path], Map.empty, timeZone)
       }.getMessage
 
       assert(message.contains(expected))
@@ -231,6 +241,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       path = new Path("file://path/a=10"),
       typeInference = true,
       basePaths = Set(new Path("file://path/a=10")),
+      Map.empty,
       timeZone = timeZone)._1
 
     assert(partitionSpec1.isEmpty)
@@ -240,6 +251,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       path = new Path("file://path/a=10"),
       typeInference = true,
       basePaths = Set(new Path("file://path")),
+      Map.empty,
       timeZone = timeZone)._1
 
     assert(partitionSpec2 ==
@@ -258,6 +270,8 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
         paths.map(new Path(_)),
         true,
         rootPaths,
+        None,
+        true,
         timeZoneId)
       assert(actualSpec.partitionColumns === spec.partitionColumns)
       assert(actualSpec.partitions.length === spec.partitions.length)
@@ -370,7 +384,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
   test("parse partitions with type inference disabled") {
     def check(paths: Seq[String], spec: PartitionSpec): Unit = {
       val actualSpec =
-        parsePartitions(paths.map(new Path(_)), false, Set.empty[Path], timeZoneId)
+        parsePartitions(paths.map(new Path(_)), false, Set.empty[Path], None, true, timeZoneId)
       assert(actualSpec === spec)
     }
 
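The test updates above also document the new `parsePartitions` signature. A sketch of a call with a user-specified schema, using illustrative values (the method is `private[datasources]`, so this only compiles from inside that package, as the suite does):

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.types.{StringType, StructType}

// "a" keeps the user-specified StringType and stays "4d"; "b" is absent from
// the user schema, so its type is still inferred as before.
val spec = PartitioningUtils.parsePartitions(
  Seq(new Path("hdfs://host:9000/path/a=4d/b=hello")),
  typeInference = true,
  basePaths = Set(new Path("hdfs://host:9000/path/")),
  userSpecifiedSchema = Some(new StructType().add("a", StringType)),
  caseSensitive = true,
  timeZoneId = "UTC")
```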
From 0a20c9a60a6264c6c122f16883cb682c605bcf88 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Thu, 29 Nov 2018 21:45:47 +0800
Subject: [PATCH 3/5] revise

---
 .../spark/sql/execution/datasources/PartitioningUtils.scala  | 3 ++-
 .../spark/sql/execution/datasources/FileIndexSuite.scala     | 4 ++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 600025abefad6..07376bbf8c410 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -277,7 +277,8 @@ object PartitioningUtils {
       assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'")
 
       val literal = if (userSpecifiedDataTypes.contains(columnName)) {
-        // SPARK-26188: don't infer data types of partition columns if user specifies schema
+        // SPARK-26188: if user provides corresponding column schema, process the column as String
+        // type and cast it as user specified data type later.
         inferPartitionColumnValue(rawColumnValue, false, timeZone)
       } else {
         inferPartitionColumnValue(rawColumnValue, typeInference, timeZone)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index e3ac0fb9d0276..fdb0511f01a22 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -58,8 +58,8 @@ class FileIndexSuite extends SharedSQLContext {
       stringToFile(file, "text")
       val path = new Path(dir.getCanonicalPath)
       val schema = StructType(Seq(StructField("a", StringType, false)))
-      val catalog = new InMemoryFileIndex(spark, Seq(path), Map.empty, Some(schema))
-      val partitionValues = catalog.partitionSpec().partitions.map(_.values)
+      val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, Some(schema))
+      val partitionValues = fileIndex.partitionSpec().partitions.map(_.values)
       assert(partitionValues.length == 1 && partitionValues(0).numFields == 1 &&
         partitionValues(0).getString(0) == "4d")
     }
From 7c5bdd155bd7b3b4a4fa87a259feac229aaa16e3 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Thu, 29 Nov 2018 23:17:55 +0800
Subject: [PATCH 4/5] address comments

---
 .../PartitioningAwareFileIndex.scala               | 19 -----------------
 .../datasources/PartitioningUtils.scala            | 21 +++++++------------
 2 files changed, 8 insertions(+), 32 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index ab065177e979c..7b0e4dbcc25f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -215,25 +215,6 @@ abstract class PartitioningAwareFileIndex(
     val name = path.getName
     !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
   }
-
-  /**
-   * In the read path, only managed tables by Hive provide the partition columns properly when
-   * initializing this class. All other file based data sources will try to infer the partitioning,
-   * and then cast the inferred types to user specified dataTypes if the partition columns exist
-   * inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510, or
-   * inconsistent data types as reported in SPARK-21463.
-   * @param spec A partition inference result
-   * @return The PartitionSchema resolved from inference and cast according to `userSpecifiedSchema`
-   */
-  private def combineInferredAndUserSpecifiedPartitionSchema(spec: PartitionSpec): StructType = {
-    val equality = sparkSession.sessionState.conf.resolver
-    val resolved = spec.partitionColumns.map { partitionField =>
-      // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
-      userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
-        partitionField)
-    }
-    StructType(resolved)
-  }
 }
 
 object PartitioningAwareFileIndex {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 07376bbf8c410..44f60f8e03178 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -110,7 +110,7 @@ object PartitioningUtils {
       timeZone: TimeZone): PartitionSpec = {
     val userSpecifiedDataTypes = if (userSpecifiedSchema.isDefined) {
       val nameToDataType = userSpecifiedSchema.get.fields.map(f => f.name -> f.dataType).toMap
-      if (caseSensitive) {
+      if (!caseSensitive) {
         CaseInsensitiveMap(nameToDataType)
       } else {
         nameToDataType
@@ -170,15 +170,7 @@ object PartitioningUtils {
      // Finally, we create `Partition`s based on paths and resolved partition values.
      val partitions = resolvedPartitionValues.zip(pathsWithPartitionValues).map {
        case (PartitionValues(columnNames, literals), (path, _)) =>
-         val values = columnNames.zip(literals).map {
-           case (name, literal) =>
-             if (userSpecifiedDataTypes.contains(name)) {
-               Cast(literal, userSpecifiedDataTypes(name), Option(timeZone.getID)).eval()
-             } else {
-               literal.value
-             }
-         }
-         PartitionPath(InternalRow.fromSeq(values), path)
+         PartitionPath(InternalRow.fromSeq(literals.map(_.value)), path)
      }
 
      PartitionSpec(StructType(fields), partitions)
@@ -277,9 +269,12 @@ object PartitioningUtils {
       assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'")
 
       val literal = if (userSpecifiedDataTypes.contains(columnName)) {
-        // SPARK-26188: if user provides corresponding column schema, process the column as String
-        // type and cast it as user specified data type later.
-        inferPartitionColumnValue(rawColumnValue, false, timeZone)
+        // SPARK-26188: if user provides corresponding column schema, get the column value without
+        // inference, and then cast it as user specified data type.
+        val columnValue = inferPartitionColumnValue(rawColumnValue, false, timeZone)
+        val castedValue =
+          Cast(columnValue, userSpecifiedDataTypes(columnName), Option(timeZone.getID)).eval()
+        Literal.create(castedValue, userSpecifiedDataTypes(columnName))
       } else {
         inferPartitionColumnValue(rawColumnValue, typeInference, timeZone)
       }
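The net effect of patch 4 is that the cast to the user-specified type happens once, inside `parsePartitionColumn`, instead of in a second pass over the already-parsed values. The pattern in isolation, as a sketch (`userType` stands for whatever type the user schema declares for the column):

```scala
import java.util.TimeZone
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.StringType

val userType = StringType                        // assumed user-specified type
val timeZone = TimeZone.getTimeZone("UTC")
val raw = Literal.create("4d", StringType)       // value parsed with inference disabled
val casted = Cast(raw, userType, Option(timeZone.getID)).eval()
val literal = Literal.create(casted, userType)   // literal already in the user's type
```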
From 36c65e5a4d8b173b331aee2d5658e1b25e6ce795 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Thu, 29 Nov 2018 23:28:43 +0800
Subject: [PATCH 5/5] remove one unnecessary change

---
 .../spark/sql/execution/datasources/PartitioningUtils.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 44f60f8e03178..9d2c9ba0c1a5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -169,7 +169,7 @@ object PartitioningUtils {
 
      // Finally, we create `Partition`s based on paths and resolved partition values.
      val partitions = resolvedPartitionValues.zip(pathsWithPartitionValues).map {
-       case (PartitionValues(columnNames, literals), (path, _)) =>
+       case (PartitionValues(_, literals), (path, _)) =>
          PartitionPath(InternalRow.fromSeq(literals.map(_.value)), path)
      }
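Taken together, the series ends where the new `FileIndexSuite` test began: partition values are parsed without inference whenever the user supplies a type for the column, and the cast happens at parse time. A condensed sketch of the end state, mirroring that test (`dir` and `spark` come from the suite's fixtures):

```scala
import java.io.File
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Assumes a directory <dir>/a=4d/text.txt, as in the FileIndexSuite test.
val path = new Path(dir.getCanonicalPath)
val schema = StructType(Seq(StructField("a", StringType, false)))
val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, Some(schema))
fileIndex.partitionSpec().partitions.head.values.getString(0)  // "4d", not "4.0"
```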