diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 60cacda9f5f1..5fb1a4d24907 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -386,7 +386,8 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
   private def preprocess(
       insert: InsertIntoStatement,
       tblName: String,
-      partColNames: Seq[String]): InsertIntoStatement = {
+      partColNames: Seq[String],
+      catalogTable: Option[CatalogTable]): InsertIntoStatement = {
     val normalizedPartSpec = PartitioningUtils.normalizePartitionSpec(
       insert.partitionSpec, partColNames, tblName, conf.resolver)
 
@@ -402,6 +403,18 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
           s"including ${staticPartCols.size} partition column(s) having constant value(s).")
     }
 
+    val partitionsTrackedByCatalog = catalogTable.isDefined &&
+      catalogTable.get.partitionColumnNames.nonEmpty &&
+      catalogTable.get.tracksPartitionsInCatalog
+    if (partitionsTrackedByCatalog && normalizedPartSpec.nonEmpty) {
+      // empty partition column value
+      if (normalizedPartSpec.filter(_._2.isDefined).exists(_._2.get.isEmpty)) {
+        val spec = normalizedPartSpec.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
+        throw new AnalysisException(
+          s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
+      }
+    }
+
     val newQuery = TableOutputResolver.resolveOutputColumns(
       tblName, expectedColumns, insert.query, byName = false, conf)
     if (normalizedPartSpec.nonEmpty) {
@@ -427,13 +440,14 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
       table match {
         case relation: HiveTableRelation =>
           val metadata = relation.tableMeta
-          preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames)
+          preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames,
+            Some(metadata))
         case LogicalRelation(h: HadoopFsRelation, _, catalogTable, _) =>
           val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
-          preprocess(i, tblName, h.partitionSchema.map(_.name))
+          preprocess(i, tblName, h.partitionSchema.map(_.name), catalogTable)
         case LogicalRelation(_: InsertableRelation, _, catalogTable, _) =>
           val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
-          preprocess(i, tblName, Nil)
+          preprocess(i, tblName, Nil, catalogTable)
         case _ => i
       }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index abd33ab8a8f2..32c4fb60b8c5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -866,6 +866,28 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }.getMessage
     assert(message.contains("LOCAL is supported only with file: scheme"))
   }
+
+  test("SPARK-32508 " +
+    "Disallow empty part col values in partition spec before static partition writing") {
+    withTable("insertTable") {
+      sql(
+        """
+          |CREATE TABLE insertTable(i int, part1 string, part2 string) USING PARQUET
+          |PARTITIONED BY (part1, part2)
+        """.stripMargin)
+      val msg = "Partition spec is invalid"
+      assert(intercept[AnalysisException] {
+        sql("INSERT INTO TABLE insertTable PARTITION(part1=1, part2='') SELECT 1")
+      }.getMessage.contains(msg))
+      assert(intercept[AnalysisException] {
+        sql("INSERT INTO TABLE insertTable PARTITION(part1='', part2) SELECT 1 ,'' AS part2")
+      }.getMessage.contains(msg))
+
+      sql("INSERT INTO TABLE insertTable PARTITION(part1='1', part2='2') SELECT 1")
+      sql("INSERT INTO TABLE insertTable PARTITION(part1='1', part2) SELECT 1 ,'2' AS part2")
+      sql("INSERT INTO TABLE insertTable PARTITION(part1='1', part2) SELECT 1 ,'' AS part2")
+    }
+  }
 }
 
 class FileExistingTestFileSystem extends RawLocalFileSystem {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
index 421dcb499bd6..ebc6cfb77d35 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -847,4 +847,26 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
       }
     }
   }
+
+  test("SPARK-32508 " +
+    "Disallow empty part col values in partition spec before static partition writing") {
+    withTable("t1") {
+      spark.sql(
+        """
+          |CREATE TABLE t1 (c1 int)
+          |PARTITIONED BY (d string)
+        """.stripMargin)
+
+      val e = intercept[AnalysisException] {
+        spark.sql(
+          """
+            |INSERT OVERWRITE TABLE t1 PARTITION(d='')
+            |SELECT 1
+          """.stripMargin)
+      }.getMessage
+
+      assert(!e.contains("get partition: Value for key d is null or empty"))
+      assert(e.contains("Partition spec is invalid"))
+    }
+  }
 }
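
As a minimal usage sketch of the behavior this patch enforces (not part of the patch itself; the table name `events` and the shell session below are hypothetical), an empty static partition value now fails during analysis for a table whose partitions are tracked in the catalog, while an empty dynamic partition value remains allowed:

// Hypothetical spark-shell session; `spark` is the usual SparkSession.
spark.sql("CREATE TABLE events(id INT, day STRING) USING PARQUET PARTITIONED BY (day)")

// Empty STATIC partition value: rejected at analysis time, before any write
// or metastore call.
spark.sql("INSERT INTO events PARTITION(day='') SELECT 1")
// org.apache.spark.sql.AnalysisException: Partition spec is invalid.
// The spec ([day=Some()]) contains an empty partition column value

// A DYNAMIC partition whose value happens to be empty is still accepted, as
// the last statement of the core InsertSuite test above verifies.
spark.sql("INSERT INTO events PARTITION(day) SELECT 1, '' AS day")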