From d7f68b59d71c92b85a50a42a3c347be24eb721dd Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Sat, 21 Nov 2020 22:07:25 +0300
Subject: [PATCH 1/4] Add a test

---
 .../AlterTablePartitionV2SQLSuite.scala       | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
index 107d0ea47249d..0f6d175ab6201 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.connector.catalog.{CatalogV2Implicits, Identifier}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits
+import org.apache.spark.sql.internal.SQLConf
 
 class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
 
@@ -159,4 +160,21 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
       assert(partTable.asPartitionable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
     }
   }
+
+  test("case sensitivity in resolving partition specs") {
+    val t = "testpart.ns1.ns2.tbl"
+    withTable(t) {
+      spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+        val errMsg = intercept[AnalysisException] {
+          spark.sql(s"ALTER TABLE $t ADD PARTITION (ID=1) LOCATION 'loc1'")
+        }.getMessage
+        assert(errMsg.contains("Partition key ID not exists"))
+      }
+
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+        spark.sql(s"ALTER TABLE $t ADD PARTITION (ID=1) LOCATION 'loc1'")
+      }
+    }
+  }
 }
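The test above hinges on spark.sql.caseSensitive, which selects the Resolver the
analyzer uses to match column names. A minimal standalone sketch of the two modes
(not part of the patch; Spark ships equivalent caseSensitiveResolution and
caseInsensitiveResolution functions in org.apache.spark.sql.catalyst.analysis):

    import org.apache.spark.sql.catalyst.analysis.Resolver

    // Resolver is an alias for (String, String) => Boolean.
    val caseSensitive: Resolver = (a, b) => a == b
    val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)

    assert(!caseSensitive("ID", "id"))  // "ID" does not match column "id"
    assert(caseInsensitive("ID", "id")) // "ID" matches column "id"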
From 15e26e0faf296448d97b3627d3e81da1de620f9e Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Sat, 21 Nov 2020 22:47:17 +0300
Subject: [PATCH 2/4] Respect case sensitivity

---
 .../analysis/ResolvePartitionSpec.scala       | 27 +++++++----
 .../spark/sql/util/PartitioningUtils.scala    | 47 +++++++++++++++++++
 .../command/AnalyzePartitionCommand.scala     |  2 +-
 .../spark/sql/execution/command/ddl.scala     |  3 +-
 .../spark/sql/execution/command/tables.scala  |  3 +-
 .../datasources/PartitioningUtils.scala       | 24 ----------
 .../sql/execution/datasources/rules.scala     |  3 +-
 .../AlterTablePartitionV2SQLSuite.scala       | 12 ++++-
 8 files changed, 82 insertions(+), 39 deletions(-)
 create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
index 5e19a32968992..531d40f431dee 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAddPartition, Alte
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.PartitioningUtils.normalizePartitionSpec
 
 /**
  * Resolve [[UnresolvedPartitionSpec]] to [[ResolvedPartitionSpec]] in partition related commands.
@@ -33,32 +34,38 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] {
 
   def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case r @ AlterTableAddPartition(
         ResolvedTable(_, _, table: SupportsPartitionManagement), partSpecs, _) =>
-      r.copy(parts = resolvePartitionSpecs(partSpecs, table.partitionSchema()))
+      r.copy(parts = resolvePartitionSpecs(table.name, partSpecs, table.partitionSchema()))
 
     case r @ AlterTableDropPartition(
         ResolvedTable(_, _, table: SupportsPartitionManagement), partSpecs, _, _, _) =>
-      r.copy(parts = resolvePartitionSpecs(partSpecs, table.partitionSchema()))
+      r.copy(parts = resolvePartitionSpecs(table.name, partSpecs, table.partitionSchema()))
   }
 
   private def resolvePartitionSpecs(
-      partSpecs: Seq[PartitionSpec], partSchema: StructType): Seq[ResolvedPartitionSpec] =
+      tableName: String,
+      partSpecs: Seq[PartitionSpec],
+      partSchema: StructType): Seq[ResolvedPartitionSpec] =
     partSpecs.map {
       case unresolvedPartSpec: UnresolvedPartitionSpec =>
         ResolvedPartitionSpec(
-          convertToPartIdent(unresolvedPartSpec.spec, partSchema), unresolvedPartSpec.location)
+          convertToPartIdent(tableName, unresolvedPartSpec.spec, partSchema),
+          unresolvedPartSpec.location)
       case resolvedPartitionSpec: ResolvedPartitionSpec =>
         resolvedPartitionSpec
     }
 
   private def convertToPartIdent(
-      partSpec: TablePartitionSpec, partSchema: StructType): InternalRow = {
-    val conflictKeys = partSpec.keys.toSeq.diff(partSchema.map(_.name))
-    if (conflictKeys.nonEmpty) {
-      throw new AnalysisException(s"Partition key ${conflictKeys.mkString(",")} not exists")
-    }
+      tableName: String,
+      partitionSpec: TablePartitionSpec,
+      partSchema: StructType): InternalRow = {
+    val normalizedSpec = normalizePartitionSpec(
+      partitionSpec,
+      partSchema.map(_.name),
+      tableName,
+      conf.resolver)
     val partValues = partSchema.map { part =>
-      val partValue = partSpec.get(part.name).orNull
+      val partValue = normalizedSpec.get(part.name).orNull
       if (partValue == null) {
         null
       } else {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
new file mode 100644
index 0000000000000..586aa6c59164f
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.Resolver
+
+object PartitioningUtils {
+  /**
+   * Normalize the column names in partition specification, w.r.t. the real partition column names
+   * and case sensitivity. e.g., if the partition spec has a column named `monTh`, and there is a
+   * partition column named `month`, and it's case insensitive, we will normalize `monTh` to
+   * `month`.
+   */
+  def normalizePartitionSpec[T](
+      partitionSpec: Map[String, T],
+      partColNames: Seq[String],
+      tblName: String,
+      resolver: Resolver): Map[String, T] = {
+    val normalizedPartSpec = partitionSpec.toSeq.map { case (key, value) =>
+      val normalizedKey = partColNames.find(resolver(_, key)).getOrElse {
+        throw new AnalysisException(s"$key is not a valid partition column in table $tblName.")
+      }
+      normalizedKey -> value
+    }
+
+    SchemaUtils.checkColumnNameDuplication(
+      normalizedPartSpec.map(_._1), "in the partition schema", resolver)
+
+    normalizedPartSpec.toMap
+  }
+}
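The contract of the relocated helper, as a standalone sketch (only the code added
above is assumed; the hand-rolled resolver and table name `t1` stand in for
conf.resolver and a real table):

    import org.apache.spark.sql.util.PartitioningUtils.normalizePartitionSpec

    val resolver = (a: String, b: String) => a.equalsIgnoreCase(b)
    // Case-insensitive: the user-supplied key `monTh` is normalized to the
    // real partition column name `month`.
    val normalized =
      normalizePartitionSpec(Map("monTh" -> "5"), Seq("year", "month"), "t1", resolver)
    assert(normalized == Map("month" -> "5"))
    // A key that matches no partition column throws an AnalysisException, e.g.
    // "monTh is not a valid partition column in table t1." when only Seq("year") is passed.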
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
index fc62dce5002b1..0b265bfb63e3e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, Unresol
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalogUtils}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.util.PartitioningUtils
 
 /**
  * Analyzes a given set of partitions to generate per-partition statistics, which will be used in
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index d550fe270c753..27ad62026c9b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -39,11 +39,12 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog}
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces._
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.PartitioningUtils
 import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
 
 // Note: The definition of these commands are based on the ones described in
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 206f952fed0ca..9cb81f8ca1645 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIdentifier, CaseInsensitiveMap}
-import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils}
+import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -47,6 +47,7 @@ import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.PartitioningUtils
 import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.util.Utils
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 796c23c7337d8..238089a7d2bb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -357,30 +357,6 @@ object PartitioningUtils {
     getPathFragment(spec, StructType.fromAttributes(partitionColumns))
   }
 
-  /**
-   * Normalize the column names in partition specification, w.r.t. the real partition column names
-   * and case sensitivity. e.g., if the partition spec has a column named `monTh`, and there is a
-   * partition column named `month`, and it's case insensitive, we will normalize `monTh` to
-   * `month`.
-   */
-  def normalizePartitionSpec[T](
-      partitionSpec: Map[String, T],
-      partColNames: Seq[String],
-      tblName: String,
-      resolver: Resolver): Map[String, T] = {
-    val normalizedPartSpec = partitionSpec.toSeq.map { case (key, value) =>
-      val normalizedKey = partColNames.find(resolver(_, key)).getOrElse {
-        throw new AnalysisException(s"$key is not a valid partition column in table $tblName.")
-      }
-      normalizedKey -> value
-    }
-
-    SchemaUtils.checkColumnNameDuplication(
-      normalizedPartSpec.map(_._1), "in the partition schema", resolver)
-
-    normalizedPartSpec.toMap
-  }
-
   /**
    * Resolves possible type conflicts between partitions by up-casting "lower" types using
    * [[findWiderTypeForPartitionColumn]].
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 3a2a642b870f8..9e65b0ce13693 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.sources.InsertableRelation
 import org.apache.spark.sql.types.{AtomicType, StructType}
+import org.apache.spark.sql.util.PartitioningUtils.normalizePartitionSpec
 import org.apache.spark.sql.util.SchemaUtils
 
 /**
@@ -386,7 +387,7 @@
       partColNames: Seq[String],
       catalogTable: Option[CatalogTable]): InsertIntoStatement = {
 
-    val normalizedPartSpec = PartitioningUtils.normalizePartitionSpec(
+    val normalizedPartSpec = normalizePartitionSpec(
       insert.partitionSpec, partColNames, tblName, conf.resolver)
 
     val staticPartCols = normalizedPartSpec.filter(_._2.isDefined).keySet
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
index 0f6d175ab6201..8f0533be152a3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
@@ -169,11 +169,21 @@
         val errMsg = intercept[AnalysisException] {
           spark.sql(s"ALTER TABLE $t ADD PARTITION (ID=1) LOCATION 'loc1'")
         }.getMessage
-        assert(errMsg.contains("Partition key ID not exists"))
+        assert(errMsg.contains(s"ID is not a valid partition column in table $t"))
       }
 
+      val partTable = catalog("testpart").asTableCatalog
+        .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl"))
+        .asPartitionable
+      assert(!partTable.partitionExists(InternalRow.fromSeq(Seq(1))))
+
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
         spark.sql(s"ALTER TABLE $t ADD PARTITION (ID=1) LOCATION 'loc1'")
+        assert(partTable.partitionExists(InternalRow.fromSeq(Seq(1))))
+        spark.sql(s"ALTER TABLE $t DROP PARTITION (Id=1)")
+        assert(!partTable.partitionExists(InternalRow.fromSeq(Seq(1))))
+
+        spark.sql(s"ALTER TABLE $t ADD PARTITION (ID=1) LOCATION 'loc1'")
       }
     }
   }

From 7269739b7f630e2bd9e8ed2e8cdc6ef98814db4e Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Sat, 21 Nov 2020 23:11:19 +0300
Subject: [PATCH 3/4] Remove unneeded changes

---
 .../spark/sql/connector/AlterTablePartitionV2SQLSuite.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
index 8f0533be152a3..e05c2c09ace2a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
@@ -182,8 +182,6 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
         assert(partTable.partitionExists(InternalRow.fromSeq(Seq(1))))
         spark.sql(s"ALTER TABLE $t DROP PARTITION (Id=1)")
         assert(!partTable.partitionExists(InternalRow.fromSeq(Seq(1))))
-
-        spark.sql(s"ALTER TABLE $t ADD PARTITION (ID=1) LOCATION 'loc1'")
       }
     }
   }
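Routing V2 partition specs through normalizePartitionSpec also picks up its
duplicate-key check: with a case-insensitive resolver, two spec keys that normalize
to the same column are rejected before the spec reaches the catalog. Continuing the
standalone sketch from above (same stand-in resolver and table name `t1`):

    import org.apache.spark.sql.AnalysisException
    import org.apache.spark.sql.util.PartitioningUtils.normalizePartitionSpec

    val resolver = (a: String, b: String) => a.equalsIgnoreCase(b)
    try {
      // Both "ID" and "id" normalize to partition column "id", so
      // SchemaUtils.checkColumnNameDuplication reports a duplicate.
      normalizePartitionSpec(Map("ID" -> "1", "id" -> "2"), Seq("id"), "t1", resolver)
    } catch {
      case e: AnalysisException => println(e.getMessage) // duplicate column message
    }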
(ID=1) LOCATION 'loc1'") } } } From 046f0d1a7564a6ca5f2bc6e608c6f771dc4b9df8 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sat, 21 Nov 2020 23:48:11 +0300 Subject: [PATCH 4/4] Remove an unused import --- .../spark/sql/execution/datasources/PartitioningUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 238089a7d2bb0..ea437d200eaab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -30,7 +30,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCoercion} +import org.apache.spark.sql.catalyst.analysis.TypeCoercion import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal} import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimestampFormatter}