
Commit 2ea645c

[SPARK-20013][SQL] merge renameTable to alterTable in ExternalCatalog
1 parent 0368eb9 commit 2ea645c

6 files changed: 213 additions & 208 deletions


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala

Lines changed: 4 additions & 3 deletions
@@ -92,17 +92,18 @@ abstract class ExternalCatalog {
 
   def dropTable(db: String, table: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit
 
-  def renameTable(db: String, oldName: String, newName: String): Unit
-
   /**
    * Alter a table whose database and name match the ones specified in `tableDefinition`, assuming
    * the table exists. Note that, even though we can specify database in `tableDefinition`, it's
    * used to identify the table, not to alter the table's database, which is not allowed.
    *
+   * If `newNameTable` is defined and its table name is not equal to the table name of
+   * `tableDefinition`,it will just rename the table.
+   *
    * Note: If the underlying implementation does not support altering a certain field,
    * this becomes a no-op.
    */
-  def alterTable(tableDefinition: CatalogTable): Unit
+  def alterTable(tableDefinition: CatalogTable, newNameTable: Option[CatalogTable] = None): Unit
 
   /**
    * Alter the schema of a table identified by the provided database and table name. The new schema
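For reference, the merged method now covers both operations. A minimal caller sketch against the new signature (`catalog`, `tableDef`, and `renamedDef` are illustrative values, not part of the commit):

    // Plain alter, same as the old single-argument call.
    catalog.alterTable(tableDef)

    // Rename: passing a second CatalogTable whose table name differs from
    // tableDef's switches the implementation onto the rename path.
    catalog.alterTable(tableDef, newNameTable = Some(renamedDef))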

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala

Lines changed: 35 additions & 30 deletions
@@ -264,40 +264,45 @@ class InMemoryCatalog(
     }
   }
 
-  override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
-    requireTableExists(db, oldName)
-    requireTableNotExists(db, newName)
-    val oldDesc = catalog(db).tables(oldName)
-    oldDesc.table = oldDesc.table.copy(identifier = TableIdentifier(newName, Some(db)))
-
-    if (oldDesc.table.tableType == CatalogTableType.MANAGED) {
-      assert(oldDesc.table.storage.locationUri.isDefined,
-        "Managed table should always have table location, as we will assign a default location " +
-        "to it if it doesn't have one.")
-      val oldDir = new Path(oldDesc.table.location)
-      val newDir = new Path(new Path(catalog(db).db.locationUri), newName)
-      try {
-        val fs = oldDir.getFileSystem(hadoopConfig)
-        fs.rename(oldDir, newDir)
-      } catch {
-        case e: IOException =>
-          throw new SparkException(s"Unable to rename table $oldName to $newName as failed " +
-            s"to rename its directory $oldDir", e)
-      }
-      oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri))
-    }
-
-    catalog(db).tables.put(newName, oldDesc)
-    catalog(db).tables.remove(oldName)
-  }
-
-  override def alterTable(tableDefinition: CatalogTable): Unit = synchronized {
+  override def alterTable(
+      tableDefinition: CatalogTable,
+      newNameTable: Option[CatalogTable]): Unit = synchronized {
     assert(tableDefinition.identifier.database.isDefined)
     val db = tableDefinition.identifier.database.get
     requireTableExists(db, tableDefinition.identifier.table)
-    catalog(db).tables(tableDefinition.identifier.table).table = tableDefinition
-  }
 
+    if (newNameTable.isEmpty
+        || newNameTable.get.identifier.table == tableDefinition.identifier.table) {
+      catalog(db).tables(tableDefinition.identifier.table).table = tableDefinition
+    } else {
+      val oldName = tableDefinition.identifier.table
+      val newName = newNameTable.get.identifier.table
+      requireTableNotExists(db, newName)
+      val oldDesc = catalog(db).tables(oldName)
+      oldDesc.table = oldDesc.table.copy(identifier = TableIdentifier(newName, Some(db)))
+
+      if (oldDesc.table.tableType == CatalogTableType.MANAGED) {
+        assert(oldDesc.table.storage.locationUri.isDefined,
+          "Managed table should always have table location, as we will assign a default location " +
+          "to it if it doesn't have one.")
+        val oldDir = new Path(oldDesc.table.location)
+        val newDir = new Path(newNameTable.get.location)
+
+        try {
+          val fs = oldDir.getFileSystem(hadoopConfig)
+          fs.rename(oldDir, newDir)
+        } catch {
+          case e: IOException =>
+            throw new SparkException(s"Unable to rename table $oldName to $newName" +
+              s" as failed to rename its directory $oldDir", e)
+        }
+        oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri))
+      }
+
+      catalog(db).tables.put(newName, oldDesc)
+      catalog(db).tables.remove(oldName)
+    }
+  }
   override def alterTableSchema(
       db: String,
       table: String,
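To see the new branch end to end, here is a standalone, REPL-style sketch against InMemoryCatalog (for example pasted into spark-shell on a build containing this change). The database name, table name, schema, and temporary warehouse directory are illustrative, not taken from the commit:

    import java.nio.file.Files

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.catalog._
    import org.apache.spark.sql.types.StructType

    val catalog = new InMemoryCatalog()
    val warehouse = Files.createTempDirectory("warehouse_").toUri

    // A managed table created without a location gets a default directory under the database URI.
    catalog.createDatabase(
      CatalogDatabase("db2", description = "", locationUri = warehouse, properties = Map.empty),
      ignoreIfExists = false)
    catalog.createTable(
      CatalogTable(
        identifier = TableIdentifier("tbl1", Some("db2")),
        tableType = CatalogTableType.MANAGED,
        storage = CatalogStorageFormat.empty,
        schema = new StructType().add("a", "int"),
        provider = Some("parquet")),
      ignoreIfExists = false)

    // Rename tbl1 to tblone: the second CatalogTable carries the new identifier and, since the
    // table is MANAGED, the destination directory that its data should be moved to.
    val oldDef = catalog.getTable("db2", "tbl1")
    val newDef = oldDef.copy(
      identifier = TableIdentifier("tblone", Some("db2")),
      storage = oldDef.storage.copy(
        locationUri = Some(new Path(new Path(warehouse), "tblone").toUri)))
    catalog.alterTable(oldDef, Some(newDef))

    assert(catalog.listTables("db2") == Seq("tblone"))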

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 8 additions & 2 deletions
@@ -577,9 +577,15 @@ class SessionCatalog(
     requireDbExists(db)
     if (oldName.database.isDefined || !tempTables.contains(oldTableName)) {
       requireTableExists(TableIdentifier(oldTableName, Some(db)))
-      requireTableNotExists(TableIdentifier(newTableName, Some(db)))
+      val newTableIdentifier = TableIdentifier(newTableName, Some(db))
+      requireTableNotExists(newTableIdentifier)
       validateName(newTableName)
-      externalCatalog.renameTable(db, oldTableName, newTableName)
+      val tableDefinition = externalCatalog.getTable(db, oldTableName)
+      val newTableDefinition = tableDefinition.copy(
+        identifier = newTableIdentifier,
+        storage =
+          tableDefinition.storage.copy(locationUri = Some(defaultTablePath(newTableIdentifier))))
+      externalCatalog.alterTable(tableDefinition, Some(newTableDefinition))
     } else {
       if (newName.database.isDefined) {
         throw new AnalysisException(
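Note that the caller-facing behaviour of SessionCatalog.renameTable is unchanged: it still takes the old and new identifiers, and now builds the two CatalogTable definitions itself, pointing the new definition's storage at defaultTablePath so a managed table's directory ends up at the standard warehouse location for the new name. Roughly (a sketch; `sessionCatalog` and the identifiers are illustrative):

    // Same public call as before this commit; internally it now issues
    // externalCatalog.alterTable(tableDefinition, Some(newTableDefinition)).
    sessionCatalog.renameTable(
      TableIdentifier("tbl1", Some("db2")),
      TableIdentifier("tblone"))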

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala

Lines changed: 11 additions & 70 deletions
@@ -199,30 +199,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = true, purge = false)
   }
 
-  test("rename table") {
-    val catalog = newBasicCatalog()
-    assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
-    catalog.renameTable("db2", "tbl1", "tblone")
-    assert(catalog.listTables("db2").toSet == Set("tblone", "tbl2"))
-  }
-
-  test("rename table when database/table does not exist") {
-    val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
-      catalog.renameTable("unknown_db", "unknown_table", "unknown_table")
-    }
-    intercept[AnalysisException] {
-      catalog.renameTable("db2", "unknown_table", "unknown_table")
-    }
-  }
-
-  test("rename table when destination table already exists") {
-    val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
-      catalog.renameTable("db2", "tbl1", "tbl2")
-    }
-  }
-
   test("alter table") {
     val catalog = newBasicCatalog()
     val tbl1 = catalog.getTable("db2", "tbl1")
@@ -751,18 +727,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     assert(catalog.listFunctions("db2", "func*").toSet == Set("func1", "func2"))
   }
 
-  // --------------------------------------------------------------------------
-  // File System operations
-  // --------------------------------------------------------------------------
-
-  private def exists(uri: URI, children: String*): Boolean = {
-    val base = new Path(uri)
-    val finalPath = children.foldLeft(base) {
-      case (parent, child) => new Path(parent, child)
-    }
-    base.getFileSystem(new Configuration()).exists(finalPath)
-  }
-
   test("create/drop database should create/delete the directory") {
     val catalog = newBasicCatalog()
     val db = newDb("mydb")
@@ -773,40 +737,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     assert(!exists(db.locationUri))
   }
 
-  test("create/drop/rename table should create/delete/rename the directory") {
-    val catalog = newBasicCatalog()
-    val db = catalog.getDatabase("db1")
-    val table = CatalogTable(
-      identifier = TableIdentifier("my_table", Some("db1")),
-      tableType = CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat.empty,
-      schema = new StructType().add("a", "int").add("b", "string"),
-      provider = Some(defaultProvider)
-    )
-
-    catalog.createTable(table, ignoreIfExists = false)
-    assert(exists(db.locationUri, "my_table"))
-
-    catalog.renameTable("db1", "my_table", "your_table")
-    assert(!exists(db.locationUri, "my_table"))
-    assert(exists(db.locationUri, "your_table"))
-
-    catalog.dropTable("db1", "your_table", ignoreIfNotExists = false, purge = false)
-    assert(!exists(db.locationUri, "your_table"))
-
-    val externalTable = CatalogTable(
-      identifier = TableIdentifier("external_table", Some("db1")),
-      tableType = CatalogTableType.EXTERNAL,
-      storage = CatalogStorageFormat(
-        Some(Utils.createTempDir().toURI),
-        None, None, None, false, Map.empty),
-      schema = new StructType().add("a", "int").add("b", "string"),
-      provider = Some(defaultProvider)
-    )
-    catalog.createTable(externalTable, ignoreIfExists = false)
-    assert(!exists(db.locationUri, "external_table"))
-  }
-
   test("create/drop/rename partitions should create/delete/rename the directory") {
     val catalog = newBasicCatalog()
     val table = CatalogTable(
@@ -995,4 +925,15 @@ abstract class CatalogTestUtils {
     catalog.listPartitions(db, table).map(_.spec).toSet == parts.map(_.spec).toSet
   }
 
+  // --------------------------------------------------------------------------
+  // File System operations
+  // --------------------------------------------------------------------------
+  def exists(uri: URI, children: String*): Boolean = {
+    val base = new Path(uri)
+    val finalPath = children.foldLeft(base) {
+      case (parent, child) => new Path(parent, child)
+    }
+    base.getFileSystem(new Configuration()).exists(finalPath)
+  }
+
 }

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala

Lines changed: 46 additions & 0 deletions
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 class InMemorySessionCatalogSuite extends SessionCatalogSuite {
   protected val utils = new CatalogTestUtils {
@@ -399,6 +400,14 @@ abstract class SessionCatalogSuite extends PlanTest {
     }
   }
 
+  test("rename table when destination table already exists") {
+    withBasicCatalog { catalog =>
+      intercept[AnalysisException] {
+        catalog.renameTable(TableIdentifier("tbl1", Some("db2")), TableIdentifier("tbl2"))
+      }
+    }
+  }
+
   test("rename temp table") {
     withBasicCatalog { catalog =>
       val tempTable = Range(1, 10, 2, 10)
@@ -419,6 +428,43 @@ abstract class SessionCatalogSuite extends PlanTest {
     }
   }
 
+
+  test("create/drop/rename table should create/delete/rename the directory") {
+    withBasicCatalog { catalog =>
+      val db = catalog.externalCatalog.getDatabase("db1")
+      val table = CatalogTable(
+        identifier = TableIdentifier("my_table", Some("db1")),
+        tableType = CatalogTableType.MANAGED,
+        storage = CatalogStorageFormat.empty,
+        schema = new StructType().add("a", "int").add("b", "string"),
+        provider = Some(defaultProvider)
+      )
+
+      catalog.createTable(table, ignoreIfExists = false)
+      assert(exists(db.locationUri, "my_table"))
+
+      catalog.renameTable(
+        TableIdentifier("my_table", Some("db1")), TableIdentifier("your_table", Some("db1")))
+      assert(!exists(db.locationUri, "my_table"))
+      assert(exists(db.locationUri, "your_table"))
+
+      catalog.externalCatalog.dropTable("db1", "your_table", ignoreIfNotExists = false, purge = false)
+      assert(!exists(db.locationUri, "your_table"))
+
+      val externalTable = CatalogTable(
+        identifier = TableIdentifier("external_table", Some("db1")),
+        tableType = CatalogTableType.EXTERNAL,
+        storage = CatalogStorageFormat(
+          Some(Utils.createTempDir().toURI),
+          None, None, None, false, Map.empty),
+        schema = new StructType().add("a", "int").add("b", "string"),
+        provider = Some(defaultProvider)
+      )
+      catalog.createTable(externalTable, ignoreIfExists = false)
+      assert(!exists(db.locationUri, "external_table"))
+    }
+  }
+
   test("alter table") {
     withBasicCatalog { catalog =>
       val tbl1 = catalog.externalCatalog.getTable("db2", "tbl1")
