@@ -44,6 +44,9 @@ case class CatalogFunction(
  * Storage format, used to describe how a partition or a table is stored.
  */
 case class CatalogStorageFormat(
+    // TODO(ekl) consider storing this field as java.net.URI for type safety. Note that this must
+    // be converted to/from a hadoop Path object using new Path(new URI(locationUri)) and
+    // path.toUri respectively before use as a filesystem path due to URI char escaping.
     locationUri: Option[String],
     inputFormat: Option[String],
     outputFormat: Option[String],
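The round-trip described in this TODO is the heart of the fix: `locationUri` holds the URI-escaped form of the on-disk path, so it must be decoded through `java.net.URI` before use as a filesystem path, and re-escaped via `path.toUri` when written back. A minimal sketch of the failure mode, using a made-up warehouse path (the expected strings follow Hadoop `Path` and `java.net.URI` escaping rules as I read them):

```scala
import java.net.URI

import org.apache.hadoop.fs.Path

// On disk, the partition directory for value "%" is literally "b=%25"
// (Hive-style value escaping); locationUri stores its URI-escaped form.
val fsPath = new Path("file:/warehouse/test/a=0/b=%25")  // made-up path
val locationUri = fsPath.toUri.toString
// locationUri == "file:/warehouse/test/a=0/b=%2525"

// Wrong: Path(String) keeps "%2525" as literal characters, so the result
// points at a directory that does not exist.
assert(new Path(locationUri).toString == "file:/warehouse/test/a=0/b=%2525")

// Right: decode through java.net.URI first to recover the real directory.
assert(new Path(new URI(locationUri)).toString == "file:/warehouse/test/a=0/b=%25")
```

That re-escaping on write is why the serialization changes below switch from `partitionPath.toString` to `partitionPath.toUri.toString`.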
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import java.io.IOException
+import java.net.URI
 import java.util
 
 import scala.util.control.NonFatal
@@ -833,10 +834,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
       // However, Hive metastore is not case preserving and will generate wrong partition location
       // with lower cased partition column names. Here we set the default partition location
       // manually to avoid this problem.
-      val partitionPath = p.storage.locationUri.map(new Path(_)).getOrElse {
+      val partitionPath = p.storage.locationUri.map(uri => new Path(new URI(uri))).getOrElse {
         ExternalCatalogUtils.generatePartitionPath(p.spec, partitionColumnNames, tablePath)
       }
-      p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toString)))
+      p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri.toString)))
     }
     val lowerCasedParts = partsWithLocation.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
     client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
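For partitions without an explicit location, `ExternalCatalogUtils.generatePartitionPath` already builds the directory names with Hive-style value escaping (the same `%25`/`%3D` forms the new test asserts via SHOW PARTITIONS), so the only change in that branch is serializing the resulting `Path` with `toUri.toString`. A rough sketch of the difference, assuming a made-up table path:

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils

// Column names keep their original case here ("A", "B"); the Hive
// metastore would have built this from lower-cased names instead.
val partitionPath = ExternalCatalogUtils.generatePartitionPath(
  Map("A" -> "0", "B" -> "%"), Seq("A", "B"), new Path("file:/warehouse/test"))

partitionPath.toString        // file:/warehouse/test/A=0/B=%25   (the real directory)
partitionPath.toUri.toString  // file:/warehouse/test/A=0/B=%2525 (escaped form, stored)
```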
@@ -268,7 +268,8 @@ private[client] class Shim_v0_12 extends Shim with Logging {
       ignoreIfExists: Boolean): Unit = {
     val table = hive.getTable(database, tableName)
     parts.foreach { s =>
-      val location = s.storage.locationUri.map(new Path(table.getPath, _)).orNull
+      val location = s.storage.locationUri.map(
+        uri => new Path(table.getPath, new Path(new URI(uri)))).orNull
       val params = if (s.parameters.nonEmpty) s.parameters.asJava else null
       val spec = s.spec.asJava
       if (hive.getPartition(table, spec, false) != null && ignoreIfExists) {
@@ -463,7 +464,8 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
       ignoreIfExists: Boolean): Unit = {
     val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
     parts.zipWithIndex.foreach { case (s, i) =>
-      addPartitionDesc.addPartition(s.spec.asJava, s.storage.locationUri.orNull)
+      addPartitionDesc.addPartition(
+        s.spec.asJava, s.storage.locationUri.map(u => new Path(new URI(u)).toString).orNull)
       if (s.parameters.nonEmpty) {
         addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava)
       }
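Both shims now decode `locationUri` before handing it to Hive: `Shim_v0_13` passes the decoded path string to `AddPartitionDesc`, while `Shim_v0_12` additionally resolves the location against the table path, so relative locations keep working. A small sketch of that resolution, with a made-up table path standing in for `table.getPath`:

```scala
import java.net.URI

import org.apache.hadoop.fs.Path

val tablePath = new Path("file:/warehouse/test")  // stand-in for table.getPath

// A relative, URI-escaped location is decoded first, then resolved under
// the table path, ending at the literal on-disk directory "b=%25".
val loc = new Path(tablePath, new Path(new URI("a=0/b=%2525")))
assert(loc.toString == "file:/warehouse/test/a=0/b=%25")
```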
@@ -205,6 +205,60 @@ class PartitionProviderCompatibilitySuite
       }
     }
   }
+
+  test(s"SPARK-18635 special chars in partition values - partition management $enabled") {
+    withTable("test") {
+      spark.range(10)
+        .selectExpr("id", "id as A", "'%' as B")
+        .write.partitionBy("A", "B").mode("overwrite")
+        .saveAsTable("test")
+      assert(spark.sql("select * from test").count() == 10)
+      assert(spark.sql("select * from test where B = '%'").count() == 10)
+      assert(spark.sql("select * from test where B = '$'").count() == 0)
+      spark.range(10)
+        .selectExpr("id", "id as A", "'=' as B")
+        .write.mode("append").insertInto("test")
+      spark.sql("insert into test partition (A, B) select id, id, '%=' from range(10)")
+      assert(spark.sql("select * from test").count() == 30)
+      assert(spark.sql("select * from test where B = '%'").count() == 10)
+      assert(spark.sql("select * from test where B = '='").count() == 10)
+      assert(spark.sql("select * from test where B = '%='").count() == 10)
+
+      // show partitions sanity check
+      val parts = spark.sql("show partitions test").collect().map(_.get(0)).toSeq
+      assert(parts.length == 30)
+      assert(parts.contains("A=0/B=%25"))
+      assert(parts.contains("A=0/B=%3D"))
+      assert(parts.contains("A=0/B=%25%3D"))
+
+      // drop partition sanity check
+      spark.sql("alter table test drop partition (A=1, B='%')")
+      assert(spark.sql("select * from test").count() == 29)  // 1 file in dropped partition
+
+      withTempDir { dir =>
+        // custom locations sanity check
+        spark.sql(s"""
+          |alter table test partition (A=0, B='%')
+          |set location '${dir.getAbsolutePath}'""".stripMargin)
+        assert(spark.sql("select * from test").count() == 28)  // moved to empty dir
+
+        // rename partition sanity check
+        spark.sql(s"""
+          |alter table test partition (A=5, B='%')
+          |rename to partition (A=100, B='%')""".stripMargin)
+        assert(spark.sql("select * from test where a = 5 and b = '%'").count() == 0)
+        assert(spark.sql("select * from test where a = 100 and b = '%'").count() == 1)
+
+        // try with A=0 which has a custom location
+        spark.sql("insert into test partition (A=0, B='%') select 1")
+        spark.sql(s"""
+          |alter table test partition (A=0, B='%')
+          |rename to partition (A=101, B='%')""".stripMargin)
+        assert(spark.sql("select * from test where a = 0 and b = '%'").count() == 0)
+        assert(spark.sql("select * from test where a = 101 and b = '%'").count() == 1)
+      }
+    }
+  }
 }
 
 /**