Skip to content

Commit 56248ae

Browse files
committed
backport SPARK-18703
1 parent 6fc7b2d commit 56248ae

File tree

2 files changed

+38
-2
lines changed

2 files changed

+38
-2
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import java.net.URI
2222
import java.text.SimpleDateFormat
2323
import java.util.{Date, Locale, Random}
2424

25+
import scala.util.control.NonFatal
26+
2527
import org.apache.hadoop.fs.{FileSystem, Path}
2628
import org.apache.hadoop.hive.common.FileUtils
2729
import org.apache.hadoop.hive.ql.exec.TaskRunner
@@ -84,6 +86,7 @@ case class InsertIntoHiveTable(
8486
def output: Seq[Attribute] = Seq.empty
8587

8688
val hadoopConf = sessionState.newHadoopConf()
89+
var createdTempDir: Option[Path] = None
8790
val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
8891
val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
8992

@@ -111,12 +114,12 @@ case class InsertIntoHiveTable(
111114
if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
112115
throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
113116
}
117+
createdTempDir = Some(dir)
114118
fs.deleteOnExit(dir)
115119
} catch {
116120
case e: IOException =>
117121
throw new RuntimeException(
118122
"Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
119-
120123
}
121124
return dir
122125
}
@@ -163,11 +166,11 @@ case class InsertIntoHiveTable(
163166
if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
164167
throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
165168
}
169+
createdTempDir = Some(dirPath)
166170
fs.deleteOnExit(dirPath)
167171
} catch {
168172
case e: IOException =>
169173
throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
170-
171174
}
172175
dirPath
173176
}
@@ -376,6 +379,15 @@ case class InsertIntoHiveTable(
376379
holdDDLTime)
377380
}
378381

382+
// Best-effort cleanup of the staging directory (and everything under it) that was created
// for this insert. The directory was also registered with deleteOnExit when it was created,
// so a failure here only postpones its removal until normal termination of the VM.
try {
  createdTempDir.foreach { path =>
    val fs = path.getFileSystem(hadoopConf)
    if (fs.delete(path, true)) {
      // The directory is already gone, so drop its deleteOnExit registration; otherwise the
      // FileSystem keeps remembering one stale path per insert for the life of the session.
      fs.cancelDeleteOnExit(path)
    }
  }
} catch {
  case NonFatal(e) =>
    // Report the directory that actually failed to delete. `stagingDir` only holds the
    // configured "hive.exec.stagingdir" name, not the path created for this query.
    val failedDir = createdTempDir.map(_.toString).getOrElse(stagingDir)
    logWarning(s"Unable to delete staging directory: $failedDir.\n" + e)
}
390+
379391
// Invalidate the cache.
380392
sqlContext.sharedState.cacheManager.invalidateCache(table)
381393
sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,30 @@ class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSinglet
544544
}
545545
}
546546

547+
// Verifies that repeated INSERT OVERWRITE statements leave only the final data file (and its
// checksum) under the table location, i.e. no staging directories or files are left behind.
test(s"$version: Delete the temporary staging directory and files after each insert") {
  withTempDir { tmpDir =>
    withTable("tab") {
      spark.sql(
        s"""
           |CREATE TABLE tab(c1 string)
           |location '${tmpDir.toURI.toString}'
         """.stripMargin)

      // Executed only for the side effect of writing; each insert creates a staging directory.
      (1 to 3).foreach { i =>
        spark.sql(s"INSERT OVERWRITE TABLE tab SELECT '$i'")
      }

      // Recursively collects the names of every entry (files AND directories) below `path`,
      // so a leftover staging directory shows up in the result as well.
      def listFiles(path: File): List[String] = {
        // File.listFiles() returns null when `path` is not a directory or cannot be read;
        // treat that as "no entries" so the assertion below fails cleanly instead of an NPE.
        val entries = Option(path.listFiles()).getOrElse(Array.empty[File])
        val folders = entries.filter(_.isDirectory).toList
        val entryNames = entries.map(_.getName).toList
        folders.flatMap(listFiles) ++: entryNames
      }

      val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil
      assert(listFiles(tmpDir).sorted == expectedFiles)
    }
  }
}
570+
547571
// TODO: add more tests.
548572
}
549573
}

0 commit comments

Comments
 (0)