Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ private DistCpConstants() {
public static final String CONF_LABEL_DELETE_MISSING = "distcp.delete.missing.source";
public static final String CONF_LABEL_TRACK_MISSING =
"distcp.track.missing.source";
public static final String CONF_LABEL_DELETE_MISSING_USETRASH =
"distcp.delete.missing.usetrash";
public static final String CONF_LABEL_LISTSTATUS_THREADS = "distcp.liststatus.threads";
public static final String CONF_LABEL_MAX_MAPS = "distcp.max.maps";
public static final String CONF_LABEL_SOURCE_LISTING = "distcp.source.listing";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ public enum DistCpOptionSwitch {
new Option("delete", false, "Delete from target, " +
"files missing in source. Delete is applicable only with update or overwrite options")),

/**
* When -delete option on, files in target that are missing from source
* will be delete by default. This allows the files to be
* moved to the trash
*/
DELETE_USETRASH(DistCpConstants.CONF_LABEL_DELETE_MISSING_USETRASH,
new Option("useTrash", false, "Move deleted files into " +
"the user's trash directory in the destination filesystem")),

/**
* Track missing files in target that are missing from source
* This allows for other applications to complete the synchronization,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ public final class DistCpOptions {
/** Whether to run blocking or non-blocking. */
private final boolean blocking;

private boolean deleteUseTrash;

// When "-diff s1 s2 src tgt" is passed, apply forward snapshot diff (from s1
// to s2) of source cluster to the target cluster to sync target cluster with
// the source cluster. Referred to as "Fdiff" in the code.
Expand Down Expand Up @@ -221,6 +223,7 @@ private DistCpOptions(Builder builder) {
this.trackPath = builder.trackPath;

this.directWrite = builder.directWrite;
this.deleteUseTrash = builder.deleteUseTrash;
}

public Path getSourceFileListing() {
Expand Down Expand Up @@ -284,6 +287,10 @@ public boolean shouldUseSnapshotDiff() {
return shouldUseDiff() || shouldUseRdiff();
}

public boolean shouldDeleteUseTrash() {
return deleteUseTrash;
}

public String getFromSnapshot() {
return this.fromSnapshot;
}
Expand Down Expand Up @@ -374,6 +381,8 @@ public void appendToConf(Configuration conf) {
String.valueOf(useDiff));
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.RDIFF,
String.valueOf(useRdiff));
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DELETE_USETRASH,
String.valueOf(deleteUseTrash));
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC,
String.valueOf(skipCRC));
if (mapBandwidth > 0) {
Expand Down Expand Up @@ -415,6 +424,7 @@ public String toString() {
"atomicCommit=" + atomicCommit +
", syncFolder=" + syncFolder +
", deleteMissing=" + deleteMissing +
", deleteUseTrash=" + deleteUseTrash +
", ignoreFailures=" + ignoreFailures +
", overwrite=" + overwrite +
", append=" + append +
Expand Down Expand Up @@ -467,6 +477,8 @@ public static class Builder {

private boolean useDiff = false;
private boolean useRdiff = false;
private boolean deleteUseTrash = false;

private String fromSnapshot;
private String toSnapshot;

Expand Down Expand Up @@ -564,6 +576,11 @@ private void validate() {
+ "only with update or overwrite options");
}

if (deleteUseTrash && !deleteMissing) {
throw new IllegalArgumentException("Option -useTrash must be " +
"accompanied by -delete");
}

if (overwrite && syncFolder) {
throw new IllegalArgumentException("Overwrite and update options are "
+ "mutually exclusive");
Expand Down Expand Up @@ -627,6 +644,11 @@ public Builder withDeleteMissing(boolean newDeleteMissing) {
return this;
}

public Builder withDeleteUseTrash(boolean newDeleteUseTrash) {
this.deleteUseTrash = newDeleteUseTrash;
return this;
}

public Builder withIgnoreFailures(boolean newIgnoreFailures) {
this.ignoreFailures = newIgnoreFailures;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ public static DistCpOptions parse(String[] args)
command.hasOption(DistCpOptionSwitch.SYNC_FOLDERS.getSwitch()))
.withDeleteMissing(
command.hasOption(DistCpOptionSwitch.DELETE_MISSING.getSwitch()))
.withDeleteUseTrash(
command.hasOption(DistCpOptionSwitch.DELETE_USETRASH.getSwitch()))
.withIgnoreFailures(
command.hasOption(DistCpOptionSwitch.IGNORE_FAILURES.getSwitch()))
.withOverwrite(
Expand Down Expand Up @@ -153,6 +155,9 @@ public static DistCpOptions parse(String[] args)
command,
DistCpOptionSwitch.TRACK_MISSING.getSwitch())));
}
if (command.hasOption(DistCpOptionSwitch.DELETE_USETRASH.getSwitch())) {
builder.withDeleteUseTrash(true);
}

