Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,8 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
private def preprocess(
insert: InsertIntoStatement,
tblName: String,
partColNames: Seq[String]): InsertIntoStatement = {
partColNames: Seq[String],
catalogTable: Option[CatalogTable]): InsertIntoStatement = {

val normalizedPartSpec = PartitioningUtils.normalizePartitionSpec(
insert.partitionSpec, partColNames, tblName, conf.resolver)
Expand All @@ -402,6 +403,18 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
s"including ${staticPartCols.size} partition column(s) having constant value(s).")
}

val partitionsTrackedByCatalog = catalogTable.isDefined &&
catalogTable.get.partitionColumnNames.nonEmpty &&
catalogTable.get.tracksPartitionsInCatalog
if (partitionsTrackedByCatalog && normalizedPartSpec.nonEmpty) {
// empty partition column value
if (normalizedPartSpec.filter(_._2.isDefined).exists(_._2.get.isEmpty)) {
val spec = normalizedPartSpec.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
throw new AnalysisException(
s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
}
}

val newQuery = TableOutputResolver.resolveOutputColumns(
tblName, expectedColumns, insert.query, byName = false, conf)
if (normalizedPartSpec.nonEmpty) {
Expand All @@ -427,13 +440,14 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
table match {
case relation: HiveTableRelation =>
val metadata = relation.tableMeta
preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames)
preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames,
Some(metadata))
case LogicalRelation(h: HadoopFsRelation, _, catalogTable, _) =>
val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
preprocess(i, tblName, h.partitionSchema.map(_.name))
preprocess(i, tblName, h.partitionSchema.map(_.name), catalogTable)
case LogicalRelation(_: InsertableRelation, _, catalogTable, _) =>
val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
preprocess(i, tblName, Nil)
preprocess(i, tblName, Nil, catalogTable)
case _ => i
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,28 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
}.getMessage
assert(message.contains("LOCAL is supported only with file: scheme"))
}

test("SPARK-32508 " +
"Disallow empty part col values in partition spec before static partition writing") {
withTable("insertTable") {
sql(
"""
|CREATE TABLE insertTable(i int, part1 string, part2 string) USING PARQUET
|PARTITIONED BY (part1, part2)
""".stripMargin)
val msg = "Partition spec is invalid"
assert(intercept[AnalysisException] {
sql("INSERT INTO TABLE insertTable PARTITION(part1=1, part2='') SELECT 1")
}.getMessage.contains(msg))
assert(intercept[AnalysisException] {
sql("INSERT INTO TABLE insertTable PARTITION(part1='', part2) SELECT 1 ,'' AS part2")
}.getMessage.contains(msg))

sql("INSERT INTO TABLE insertTable PARTITION(part1='1', part2='2') SELECT 1")
sql("INSERT INTO TABLE insertTable PARTITION(part1='1', part2) SELECT 1 ,'2' AS part2")
sql("INSERT INTO TABLE insertTable PARTITION(part1='1', part2) SELECT 1 ,'' AS part2")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the partition column can be empty string if it's dynamic. Shall we convert the empty string/null in partition spec to __HIVE_DEFAULT_PARTITION__ before calling listPartitions/loadPartition?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally speaking, it is meaningless for the partition value to be empty, so the static partition value is not allowed to be empty.
Dynamic partition may be that the user does not know that the partition field is null or empty, and finally wrote the __HIVE_DEFAULT_PARTITION__ partition.

listPartitions

spark-sql> show partitions inserttable ;
part1=1/part2=__HIVE_DEFAULT_PARTITION__
Time taken: 0.2 seconds, Fetched 1 row(s)
spark-sql> desc formatted inserttable partition(part1='1',part2='');
Error in query: Partition spec is invalid. The spec ([part1=1, part2=]) contains an empty partition column value;
spark-sql> desc formatted inserttable partition(part1='1',part2='__HIVE_DEFAULT_PARTITION__');
col_name	data_type	comment
...
Time taken: 0.348 seconds, Fetched 27 row(s)

The partition value the user sees is __HIVE_DEFAULT_PARTITION__, so the user will not specify the partition value empty to query the partition details.

loadPartition
Because in DynamicPartitionDataWriter#partitionPathExpression, the partition value will be null or emtpy converted to __HIVE_DEFAULT_PARTITION__, so it can be executed successfully without the need to increase early conversion.

}
}
}

class FileExistingTestFileSystem extends RawLocalFileSystem {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -847,4 +847,26 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
}
}
}

test("SPARK-32508 " +
"Disallow empty part col values in partition spec before static partition writing") {
withTable("t1") {
spark.sql(
"""
|CREATE TABLE t1 (c1 int)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this a problem only for hive tables?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InsertIntoHadoopFsRelationCommand
When manageFilesourcePartitions is turned on,catalog.listPartitions is called, here is a check to see if the partition value is empty.

In the case that manageFilesourcePartitions is not turned on, the partition value is currently not checked, which means that the SQL execution will not fail. If I now move the check logic to the PreprocessTableInsertion rule, this will cause the execution to fail.

Perhaps this check can only be performed when tracksPartitionsInCatalog is equal to true and the static partition is written.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hive calls getPartition when loadPartition, here it will check whether the partition value is empty.

public Partition getPartition(...){
          || (val != null && val.length() == 0)) {
        throw new HiveException("get partition: Value for key "
            + field.getName() + " is null or empty");
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case that manageFilesourcePartitions is not turned on, the partition value is currently not checked, which means that the SQL execution will not fail.

what's the behavior then?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InsertIntoHadoopFsRelationCommand

When writing static partition or dynamic partition, the DynamicPartitionDataWriter will be used, and the partition value of empty will generate the default value(HIVE_DEFAULT_PARTITION) through getPartitionPathString.

When manageFilesourcePartitions is not turned on, the partition information is maintained through the filesystem, so it is not checked whether the partition value is empty.
insert ovwriter table a partition(d='') select 1 sql will run successfully.

Copy link
Contributor

@cloud-fan cloud-fan Sep 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the behavior of hive? Does hive treat d='' as d=HIVE_DEFAULT_PARTITION and can run it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hive> INSERT OVERWRITE TABLE t1 PARTITION(d='') select 1;

Caused by: org.apache.hadoop.hive.ql.parse.SemanticException: Line 1:61 Partition not found ''''
        at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer$tableSpec.<init>(BaseSemanticAnalyzer.java:856)
        at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer$tableSpec.<init>(BaseSemanticAnalyzer.java:727)
        at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.getMetaData(SemanticAnalyzer.java:1652)
        ... 23 more
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: get partition: Value for key d is null or empty
        at org.apache.hadoop.hive.ql.metadata.Hive.getPartition(Hive.java:1900)

hive> INSERT OVERWRITE TABLE t1 PARTITION(d) select 1,'' as d;
result:

Loading data to table x.t1 partition (d=null)
	 Time taken for load dynamic partitions : 302
	Loading partition {d=__HIVE_DEFAULT_PARTITION__}
	 Time taken for adding to write entity : 1
Partition x.t1{d=__HIVE_DEFAULT_PARTITION__} stats: [numFiles=1, numRows=1, totalSize=201, rawDataSize=85]

|PARTITIONED BY (d string)
""".stripMargin)

val e = intercept[AnalysisException] {
spark.sql(
"""
|INSERT OVERWRITE TABLE t1 PARTITION(d='')
|SELECT 1
""".stripMargin)
}.getMessage

assert(!e.contains("get partition: Value for key d is null or empty"))
assert(e.contains("Partition spec is invalid"))
}
}
}