Skip to content

Commit a4806b3

Browse files
committed
Backport SPARK-21721 to branch 2.1.
1 parent 9b749b6 commit a4806b3

File tree

2 files changed

+27
-2
lines changed

2 files changed

+27
-2
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,13 @@ case class InsertIntoHiveTable(
382382
// Attempt to delete the staging directory and the inclusive files. If failed, the files are
383383
// expected to be dropped at the normal termination of VM since deleteOnExit is used.
384384
try {
385-
createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) }
385+
createdTempDir.foreach { path =>
386+
val fs = path.getFileSystem(hadoopConf)
387+
if (fs.delete(path, true)) {
388+
// If we successfully delete the staging directory, remove it from FileSystem's cache.
389+
fs.cancelDeleteOnExit(path)
390+
}
391+
}
386392
} catch {
387393
case NonFatal(e) =>
388394
logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,13 @@ package org.apache.spark.sql.hive.execution
2020
import java.io.{File, PrintWriter}
2121
import java.nio.charset.StandardCharsets
2222
import java.sql.{Date, Timestamp}
23+
import java.util.Set
2324

2425
import scala.sys.process.{Process, ProcessLogger}
2526
import scala.util.Try
2627

2728
import com.google.common.io.Files
28-
import org.apache.hadoop.fs.Path
29+
import org.apache.hadoop.fs.{FileSystem, Path}
2930

3031
import org.apache.spark.sql._
3132
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -2031,4 +2032,22 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
20312032
checkAnswer(table.filter($"p" === "p1\" and q=\"q1").select($"a"), Row(4))
20322033
}
20332034
}
2035+
2036+
test("SPARK-21721: Clear FileSystem deleterOnExit cache if path is successfully removed") {
2037+
withTable("test21721") {
2038+
val deleteOnExitField = classOf[FileSystem].getDeclaredField("deleteOnExit")
2039+
deleteOnExitField.setAccessible(true)
2040+
2041+
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
2042+
val setOfPath = deleteOnExitField.get(fs).asInstanceOf[Set[Path]]
2043+
2044+
val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF()
2045+
sql("CREATE TABLE test21721 (key INT, value STRING)")
2046+
val pathSizeToDeleteOnExit = setOfPath.size()
2047+
2048+
(0 to 10).foreach(_ => testData.write.mode(SaveMode.Append).insertInto("test1"))
2049+
2050+
assert(setOfPath.size() == pathSizeToDeleteOnExit)
2051+
}
2052+
}
20342053
}

0 commit comments

Comments
 (0)