From 87ff39bb00312051109b357c5d978fc92de18e27 Mon Sep 17 00:00:00 2001
From: TJX2014
Date: Sun, 21 Jun 2020 14:14:04 +0800
Subject: [PATCH 1/7] Add spark.sql.follow.hive.table.location to be
 compatible with legacy `path` in catalog

---
 .../org/apache/spark/sql/internal/StaticSQLConf.scala    | 7 +++++++
 .../org/apache/spark/sql/hive/HiveExternalCatalog.scala  | 6 +++++-
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
index 9618ff606263..28a98dae168d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
@@ -226,4 +226,11 @@ object StaticSQLConf {
     .version("3.0.0")
     .intConf
     .createWithDefault(100)
+
+  val FOLLOW_HIVE_TABLE_LOCATION_ENABLED =
+    buildStaticConf("spark.sql.follow.hive.table.location")
+      .doc("Whether to prefer the Hive table location over the legacy `path` property in the catalog")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 2faf42028f3a..e75170a274e7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -545,7 +545,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 
   private def getLocationFromStorageProps(table: CatalogTable): Option[String] = {
-    CaseInsensitiveMap(table.storage.properties).get("path")
+    if (conf.get(FOLLOW_HIVE_TABLE_LOCATION_ENABLED)) {
+      table.storage.locationUri.map(_.toString)
+    } else {
+      CaseInsensitiveMap(table.storage.properties).get("path")
+    }
   }
 
   private def updateLocationInStorageProps(

From b45d2b31421dd7bd8372dc8e1f7f7340d12c955b Mon Sep 17 00:00:00 2001
From: TJX2014
Date: Sun, 28 Jun 2020 15:50:08 +0800
Subject: [PATCH 2/7] Add UT for hive table shared location

---
 .../sql/hive/HiveExternalCatalogSuite.scala | 25 +++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index 270595b0011e..bc355d4055e7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -202,4 +202,29 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
       assert(alteredTable.provider === Some("foo"))
     })
   }
+
+  test("SPARK-31751: serde property `path` overwrites hive table property location") {
+    val catalog = newBasicCatalog()
+    val hiveTable = CatalogTable(
+      identifier = TableIdentifier("parq_alter", Some("db1")),
+      tableType = CatalogTableType.MANAGED,
+      storage = storageFormat,
+      schema = new StructType().add("col1", "int"),
+      provider = Some("orc"))
+    catalog.createTable(hiveTable, ignoreIfExists = false)
+    externalCatalog.client.runSqlHive(
+      "alter table db1.parq_alter rename to db1.parq_alter2")
+    val noFollowTable = externalCatalog.getTable("db1", "parq_alter2")
+    assert(!noFollowTable.storage.locationUri.toString.contains("parq_alter2"))
+
+    val confField = classOf[HiveExternalCatalog].getDeclaredField("conf")
+    confField.setAccessible(true)
+    val sparkConf = confField.get(externalCatalog).asInstanceOf[SparkConf]
+    sparkConf.set("spark.sql.follow.hive.table.location", "true")
+    val followTable = externalCatalog.getTable("db1", "parq_alter2")
+    assert(followTable.storage.locationUri.toString.contains("parq_alter2"))
+
+    sparkConf.set("spark.sql.follow.hive.table.location", "false")
+    confField.setAccessible(false)
+  }
 }

From e51fd48afee73bb570e72cdc2ba6534a8a22809d Mon Sep 17 00:00:00 2001
From: TJX2014
Date: Mon, 29 Jun 2020 11:07:38 +0800
Subject: [PATCH 3/7] Move location param from catalyst module to hive module

---
 .../org/apache/spark/sql/internal/StaticSQLConf.scala    | 7 -------
 .../org/apache/spark/sql/hive/HiveExternalCatalog.scala  | 2 +-
 .../main/scala/org/apache/spark/sql/hive/HiveUtils.scala | 7 +++++++
 .../apache/spark/sql/hive/HiveExternalCatalogSuite.scala | 6 +++---
 4 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
index 28a98dae168d..9618ff606263 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
@@ -226,11 +226,4 @@ object StaticSQLConf {
     .version("3.0.0")
     .intConf
     .createWithDefault(100)
-
-  val FOLLOW_HIVE_TABLE_LOCATION_ENABLED =
-    buildStaticConf("spark.sql.follow.hive.table.location")
-      .doc("Whether to prefer the Hive table location over the legacy `path` property in the catalog")
-      .version("3.1.0")
-      .booleanConf
-      .createWithDefault(false)
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index e75170a274e7..65f4cdc075da 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -545,7 +545,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 
   private def getLocationFromStorageProps(table: CatalogTable): Option[String] = {
-    if (conf.get(FOLLOW_HIVE_TABLE_LOCATION_ENABLED)) {
+    if (conf.get(HiveUtils.FOLLOW_HIVE_TABLE_LOCATION_ENABLED)) {
       table.storage.locationUri.map(_.toString)
     } else {
       CaseInsensitiveMap(table.storage.properties).get("path")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 04caf57efdc7..234816a64a6c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -165,6 +165,13 @@ private[spark] object HiveUtils extends Logging {
     .booleanConf
     .createWithDefault(true)
 
+  val FOLLOW_HIVE_TABLE_LOCATION_ENABLED =
+    buildStaticConf("spark.sql.hive.follow.table.location")
+      .doc("Whether to prefer the Hive table location over the legacy `path` property in the catalog")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
   /**
    * The version of the hive client that will be used to communicate with the metastore.  Note that
    * this does not necessarily need to be the same version of Hive that is used internally by
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index bc355d4055e7..4eb8f3b22832 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -210,7 +210,7 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
       tableType = CatalogTableType.MANAGED,
       storage = storageFormat,
       schema = new StructType().add("col1", "int"),
-      provider = Some("orc"))
+      provider = Some("parquet"))
     catalog.createTable(hiveTable, ignoreIfExists = false)
     externalCatalog.client.runSqlHive(
       "alter table db1.parq_alter rename to db1.parq_alter2")
@@ -220,11 +220,11 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
     val confField = classOf[HiveExternalCatalog].getDeclaredField("conf")
     confField.setAccessible(true)
     val sparkConf = confField.get(externalCatalog).asInstanceOf[SparkConf]
-    sparkConf.set("spark.sql.follow.hive.table.location", "true")
+    sparkConf.set("spark.sql.hive.follow.table.location", "true")
     val followTable = externalCatalog.getTable("db1", "parq_alter2")
     assert(followTable.storage.locationUri.toString.contains("parq_alter2"))
 
-    sparkConf.set("spark.sql.follow.hive.table.location", "false")
+    sparkConf.set("spark.sql.hive.follow.table.location", "false")
     confField.setAccessible(false)
   }
 }

From 2d5ba45725e0754fe3f45e05cf7512c962d45937 Mon Sep 17 00:00:00 2001
From: TJX2014
Date: Mon, 29 Jun 2020 11:12:43 +0800
Subject: [PATCH 4/7] Rename param to FOLLOW_TABLE_LOCATION

---
 .../scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala | 2 +-
 .../src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 65f4cdc075da..1e137776cd8b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -545,7 +545,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 
   private def getLocationFromStorageProps(table: CatalogTable): Option[String] = {
-    if (conf.get(HiveUtils.FOLLOW_HIVE_TABLE_LOCATION_ENABLED)) {
+    if (conf.get(HiveUtils.FOLLOW_TABLE_LOCATION)) {
       table.storage.locationUri.map(_.toString)
     } else {
       CaseInsensitiveMap(table.storage.properties).get("path")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 234816a64a6c..819e99fa12be 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -165,7 +165,7 @@ private[spark] object HiveUtils extends Logging {
     .booleanConf
     .createWithDefault(true)
 
-  val FOLLOW_HIVE_TABLE_LOCATION_ENABLED =
+  val FOLLOW_TABLE_LOCATION =
     buildStaticConf("spark.sql.hive.follow.table.location")
       .doc("Whether to prefer the Hive table location over the legacy `path` property in the catalog")
       .version("3.1.0")

From fc9143d9aa73372cd75e6136d129fd0430439222 Mon Sep 17 00:00:00 2001
From: TJX2014
Date: Mon, 29 Jun 2020 13:25:50 +0800
Subject: [PATCH 5/7] Throw exception when location is not consistent

---
 .../spark/sql/hive/HiveExternalCatalog.scala  |  9 +++++---
 .../org/apache/spark/sql/hive/HiveUtils.scala |  7 ------
 .../sql/hive/HiveExternalCatalogSuite.scala   | 22 +++++++++----------
 3 files changed, 16 insertions(+), 22 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 1e137776cd8b..7db2ef6a5d7d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -545,10 +545,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 
   private def getLocationFromStorageProps(table: CatalogTable): Option[String] = {
-    if (conf.get(HiveUtils.FOLLOW_TABLE_LOCATION)) {
-      table.storage.locationUri.map(_.toString)
+    val storageLoc = table.storage.locationUri.map(_.toString)
+    val storageProp = CaseInsensitiveMap(table.storage.properties).get("path")
+    if (storageLoc.equals(storageProp)) {
+      storageProp
     } else {
-      CaseInsensitiveMap(table.storage.properties).get("path")
+      throw new AnalysisException(s"The path in table location ${storageLoc} is " +
+        s"not equal to table prop path ${storageProp}, please use ALTER TABLE in Spark")
     }
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 819e99fa12be..04caf57efdc7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -165,13 +165,6 @@ private[spark] object HiveUtils extends Logging {
     .booleanConf
     .createWithDefault(true)
 
-  val FOLLOW_TABLE_LOCATION =
-    buildStaticConf("spark.sql.hive.follow.table.location")
-      .doc("Whether to prefer the Hive table location over the legacy `path` property in the catalog")
-      .version("3.1.0")
-      .booleanConf
-      .createWithDefault(false)
-
   /**
    * The version of the hive client that will be used to communicate with the metastore.  Note that
    * this does not necessarily need to be the same version of Hive that is used internally by
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index 4eb8f3b22832..f3b6fe2d3bd4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -22,6 +22,7 @@ import java.net.URI
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.SparkConf
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.execution.QueryExecutionException
@@ -212,19 +213,16 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
       schema = new StructType().add("col1", "int"),
       provider = Some("parquet"))
     catalog.createTable(hiveTable, ignoreIfExists = false)
+    val beforeAlterTable = externalCatalog.getTable("db1", "parq_alter")
+    assert(beforeAlterTable.storage.locationUri.toString.contains("parq_alter"))
+
     externalCatalog.client.runSqlHive(
       "alter table db1.parq_alter rename to db1.parq_alter2")
-    val noFollowTable = externalCatalog.getTable("db1", "parq_alter2")
-    assert(!noFollowTable.storage.locationUri.toString.contains("parq_alter2"))
-
-    val confField = classOf[HiveExternalCatalog].getDeclaredField("conf")
-    confField.setAccessible(true)
-    val sparkConf = confField.get(externalCatalog).asInstanceOf[SparkConf]
-    sparkConf.set("spark.sql.hive.follow.table.location", "true")
-    val followTable = externalCatalog.getTable("db1", "parq_alter2")
-    assert(followTable.storage.locationUri.toString.contains("parq_alter2"))
-
-    sparkConf.set("spark.sql.hive.follow.table.location", "false")
-    confField.setAccessible(false)
+
+    val e = intercept[AnalysisException](
+      externalCatalog.getTable("db1", "parq_alter2")
+    )
+    assert(e.getMessage.contains("not equal to table prop path")
+      && e.getMessage.contains("parq_alter2"))
   }
 }

From e44c1b96963d69212916d8cc82da3ffe37a0fe75 Mon Sep 17 00:00:00 2001
From: TJX2014
Date: Wed, 1 Jul 2020 07:00:41 +0800
Subject: [PATCH 6/7] toString => CatalogUtils.URIToString(_)

---
 .../scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 7db2ef6a5d7d..a717a9048540 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -545,7 +545,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 
   private def getLocationFromStorageProps(table: CatalogTable): Option[String] = {
-    val storageLoc = table.storage.locationUri.map(_.toString)
+    val storageLoc = table.storage.locationUri.map(CatalogUtils.URIToString(_))
     val storageProp = CaseInsensitiveMap(table.storage.properties).get("path")
     if (storageLoc.equals(storageProp)) {
       storageProp

From 1496c669745bd6c135011ea0dabd768e31016e45 Mon Sep 17 00:00:00 2001
From: TJX2014
Date: Wed, 1 Jul 2020 15:20:52 +0800
Subject: [PATCH 7/7] Compact SPARK-31061 test case and make
 getLocationFromStorageProps compatible with hive tables

---
 .../org/apache/spark/sql/hive/HiveExternalCatalog.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index a717a9048540..7d893872277d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -547,8 +547,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   private def getLocationFromStorageProps(table: CatalogTable): Option[String] = {
     val storageLoc = table.storage.locationUri.map(CatalogUtils.URIToString(_))
     val storageProp = CaseInsensitiveMap(table.storage.properties).get("path")
-    if (storageLoc.equals(storageProp)) {
-      storageProp
+    // A missing `path` storage property means the table was created directly by Hive
+    if (storageLoc.equals(storageProp) || storageProp.isEmpty) {
+      storageLoc
     } else {
       throw new AnalysisException(s"The path in table location ${storageLoc} is " +
         s"not equal to table prop path ${storageProp}, please use ALTER TABLE in Spark")
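
Net effect of the series: the opt-in config from patches 1-4 is dropped, and
getLocationFromStorageProps instead requires the Hive table location and the legacy
`path` storage property to agree, treats a missing `path` property as a table created
directly by Hive, and raises an AnalysisException on any other mismatch (typically
after a Hive-side `ALTER TABLE ... RENAME`, which moves the table location but leaves
the serde `path` property stale). Below is a minimal, self-contained sketch of that
decision logic in plain Scala; the object name, `resolveLocation`, and the sample HDFS
paths are illustrative only and not part of the patch.

// Sketch of the final getLocationFromStorageProps decision logic, detached from
// Spark internals: `storageLoc` models CatalogTable.storage.locationUri rendered as
// a string, and `storageProp` models the legacy `path` entry in the serde properties.
object LocationCheckSketch {
  def resolveLocation(storageLoc: Option[String], storageProp: Option[String]): Option[String] = {
    // A missing `path` property means the table was created directly by Hive.
    if (storageLoc == storageProp || storageProp.isEmpty) {
      storageLoc
    } else {
      // Metadata diverged, e.g. Hive's `ALTER TABLE ... RENAME` moved the location
      // but left the serde `path` property pointing at the old directory.
      // (The patch raises AnalysisException here.)
      throw new IllegalStateException(
        s"location $storageLoc does not match `path` property $storageProp")
    }
  }

  def main(args: Array[String]): Unit = {
    val loc = Some("hdfs://nn/warehouse/t")
    assert(resolveLocation(loc, None) == loc) // Hive-created table: no `path` property
    assert(resolveLocation(loc, loc) == loc)  // consistent Spark-written metadata
    try {
      // A stale `path` left behind by a Hive-side rename must be rejected.
      resolveLocation(Some("hdfs://nn/warehouse/t2"), loc)
      assert(false, "expected an inconsistency error")
    } catch { case _: IllegalStateException => () }
  }
}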