Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,22 @@ class InMemoryCatalog(
if (tableExists(db, table)) {
val tableMeta = getTable(db, table)
if (tableMeta.tableType == CatalogTableType.MANAGED) {
// Delete the data/directory for each partition
val locationAllParts = catalog(db).tables(table).partitions.values.toSeq.map(_.location)
locationAllParts.foreach { loc =>
val partitionPath = new Path(loc)
try {
val fs = partitionPath.getFileSystem(hadoopConfig)
fs.delete(partitionPath, true)
} catch {
case e: IOException =>
throw new SparkException(s"Unable to delete partition path $partitionPath", e)
}
}
assert(tableMeta.storage.locationUri.isDefined,
"Managed table should always have table location, as we will assign a default location " +
"to it if it doesn't have one.")
// Delete the data/directory of the table
val dir = new Path(tableMeta.location)
try {
val fs = dir.getFileSystem(hadoopConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val table = CatalogTable(
identifier = TableIdentifier("tbl", Some("db1")),
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
storage = CatalogStorageFormat.empty,
schema = new StructType()
.add("col1", "int")
.add("col2", "string")
Expand All @@ -346,6 +346,46 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
assert(new Path(partitionLocation) == defaultPartitionLocation)
}

test("create/drop partitions in managed tables with location") {
val catalog = newBasicCatalog()
val table = CatalogTable(
identifier = TableIdentifier("tbl", Some("db1")),
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat.empty,
schema = new StructType()
.add("col1", "int")
.add("col2", "string")
.add("partCol1", "int")
.add("partCol2", "string"),
provider = Some("hive"),
partitionColumnNames = Seq("partCol1", "partCol2"))
catalog.createTable(table, ignoreIfExists = false)

val newLocationPart1 = newUriForDatabase()
val newLocationPart2 = newUriForDatabase()

val partition1 =
CatalogTablePartition(Map("partCol1" -> "1", "partCol2" -> "2"),
storageFormat.copy(locationUri = Some(newLocationPart1)))
val partition2 =
CatalogTablePartition(Map("partCol1" -> "3", "partCol2" -> "4"),
storageFormat.copy(locationUri = Some(newLocationPart2)))
catalog.createPartitions("db1", "tbl", Seq(partition1), ignoreIfExists = false)
catalog.createPartitions("db1", "tbl", Seq(partition2), ignoreIfExists = false)

assert(exists(newLocationPart1))
assert(exists(newLocationPart2))

// the corresponding directory is dropped.
catalog.dropPartitions("db1", "tbl", Seq(partition1.spec),
ignoreIfNotExists = false, purge = false, retainData = false)
assert(!exists(newLocationPart1))

// all the remaining directories are dropped.
catalog.dropTable("db1", "tbl", ignoreIfNotExists = false, purge = false)
assert(!exists(newLocationPart2))
}

test("list partition names") {
val catalog = newBasicCatalog()
val newPart = CatalogTablePartition(Map("a" -> "1", "b" -> "%="), storageFormat)
Expand Down Expand Up @@ -459,7 +499,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val table = CatalogTable(
identifier = TableIdentifier("tbl", Some("db1")),
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
storage = CatalogStorageFormat.empty,
schema = new StructType()
.add("col1", "int")
.add("col2", "string")
Expand Down Expand Up @@ -684,7 +724,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val table = CatalogTable(
identifier = TableIdentifier("my_table", Some("db1")),
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
storage = CatalogStorageFormat.empty,
schema = new StructType().add("a", "int").add("b", "string"),
provider = Some("hive")
)
Expand Down Expand Up @@ -717,7 +757,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val table = CatalogTable(
identifier = TableIdentifier("tbl", Some("db1")),
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
storage = CatalogStorageFormat.empty,
schema = new StructType()
.add("col1", "int")
.add("col2", "string")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,13 +400,12 @@ case class AlterTableSerDePropertiesCommand(
/**
* Add Partition in ALTER TABLE: add the table partitions.
*
* 'partitionSpecsAndLocs': the syntax of ALTER VIEW is identical to ALTER TABLE,
* EXCEPT that it is ILLEGAL to specify a LOCATION clause.
* An error message will be issued if the partition exists, unless 'ifNotExists' is true.
*
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not support ALTER VIEW ADD PARTITION

* The syntax of this command is:
* {{{
* ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec [LOCATION 'loc1']
* ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec1 [LOCATION 'loc1']
* PARTITION spec2 [LOCATION 'loc2']
* }}}
*/
case class AlterTableAddPartitionCommand(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,52 @@ class HiveDDLSuite
assert(e.message == "Found duplicate column(s) in table definition of `tbl`: a")
}

test("add/drop partition with location - managed table") {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to improve the test coverage of Hive External Catalog.

val tab = "tab_with_partitions"
withTempDir { tmpDir =>
val basePath = new File(tmpDir.getCanonicalPath)
val part1Path = new File(basePath + "/part1")
val part2Path = new File(basePath + "/part2")
val dirSet = part1Path :: part2Path :: Nil

// Before data insertion, all the directory are empty
assert(dirSet.forall(dir => dir.listFiles == null || dir.listFiles.isEmpty))

withTable(tab) {
sql(
s"""
|CREATE TABLE $tab (key INT, value STRING)
|PARTITIONED BY (ds STRING, hr STRING)
""".stripMargin)
sql(
s"""
|ALTER TABLE $tab ADD
|PARTITION (ds='2008-04-08', hr=11) LOCATION '$part1Path'
|PARTITION (ds='2008-04-08', hr=12) LOCATION '$part2Path'
""".stripMargin)
assert(dirSet.forall(dir => dir.listFiles == null || dir.listFiles.isEmpty))

sql(s"INSERT OVERWRITE TABLE $tab partition (ds='2008-04-08', hr=11) SELECT 1, 'a'")
sql(s"INSERT OVERWRITE TABLE $tab partition (ds='2008-04-08', hr=12) SELECT 2, 'b'")
// add partition will not delete the data
assert(dirSet.forall(dir => dir.listFiles.nonEmpty))
checkAnswer(
spark.table(tab),
Row(1, "a", "2008-04-08", "11") :: Row(2, "b", "2008-04-08", "12") :: Nil
)

sql(s"ALTER TABLE $tab DROP PARTITION (ds='2008-04-08', hr=11)")
// drop partition will delete the data
assert(part1Path.listFiles == null || part1Path.listFiles.isEmpty)
assert(part2Path.listFiles.nonEmpty)

sql(s"DROP TABLE $tab")
// drop table will delete the data of the managed table
assert(dirSet.forall(dir => dir.listFiles == null || dir.listFiles.isEmpty))
}
}
}

test("add/drop partitions - external table") {
val catalog = spark.sessionState.catalog
withTempDir { tmpDir =>
Expand Down Expand Up @@ -257,9 +303,15 @@ class HiveDDLSuite
// drop partition will not delete the data of external table
assert(dirSet.forall(dir => dir.listFiles.nonEmpty))

sql(s"ALTER TABLE $externalTab ADD PARTITION (ds='2008-04-08', hr='12')")
sql(
s"""
|ALTER TABLE $externalTab ADD PARTITION (ds='2008-04-08', hr='12')
|PARTITION (ds='2008-04-08', hr=11)
""".stripMargin)
assert(catalog.listPartitions(TableIdentifier(externalTab)).map(_.spec).toSet ==
Set(Map("ds" -> "2008-04-08", "hr" -> "12"), Map("ds" -> "2008-04-09", "hr" -> "11")))
Set(Map("ds" -> "2008-04-08", "hr" -> "11"),
Map("ds" -> "2008-04-08", "hr" -> "12"),
Map("ds" -> "2008-04-09", "hr" -> "11")))
// add partition will not delete the data
assert(dirSet.forall(dir => dir.listFiles.nonEmpty))

Expand Down