From 4162229e3d5d1018c3b7e424aa1911a521fff32b Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Tue, 29 Nov 2016 17:38:10 -0800
Subject: [PATCH 1/2] Treat partition locationUri as an escaped URI when converting to/from Hadoop Paths

---
 .../sql/catalyst/catalog/interface.scala      |  3 ++
 .../spark/sql/hive/HiveExternalCatalog.scala  |  5 +-
 .../spark/sql/hive/client/HiveShim.scala      |  6 ++-
 .../PartitionProviderCompatibilitySuite.scala | 52 +++++++++++++++++++
 4 files changed, 62 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index d8bc86727e466..d2a1af0800914 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -44,6 +44,9 @@ case class CatalogFunction(
  * Storage format, used to describe how a partition or a table is stored.
  */
 case class CatalogStorageFormat(
+    // TODO(ekl) consider storing this field as a java.net.URI for type safety. Note that it
+    // must be converted to/from a Hadoop Path object using new Path(new URI(locationUri)) and
+    // path.toUri respectively before use as a filesystem path, because of URI char escaping.
     locationUri: Option[String],
     inputFormat: Option[String],
     outputFormat: Option[String],
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index fd9dc32063872..1a9943bc31058 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import java.io.IOException
+import java.net.URI
 import java.util
 
 import scala.util.control.NonFatal
@@ -833,10 +834,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       // However, Hive metastore is not case preserving and will generate wrong partition location
       // with lower cased partition column names. Here we set the default partition location
       // manually to avoid this problem.
-      val partitionPath = p.storage.locationUri.map(new Path(_)).getOrElse {
+      val partitionPath = p.storage.locationUri.map(uri => new Path(new URI(uri))).getOrElse {
         ExternalCatalogUtils.generatePartitionPath(p.spec, partitionColumnNames, tablePath)
       }
-      p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toString)))
+      p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri.toString)))
     }
     val lowerCasedParts = partsWithLocation.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
     client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 3d9642dd1463d..e561706facf03 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -268,7 +268,8 @@ private[client] class Shim_v0_12 extends Shim with Logging {
       ignoreIfExists: Boolean): Unit = {
     val table = hive.getTable(database, tableName)
     parts.foreach { s =>
-      val location = s.storage.locationUri.map(new Path(table.getPath, _)).orNull
+      val location = s.storage.locationUri.map(
+        uri => new Path(table.getPath, new Path(new URI(uri)))).orNull
       val params = if (s.parameters.nonEmpty) s.parameters.asJava else null
       val spec = s.spec.asJava
       if (hive.getPartition(table, spec, false) != null && ignoreIfExists) {
@@ -463,7 +464,8 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
       ignoreIfExists: Boolean): Unit = {
     val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
     parts.zipWithIndex.foreach { case (s, i) =>
-      addPartitionDesc.addPartition(s.spec.asJava, s.storage.locationUri.orNull)
+      addPartitionDesc.addPartition(
+        s.spec.asJava, s.storage.locationUri.map(u => new Path(new URI(u)).toString).orNull)
       if (s.parameters.nonEmpty) {
         addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava)
       }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index cace5fa95cad0..be91ce49f7dad 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -205,6 +205,58 @@ class PartitionProviderCompatibilitySuite
       }
     }
   }
+
+  test(s"SPARK-18635 special chars in partition values - partition management $enabled") {
+    withTable("test") {
+      spark.range(10)
+        .selectExpr("id", "id as A", "'%' as B")
+        .write.partitionBy("A", "B").mode("overwrite")
+        .saveAsTable("test")
+      assert(spark.sql("select * from test").count() == 10)
+      assert(spark.sql("select * from test where B = '%'").count() == 10)
+      assert(spark.sql("select * from test where B = '$'").count() == 0)
+      spark.range(10)
+        .selectExpr("id", "id as A", "'=' as B")
+        .write.mode("append").insertInto("test")
+      spark.sql("insert into test partition (A, B) select id, id, '%=' from range(10)")
+      assert(spark.sql("select * from test").count() == 30)
+      assert(spark.sql("select * from test where B = '%'").count() == 10)
+      assert(spark.sql("select * from test where B = '='").count() == 10)
+      assert(spark.sql("select * from test where B = '%='").count() == 10)
+
+      // show partitions sanity check
+      val parts = spark.sql("show partitions test").collect().map(_.get(0)).toSeq
+      assert(parts.length == 30)
assert(parts.contains("A=0/B=%25")) + assert(parts.contains("A=0/B=%3D")) + assert(parts.contains("A=0/B=%25%3D")) + + // custom locations sanity check + withTempDir { dir => + spark.sql(s""" + |alter table test partition (A=0, B='%') + |set location '${dir.getAbsolutePath}'""".stripMargin) + assert(spark.sql("select * from test").count() == 29) // missing 1 + } + + // drop partition sanity check + spark.sql("alter table test drop partition (A=1, B='%')") + assert(spark.sql("select * from test").count() == 28) + + // TODO(ekl) fix rename partition +// withTempDir { dir => +// spark.sql(s""" +// |alter table test partition (A=0, B='%') +// |rename to partition (A=100, B='%')""".stripMargin) +// assert(spark.sql("select * from test where a = 100").count() == 1) +// } + + // TODO(ekl) fix overwrite table +// spark.sql("show partitions test").show(false) +// spark.sql("insert overwrite table test partition (a, b) select id, id, '%' from range(1)") +// assert(spark.sql("select * from test").count() == 1) + } + } } /** From 1bd10ba05655981c46d3c6594d76e320a4fe4dff Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Wed, 30 Nov 2016 13:24:29 -0800 Subject: [PATCH 2/2] Wed Nov 30 13:24:29 PST 2016 --- .../PartitionProviderCompatibilitySuite.scala | 40 ++++++++++--------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala index be91ce49f7dad..e8e4238d1c5a4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala @@ -231,30 +231,32 @@ class PartitionProviderCompatibilitySuite assert(parts.contains("A=0/B=%3D")) assert(parts.contains("A=0/B=%25%3D")) - // custom locations sanity check + // drop partition sanity check + spark.sql("alter table test drop partition (A=1, B='%')") + assert(spark.sql("select * from test").count() == 29) // 1 file in dropped partition + withTempDir { dir => + // custom locations sanity check spark.sql(s""" |alter table test partition (A=0, B='%') |set location '${dir.getAbsolutePath}'""".stripMargin) - assert(spark.sql("select * from test").count() == 29) // missing 1 - } + assert(spark.sql("select * from test").count() == 28) // moved to empty dir - // drop partition sanity check - spark.sql("alter table test drop partition (A=1, B='%')") - assert(spark.sql("select * from test").count() == 28) - - // TODO(ekl) fix rename partition -// withTempDir { dir => -// spark.sql(s""" -// |alter table test partition (A=0, B='%') -// |rename to partition (A=100, B='%')""".stripMargin) -// assert(spark.sql("select * from test where a = 100").count() == 1) -// } - - // TODO(ekl) fix overwrite table -// spark.sql("show partitions test").show(false) -// spark.sql("insert overwrite table test partition (a, b) select id, id, '%' from range(1)") -// assert(spark.sql("select * from test").count() == 1) + // rename partition sanity check + spark.sql(s""" + |alter table test partition (A=5, B='%') + |rename to partition (A=100, B='%')""".stripMargin) + assert(spark.sql("select * from test where a = 5 and b = '%'").count() == 0) + assert(spark.sql("select * from test where a = 100 and b = '%'").count() == 1) + + // try with A=0 which has a custom location + spark.sql("insert into test partition (A=0, B='%') select 1") + spark.sql(s""" + |alter table test 
+          |rename to partition (A=101, B='%')""".stripMargin)
+        assert(spark.sql("select * from test where a = 0 and b = '%'").count() == 0)
+        assert(spark.sql("select * from test where a = 101 and b = '%'").count() == 1)
+      }
     }
   }
 }
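
Note on the escaping convention the two patches rely on: Hive escapes special
characters in partition directory names (the value '%' is stored as the literal
directory "B=%25" and '=' as "B=%3D", which is what the SHOW PARTITIONS
assertions above check), and CatalogStorageFormat.locationUri holds that path
in URI-escaped string form, so the '%' is escaped once more. The standalone
sketch below, which is not part of the patches and assumes only hadoop-common
on the classpath (the warehouse path and object name are made up for
illustration), shows why the conversions go through java.net.URI:

import java.net.URI

import org.apache.hadoop.fs.Path

object LocationUriEscaping {
  def main(args: Array[String]): Unit = {
    // On disk, Hive stores the partition value "%" as the literal directory
    // "B=%25". The URI *string* for that location escapes the '%' once more,
    // which is the form locationUri is assumed to hold after these patches.
    val locationUri = "file:/warehouse/test/A=0/B=%2525"

    // What the patches do: decode the URI first, then build the Path from it.
    // The resulting filesystem path names the real directory, B=%25.
    val good = new Path(new URI(locationUri))
    assert(good.toUri.getPath == "/warehouse/test/A=0/B=%25")
    // path.toUri re-escapes, so the stored string round-trips losslessly.
    assert(good.toUri.toString == locationUri)

    // What the old code did: Path(String) treats its argument as an already
    // unescaped path, so this names a directory literally called "B=%2525",
    // which does not exist on disk.
    val bad = new Path(locationUri)
    assert(bad.toUri.getPath == "/warehouse/test/A=0/B=%2525")

    println(s"escaped locationUri round-trips as: ${good.toUri}")
  }
}

This is also why HiveExternalCatalog now writes back
Some(partitionPath.toUri.toString) rather than Some(partitionPath.toString):
Path.toString returns the decoded form, so storing it would make the next
new URI(locationUri) read decode "B=%25" a second time, to "B=%", and point at
a partition directory that does not exist.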