
Commit 8b67fee

[SPARK-32480] Support insert overwrite to move data to trash
1 parent eb74d55 · commit 8b67fee

File tree

4 files changed, +40 -6 lines changed

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 15 additions & 1 deletion

@@ -50,7 +50,7 @@ import com.google.common.net.InetAddresses
 import org.apache.commons.codec.binary.Hex
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+import org.apache.hadoop.fs.{FileSystem, FileUtil, Path, Trash}
 import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -269,6 +269,20 @@ private[spark] object Utils extends Logging {
     file.setExecutable(true, true)
   }

+  def moveToTrashIfEnabled(
+      fs: FileSystem,
+      partitionPath: Path,
+      trashInterval: Int,
+      hadoopConf: Configuration): Unit = {
+    if (trashInterval < 0) {
+      fs.delete(partitionPath, true)
+    } else {
+      logDebug(s"will move data ${partitionPath.toString} to trash")
+      hadoopConf.setInt("fs.trash.interval", trashInterval)
+      Trash.moveToAppropriateTrash(fs, partitionPath, hadoopConf)
+    }
+  }
+
   /**
    * Create a directory given the abstract pathname
    * @return true, if the directory is successfully created; otherwise, return false.
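The helper mirrors Hadoop's own trash semantics: a negative interval deletes permanently, anything else moves the path into the user's .Trash, with fs.trash.interval (in minutes) controlling retention. A minimal standalone sketch of the same pattern, with an illustrative path and interval that are not part of the commit:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path, Trash}

    object TrashDemo {
      def main(args: Array[String]): Unit = {
        val hadoopConf = new Configuration()
        val fs = FileSystem.get(hadoopConf)
        val dir = new Path("/tmp/trash-demo") // illustrative path
        fs.mkdirs(dir)

        // Same decision moveToTrashIfEnabled makes: a negative interval means
        // permanent delete, otherwise move to the appropriate trash location.
        val trashInterval = 1440 // illustrative: retain for one day
        if (trashInterval < 0) {
          fs.delete(dir, true)
        } else {
          hadoopConf.setInt("fs.trash.interval", trashInterval)
          Trash.moveToAppropriateTrash(fs, dir, hadoopConf)
        }
      }
    }

One difference from the call sites below: the helper ignores the boolean returned by fs.delete, while the plain-delete branch in InsertIntoHiveTable still throws when the delete fails.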

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 0 deletions

@@ -2701,6 +2701,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)

+  val INSERT_OVERWRITE_TRASH_INTERVAL =
+    buildConf("spark.insertOverwrite.trash.interval")
+      .doc("This configuration decides whether files are moved to the trash on " +
+        "insert overwrite. If set to -1, files are deleted without moving to the trash.")
+      .intConf
+      .createWithDefault(-1)
+
   /**
    * Holds information about keys that have been deprecated.
    *
@@ -3311,6 +3318,8 @@ class SQLConf extends Serializable with Logging {
   def optimizeNullAwareAntiJoin: Boolean =
     getConf(SQLConf.OPTIMIZE_NULL_AWARE_ANTI_JOIN)

+  def insertOverwriteTrashInterval: Int = getConf(SQLConf.INSERT_OVERWRITE_TRASH_INTERVAL)
+
   /** ********************** SQLConf functionality methods ************ */

   /** Set Spark SQL configuration properties. */
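Because this is an ordinary session conf, the behavior can be toggled per session once the change is in. A hedged sketch of enabling it (the config key comes from the diff above; the value and app name are illustrative):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("insert-overwrite-trash") // illustrative
      .enableHiveSupport()
      .getOrCreate()

    // The default -1 keeps the old behavior: overwritten files are deleted
    // outright. A non-negative value moves them to trash instead, and is also
    // pushed down as Hadoop's fs.trash.interval (minutes).
    spark.conf.set("spark.insertOverwrite.trash.interval", "1440")

    // Equivalent in SQL:
    spark.sql("SET spark.insertOverwrite.trash.interval=1440")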

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala

Lines changed: 5 additions & 1 deletion

@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.hive.client.HiveClientImpl
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.util.Utils

 /**
  * Command for writing the results of `query` to file system.
@@ -108,8 +109,11 @@ case class InsertIntoHiveDirCommand(
       outputLocation = tmpPath.toString)

     if (overwrite && fs.exists(writeToPath)) {
+      val trashInterval = sparkSession.sessionState.conf.insertOverwriteTrashInterval
       fs.listStatus(writeToPath).foreach { existFile =>
-        if (Option(existFile.getPath) != createdTempDir) fs.delete(existFile.getPath, true)
+        if (Option(existFile.getPath) != createdTempDir) {
+          Utils.moveToTrashIfEnabled(fs, existFile.getPath, trashInterval, hadoopConf)
+        }
       }
     }
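With the conf enabled, an INSERT OVERWRITE DIRECTORY statement exercises this path: each pre-existing file under the target directory (other than the temp dir) now goes through Utils.moveToTrashIfEnabled instead of being deleted. An illustrative run, with a hypothetical output path and source table:

    // Hypothetical path and table; requires Hive support in the session.
    spark.sql("SET spark.insertOverwrite.trash.interval=1440")
    spark.sql(
      """INSERT OVERWRITE DIRECTORY '/tmp/report_output'
        |STORED AS PARQUET
        |SELECT * FROM events WHERE dt = '2020-07-30'
        |""".stripMargin)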

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 11 additions & 4 deletions

@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive.execution
 import java.util.Locale

 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{Path, Trash}
 import org.apache.hadoop.hive.ql.ErrorMsg
 import org.apache.hadoop.hive.ql.plan.TableDesc

@@ -309,9 +309,16 @@ case class InsertIntoHiveTable(
         partitionPath.foreach { path =>
           val fs = path.getFileSystem(hadoopConf)
           if (fs.exists(path)) {
-            if (!fs.delete(path, true)) {
-              throw new RuntimeException(
-                s"Cannot remove partition directory '$path'")
+            val trashInterval = sparkSession.sessionState.conf.insertOverwriteTrashInterval
+            if (trashInterval < 0) {
+              if (!fs.delete(path, true)) {
+                throw new RuntimeException(
+                  s"Cannot remove partition directory '$path'")
+              }
+            } else {
+              logDebug(s"will move data ${path.toString} to trash")
+              hadoopConf.setInt("fs.trash.interval", trashInterval)
+              Trash.moveToAppropriateTrash(fs, path, hadoopConf)
             }
             // Don't let Hive do overwrite operation since it is slower.
             doHiveOverwrite = false