diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index b6f4898fd1574..858f29c7ee530 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -423,7 +423,13 @@ case class InsertIntoHiveTable( // Attempt to delete the staging directory and the inclusive files. If failed, the files are // expected to be dropped at the normal termination of VM since deleteOnExit is used. try { - createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) } + createdTempDir.foreach { path => + val fs = path.getFileSystem(hadoopConf) + if (fs.delete(path, true)) { + // If we successfully delete the staging directory, remove it from FileSystem's cache. + fs.cancelDeleteOnExit(path) + } + } } catch { case NonFatal(e) => logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index a949e5e829e14..45bbb0c674be3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -20,10 +20,10 @@ package org.apache.spark.sql.hive.execution import java.io.File import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} -import java.util.Locale +import java.util.{Locale, Set} import com.google.common.io.Files -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.TestUtils import org.apache.spark.sql._ @@ -2021,4 +2021,22 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { checkAnswer(table.filter($"p" === "p1\" and q=\"q1").select($"a"), Row(4)) } } + + test("SPARK-21721: Clear FileSystem deleterOnExit cache if path is successfully removed") { + withTable("test21721") { + val deleteOnExitField = classOf[FileSystem].getDeclaredField("deleteOnExit") + deleteOnExitField.setAccessible(true) + + val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration) + val setOfPath = deleteOnExitField.get(fs).asInstanceOf[Set[Path]] + + val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF() + sql("CREATE TABLE test21721 (key INT, value STRING)") + val pathSizeToDeleteOnExit = setOfPath.size() + + (0 to 10).foreach(_ => testData.write.mode(SaveMode.Append).insertInto("test1")) + + assert(setOfPath.size() == pathSizeToDeleteOnExit) + } + } }