diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CloudStoreTrashPolicy.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CloudStoreTrashPolicy.java
new file mode 100644
index 0000000000000..9c510defbf9d8
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CloudStoreTrashPolicy.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
+import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.Time;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.bindToDurationTrackerFactory;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.TRASH_CREATE_CHECKPOINT;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.TRASH_DELETE_CHECKPOINT;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.TRASH_MOVE_TO_TRASH;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
+import static org.apache.hadoop.util.functional.RemoteIterators.foreach;
+import static org.apache.hadoop.util.functional.RemoteIterators.remoteIteratorFromIterable;
+
+/**
+ * A trash policy for cloud stores, designed to be resilient to
+ * race conditions and configurable to automatically clean up
+ * the current user's older checkpoints whenever it is invoked.
+ * <p>
+ * The durations of trash operations are tracked in
+ * the target FileSystem's statistics, if it is configured
+ * to track them:
+ * <ul>
+ *   <li>{@link org.apache.hadoop.fs.statistics.StoreStatisticNames#TRASH_CREATE_CHECKPOINT}</li>
+ *   <li>{@link org.apache.hadoop.fs.statistics.StoreStatisticNames#TRASH_DELETE_CHECKPOINT}</li>
+ *   <li>{@link org.apache.hadoop.fs.statistics.StoreStatisticNames#TRASH_MOVE_TO_TRASH}</li>
+ * </ul>
+ */
+public class CloudStoreTrashPolicy extends TrashPolicyDefault {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(CloudStoreTrashPolicy.class);
+
+ /**
+ * Configuration option to clean up old trash: {@value}.
+ */
+ public static final String CLEANUP_OLD_CHECKPOINTS = "fs.trash.cleanup.old.checkpoints";
+
+ /**
+ * Default value of {@link #CLEANUP_OLD_CHECKPOINTS}: {@value}.
+ * This still requires the cleanup interval to be > 0.
+ */
+ public static final boolean CLEANUP_OLD_CHECKPOINTS_DEFAULT = true;
+
+ /**
+ * Should old trash be cleaned up?
+ */
+ private boolean cleanupOldTrash;
+
+ /**
+   * Duration tracker factory: bound to the FS statistics if available, else a stub.
+ */
+ private DurationTrackerFactory durationTrackerFactory;
+
+ public CloudStoreTrashPolicy() {
+ }
+
+  /**
+   * Should old trash checkpoints be cleaned up whenever the policy is invoked?
+   * @return true if cleanup is enabled.
+   */
+  public boolean cleanupOldTrash() {
+ return cleanupOldTrash;
+ }
+
+  /**
+   * Get the duration tracker factory.
+   * @return the factory in use.
+   */
+  public DurationTrackerFactory getDurationTrackerFactory() {
+ return durationTrackerFactory;
+ }
+
+ /**
+ * Set the duration tracker factory; useful for testing.
+ * @param durationTrackerFactory factory.
+ */
+ public void setDurationTrackerFactory(
+ final DurationTrackerFactory durationTrackerFactory) {
+ this.durationTrackerFactory = requireNonNull(durationTrackerFactory);
+ }
+
+ @Override
+ public void initialize(final Configuration conf, final FileSystem fs) {
+ super.initialize(conf, fs);
+ cleanupOldTrash = getDeletionInterval() > 0
+ && conf.getBoolean(CLEANUP_OLD_CHECKPOINTS,
+ CLEANUP_OLD_CHECKPOINTS_DEFAULT);
+    // bind to any duration tracker factory offered by the FS statistics
+ setDurationTrackerFactory(bindToDurationTrackerFactory(fs));
+ }
+
+ @Override
+ public boolean moveToTrash(Path path) throws IOException {
+ if (!isEnabled()) {
+ return false;
+ }
+ boolean moved;
+
+ if (!path.isAbsolute()) {
+ // make path absolute
+ path = new Path(fs.getWorkingDirectory(), path);
+ }
+ if (!fs.exists(path)) {
+ // path doesn't actually exist.
+ LOG.info("'{} was deleted before it could be moved to trash", path);
+ moved = true;
+ } else {
+
+ try (DurationInfo info = new DurationInfo(LOG, true, "moveToTrash(%s)",
+ path)) {
+
+        // copy to a local variable: the lambda needs it to be effectively final.
+ Path p = path;
+ moved = invokeTrackingDuration(
+ durationTrackerFactory.trackDuration(TRASH_MOVE_TO_TRASH), () ->
+ super.moveToTrash(p));
+
+ } catch (IOException e) {
+ if (!fs.exists(path)) {
+ // race condition with the trash setup; something else moved it.
+ // note that checking for FNFE is not sufficient as this may occur in
+ // the rename, at which point the exception may get downgraded.
+ LOG.info("'{} was deleted before it could be moved to trash", path);
+ LOG.debug("IOE raised on moveToTrash({})", path, e);
+ // report success
+ moved = true;
+ } else {
+ // source path still exists, so throw the exception and skip cleanup
+ // don't bother trying to cleanup here as it will only complicate
+ // error reporting
+ throw e;
+ }
+ }
+ }
+
+    // optionally clean up the current user's old checkpoints
+ if (cleanupOldTrash()) {
+ executeTrashCleanup();
+ }
+ return moved;
+ }
+
+ /**
+ * Execute the cleanup.
+ * @throws IOException failure
+ */
+ @VisibleForTesting
+ public void executeTrashCleanup() throws IOException {
+ FileSystem fs = getFileSystem();
+ AtomicLong count = new AtomicLong();
+ long now = Time.now();
+
+ // list the roots, iterate through
+ // expecting only one root for object stores.
+ foreach(
+ remoteIteratorFromIterable(fs.getTrashRoots(false)),
+ trashRoot -> {
+ try {
+ count.addAndGet(deleteCheckpoint(trashRoot.getPath(), false));
+ createCheckpoint(trashRoot.getPath(), new Date(now));
+ } catch (IOException e) {
+ LOG.warn("Trash caught:{} Skipping {}", e, trashRoot.getPath());
+ LOG.debug("Trash caught", e);
+ }
+ });
+ LOG.debug("Cleaned up {} checkpoints", count.get());
+ }
+
+ /**
+ * Delete a checkpoint; update the duration tracker statistics.
+ * @param trashRoot trash root.
+ * @param deleteImmediately should all entries be deleted?
+   * @return the number of checkpoints deleted.
+   * @throws IOException failure
+ */
+ @Override
+ protected int deleteCheckpoint(final Path trashRoot,
+ final boolean deleteImmediately)
+ throws IOException {
+ return invokeTrackingDuration(
+ durationTrackerFactory.trackDuration(TRASH_DELETE_CHECKPOINT),
+ () -> super.deleteCheckpoint(trashRoot, deleteImmediately));
+ }
+
+ /**
+ * Create a checkpoint; update the duration tracker statistics.
+ * @param trashRoot trash root.
+   * @param date date of checkpoint.
+   * @throws IOException failure
+ */
+ @Override
+ protected void createCheckpoint(final Path trashRoot, final Date date)
+ throws IOException {
+ trackDurationOfInvocation(durationTrackerFactory, TRASH_CREATE_CHECKPOINT, () ->
+ super.createCheckpoint(trashRoot, date));
+ }
+}
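For context, a minimal sketch (not part of this patch) of how the new policy could be wired up, assuming an s3a store; the bucket name and interval are illustrative, while the configuration keys come from this patch and from the existing `fs.trash.interval` option:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CloudStoreTrashPolicy;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Trash;

public class CloudTrashExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // trash must be enabled (interval in minutes) for any policy to act.
    conf.set("fs.trash.interval", "60");
    // per-scheme key added by this patch (TrashPolicy.FS_TRASH_SCHEMA_CLASSNAME).
    conf.set("fs.s3a.trash.classname",
        "org.apache.hadoop.fs.CloudStoreTrashPolicy");
    // cleanup of old checkpoints on every invocation; true is already the default.
    conf.setBoolean(CloudStoreTrashPolicy.CLEANUP_OLD_CHECKPOINTS, true);

    Path file = new Path("s3a://example-bucket/tmp/data");
    FileSystem fs = FileSystem.get(file.toUri(), conf);
    // resolves the policy through TrashPolicy.getInstance(conf, fs).
    Trash.moveToAppropriateTrash(fs, file, conf);
  }
}
```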
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 0bc419b035380..6bff12cb298a5 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -85,6 +85,8 @@
import org.apache.hadoop.tracing.TraceScope;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.util.functional.ConsumerRaisingIOE;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,6 +94,8 @@
import static org.apache.hadoop.util.Preconditions.checkArgument;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
+import static org.apache.hadoop.util.functional.RemoteIterators.foreach;
+import static org.apache.hadoop.util.functional.RemoteIterators.mappingRemoteIterator;
/****************************************************************
* An abstract base class for a fairly generic filesystem. It
@@ -3412,23 +3416,29 @@ public Path getTrashRoot(Path path) {
public Collection<FileStatus> getTrashRoots(boolean allUsers) {
Path userHome = new Path(getHomeDirectory().toUri().getPath());
List<FileStatus> ret = new ArrayList<>();
+ // an operation to look up a path status and add it to the return list
+    ConsumerRaisingIOE<Path> addIfFound = (userTrash) -> {
+ try {
+ FileStatus status = getFileStatus(userTrash);
+ ret.add(status);
+ } catch (FileNotFoundException noTrashFound) {
+ // user doesn't have a trash directory
+ }
+ };
try {
if (!allUsers) {
- Path userTrash = new Path(userHome, TRASH_PREFIX);
- if (exists(userTrash)) {
- ret.add(getFileStatus(userTrash));
- }
+      // Single user: probe for the trash directory under the home dir.
+ addIfFound.accept(new Path(userHome, TRASH_PREFIX));
} else {
- Path homeParent = userHome.getParent();
- if (exists(homeParent)) {
- FileStatus[] candidates = listStatus(homeParent);
- for (FileStatus candidate : candidates) {
- Path userTrash = new Path(candidate.getPath(), TRASH_PREFIX);
- if (exists(userTrash)) {
- candidate.setPath(userTrash);
- ret.add(candidate);
- }
- }
+ // use the iterator for scale/performance improvements against
+ // HDFS and object stores
+ try {
+ foreach(mappingRemoteIterator(
+ listStatusIterator(userHome.getParent()), (candidate) ->
+ new Path(candidate.getPath(), TRASH_PREFIX)),
+ addIfFound);
+ } catch (FileNotFoundException noUserHomeParent) {
+          // the parent of the home directories doesn't exist: no trash roots
}
}
} catch (IOException e) {
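The rewritten `getTrashRoots()` replaces the `exists()`/`getFileStatus()` double round trip with a single `getFileStatus()` probe per candidate, driven by an incremental listing. A standalone sketch of the same pattern, with hypothetical class and method names, assuming only the `RemoteIterators` utility class:

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.util.functional.RemoteIterators.foreach;
import static org.apache.hadoop.util.functional.RemoteIterators.mappingRemoteIterator;

public final class TrashRootScan {
  // map each child of "parent" to a candidate subpath, then probe for it,
  // swallowing "not found"; one remote call per candidate.
  public static List<FileStatus> scan(FileSystem fs, Path parent, String child)
      throws IOException {
    List<FileStatus> found = new ArrayList<>();
    foreach(
        mappingRemoteIterator(
            fs.listStatusIterator(parent),
            st -> new Path(st.getPath(), child)),
        candidate -> {
          try {
            found.add(fs.getFileStatus(candidate));
          } catch (FileNotFoundException ignored) {
            // candidate doesn't exist; skip it.
          }
        });
    return found;
  }
}
```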
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicy.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicy.java
index 35e51f9e1cfb1..912deab5cccb9 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicy.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicy.java
@@ -25,6 +25,9 @@
import java.io.IOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* This interface is used for implementing different Trash policies.
* Provides factory method to create instances of the configured Trash policy.
@@ -32,9 +35,34 @@
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class TrashPolicy extends Configured {
- protected FileSystem fs; // the FileSystem
- protected Path trash; // path to trash directory
- protected long deletionInterval; // deletion interval for Emptier
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TrashPolicy.class);
+
+ /**
+ * Global configuration key for the Trash policy classname: {@value}.
+ */
+ public static final String FS_TRASH_CLASSNAME = "fs.trash.classname";
+
+ /**
+ * FS specific configuration key for the Trash policy classname: {@value}.
+ */
+ public static final String FS_TRASH_SCHEMA_CLASSNAME = "fs.%s.trash.classname";
+
+ /**
+ * The FileSystem.
+ */
+ protected FileSystem fs;
+
+ /**
+ * The path to trash directory.
+ */
+ protected Path trash;
+
+ /**
+ * The deletion interval for Emptier.
+ */
+ protected long deletionInterval;
/**
* Used to setup the trash policy. Must be implemented by all TrashPolicy
@@ -55,7 +83,7 @@ public abstract class TrashPolicy extends Configured {
* @param fs the filesystem to be used
*/
public void initialize(Configuration conf, FileSystem fs) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("TrashPolicy");
}
/**
@@ -126,6 +154,30 @@ public Path getCurrentTrashDir(Path path) throws IOException {
*/
public abstract Runnable getEmptier() throws IOException;
+ /**
+   * Get the filesystem.
+   * @return the filesystem.
+ */
+ public FileSystem getFileSystem() {
+ return fs;
+ }
+
+ /**
+   * Get the path to the trash directory.
+   * @return the path to the trash directory.
+ */
+ public Path getTrash() {
+ return trash;
+ }
+
+ /**
+   * Get the deletion interval for the Emptier.
+   * @return the deletion interval in milliseconds.
+ */
+ public long getDeletionInterval() {
+ return deletionInterval;
+ }
+
/**
* Get an instance of the configured TrashPolicy based on the value
* of the configuration parameter fs.trash.classname.
@@ -139,23 +191,39 @@ public Path getCurrentTrashDir(Path path) throws IOException {
@Deprecated
public static TrashPolicy getInstance(Configuration conf, FileSystem fs, Path home) {
Class<? extends TrashPolicy> trashClass = conf.getClass(
- "fs.trash.classname", TrashPolicyDefault.class, TrashPolicy.class);
+ FS_TRASH_CLASSNAME, TrashPolicyDefault.class, TrashPolicy.class);
TrashPolicy trash = ReflectionUtils.newInstance(trashClass, conf);
trash.initialize(conf, fs, home); // initialize TrashPolicy
return trash;
}
/**
- * Get an instance of the configured TrashPolicy based on the value
- * of the configuration parameter fs.trash.classname.
- *
- * @param conf the configuration to be used
+   * Get an instance of the configured TrashPolicy based on the value of
+   * the first of these configuration parameters to be set:
+   * <ol>
+   *   <li>{@code fs.${fs.getUri().getScheme()}.trash.classname}</li>
+   *   <li>{@code fs.trash.classname}</li>
+   * </ol>
+   * The configuration passed in is used both to look up the keys and to
+   * load the policy class; the configuration of the FileSystem instance
+   * is not used.
+   * @param conf the configuration to be used for lookup and classloading
* @param fs the file system to be used
* @return an instance of TrashPolicy
*/
+ @SuppressWarnings("ClassReferencesSubclass")
public static TrashPolicy getInstance(Configuration conf, FileSystem fs) {
+ String scheme = fs.getUri().getScheme();
+    String key = String.format(FS_TRASH_SCHEMA_CLASSNAME, scheme);
+ if (conf.get(key) == null) {
+ // no specific trash policy for this scheme, use the default key
+ key = FS_TRASH_CLASSNAME;
+ }
+ LOG.debug("Looking up trash policy from configuration key {}", key);
Class<? extends TrashPolicy> trashClass = conf.getClass(
- "fs.trash.classname", TrashPolicyDefault.class, TrashPolicy.class);
+ key, TrashPolicyDefault.class, TrashPolicy.class);
+ LOG.debug("Trash policy class: {}", trashClass);
+
TrashPolicy trash = ReflectionUtils.newInstance(trashClass, conf);
trash.initialize(conf, fs); // initialize TrashPolicy
return trash;
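A test-style sketch (hypothetical class, local filesystem) of the resulting lookup order: with neither key set the default policy is returned, and the scheme-specific key takes precedence over `fs.trash.classname`:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.TrashPolicy;

public final class PolicyResolutionDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem local = FileSystem.getLocal(conf);
    // neither key set: TrashPolicyDefault.
    System.out.println(TrashPolicy.getInstance(conf, local).getClass());
    // the scheme-specific key ("file" here) overrides the global one.
    conf.set(String.format(TrashPolicy.FS_TRASH_SCHEMA_CLASSNAME, "file"),
        "org.apache.hadoop.fs.shell.DeleteFilesTrashPolicy");
    conf.set(TrashPolicy.FS_TRASH_CLASSNAME,
        "org.apache.hadoop.fs.TrashPolicyDefault");
    // prints the DeleteFilesTrashPolicy class.
    System.out.println(TrashPolicy.getInstance(conf, local).getClass());
  }
}
```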
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicyDefault.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicyDefault.java
index f4228dea69f49..36106987c20fb 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicyDefault.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicyDefault.java
@@ -66,7 +66,7 @@ public class TrashPolicyDefault extends TrashPolicy {
/** Format of checkpoint directories used prior to Hadoop 0.23. */
private static final DateFormat OLD_CHECKPOINT =
new SimpleDateFormat("yyMMddHHmm");
- private static final int MSECS_PER_MINUTE = 60*1000;
+ private static final int MSECS_PER_MINUTE = 60_000;
private long emptierInterval;
@@ -151,7 +151,7 @@ public boolean moveToTrash(Path path) throws IOException {
for (int i = 0; i < 2; i++) {
try {
if (!fs.mkdirs(baseTrashPath, PERMISSION)) { // create current
- LOG.warn("Can't create(mkdir) trash directory: " + baseTrashPath);
+ LOG.warn("Can't create(mkdir) trash directory: {}", baseTrashPath);
return false;
}
} catch (FileAlreadyExistsException e) {
@@ -169,7 +169,7 @@ public boolean moveToTrash(Path path) throws IOException {
--i;
continue;
} catch (IOException e) {
- LOG.warn("Can't create trash directory: " + baseTrashPath, e);
+ LOG.warn("Can't create trash directory: {}", baseTrashPath, e);
cause = e;
break;
}
@@ -185,7 +185,7 @@ public boolean moveToTrash(Path path) throws IOException {
// move to current trash
fs.rename(path, trashPath,
Rename.TO_TRASH);
- LOG.info("Moved: '" + path + "' to trash at: " + trashPath);
+ LOG.info("Moved: '{}' to trash at: {}", path, trashPath);
return true;
} catch (IOException e) {
cause = e;
@@ -224,7 +224,7 @@ public void deleteCheckpointsImmediately() throws IOException {
private void deleteCheckpoint(boolean deleteImmediately) throws IOException {
Collection<FileStatus> trashRoots = fs.getTrashRoots(false);
for (FileStatus trashRoot : trashRoots) {
- LOG.info("TrashPolicyDefault#deleteCheckpoint for trashRoot: " +
+ LOG.info("TrashPolicyDefault#deleteCheckpoint for trashRoot: {}",
trashRoot.getPath());
deleteCheckpoint(trashRoot.getPath(), deleteImmediately);
}
@@ -245,6 +245,12 @@ public Runnable getEmptier() throws IOException {
return new Emptier(getConf(), emptierInterval);
}
+  /**
+   * The emptier of this trash.
+   * This thread is expected to be run on HDFS namenodes. After being
+   * interrupted, it will close the filesystem the trash policy was bound
+   * to; see HADOOP-2337.
+   * If used elsewhere, it MUST be given its own instance of the target
+   * filesystem.
+   */
protected class Emptier implements Runnable {
private Configuration conf;
@@ -254,17 +260,16 @@ protected class Emptier implements Runnable {
this.conf = conf;
this.emptierInterval = emptierInterval;
if (emptierInterval > deletionInterval || emptierInterval <= 0) {
- LOG.info("The configured checkpoint interval is " +
- (emptierInterval / MSECS_PER_MINUTE) + " minutes." +
- " Using an interval of " +
- (deletionInterval / MSECS_PER_MINUTE) +
- " minutes that is used for deletion instead");
+ LOG.info("The configured checkpoint interval is {} minutes."
+ + " Using an interval of {} minutes that is used for deletion instead",
+ emptierInterval / MSECS_PER_MINUTE,
+ deletionInterval / MSECS_PER_MINUTE);
this.emptierInterval = deletionInterval;
}
- LOG.info("Namenode trash configuration: Deletion interval = "
- + (deletionInterval / MSECS_PER_MINUTE)
- + " minutes, Emptier interval = "
- + (this.emptierInterval / MSECS_PER_MINUTE) + " minutes.");
+ LOG.info("Namenode trash configuration: Deletion interval = {} minutes."
+ + " Emptier interval = {} minutes.",
+ deletionInterval / MSECS_PER_MINUTE,
+ emptierInterval / MSECS_PER_MINUTE);
}
@Override
@@ -288,16 +293,18 @@ public void run() {
trashRoots = fs.getTrashRoots(true); // list all trash dirs
for (FileStatus trashRoot : trashRoots) { // dump each trash
- if (!trashRoot.isDirectory())
+ if (!trashRoot.isDirectory()) {
+ LOG.debug("Trash root {} is not a directory: {}",
+ trashRoot.getPath(), trashRoot);
continue;
+ }
try {
TrashPolicyDefault trash = new TrashPolicyDefault(fs, conf);
trash.deleteCheckpoint(trashRoot.getPath(), false);
trash.createCheckpoint(trashRoot.getPath(), new Date(now));
} catch (IOException e) {
- LOG.warn("Trash caught: "+e+". Skipping " +
- trashRoot.getPath() + ".");
- }
+ LOG.warn("Trash caught:{} Skipping {}", e, trashRoot.getPath());
+ }
}
}
} catch (Exception e) {
@@ -324,7 +331,7 @@ protected long getEmptierInterval() {
}
}
- private void createCheckpoint(Path trashRoot, Date date) throws IOException {
+ protected void createCheckpoint(Path trashRoot, Date date) throws IOException {
if (!fs.exists(new Path(trashRoot, CURRENT))) {
return;
}
@@ -339,31 +346,42 @@ private void createCheckpoint(Path trashRoot, Date date) throws IOException {
while (true) {
try {
fs.rename(current, checkpoint, Rename.NONE);
- LOG.info("Created trash checkpoint: " + checkpoint.toUri().getPath());
+ LOG.info("Created trash checkpoint: {}", checkpoint.toUri().getPath());
break;
} catch (FileAlreadyExistsException e) {
if (++attempt > 1000) {
- throw new IOException("Failed to checkpoint trash: " + checkpoint);
+ throw new IOException("Failed to checkpoint trash: " + checkpoint, e);
}
checkpoint = checkpointBase.suffix("-" + attempt);
}
}
}
- private void deleteCheckpoint(Path trashRoot, boolean deleteImmediately)
+  /**
+   * Delete checkpoint directories under a trash root which are older than
+   * the deletion interval or, if {@code deleteImmediately} is true, all
+   * of them.
+   * It is not an error if invoked on a trash root which doesn't exist.
+   * @param trashRoot trash root.
+   * @param deleteImmediately should all checkpoints be deleted?
+   * @return the number of checkpoints deleted.
+   * @throws IOException failure in listing or delete() calls
+   */
+ protected int deleteCheckpoint(Path trashRoot, boolean deleteImmediately)
throws IOException {
- LOG.info("TrashPolicyDefault#deleteCheckpoint for trashRoot: " + trashRoot);
+ LOG.info("TrashPolicyDefault#deleteCheckpoint for trashRoot: {}", trashRoot);
- FileStatus[] dirs = null;
+    RemoteIterator<FileStatus> dirs;
try {
- dirs = fs.listStatus(trashRoot); // scan trash sub-directories
+ dirs = fs.listStatusIterator(trashRoot); // scan trash sub-directories
} catch (FileNotFoundException fnfe) {
- return;
+ return 0;
}
long now = Time.now();
- for (int i = 0; i < dirs.length; i++) {
- Path path = dirs[i].getPath();
+ int counter = 0;
+ while (dirs.hasNext()) {
+      Path path = dirs.next().getPath();
String dir = path.toUri().getPath();
String name = path.getName();
if (name.equals(CURRENT.getName())) { // skip current
@@ -374,21 +392,39 @@ private void deleteCheckpoint(Path trashRoot, boolean deleteImmediately)
try {
time = getTimeFromCheckpoint(name);
} catch (ParseException e) {
- LOG.warn("Unexpected item in trash: "+dir+". Ignoring.");
+ LOG.warn("Unexpected item in trash {}. Ignoring.", dir);
continue;
}
if (((now - deletionInterval) > time) || deleteImmediately) {
- if (fs.delete(path, true)) {
- LOG.info("Deleted trash checkpoint: "+dir);
- } else {
- LOG.warn("Couldn't delete checkpoint: " + dir + " Ignoring.");
- }
+        deleteCheckpoint(path);
+        counter++;
}
}
+ return counter;
}
- private long getTimeFromCheckpoint(String name) throws ParseException {
+  /**
+   * Delete a checkpoint.
+   * @param path path to delete
+   * @throws IOException IO exception raised in the delete call.
+   */
+  protected void deleteCheckpoint(final Path path) throws IOException {
+ if (getFileSystem().delete(path, true)) {
+ LOG.info("Deleted trash checkpoint: {}", path);
+ } else {
+ LOG.warn("Couldn't delete checkpoint: {}. Ignoring.", path);
+ }
+ }
+
+  /**
+   * Parse the name of a checkpoint to extract its timestamp.
+   * Understands the current checkpoint format as well as the pre-0.23 one.
+   * @param name filename
+   * @return the timestamp
+   * @throws ParseException the filename is not a valid checkpoint timestamp.
+   */
+ protected long getTimeFromCheckpoint(String name) throws ParseException {
long time;
try {
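For reference, a sketch of the two checkpoint-name formats `getTimeFromCheckpoint()` must accept: the `"yyMMddHHmmss"` pattern matches the `CHECKPOINT` constant in `TrashPolicyDefault`, while `"yyMMddHHmm"` is the `OLD_CHECKPOINT` format shown earlier in this patch. The class name is illustrative, and the sketch ignores `SimpleDateFormat`'s lack of thread safety:

```java
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;

public final class CheckpointNames {
  private static final DateFormat CHECKPOINT =
      new SimpleDateFormat("yyMMddHHmmss");
  private static final DateFormat OLD_CHECKPOINT =
      new SimpleDateFormat("yyMMddHHmm");

  static long parse(String name) throws ParseException {
    try {
      // current, second-granularity checkpoint names.
      return CHECKPOINT.parse(name).getTime();
    } catch (ParseException e) {
      // fall back to the older, minute-granularity format.
      return OLD_CHECKPOINT.parse(name).getTime();
    }
  }
}
```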
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/DeleteFilesTrashPolicy.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/DeleteFilesTrashPolicy.java
new file mode 100644
index 0000000000000..3d3443e4b6e51
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/DeleteFilesTrashPolicy.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.shell;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.TrashPolicy;
+import org.apache.hadoop.util.DurationInfo;
+
+/**
+ * Trash policy which deletes files immediately, logging how long each
+ * delete takes.
+ * It reports itself as enabled via {@link #isEnabled()}, even though
+ * nothing is ever moved to trash.
+ */
+public class DeleteFilesTrashPolicy extends TrashPolicy {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DeleteFilesTrashPolicy.class);
+
+ private static final Path CURRENT = new Path("Current");
+
+ private FileSystem filesystem;
+
+ @Override
+ public void initialize(final Configuration conf,
+ final FileSystem fs,
+ final Path home) {
+ initialize(conf, fs);
+ }
+
+ @Override
+ public void initialize(final Configuration conf, final FileSystem fs) {
+ filesystem = fs;
+ }
+
+ @Override
+ public Path getCurrentTrashDir(final Path path) throws IOException {
+    return new Path(filesystem.getTrashRoot(path), CURRENT);
+  }
+
+ @Override
+ public Path getCurrentTrashDir() {
+    return new Path(filesystem.getTrashRoot(null), CURRENT);
+  }
+
+ @Override
+ public boolean isEnabled() {
+ return true;
+ }
+
+ /**
+   * Delete the path immediately, logging the duration.
+   * @param path the path.
+   * @return true if the delete succeeded.
+   * @throws IOException failure to delete.
+ */
+ @Override
+ public boolean moveToTrash(final Path path) throws IOException {
+ try (DurationInfo info = new DurationInfo(LOG, true, "delete %s", path)) {
+ return filesystem.delete(path, true);
+ }
+ }
+
+ @Override
+ public void createCheckpoint() throws IOException {
+
+ }
+
+ @Override
+ public void deleteCheckpoint() throws IOException {
+
+ }
+
+ @Override
+ public void deleteCheckpointsImmediately() throws IOException {
+
+ }
+
+ /**
+ * Return a no-op {@link Runnable}.
+ *
+ * @return An empty Runnable.
+ */
+ @Override
+ public Runnable getEmptier() throws IOException {
+ return () -> {};
+ }
+
+}
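A hypothetical wiring sketch (class name illustrative, not part of this patch): selecting this policy via the global key makes every "move to trash" an immediate, unrecoverable delete, so it is only suitable where trash semantics are explicitly unwanted:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.TrashPolicy;

public final class DeleteOnlyTrashSetup {
  public static Configuration configure(Configuration conf) {
    // trash must appear enabled for the shell to route deletes through it.
    conf.set("fs.trash.interval", "60");
    conf.set(TrashPolicy.FS_TRASH_CLASSNAME,
        "org.apache.hadoop.fs.shell.DeleteFilesTrashPolicy");
    return conf;
  }
}
```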
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsSupport.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsSupport.java
index bb4d9a44587a2..319f9ffafe9b4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsSupport.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsSupport.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.fs.statistics;
+import javax.annotation.Nullable;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.statistics.impl.StubDurationTracker;
@@ -105,4 +107,35 @@ public static DurationTrackerFactory stubDurationTrackerFactory() {
public static DurationTracker stubDurationTracker() {
return StubDurationTracker.STUB_DURATION_TRACKER;
}
+
+  /**
+   * Retrieve a DurationTrackerFactory for a source.
+   * This will be, in order:
+   * <ol>
+   *   <li>The object itself, if it is a factory.</li>
+   *   <li>If the object is an IOStatisticsSource: any factory provided
+   *   by its IOStatistics.</li>
+   *   <li>A stub duration tracker.</li>
+   * </ol>
+   * As singletons are returned, this is very low-cost to use.
+   * @param source source object; may be null.
+   * @return a duration tracker factory.
+   */
+ public static DurationTrackerFactory bindToDurationTrackerFactory(
+ @Nullable Object source) {
+
+ if (source instanceof DurationTrackerFactory) {
+ // the object is a factory
+ return (DurationTrackerFactory) source;
+ } else if (source instanceof IOStatisticsSource) {
+ // the object may provide a factory in its IOStatistics; extract and recurse.
+ // the value may actually be null; rely on the recursion call to handle
+ // this
+ return bindToDurationTrackerFactory(
+ ((IOStatisticsSource) source).getIOStatistics());
+ } else {
+      // the object is null, not a factory, and not a statistics source.
+ return StubDurationTrackerFactory.STUB_DURATION_TRACKER_FACTORY;
+ }
+ }
+
}
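A sketch of the intended call pattern for the new binding method, with a hypothetical wrapper class; `"op_delete"` is an existing StoreStatisticNames constant, used here purely as an example statistic:

```java
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;

import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.bindToDurationTrackerFactory;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;

public final class TrackedDelete {
  // bind once, then wrap operations. Filesystems without IOStatistics
  // silently get the stub factory, so the wrapped call still runs;
  // only the measurement is skipped.
  public static void delete(FileSystem fs, Path path) throws IOException {
    DurationTrackerFactory factory = bindToDurationTrackerFactory(fs);
    trackDurationOfInvocation(factory, "op_delete", () ->
        fs.delete(path, true));
  }
}
```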
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
index c04c1bb47fcea..7e65d5d68b26a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
@@ -286,6 +286,21 @@ public final class StoreStatisticNames {
public static final String OBJECT_SELECT_REQUESTS =
"object_select_requests";
+ /**
+ * Trash statistic: {@value}.
+ */
+ public static final String TRASH_CREATE_CHECKPOINT = "trash.create.checkpoint";
+
+ /**
+ * Trash statistic: {@value}.
+ */
+ public static final String TRASH_DELETE_CHECKPOINT = "trash.delete.checkpoint";
+
+ /**
+ * Trash statistic: {@value}.
+ */
+ public static final String TRASH_MOVE_TO_TRASH = "trash.move.to.trash";
+
/**
* Suffix to use for a minimum value when
* the same key is shared across min/mean/max
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java
index 3a77e82ffb4fb..387f58cf3be98 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java
@@ -109,7 +109,16 @@ public enum AbfsStatistic {
"Number of times rename operation failed due to metadata being "
+ "incomplete"),
RENAME_PATH_ATTEMPTS("rename_path_attempts",
- "Number of times we attempt to rename a path internally");
+ "Number of times we attempt to rename a path internally"),
+
+ TRASH_CREATE_CHECKPOINT(StoreStatisticNames.TRASH_CREATE_CHECKPOINT,
+ "Create trash checkpoint"),
+
+ TRASH_DELETE_CHECKPOINT(StoreStatisticNames.TRASH_DELETE_CHECKPOINT,
+ "Delete trash checkpoint"),
+
+ TRASH_MOVE_TO_TRASH(StoreStatisticNames.TRASH_MOVE_TO_TRASH,
+ "Move to trash");
private String statName;
private String statDescription;
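Finally, a sketch of how the new trash.* statistics could be observed against a store which publishes IOStatistics (such as ABFS after this patch); the helper class and method names are illustrative:

```java
import org.apache.hadoop.fs.FileSystem;

import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;

public final class DumpTrashStats {
  // dump the statistics of a filesystem; the new trash.* entries appear
  // alongside the existing counters and duration statistics.
  public static void dump(FileSystem fs) {
    System.out.println(ioStatisticsSourceToString(fs));
  }
}
```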