2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -270,7 +270,7 @@ private[spark] object Utils extends Logging {
}

/**
* Move data to trash if 'spark.sql.truncate.trash.enabled' is true, else
* Move data to trash if 'spark.sql.trash.enabled' is true, else
* delete the data permanently. If move data to trash failed fallback to hard deletion.
*/
def moveToTrashOrDelete(
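The helper's body is cut off in this hunk. Below is a minimal sketch of what it plausibly does, inferred from the doc comment above and from the fallback logic added to InsertIntoHiveTable further down; the parameter names and the Boolean return type are assumptions, not copied from the real Utils.scala.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, Trash}

object TrashUtilSketch {
  // Sketch only: move the path to the Hadoop trash when enabled; otherwise,
  // or when the move fails, fall back to a permanent recursive delete.
  def moveToTrashOrDelete(
      fs: FileSystem,
      path: Path,
      isTrashEnabled: Boolean,
      hadoopConf: Configuration): Boolean = {
    if (isTrashEnabled && Trash.moveToAppropriateTrash(fs, path, hadoopConf)) {
      true
    } else {
      // Either trash is disabled or the move failed: delete permanently.
      fs.delete(path, true)
    }
  }
}
```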
@@ -2732,14 +2732,14 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val TRUNCATE_TRASH_ENABLED =
buildConf("spark.sql.truncate.trash.enabled")
.doc("This configuration decides when truncating table, whether data files will be moved " +
"to trash directory or deleted permanently. The trash retention time is controlled by " +
"'fs.trash.interval', and in default, the server side configuration value takes " +
"precedence over the client-side one. Note that if 'fs.trash.interval' is non-positive, " +
"this will be a no-op and log a warning message. If the data fails to be moved to " +
"trash, Spark will turn to delete it permanently.")
val TRASH_ENABLED =
buildConf("spark.sql.trash.enabled")
.doc("This configuration decides when truncating table and insert overwrite, whether data " +
"files will be moved to trash directory or deleted permanently. The trash retention " +
"time is controlled by 'fs.trash.interval', and in default, the server side " +
"configuration value takes precedence over the client-side one. Note that if " +
"'fs.trash.interval' is non-positive, this will be a no-op and log a warning message. " +
"If the data fails to be moved to trash, Spark will turn to delete it permanently.")
.version("3.1.0")
.booleanConf
.createWithDefault(false)
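For context, a hedged sketch of how the renamed flag would be enabled at runtime, assuming a SparkSession named `spark`; the table name and interval are made up, and per the doc above a server-side 'fs.trash.interval' takes precedence over the client-side value set here.

```scala
// Turn on the trash behaviour for TRUNCATE TABLE and INSERT OVERWRITE.
spark.conf.set("spark.sql.trash.enabled", "true")
// Retention window in minutes; a non-positive value makes the move a no-op.
spark.sparkContext.hadoopConfiguration.setInt("fs.trash.interval", 1440)
// Old files now land in the FileSystem trash instead of being deleted outright.
spark.sql("TRUNCATE TABLE tab1")
```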
@@ -3362,7 +3362,7 @@ class SQLConf extends Serializable with Logging {

def legacyPathOptionBehavior: Boolean = getConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR)

def truncateTrashEnabled: Boolean = getConf(SQLConf.TRUNCATE_TRASH_ENABLED)
def trashEnabled: Boolean = getConf(SQLConf.TRASH_ENABLED)

/** ********************** SQLConf functionality methods ************ */

@@ -490,7 +490,7 @@ case class TruncateTableCommand(
}
val hadoopConf = spark.sessionState.newHadoopConf()
val ignorePermissionAcl = SQLConf.get.truncateTableIgnorePermissionAcl
val isTrashEnabled = SQLConf.get.truncateTrashEnabled
val isTrashEnabled = SQLConf.get.trashEnabled
locations.foreach { location =>
if (location.isDefined) {
val path = new Path(location.get)
@@ -3105,7 +3105,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
test("SPARK-32481 Move data to trash on truncate table if enabled") {
val trashIntervalKey = "fs.trash.interval"
withTable("tab1") {
withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "true") {
withSQLConf(SQLConf.TRASH_ENABLED.key -> "true") {
sql("CREATE TABLE tab1 (col INT) USING parquet")
sql("INSERT INTO tab1 SELECT 1")
// scalastyle:off hadoopconfiguration
@@ -3134,7 +3134,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
test("SPARK-32481 delete data permanently on truncate table if trash interval is non-positive") {
val trashIntervalKey = "fs.trash.interval"
withTable("tab1") {
withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "true") {
withSQLConf(SQLConf.TRASH_ENABLED.key -> "true") {
sql("CREATE TABLE tab1 (col INT) USING parquet")
sql("INSERT INTO tab1 SELECT 1")
// scalastyle:off hadoopconfiguration
@@ -3161,7 +3161,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {

test("SPARK-32481 Do not move data to trash on truncate table if disabled") {
withTable("tab1") {
withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "false") {
withSQLConf(SQLConf.TRASH_ENABLED.key -> "false") {
sql("CREATE TABLE tab1 (col INT) USING parquet")
sql("INSERT INTO tab1 SELECT 1")
val hadoopConf = spark.sessionState.newHadoopConf()
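The tests above only exercise TRUNCATE TABLE. Below is a hypothetical companion test (not part of this diff) sketching how the insert-overwrite path could be covered in the same style; it would have to live in a Hive-backed suite since InsertIntoHiveTable is the command being changed, and the test name, table layout, and whether this exact code path is hit (which, as a review comment further down notes, depends on the Hive version and overwrite mode) are all illustrative assumptions.

```scala
test("Move data to trash on insert overwrite if enabled") {
  withTable("tab1") {
    withSQLConf(SQLConf.TRASH_ENABLED.key -> "true") {
      sql("CREATE TABLE tab1 (col INT) PARTITIONED BY (part INT) STORED AS parquet")
      sql("INSERT INTO tab1 PARTITION (part = 1) SELECT 1")
      // scalastyle:off hadoopconfiguration
      val hadoopConf = spark.sparkContext.hadoopConfiguration
      // scalastyle:on hadoopconfiguration
      hadoopConf.set("fs.trash.interval", "5")
      // Overwriting the partition should move the previous files into the
      // user's .Trash checkpoint instead of hard-deleting them.
      sql("INSERT OVERWRITE TABLE tab1 PARTITION (part = 1) SELECT 2")
    }
  }
}
```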
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.util.Utils

/**
* Command for writing the results of `query` to file system.
@@ -108,8 +109,11 @@ case class InsertIntoHiveDirCommand(
outputLocation = tmpPath.toString)

if (overwrite && fs.exists(writeToPath)) {
val isTrashEnabled = sparkSession.sessionState.conf.trashEnabled
fs.listStatus(writeToPath).foreach { existFile =>
if (Option(existFile.getPath) != createdTempDir) fs.delete(existFile.getPath, true)
if (Option(existFile.getPath) != createdTempDir) {
Utils.moveToTrashOrDelete(fs, existFile.getPath, isTrashEnabled, hadoopConf)
}

Contributor:
I am doing this too. This PR (such as the INSERT OVERWRITE DIRECTORY part) can protect our users when they write to a wrong directory path that already holds data (such as a DB's path, which has happened before).
Moving the data to trash makes recovery of production data faster in the event of such a disaster.
FYI @maropu

}
}

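To make the reviewer's scenario concrete, a hedged sketch (assuming a Hive-enabled SparkSession named `spark`) of the kind of statement that goes through InsertIntoHiveDirCommand; the directory path is invented and stands in for a location that already, perhaps mistakenly, holds data.

```scala
spark.conf.set("spark.sql.trash.enabled", "true")
// Hive-style INSERT OVERWRITE DIRECTORY against a path that already has files.
spark.sql(
  """
    |INSERT OVERWRITE DIRECTORY '/warehouse/reports.db'
    |STORED AS parquet
    |SELECT 1 AS col
  """.stripMargin)
// With the flag on, the pre-existing files under the target are moved to the
// FileSystem trash and can be restored before fs.trash.interval expires.
```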
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive.execution
import java.util.Locale

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{Path, Trash}
import org.apache.hadoop.hive.ql.ErrorMsg
import org.apache.hadoop.hive.ql.plan.TableDesc

@@ -309,9 +309,23 @@ case class InsertIntoHiveTable(
partitionPath.foreach { path =>
val fs = path.getFileSystem(hadoopConf)
if (fs.exists(path)) {
if (!fs.delete(path, true)) {
throw new RuntimeException(
s"Cannot remove partition directory '$path'")
val isTrashEnabled = sparkSession.sessionState.conf.trashEnabled
if (!isTrashEnabled) {
if (!fs.delete(path, true)) {
throw new RuntimeException(
s"Cannot remove partition directory '$path'")
}
} else {
logDebug(s"Try to move data ${path.toString} to trash")
val isSuccess = Trash.moveToAppropriateTrash(fs, path, hadoopConf)
if (!isSuccess) {
logWarning(s"Failed to move data ${path.toString} to trash " +
"fallback to hard deletion")
if (!fs.delete(path, true)) {
throw new RuntimeException(
s"Cannot remove partition directory '$path'")
}
}

Contributor:
There are too many cases:

  1. Hive version > 2.0 & Hive version < 2.9
  2. insert overwrite table
  3. dynamic partition insert

}
// Don't let Hive do overwrite operation since it is slower.
doHiveOverwrite = false
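The branch added above re-implements the same trash-then-delete fallback that Utils.moveToTrashOrDelete in the first hunk already wraps. A possible simplification, under the assumption that the helper returns false only when the final hard delete fails (the diff does not show its return type), would be to delegate to it, adding the same `org.apache.spark.util.Utils` import that InsertIntoHiveDirCommand gains in this PR; `partitionPath`, `hadoopConf`, and `doHiveOverwrite` are the surrounding variables from the hunk.

```scala
partitionPath.foreach { path =>
  val fs = path.getFileSystem(hadoopConf)
  if (fs.exists(path)) {
    val isTrashEnabled = sparkSession.sessionState.conf.trashEnabled
    // Moves to trash when enabled; falls back to fs.delete otherwise or on failure.
    if (!Utils.moveToTrashOrDelete(fs, path, isTrashEnabled, hadoopConf)) {
      throw new RuntimeException(s"Cannot remove partition directory '$path'")
    }
    // Don't let Hive do overwrite operation since it is slower.
    doHiveOverwrite = false
  }
}
```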