From 6be846b8e6ec61c0b496d688de18db4a2a787f9a Mon Sep 17 00:00:00 2001
From: Udbhav Agrawal
Date: Fri, 31 Jul 2020 17:17:29 +0530
Subject: [PATCH] [SPARK-32480] Support insert overwrite to move data to trash

---
 .../scala/org/apache/spark/util/Utils.scala   |  2 +-
 .../apache/spark/sql/internal/SQLConf.scala   | 18 +++++++--------
 .../spark/sql/execution/command/tables.scala  |  2 +-
 .../sql/execution/command/DDLSuite.scala      |  6 ++---
 .../execution/InsertIntoHiveDirCommand.scala  |  6 ++++-
 .../hive/execution/InsertIntoHiveTable.scala  | 22 +++++++++++++++----
 6 files changed, 37 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index b8b044bbad30e..c1fa2fecf7441 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -270,7 +270,7 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Move data to trash if 'spark.sql.truncate.trash.enabled' is true, else
+   * Move data to trash if 'spark.sql.trash.enabled' is true, else
    * delete the data permanently. If move data to trash failed fallback to hard deletion.
    */
   def moveToTrashOrDelete(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index dca421a09da62..1247d9ac2de8c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2732,14 +2732,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
-  val TRUNCATE_TRASH_ENABLED =
-    buildConf("spark.sql.truncate.trash.enabled")
-      .doc("This configuration decides when truncating table, whether data files will be moved " +
-        "to trash directory or deleted permanently. The trash retention time is controlled by " +
-        "'fs.trash.interval', and in default, the server side configuration value takes " +
-        "precedence over the client-side one. Note that if 'fs.trash.interval' is non-positive, " +
-        "this will be a no-op and log a warning message. If the data fails to be moved to " +
-        "trash, Spark will turn to delete it permanently.")
+  val TRASH_ENABLED =
+    buildConf("spark.sql.trash.enabled")
+      .doc("This configuration decides whether, on truncate table or insert overwrite, data " +
+        "files will be moved to the trash directory or deleted permanently. The trash " +
+        "retention time is controlled by 'fs.trash.interval'; by default, the server-side " +
+        "configuration value takes precedence over the client-side one. Note that if " +
+        "'fs.trash.interval' is non-positive, this will be a no-op and log a warning message. " +
+        "If the data fails to be moved to trash, Spark falls back to deleting it permanently.")
       .version("3.1.0")
       .booleanConf
       .createWithDefault(false)
@@ -3362,7 +3362,7 @@ class SQLConf extends Serializable with Logging {
 
   def legacyPathOptionBehavior: Boolean = getConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR)
 
-  def truncateTrashEnabled: Boolean = getConf(SQLConf.TRUNCATE_TRASH_ENABLED)
+  def trashEnabled: Boolean = getConf(SQLConf.TRASH_ENABLED)
 
   /** ********************** SQLConf functionality methods ************ */
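
A quick way to exercise the renamed flag end to end (not part of the patch; the
session setup, table name, and retention interval below are illustrative, and a
Hive-enabled build is assumed since only the Hive write paths change here):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("trash-demo")
  .enableHiveSupport()
  .getOrCreate()

// Hadoop-side retention in minutes. With a non-positive value the move to
// trash is a no-op: a warning is logged and files are deleted permanently.
spark.sparkContext.hadoopConfiguration.setInt("fs.trash.interval", 5)

// The renamed flag now covers INSERT OVERWRITE as well as TRUNCATE TABLE.
spark.conf.set("spark.sql.trash.enabled", "true")

spark.sql("CREATE TABLE tab1 (col INT) STORED AS PARQUET")
spark.sql("INSERT INTO tab1 SELECT 1")
// The file written by the first insert is moved under the executing
// user's .Trash instead of being deleted.
spark.sql("INSERT OVERWRITE TABLE tab1 SELECT 2")
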
" + + "If the data fails to be moved to trash, Spark will turn to delete it permanently.") .version("3.1.0") .booleanConf .createWithDefault(false) @@ -3362,7 +3362,7 @@ class SQLConf extends Serializable with Logging { def legacyPathOptionBehavior: Boolean = getConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR) - def truncateTrashEnabled: Boolean = getConf(SQLConf.TRUNCATE_TRASH_ENABLED) + def trashEnabled: Boolean = getConf(SQLConf.TRASH_ENABLED) /** ********************** SQLConf functionality methods ************ */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index f94c9712a31cc..cefa8083a0ee3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -490,7 +490,7 @@ case class TruncateTableCommand( } val hadoopConf = spark.sessionState.newHadoopConf() val ignorePermissionAcl = SQLConf.get.truncateTableIgnorePermissionAcl - val isTrashEnabled = SQLConf.get.truncateTrashEnabled + val isTrashEnabled = SQLConf.get.trashEnabled locations.foreach { location => if (location.isDefined) { val path = new Path(location.get) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index b8ac5079b7745..19d409f3acc11 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -3105,7 +3105,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { test("SPARK-32481 Move data to trash on truncate table if enabled") { val trashIntervalKey = "fs.trash.interval" withTable("tab1") { - withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "true") { + withSQLConf(SQLConf.TRASH_ENABLED.key -> "true") { sql("CREATE TABLE tab1 (col INT) USING parquet") sql("INSERT INTO tab1 SELECT 1") // scalastyle:off hadoopconfiguration @@ -3134,7 +3134,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { test("SPARK-32481 delete data permanently on truncate table if trash interval is non-positive") { val trashIntervalKey = "fs.trash.interval" withTable("tab1") { - withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "true") { + withSQLConf(SQLConf.TRASH_ENABLED.key -> "true") { sql("CREATE TABLE tab1 (col INT) USING parquet") sql("INSERT INTO tab1 SELECT 1") // scalastyle:off hadoopconfiguration @@ -3161,7 +3161,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { test("SPARK-32481 Do not move data to trash on truncate table if disabled") { withTable("tab1") { - withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "false") { + withSQLConf(SQLConf.TRASH_ENABLED.key -> "false") { sql("CREATE TABLE tab1 (col INT) USING parquet") sql("INSERT INTO tab1 SELECT 1") val hadoopConf = spark.sessionState.newHadoopConf() diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala index b66c302a7d7ea..adf3ba76e3ebb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 3c3f31ac2994a..307a1380e9993 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive.execution
 import java.util.Locale
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{Path, Trash}
 import org.apache.hadoop.hive.ql.ErrorMsg
 import org.apache.hadoop.hive.ql.plan.TableDesc
 
@@ -309,9 +309,23 @@ case class InsertIntoHiveTable(
           partitionPath.foreach { path =>
             val fs = path.getFileSystem(hadoopConf)
             if (fs.exists(path)) {
-              if (!fs.delete(path, true)) {
-                throw new RuntimeException(
-                  s"Cannot remove partition directory '$path'")
+              val isTrashEnabled = sparkSession.sessionState.conf.trashEnabled
+              if (!isTrashEnabled) {
+                if (!fs.delete(path, true)) {
+                  throw new RuntimeException(
+                    s"Cannot remove partition directory '$path'")
+                }
+              } else {
+                logDebug(s"Try to move data ${path.toString} to trash")
+                val isSuccess = Trash.moveToAppropriateTrash(fs, path, hadoopConf)
+                if (!isSuccess) {
+                  logWarning(s"Failed to move data ${path.toString} to trash; " +
+                    "falling back to hard deletion")
+                  if (!fs.delete(path, true)) {
+                    throw new RuntimeException(
+                      s"Cannot remove partition directory '$path'")
+                  }
+                }
               }
               // Don't let Hive do overwrite operation since it is slower.
               doHiveOverwrite = false
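
The updated tests above still cover only the SPARK-32481 truncate path; the
new insert-overwrite behavior ships without a test in this patch. Below is a
sketch of one, modeled on the truncate tests (it assumes the surrounding
suite's sql, withTable, and withSQLConf helpers plus the Path,
TableIdentifier, and SQLConf imports, and would belong in a Hive-backed suite
since only Hive-serde writes changed):

  test("SPARK-32480 Move data to trash on insert overwrite if enabled") {
    val trashIntervalKey = "fs.trash.interval"
    withTable("tab1") {
      withSQLConf(SQLConf.TRASH_ENABLED.key -> "true") {
        sql("CREATE TABLE tab1 (col INT) STORED AS PARQUET")
        sql("INSERT INTO tab1 SELECT 1")
        // scalastyle:off hadoopconfiguration
        val hadoopConf = spark.sparkContext.hadoopConfiguration
        // scalastyle:on hadoopconfiguration
        val originalValue = hadoopConf.get(trashIntervalKey, "0")
        val tablePath = new Path(spark.sessionState.catalog
          .getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)
        val fs = tablePath.getFileSystem(hadoopConf)
        val trashCurrent = new Path(fs.getHomeDirectory, ".Trash/Current")
        val trashPath = Path.mergePaths(trashCurrent, tablePath)
        try {
          hadoopConf.set(trashIntervalKey, "5")
          // The file from the first insert should move into the trash dir.
          sql("INSERT OVERWRITE TABLE tab1 SELECT 2")
        } finally {
          hadoopConf.set(trashIntervalKey, originalValue)
        }
        assert(fs.exists(trashPath))
        fs.delete(trashPath, true)
      }
    }
  }
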