23 changes: 22 additions & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -50,7 +50,7 @@ import com.google.common.net.InetAddresses
import org.apache.commons.codec.binary.Hex
import org.apache.commons.lang3.SystemUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path, Trash}
import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -269,6 +269,27 @@ private[spark] object Utils extends Logging {
    file.setExecutable(true, true)
  }

  /**
   * Move data to the trash directory if 'spark.sql.truncate.trash.enabled' is true;
   * otherwise delete it permanently. If the move to trash fails (for example, when
   * fs.trash.interval is non-positive), fall back to deleting the data permanently.
   */
  def moveToTrashIfEnabled(
      fs: FileSystem,
      partitionPath: Path,
      isTrashEnabled: Boolean,
      hadoopConf: Configuration): Boolean = {
    if (isTrashEnabled) {
      logDebug(s"Will move data ${partitionPath.toString} to trash")
      val isSuccess = Trash.moveToAppropriateTrash(fs, partitionPath, hadoopConf)
      if (!isSuccess) {
        logWarning(s"Failed to move data ${partitionPath.toString} to trash; " +
          "deleting it permanently instead")
        return fs.delete(partitionPath, true)
      }
      isSuccess
    } else {
      fs.delete(partitionPath, true)
    }
  }
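For reviewers, a minimal caller-side sketch of the new helper (not part of the diff; the path is hypothetical, and Utils is private[spark], so this only compiles inside a Spark package):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.util.Utils

// Hypothetical demo: resolve a FileSystem for a path, then let the helper
// choose between Trash.moveToAppropriateTrash and a permanent delete.
object MoveToTrashDemo {
  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()
    val partitionPath = new Path("/tmp/warehouse/tab1") // hypothetical location
    val fs = partitionPath.getFileSystem(hadoopConf)
    fs.mkdirs(partitionPath) // ensure the path exists for the demo
    // With a default Configuration, fs.trash.interval is 0, so this logs a
    // warning and falls back to a permanent delete.
    val handled = Utils.moveToTrashIfEnabled(fs, partitionPath, isTrashEnabled = true, hadoopConf)
    println(s"moved to trash or deleted: $handled")
  }
}
```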

  /**
   * Create a directory given the abstract pathname
   * @return true, if the directory is successfully created; otherwise, return false.
@@ -2722,6 +2722,17 @@ object SQLConf {
      .booleanConf
      .createWithDefault(false)

  val TRUNCATE_TRASH_ENABLED =
    buildConf("spark.sql.truncate.trash.enabled")
Member: Quick question: do we want to have a configuration for each operation? Looks like #29319 targets similar stuff. Maybe it'd make more sense to have a global configuration.

Contributor Author: I will rework #29319 and make it a global configuration.

Member: Yep. It's too early to make it a global configuration.

.doc("This configuration decides when truncating table, whether data files will be moved " +
"to trash directory or deleted permanently. The trash retention time is controlled by " +
Member: Does Spark SQL have the trash directory?

Contributor Author: Hi, it is the HDFS trash directory.

"fs.trash.interval, and in default, the server side configuration value takes " +
"precedence over the client-side one. Note that if fs.trash.interval is non-positive, " +
"this will be a no-op and log a warning message.")
Member: Just a question. Is the following still true?

> Note that if fs.trash.interval is non-positive, this will be a no-op and log a warning message.

Contributor Author: Yes, this is still true.

Member: Yes. This is because when fs.trash.interval is not positive, the Hadoop side will consider trash disabled and will not delete the data. See here. Currently this just logs a warning, but we could consider another flag to hard delete the data instead.
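To make the behavior described above concrete, a small sketch against the local filesystem (hypothetical demo object and path; this illustrates Hadoop's contract, not code from this PR): with a non-positive fs.trash.interval, Trash.moveToAppropriateTrash returns false without touching the data, which is why moveToTrashIfEnabled falls back to fs.delete.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, Trash}

object TrashDisabledDemo { // hypothetical demo object
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    conf.set("fs.trash.interval", "0") // non-positive: Hadoop treats trash as disabled
    val fs = FileSystem.getLocal(conf)
    val data = new Path("/tmp/trash-disabled-demo") // hypothetical path
    fs.mkdirs(data)
    // Returns false and leaves the data in place when trash is disabled.
    val moved = Trash.moveToAppropriateTrash(fs, data, conf)
    assert(!moved && fs.exists(data))
  }
}
```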

.version("3.1.0")
.booleanConf
.createWithDefault(false)

  /**
   * Holds information about keys that have been deprecated.
   *
@@ -3334,6 +3345,8 @@ class SQLConf extends Serializable with Logging {

  def legacyPathOptionBehavior: Boolean = getConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR)

  def truncateTrashEnabled: Boolean = getConf(SQLConf.TRUNCATE_TRASH_ENABLED)

  /** ********************** SQLConf functionality methods ************ */

  /** Set Spark SQL configuration properties. */
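For completeness, a hedged end-to-end sketch of opting in from a user session (app and table names are illustrative; the flag defaults to false, as defined above):

```scala
import org.apache.spark.sql.SparkSession

object TruncateToTrashExample { // hypothetical example
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("truncate-to-trash").getOrCreate()
    // Session-level opt-in for the new behavior.
    spark.conf.set("spark.sql.truncate.trash.enabled", "true")
    spark.sql("CREATE TABLE tab1 (col INT) USING parquet")
    spark.sql("INSERT INTO tab1 SELECT 1")
    // With the flag on and fs.trash.interval > 0, TRUNCATE moves the table's
    // data files to the Hadoop trash instead of deleting them permanently.
    spark.sql("TRUNCATE TABLE tab1")
    spark.stop()
  }
}
```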
@@ -48,6 +48,7 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.util.Utils

/**
 * A command to create a table with the same definition of the given existing table.
@@ -489,6 +490,7 @@ case class TruncateTableCommand(
    }
    val hadoopConf = spark.sessionState.newHadoopConf()
    val ignorePermissionAcl = SQLConf.get.truncateTableIgnorePermissionAcl
    val isTrashEnabled = SQLConf.get.truncateTrashEnabled
    locations.foreach { location =>
      if (location.isDefined) {
        val path = new Path(location.get)
@@ -513,7 +515,7 @@
          }
        }

        fs.delete(path, true)
        Utils.moveToTrashIfEnabled(fs, path, isTrashEnabled, hadoopConf)

        // We should keep original permission/acl of the path.
        // For owner/group, only super-user can set it, for example on HDFS. Because
@@ -3101,6 +3101,78 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
      assert(spark.sessionState.catalog.isRegisteredFunction(rand))
    }
  }

test("SPARK-32481 Move data to trash on truncate table if enabled") {
val trashIntervalKey = "fs.trash.interval"
withTable("tab1") {
withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "true") {
sql("CREATE TABLE tab1 (col INT) USING parquet")
sql("INSERT INTO tab1 SELECT 1")
// scalastyle:off hadoopconfiguration
val hadoopConf = spark.sparkContext.hadoopConfiguration
Member: spark.sessionState.newHadoopConf()?
        // scalastyle:on hadoopconfiguration
        val originalValue = hadoopConf.get(trashIntervalKey, "0")
        val tablePath = new Path(spark.sessionState.catalog
          .getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)

        val fs = tablePath.getFileSystem(hadoopConf)
        val trashRoot = fs.getTrashRoot(tablePath)
Member: Do all the filesystems support this operation?

Member: Yes, a default impl is defined in FileSystem, which calls getHomeDirectory implemented in the same class.

Even though it is supported, the trash mechanism seems less useful in cloud object stores like S3, where renaming doesn't exist and moving to trash is therefore much more expensive. However, the user can disable it via the configs given here and in Hadoop itself.

        assert(!fs.exists(trashRoot))
        try {
          hadoopConf.set(trashIntervalKey, "5")
          sql("TRUNCATE TABLE tab1")
        } finally {
          hadoopConf.set(trashIntervalKey, originalValue)
        }
        assert(fs.exists(trashRoot))
        fs.delete(trashRoot, true)
      }
    }
  }
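As the reviewer notes, FileSystem.getTrashRoot has a default implementation built on getHomeDirectory; a quick hedged illustration (demo object and path are hypothetical, and the output is environment-dependent):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object TrashRootDemo { // hypothetical demo object
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.getLocal(new Configuration())
    // Default implementation: <home directory>/.Trash, for any argument path.
    val root = fs.getTrashRoot(new Path("/tmp/anything")) // hypothetical path
    println(root) // e.g. file:/home/<user>/.Trash
  }
}
```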

test("SPARK-32481 delete data permanently on truncate table if trash interval is non-positive") {
val trashIntervalKey = "fs.trash.interval"
withTable("tab1") {
withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "true") {
sql("CREATE TABLE tab1 (col INT) USING parquet")
sql("INSERT INTO tab1 SELECT 1")
// scalastyle:off hadoopconfiguration
val hadoopConf = spark.sparkContext.hadoopConfiguration
Member (@dongjoon-hyun, Aug 24, 2020): Can we use spark.sessionState.newHadoopConf()? Then we can remove scalastyle:off hadoopConfiguration.

Contributor Author: I tried to use spark.sessionState.newHadoopConf(), but the Hadoop conf was not reflected, so moveToAppropriateTrash returns false because fs.trash.interval is 0.

Member: The other places are the same. Let's try to follow the Scalastyle warning if possible.

Member: Oh, should we change this underlying one?

        // scalastyle:on hadoopconfiguration
        val originalValue = hadoopConf.get(trashIntervalKey, "0")
        val tablePath = new Path(spark.sessionState.catalog
          .getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)

        val fs = tablePath.getFileSystem(hadoopConf)
        val trashRoot = fs.getTrashRoot(tablePath)
        assert(!fs.exists(trashRoot))
Contributor: @Udbhav30 This line of code is not macOS friendly; the trashRoot is /Users/xxx/.Trash/, the path to the macOS trash can, so normally it exists...

Contributor Author: Thanks @LuciferYang for pointing it out. I will raise a follow-up PR and assert on the particular folder, which is trashRoot/pathToTable/tab1 in this case, instead of trashRoot (see the sketch after this test).

        try {
          hadoopConf.set(trashIntervalKey, "0")
          sql("TRUNCATE TABLE tab1")
        } finally {
          hadoopConf.set(trashIntervalKey, originalValue)
        }
        assert(!fs.exists(trashRoot))
      }
    }
  }
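A sketch of the stricter assertion promised in the thread above: Hadoop moves a path P to trashRoot/Current/P, so the test could check the table's own directory rather than the trash root (a fragment reusing the test's fs, trashRoot, and tablePath; Path.mergePaths is a real Hadoop helper, but this exact check is illustrative and not from this PR):

```scala
// Sketch: compute the table's would-be location under the trash root.
// Hadoop moves a path P to <trashRoot>/Current/<P>.
val trashCurrent = new Path(trashRoot, "Current")
val trashedTable = Path.mergePaths(trashCurrent, tablePath) // illustrative
// The first test would assert fs.exists(trashedTable); this test would assert
// the opposite, which also holds on macOS where trashRoot itself always exists.
assert(!fs.exists(trashedTable))
```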

test("SPARK-32481 Do not move data to trash on truncate table if disabled") {
withTable("tab1") {
withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "false") {
sql("CREATE TABLE tab1 (col INT) USING parquet")
sql("INSERT INTO tab1 SELECT 1")
val hadoopConf = spark.sessionState.newHadoopConf()
Member: In the other tests, this PR is using spark.sparkContext.hadoopConfiguration. If that is required, this test case looks misleading: is withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "false") { required here? I'm wondering if this test case would pass with withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "true") {, too.

Contributor Author: Hi, in this test we did not update the hadoopConf, so using spark.sessionState.newHadoopConf() doesn't make any difference.

Contributor Author (@Udbhav30, Aug 24, 2020): @dongjoon-hyun See here. If fs.trash.interval is non-positive, then moveToAppropriateTrash returns false. So to test that, I have to set a positive value for fs.trash.interval, but spark.sessionState.newHadoopConf() does not update the Hadoop conf, and so the other test case fails with it. Here the test case does not touch the trash, so updating the Hadoop conf is not required, and I used spark.sessionState.newHadoopConf().

        val tablePath = new Path(spark.sessionState.catalog
          .getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)

        val fs = tablePath.getFileSystem(hadoopConf)
        val trashRoot = fs.getTrashRoot(tablePath)
        sql("TRUNCATE TABLE tab1")
        assert(!fs.exists(trashRoot))
      }
    }
  }
}

object FakeLocalFsFileSystem {