From 4850e695fbc31cc1b70c4f05427e500468703b4b Mon Sep 17 00:00:00 2001 From: hujiahua Date: Thu, 13 Jan 2022 16:51:49 +0800 Subject: [PATCH 1/2] [SPARK-37894][SQL] Add trash feature to FileCommitProtocol.deleteWithJob --- .../org/apache/spark/internal/io/FileCommitProtocol.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala index 5cd7397ea358f..0ecb1f04417d4 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala @@ -178,7 +178,12 @@ abstract class FileCommitProtocol extends Logging { * implementation deletes the file immediately. */ def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean): Boolean = { - fs.delete(path, recursive) + if (fs.getConf.getInt("fs.trash.interval", 0) > 0 && + Trash.moveToAppropriateTrash(fs, path, fs.getConf)) { + true + } else { + fs.delete(path, recursive); + } } /** From 154b09c91194caf68d4f2b38de06c7066d49fdcf Mon Sep 17 00:00:00 2001 From: hujiahua Date: Sat, 19 Feb 2022 13:50:28 +0800 Subject: [PATCH 2/2] add a comment --- .../org/apache/spark/internal/io/FileCommitProtocol.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala index 0ecb1f04417d4..0dab7c3876ebb 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala @@ -175,7 +175,11 @@ abstract class FileCommitProtocol extends Logging { /** * Specifies that a file should be deleted with the commit of this job. The default - * implementation deletes the file immediately. 
+ * implementation deletes the file immediately or moves the file to trash based on whether + * the trash feature is enabled. + * + * See https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/core-default.xml + * for the relevant trash configuration (`fs.trash.interval`). */ def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean): Boolean = { if (fs.getConf.getInt("fs.trash.interval", 0) > 0 &&