Commit b67b35f

[SPARK-19048][SQL] Delete Partition Location when Dropping Managed Partitioned Tables in InMemoryCatalog
### What changes were proposed in this pull request?

The data of a managed table should be deleted after the table is dropped. However, if a partition location is not under the location of the partitioned table, the partition data is not deleted as expected, because users can specify an arbitrary location when adding a partition. This PR deletes each partition's location when dropping managed partitioned tables stored in `InMemoryCatalog`.

### How was this patch tested?

Added test cases for both HiveExternalCatalog and InMemoryCatalog.

Author: gatorsmile <[email protected]>

Closes #16448 from gatorsmile/unsetSerdeProp.
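For concreteness, here is a minimal sketch of the scenario being fixed, condensed from the new `HiveDDLSuite` test below; the table name and partition path are hypothetical:

```scala
// A managed partitioned table whose partition data lives OUTSIDE the
// table's own directory (the partition LOCATION is user-specified).
sql("CREATE TABLE tab (key INT, value STRING) PARTITIONED BY (ds STRING)")
sql("ALTER TABLE tab ADD PARTITION (ds='2008-04-08') LOCATION '/tmp/custom_part'")
sql("INSERT OVERWRITE TABLE tab PARTITION (ds='2008-04-08') SELECT 1, 'a'")
sql("DROP TABLE tab")
// Expected: /tmp/custom_part is deleted along with the table.
// Before this patch, InMemoryCatalog only removed the table's own directory.
```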
1 parent 89bf370 commit b67b35f

File tree: 4 files changed, 113 additions and 9 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala

Lines changed: 13 additions & 0 deletions

```diff
@@ -229,9 +229,22 @@ class InMemoryCatalog(
     if (tableExists(db, table)) {
       val tableMeta = getTable(db, table)
       if (tableMeta.tableType == CatalogTableType.MANAGED) {
+        // Delete the data/directory for each partition
+        val locationAllParts = catalog(db).tables(table).partitions.values.toSeq.map(_.location)
+        locationAllParts.foreach { loc =>
+          val partitionPath = new Path(loc)
+          try {
+            val fs = partitionPath.getFileSystem(hadoopConfig)
+            fs.delete(partitionPath, true)
+          } catch {
+            case e: IOException =>
+              throw new SparkException(s"Unable to delete partition path $partitionPath", e)
+          }
+        }
         assert(tableMeta.storage.locationUri.isDefined,
           "Managed table should always have table location, as we will assign a default location " +
             "to it if it doesn't have one.")
+        // Delete the data/directory of the table
         val dir = new Path(tableMeta.location)
         try {
           val fs = dir.getFileSystem(hadoopConfig)
```
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala

Lines changed: 44 additions & 4 deletions

```diff
@@ -324,7 +324,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val table = CatalogTable(
       identifier = TableIdentifier("tbl", Some("db1")),
       tableType = CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+      storage = CatalogStorageFormat.empty,
       schema = new StructType()
         .add("col1", "int")
         .add("col2", "string")
@@ -346,6 +346,46 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     assert(new Path(partitionLocation) == defaultPartitionLocation)
   }

+  test("create/drop partitions in managed tables with location") {
+    val catalog = newBasicCatalog()
+    val table = CatalogTable(
+      identifier = TableIdentifier("tbl", Some("db1")),
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat.empty,
+      schema = new StructType()
+        .add("col1", "int")
+        .add("col2", "string")
+        .add("partCol1", "int")
+        .add("partCol2", "string"),
+      provider = Some("hive"),
+      partitionColumnNames = Seq("partCol1", "partCol2"))
+    catalog.createTable(table, ignoreIfExists = false)
+
+    val newLocationPart1 = newUriForDatabase()
+    val newLocationPart2 = newUriForDatabase()
+
+    val partition1 =
+      CatalogTablePartition(Map("partCol1" -> "1", "partCol2" -> "2"),
+        storageFormat.copy(locationUri = Some(newLocationPart1)))
+    val partition2 =
+      CatalogTablePartition(Map("partCol1" -> "3", "partCol2" -> "4"),
+        storageFormat.copy(locationUri = Some(newLocationPart2)))
+    catalog.createPartitions("db1", "tbl", Seq(partition1), ignoreIfExists = false)
+    catalog.createPartitions("db1", "tbl", Seq(partition2), ignoreIfExists = false)
+
+    assert(exists(newLocationPart1))
+    assert(exists(newLocationPart2))
+
+    // the corresponding directory is dropped.
+    catalog.dropPartitions("db1", "tbl", Seq(partition1.spec),
+      ignoreIfNotExists = false, purge = false, retainData = false)
+    assert(!exists(newLocationPart1))
+
+    // all the remaining directories are dropped.
+    catalog.dropTable("db1", "tbl", ignoreIfNotExists = false, purge = false)
+    assert(!exists(newLocationPart2))
+  }
+
   test("list partition names") {
     val catalog = newBasicCatalog()
     val newPart = CatalogTablePartition(Map("a" -> "1", "b" -> "%="), storageFormat)
@@ -459,7 +499,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val table = CatalogTable(
       identifier = TableIdentifier("tbl", Some("db1")),
       tableType = CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+      storage = CatalogStorageFormat.empty,
       schema = new StructType()
         .add("col1", "int")
         .add("col2", "string")
@@ -684,7 +724,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val table = CatalogTable(
       identifier = TableIdentifier("my_table", Some("db1")),
       tableType = CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+      storage = CatalogStorageFormat.empty,
       schema = new StructType().add("a", "int").add("b", "string"),
       provider = Some("hive")
     )
@@ -717,7 +757,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val table = CatalogTable(
       identifier = TableIdentifier("tbl", Some("db1")),
       tableType = CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+      storage = CatalogStorageFormat.empty,
       schema = new StructType()
         .add("col1", "int")
         .add("col2", "string")
```

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 2 additions & 3 deletions

```diff
@@ -400,13 +400,12 @@ case class AlterTableSerDePropertiesCommand(
 /**
  * Add Partition in ALTER TABLE: add the table partitions.
  *
- * 'partitionSpecsAndLocs': the syntax of ALTER VIEW is identical to ALTER TABLE,
- * EXCEPT that it is ILLEGAL to specify a LOCATION clause.
  * An error message will be issued if the partition exists, unless 'ifNotExists' is true.
  *
  * The syntax of this command is:
  * {{{
- *   ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec [LOCATION 'loc1']
+ *   ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec1 [LOCATION 'loc1']
+ *                                         PARTITION spec2 [LOCATION 'loc2']
  * }}}
  */
 case class AlterTableAddPartitionCommand(
```
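The corrected doc comment reflects that one ALTER TABLE ... ADD statement may carry several PARTITION clauses, each with its own optional LOCATION, as the updated HiveDDLSuite tests below exercise. A usage sketch with a hypothetical table name:

```scala
// Hypothetical table `logs`; a single statement adds two partitions at custom paths.
sql("""
  |ALTER TABLE logs ADD IF NOT EXISTS
  |PARTITION (ds='2008-04-08', hr=11) LOCATION '/data/logs/p1'
  |PARTITION (ds='2008-04-08', hr=12) LOCATION '/data/logs/p2'
  """.stripMargin)
```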

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

Lines changed: 54 additions & 2 deletions

```diff
@@ -199,6 +199,52 @@ class HiveDDLSuite
     assert(e.message == "Found duplicate column(s) in table definition of `tbl`: a")
   }

+  test("add/drop partition with location - managed table") {
+    val tab = "tab_with_partitions"
+    withTempDir { tmpDir =>
+      val basePath = new File(tmpDir.getCanonicalPath)
+      val part1Path = new File(basePath + "/part1")
+      val part2Path = new File(basePath + "/part2")
+      val dirSet = part1Path :: part2Path :: Nil
+
+      // Before data insertion, all the directories are empty
+      assert(dirSet.forall(dir => dir.listFiles == null || dir.listFiles.isEmpty))
+
+      withTable(tab) {
+        sql(
+          s"""
+             |CREATE TABLE $tab (key INT, value STRING)
+             |PARTITIONED BY (ds STRING, hr STRING)
+           """.stripMargin)
+        sql(
+          s"""
+             |ALTER TABLE $tab ADD
+             |PARTITION (ds='2008-04-08', hr=11) LOCATION '$part1Path'
+             |PARTITION (ds='2008-04-08', hr=12) LOCATION '$part2Path'
+           """.stripMargin)
+        assert(dirSet.forall(dir => dir.listFiles == null || dir.listFiles.isEmpty))
+
+        sql(s"INSERT OVERWRITE TABLE $tab partition (ds='2008-04-08', hr=11) SELECT 1, 'a'")
+        sql(s"INSERT OVERWRITE TABLE $tab partition (ds='2008-04-08', hr=12) SELECT 2, 'b'")
+        // add partition will not delete the data
+        assert(dirSet.forall(dir => dir.listFiles.nonEmpty))
+        checkAnswer(
+          spark.table(tab),
+          Row(1, "a", "2008-04-08", "11") :: Row(2, "b", "2008-04-08", "12") :: Nil
+        )
+
+        sql(s"ALTER TABLE $tab DROP PARTITION (ds='2008-04-08', hr=11)")
+        // drop partition will delete the data
+        assert(part1Path.listFiles == null || part1Path.listFiles.isEmpty)
+        assert(part2Path.listFiles.nonEmpty)
+
+        sql(s"DROP TABLE $tab")
+        // drop table will delete the data of the managed table
+        assert(dirSet.forall(dir => dir.listFiles == null || dir.listFiles.isEmpty))
+      }
+    }
+  }
+
   test("add/drop partitions - external table") {
     val catalog = spark.sessionState.catalog
     withTempDir { tmpDir =>
@@ -257,9 +303,15 @@
       // drop partition will not delete the data of external table
       assert(dirSet.forall(dir => dir.listFiles.nonEmpty))

-      sql(s"ALTER TABLE $externalTab ADD PARTITION (ds='2008-04-08', hr='12')")
+      sql(
+        s"""
+           |ALTER TABLE $externalTab ADD PARTITION (ds='2008-04-08', hr='12')
+           |PARTITION (ds='2008-04-08', hr=11)
+          """.stripMargin)
       assert(catalog.listPartitions(TableIdentifier(externalTab)).map(_.spec).toSet ==
-        Set(Map("ds" -> "2008-04-08", "hr" -> "12"), Map("ds" -> "2008-04-09", "hr" -> "11")))
+        Set(Map("ds" -> "2008-04-08", "hr" -> "11"),
+          Map("ds" -> "2008-04-08", "hr" -> "12"),
+          Map("ds" -> "2008-04-09", "hr" -> "11")))
       // add partition will not delete the data
       assert(dirSet.forall(dir => dir.listFiles.nonEmpty))
```