From 6cf355a1268857e8039bb52fb5a86ecf684234b3 Mon Sep 17 00:00:00 2001
From: Udbhav30
Date: Tue, 25 Aug 2020 23:38:43 -0700
Subject: [PATCH 1/4] [SPARK-32481][CORE][SQL] Support truncate table to move
 data to trash

### What changes were proposed in this pull request?
Instead of deleting the data, we can move it to the trash. Based on the
configuration provided by the user, it will then be deleted permanently
from the trash.

### Why are the changes needed?
Instead of directly deleting the data, this provides the flexibility to
move it to the trash first and delete it permanently later.

### Does this PR introduce any user-facing change?
Yes. After TRUNCATE TABLE the data is no longer deleted permanently right
away: it is first moved to the trash and then, after the given time,
deleted permanently.

### How was this patch tested?
New UTs added.

Closes #29387 from Udbhav30/tuncateTrash.

Authored-by: Udbhav30
Signed-off-by: Dongjoon Hyun
---
 .../scala/org/apache/spark/util/Utils.scala   | 23 +++++-
 .../apache/spark/sql/internal/SQLConf.scala   | 13 ++++
 .../spark/sql/execution/command/tables.scala  |  4 +-
 .../sql/execution/command/DDLSuite.scala      | 75 +++++++++++++++++++
 4 files changed, 113 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 35d60bb514405..a336c1260d344 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -50,7 +50,7 @@ import com.google.common.net.InetAddresses
 import org.apache.commons.codec.binary.Hex
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+import org.apache.hadoop.fs.{FileSystem, FileUtil, Path, Trash}
 import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -269,6 +269,27 @@ private[spark] object Utils extends Logging {
     file.setExecutable(true, true)
   }
 
+  /**
+   * Move data to trash if 'spark.sql.truncate.trash.enabled' is true
+   */
+  def moveToTrashIfEnabled(
+      fs: FileSystem,
+      partitionPath: Path,
+      isTrashEnabled: Boolean,
+      hadoopConf: Configuration): Boolean = {
+    if (isTrashEnabled) {
+      logDebug(s"will move data ${partitionPath.toString} to trash")
+      val isSuccess = Trash.moveToAppropriateTrash(fs, partitionPath, hadoopConf)
+      if (!isSuccess) {
+        logWarning(s"Failed to move data ${partitionPath.toString} to trash")
+        return fs.delete(partitionPath, true)
+      }
+      isSuccess
+    } else {
+      fs.delete(partitionPath, true)
+    }
+  }
+
   /**
    * Create a directory given the abstract pathname
   * @return true, if the directory is successfully created; otherwise, return false.
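For readers unfamiliar with the Hadoop trash API used by the new helper: Trash.moveToAppropriateTrash returns false when the trash cannot be used (for example, when fs.trash.interval is non-positive), which is what triggers the hard-delete fallback above. A minimal standalone sketch of that behaviour against the Hadoop FileSystem API; the scratch path and the 5-minute interval below are illustrative assumptions, not values from this patch:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path, Trash}

    val conf = new Configuration()
    conf.set("fs.trash.interval", "5")  // retention in minutes; illustrative only

    val scratch = new Path("/tmp/spark-truncate-trash-demo")  // illustrative path
    val fs: FileSystem = scratch.getFileSystem(conf)
    fs.mkdirs(scratch)

    // moveToAppropriateTrash returns true if the path was moved to the current
    // user's trash; false means the trash is unusable and the caller should
    // fall back to a permanent delete, exactly as the helper above does.
    val movedToTrash = Trash.moveToAppropriateTrash(fs, scratch, conf)
    if (!movedToTrash) {
      fs.delete(scratch, true)
    }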
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3e82b8e12df02..c9db7b1e8960a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2722,6 +2722,17 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val TRUNCATE_TRASH_ENABLED =
+    buildConf("spark.sql.truncate.trash.enabled")
+      .doc("This configuration decides when truncating table, whether data files will be moved " +
+        "to trash directory or deleted permanently. The trash retention time is controlled by " +
+        "fs.trash.interval, and in default, the server side configuration value takes " +
+        "precedence over the client-side one. Note that if fs.trash.interval is non-positive, " +
+        "this will be a no-op and log a warning message.")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
   /**
    * Holds information about keys that have been deprecated.
    *
@@ -3334,6 +3345,8 @@ class SQLConf extends Serializable with Logging {
 
   def legacyPathOptionBehavior: Boolean = getConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR)
 
+  def truncateTrashEnabled: Boolean = getConf(SQLConf.TRUNCATE_TRASH_ENABLED)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 7aebdddf1d59c..7aebdd7e57293 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -48,6 +48,7 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.util.Utils
 
 /**
  * A command to create a table with the same definition of the given existing table.
@@ -489,6 +490,7 @@ case class TruncateTableCommand(
     }
     val hadoopConf = spark.sessionState.newHadoopConf()
     val ignorePermissionAcl = SQLConf.get.truncateTableIgnorePermissionAcl
+    val isTrashEnabled = SQLConf.get.truncateTrashEnabled
     locations.foreach { location =>
       if (location.isDefined) {
         val path = new Path(location.get)
@@ -513,7 +515,7 @@ case class TruncateTableCommand(
           }
         }
 
-        fs.delete(path, true)
+        Utils.moveToTrashIfEnabled(fs, path, isTrashEnabled, hadoopConf)
 
         // We should keep original permission/acl of the path.
         // For owner/group, only super-user can set it, for example on HDFS. Because
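With the command wired up, the feature is driven entirely by configuration. A hypothetical end-to-end session sketch (assumes a SparkSession named spark built with this patch and a filesystem whose fs.trash.interval is positive):

    // The flag is a runtime SQL conf, so it can be toggled per session.
    spark.conf.set("spark.sql.truncate.trash.enabled", "true")

    spark.sql("CREATE TABLE tab1 (col INT) USING parquet")
    spark.sql("INSERT INTO tab1 SELECT 1")

    // With trash enabled, the table directory is moved under the user's
    // .Trash/Current directory instead of being deleted outright.
    spark.sql("TRUNCATE TABLE tab1")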
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 17857a6ce173d..b8ac5079b7745 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -3101,6 +3101,81 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       assert(spark.sessionState.catalog.isRegisteredFunction(rand))
     }
   }
+
+  test("SPARK-32481 Move data to trash on truncate table if enabled") {
+    val trashIntervalKey = "fs.trash.interval"
+    withTable("tab1") {
+      withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "true") {
+        sql("CREATE TABLE tab1 (col INT) USING parquet")
+        sql("INSERT INTO tab1 SELECT 1")
+        // scalastyle:off hadoopconfiguration
+        val hadoopConf = spark.sparkContext.hadoopConfiguration
+        // scalastyle:on hadoopconfiguration
+        val originalValue = hadoopConf.get(trashIntervalKey, "0")
+        val tablePath = new Path(spark.sessionState.catalog
+          .getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)
+
+        val fs = tablePath.getFileSystem(hadoopConf)
+        val trashCurrent = new Path(fs.getHomeDirectory, ".Trash/Current")
+        val trashPath = Path.mergePaths(trashCurrent, tablePath)
+        assert(!fs.exists(trashPath))
+        try {
+          hadoopConf.set(trashIntervalKey, "5")
+          sql("TRUNCATE TABLE tab1")
+        } finally {
+          hadoopConf.set(trashIntervalKey, originalValue)
+        }
+        assert(fs.exists(trashPath))
+        fs.delete(trashPath, true)
+      }
+    }
+  }
+
+  test("SPARK-32481 delete data permanently on truncate table if trash interval is non-positive") {
+    val trashIntervalKey = "fs.trash.interval"
+    withTable("tab1") {
+      withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "true") {
+        sql("CREATE TABLE tab1 (col INT) USING parquet")
+        sql("INSERT INTO tab1 SELECT 1")
+        // scalastyle:off hadoopconfiguration
+        val hadoopConf = spark.sparkContext.hadoopConfiguration
+        // scalastyle:on hadoopconfiguration
+        val originalValue = hadoopConf.get(trashIntervalKey, "0")
+        val tablePath = new Path(spark.sessionState.catalog
+          .getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)
+
+        val fs = tablePath.getFileSystem(hadoopConf)
+        val trashCurrent = new Path(fs.getHomeDirectory, ".Trash/Current")
+        val trashPath = Path.mergePaths(trashCurrent, tablePath)
+        assert(!fs.exists(trashPath))
+        try {
+          hadoopConf.set(trashIntervalKey, "0")
+          sql("TRUNCATE TABLE tab1")
+        } finally {
+          hadoopConf.set(trashIntervalKey, originalValue)
+        }
+        assert(!fs.exists(trashPath))
+      }
+    }
+  }
+
+  test("SPARK-32481 Do not move data to trash on truncate table if disabled") {
+    withTable("tab1") {
+      withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "false") {
+        sql("CREATE TABLE tab1 (col INT) USING parquet")
+        sql("INSERT INTO tab1 SELECT 1")
+        val hadoopConf = spark.sessionState.newHadoopConf()
+        val tablePath = new Path(spark.sessionState.catalog
+          .getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)
+
+        val fs = tablePath.getFileSystem(hadoopConf)
+        val trashCurrent = new Path(fs.getHomeDirectory, ".Trash/Current")
+        val trashPath = Path.mergePaths(trashCurrent, tablePath)
+        sql("TRUNCATE TABLE tab1")
+        assert(!fs.exists(trashPath))
+      }
+    }
+  }
 }
 
 object FakeLocalFsFileSystem {
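The tests above find the trashed copy by mirroring Hadoop's default trash layout, in which a trashed path keeps its original directory structure under <home>/.Trash/Current. A condensed sketch of that path arithmetic; the table location below is an illustrative assumption:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val conf = new Configuration()
    val tablePath = new Path("/user/hive/warehouse/tab1")  // illustrative location
    val fs: FileSystem = tablePath.getFileSystem(conf)

    // e.g. /user/alice/.Trash/Current/user/hive/warehouse/tab1
    val trashCurrent = new Path(fs.getHomeDirectory, ".Trash/Current")
    val trashPath = Path.mergePaths(trashCurrent, tablePath)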
From 47075dee4d22d0617b59121168c1a72498f3051d Mon Sep 17 00:00:00 2001
From: Udbhav30
Date: Thu, 27 Aug 2020 01:18:20 +0530
Subject: [PATCH 2/4] Handle review comments

---
 core/src/main/scala/org/apache/spark/util/Utils.scala      | 6 ++++--
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 3 ++-
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a336c1260d344..2bac45637f70b 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -270,7 +270,8 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Move data to trash if 'spark.sql.truncate.trash.enabled' is true
+   * Move data to trash if 'spark.sql.truncate.trash.enabled' is true, else
+   * delete the data permanently. If move data to trash failed fallback to hard deletion.
    */
   def moveToTrashIfEnabled(
       fs: FileSystem,
@@ -281,7 +282,8 @@
       logDebug(s"will move data ${partitionPath.toString} to trash")
       val isSuccess = Trash.moveToAppropriateTrash(fs, partitionPath, hadoopConf)
       if (!isSuccess) {
-        logWarning(s"Failed to move data ${partitionPath.toString} to trash")
+        logWarning(s"Failed to move data ${partitionPath.toString} to trash. " +
+          "Fallback to hard deletion")
         return fs.delete(partitionPath, true)
       }
       isSuccess
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c9db7b1e8960a..c59b1e8010375 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2728,7 +2728,8 @@ object SQLConf {
         "to trash directory or deleted permanently. The trash retention time is controlled by " +
         "fs.trash.interval, and in default, the server side configuration value takes " +
         "precedence over the client-side one. Note that if fs.trash.interval is non-positive, " +
-        "this will be a no-op and log a warning message.")
+        "this will be a no-op and log a warning message. If the data fails to be moved to " +
+        "trash, Spark will turn to delete it permanently.")
       .version("3.1.0")
       .booleanConf
       .createWithDefault(false)
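The doc text added above calls out the no-op case. That case can be probed directly: with a non-positive fs.trash.interval the move reports failure and the helper falls back to a permanent delete. A small sketch, assuming a local filesystem and an illustrative scratch path:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{Path, Trash}

    val conf = new Configuration()
    conf.set("fs.trash.interval", "0")  // trash disabled

    val scratch = new Path("/tmp/spark-truncate-noop-demo")  // illustrative path
    val fs = scratch.getFileSystem(conf)
    fs.mkdirs(scratch)

    val moved = Trash.moveToAppropriateTrash(fs, scratch, conf)
    assert(!moved)                    // trash disabled, so nothing was moved
    assert(fs.delete(scratch, true))  // the permanent-delete fallback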
From 1062cb999ba13f839f71472a013ad5dde7da156a Mon Sep 17 00:00:00 2001
From: Udbhav30
Date: Thu, 27 Aug 2020 01:40:09 +0530
Subject: [PATCH 3/4] Handle review comments

---
 core/src/main/scala/org/apache/spark/util/Utils.scala          | 2 +-
 .../scala/org/apache/spark/sql/execution/command/tables.scala  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 2bac45637f70b..a70f3d5d9319d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -273,7 +273,7 @@ private[spark] object Utils extends Logging {
    * Move data to trash if 'spark.sql.truncate.trash.enabled' is true, else
    * delete the data permanently. If move data to trash failed fallback to hard deletion.
    */
-  def moveToTrashIfEnabled(
+  def moveToTrashOrDelete(
       fs: FileSystem,
       partitionPath: Path,
       isTrashEnabled: Boolean,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 7aebdd7e57293..f94c9712a31cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -515,7 +515,7 @@ case class TruncateTableCommand(
           }
         }
 
-        Utils.moveToTrashIfEnabled(fs, path, isTrashEnabled, hadoopConf)
+        Utils.moveToTrashOrDelete(fs, path, isTrashEnabled, hadoopConf)
 
         // We should keep original permission/acl of the path.
         // For owner/group, only super-user can set it, for example on HDFS. Because

From 2dd78a4b2793356f2a66bec9f67e0b4aa80d3aa9 Mon Sep 17 00:00:00 2001
From: Udbhav30
Date: Sun, 30 Aug 2020 12:56:25 +0530
Subject: [PATCH 4/4] minor comment fixes

---
 core/src/main/scala/org/apache/spark/util/Utils.scala      | 2 +-
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a70f3d5d9319d..b8b044bbad30e 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -279,7 +279,7 @@ private[spark] object Utils extends Logging {
       isTrashEnabled: Boolean,
       hadoopConf: Configuration): Boolean = {
     if (isTrashEnabled) {
-      logDebug(s"will move data ${partitionPath.toString} to trash")
+      logDebug(s"Try to move data ${partitionPath.toString} to trash")
       val isSuccess = Trash.moveToAppropriateTrash(fs, partitionPath, hadoopConf)
       if (!isSuccess) {
         logWarning(s"Failed to move data ${partitionPath.toString} to trash. " +
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c59b1e8010375..8bc707c81d8c0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2726,8 +2726,8 @@ object SQLConf {
     buildConf("spark.sql.truncate.trash.enabled")
       .doc("This configuration decides when truncating table, whether data files will be moved " +
         "to trash directory or deleted permanently. The trash retention time is controlled by " +
-        "fs.trash.interval, and in default, the server side configuration value takes " +
-        "precedence over the client-side one. Note that if fs.trash.interval is non-positive, " +
+        "'fs.trash.interval', and in default, the server side configuration value takes " +
+        "precedence over the client-side one. Note that if 'fs.trash.interval' is non-positive, " +
         "this will be a no-op and log a warning message. If the data fails to be moved to " +
         "trash, Spark will turn to delete it permanently.")
       .version("3.1.0")
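Since recoverability is the whole point of routing TRUNCATE through the trash, data truncated by mistake can in principle be moved back before fs.trash.interval expires. This is not part of the PR; a purely hypothetical recovery sketch, assuming the default trash policy and an illustrative table location:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val conf = new Configuration()
    val tablePath = new Path("/user/hive/warehouse/tab1")  // illustrative location
    val fs: FileSystem = tablePath.getFileSystem(conf)

    val trashCurrent = new Path(fs.getHomeDirectory, ".Trash/Current")
    val trashPath = Path.mergePaths(trashCurrent, tablePath)

    if (fs.exists(trashPath)) {
      // TRUNCATE recreates an empty table directory, so clear it first to
      // avoid renaming the trashed directory *into* it.
      fs.delete(tablePath, true)
      fs.rename(trashPath, tablePath)  // a metadata-only move on HDFS
    }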