diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java index 25815687c2973..0c781e0282c58 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java @@ -61,6 +61,8 @@ private DistCpConstants() { public static final String CONF_LABEL_DELETE_MISSING = "distcp.delete.missing.source"; public static final String CONF_LABEL_TRACK_MISSING = "distcp.track.missing.source"; + public static final String CONF_LABEL_DELETE_MISSING_USETRASH = + "distcp.delete.missing.usetrash"; public static final String CONF_LABEL_LISTSTATUS_THREADS = "distcp.liststatus.threads"; public static final String CONF_LABEL_MAX_MAPS = "distcp.max.maps"; public static final String CONF_LABEL_SOURCE_LISTING = "distcp.source.listing"; diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java index 49ffc59344e75..3302792bf564a 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java @@ -76,6 +76,15 @@ public enum DistCpOptionSwitch { new Option("delete", false, "Delete from target, " + "files missing in source. Delete is applicable only with update or overwrite options")), + /** + * When -delete option on, files in target that are missing from source + * will be delete by default. This allows the files to be + * moved to the trash + */ + DELETE_USETRASH(DistCpConstants.CONF_LABEL_DELETE_MISSING_USETRASH, + new Option("useTrash", false, "Move deleted files into " + + "the user's trash directory in the destination filesystem")), + /** * Track missing files in target that are missing from source * This allows for other applications to complete the synchronization, diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java index 4a6552fed6b55..b07b503262b3f 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java @@ -95,6 +95,8 @@ public final class DistCpOptions { /** Whether to run blocking or non-blocking. */ private final boolean blocking; + private boolean deleteUseTrash; + // When "-diff s1 s2 src tgt" is passed, apply forward snapshot diff (from s1 // to s2) of source cluster to the target cluster to sync target cluster with // the source cluster. Referred to as "Fdiff" in the code. @@ -221,6 +223,7 @@ private DistCpOptions(Builder builder) { this.trackPath = builder.trackPath; this.directWrite = builder.directWrite; + this.deleteUseTrash = builder.deleteUseTrash; } public Path getSourceFileListing() { @@ -284,6 +287,10 @@ public boolean shouldUseSnapshotDiff() { return shouldUseDiff() || shouldUseRdiff(); } + public boolean shouldDeleteUseTrash() { + return deleteUseTrash; + } + public String getFromSnapshot() { return this.fromSnapshot; } @@ -374,6 +381,8 @@ public void appendToConf(Configuration conf) { String.valueOf(useDiff)); DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.RDIFF, String.valueOf(useRdiff)); + DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DELETE_USETRASH, + String.valueOf(deleteUseTrash)); DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC, String.valueOf(skipCRC)); if (mapBandwidth > 0) { @@ -415,6 +424,7 @@ public String toString() { "atomicCommit=" + atomicCommit + ", syncFolder=" + syncFolder + ", deleteMissing=" + deleteMissing + + ", deleteUseTrash=" + deleteUseTrash + ", ignoreFailures=" + ignoreFailures + ", overwrite=" + overwrite + ", append=" + append + @@ -467,6 +477,8 @@ public static class Builder { private boolean useDiff = false; private boolean useRdiff = false; + private boolean deleteUseTrash = false; + private String fromSnapshot; private String toSnapshot; @@ -564,6 +576,11 @@ private void validate() { + "only with update or overwrite options"); } + if (deleteUseTrash && !deleteMissing) { + throw new IllegalArgumentException("Option -useTrash must be " + + "accompanied by -delete"); + } + if (overwrite && syncFolder) { throw new IllegalArgumentException("Overwrite and update options are " + "mutually exclusive"); @@ -627,6 +644,11 @@ public Builder withDeleteMissing(boolean newDeleteMissing) { return this; } + public Builder withDeleteUseTrash(boolean newDeleteUseTrash) { + this.deleteUseTrash = newDeleteUseTrash; + return this; + } + public Builder withIgnoreFailures(boolean newIgnoreFailures) { this.ignoreFailures = newIgnoreFailures; return this; diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java index 3b9d13b3b0308..d011e69c948ca 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java @@ -102,6 +102,8 @@ public static DistCpOptions parse(String[] args) command.hasOption(DistCpOptionSwitch.SYNC_FOLDERS.getSwitch())) .withDeleteMissing( command.hasOption(DistCpOptionSwitch.DELETE_MISSING.getSwitch())) + .withDeleteUseTrash( + command.hasOption(DistCpOptionSwitch.DELETE_USETRASH.getSwitch())) .withIgnoreFailures( command.hasOption(DistCpOptionSwitch.IGNORE_FAILURES.getSwitch())) .withOverwrite( @@ -153,6 +155,9 @@ public static DistCpOptions parse(String[] args) command, DistCpOptionSwitch.TRACK_MISSING.getSwitch()))); } + if (command.hasOption(DistCpOptionSwitch.DELETE_USETRASH.getSwitch())) { + builder.withDeleteUseTrash(true); + } if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) { try { diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java index 139bd08fd7abc..4cf381178ed5d 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.Trash; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; @@ -458,7 +459,8 @@ private void deleteMissing(Configuration conf) throws IOException { if (tracker.shouldDelete(trgtFileStatus)) { showProgress = true; try { - if (targetFS.delete(targetEntry, true)) { + boolean result = deletePath(targetFS, targetEntry, conf); + if (result) { // the delete worked. Unless the file is actually missing, this is the LOG.info("Deleted " + targetEntry + " - missing at source"); deletedEntries++; @@ -472,7 +474,8 @@ private void deleteMissing(Configuration conf) throws IOException { // For all the filestores which implement the FS spec properly, // this means "the file wasn't there". // so track but don't worry about it. - LOG.info("delete({}) returned false ({})", + LOG.info("delete({}) returned false ({}). Consider using " + + "-useTrash option if trash is enabled.", targetEntry, trgtFileStatus); missingDeletes++; } @@ -520,6 +523,17 @@ private void deleteMissing(Configuration conf) throws IOException { formatDuration(deletionEnd - listingStart)); } + private boolean deletePath(FileSystem targetFS, Path targetEntry, + Configuration conf) throws IOException { + if (conf.getBoolean(DistCpConstants.CONF_LABEL_DELETE_MISSING_USETRASH, + false)) { + return Trash.moveToAppropriateTrash( + targetFS, targetEntry, conf); + } else { + return targetFS.delete(targetEntry, true); + } + } + /** * Take a duration and return a human-readable duration of * hours:minutes:seconds.millis. diff --git a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm index bf5b89135fccb..6be76681cafd8 100644 --- a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm +++ b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm @@ -349,7 +349,7 @@ Command Line Options | `-filters` | The path to a file containing a list of pattern strings, one string per line, such that paths matching the pattern will be excluded from the copy. | Support regular expressions specified by java.util.regex.Pattern. | | `-filelimit ` | Limit the total number of files to be <= n | **Deprecated!** Ignored in the new DistCp. | | `-sizelimit ` | Limit the total size to be <= n bytes | **Deprecated!** Ignored in the new DistCp. | -| `-delete` | Delete the files existing in the dst but not in src | The deletion is done by FS Shell. So the trash will be used, if it is enable. Delete is applicable only with update or overwrite options. | +| `-delete [-useTrash]` | Delete the files existing in the `/dst/` but not in `/src/` . when `[-useTrash]` is enabled, the files will be moved into the user's trash directory. Notice that `[-useTrash]` option on some object store does a copy and delete ops and can be slow. Delete is applicable only with update or overwrite options. | | `-strategy {dynamic|uniformsize}` | Choose the copy-strategy to be used in DistCp. | By default, uniformsize is used. (i.e. Maps are balanced on the total size of files copied by each map. Similar to legacy.) If "dynamic" is specified, `DynamicInputFormat` is used instead. (This is described in the Architecture section, under InputFormats.) | | `-bandwidth` | Specify bandwidth per map, in MB/second. | Each map will be restricted to consume only the specified bandwidth. This is not always exact. The map throttles back its bandwidth consumption during a copy, such that the **net** bandwidth used tends towards the specified value. | | `-atomic {-tmp }` | Specify atomic commit, with optional tmp directory. | `-atomic` instructs DistCp to copy the source data to a temporary target location, and then move the temporary target to the final-location atomically. Data will either be available at final target in a complete and consistent form, or not at all. Optionally, `-tmp` may be used to specify the location of the tmp-target. If not specified, a default is chosen. **Note:** tmp_dir must be on the final target cluster. | diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java index 7382795dd90d7..604b0250a3fcb 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java @@ -21,6 +21,7 @@ import java.util.Collections; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.test.LambdaTestUtils; import org.junit.Assert; import org.junit.Test; @@ -225,6 +226,42 @@ public void testSetDeleteMissing() { } } + @Test + public void testDeleteMissingUseTrash() throws Exception { + final DistCpOptions.Builder builder = new DistCpOptions.Builder( + Collections.singletonList(new Path("hdfs://localhost:8020/source")), + new Path("hdfs://localhost:8020/target/")); + Assert.assertFalse("Delete does not use trash by default.", + builder.build().shouldDeleteUseTrash()); + + DistCpOptions options = builder.withSyncFolder(true) + .withDeleteMissing(true) + .withDeleteUseTrash(true) + .build(); + Assert.assertTrue(options.shouldSyncFolder()); + Assert.assertTrue(options.shouldDeleteMissing()); + Assert.assertTrue(options.shouldDeleteUseTrash()); + + options = new DistCpOptions.Builder( + Collections.singletonList(new Path("hdfs://localhost:8020/source")), + new Path("hdfs://localhost:8020/target/")) + .withOverwrite(true) + .withDeleteMissing(true) + .withDeleteUseTrash(true) + .build(); + + Assert.assertTrue(options.shouldDeleteUseTrash()); + Assert.assertTrue(options.shouldOverwrite()); + Assert.assertTrue(options.shouldDeleteMissing()); + + LambdaTestUtils.intercept(IllegalArgumentException.class, + () -> new DistCpOptions.Builder(Collections.singletonList( + new Path("hdfs://localhost:8020/source")), + new Path("hdfs://localhost:8020/target/")) + .withDeleteUseTrash(true) + .build()); + } + @Test public void testSetMaps() { final DistCpOptions.Builder builder = new DistCpOptions.Builder( @@ -281,8 +318,8 @@ public void testToString() { DistCpOptions option = new DistCpOptions.Builder(new Path("abc"), new Path("xyz")).build(); String val = "DistCpOptions{atomicCommit=false, syncFolder=false, " + - "deleteMissing=false, ignoreFailures=false, overwrite=false, " + - "append=false, useDiff=false, useRdiff=false, " + + "deleteMissing=false, deleteUseTrash=false, ignoreFailures=false, " + + "overwrite=false, append=false, useDiff=false, useRdiff=false, " + "fromSnapshot=null, toSnapshot=null, " + "skipCRC=false, blocking=true, numListstatusThreads=0, maxMaps=20, " + "mapBandwidth=0.0, copyStrategy='uniformsize', preserveStatus=[], " + diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java index 73cdf24789ace..e7bf133bd4c54 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java @@ -348,7 +348,7 @@ public void testDeleteMissingInDestination() { createFiles("srcdir/file1", "dstdir/file1", "dstdir/file2"); Path target = new Path(root + "/dstdir"); - runTest(listFile, target, false, true, true, false); + runTest(listFile, target, false, true, true, false, false); checkResult(target, 1, "file1"); } catch (IOException e) { @@ -372,7 +372,7 @@ public void testOverwrite() { createWithContents("dstdir/file1", contents2); Path target = new Path(root + "/dstdir"); - runTest(listFile, target, false, false, false, true); + runTest(listFile, target, false, false, false, true, false); checkResult(target, 1, "file1"); @@ -553,15 +553,16 @@ private void mkdirs(String... entries) throws IOException { private void runTest(Path listFile, Path target, boolean targetExists, boolean sync) throws IOException { - runTest(listFile, target, targetExists, sync, false, false); + runTest(listFile, target, targetExists, sync, false, false, false); } private void runTest(Path listFile, Path target, boolean targetExists, boolean sync, boolean delete, - boolean overwrite) throws IOException { + boolean overwrite, boolean useTrash) throws IOException { final DistCpOptions options = new DistCpOptions.Builder(listFile, target) .withSyncFolder(sync) .withDeleteMissing(delete) + .withDeleteUseTrash(useTrash) .withOverwrite(overwrite) .withNumListstatusThreads(numListstatusThreads) .build(); diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java index b48355af25ba5..67475de01cc85 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java @@ -804,4 +804,23 @@ public void testExclusionsOption() { "hdfs://localhost:8020/target/"}); assertThat(options.getFiltersFile()).isEqualTo("/tmp/filters.txt"); } + + @Test + public void testParseDeleteSkipTrash() { + DistCpOptions options = OptionsParser.parse(new String[] { + "-overwrite", + "-delete", + "-useTrash", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertTrue("Delete with useTrash.", + options.shouldDeleteUseTrash()); + options = OptionsParser.parse(new String[] { + "-overwrite", + "-delete", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertFalse("Delete does not use trash.", + options.shouldDeleteUseTrash()); + } } diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java index eeaf30a929996..764f2d7daa0c3 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java @@ -26,6 +26,7 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; @@ -137,6 +138,7 @@ protected Configuration createConfiguration() { public void setup() throws Exception { super.setup(); conf = getContract().getConf(); + conf.setLong(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, 10); localFS = FileSystem.getLocal(conf); remoteFS = getFileSystem(); // Test paths are isolated by concrete subclass name and test method name. @@ -224,6 +226,13 @@ public void testUpdateDeepDirectoryStructureToRemote() throws Exception { distCpUpdateDeepDirectoryStructure(inputDirUnderOutputDir); } + @Test + public void testUpdateUseTrashDeepDirectoryStructureToRemote() throws Exception { + describe("update a deep directory structure from local to remote"); + distCpDeepDirectoryStructure(localFS, localDir, remoteFS, remoteDir); + distCpUpdateUseTrashDeepDirectoryStructure(remoteDir); + } + @Test public void testUpdateDeepDirectoryStructureNoChange() throws Exception { describe("update an unchanged directory structure" @@ -284,6 +293,10 @@ protected Job distCpUpdateDeepDirectoryStructure(final Path destDir) modifySourceDirectories(); + ContractTestUtils.assertPathsDoNotExist(localFS, + "Paths for test are wrong", + inputFile1, inputFile3, inputSubDir4); + Job job = distCpUpdate(srcDir, destDir); Path outputFileNew1 = new Path(outputSubDir2, "newfile1"); @@ -322,6 +335,72 @@ private Job distCpUpdate(final Path srcDir, final Path destDir) .withOverwrite(false))); } + /** + * Do a distcp -update -delete -useTrash. + * @param destDir output directory used by the initial distcp + * @return the distcp job + */ + protected Job distCpUpdateUseTrashDeepDirectoryStructure(final Path destDir) + throws Exception { + describe("Incremental update with deletion-use-trash of missing files"); + Path srcDir = inputDir; + LOG.info("Source directory = {}, dest={}", srcDir, destDir); + + ContractTestUtils.assertPathsExist(localFS, + "Paths for test are wrong", + inputFile1, inputFile2, inputFile3, inputFile4, inputFile5); + + modifySourceDirectories(); + ContractTestUtils.assertPathsDoNotExist(localFS, "deleted right now", + inputFile1, inputFile3, inputSubDir4); + + Path trashRootDir = remoteFS.getTrashRoot(null); + remoteFS.delete(trashRootDir, true); + + Job job = distCpUpdateDeleteUseTrash(inputDir, inputDirUnderOutputDir); + lsR("Updated Remote", remoteFS, destDir); + + ContractTestUtils.assertPathsDoNotExist(remoteFS, + "DistCP should have deleted", + outputFile1, outputFile3, outputFile4, outputSubDir4); + ContractTestUtils.assertPathExists(remoteFS, + "Path delete does not use trash", trashRootDir); + Path trashFile1 = new Path(trashRootDir, + "Current" + outputFile1.toUri().getPath()); + Path trashFile3 = new Path(trashRootDir, + "Current" + outputFile3.toUri().getPath()); + Path trashFile4 = new Path(trashRootDir, + "Current" + outputFile4.toUri().getPath()); + Path trashFile5 = new Path(trashRootDir, + "Current" + outputFile5.toUri().getPath()); + ContractTestUtils.assertPathsExist(remoteFS, + "Path delete does not use trash", + trashFile1, trashFile3, trashFile4, trashFile5); + return job; + } + + /** + * Run distcp -update -delete -useTrash. + * @param srcDir local source directory + * @param destDir remote destination directory. + * @return the completed job + * @throws Exception any failure. + */ + private Job distCpUpdateDeleteUseTrash(final Path srcDir, final Path destDir) + throws Exception { + describe("\nDistcp -update from " + srcDir + " to " + destDir); + lsR("Local to update", localFS, srcDir); + lsR("Remote before update", remoteFS, destDir); + return runDistCp(buildWithStandardOptions( + new DistCpOptions.Builder( + Collections.singletonList(srcDir), destDir) + .withDeleteMissing(true) + .withDeleteUseTrash(true) + .withSyncFolder(true) + .withCRC(true) + .withOverwrite(false))); + } + /** * Update the source directories as various tests expect, * including adding a new file. @@ -336,6 +415,11 @@ private Path modifySourceDirectories() throws IOException { // add one new file Path inputFileNew1 = new Path(inputSubDir2, "newfile1"); ContractTestUtils.touch(localFS, inputFileNew1); + + ContractTestUtils.assertPathsDoNotExist(localFS, "deleted right now", + inputFile1, inputFile3, inputSubDir4); + ContractTestUtils.assertPathsExist(localFS, "touched right now", + inputFileNew1); return inputFileNew1; } @@ -493,6 +577,7 @@ private Path distCpDeepDirectoryStructure(FileSystem srcFS, createFile(srcFS, inputFile4, true, dataset(400, 53, 63)); createFile(srcFS, inputFile5, true, dataset(500, 53, 63)); Path target = new Path(dstDir, "outputDir"); + runDistCp(inputDir, target); ContractTestUtils.assertIsDirectory(dstFS, target); lsR("Destination tree after distcp", dstFS, target); diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java index 11118c1f72400..d02c3ea6f1ba1 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java @@ -18,6 +18,8 @@ package org.apache.hadoop.tools.mapred; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.contract.ContractTestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -84,6 +86,7 @@ public static void create() throws IOException { clusterConfig = getJobForClient().getConfiguration(); clusterConfig.setLong( DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, 0); + clusterConfig.setLong(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, 10); clusterConfig.setLong( DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE); clusterConfig.setLong( @@ -133,6 +136,62 @@ public void testNoCommitAction() throws IOException { Assert.assertEquals("Commit Successful", taskAttemptContext.getStatus()); } + @Test + public void testDeleteUseTrash() throws IOException { + TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); + JobContext jobContext = new JobContextImpl( + taskAttemptContext.getConfiguration(), + taskAttemptContext.getTaskAttemptID().getJobID()); + Configuration conf = jobContext.getConfiguration(); + + String sourceBase; + String targetBase; + FileSystem fs = null; + try { + OutputCommitter committer = new CopyCommitter(null, taskAttemptContext); + fs = FileSystem.get(conf); + sourceBase = TestDistCpUtils.createTestSetup(fs); + targetBase = TestDistCpUtils.createTestSetup(fs); + String targetBaseAdd = TestDistCpUtils.createTestSetup(fs); + ContractTestUtils.assertRenameOutcome(fs, new Path(targetBaseAdd), + new Path(targetBase), true); + + DistCpOptions.Builder builder = new DistCpOptions.Builder( + Arrays.asList(new Path(sourceBase)), new Path("/out")); + builder.withSyncFolder(true); + builder.withDeleteMissing(true); + builder.withDeleteUseTrash(true); + builder.build().appendToConf(conf); + DistCpContext cpContext = new DistCpContext(builder.build()); + + CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS); + Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong())); + listing.buildListing(listingFile, cpContext); + + conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase); + conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase); + + Path trashRootDir = fs.getTrashRoot(null); + if (fs.exists(trashRootDir)) { + fs.delete(trashRootDir, true); + } + committer.commitJob(jobContext); + + verifyFoldersAreInSync(fs, targetBase, sourceBase); + verifyFoldersAreInSync(fs, sourceBase, targetBase); + + Assert.assertTrue("Path delete does not use trash", + fs.exists(trashRootDir)); + Path trashDir = new Path(trashRootDir, "Current" + targetBaseAdd); + verifyFoldersAreInSync(fs, trashDir.toString(), sourceBase); + } finally { + TestDistCpUtils.delete(fs, "/tmp1"); + fs.close(); + conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false"); + conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING_USETRASH, "false"); + } + } + @Test public void testPreserveStatus() throws IOException { TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); @@ -229,8 +288,8 @@ public void testDeleteMissing() throws IOException { public void testPreserveTimeWithDeleteMiss() throws IOException { TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); JobContext jobContext = new JobContextImpl( - taskAttemptContext.getConfiguration(), - taskAttemptContext.getTaskAttemptID().getJobID()); + taskAttemptContext.getConfiguration(), + taskAttemptContext.getTaskAttemptID().getJobID()); Configuration conf = jobContext.getConfiguration(); FileSystem fs = null; @@ -238,17 +297,17 @@ public void testPreserveTimeWithDeleteMiss() throws IOException { OutputCommitter committer = new CopyCommitter(null, taskAttemptContext); fs = FileSystem.get(conf); String sourceBase = TestDistCpUtils.createTestSetup( - fs, FsPermission.getDefault()); + fs, FsPermission.getDefault()); String targetBase = TestDistCpUtils.createTestSetup( - fs, FsPermission.getDefault()); + fs, FsPermission.getDefault()); String targetBaseAdd = TestDistCpUtils.createTestSetup( - fs, FsPermission.getDefault()); + fs, FsPermission.getDefault()); fs.rename(new Path(targetBaseAdd), new Path(targetBase)); final DistCpOptions options = new DistCpOptions.Builder( - Collections.singletonList(new Path(sourceBase)), new Path("/out")) - .withSyncFolder(true).withDeleteMissing(true) - .preserve(FileAttribute.TIMES).build(); + Collections.singletonList(new Path(sourceBase)), new Path("/out")) + .withSyncFolder(true).withDeleteMissing(true) + .preserve(FileAttribute.TIMES).build(); options.appendToConf(conf); final DistCpContext context = new DistCpContext(options); @@ -260,9 +319,9 @@ public void testPreserveTimeWithDeleteMiss() throws IOException { conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase); Path sourceListing = new Path( - conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH)); + conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH)); SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf, - SequenceFile.Reader.file(sourceListing)); + SequenceFile.Reader.file(sourceListing)); Path targetRoot = new Path(targetBase); committer.commitJob(jobContext); @@ -278,6 +337,61 @@ public void testPreserveTimeWithDeleteMiss() throws IOException { } } + @Test + public void testDeleteUseTrash() throws IOException { + TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); + JobContext jobContext = new JobContextImpl( + taskAttemptContext.getConfiguration(), + taskAttemptContext.getTaskAttemptID().getJobID()); + Configuration conf = jobContext.getConfiguration(); + + String sourceBase; + String targetBase; + FileSystem fs = null; + try { + OutputCommitter committer = new CopyCommitter(null, taskAttemptContext); + fs = FileSystem.get(conf); + sourceBase = TestDistCpUtils.createTestSetup(fs); + targetBase = TestDistCpUtils.createTestSetup(fs); + String targetBaseAdd = TestDistCpUtils.createTestSetup(fs); + ContractTestUtils.assertRenameOutcome(fs, new Path(targetBaseAdd), + new Path(targetBase),true); + + DistCpOptions.Builder builder = new DistCpOptions.Builder( + Arrays.asList(new Path(sourceBase)), new Path("/out")); + builder.withSyncFolder(true); + builder.withDeleteMissing(true); + builder.withDeleteUseTrash(true); + builder.build().appendToConf(conf); + DistCpContext cpContext = new DistCpContext(builder.build()); + + CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS); + Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong())); + listing.buildListing(listingFile, cpContext); + + conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase); + conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase); + + Path trashRootDir = fs.getTrashRoot(null); + if (fs.exists(trashRootDir)) { + fs.delete(trashRootDir, true); + } + committer.commitJob(jobContext); + + verifyFoldersAreInSync(fs, targetBase, sourceBase); + verifyFoldersAreInSync(fs, sourceBase, targetBase); + + Assert.assertTrue("Path delete does not use trash", + fs.exists(trashRootDir)); + Path trashDir = new Path(trashRootDir, "Current" + targetBaseAdd); + verifyFoldersAreInSync(fs, trashDir.toString(), sourceBase); + } finally { + TestDistCpUtils.delete(fs, "/tmp1"); + fs.close(); + conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false"); + conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING_USETRASH, "false"); + } + } @Test public void testDeleteMissingFlatInterleavedFiles() throws IOException {