Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,24 @@ Table alterTable(
* @return true if a table was deleted, false if no table exists for the identifier
*/
boolean dropTable(Identifier ident);

/**
* Renames a table in the catalog.
* <p>
* If the catalog supports views and contains a view for the old identifier and not a table, this
* throws {@link NoSuchTableException}. Additionally, if the new identifier is a table or a view,
* this throws {@link TableAlreadyExistsException}.
* <p>
* If the catalog does not support table renames between namespaces, it throws
* {@link UnsupportedOperationException}.
*
* @param oldIdent the table identifier of the existing table to rename
* @param newIdent the new table identifier of the table
* @throws NoSuchTableException If the table to rename doesn't exist or is a view
* @throws TableAlreadyExistsException If the new table name already exists or is a view
* @throws UnsupportedOperationException If the namespaces of old and new identiers do not
* match (optional)
*/
void renameTable(Identifier oldIdent, Identifier newIdent)
throws NoSuchTableException, TableAlreadyExistsException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class TableCatalogSuite extends SparkFunSuite {

private val testNs = Array("`", ".")
private val testIdent = Identifier.of(testNs, "test_table")
private val testIdentNew = Identifier.of(testNs, "test_table_new")

test("Catalogs can load the catalog") {
val catalog = newCatalog()
Expand Down Expand Up @@ -656,6 +657,52 @@ class TableCatalogSuite extends SparkFunSuite {
assert(!catalog.tableExists(testIdent))
}

test("renameTable") {
val catalog = newCatalog()

assert(!catalog.tableExists(testIdent))
assert(!catalog.tableExists(testIdentNew))

catalog.createTable(testIdent, schema, Array.empty, emptyProps)

assert(catalog.tableExists(testIdent))
catalog.renameTable(testIdent, testIdentNew)

assert(!catalog.tableExists(testIdent))
assert(catalog.tableExists(testIdentNew))
}

test("renameTable: fail if table does not exist") {
val catalog = newCatalog()

val exc = intercept[NoSuchTableException] {
catalog.renameTable(testIdent, testIdentNew)
}

assert(exc.message.contains(testIdent.quoted))
assert(exc.message.contains("not found"))
}

test("renameTable: fail if new table name already exists") {
val catalog = newCatalog()

assert(!catalog.tableExists(testIdent))
assert(!catalog.tableExists(testIdentNew))

catalog.createTable(testIdent, schema, Array.empty, emptyProps)
catalog.createTable(testIdentNew, schema, Array.empty, emptyProps)

assert(catalog.tableExists(testIdent))
assert(catalog.tableExists(testIdentNew))

val exc = intercept[TableAlreadyExistsException] {
catalog.renameTable(testIdent, testIdentNew)
}

assert(exc.message.contains(testIdentNew.quoted))
assert(exc.message.contains("already exists"))
}

test("listNamespaces: list namespaces from metadata") {
val catalog = newCatalog()
catalog.createNamespace(Array("ns1"), Map("property" -> "value").asJava)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,19 @@ class TestTableCatalog extends TableCatalog with SupportsNamespaces {

override def dropTable(ident: Identifier): Boolean = Option(tables.remove(ident)).isDefined

override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
if (tables.containsKey(newIdent)) {
throw new TableAlreadyExistsException(newIdent)
}

Option(tables.remove(oldIdent)) match {
case Some(table) =>
tables.put(newIdent, InMemoryTable(table.name, table.schema, table.properties))
case _ =>
throw new NoSuchTableException(oldIdent)
}
}

private def allNamespaces: Seq[Seq[String]] = {
(tables.keySet.asScala.map(_.namespace.toSeq) ++ namespaces.keySet.asScala).toSeq.distinct
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,16 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog {
}
}

override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also add test cases for v2 session catalog?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it'd be nice to see tests for renaming across namespaces (failing), etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just added the tests.

if (tableExists(newIdent)) {
throw new TableAlreadyExistsException(newIdent)
}

// Load table to make sure the table exists
loadTable(oldIdent)
catalog.renameTable(oldIdent.asTableIdentifier, newIdent.asTableIdentifier)
}

implicit class TableIdentifierHelper(ident: Identifier) {
def asTableIdentifier: TableIdentifier = {
ident.namespace match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalog.v2.{Catalogs, Identifier, TableCatalog, TableChange}
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
Expand All @@ -45,20 +46,23 @@ class V2SessionCatalogSuite
override protected def beforeAll(): Unit = {
super.beforeAll()
spark.sql("""CREATE DATABASE IF NOT EXISTS db""")
spark.sql("""CREATE DATABASE IF NOT EXISTS db2""")
spark.sql("""CREATE DATABASE IF NOT EXISTS ns""")
spark.sql("""CREATE DATABASE IF NOT EXISTS ns2""")
}

override protected def afterAll(): Unit = {
spark.sql("""DROP TABLE IF EXISTS db.test_table""")
spark.sql("""DROP DATABASE IF EXISTS db""")
spark.sql("""DROP DATABASE IF EXISTS db2""")
spark.sql("""DROP DATABASE IF EXISTS ns""")
spark.sql("""DROP DATABASE IF EXISTS ns2""")
super.afterAll()
}

after {
newCatalog().dropTable(testIdent)
newCatalog().dropTable(testIdentNew)
}

private def newCatalog(): TableCatalog = {
Expand All @@ -67,7 +71,9 @@ class V2SessionCatalogSuite
newCatalog
}

private val testIdent = Identifier.of(Array("db"), "test_table")
private val testNs = Array("db")
private val testIdent = Identifier.of(testNs, "test_table")
private val testIdentNew = Identifier.of(testNs, "test_table_new")

test("Catalogs can load the catalog") {
val catalog = newCatalog()
Expand Down Expand Up @@ -680,4 +686,70 @@ class V2SessionCatalogSuite
assert(!wasDropped)
assert(!catalog.tableExists(testIdent))
}

test("renameTable") {
val catalog = newCatalog()

assert(!catalog.tableExists(testIdent))
assert(!catalog.tableExists(testIdentNew))

catalog.createTable(testIdent, schema, Array.empty, emptyProps)

assert(catalog.tableExists(testIdent))
catalog.renameTable(testIdent, testIdentNew)

assert(!catalog.tableExists(testIdent))
assert(catalog.tableExists(testIdentNew))
}

test("renameTable: fail if table does not exist") {
val catalog = newCatalog()

val exc = intercept[NoSuchTableException] {
catalog.renameTable(testIdent, testIdentNew)
}

assert(exc.message.contains(testIdent.quoted))
assert(exc.message.contains("not found"))
}

test("renameTable: fail if new table name already exists") {
val catalog = newCatalog()

assert(!catalog.tableExists(testIdent))
assert(!catalog.tableExists(testIdentNew))

catalog.createTable(testIdent, schema, Array.empty, emptyProps)
catalog.createTable(testIdentNew, schema, Array.empty, emptyProps)

assert(catalog.tableExists(testIdent))
assert(catalog.tableExists(testIdentNew))

val exc = intercept[TableAlreadyExistsException] {
catalog.renameTable(testIdent, testIdentNew)
}

assert(exc.message.contains(testIdentNew.quoted))
assert(exc.message.contains("already exists"))
}

test("renameTable: fail if db does not match for old and new table names") {
val catalog = newCatalog()
val testIdentNewOtherDb = Identifier.of(Array("db2"), "test_table_new")

assert(!catalog.tableExists(testIdent))
assert(!catalog.tableExists(testIdentNewOtherDb))

catalog.createTable(testIdent, schema, Array.empty, emptyProps)

assert(catalog.tableExists(testIdent))

val exc = intercept[AnalysisException] {
catalog.renameTable(testIdent, testIdentNewOtherDb)
}

assert(exc.message.contains(testIdent.namespace.quoted))
assert(exc.message.contains(testIdentNewOtherDb.namespace.quoted))
assert(exc.message.contains("RENAME TABLE source and destination databases do not match"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,20 @@ class TestInMemoryTableCatalog extends TableCatalog {
Option(tables.remove(ident)).isDefined
}

override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
if (tables.containsKey(newIdent)) {
throw new TableAlreadyExistsException(newIdent)
}

Option(tables.remove(oldIdent)) match {
case Some(table) =>
tables.put(newIdent,
new InMemoryTable(table.name, table.schema, table.partitioning, table.properties))
case _ =>
throw new NoSuchTableException(oldIdent)
}
}

def clearTables(): Unit = {
tables.clear()
}
Expand Down