
Commit 2482cdc

backport SPARK-18703

1 parent 027b265 commit 2482cdc

2 files changed: +37 -2 lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 13 additions & 2 deletions
@@ -24,6 +24,7 @@ import java.util
 import java.util.{Date, Random}
 
 import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.FileUtils
@@ -55,6 +56,7 @@ case class InsertIntoHiveTable(
   def output: Seq[Attribute] = Seq.empty
 
   val hadoopConf = sessionState.newHadoopConf()
+  var createdTempDir: Option[Path] = None
   val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
   val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
 
@@ -83,13 +85,13 @@ case class InsertIntoHiveTable(
       if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
         throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
       }
+      createdTempDir = Some(dir)
       fs.deleteOnExit(dir)
     }
     catch {
       case e: IOException =>
         throw new RuntimeException(
           "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
-
     }
     return dir
   }
@@ -136,11 +138,11 @@ case class InsertIntoHiveTable(
       if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
        throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
      }
+      createdTempDir = Some(dirPath)
      fs.deleteOnExit(dirPath)
    } catch {
      case e: IOException =>
        throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
-
    }
    dirPath
  }
@@ -341,6 +343,15 @@ case class InsertIntoHiveTable(
         holdDDLTime)
     }
 
+    // Attempt to delete the staging directory and the inclusive files. If failed, the files are
+    // expected to be dropped at the normal termination of VM since deleteOnExit is used.
+    try {
+      createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) }
+    } catch {
+      case NonFatal(e) =>
+        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
+    }
+
     // Invalidate the cache.
     sqlContext.sharedState.cacheManager.invalidateCache(table)
     sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala

Lines changed: 24 additions & 0 deletions
@@ -545,6 +545,30 @@ class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSinglet
       }
     }
 
+    test(s"$version: Delete the temporary staging directory and files after each insert") {
+      withTempDir { tmpDir =>
+        withTable("tab") {
+          spark.sql(
+            s"""
+               |CREATE TABLE tab(c1 string)
+               |location '${tmpDir.toURI.toString}'
+             """.stripMargin)
+
+          (1 to 3).map { i =>
+            spark.sql(s"INSERT OVERWRITE TABLE tab SELECT '$i'")
+          }
+          def listFiles(path: File): List[String] = {
+            val dir = path.listFiles()
+            val folders = dir.filter(_.isDirectory).toList
+            val filePaths = dir.map(_.getName).toList
+            folders.flatMap(listFiles) ++: filePaths
+          }
+          val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil
+          assert(listFiles(tmpDir).sorted == expectedFiles)
+        }
+      }
+    }
+
     // TODO: add more tests.
   }
 }
