Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,17 +92,18 @@ abstract class ExternalCatalog {

def dropTable(db: String, table: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit

def renameTable(db: String, oldName: String, newName: String): Unit

/**
* Alter a table whose database and name match the ones specified in `tableDefinition`, assuming
* the table exists. Note that, even though we can specify database in `tableDefinition`, it's
* used to identify the table, not to alter the table's database, which is not allowed.
*
* If `newNameTable` is defined and its table name is not equal to the table name of
* `tableDefinition`,it will just rename the table.
*
* Note: If the underlying implementation does not support altering a certain field,
* this becomes a no-op.
*/
def alterTable(tableDefinition: CatalogTable): Unit
def alterTable(tableDefinition: CatalogTable, newNameTable: Option[CatalogTable] = None): Unit

/**
* Alter the schema of a table identified by the provided database and table name. The new schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,40 +264,45 @@ class InMemoryCatalog(
}
}

override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
requireTableExists(db, oldName)
requireTableNotExists(db, newName)
val oldDesc = catalog(db).tables(oldName)
oldDesc.table = oldDesc.table.copy(identifier = TableIdentifier(newName, Some(db)))

if (oldDesc.table.tableType == CatalogTableType.MANAGED) {
assert(oldDesc.table.storage.locationUri.isDefined,
"Managed table should always have table location, as we will assign a default location " +
"to it if it doesn't have one.")
val oldDir = new Path(oldDesc.table.location)
val newDir = new Path(new Path(catalog(db).db.locationUri), newName)
try {
val fs = oldDir.getFileSystem(hadoopConfig)
fs.rename(oldDir, newDir)
} catch {
case e: IOException =>
throw new SparkException(s"Unable to rename table $oldName to $newName as failed " +
s"to rename its directory $oldDir", e)
}
oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri))
}

catalog(db).tables.put(newName, oldDesc)
catalog(db).tables.remove(oldName)
}

override def alterTable(tableDefinition: CatalogTable): Unit = synchronized {
override def alterTable(
tableDefinition: CatalogTable,
newNameTable: Option[CatalogTable]): Unit = synchronized {
assert(tableDefinition.identifier.database.isDefined)
val db = tableDefinition.identifier.database.get
requireTableExists(db, tableDefinition.identifier.table)
catalog(db).tables(tableDefinition.identifier.table).table = tableDefinition
}

if (newNameTable.isEmpty
|| newNameTable.get.identifier.table == tableDefinition.identifier.table) {
catalog(db).tables(tableDefinition.identifier.table).table = tableDefinition
} else {
val oldName = tableDefinition.identifier.table
val newName = newNameTable.get.identifier.table
requireTableNotExists(db, newName)
val oldDesc = catalog(db).tables(oldName)
oldDesc.table = oldDesc.table.copy(identifier = TableIdentifier(newName, Some(db)))

if (oldDesc.table.tableType == CatalogTableType.MANAGED) {
assert(oldDesc.table.storage.locationUri.isDefined,
"Managed table should always have table location, as we will assign a default location " +
"to it if it doesn't have one.")
val oldDir = new Path(oldDesc.table.location)
val newDir = new Path(newNameTable.get.location)

try {
val fs = oldDir.getFileSystem(hadoopConfig)
fs.rename(oldDir, newDir)
} catch {
case e: IOException =>
throw new SparkException(s"Unable to rename table $oldName to $newName" +
s" as failed to rename its directory $oldDir", e)
}
oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri))
}

catalog(db).tables.put(newName, oldDesc)
catalog(db).tables.remove(oldName)
}
}
override def alterTableSchema(
db: String,
table: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -577,9 +577,15 @@ class SessionCatalog(
requireDbExists(db)
if (oldName.database.isDefined || !tempTables.contains(oldTableName)) {
requireTableExists(TableIdentifier(oldTableName, Some(db)))
requireTableNotExists(TableIdentifier(newTableName, Some(db)))
val newTableIdentifier = TableIdentifier(newTableName, Some(db))
requireTableNotExists(newTableIdentifier)
validateName(newTableName)
externalCatalog.renameTable(db, oldTableName, newTableName)
val tableDefinition = externalCatalog.getTable(db, oldTableName)
val newTableDefinition = tableDefinition.copy(
identifier = newTableIdentifier,
storage =
tableDefinition.storage.copy(locationUri = Some(defaultTablePath(newTableIdentifier))))
externalCatalog.alterTable(tableDefinition, Some(newTableDefinition))
} else {
if (newName.database.isDefined) {
throw new AnalysisException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,30 +199,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = true, purge = false)
}

test("rename table") {
val catalog = newBasicCatalog()
assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
catalog.renameTable("db2", "tbl1", "tblone")
assert(catalog.listTables("db2").toSet == Set("tblone", "tbl2"))
}

test("rename table when database/table does not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.renameTable("unknown_db", "unknown_table", "unknown_table")
}
intercept[AnalysisException] {
catalog.renameTable("db2", "unknown_table", "unknown_table")
}
}

test("rename table when destination table already exists") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.renameTable("db2", "tbl1", "tbl2")
}
}

test("alter table") {
val catalog = newBasicCatalog()
val tbl1 = catalog.getTable("db2", "tbl1")
Expand Down Expand Up @@ -751,18 +727,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
assert(catalog.listFunctions("db2", "func*").toSet == Set("func1", "func2"))
}

// --------------------------------------------------------------------------
// File System operations
// --------------------------------------------------------------------------

private def exists(uri: URI, children: String*): Boolean = {
val base = new Path(uri)
val finalPath = children.foldLeft(base) {
case (parent, child) => new Path(parent, child)
}
base.getFileSystem(new Configuration()).exists(finalPath)
}

test("create/drop database should create/delete the directory") {
val catalog = newBasicCatalog()
val db = newDb("mydb")
Expand All @@ -773,40 +737,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
assert(!exists(db.locationUri))
}

test("create/drop/rename table should create/delete/rename the directory") {
val catalog = newBasicCatalog()
val db = catalog.getDatabase("db1")
val table = CatalogTable(
identifier = TableIdentifier("my_table", Some("db1")),
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat.empty,
schema = new StructType().add("a", "int").add("b", "string"),
provider = Some(defaultProvider)
)

catalog.createTable(table, ignoreIfExists = false)
assert(exists(db.locationUri, "my_table"))

catalog.renameTable("db1", "my_table", "your_table")
assert(!exists(db.locationUri, "my_table"))
assert(exists(db.locationUri, "your_table"))

catalog.dropTable("db1", "your_table", ignoreIfNotExists = false, purge = false)
assert(!exists(db.locationUri, "your_table"))

val externalTable = CatalogTable(
identifier = TableIdentifier("external_table", Some("db1")),
tableType = CatalogTableType.EXTERNAL,
storage = CatalogStorageFormat(
Some(Utils.createTempDir().toURI),
None, None, None, false, Map.empty),
schema = new StructType().add("a", "int").add("b", "string"),
provider = Some(defaultProvider)
)
catalog.createTable(externalTable, ignoreIfExists = false)
assert(!exists(db.locationUri, "external_table"))
}

test("create/drop/rename partitions should create/delete/rename the directory") {
val catalog = newBasicCatalog()
val table = CatalogTable(
Expand Down Expand Up @@ -995,4 +925,15 @@ abstract class CatalogTestUtils {
catalog.listPartitions(db, table).map(_.spec).toSet == parts.map(_.spec).toSet
}

// --------------------------------------------------------------------------
// File System operations
// --------------------------------------------------------------------------
def exists(uri: URI, children: String*): Boolean = {
val base = new Path(uri)
val finalPath = children.foldLeft(base) {
case (parent, child) => new Path(parent, child)
}
base.getFileSystem(new Configuration()).exists(finalPath)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

class InMemorySessionCatalogSuite extends SessionCatalogSuite {
protected val utils = new CatalogTestUtils {
Expand Down Expand Up @@ -399,6 +400,14 @@ abstract class SessionCatalogSuite extends PlanTest {
}
}

test("rename table when destination table already exists") {
withBasicCatalog { catalog =>
intercept[AnalysisException] {
catalog.renameTable(TableIdentifier("tbl1", Some("db2")), TableIdentifier("tbl2"))
}
}
}

test("rename temp table") {
withBasicCatalog { catalog =>
val tempTable = Range(1, 10, 2, 10)
Expand All @@ -419,6 +428,43 @@ abstract class SessionCatalogSuite extends PlanTest {
}
}


test("create/drop/rename table should create/delete/rename the directory") {
withBasicCatalog { catalog =>
val db = catalog.externalCatalog.getDatabase("db1")
val table = CatalogTable(
identifier = TableIdentifier("my_table", Some("db1")),
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat.empty,
schema = new StructType().add("a", "int").add("b", "string"),
provider = Some(defaultProvider)
)

catalog.createTable(table, ignoreIfExists = false)
assert(exists(db.locationUri, "my_table"))

catalog.renameTable(
TableIdentifier("my_table", Some("db1")), TableIdentifier("your_table", Some("db1")))
assert(!exists(db.locationUri, "my_table"))
assert(exists(db.locationUri, "your_table"))

catalog.externalCatalog.dropTable("db1", "your_table", ignoreIfNotExists = false, purge = false)
assert(!exists(db.locationUri, "your_table"))

val externalTable = CatalogTable(
identifier = TableIdentifier("external_table", Some("db1")),
tableType = CatalogTableType.EXTERNAL,
storage = CatalogStorageFormat(
Some(Utils.createTempDir().toURI),
None, None, None, false, Map.empty),
schema = new StructType().add("a", "int").add("b", "string"),
provider = Some(defaultProvider)
)
catalog.createTable(externalTable, ignoreIfExists = false)
assert(!exists(db.locationUri, "external_table"))
}
}

test("alter table") {
withBasicCatalog { catalog =>
val tbl1 = catalog.externalCatalog.getTable("db2", "tbl1")
Expand Down
Loading