Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -459,11 +459,20 @@ class SessionCatalog(
* If a database is specified in `oldName`, this will rename the table in that database.
* If no database is specified, this will first attempt to rename a temporary table with
* the same name, then, if that does not exist, rename the table in the current database.
*
* This assumes the database specified in `newName` matches the one in `oldName`.
*/
def renameTable(oldName: TableIdentifier, newName: String): Unit = synchronized {
def renameTable(oldName: TableIdentifier, newName: TableIdentifier): Unit = synchronized {
val db = formatDatabaseName(oldName.database.getOrElse(currentDb))
newName.database.map(formatDatabaseName).foreach { newDb =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If newName.database is empty, we should use the current database, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see PR description, we should use the database of source table, so that users can just write db.tbl1 RENAME TO tbl2. This is different from Hive, as we don't support move table from one database to another.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uh, I see. If this is by design, I do not have more questions. LGTM

if (db != newDb) {
throw new AnalysisException(
s"RENAME TABLE source and destination databases do not match: '$db' != '$newDb'")
}
}

val oldTableName = formatTableName(oldName.table)
val newTableName = formatTableName(newName)
val newTableName = formatTableName(newName.table)
if (db == globalTempViewManager.database) {
globalTempViewManager.rename(oldTableName, newTableName)
} else {
Expand All @@ -473,6 +482,11 @@ class SessionCatalog(
requireTableNotExists(TableIdentifier(newTableName, Some(db)))
externalCatalog.renameTable(db, oldTableName, newTableName)
} else {
if (newName.database.isDefined) {
throw new AnalysisException(
s"RENAME TEMPORARY TABLE from '$oldName' to '$newName': cannot specify database " +
s"name '${newName.database.get}' in the destination table")
}
if (tempTables.contains(newTableName)) {
throw new AnalysisException(s"RENAME TEMPORARY TABLE from '$oldName' to '$newName': " +
"destination table already exists")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,27 +273,34 @@ class SessionCatalogSuite extends SparkFunSuite {
val externalCatalog = newBasicCatalog()
val sessionCatalog = new SessionCatalog(externalCatalog)
assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
sessionCatalog.renameTable(TableIdentifier("tbl1", Some("db2")), "tblone")
sessionCatalog.renameTable(TableIdentifier("tbl1", Some("db2")), TableIdentifier("tblone"))
assert(externalCatalog.listTables("db2").toSet == Set("tblone", "tbl2"))
sessionCatalog.renameTable(TableIdentifier("tbl2", Some("db2")), "tbltwo")
sessionCatalog.renameTable(TableIdentifier("tbl2", Some("db2")), TableIdentifier("tbltwo"))
assert(externalCatalog.listTables("db2").toSet == Set("tblone", "tbltwo"))
// Rename table without explicitly specifying database
sessionCatalog.setCurrentDatabase("db2")
sessionCatalog.renameTable(TableIdentifier("tbltwo"), "table_two")
sessionCatalog.renameTable(TableIdentifier("tbltwo"), TableIdentifier("table_two"))
assert(externalCatalog.listTables("db2").toSet == Set("tblone", "table_two"))
// Renaming "db2.tblone" to "db1.tblones" should fail because databases don't match
intercept[AnalysisException] {
sessionCatalog.renameTable(
TableIdentifier("tblone", Some("db2")), TableIdentifier("tblones", Some("db1")))
}
// The new table already exists
intercept[TableAlreadyExistsException] {
sessionCatalog.renameTable(TableIdentifier("tblone", Some("db2")), "table_two")
sessionCatalog.renameTable(
TableIdentifier("tblone", Some("db2")),
TableIdentifier("table_two"))
}
}

test("rename table when database/table does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[NoSuchDatabaseException] {
catalog.renameTable(TableIdentifier("tbl1", Some("unknown_db")), "tbl2")
catalog.renameTable(TableIdentifier("tbl1", Some("unknown_db")), TableIdentifier("tbl2"))
}
intercept[NoSuchTableException] {
catalog.renameTable(TableIdentifier("unknown_table", Some("db2")), "tbl2")
catalog.renameTable(TableIdentifier("unknown_table", Some("db2")), TableIdentifier("tbl2"))
}
}

Expand All @@ -306,12 +313,12 @@ class SessionCatalogSuite extends SparkFunSuite {
assert(sessionCatalog.getTempView("tbl1") == Option(tempTable))
assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
// If database is not specified, temp table should be renamed first
sessionCatalog.renameTable(TableIdentifier("tbl1"), "tbl3")
sessionCatalog.renameTable(TableIdentifier("tbl1"), TableIdentifier("tbl3"))
assert(sessionCatalog.getTempView("tbl1").isEmpty)
assert(sessionCatalog.getTempView("tbl3") == Option(tempTable))
assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
// If database is specified, temp tables are never renamed
sessionCatalog.renameTable(TableIdentifier("tbl2", Some("db2")), "tbl4")
sessionCatalog.renameTable(TableIdentifier("tbl2", Some("db2")), TableIdentifier("tbl4"))
assert(sessionCatalog.getTempView("tbl3") == Option(tempTable))
assert(sessionCatalog.getTempView("tbl4").isEmpty)
assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl4"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -689,15 +689,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
* }}}
*/
override def visitRenameTable(ctx: RenameTableContext): LogicalPlan = withOrigin(ctx) {
val fromName = visitTableIdentifier(ctx.from)
val toName = visitTableIdentifier(ctx.to)
if (toName.database.isDefined) {
operationNotAllowed("Can not specify database in table/view name after RENAME TO", ctx)
}

AlterTableRenameCommand(
fromName,
toName.table,
visitTableIdentifier(ctx.from),
visitTableIdentifier(ctx.to),
ctx.VIEW != null)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ case class CreateTableCommand(table: CatalogTable, ifNotExists: Boolean) extends
*/
case class AlterTableRenameCommand(
oldName: TableIdentifier,
newName: String,
newName: TableIdentifier,
isView: Boolean)
extends RunnableCommand {

Expand All @@ -159,7 +159,6 @@ case class AlterTableRenameCommand(
} else {
val table = catalog.getTableMetadata(oldName)
DDLUtils.verifyAlterTableType(catalog, table, isView)
val newTblName = TableIdentifier(newName, oldName.database)
// If an exception is thrown here we can just assume the table is uncached;
// this can happen with Hive tables when the underlying catalog is in-memory.
val wasCached = Try(sparkSession.catalog.isCached(oldName.unquotedString)).getOrElse(false)
Expand All @@ -172,7 +171,7 @@ case class AlterTableRenameCommand(
}
// For datasource tables, we also need to update the "path" serde property
if (DDLUtils.isDatasourceTable(table) && table.tableType == CatalogTableType.MANAGED) {
val newPath = catalog.defaultTablePath(newTblName)
val newPath = catalog.defaultTablePath(newName)
val newTable = table.withNewStorage(
properties = table.storage.properties ++ Map("path" -> newPath))
catalog.alterTable(newTable)
Expand All @@ -182,7 +181,7 @@ case class AlterTableRenameCommand(
catalog.refreshTable(oldName)
catalog.renameTable(oldName, newName)
if (wasCached) {
sparkSession.catalog.cacheTable(newTblName.unquotedString)
sparkSession.catalog.cacheTable(newName.unquotedString)
}
}
Seq.empty[Row]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,20 +387,22 @@ class DDLCommandSuite extends PlanTest {
val parsed_table = parser.parsePlan(sql_table)
val parsed_view = parser.parsePlan(sql_view)
val expected_table = AlterTableRenameCommand(
TableIdentifier("table_name", None),
"new_table_name",
TableIdentifier("table_name"),
TableIdentifier("new_table_name"),
isView = false)
val expected_view = AlterTableRenameCommand(
TableIdentifier("table_name", None),
"new_table_name",
TableIdentifier("table_name"),
TableIdentifier("new_table_name"),
isView = true)
comparePlans(parsed_table, expected_table)
comparePlans(parsed_view, expected_view)
}

val e = intercept[ParseException](
parser.parsePlan("ALTER TABLE db1.tbl RENAME TO db1.tbl2")
)
assert(e.getMessage.contains("Can not specify database in table/view name after RENAME TO"))
test("alter table: rename table with database") {
val query = "ALTER TABLE db1.tbl RENAME TO db1.tbl2"
val plan = parseAs[AlterTableRenameCommand](query)
assert(plan.oldName == TableIdentifier("tbl", Some("db1")))
assert(plan.newName == TableIdentifier("tbl2", Some("db1")))
}

// ALTER TABLE table_name SET TBLPROPERTIES ('comment' = new_comment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -665,16 +665,27 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
createDatabase(catalog, "dbx")
createDatabase(catalog, "dby")
createTable(catalog, tableIdent1)

assert(catalog.listTables("dbx") == Seq(tableIdent1))
sql("ALTER TABLE dbx.tab1 RENAME TO tab2")
sql("ALTER TABLE dbx.tab1 RENAME TO dbx.tab2")
assert(catalog.listTables("dbx") == Seq(tableIdent2))

// The database in destination table name can be omitted, and we will use the database of source
// table for it.
sql("ALTER TABLE dbx.tab2 RENAME TO tab1")
assert(catalog.listTables("dbx") == Seq(tableIdent1))

catalog.setCurrentDatabase("dbx")
// rename without explicitly specifying database
sql("ALTER TABLE tab2 RENAME TO tab1")
assert(catalog.listTables("dbx") == Seq(tableIdent1))
sql("ALTER TABLE tab1 RENAME TO tab2")
assert(catalog.listTables("dbx") == Seq(tableIdent2))
// table to rename does not exist
intercept[AnalysisException] {
sql("ALTER TABLE dbx.does_not_exist RENAME TO tab2")
sql("ALTER TABLE dbx.does_not_exist RENAME TO dbx.tab2")
}
// destination database is different
intercept[AnalysisException] {
sql("ALTER TABLE dbx.tab1 RENAME TO dby.tab2")
}
}

Expand All @@ -696,6 +707,31 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(spark.table("teachers").collect().toSeq == df.collect().toSeq)
}

test("rename temporary table - destination table with database name") {
withTempView("tab1") {
sql(
"""
|CREATE TEMPORARY TABLE tab1
|USING org.apache.spark.sql.sources.DDLScanSource
|OPTIONS (
| From '1',
| To '10',
| Table 'test1'
|)
""".stripMargin)

val e = intercept[AnalysisException] {
sql("ALTER TABLE tab1 RENAME TO default.tab2")
}
assert(e.getMessage.contains(
"RENAME TEMPORARY TABLE from '`tab1`' to '`default`.`tab2`': " +
"cannot specify database name 'default' in the destination table"))

val catalog = spark.sessionState.catalog
assert(catalog.listTables("default") == Seq(TableIdentifier("tab1")))
}
}

test("rename temporary table") {
withTempView("tab1", "tab2") {
spark.range(10).createOrReplaceTempView("tab1")
Expand Down Expand Up @@ -736,7 +772,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
sql("ALTER TABLE tab1 RENAME TO tab2")
}
assert(e.getMessage.contains(
"RENAME TEMPORARY TABLE from '`tab1`' to 'tab2': destination table already exists"))
"RENAME TEMPORARY TABLE from '`tab1`' to '`tab2`': destination table already exists"))

val catalog = spark.sessionState.catalog
assert(catalog.listTables("default") == Seq(TableIdentifier("tab1"), TableIdentifier("tab2")))
Expand Down