From 2ea645c6efef36ad2833d8eced0150d4062b1efd Mon Sep 17 00:00:00 2001 From: windpiger Date: Fri, 21 Apr 2017 22:52:30 +0800 Subject: [PATCH] [SPARK-20013][SQL] Merge renameTable to alterTable in ExternalCatalog --- .../catalyst/catalog/ExternalCatalog.scala | 7 +- .../catalyst/catalog/InMemoryCatalog.scala | 65 +++--- .../sql/catalyst/catalog/SessionCatalog.scala | 10 +- .../catalog/ExternalCatalogSuite.scala | 81 +------ .../catalog/SessionCatalogSuite.scala | 46 ++++ .../spark/sql/hive/HiveExternalCatalog.scala | 212 +++++++++--------- 6 files changed, 213 insertions(+), 208 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index 08a01e8601897..becd09f31b02e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -92,17 +92,18 @@ abstract class ExternalCatalog { def dropTable(db: String, table: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit - def renameTable(db: String, oldName: String, newName: String): Unit - /** * Alter a table whose database and name match the ones specified in `tableDefinition`, assuming * the table exists. Note that, even though we can specify database in `tableDefinition`, it's * used to identify the table, not to alter the table's database, which is not allowed. * + * If `newNameTable` is defined and its table name is not equal to the table name of + * `tableDefinition`, this call will just rename the table. + * * Note: If the underlying implementation does not support altering a certain field, * this becomes a no-op. */ - def alterTable(tableDefinition: CatalogTable): Unit + def alterTable(tableDefinition: CatalogTable, newNameTable: Option[CatalogTable] = None): Unit /** * Alter the schema of a table identified by the provided database and table name.
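For readers of this patch, here is a minimal caller-side sketch of the merged API shown above. It is illustrative only and not part of the diff: the helper name is made up, and it assumes an ExternalCatalog instance and an existing table are available. The second definition carries the new identifier and, for managed tables, the new location, mirroring what SessionCatalog.renameTable does later in this patch.

import java.net.URI

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog

// Hypothetical helper: rename `db`.`oldName` to `db`.`newName` through the merged call.
// `newLocation` is the new table path computed by the caller (SessionCatalog uses
// defaultTablePath for managed tables).
def renameViaAlterTable(
    catalog: ExternalCatalog,
    db: String,
    oldName: String,
    newName: String,
    newLocation: URI): Unit = {
  val oldDef = catalog.getTable(db, oldName)
  val newDef = oldDef.copy(
    identifier = TableIdentifier(newName, Some(db)),
    storage = oldDef.storage.copy(locationUri = Some(newLocation)))
  // A plain metadata update would be `catalog.alterTable(oldDef)`; passing a second
  // definition whose table name differs turns the same call into a rename.
  catalog.alterTable(oldDef, Some(newDef))
}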
The new schema diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 9ca1c71d1dcb1..01d08a2ddfb40 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -264,40 +264,45 @@ class InMemoryCatalog( } } - override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized { - requireTableExists(db, oldName) - requireTableNotExists(db, newName) - val oldDesc = catalog(db).tables(oldName) - oldDesc.table = oldDesc.table.copy(identifier = TableIdentifier(newName, Some(db))) - - if (oldDesc.table.tableType == CatalogTableType.MANAGED) { - assert(oldDesc.table.storage.locationUri.isDefined, - "Managed table should always have table location, as we will assign a default location " + - "to it if it doesn't have one.") - val oldDir = new Path(oldDesc.table.location) - val newDir = new Path(new Path(catalog(db).db.locationUri), newName) - try { - val fs = oldDir.getFileSystem(hadoopConfig) - fs.rename(oldDir, newDir) - } catch { - case e: IOException => - throw new SparkException(s"Unable to rename table $oldName to $newName as failed " + - s"to rename its directory $oldDir", e) - } - oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri)) - } - - catalog(db).tables.put(newName, oldDesc) - catalog(db).tables.remove(oldName) - } - - override def alterTable(tableDefinition: CatalogTable): Unit = synchronized { + override def alterTable( + tableDefinition: CatalogTable, + newNameTable: Option[CatalogTable]): Unit = synchronized { assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireTableExists(db, tableDefinition.identifier.table) - catalog(db).tables(tableDefinition.identifier.table).table = tableDefinition - } + if (newNameTable.isEmpty + || newNameTable.get.identifier.table == tableDefinition.identifier.table) { + catalog(db).tables(tableDefinition.identifier.table).table = tableDefinition + } else { + val oldName = tableDefinition.identifier.table + val newName = newNameTable.get.identifier.table + requireTableNotExists(db, newName) + val oldDesc = catalog(db).tables(oldName) + oldDesc.table = oldDesc.table.copy(identifier = TableIdentifier(newName, Some(db))) + + if (oldDesc.table.tableType == CatalogTableType.MANAGED) { + assert(oldDesc.table.storage.locationUri.isDefined, + "Managed table should always have table location, as we will assign a default location " + + "to it if it doesn't have one.") + val oldDir = new Path(oldDesc.table.location) + val newDir = new Path(newNameTable.get.location) + + try { + val fs = oldDir.getFileSystem(hadoopConfig) + fs.rename(oldDir, newDir) + } catch { + case e: IOException => + throw new SparkException(s"Unable to rename table $oldName to $newName" + + s" as failed to rename its directory $oldDir", e) + } + oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri)) + } + + catalog(db).tables.put(newName, oldDesc) + catalog(db).tables.remove(oldName) + } + } override def alterTableSchema( db: String, table: String, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 6c6d600190b66..dfe1517625765 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -577,9 +577,15 @@ class SessionCatalog( requireDbExists(db) if (oldName.database.isDefined || !tempTables.contains(oldTableName)) { requireTableExists(TableIdentifier(oldTableName, Some(db))) - requireTableNotExists(TableIdentifier(newTableName, Some(db))) + val newTableIdentifier = TableIdentifier(newTableName, Some(db)) + requireTableNotExists(newTableIdentifier) validateName(newTableName) - externalCatalog.renameTable(db, oldTableName, newTableName) + val tableDefinition = externalCatalog.getTable(db, oldTableName) + val newTableDefinition = tableDefinition.copy( + identifier = newTableIdentifier, + storage = + tableDefinition.storage.copy(locationUri = Some(defaultTablePath(newTableIdentifier)))) + externalCatalog.alterTable(tableDefinition, Some(newTableDefinition)) } else { if (newName.database.isDefined) { throw new AnalysisException( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index 42db4398e5072..fee7df88d7735 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -199,30 +199,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = true, purge = false) } - test("rename table") { - val catalog = newBasicCatalog() - assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2")) - catalog.renameTable("db2", "tbl1", "tblone") - assert(catalog.listTables("db2").toSet == Set("tblone", "tbl2")) - } - - test("rename table when database/table does not exist") { - val catalog = newBasicCatalog() - intercept[AnalysisException] { - catalog.renameTable("unknown_db", "unknown_table", "unknown_table") - } - intercept[AnalysisException] { - catalog.renameTable("db2", "unknown_table", "unknown_table") - } - } - - test("rename table when destination table already exists") { - val catalog = newBasicCatalog() - intercept[AnalysisException] { - catalog.renameTable("db2", "tbl1", "tbl2") - } - } - test("alter table") { val catalog = newBasicCatalog() val tbl1 = catalog.getTable("db2", "tbl1") @@ -751,18 +727,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac assert(catalog.listFunctions("db2", "func*").toSet == Set("func1", "func2")) } - // -------------------------------------------------------------------------- - // File System operations - // -------------------------------------------------------------------------- - - private def exists(uri: URI, children: String*): Boolean = { - val base = new Path(uri) - val finalPath = children.foldLeft(base) { - case (parent, child) => new Path(parent, child) - } - base.getFileSystem(new Configuration()).exists(finalPath) - } - test("create/drop database should create/delete the directory") { val catalog = newBasicCatalog() val db = newDb("mydb") @@ -773,40 +737,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac assert(!exists(db.locationUri)) } - test("create/drop/rename table should create/delete/rename the directory") { - val catalog = newBasicCatalog() - val db = catalog.getDatabase("db1") - val table = 
CatalogTable( - identifier = TableIdentifier("my_table", Some("db1")), - tableType = CatalogTableType.MANAGED, - storage = CatalogStorageFormat.empty, - schema = new StructType().add("a", "int").add("b", "string"), - provider = Some(defaultProvider) - ) - - catalog.createTable(table, ignoreIfExists = false) - assert(exists(db.locationUri, "my_table")) - - catalog.renameTable("db1", "my_table", "your_table") - assert(!exists(db.locationUri, "my_table")) - assert(exists(db.locationUri, "your_table")) - - catalog.dropTable("db1", "your_table", ignoreIfNotExists = false, purge = false) - assert(!exists(db.locationUri, "your_table")) - - val externalTable = CatalogTable( - identifier = TableIdentifier("external_table", Some("db1")), - tableType = CatalogTableType.EXTERNAL, - storage = CatalogStorageFormat( - Some(Utils.createTempDir().toURI), - None, None, None, false, Map.empty), - schema = new StructType().add("a", "int").add("b", "string"), - provider = Some(defaultProvider) - ) - catalog.createTable(externalTable, ignoreIfExists = false) - assert(!exists(db.locationUri, "external_table")) - } - test("create/drop/rename partitions should create/delete/rename the directory") { val catalog = newBasicCatalog() val table = CatalogTable( @@ -995,4 +925,15 @@ abstract class CatalogTestUtils { catalog.listPartitions(db, table).map(_.spec).toSet == parts.map(_.spec).toSet } + // -------------------------------------------------------------------------- + // File System operations + // -------------------------------------------------------------------------- + def exists(uri: URI, children: String*): Boolean = { + val base = new Path(uri) + val finalPath = children.foldLeft(base) { + case (parent, child) => new Path(parent, child) + } + base.getFileSystem(new Configuration()).exists(finalPath) + } + } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index be8903000a0d1..1b9b00fcb82be 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils class InMemorySessionCatalogSuite extends SessionCatalogSuite { protected val utils = new CatalogTestUtils { @@ -399,6 +400,14 @@ abstract class SessionCatalogSuite extends PlanTest { } } + test("rename table when destination table already exists") { + withBasicCatalog { catalog => + intercept[AnalysisException] { + catalog.renameTable(TableIdentifier("tbl1", Some("db2")), TableIdentifier("tbl2")) + } + } + } + test("rename temp table") { withBasicCatalog { catalog => val tempTable = Range(1, 10, 2, 10) @@ -419,6 +428,43 @@ abstract class SessionCatalogSuite extends PlanTest { } } + + test("create/drop/rename table should create/delete/rename the directory") { + withBasicCatalog { catalog => + val db = catalog.externalCatalog.getDatabase("db1") + val table = CatalogTable( + identifier = TableIdentifier("my_table", Some("db1")), + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty, + schema = new StructType().add("a", "int").add("b", "string"), + provider = Some(defaultProvider) + ) + + 
catalog.createTable(table, ignoreIfExists = false) + assert(exists(db.locationUri, "my_table")) + + catalog.renameTable( + TableIdentifier("my_table", Some("db1")), TableIdentifier("your_table", Some("db1"))) + assert(!exists(db.locationUri, "my_table")) + assert(exists(db.locationUri, "your_table")) + + catalog.externalCatalog.dropTable("db1", "your_table", ignoreIfNotExists = false, purge = false) + assert(!exists(db.locationUri, "your_table")) + + val externalTable = CatalogTable( + identifier = TableIdentifier("external_table", Some("db1")), + tableType = CatalogTableType.EXTERNAL, + storage = CatalogStorageFormat( + Some(Utils.createTempDir().toURI), + None, None, None, false, Map.empty), + schema = new StructType().add("a", "int").add("b", "string"), + provider = Some(defaultProvider) + ) + catalog.createTable(externalTable, ignoreIfExists = false) + assert(!exists(db.locationUri, "external_table")) + } + } + test("alter table") { withBasicCatalog { catalog => val tbl1 = catalog.externalCatalog.getTable("db2", "tbl1") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 8b0fdf49cefab..62a57037119b0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -465,31 +465,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat client.dropTable(db, table, ignoreIfNotExists, purge) } - override def renameTable(db: String, oldName: String, newName: String): Unit = withClient { - val rawTable = getRawTable(db, oldName) - - // Note that Hive serde tables don't use path option in storage properties to store the value - // of table location, but use `locationUri` field to store it directly. And `locationUri` field - // will be updated automatically in Hive metastore by the `alterTable` call at the end of this - // method. Here we only update the path option if the path option already exists in storage - // properties, to avoid adding a unnecessary path option for Hive serde tables. - val hasPathOption = CaseInsensitiveMap(rawTable.storage.properties).contains("path") - val storageWithNewPath = if (rawTable.tableType == MANAGED && hasPathOption) { - // If it's a managed table with path option and we are renaming it, then the path option - // becomes inaccurate and we need to update it according to the new table name. - val newTablePath = defaultTablePath(TableIdentifier(newName, Some(db))) - updateLocationInStorageProps(rawTable, Some(newTablePath)) - } else { - rawTable.storage - } - - val newTable = rawTable.copy( - identifier = TableIdentifier(newName, Some(db)), - storage = storageWithNewPath) - - client.alterTable(oldName, newTable) - } - private def getLocationFromStorageProps(table: CatalogTable): Option[String] = { CaseInsensitiveMap(table.storage.properties).get("path") } @@ -512,98 +487,129 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat * Note: As of now, this doesn't support altering table schema, partition column names and bucket * specification. We will ignore them even if users do specify different values for these fields. 
*/ - override def alterTable(tableDefinition: CatalogTable): Unit = withClient { + override def alterTable( + tableDefinition: CatalogTable, + newNameTable: Option[CatalogTable]): Unit = withClient { assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireTableExists(db, tableDefinition.identifier.table) - verifyTableProperties(tableDefinition) - // convert table statistics to properties so that we can persist them through hive api - val withStatsProps = if (tableDefinition.stats.isDefined) { - val stats = tableDefinition.stats.get - var statsProperties: Map[String, String] = - Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString()) - if (stats.rowCount.isDefined) { - statsProperties += STATISTICS_NUM_ROWS -> stats.rowCount.get.toString() - } - val colNameTypeMap: Map[String, DataType] = - tableDefinition.schema.fields.map(f => (f.name, f.dataType)).toMap - stats.colStats.foreach { case (colName, colStat) => - colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) => - statsProperties += (columnStatKeyPropName(colName, k) -> v) + if (newNameTable.isEmpty + || newNameTable.get.identifier.table == tableDefinition.identifier.table) { + verifyTableProperties(tableDefinition) + // convert table statistics to properties so that we can persist them through hive api + val withStatsProps = if (tableDefinition.stats.isDefined) { + val stats = tableDefinition.stats.get + var statsProperties: Map[String, String] = + Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString()) + if (stats.rowCount.isDefined) { + statsProperties += STATISTICS_NUM_ROWS -> stats.rowCount.get.toString() + } + val colNameTypeMap: Map[String, DataType] = + tableDefinition.schema.fields.map(f => (f.name, f.dataType)).toMap + stats.colStats.foreach { case (colName, colStat) => + colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) => + statsProperties += (columnStatKeyPropName(colName, k) -> v) + } } + tableDefinition.copy(properties = tableDefinition.properties ++ statsProperties) + } else { + tableDefinition } - tableDefinition.copy(properties = tableDefinition.properties ++ statsProperties) - } else { - tableDefinition - } - - if (tableDefinition.tableType == VIEW) { - client.alterTable(withStatsProps) - } else { - val oldTableDef = getRawTable(db, withStatsProps.identifier.table) - val newStorage = if (DDLUtils.isHiveTable(tableDefinition)) { - tableDefinition.storage + if (tableDefinition.tableType == VIEW) { + client.alterTable(withStatsProps) } else { - // We can't alter the table storage of data source table directly for 2 reasons: - // 1. internally we use path option in storage properties to store the value of table - // location, but the given `tableDefinition` is from outside and doesn't have the path - // option, we need to add it manually. - // 2. this data source table may be created on a file, not a directory, then we can't set - // the `locationUri` field and save it to Hive metastore, because Hive only allows - // directory as table location. - // - // For example, an external data source table is created with a single file '/path/to/file'. - // Internally, we will add a path option with value '/path/to/file' to storage properties, - // and set the `locationUri` to a special value due to SPARK-15269(please see - // `saveTableIntoHive` for more details). When users try to get the table metadata back, we - // will restore the `locationUri` field from the path option and remove the path option from - // storage properties. 
When users try to alter the table storage, the given - // `tableDefinition` will have `locationUri` field with value `/path/to/file` and the path - // option is not set. - // - // Here we need 2 extra steps: - // 1. add path option to storage properties, to match the internal format, i.e. using path - // option to store the value of table location. - // 2. set the `locationUri` field back to the old one from the existing table metadata, - // if users don't want to alter the table location. This step is necessary as the - // `locationUri` is not always same with the path option, e.g. in the above example - // `locationUri` is a special value and we should respect it. Note that, if users - // want to alter the table location to a file path, we will fail. This should be fixed - // in the future. - - val newLocation = tableDefinition.storage.locationUri.map(CatalogUtils.URIToString(_)) - val storageWithPathOption = tableDefinition.storage.copy( - properties = tableDefinition.storage.properties ++ newLocation.map("path" -> _)) - - val oldLocation = getLocationFromStorageProps(oldTableDef) - if (oldLocation == newLocation) { - storageWithPathOption.copy(locationUri = oldTableDef.storage.locationUri) + val oldTableDef = getRawTable(db, withStatsProps.identifier.table) + + val newStorage = if (DDLUtils.isHiveTable(tableDefinition)) { + tableDefinition.storage } else { - storageWithPathOption + // We can't alter the table storage of data source table directly for 2 reasons: + // 1. internally we use path option in storage properties to store the value of + // table location, but the given `tableDefinition` is from outside and doesn't + // have the path option, we need to add it manually. + // 2. this data source table may be created on a file, not a directory, then we + // can't set the `locationUri` field and save it to Hive metastore, because Hive + // only allows directory as table location. + // + // For example, an external data source table is created with a single file + // '/path/to/file'. Internally, we will add a path option with value '/path/to/file' to + // storage properties, and set the `locationUri` to a special value due to SPARK-15269 + // (please see `saveTableIntoHive` for more details). When users try to get the table + // metadata back, we will restore the `locationUri` field from the path option and remove + // the path option from storage properties. When users try to alter the table storage, the + // given `tableDefinition` will have `locationUri` field with value `/path/to/file` and + // the path option is not set. + // + // Here we need 2 extra steps: + // 1. add path option to storage properties, to match the internal format, i.e. using + // path option to store the value of table location. + // 2. set the `locationUri` field back to the old one from the existing table metadata, + // if users don't want to alter the table location. This step is necessary as the + // `locationUri` is not always same with the path option, e.g. in the above example + // `locationUri` is a special value and we should respect it. Note that, if users + // want to alter the table location to a file path, we will fail. This should be + // fixed in the future.
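To make the comment above concrete, a small illustrative sketch (not part of the patch) of the two storage shapes it contrasts for a data source table created on a single file; the placeholder directory stands in for the special `locationUri` value chosen by `saveTableIntoHive` (SPARK-15269), and both URIs are made up for the example.

import java.net.URI

import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat

// What the caller hands to alterTable: the real location only in `locationUri`,
// no "path" entry in the storage properties.
val fromCaller = CatalogStorageFormat.empty.copy(
  locationUri = Some(new URI("/path/to/file")))

// What is persisted for such a table: the location mirrored into the "path" storage
// property, while `locationUri` holds a metastore-acceptable placeholder directory
// (a dummy value here).
val persisted = fromCaller.copy(
  locationUri = Some(new URI("/some/placeholder/dir")),
  properties = fromCaller.properties + ("path" -> "/path/to/file"))

// Step 1 above re-adds the "path" option to the incoming definition; step 2 restores
// the old `locationUri` whenever the caller did not actually change the location.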
+ + val newLocation = tableDefinition.storage.locationUri.map(CatalogUtils.URIToString(_)) + val storageWithPathOption = tableDefinition.storage.copy( + properties = tableDefinition.storage.properties ++ newLocation.map("path" -> _)) + + val oldLocation = getLocationFromStorageProps(oldTableDef) + if (oldLocation == newLocation) { + storageWithPathOption.copy(locationUri = oldTableDef.storage.locationUri) + } else { + storageWithPathOption + } } - val partitionProviderProp = if (tableDefinition.tracksPartitionsInCatalog) { - TABLE_PARTITION_PROVIDER -> TABLE_PARTITION_PROVIDER_CATALOG + val partitionProviderProp = if (tableDefinition.tracksPartitionsInCatalog) { + TABLE_PARTITION_PROVIDER -> TABLE_PARTITION_PROVIDER_CATALOG + } else { + TABLE_PARTITION_PROVIDER -> TABLE_PARTITION_PROVIDER_FILESYSTEM + } + + // Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table + // definition, to retain the spark specific format if it is. Also add old data source + // properties to table properties, to retain the data source table format. + val oldDataSourceProps = oldTableDef.properties.filter(_._1.startsWith(DATASOURCE_PREFIX)) + val newTableProps = oldDataSourceProps ++ withStatsProps.properties + partitionProviderProp + val newDef = withStatsProps.copy( + storage = newStorage, + schema = oldTableDef.schema, + partitionColumnNames = oldTableDef.partitionColumnNames, + bucketSpec = oldTableDef.bucketSpec, + properties = newTableProps) + + client.alterTable(newDef) + } + } else { + val oldName = tableDefinition.identifier.table + val newTableIdentifier = TableIdentifier(newNameTable.get.identifier.table, Some(db)) + + val rawTable = getRawTable(db, oldName) + + // Note that Hive serde tables don't use path option in storage properties to store the value + // of table location, but use `locationUri` field to store it directly. And `locationUri` + // field will be updated automatically in Hive metastore by the `alterTable` call at the end + // of this method. Here we only update the path option if the path option already exists in + // storage properties, to avoid adding an unnecessary path option for Hive serde tables. + val hasPathOption = CaseInsensitiveMap(rawTable.storage.properties).contains("path") + val storageWithNewPath = if (rawTable.tableType == MANAGED && hasPathOption) { + // If it's a managed table with path option and we are renaming it, then the path option + // becomes inaccurate and we need to update it according to the new table name. + val newTablePath = CatalogUtils.URIToString(newNameTable.get.location) + updateLocationInStorageProps(rawTable, Some(newTablePath)) } else { - TABLE_PARTITION_PROVIDER -> TABLE_PARTITION_PROVIDER_FILESYSTEM + rawTable.storage } - // Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition, - // to retain the spark specific format if it is. Also add old data source properties to table - // properties, to retain the data source table format.
- val oldDataSourceProps = oldTableDef.properties.filter(_._1.startsWith(DATASOURCE_PREFIX)) - val newTableProps = oldDataSourceProps ++ withStatsProps.properties + partitionProviderProp - val newDef = withStatsProps.copy( - storage = newStorage, - schema = oldTableDef.schema, - partitionColumnNames = oldTableDef.partitionColumnNames, - bucketSpec = oldTableDef.bucketSpec, - properties = newTableProps) - - client.alterTable(newDef) + val newTable = rawTable.copy( + identifier = newTableIdentifier, + storage = storageWithNewPath) + + client.alterTable(oldName, newTable) } }
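As an editorial summary (not part of the patch), the dispatch that both InMemoryCatalog and HiveExternalCatalog now share can be sketched as a standalone helper; `doAlter` and `doRename` are hypothetical stand-ins for each implementation's existing alter and rename logic.

import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Shared shape of the merged alterTable: no new definition (or an identical table
// name) means an in-place metadata update; a different table name means a rename.
def alterOrRename(
    tableDefinition: CatalogTable,
    newNameTable: Option[CatalogTable])(
    doAlter: CatalogTable => Unit,
    doRename: (CatalogTable, CatalogTable) => Unit): Unit = {
  require(tableDefinition.identifier.database.isDefined,
    "table definition must carry its database")
  newNameTable match {
    case Some(newDef) if newDef.identifier.table != tableDefinition.identifier.table =>
      doRename(tableDefinition, newDef)   // e.g. move the table directory, fix the "path" option
    case _ =>
      doAlter(tableDefinition)            // plain alter, same behavior as before this patch
  }
}

Both implementations in this patch follow exactly this split: the first branch absorbs the former renameTable, and the second keeps the previous alterTable body.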