From db71b469f539229a41ad3c24af93969bbe329c2f Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 4 Dec 2018 17:43:03 +0800 Subject: [PATCH 1/8] SPARK-26263: Throw exception when partition value can't be converted to specific type --- .../datasources/PartitioningUtils.scala | 10 +++++++--- .../execution/datasources/FileIndexSuite.scala | 18 +++++++++++++++++- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index d66cb09bda0cc..cd1d9e2037821 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -272,9 +272,13 @@ object PartitioningUtils { val literal = if (userSpecifiedDataTypes.contains(columnName)) { // SPARK-26188: if user provides corresponding column schema, get the column value without // inference, and then cast it as user specified data type. - val columnValue = inferPartitionColumnValue(rawColumnValue, false, timeZone) - val castedValue = - Cast(columnValue, userSpecifiedDataTypes(columnName), Option(timeZone.getID)).eval() + val dataType = userSpecifiedDataTypes(columnName) + val columnValueLiteral = inferPartitionColumnValue(rawColumnValue, false, timeZone) + val columnValue = columnValueLiteral.eval() + val castedValue = Cast(columnValueLiteral, dataType, Option(timeZone.getID)).eval() + if (columnValue != null && castedValue == null) { + throw new RuntimeException(s"Failed to cast partition value `$columnValue` to $dataType") + } Literal.create(castedValue, userSpecifiedDataTypes(columnName)) } else { inferPartitionColumnValue(rawColumnValue, typeInference, timeZone) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index ec552f7ddf47a..1b75983cca303 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.functions.col import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.util.{KnownSizeEstimation, SizeEstimator} class FileIndexSuite extends SharedSQLContext { @@ -95,6 +95,22 @@ class FileIndexSuite extends SharedSQLContext { } } + test("SPARK-26263: Throw exception when partition value can't be converted to specific type") { + withTempDir { dir => + val partitionDirectory = new File(dir, "a=foo") + partitionDirectory.mkdir() + val file = new File(partitionDirectory, "text.txt") + stringToFile(file, "text") + val path = new Path(dir.getCanonicalPath) + val schema = StructType(Seq(StructField("a", IntegerType, false))) + val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, Some(schema)) + val msg = intercept[RuntimeException] { + fileIndex.partitionSpec() + }.getMessage + assert(msg == "Failed to cast partition value `foo` to IntegerType") + } + } + test("InMemoryFileIndex: input paths are converted to qualified paths") { withTempDir { 
dir => val file = new File(dir, "text.txt") From 85f95f97601c6af8e8a398b64beaa2fb30e80a67 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 4 Dec 2018 23:18:08 +0800 Subject: [PATCH 2/8] address comments --- docs/sql-migration-guide-upgrade.md | 2 ++ .../apache/spark/sql/internal/SQLConf.scala | 13 ++++++++++++ .../PartitioningAwareFileIndex.scala | 3 +++ .../datasources/PartitioningUtils.scala | 20 +++++++++++++------ .../datasources/FileIndexSuite.scala | 19 +++++++++++++----- .../ParquetPartitionDiscoverySuite.scala | 18 +++++++++++++---- 6 files changed, 60 insertions(+), 15 deletions(-) diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index ed2ff139bcc33..282f2e1ffef4e 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -37,6 +37,8 @@ displayTitle: Spark SQL Upgrading Guide - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully. + - In Spark version 2.4 and earlier, partition column value is converted as null if it can't be casted to corresponding user provided schema. Since 3.0, partition column value is validated with user provided schema. An exception is thrown if the validation fails. You can disable such validation by setting `spark.sql.legacy.validatePartitionValueWithProvidedSchema` to `false`. + ## Upgrading From Spark SQL 2.3 to 2.4 - In Spark version 2.3 and earlier, the second parameter to array_contains function is implicitly promoted to the element type of first array type parameter. This type promotion can be lossy and may cause `array_contains` function to return wrong result. This problem has been addressed in 2.4 by employing a safer type promotion mechanism. This can cause some change in behavior and are illustrated in the table below. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 451b051f8407e..34e19ed19267c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1396,6 +1396,16 @@ object SQLConf { .booleanConf .createWithDefault(false) + val VALIDATE_PARTITION_VALUE_WITH_PROVIDED_SCHEMA = + buildConf("spark.sql.legacy.validatePartitionValueWithProvidedSchema") + .internal() + .doc("When this option is set to true, partition column values will be validated with " + + "provided schema. If the validation fails, a runtime exception is thrown." 
+ + "When this option is set to false, the partition column value will be converted to null " + + "if it can not be converted to corresponding provided schema.") + .booleanConf + .createWithDefault(true) + val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE = buildConf("spark.sql.streaming.continuous.executorQueueSize") .internal() @@ -2014,6 +2024,9 @@ class SQLConf extends Serializable with Logging { def allowCreatingManagedTableUsingNonemptyLocation: Boolean = getConf(ALLOW_CREATING_MANAGED_TABLE_USING_NONEMPTY_LOCATION) + def validatePartitionValueWithProvidedSchema: Boolean = + getConf(VALIDATE_PARTITION_VALUE_WITH_PROVIDED_SCHEMA) + def partitionOverwriteMode: PartitionOverwriteMode.Value = PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index 7b0e4dbcc25f4..558ba0c033580 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -128,12 +128,15 @@ abstract class PartitioningAwareFileIndex( .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone) val caseSensitive = sparkSession.sqlContext.conf.caseSensitiveAnalysis + val validatePartitionValueWithProvidedSchema = + sparkSession.sqlContext.conf.validatePartitionValueWithProvidedSchema PartitioningUtils.parsePartitions( leafDirs, typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled, basePaths = basePaths, userSpecifiedSchema = userSpecifiedSchema, caseSensitive = caseSensitive, + validatePartitionValueWithProvidedSchema = validatePartitionValueWithProvidedSchema, timeZoneId = timeZoneId) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index cd1d9e2037821..a9061c654d65d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -26,12 +26,13 @@ import scala.util.Try import org.apache.hadoop.fs.Path -import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCoercion} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal} import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils @@ -96,9 +97,10 @@ object PartitioningUtils { basePaths: Set[Path], userSpecifiedSchema: Option[StructType], caseSensitive: Boolean, + validatePartitionValueWithProvidedSchema: Boolean, timeZoneId: String): PartitionSpec = { - parsePartitions(paths, typeInference, basePaths, userSpecifiedSchema, - caseSensitive, DateTimeUtils.getTimeZone(timeZoneId)) + parsePartitions(paths, typeInference, basePaths, userSpecifiedSchema, caseSensitive, + validatePartitionValueWithProvidedSchema, DateTimeUtils.getTimeZone(timeZoneId)) } private[datasources] def 
parsePartitions( @@ -107,6 +109,7 @@ object PartitioningUtils { basePaths: Set[Path], userSpecifiedSchema: Option[StructType], caseSensitive: Boolean, + validatePartitionValueWithProvidedSchema: Boolean, timeZone: TimeZone): PartitionSpec = { val userSpecifiedDataTypes = if (userSpecifiedSchema.isDefined) { val nameToDataType = userSpecifiedSchema.get.fields.map(f => f.name -> f.dataType).toMap @@ -121,7 +124,8 @@ object PartitioningUtils { // First, we need to parse every partition's path and see if we can find partition values. val (partitionValues, optDiscoveredBasePaths) = paths.map { path => - parsePartition(path, typeInference, basePaths, userSpecifiedDataTypes, timeZone) + parsePartition(path, typeInference, basePaths, userSpecifiedDataTypes, + validatePartitionValueWithProvidedSchema, timeZone) }.unzip // We create pairs of (path -> path's partition value) here @@ -203,6 +207,7 @@ object PartitioningUtils { typeInference: Boolean, basePaths: Set[Path], userSpecifiedDataTypes: Map[String, DataType], + validatePartitionValueWithProvidedSchema: Boolean, timeZone: TimeZone): (Option[PartitionValues], Option[Path]) = { val columns = ArrayBuffer.empty[(String, Literal)] // Old Hadoop versions don't have `Path.isRoot` @@ -224,7 +229,8 @@ object PartitioningUtils { // Let's say currentPath is a path of "/table/a=1/", currentPath.getName will give us a=1. // Once we get the string, we try to parse it and find the partition column and value. val maybeColumn = - parsePartitionColumn(currentPath.getName, typeInference, userSpecifiedDataTypes, timeZone) + parsePartitionColumn(currentPath.getName, typeInference, userSpecifiedDataTypes, + validatePartitionValueWithProvidedSchema, timeZone) maybeColumn.foreach(columns += _) // Now, we determine if we should stop. 
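A minimal sketch of the check these hunks thread through (not part of the patches themselves): the raw partition value parsed from a directory name is cast to the user-specified type with Catalyst's `Cast`, and a non-null raw value that casts to null is rejected instead of silently becoming null. The value `foo`, the target `IntegerType`, and the `UTC` time zone below are illustrative assumptions mirroring the new test.

```scala
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.IntegerType

// Raw value parsed from a directory name such as `a=foo`.
val rawColumnValue = Literal("foo")
// Target type taken from the user-specified schema.
val dataType = IntegerType
// A non-null raw value that casts to null does not fit the user-specified type.
val castedValue = Cast(rawColumnValue, dataType, Option("UTC")).eval()
if (rawColumnValue.eval() != null && castedValue == null) {
  throw new RuntimeException(s"Failed to cast partition value `foo` to $dataType")
}
```

When the new flag is false, the same situation falls back to the pre-3.0 behavior and produces a null partition value instead of throwing.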
@@ -258,6 +264,7 @@ object PartitioningUtils { columnSpec: String, typeInference: Boolean, userSpecifiedDataTypes: Map[String, DataType], + validatePartitionValueWithProvidedSchema: Boolean, timeZone: TimeZone): Option[(String, Literal)] = { val equalSignIndex = columnSpec.indexOf('=') if (equalSignIndex == -1) { @@ -276,7 +283,8 @@ object PartitioningUtils { val columnValueLiteral = inferPartitionColumnValue(rawColumnValue, false, timeZone) val columnValue = columnValueLiteral.eval() val castedValue = Cast(columnValueLiteral, dataType, Option(timeZone.getID)).eval() - if (columnValue != null && castedValue == null) { + if (validatePartitionValueWithProvidedSchema + && columnValue != null && castedValue == null) { throw new RuntimeException(s"Failed to cast partition value `$columnValue` to $dataType") } Literal.create(castedValue, userSpecifiedDataTypes(columnName)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index 1b75983cca303..5be39b1d543a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -103,11 +103,20 @@ class FileIndexSuite extends SharedSQLContext { stringToFile(file, "text") val path = new Path(dir.getCanonicalPath) val schema = StructType(Seq(StructField("a", IntegerType, false))) - val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, Some(schema)) - val msg = intercept[RuntimeException] { - fileIndex.partitionSpec() - }.getMessage - assert(msg == "Failed to cast partition value `foo` to IntegerType") + withSQLConf(SQLConf.VALIDATE_PARTITION_VALUE_WITH_PROVIDED_SCHEMA.key -> "true") { + val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, Some(schema)) + val msg = intercept[RuntimeException] { + fileIndex.partitionSpec() + }.getMessage + assert(msg == "Failed to cast partition value `foo` to IntegerType") + } + + withSQLConf(SQLConf.VALIDATE_PARTITION_VALUE_WITH_PROVIDED_SCHEMA.key -> "false") { + val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, Some(schema)) + val partitionValues = fileIndex.partitionSpec().partitions.map(_.values) + assert(partitionValues.length == 1 && partitionValues(0).numFields == 1 && + partitionValues(0).isNullAt(0)) + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index f808ca458aaa7..88067358667c6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -101,7 +101,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha "hdfs://host:9000/path/a=10.5/b=hello") var exception = intercept[AssertionError] { - parsePartitions(paths.map(new Path(_)), true, Set.empty[Path], None, true, timeZoneId) + parsePartitions(paths.map(new Path(_)), true, Set.empty[Path], None, true, true, timeZoneId) } assert(exception.getMessage().contains("Conflicting directory structures detected")) @@ -117,6 +117,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha Set(new Path("hdfs://host:9000/path/")), 
None, true, + true, timeZoneId) // Valid @@ -132,6 +133,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha Set(new Path("hdfs://host:9000/path/something=true/table")), None, true, + true, timeZoneId) // Valid @@ -147,6 +149,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha Set(new Path("hdfs://host:9000/path/table=true")), None, true, + true, timeZoneId) // Invalid @@ -162,6 +165,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha Set(new Path("hdfs://host:9000/path/")), None, true, + true, timeZoneId) } assert(exception.getMessage().contains("Conflicting directory structures detected")) @@ -184,6 +188,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha Set(new Path("hdfs://host:9000/tmp/tables/")), None, true, + true, timeZoneId) } assert(exception.getMessage().contains("Conflicting directory structures detected")) @@ -191,13 +196,14 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha test("parse partition") { def check(path: String, expected: Option[PartitionValues]): Unit = { - val actual = parsePartition(new Path(path), true, Set.empty[Path], Map.empty, timeZone)._1 + val actual = parsePartition(new Path(path), true, Set.empty[Path], + Map.empty, true, timeZone)._1 assert(expected === actual) } def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = { val message = intercept[T] { - parsePartition(new Path(path), true, Set.empty[Path], Map.empty, timeZone) + parsePartition(new Path(path), true, Set.empty[Path], Map.empty, true, timeZone) }.getMessage assert(message.contains(expected)) @@ -242,6 +248,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha typeInference = true, basePaths = Set(new Path("file://path/a=10")), Map.empty, + true, timeZone = timeZone)._1 assert(partitionSpec1.isEmpty) @@ -252,6 +259,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha typeInference = true, basePaths = Set(new Path("file://path")), Map.empty, + true, timeZone = timeZone)._1 assert(partitionSpec2 == @@ -272,6 +280,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha rootPaths, None, true, + true, timeZoneId) assert(actualSpec.partitionColumns === spec.partitionColumns) assert(actualSpec.partitions.length === spec.partitions.length) @@ -384,7 +393,8 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha test("parse partitions with type inference disabled") { def check(paths: Seq[String], spec: PartitionSpec): Unit = { val actualSpec = - parsePartitions(paths.map(new Path(_)), false, Set.empty[Path], None, true, timeZoneId) + parsePartitions(paths.map(new Path(_)), false, Set.empty[Path], None, + true, true, timeZoneId) assert(actualSpec === spec) } From f741f7932ef4d89825eec4bfdbe5fbb59505f3c2 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 5 Dec 2018 02:27:49 +0800 Subject: [PATCH 3/8] change conf name --- docs/sql-migration-guide-upgrade.md | 2 ++ .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index 282f2e1ffef4e..00d13c8725196 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -39,6 +39,8 @@ displayTitle: Spark SQL Upgrading Guide - In Spark version 2.4 and earlier, 
partition column value is converted as null if it can't be casted to corresponding user provided schema. Since 3.0, partition column value is validated with user provided schema. An exception is thrown if the validation fails. You can disable such validation by setting `spark.sql.legacy.validatePartitionValueWithProvidedSchema` to `false`. + - In Spark version 2.4 and earlier, partition column value is converted as null if it can't be casted to corresponding user provided schema. Since 3.0, partition column value is validated with user provided schema. An exception is thrown if the validation fails. You can disable such validation by setting `spark.sql.validatePartitionValueWithProvidedSchema.enabled` to `false`. + ## Upgrading From Spark SQL 2.3 to 2.4 - In Spark version 2.3 and earlier, the second parameter to array_contains function is implicitly promoted to the element type of first array type parameter. This type promotion can be lossy and may cause `array_contains` function to return wrong result. This problem has been addressed in 2.4 by employing a safer type promotion mechanism. This can cause some change in behavior and are illustrated in the table below. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 34e19ed19267c..5273f454dacb5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1397,7 +1397,7 @@ object SQLConf { .createWithDefault(false) val VALIDATE_PARTITION_VALUE_WITH_PROVIDED_SCHEMA = - buildConf("spark.sql.legacy.validatePartitionValueWithProvidedSchema") + buildConf("spark.sql.validatePartitionValueWithProvidedSchema.enabled") .internal() .doc("When this option is set to true, partition column values will be validated with " + "provided schema. If the validation fails, a runtime exception is thrown." + From b24134a574bb3b2098bdc51bc96a49c2412585e3 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 6 Dec 2018 13:30:37 +0800 Subject: [PATCH 4/8] rename conf --- .../org/apache/spark/ui/static/stagepage.js | 2 +- docs/sql-migration-guide-upgrade.md | 6 ++---- .../org/apache/spark/sql/internal/SQLConf.scala | 7 +++---- .../PartitioningAwareFileIndex.scala | 7 ++----- .../datasources/PartitioningUtils.scala | 17 ++++++++--------- .../execution/datasources/FileIndexSuite.scala | 4 ++-- 6 files changed, 18 insertions(+), 25 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js index 564467487e84e..e42f05683fa82 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js @@ -654,7 +654,7 @@ $(document).ready(function () { "columns": [ {data: function (row, type) { return type !== 'display' ? (isNaN(row.index) ? 0 : row.index ) : row.index; - }, + },sql-migration-guide-upgrade.md name: "Index" }, {data : "taskId", name: "ID"}, diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index 00d13c8725196..3638b0873aa4d 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -29,6 +29,8 @@ displayTitle: Spark SQL Upgrading Guide - In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. 
The behavior of map with duplicated keys is undefined, e.g. map look up respects the duplicated key appears first, `Dataset.collect` only keeps the duplicated key appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, these built-in functions will remove duplicated map keys with last wins policy. Users may still read map values with duplicated keys from data sources which do not enforce it (e.g. Parquet), the behavior will be udefined. + - In Spark version 2.4 and earlier, a partition column value is converted to null if it can't be cast to the corresponding user-provided schema. Since Spark 3.0, partition column values are validated against the user-provided schema. An exception is thrown if the validation fails. You can disable such validation by setting `spark.sql.sources.validatePartitionColumns` to `false`. + - In Spark version 2.4 and earlier, the `SET` command works without any warnings even if the specified key is for `SparkConf` entries and it has no effect because the command does not update `SparkConf`, but the behavior might confuse users. Since 3.0, the command fails if a `SparkConf` key is used. You can disable such a check by setting `spark.sql.legacy.execution.setCommandRejectsSparkConfs` to `false`. - Spark applications which are built with Spark version 2.4 and prior, and call methods of `UserDefinedFunction`, need to be re-compiled with Spark 3.0, as they are not binary compatible with Spark 3.0. @@ -37,10 +39,6 @@ displayTitle: Spark SQL Upgrading Guide - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully. - - In Spark version 2.4 and earlier, partition column value is converted as null if it can't be casted to corresponding user provided schema. Since 3.0, partition column value is validated with user provided schema. An exception is thrown if the validation fails. You can disable such validation by setting `spark.sql.legacy.validatePartitionValueWithProvidedSchema` to `false`. - - In Spark version 2.4 and earlier, partition column value is converted as null if it can't be casted to corresponding user provided schema. Since 3.0, partition column value is validated with user provided schema. An exception is thrown if the validation fails. You can disable such validation by setting `spark.sql.validatePartitionValueWithProvidedSchema.enabled` to `false`. - ## Upgrading From Spark SQL 2.3 to 2.4 - In Spark version 2.3 and earlier, the second parameter to array_contains function is implicitly promoted to the element type of first array type parameter. This type promotion can be lossy and may cause `array_contains` function to return wrong result. This problem has been addressed in 2.4 by employing a safer type promotion mechanism. This can cause some change in behavior and are illustrated in the table below.
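To make the documented behavior change concrete, a hedged usage sketch from the reader's side follows; the directory layout `/tmp/tbl/a=foo/part.csv`, the schema string, and the choice of the CSV source are assumptions, not taken from the patch.

```scala
// Partitioned directory /tmp/tbl/a=foo/part.csv, read with a user-specified schema
// that declares the partition column `a` as an integer.
val schema = "a INT, v STRING"

// Default after this change: reading fails with a RuntimeException during partition
// discovery because `foo` cannot be cast to IntegerType.
spark.read.schema(schema).csv("/tmp/tbl").show()

// Restoring the pre-3.0 behavior: the unparsable partition value becomes null.
spark.conf.set("spark.sql.sources.validatePartitionColumns", "false")
spark.read.schema(schema).csv("/tmp/tbl").show()
```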
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 5273f454dacb5..8fbdd72034e0f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1396,8 +1396,8 @@ object SQLConf { .booleanConf .createWithDefault(false) - val VALIDATE_PARTITION_VALUE_WITH_PROVIDED_SCHEMA = - buildConf("spark.sql.validatePartitionValueWithProvidedSchema.enabled") + val VALIDATE_PARTITION_COLUMNS = + buildConf("spark.sql.sources.validatePartitionColumns") .internal() .doc("When this option is set to true, partition column values will be validated with " + "provided schema. If the validation fails, a runtime exception is thrown." + @@ -2024,8 +2024,7 @@ class SQLConf extends Serializable with Logging { def allowCreatingManagedTableUsingNonemptyLocation: Boolean = getConf(ALLOW_CREATING_MANAGED_TABLE_USING_NONEMPTY_LOCATION) - def validatePartitionValueWithProvidedSchema: Boolean = - getConf(VALIDATE_PARTITION_VALUE_WITH_PROVIDED_SCHEMA) + def validatePartitionColumns: Boolean = getConf(VALIDATE_PARTITION_COLUMNS) def partitionOverwriteMode: PartitionOverwriteMode.Value = PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index 558ba0c033580..b2e4155e6f49e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -127,16 +127,13 @@ abstract class PartitioningAwareFileIndex( val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION) .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone) - val caseSensitive = sparkSession.sqlContext.conf.caseSensitiveAnalysis - val validatePartitionValueWithProvidedSchema = - sparkSession.sqlContext.conf.validatePartitionValueWithProvidedSchema PartitioningUtils.parsePartitions( leafDirs, typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled, basePaths = basePaths, userSpecifiedSchema = userSpecifiedSchema, - caseSensitive = caseSensitive, - validatePartitionValueWithProvidedSchema = validatePartitionValueWithProvidedSchema, + caseSensitive = sparkSession.sqlContext.conf.caseSensitiveAnalysis, + validatePartitionColumns = sparkSession.sqlContext.conf.validatePartitionColumns, timeZoneId = timeZoneId) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index a9061c654d65d..6de206cd17f2e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -97,10 +97,10 @@ object PartitioningUtils { basePaths: Set[Path], userSpecifiedSchema: Option[StructType], caseSensitive: Boolean, - validatePartitionValueWithProvidedSchema: Boolean, + validatePartitionColumns: Boolean, timeZoneId: String): PartitionSpec = { parsePartitions(paths, typeInference, basePaths, userSpecifiedSchema, caseSensitive, - validatePartitionValueWithProvidedSchema, 
DateTimeUtils.getTimeZone(timeZoneId)) + validatePartitionColumns, DateTimeUtils.getTimeZone(timeZoneId)) } private[datasources] def parsePartitions( @@ -109,7 +109,7 @@ object PartitioningUtils { basePaths: Set[Path], userSpecifiedSchema: Option[StructType], caseSensitive: Boolean, - validatePartitionValueWithProvidedSchema: Boolean, + validatePartitionColumns: Boolean, timeZone: TimeZone): PartitionSpec = { val userSpecifiedDataTypes = if (userSpecifiedSchema.isDefined) { val nameToDataType = userSpecifiedSchema.get.fields.map(f => f.name -> f.dataType).toMap @@ -125,7 +125,7 @@ object PartitioningUtils { // First, we need to parse every partition's path and see if we can find partition values. val (partitionValues, optDiscoveredBasePaths) = paths.map { path => parsePartition(path, typeInference, basePaths, userSpecifiedDataTypes, - validatePartitionValueWithProvidedSchema, timeZone) + validatePartitionColumns, timeZone) }.unzip // We create pairs of (path -> path's partition value) here @@ -207,7 +207,7 @@ object PartitioningUtils { typeInference: Boolean, basePaths: Set[Path], userSpecifiedDataTypes: Map[String, DataType], - validatePartitionValueWithProvidedSchema: Boolean, + validatePartitionColumns: Boolean, timeZone: TimeZone): (Option[PartitionValues], Option[Path]) = { val columns = ArrayBuffer.empty[(String, Literal)] // Old Hadoop versions don't have `Path.isRoot` @@ -230,7 +230,7 @@ object PartitioningUtils { // Once we get the string, we try to parse it and find the partition column and value. val maybeColumn = parsePartitionColumn(currentPath.getName, typeInference, userSpecifiedDataTypes, - validatePartitionValueWithProvidedSchema, timeZone) + validatePartitionColumns, timeZone) maybeColumn.foreach(columns += _) // Now, we determine if we should stop. 
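For reference, a hedged sketch of the call shape after this rename, mirroring the test-suite updates further below; the path, the data type map, and the time zone are illustrative, and `parsePartition` remains an internal helper visible only within the `datasources` package.

```scala
import java.util.TimeZone
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.types.IntegerType

// Parse a single partition path; the new boolean controls whether an unparsable
// value for a user-specified column type throws instead of degrading to null.
val (partitionValues, basePath) = PartitioningUtils.parsePartition(
  new Path("hdfs://host:9000/path/a=10/b=hello"),
  typeInference = true,
  basePaths = Set.empty[Path],
  userSpecifiedDataTypes = Map("a" -> IntegerType),
  validatePartitionColumns = true,  // the flag threaded through from SQLConf
  timeZone = TimeZone.getTimeZone("UTC"))
```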
@@ -264,7 +264,7 @@ object PartitioningUtils { columnSpec: String, typeInference: Boolean, userSpecifiedDataTypes: Map[String, DataType], - validatePartitionValueWithProvidedSchema: Boolean, + validatePartitionColumns: Boolean, timeZone: TimeZone): Option[(String, Literal)] = { val equalSignIndex = columnSpec.indexOf('=') if (equalSignIndex == -1) { @@ -283,8 +283,7 @@ object PartitioningUtils { val columnValueLiteral = inferPartitionColumnValue(rawColumnValue, false, timeZone) val columnValue = columnValueLiteral.eval() val castedValue = Cast(columnValueLiteral, dataType, Option(timeZone.getID)).eval() - if (validatePartitionValueWithProvidedSchema - && columnValue != null && castedValue == null) { + if (validatePartitionColumns && columnValue != null && castedValue == null) { throw new RuntimeException(s"Failed to cast partition value `$columnValue` to $dataType") } Literal.create(castedValue, userSpecifiedDataTypes(columnName)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index 5be39b1d543a9..f3701d543d121 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -103,7 +103,7 @@ class FileIndexSuite extends SharedSQLContext { stringToFile(file, "text") val path = new Path(dir.getCanonicalPath) val schema = StructType(Seq(StructField("a", IntegerType, false))) - withSQLConf(SQLConf.VALIDATE_PARTITION_VALUE_WITH_PROVIDED_SCHEMA.key -> "true") { + withSQLConf(SQLConf.VALIDATE_PARTITION_COLUMNS.key -> "true") { val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, Some(schema)) val msg = intercept[RuntimeException] { fileIndex.partitionSpec() @@ -111,7 +111,7 @@ class FileIndexSuite extends SharedSQLContext { assert(msg == "Failed to cast partition value `foo` to IntegerType") } - withSQLConf(SQLConf.VALIDATE_PARTITION_VALUE_WITH_PROVIDED_SCHEMA.key -> "false") { + withSQLConf(SQLConf.VALIDATE_PARTITION_COLUMNS.key -> "false") { val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, Some(schema)) val partitionValues = fileIndex.partitionSpec().partitions.map(_.values) assert(partitionValues.length == 1 && partitionValues(0).numFields == 1 && From ce2db2824d0179d63f7234688784f78ddb04e4e5 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 6 Dec 2018 13:55:46 +0800 Subject: [PATCH 5/8] revise --- core/src/main/resources/org/apache/spark/ui/static/stagepage.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js index e42f05683fa82..564467487e84e 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js @@ -654,7 +654,7 @@ $(document).ready(function () { "columns": [ {data: function (row, type) { return type !== 'display' ? (isNaN(row.index) ? 
0 : row.index ) : row.index; - },sql-migration-guide-upgrade.md + }, name: "Index" }, {data : "taskId", name: "ID"}, From 25f7039c6b836d40370b615d3d0259c9640dde4c Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 6 Dec 2018 20:29:44 +0800 Subject: [PATCH 6/8] include partition column in error message --- .../spark/sql/execution/datasources/PartitioningUtils.scala | 3 ++- .../spark/sql/execution/datasources/FileIndexSuite.scala | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 6de206cd17f2e..4430df930913f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -284,7 +284,8 @@ object PartitioningUtils { val columnValue = columnValueLiteral.eval() val castedValue = Cast(columnValueLiteral, dataType, Option(timeZone.getID)).eval() if (validatePartitionColumns && columnValue != null && castedValue == null) { - throw new RuntimeException(s"Failed to cast partition value `$columnValue` to $dataType") + throw new RuntimeException(s"Failed to cast value `$columnValue` to `$dataType` " + + s"for partition column `$columnName`") } Literal.create(castedValue, userSpecifiedDataTypes(columnName)) } else { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index f3701d543d121..e817521fe0dba 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -108,7 +108,7 @@ class FileIndexSuite extends SharedSQLContext { val msg = intercept[RuntimeException] { fileIndex.partitionSpec() }.getMessage - assert(msg == "Failed to cast partition value `foo` to IntegerType") + assert(msg == "Failed to cast value `foo` to `IntegerType` for partition column `a`") } withSQLConf(SQLConf.VALIDATE_PARTITION_COLUMNS.key -> "false") { From e13147a3103be361f0592ee980c0f43c504675e4 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 6 Dec 2018 21:42:51 +0800 Subject: [PATCH 7/8] address comments --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 4 ++-- .../spark/sql/execution/datasources/FileIndexSuite.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 8fbdd72034e0f..6857b8de79758 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1400,9 +1400,9 @@ object SQLConf { buildConf("spark.sql.sources.validatePartitionColumns") .internal() .doc("When this option is set to true, partition column values will be validated with " + - "provided schema. If the validation fails, a runtime exception is thrown." + + "user-specified schema. If the validation fails, a runtime exception is thrown. "
+ "When this option is set to false, the partition column value will be converted to null " + - "if it can not be converted to corresponding provided schema.") + + "if it cannot be cast to the corresponding user-specified data type.") .booleanConf .createWithDefault(true) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index e817521fe0dba..6bd0a2591fc1f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -95,7 +95,7 @@ class FileIndexSuite extends SharedSQLContext { } } - test("SPARK-26263: Throw exception when partition value can't be converted to specific type") { + test("SPARK-26263: Throw exception when partition value can't be cast to user-specified type") { withTempDir { dir => val partitionDirectory = new File(dir, "a=foo") partitionDirectory.mkdir() val file = new File(partitionDirectory, "text.txt") From 6f4e652add4157bfcdad4d7a924c74363f2b5cf2 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 6 Dec 2018 22:02:13 +0800 Subject: [PATCH 8/8] revise --- .../spark/sql/execution/datasources/PartitioningUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 4430df930913f..6458b65466fb5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -287,7 +287,7 @@ object PartitioningUtils { throw new RuntimeException(s"Failed to cast value `$columnValue` to `$dataType` " + s"for partition column `$columnName`") } - Literal.create(castedValue, userSpecifiedDataTypes(columnName)) + Literal.create(castedValue, dataType) } else { inferPartitionColumnValue(rawColumnValue, typeInference, timeZone) }
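Taken together, the series leaves the behavior summarized by the sketch below. It mirrors the final FileIndexSuite test rather than adding anything new; the directory `/tmp/tbl/a=foo/` containing a single text file, and an active `spark` session, are assumptions.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val schema = StructType(Seq(StructField("a", IntegerType, nullable = false)))
val fileIndex = new InMemoryFileIndex(spark, Seq(new Path("/tmp/tbl")), Map.empty, Some(schema))

// With spark.sql.sources.validatePartitionColumns=true (the default), partition discovery
// fails with: Failed to cast value `foo` to `IntegerType` for partition column `a`
fileIndex.partitionSpec()

// With the flag set to false, the partition value is read back as null instead.
```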