if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
Expand Down Expand Up @@ -458,7 +459,8 @@ private void deleteMissing(Configuration conf) throws IOException {
if (tracker.shouldDelete(trgtFileStatus)) {
showProgress = true;
try {
if (targetFS.delete(targetEntry, true)) {
boolean result = deletePath(targetFS, targetEntry, conf);
if (result) {
// the delete worked. Unless the file is actually missing, this is the
LOG.info("Deleted " + targetEntry + " - missing at source");
deletedEntries++;
Expand All @@ -472,7 +474,8 @@ private void deleteMissing(Configuration conf) throws IOException {
// For all the filestores which implement the FS spec properly,
// this means "the file wasn't there".
// so track but don't worry about it.
LOG.info("delete({}) returned false ({})",
LOG.info("delete({}) returned false ({}). Consider using " +
"-useTrash option if trash is enabled.",
targetEntry, trgtFileStatus);
missingDeletes++;
}
Expand Down Expand Up @@ -520,6 +523,17 @@ private void deleteMissing(Configuration conf) throws IOException {
formatDuration(deletionEnd - listingStart));
}

private boolean deletePath(FileSystem targetFS, Path targetEntry,
Configuration conf) throws IOException {
if (conf.getBoolean(DistCpConstants.CONF_LABEL_DELETE_MISSING_USETRASH,
false)) {
return Trash.moveToAppropriateTrash(
targetFS, targetEntry, conf);
} else {
return targetFS.delete(targetEntry, true);
}
}

/**
* Take a duration and return a human-readable duration of
* hours:minutes:seconds.millis.
Expand Down
2 changes: 1 addition & 1 deletion hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ Command Line Options
| `-filters` | The path to a file containing a list of pattern strings, one string per line, such that paths matching the pattern will be excluded from the copy. | Support regular expressions specified by java.util.regex.Pattern. |
| `-filelimit <n>` | Limit the total number of files to be <= n | **Deprecated!** Ignored in the new DistCp. |
| `-sizelimit <n>` | Limit the total size to be <= n bytes | **Deprecated!** Ignored in the new DistCp. |
| `-delete` | Delete the files existing in the dst but not in src | The deletion is done by FS Shell. So the trash will be used, if it is enable. Delete is applicable only with update or overwrite options. |
| `-delete [-useTrash]` | Delete the files existing in the `/dst/` but not in `/src/` . when `[-useTrash]` is enabled, the files will be moved into the user's trash directory. Notice that `[-useTrash]` option on some object store does a copy and delete ops and can be slow. Delete is applicable only with update or overwrite options. |
| `-strategy {dynamic|uniformsize}` | Choose the copy-strategy to be used in DistCp. | By default, uniformsize is used. (i.e. Maps are balanced on the total size of files copied by each map. Similar to legacy.) If "dynamic" is specified, `DynamicInputFormat` is used instead. (This is described in the Architecture section, under InputFormats.) |
| `-bandwidth` | Specify bandwidth per map, in MB/second. | Each map will be restricted to consume only the specified bandwidth. This is not always exact. The map throttles back its bandwidth consumption during a copy, such that the **net** bandwidth used tends towards the specified value. |
| `-atomic {-tmp <tmp_dir>}` | Specify atomic commit, with optional tmp directory. | `-atomic` instructs DistCp to copy the source data to a temporary target location, and then move the temporary target to the final-location atomically. Data will either be available at final target in a complete and consistent form, or not at all. Optionally, `-tmp` may be used to specify the location of the tmp-target. If not specified, a default is chosen. **Note:** tmp_dir must be on the final target cluster. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -225,6 +226,42 @@ public void testSetDeleteMissing() {
}
}

@Test
public void testDeleteMissingUseTrash() throws Exception {
final DistCpOptions.Builder builder = new DistCpOptions.Builder(
Collections.singletonList(new Path("hdfs://localhost:8020/source")),
new Path("hdfs://localhost:8020/target/"));
Assert.assertFalse("Delete does not use trash by default.",
builder.build().shouldDeleteUseTrash());

DistCpOptions options = builder.withSyncFolder(true)
.withDeleteMissing(true)
.withDeleteUseTrash(true)
.build();
Assert.assertTrue(options.shouldSyncFolder());
Assert.assertTrue(options.shouldDeleteMissing());
Assert.assertTrue(options.shouldDeleteUseTrash());

options = new DistCpOptions.Builder(
Collections.singletonList(new Path("hdfs://localhost:8020/source")),
new Path("hdfs://localhost:8020/target/"))
.withOverwrite(true)
.withDeleteMissing(true)
.withDeleteUseTrash(true)
.build();

Assert.assertTrue(options.shouldDeleteUseTrash());
Assert.assertTrue(options.shouldOverwrite());
Assert.assertTrue(options.shouldDeleteMissing());

LambdaTestUtils.intercept(IllegalArgumentException.class,
() -> new DistCpOptions.Builder(Collections.singletonList(
new Path("hdfs://localhost:8020/source")),
new Path("hdfs://localhost:8020/target/"))
.withDeleteUseTrash(true)
.build());
}

@Test
public void testSetMaps() {
final DistCpOptions.Builder builder = new DistCpOptions.Builder(
Expand Down Expand Up @@ -281,8 +318,8 @@ public void testToString() {
DistCpOptions option = new DistCpOptions.Builder(new Path("abc"),
new Path("xyz")).build();
String val = "DistCpOptions{atomicCommit=false, syncFolder=false, " +
"deleteMissing=false, ignoreFailures=false, overwrite=false, " +
"append=false, useDiff=false, useRdiff=false, " +
"deleteMissing=false, deleteUseTrash=false, ignoreFailures=false, " +
"overwrite=false, append=false, useDiff=false, useRdiff=false, " +
"fromSnapshot=null, toSnapshot=null, " +
"skipCRC=false, blocking=true, numListstatusThreads=0, maxMaps=20, " +
"mapBandwidth=0.0, copyStrategy='uniformsize', preserveStatus=[], " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ public void testDeleteMissingInDestination() {
createFiles("srcdir/file1", "dstdir/file1", "dstdir/file2");

Path target = new Path(root + "/dstdir");
runTest(listFile, target, false, true, true, false);
runTest(listFile, target, false, true, true, false, false);

checkResult(target, 1, "file1");
} catch (IOException e) {
Expand All @@ -372,7 +372,7 @@ public void testOverwrite() {
createWithContents("dstdir/file1", contents2);

Path target = new Path(root + "/dstdir");
runTest(listFile, target, false, false, false, true);
runTest(listFile, target, false, false, false, true, false);

checkResult(target, 1, "file1");

Expand Down Expand Up @@ -553,15 +553,16 @@ private void mkdirs(String... entries) throws IOException {

private void runTest(Path listFile, Path target, boolean targetExists,
boolean sync) throws IOException {
runTest(listFile, target, targetExists, sync, false, false);
runTest(listFile, target, targetExists, sync, false, false, false);
}

private void runTest(Path listFile, Path target, boolean targetExists,
boolean sync, boolean delete,
boolean overwrite) throws IOException {
boolean overwrite, boolean useTrash) throws IOException {
final DistCpOptions options = new DistCpOptions.Builder(listFile, target)
.withSyncFolder(sync)
.withDeleteMissing(delete)
.withDeleteUseTrash(useTrash)
.withOverwrite(overwrite)
.withNumListstatusThreads(numListstatusThreads)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -804,4 +804,23 @@ public void testExclusionsOption() {
"hdfs://localhost:8020/target/"});
assertThat(options.getFiltersFile()).isEqualTo("/tmp/filters.txt");
}

@Test
public void testParseDeleteSkipTrash() {
DistCpOptions options = OptionsParser.parse(new String[] {
"-overwrite",
"-delete",
"-useTrash",
"hdfs://localhost:8020/source/first",
"hdfs://localhost:8020/target/"});
Assert.assertTrue("Delete with useTrash.",
options.shouldDeleteUseTrash());
options = OptionsParser.parse(new String[] {
"-overwrite",
"-delete",
"hdfs://localhost:8020/source/first",
"hdfs://localhost:8020/target/"});
Assert.assertFalse("Delete does not use trash.",
options.shouldDeleteUseTrash());
}
}
Loading