Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@ abstract class ExternalCatalog {

def dropTable(db: String, table: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit

def renameTable(db: String, oldName: String, newName: String): Unit
def renameTable(
db: String,
oldName: String,
newName: String,
newTablePath: Option[String]): Unit

/**
* Alter a table whose database and name match the ones specified in `tableDefinition`, assuming
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,11 @@ class InMemoryCatalog(
}
}

override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
override def renameTable(
db: String,
oldName: String,
newName: String,
newTablePath: Option[String]): Unit = synchronized {
requireTableExists(db, oldName)
requireTableNotExists(db, newName)
val oldDesc = catalog(db).tables(oldName)
Expand All @@ -275,7 +279,9 @@ class InMemoryCatalog(
"Managed table should always have table location, as we will assign a default location " +
"to it if it doesn't have one.")
val oldDir = new Path(oldDesc.table.location)
val newDir = new Path(new Path(catalog(db).db.locationUri), newName)

require(newTablePath.isDefined)
val newDir = new Path(newTablePath.get)
try {
val fs = oldDir.getFileSystem(hadoopConfig)
fs.rename(oldDir, newDir)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,9 +519,13 @@ class SessionCatalog(
requireDbExists(db)
if (oldName.database.isDefined || !tempTables.contains(oldTableName)) {
requireTableExists(TableIdentifier(oldTableName, Some(db)))
requireTableNotExists(TableIdentifier(newTableName, Some(db)))

val newTableIdentifier = TableIdentifier(newTableName, Some(db))
requireTableNotExists(newTableIdentifier)
validateName(newTableName)
externalCatalog.renameTable(db, oldTableName, newTableName)

val newTablePath = CatalogUtils.URIToString(defaultTablePath(newTableIdentifier))
externalCatalog.renameTable(db, oldTableName, newTableName, Some(newTablePath))
} else {
if (newName.database.isDefined) {
throw new AnalysisException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,30 +196,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = true, purge = false)
}

test("rename table") {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

already existed in SessionCatalogSuite

val catalog = newBasicCatalog()
assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
catalog.renameTable("db2", "tbl1", "tblone")
assert(catalog.listTables("db2").toSet == Set("tblone", "tbl2"))
}

test("rename table when database/table does not exist") {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

already existed in SessionCatalogSuite

val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.renameTable("unknown_db", "unknown_table", "unknown_table")
}
intercept[AnalysisException] {
catalog.renameTable("db2", "unknown_table", "unknown_table")
}
}

test("rename table when destination table already exists") {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move to SessionCatalogSuite

val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.renameTable("db2", "tbl1", "tbl2")
}
}

test("alter table") {
val catalog = newBasicCatalog()
val tbl1 = catalog.getTable("db2", "tbl1")
Expand Down Expand Up @@ -710,18 +686,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
assert(catalog.listFunctions("db2", "func*").toSet == Set("func1", "func2"))
}

// --------------------------------------------------------------------------
// File System operations
// --------------------------------------------------------------------------

private def exists(uri: URI, children: String*): Boolean = {
val base = new Path(uri)
val finalPath = children.foldLeft(base) {
case (parent, child) => new Path(parent, child)
}
base.getFileSystem(new Configuration()).exists(finalPath)
}

test("create/drop database should create/delete the directory") {
val catalog = newBasicCatalog()
val db = newDb("mydb")
Expand All @@ -732,40 +696,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
assert(!exists(db.locationUri))
}

test("create/drop/rename table should create/delete/rename the directory") {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move to SessionCatalogSuite

val catalog = newBasicCatalog()
val db = catalog.getDatabase("db1")
val table = CatalogTable(
identifier = TableIdentifier("my_table", Some("db1")),
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat.empty,
schema = new StructType().add("a", "int").add("b", "string"),
provider = Some(defaultProvider)
)

catalog.createTable(table, ignoreIfExists = false)
assert(exists(db.locationUri, "my_table"))

catalog.renameTable("db1", "my_table", "your_table")
assert(!exists(db.locationUri, "my_table"))
assert(exists(db.locationUri, "your_table"))

catalog.dropTable("db1", "your_table", ignoreIfNotExists = false, purge = false)
assert(!exists(db.locationUri, "your_table"))

val externalTable = CatalogTable(
identifier = TableIdentifier("external_table", Some("db1")),
tableType = CatalogTableType.EXTERNAL,
storage = CatalogStorageFormat(
Some(Utils.createTempDir().toURI),
None, None, None, false, Map.empty),
schema = new StructType().add("a", "int").add("b", "string"),
provider = Some(defaultProvider)
)
catalog.createTable(externalTable, ignoreIfExists = false)
assert(!exists(db.locationUri, "external_table"))
}

test("create/drop/rename partitions should create/delete/rename the directory") {
val catalog = newBasicCatalog()
val table = CatalogTable(
Expand Down Expand Up @@ -954,4 +884,14 @@ abstract class CatalogTestUtils {
catalog.listPartitions(db, table).map(_.spec).toSet == parts.map(_.spec).toSet
}

// --------------------------------------------------------------------------
// File System operations
// --------------------------------------------------------------------------
def exists(uri: URI, children: String*): Boolean = {
val base = new Path(uri)
val finalPath = children.foldLeft(base) {
case (parent, child) => new Path(parent, child)
}
base.getFileSystem(new Configuration()).exists(finalPath)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils

class InMemorySessionCatalogSuite extends SessionCatalogSuite {
protected val utils = new CatalogTestUtils {
Expand Down Expand Up @@ -421,6 +423,15 @@ abstract class SessionCatalogSuite extends PlanTest {
}
}

test("rename table when destination table already exists") {
withBasicCatalog { catalog =>
intercept[AnalysisException] {
catalog.renameTable(
TableIdentifier("tbl1", Some("db2")), TableIdentifier("tbl2", Some("db2")))
}
}
}

test("alter table") {
withBasicCatalog { catalog =>
val tbl1 = catalog.externalCatalog.getTable("db2", "tbl1")
Expand Down Expand Up @@ -1381,4 +1392,41 @@ abstract class SessionCatalogSuite extends PlanTest {
}
}
}

test("create/drop/rename table should create/delete/rename the directory") {
withBasicCatalog { catalog =>
val db = catalog.externalCatalog.getDatabase("db1")
val table = CatalogTable(
identifier = TableIdentifier("my_table", Some("db1")),
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat.empty,
schema = new StructType().add("a", "int").add("b", "string"),
provider = Some(defaultProvider)
)

catalog.createTable(table, ignoreIfExists = false)
assert(exists(db.locationUri, "my_table"))

catalog.renameTable(
TableIdentifier("my_table", Some("db1")), TableIdentifier("your_table", Some("db1")))
assert(!exists(db.locationUri, "my_table"))
assert(exists(db.locationUri, "your_table"))

catalog.externalCatalog.dropTable(
"db1", "your_table", ignoreIfNotExists = false, purge = false)
assert(!exists(db.locationUri, "your_table"))

val externalTable = CatalogTable(
identifier = TableIdentifier("external_table", Some("db1")),
tableType = CatalogTableType.EXTERNAL,
storage = CatalogStorageFormat(
Some(Utils.createTempDir().toURI),
None, None, None, false, Map.empty),
schema = new StructType().add("a", "int").add("b", "string"),
provider = Some(defaultProvider)
)
catalog.createTable(externalTable, ignoreIfExists = false)
assert(!exists(db.locationUri, "external_table"))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
client.dropTable(db, table, ignoreIfNotExists, purge)
}

override def renameTable(db: String, oldName: String, newName: String): Unit = withClient {
override def renameTable(
db: String,
oldName: String,
newName: String,
newTablePath: Option[String]): Unit = withClient {
val rawTable = getRawTable(db, oldName)

// Note that Hive serde tables don't use path option in storage properties to store the value
Expand All @@ -476,8 +480,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
val storageWithNewPath = if (rawTable.tableType == MANAGED && hasPathOption) {
// If it's a managed table with path option and we are renaming it, then the path option
// becomes inaccurate and we need to update it according to the new table name.
val newTablePath = defaultTablePath(TableIdentifier(newName, Some(db)))
updateLocationInStorageProps(rawTable, Some(newTablePath))
require(newTablePath.isDefined)
updateLocationInStorageProps(rawTable, newTablePath)
} else {
rawTable.storage
}
Expand Down