diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 08a01e8601897..b1fcca9752d5a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -92,7 +92,11 @@ abstract class ExternalCatalog {
 
   def dropTable(db: String, table: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit
 
-  def renameTable(db: String, oldName: String, newName: String): Unit
+  def renameTable(
+      db: String,
+      oldName: String,
+      newName: String,
+      newTablePath: Option[String]): Unit
 
   /**
    * Alter a table whose database and name match the ones specified in `tableDefinition`, assuming
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index cdf618aef97c3..4bc899b45dc17 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -264,7 +264,11 @@ class InMemoryCatalog(
     }
   }
 
-  override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
+  override def renameTable(
+      db: String,
+      oldName: String,
+      newName: String,
+      newTablePath: Option[String]): Unit = synchronized {
     requireTableExists(db, oldName)
     requireTableNotExists(db, newName)
     val oldDesc = catalog(db).tables(oldName)
@@ -275,7 +279,9 @@ class InMemoryCatalog(
         "Managed table should always have table location, as we will assign a default location " +
           "to it if it doesn't have one.")
       val oldDir = new Path(oldDesc.table.location)
-      val newDir = new Path(new Path(catalog(db).db.locationUri), newName)
+
+      require(newTablePath.isDefined)
+      val newDir = new Path(newTablePath.get)
       try {
         val fs = oldDir.getFileSystem(hadoopConfig)
         fs.rename(oldDir, newDir)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 25aa8d3ba921f..04e0aadf04b54 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -519,9 +519,13 @@ class SessionCatalog(
     requireDbExists(db)
     if (oldName.database.isDefined || !tempTables.contains(oldTableName)) {
       requireTableExists(TableIdentifier(oldTableName, Some(db)))
-      requireTableNotExists(TableIdentifier(newTableName, Some(db)))
+
+      val newTableIdentifier = TableIdentifier(newTableName, Some(db))
+      requireTableNotExists(newTableIdentifier)
       validateName(newTableName)
-      externalCatalog.renameTable(db, oldTableName, newTableName)
+
+      val newTablePath = CatalogUtils.URIToString(defaultTablePath(newTableIdentifier))
+      externalCatalog.renameTable(db, oldTableName, newTableName, Some(newTablePath))
     } else {
       if (newName.database.isDefined) {
         throw new AnalysisException(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 7820f39d96426..493663eded139 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -196,30 +196,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = true, purge = false)
   }
 
-  test("rename table") {
-    val catalog = newBasicCatalog()
-    assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
-    catalog.renameTable("db2", "tbl1", "tblone")
-    assert(catalog.listTables("db2").toSet == Set("tblone", "tbl2"))
-  }
-
-  test("rename table when database/table does not exist") {
-    val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
-      catalog.renameTable("unknown_db", "unknown_table", "unknown_table")
-    }
-    intercept[AnalysisException] {
-      catalog.renameTable("db2", "unknown_table", "unknown_table")
-    }
-  }
-
-  test("rename table when destination table already exists") {
-    val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
-      catalog.renameTable("db2", "tbl1", "tbl2")
-    }
-  }
-
   test("alter table") {
     val catalog = newBasicCatalog()
     val tbl1 = catalog.getTable("db2", "tbl1")
@@ -710,18 +686,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     assert(catalog.listFunctions("db2", "func*").toSet == Set("func1", "func2"))
   }
 
-  // --------------------------------------------------------------------------
-  // File System operations
-  // --------------------------------------------------------------------------
-
-  private def exists(uri: URI, children: String*): Boolean = {
-    val base = new Path(uri)
-    val finalPath = children.foldLeft(base) {
-      case (parent, child) => new Path(parent, child)
-    }
-    base.getFileSystem(new Configuration()).exists(finalPath)
-  }
-
   test("create/drop database should create/delete the directory") {
     val catalog = newBasicCatalog()
     val db = newDb("mydb")
@@ -732,40 +696,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     assert(!exists(db.locationUri))
   }
 
-  test("create/drop/rename table should create/delete/rename the directory") {
-    val catalog = newBasicCatalog()
-    val db = catalog.getDatabase("db1")
-    val table = CatalogTable(
-      identifier = TableIdentifier("my_table", Some("db1")),
-      tableType = CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat.empty,
-      schema = new StructType().add("a", "int").add("b", "string"),
-      provider = Some(defaultProvider)
-    )
-
-    catalog.createTable(table, ignoreIfExists = false)
-    assert(exists(db.locationUri, "my_table"))
-
-    catalog.renameTable("db1", "my_table", "your_table")
-    assert(!exists(db.locationUri, "my_table"))
-    assert(exists(db.locationUri, "your_table"))
-
-    catalog.dropTable("db1", "your_table", ignoreIfNotExists = false, purge = false)
-    assert(!exists(db.locationUri, "your_table"))
-
-    val externalTable = CatalogTable(
-      identifier = TableIdentifier("external_table", Some("db1")),
-      tableType = CatalogTableType.EXTERNAL,
-      storage = CatalogStorageFormat(
-        Some(Utils.createTempDir().toURI),
-        None, None, None, false, Map.empty),
-      schema = new StructType().add("a", "int").add("b", "string"),
-      provider = Some(defaultProvider)
-    )
-    catalog.createTable(externalTable, ignoreIfExists = false)
-    assert(!exists(db.locationUri, "external_table"))
-  }
-
   test("create/drop/rename partitions should create/delete/rename the directory") {
     val catalog = newBasicCatalog()
     val table = CatalogTable(
@@ -954,4 +884,14 @@ abstract class CatalogTestUtils {
     catalog.listPartitions(db, table).map(_.spec).toSet == parts.map(_.spec).toSet
   }
 
+  // --------------------------------------------------------------------------
+  // File System operations
+  // --------------------------------------------------------------------------
+  def exists(uri: URI, children: String*): Boolean = {
+    val base = new Path(uri)
+    val finalPath = children.foldLeft(base) {
+      case (parent, child) => new Path(parent, child)
+    }
+    base.getFileSystem(new Configuration()).exists(finalPath)
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index bb87763e0bbb0..b31ec1c538f7c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -26,6 +26,8 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
 
 class InMemorySessionCatalogSuite extends SessionCatalogSuite {
   protected val utils = new CatalogTestUtils {
@@ -421,6 +423,15 @@ abstract class SessionCatalogSuite extends PlanTest {
     }
   }
 
+  test("rename table when destination table already exists") {
+    withBasicCatalog { catalog =>
+      intercept[AnalysisException] {
+        catalog.renameTable(
+          TableIdentifier("tbl1", Some("db2")), TableIdentifier("tbl2", Some("db2")))
+      }
+    }
+  }
+
   test("alter table") {
     withBasicCatalog { catalog =>
       val tbl1 = catalog.externalCatalog.getTable("db2", "tbl1")
@@ -1381,4 +1392,41 @@ abstract class SessionCatalogSuite extends PlanTest {
       }
     }
   }
+
+  test("create/drop/rename table should create/delete/rename the directory") {
+    withBasicCatalog { catalog =>
+      val db = catalog.externalCatalog.getDatabase("db1")
+      val table = CatalogTable(
+        identifier = TableIdentifier("my_table", Some("db1")),
+        tableType = CatalogTableType.MANAGED,
+        storage = CatalogStorageFormat.empty,
+        schema = new StructType().add("a", "int").add("b", "string"),
+        provider = Some(defaultProvider)
+      )
+
+      catalog.createTable(table, ignoreIfExists = false)
+      assert(exists(db.locationUri, "my_table"))
+
+      catalog.renameTable(
+        TableIdentifier("my_table", Some("db1")), TableIdentifier("your_table", Some("db1")))
+      assert(!exists(db.locationUri, "my_table"))
+      assert(exists(db.locationUri, "your_table"))
+
+      catalog.externalCatalog.dropTable(
+        "db1", "your_table", ignoreIfNotExists = false, purge = false)
+      assert(!exists(db.locationUri, "your_table"))
+
+      val externalTable = CatalogTable(
+        identifier = TableIdentifier("external_table", Some("db1")),
+        tableType = CatalogTableType.EXTERNAL,
+        storage = CatalogStorageFormat(
+          Some(Utils.createTempDir().toURI),
+          None, None, None, false, Map.empty),
+        schema = new StructType().add("a", "int").add("b", "string"),
+        provider = Some(defaultProvider)
+      )
+      catalog.createTable(externalTable, ignoreIfExists = false)
+      assert(!exists(db.locationUri, "external_table"))
+    }
+  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 33b21be37203b..bd1f12464ac21 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -464,7 +464,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     client.dropTable(db, table, ignoreIfNotExists, purge)
   }
 
-  override def renameTable(db: String, oldName: String, newName: String): Unit = withClient {
+  override def renameTable(
+      db: String,
+      oldName: String,
+      newName: String,
+      newTablePath: Option[String]): Unit = withClient {
     val rawTable = getRawTable(db, oldName)
 
     // Note that Hive serde tables don't use path option in storage properties to store the value
@@ -476,8 +480,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val storageWithNewPath = if (rawTable.tableType == MANAGED && hasPathOption) {
       // If it's a managed table with path option and we are renaming it, then the path option
       // becomes inaccurate and we need to update it according to the new table name.
-      val newTablePath = defaultTablePath(TableIdentifier(newName, Some(db)))
-      updateLocationInStorageProps(rawTable, Some(newTablePath))
+      require(newTablePath.isDefined)
+      updateLocationInStorageProps(rawTable, newTablePath)
     } else {
       rawTable.storage
     }
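The calling pattern this diff introduces can be reduced to a self-contained sketch. Everything below is a hypothetical stand-in, not Spark code: `ToyExternalCatalog`, `ToyFsCatalog`, and the `java.nio.file` plumbing are illustrative only. What mirrors the patch is the shape of `renameTable` — the session layer computes the destination directory once and threads it through as `Option[String]`, and the catalog implementation merely requires and uses it.

```scala
import java.nio.file.{Files, Path, Paths}

// Hypothetical stand-in for ExternalCatalog, mirroring only the new signature:
// the caller supplies the pre-computed destination path for a managed table.
trait ToyExternalCatalog {
  def renameTable(db: String, oldName: String, newName: String, newTablePath: Option[String]): Unit
}

class ToyFsCatalog(warehouse: Path) extends ToyExternalCatalog {
  override def renameTable(
      db: String,
      oldName: String,
      newName: String,
      newTablePath: Option[String]): Unit = {
    // As in the patched InMemoryCatalog: the implementation no longer derives
    // the destination from the database location; it requires the caller's path.
    require(newTablePath.isDefined, "caller must supply the destination path")
    val oldDir = warehouse.resolve(db).resolve(oldName)
    // Analogue of fs.rename(oldDir, newDir) in the patched code.
    Files.move(oldDir, Paths.get(newTablePath.get))
  }
}

object ToySessionCatalog {
  // Plays the role of SessionCatalog.defaultTablePath: the single place that
  // decides where a managed table lives under the current database location.
  def defaultTablePath(warehouse: Path, db: String, table: String): Path =
    warehouse.resolve(db).resolve(table)

  def main(args: Array[String]): Unit = {
    val warehouse = Files.createTempDirectory("warehouse")
    Files.createDirectories(warehouse.resolve("db1").resolve("my_table"))

    val catalog = new ToyFsCatalog(warehouse)
    val dest = defaultTablePath(warehouse, "db1", "your_table")
    // The session layer computes the path and threads it through, like
    // externalCatalog.renameTable(db, oldTableName, newTableName, Some(newTablePath)).
    catalog.renameTable("db1", "my_table", "your_table", Some(dest.toString))

    assert(!Files.exists(warehouse.resolve("db1").resolve("my_table")))
    assert(Files.exists(dest))
  }
}
```

The design point is the same as in the sketch: before the patch, `InMemoryCatalog` derived the destination from `catalog(db).db.locationUri` while `HiveExternalCatalog` called its own `defaultTablePath`, so the two implementations computed the path independently. After the patch, `SessionCatalog.renameTable` computes it once via `defaultTablePath` and every `ExternalCatalog` implementation simply honors the value it is given.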