diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java index d1dba19f2f7f4..e1269d5b291e7 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java @@ -22,7 +22,6 @@ * Utility class to hold commonly used constants. */ public class DistCpConstants { - /* Default number of maps to use for DistCp */ public static final int DEFAULT_MAPS = 20; @@ -44,6 +43,7 @@ public class DistCpConstants { public static final String CONF_LABEL_PRESERVE_STATUS = "distcp.preserve.status"; public static final String CONF_LABEL_SYNC_FOLDERS = "distcp.sync.folders"; public static final String CONF_LABEL_DELETE_MISSING = "distcp.delete.missing.source"; + public static final String CONF_LABEL_LIST_MISSING_FILE = "distcp.list.missing.source"; public static final String CONF_LABEL_SSL_CONF = "distcp.keystore.resource"; public static final String CONF_LABEL_MAX_MAPS = "distcp.max.maps"; public static final String CONF_LABEL_SOURCE_LISTING = "distcp.source.listing"; @@ -52,16 +52,12 @@ public class DistCpConstants { public static final String CONF_LABEL_OVERWRITE = "distcp.copy.overwrite"; public static final String CONF_LABEL_APPEND = "distcp.copy.append"; public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb"; - - public static final String CONF_LABEL_MAX_CHUNKS_TOLERABLE = - "distcp.dynamic.max.chunks.tolerable"; - public static final String CONF_LABEL_MAX_CHUNKS_IDEAL = - "distcp.dynamic.max.chunks.ideal"; - public static final String CONF_LABEL_MIN_RECORDS_PER_CHUNK = - "distcp.dynamic.min.records_per_chunk"; - public static final String CONF_LABEL_SPLIT_RATIO = - "distcp.dynamic.split.ratio"; - + + public static final String CONF_LABEL_MAX_CHUNKS_TOLERABLE = "distcp.dynamic.max.chunks.tolerable"; + public 
static final String CONF_LABEL_MAX_CHUNKS_IDEAL = "distcp.dynamic.max.chunks.ideal"; + public static final String CONF_LABEL_MIN_RECORDS_PER_CHUNK = "distcp.dynamic.min.records_per_chunk"; + public static final String CONF_LABEL_SPLIT_RATIO = "distcp.dynamic.split.ratio"; + /* Total bytes to be copied. Updated by copylisting. Unfiltered count */ public static final String CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED = "mapred.total.bytes.expected"; @@ -86,9 +82,9 @@ public class DistCpConstants { /* Boolean to indicate whether the target of distcp exists. */ public static final String CONF_LABEL_TARGET_PATH_EXISTS = "distcp.target.path.exists"; - + /** - * DistCp job id for consumers of the Disctp + * DistCp job id for consumers of the Disctp */ public static final String CONF_LABEL_DISTCP_JOB_ID = "distcp.job.id"; @@ -101,14 +97,12 @@ public class DistCpConstants { /** * Conf label for SSL Trust-store location. */ - public static final String CONF_LABEL_SSL_TRUST_STORE_LOCATION - = "ssl.client.truststore.location"; + public static final String CONF_LABEL_SSL_TRUST_STORE_LOCATION = "ssl.client.truststore.location"; /** * Conf label for SSL Key-store location. 
*/ - public static final String CONF_LABEL_SSL_KEY_STORE_LOCATION - = "ssl.client.keystore.location"; + public static final String CONF_LABEL_SSL_KEY_STORE_LOCATION = "ssl.client.keystore.location"; /** * Constants for DistCp return code to shell / consumer of ToolRunner's run @@ -119,13 +113,13 @@ public class DistCpConstants { public static final int ACLS_NOT_SUPPORTED = -3; public static final int XATTRS_NOT_SUPPORTED = -4; public static final int UNKNOWN_ERROR = -999; - + /** * Constants for DistCp default values of configurable values */ public static final int MAX_CHUNKS_TOLERABLE_DEFAULT = 400; - public static final int MAX_CHUNKS_IDEAL_DEFAULT = 100; + public static final int MAX_CHUNKS_IDEAL_DEFAULT = 100; public static final int MIN_RECORDS_PER_CHUNK_DEFAULT = 5; - public static final int SPLIT_RATIO_DEFAULT = 2; + public static final int SPLIT_RATIO_DEFAULT = 2; } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java index 2f2eb7c838c00..8a86c9806165a 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java @@ -21,18 +21,17 @@ import org.apache.commons.cli.Option; import org.apache.hadoop.conf.Configuration; + /** * Enumeration mapping configuration keys to distcp command line * options. */ public enum DistCpOptionSwitch { - /** * Ignores any failures during copy, and continues with rest. * Logs failures in a file */ - IGNORE_FAILURES(DistCpConstants.CONF_LABEL_IGNORE_FAILURES, - new Option("i", false, "Ignore failures during copy")), + IGNORE_FAILURES(DistCpConstants.CONF_LABEL_IGNORE_FAILURES, new Option("i", false, "Ignore failures during copy")), /** * Preserves status of file/path in the target. 
@@ -45,10 +44,12 @@ public enum DistCpOptionSwitch { * */ PRESERVE_STATUS(DistCpConstants.CONF_LABEL_PRESERVE_STATUS, - new Option("p", true, "preserve status (rbugpcax)(replication, " + - "block-size, user, group, permission, checksum-type, ACL, XATTR). " + - "If -p is specified with no , then preserves replication, " + - "block size, user, group, permission and checksum type.")), + new Option("p", + true, + "preserve status (rbugpcax)(replication, " + + "block-size, user, group, permission, checksum-type, ACL, XATTR). " + + "If -p is specified with no , then preserves replication, " + + "block size, user, group, permission and checksum type.")), /** * Update target location by copying only files that are missing @@ -57,8 +58,8 @@ public enum DistCpOptionSwitch { * Incompatible with ATOMIC_COMMIT */ SYNC_FOLDERS(DistCpConstants.CONF_LABEL_SYNC_FOLDERS, - new Option("update", false, "Update target, copying only missing" + - "files or directories")), + new Option("update", false, "Update target, copying only missing " + + "files or directories")), /** * Deletes missing files in target that are missing from source @@ -67,8 +68,17 @@ public enum DistCpOptionSwitch { * Incompatible with ATOMIC_COMMIT */ DELETE_MISSING(DistCpConstants.CONF_LABEL_DELETE_MISSING, - new Option("delete", false, "Delete from target, " + - "files missing in source")), + new Option("delete", false, "Delete from target, " + + "files missing in source")), + + + /** + * Create a list of files that are missing in the source and + * exist in the target. 
+ */ + LIST_MISSING_FILE(DistCpConstants.CONF_LABEL_LIST_MISSING_FILE, + new Option("listMissingFile", true, "Create a list of files that " + + "are missing in source and exist in target")), /** * Configuration file to use with hftps:// for securely copying @@ -76,15 +86,14 @@ public enum DistCpOptionSwitch { * truststore/keystore information such as location, password and type */ SSL_CONF(DistCpConstants.CONF_LABEL_SSL_CONF, - new Option("mapredSslConf", true, "Configuration for ssl config file" + - ", to use with hftps://")), + new Option("mapredSslConf", true, "Configuration for ssl config file" + + ", to use with hftps://")), /** * Max number of maps to use during copy. DistCp will split work * as equally as possible among these maps */ - MAX_MAPS(DistCpConstants.CONF_LABEL_MAX_MAPS, - new Option("m", true, "Max number of concurrent maps to use for copy")), + MAX_MAPS(DistCpConstants.CONF_LABEL_MAX_MAPS, new Option("m", true, "Max number of concurrent maps to use for copy")), /** * Source file listing can be provided to DistCp in a file. @@ -92,7 +101,7 @@ public enum DistCpOptionSwitch { * and copy them to target */ SOURCE_FILE_LISTING(DistCpConstants.CONF_LABEL_SOURCE_LISTING, - new Option("f", true, "List of files that need to be copied")), + new Option("f", true, "List of files that need to be copied")), /** * Copy all the source files and commit them atomically to the target @@ -100,67 +109,62 @@ public enum DistCpOptionSwitch { * polling for availability of a file/dir. 
This option is incompatible * with SYNC_FOLDERS & DELETE_MISSING */ - ATOMIC_COMMIT(DistCpConstants.CONF_LABEL_ATOMIC_COPY, - new Option("atomic", false, "Commit all changes or none")), + ATOMIC_COMMIT(DistCpConstants.CONF_LABEL_ATOMIC_COPY, new Option("atomic", false, "Commit all changes or none")), /** * Work path to be used only in conjunction in Atomic commit */ WORK_PATH(DistCpConstants.CONF_LABEL_WORK_PATH, - new Option("tmp", true, "Intermediate work path to be used for atomic commit")), + new Option("tmp", true, "Intermediate work path to be used for atomic commit")), /** * Log path where distcp output logs are written to */ LOG_PATH(DistCpConstants.CONF_LABEL_LOG_PATH, - new Option("log", true, "Folder on DFS where distcp execution logs are saved")), + new Option("log", true, "Folder on DFS where distcp execution logs are saved")), /** * Copy strategy is use. This could be dynamic or uniform size etc. * DistCp would use an appropriate input format based on this. */ COPY_STRATEGY(DistCpConstants.CONF_LABEL_COPY_STRATEGY, - new Option("strategy", true, "Copy strategy to use. Default is " + - "dividing work based on file sizes")), + new Option("strategy", true, "Copy strategy to use. Default is " + + "dividing work based on file sizes")), /** * Skip CRC checks between source and target, when determining what * files need to be copied. */ SKIP_CRC(DistCpConstants.CONF_LABEL_SKIP_CRC, - new Option("skipcrccheck", false, "Whether to skip CRC checks between " + - "source and target paths.")), + new Option("skipcrccheck", false, "Whether to skip CRC checks between " + + "source and target paths.")), /** * Overwrite target-files unconditionally. 
*/ OVERWRITE(DistCpConstants.CONF_LABEL_OVERWRITE, - new Option("overwrite", false, "Choose to overwrite target files " + - "unconditionally, even if they exist.")), + new Option("overwrite", false, "Choose to overwrite target files " + + "unconditionally, even if they exist.")), APPEND(DistCpConstants.CONF_LABEL_APPEND, - new Option("append", false, - "Reuse existing data in target files and append new data to them if possible")), + new Option("append", false, + "Reuse existing data in target files and append new data to them if possible")), /** * Should DisctpExecution be blocking */ - BLOCKING("", - new Option("async", false, "Should distcp execution be blocking")), + BLOCKING("", new Option("async", false, "Should distcp execution be blocking")), - FILE_LIMIT("", - new Option("filelimit", true, "(Deprecated!) Limit number of files " + - "copied to <= n")), + FILE_LIMIT("", new Option("filelimit", true, "(Deprecated!) Limit number of files " + + "copied to <= n")), - SIZE_LIMIT("", - new Option("sizelimit", true, "(Deprecated!) Limit number of files " + - "copied to <= n bytes")), + SIZE_LIMIT("", new Option("sizelimit", true, "(Deprecated!) 
Limit number of files " + + "copied to <= n bytes")), /** * Specify bandwidth per map in MB */ - BANDWIDTH(DistCpConstants.CONF_LABEL_BANDWIDTH_MB, - new Option("bandwidth", true, "Specify bandwidth per map in MB")); + BANDWIDTH(DistCpConstants.CONF_LABEL_BANDWIDTH_MB, new Option("bandwidth", true, "Specify bandwidth per map in MB")); private final String confLabel; private final Option option; @@ -196,9 +200,9 @@ public String getSwitch() { @Override public String toString() { - return super.name() + " {" + - "confLabel='" + confLabel + '\'' + - ", option=" + option + '}'; + return super.name() + " {" + + "confLabel='" + confLabel + '\'' + + ", option=" + option + '}'; } /** diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java index 1ed9ccdb3cb78..b2290ee926292 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java @@ -21,19 +21,18 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.tools.util.DistCpUtils; - import java.util.EnumSet; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; + /** * The Options class encapsulates all DistCp options. * These may be set from command-line (via the OptionsParser) * or may be set manually. 
*/ public class DistCpOptions { - private boolean atomicCommit = false; private boolean syncFolder = false; private boolean deleteMissing = false; @@ -46,6 +45,8 @@ public class DistCpOptions { private int maxMaps = DistCpConstants.DEFAULT_MAPS; private int mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB; + private String listMissingFile; + private String sslConfigurationFile; private String copyStrategy = DistCpConstants.UNIFORMSIZE; @@ -61,12 +62,19 @@ public class DistCpOptions { private Path targetPath; - // targetPathExist is a derived field, it's initialized in the + // targetPathExist is a derived field, it's initialized in the // beginning of distcp. private boolean targetPathExists = true; - - public static enum FileAttribute{ - REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION, CHECKSUMTYPE, ACL, XATTR; + + public static enum FileAttribute { + REPLICATION, + BLOCKSIZE, + USER, + GROUP, + PERMISSION, + CHECKSUMTYPE, + ACL, + XATTR; public static FileAttribute getAttribute(char symbol) { for (FileAttribute attribute : values()) { @@ -85,7 +93,7 @@ public static FileAttribute getAttribute(char symbol) { * @param targetPath Destination path for the dist-copy. */ public DistCpOptions(List sourcePaths, Path targetPath) { - assert sourcePaths != null && !sourcePaths.isEmpty() : "Invalid source paths"; + assert (sourcePaths != null) && !sourcePaths.isEmpty() : "Invalid source paths"; assert targetPath != null : "Invalid Target path"; this.sourcePaths = sourcePaths; @@ -110,7 +118,7 @@ public DistCpOptions(Path sourceFileListing, Path targetPath) { * @param that DistCpOptions being copied from. 
*/ public DistCpOptions(DistCpOptions that) { - if (this != that && that != null) { + if ((this != that) && (that != null)) { this.atomicCommit = that.atomicCommit; this.syncFolder = that.syncFolder; this.deleteMissing = that.deleteMissing; @@ -129,6 +137,7 @@ public DistCpOptions(DistCpOptions that) { this.sourcePaths = that.getSourcePaths(); this.targetPath = that.getTargetPath(); this.targetPathExists = that.getTargetPathExists(); + this.listMissingFile = that.getListMissingFile(); } } @@ -189,11 +198,19 @@ public void setDeleteMissing(boolean deleteMissing) { this.deleteMissing = deleteMissing; } + public String getListMissingFile() { + return listMissingFile; + } + + public void setListMissingFile(String listMissingFile) { + this.listMissingFile = listMissingFile; + } + /** - * Should failures be logged and ignored during copy? - * - * @return true if failures are to be logged and ignored. false otherwise - */ + * Should failures be logged and ignored during copy? + * + * @return true if failures are to be logged and ignored. false otherwise + */ public boolean shouldIgnoreFailures() { return ignoreFailures; } @@ -449,7 +466,7 @@ public List getSourcePaths() { * @param sourcePaths The new list of source-paths. */ public void setSourcePaths(List sourcePaths) { - assert sourcePaths != null && sourcePaths.size() != 0; + assert (sourcePaths != null) && (sourcePaths.size() != 0); this.sourcePaths = sourcePaths; } @@ -468,7 +485,7 @@ public Path getTargetPath() { public boolean getTargetPathExists() { return targetPathExists; } - + /** * Set targetPathExists. * @param targetPathExists Whether the target path of distcp exists. @@ -478,32 +495,26 @@ public boolean setTargetPathExists(boolean targetPathExists) { } public void validate(DistCpOptionSwitch option, boolean value) { - - boolean syncFolder = (option == DistCpOptionSwitch.SYNC_FOLDERS ? - value : this.syncFolder); - boolean overwrite = (option == DistCpOptionSwitch.OVERWRITE ? 
- value : this.overwrite); - boolean deleteMissing = (option == DistCpOptionSwitch.DELETE_MISSING ? - value : this.deleteMissing); - boolean atomicCommit = (option == DistCpOptionSwitch.ATOMIC_COMMIT ? - value : this.atomicCommit); - boolean skipCRC = (option == DistCpOptionSwitch.SKIP_CRC ? - value : this.skipCRC); - boolean append = (option == DistCpOptionSwitch.APPEND ? value : this.append); + boolean syncFolder = ((option == DistCpOptionSwitch.SYNC_FOLDERS) ? value : this.syncFolder); + boolean overwrite = ((option == DistCpOptionSwitch.OVERWRITE) ? value : this.overwrite); + boolean deleteMissing = ((option == DistCpOptionSwitch.DELETE_MISSING) ? value : this.deleteMissing); + boolean atomicCommit = ((option == DistCpOptionSwitch.ATOMIC_COMMIT) ? value : this.atomicCommit); + boolean skipCRC = ((option == DistCpOptionSwitch.SKIP_CRC) ? value : this.skipCRC); + boolean append = ((option == DistCpOptionSwitch.APPEND) ? value : this.append); if (syncFolder && atomicCommit) { throw new IllegalArgumentException("Atomic commit can't be used with " + - "sync folder or overwrite options"); + "sync folder or overwrite options"); } if (deleteMissing && !(overwrite || syncFolder)) { throw new IllegalArgumentException("Delete missing is applicable " + - "only with update or overwrite options"); + "only with update or overwrite options"); } if (overwrite && syncFolder) { throw new IllegalArgumentException("Overwrite and update options are " + - "mutually exclusive"); + "mutually exclusive"); } if (!syncFolder && skipCRC) { @@ -512,11 +523,11 @@ public void validate(DistCpOptionSwitch option, boolean value) { if (!syncFolder && append) { throw new IllegalArgumentException( - "Append is valid only with update options"); + "Append is valid only with update options"); } if (skipCRC && append) { throw new IllegalArgumentException( - "Append is disallowed when skipping CRC"); + "Append is disallowed when skipping CRC"); } } @@ -527,23 +538,25 @@ public void 
validate(DistCpOptionSwitch option, boolean value) { */ public void appendToConf(Configuration conf) { DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.ATOMIC_COMMIT, - String.valueOf(atomicCommit)); + String.valueOf(atomicCommit)); DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.IGNORE_FAILURES, - String.valueOf(ignoreFailures)); + String.valueOf(ignoreFailures)); DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SYNC_FOLDERS, - String.valueOf(syncFolder)); + String.valueOf(syncFolder)); DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DELETE_MISSING, - String.valueOf(deleteMissing)); + String.valueOf(deleteMissing)); DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.OVERWRITE, - String.valueOf(overwrite)); + String.valueOf(overwrite)); DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.APPEND, - String.valueOf(append)); + String.valueOf(append)); DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC, - String.valueOf(skipCRC)); + String.valueOf(skipCRC)); DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BANDWIDTH, - String.valueOf(mapBandwidth)); + String.valueOf(mapBandwidth)); DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.PRESERVE_STATUS, - DistCpUtils.packAttributes(preserveStatus)); + DistCpUtils.packAttributes(preserveStatus)); + DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.LIST_MISSING_FILE, + (listMissingFile == null) ? "" : listMissingFile); /* unset option is stored as "", never as the literal string "null" */ } /** @@ -554,18 +567,19 @@ public void appendToConf(Configuration conf) { @Override public String toString() { return "DistCpOptions{" + - "atomicCommit=" + atomicCommit + - ", syncFolder=" + syncFolder + - ", deleteMissing=" + deleteMissing + - ", ignoreFailures=" + ignoreFailures + - ", maxMaps=" + maxMaps + - ", sslConfigurationFile='" + sslConfigurationFile + '\'' + - ", copyStrategy='" + copyStrategy + '\'' + - ", sourceFileListing=" + sourceFileListing + - ", sourcePaths=" + sourcePaths + - ", targetPath=" + targetPath + - ", targetPathExists=" + targetPathExists + - 
'}'; + "atomicCommit=" + atomicCommit + + ", syncFolder=" + syncFolder + + ", deleteMissing=" + deleteMissing + + ", ignoreFailures=" + ignoreFailures + + ", maxMaps=" + maxMaps + + ", sslConfigurationFile='" + sslConfigurationFile + '\'' + + ", copyStrategy='" + copyStrategy + '\'' + + ", sourceFileListing=" + sourceFileListing + + ", sourcePaths=" + sourcePaths + + ", targetPath=" + targetPath + + ", targetPathExists=" + targetPathExists + + ", listMissingFile=" + listMissingFile + + '}'; } @Override diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java index 09e8550522713..08bad03322412 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java @@ -23,15 +23,16 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.tools.DistCpOptions.FileAttribute; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; -import java.util.*; /** * The OptionsParser parses out the command-line options passed to DistCp, * and interprets those specific to DistCp, to create an Options object. */ public class OptionsParser { - private static final Log LOG = LogFactory.getLog(OptionsParser.class); private static final Options cliOptions = new Options(); @@ -65,8 +66,7 @@ protected String[] flatten(Options options, String[] arguments, boolean stopAtNo * @return The Options object, corresponding to the specified command-line. * @throws IllegalArgumentException: Thrown if the parse fails. 
*/ - public static DistCpOptions parse(String args[]) throws IllegalArgumentException { - + public static DistCpOptions parse(String[] args) throws IllegalArgumentException { CommandLineParser parser = new CustomParser(); CommandLine command; @@ -81,16 +81,16 @@ public static DistCpOptions parse(String args[]) throws IllegalArgumentException Path targetPath; List sourcePaths = new ArrayList(); - String leftOverArgs[] = command.getArgs(); - if (leftOverArgs == null || leftOverArgs.length < 1) { + String[] leftOverArgs = command.getArgs(); + if ((leftOverArgs == null) || (leftOverArgs.length < 1)) { throw new IllegalArgumentException("Target path not specified"); } //Last Argument is the target path - targetPath = new Path(leftOverArgs[leftOverArgs.length -1].trim()); + targetPath = new Path(leftOverArgs[leftOverArgs.length - 1].trim()); //Copy any source paths in the arguments to the list - for (int index = 0; index < leftOverArgs.length - 1; index++) { + for (int index = 0; index < (leftOverArgs.length - 1); index++) { sourcePaths.add(new Path(leftOverArgs[index].trim())); } @@ -100,8 +100,8 @@ public static DistCpOptions parse(String args[]) throws IllegalArgumentException if (!sourcePaths.isEmpty()) { throw new IllegalArgumentException("Both source file listing and source paths present"); } - option = new DistCpOptions(new Path(getVal(command, DistCpOptionSwitch. 
- SOURCE_FILE_LISTING.getSwitch())), targetPath); + option = new DistCpOptions(new Path(getVal(command, DistCpOptionSwitch.SOURCE_FILE_LISTING.getSwitch())), + targetPath); } else { if (sourcePaths.isEmpty()) { throw new IllegalArgumentException("Neither source file listing nor source paths present"); @@ -121,7 +121,7 @@ public static DistCpOptions parse(String args[]) throws IllegalArgumentException if (command.hasOption(DistCpOptionSwitch.WORK_PATH.getSwitch()) && option.shouldAtomicCommit()) { String workPath = getVal(command, DistCpOptionSwitch.WORK_PATH.getSwitch()); - if (workPath != null && !workPath.isEmpty()) { + if ((workPath != null) && !workPath.isEmpty()) { option.setAtomicWorkPath(new Path(workPath)); } } else if (command.hasOption(DistCpOptionSwitch.WORK_PATH.getSwitch())) { @@ -148,6 +148,10 @@ public static DistCpOptions parse(String args[]) throws IllegalArgumentException option.setDeleteMissing(true); } + if (command.hasOption(DistCpOptionSwitch.LIST_MISSING_FILE.getSwitch())) { + option.setListMissingFile(command.getOptionValue(DistCpOptionSwitch.LIST_MISSING_FILE.getSwitch())); + } + if (command.hasOption(DistCpOptionSwitch.SKIP_CRC.getSwitch())) { option.setSkipCRC(true); } @@ -159,80 +163,79 @@ public static DistCpOptions parse(String args[]) throws IllegalArgumentException if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) { try { Integer mapBandwidth = Integer.parseInt( - getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()).trim()); + getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()).trim()); if (mapBandwidth.intValue() <= 0) { throw new IllegalArgumentException("Bandwidth specified is not positive: " + - mapBandwidth); + mapBandwidth); } option.setMapBandwidth(mapBandwidth); } catch (NumberFormatException e) { - throw new IllegalArgumentException("Bandwidth specified is invalid: " + - getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()), e); + throw new IllegalArgumentException( + "Bandwidth specified is 
invalid: " + + getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()), + e); } } if (command.hasOption(DistCpOptionSwitch.SSL_CONF.getSwitch())) { - option.setSslConfigurationFile(command. - getOptionValue(DistCpOptionSwitch.SSL_CONF.getSwitch())); + option.setSslConfigurationFile(command.getOptionValue(DistCpOptionSwitch.SSL_CONF.getSwitch())); } if (command.hasOption(DistCpOptionSwitch.MAX_MAPS.getSwitch())) { try { Integer maps = Integer.parseInt( - getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()).trim()); + getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()).trim()); option.setMaxMaps(maps); } catch (NumberFormatException e) { - throw new IllegalArgumentException("Number of maps is invalid: " + - getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()), e); + throw new IllegalArgumentException( + "Number of maps is invalid: " + + getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()), + e); } } if (command.hasOption(DistCpOptionSwitch.COPY_STRATEGY.getSwitch())) { option.setCopyStrategy( - getVal(command, DistCpOptionSwitch.COPY_STRATEGY.getSwitch())); + getVal(command, DistCpOptionSwitch.COPY_STRATEGY.getSwitch())); } if (command.hasOption(DistCpOptionSwitch.PRESERVE_STATUS.getSwitch())) { - String attributes = - getVal(command, DistCpOptionSwitch.PRESERVE_STATUS.getSwitch()); - if (attributes == null || attributes.isEmpty()) { + String attributes = getVal(command, DistCpOptionSwitch.PRESERVE_STATUS.getSwitch()); + if ((attributes == null) || attributes.isEmpty()) { for (FileAttribute attribute : FileAttribute.values()) { option.preserve(attribute); } } else { for (int index = 0; index < attributes.length(); index++) { - option.preserve(FileAttribute. 
- getAttribute(attributes.charAt(index))); + option.preserve(FileAttribute.getAttribute(attributes.charAt(index))); } } } if (command.hasOption(DistCpOptionSwitch.FILE_LIMIT.getSwitch())) { String fileLimitString = getVal(command, - DistCpOptionSwitch.FILE_LIMIT.getSwitch().trim()); + DistCpOptionSwitch.FILE_LIMIT.getSwitch().trim()); try { Integer.parseInt(fileLimitString); - } - catch (NumberFormatException e) { - throw new IllegalArgumentException("File-limit is invalid: " - + fileLimitString, e); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("File-limit is invalid: " + + fileLimitString, e); } LOG.warn(DistCpOptionSwitch.FILE_LIMIT.getSwitch() + " is a deprecated" + - " option. Ignoring."); + " option. Ignoring."); } if (command.hasOption(DistCpOptionSwitch.SIZE_LIMIT.getSwitch())) { String sizeLimitString = getVal(command, - DistCpOptionSwitch.SIZE_LIMIT.getSwitch().trim()); + DistCpOptionSwitch.SIZE_LIMIT.getSwitch().trim()); try { Long.parseLong(sizeLimitString); - } - catch (NumberFormatException e) { - throw new IllegalArgumentException("Size-limit is invalid: " - + sizeLimitString, e); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Size-limit is invalid: " + + sizeLimitString, e); } LOG.warn(DistCpOptionSwitch.SIZE_LIMIT.getSwitch() + " is a deprecated" + - " option. Ignoring."); + " option. 
Ignoring."); } return option; diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java index 4d16445d0ea69..2524cc72050d2 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java @@ -18,6 +18,7 @@ package org.apache.hadoop.tools.mapred; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -27,17 +28,26 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; -import org.apache.hadoop.tools.*; +import org.apache.hadoop.tools.CopyListing; +import org.apache.hadoop.tools.CopyListingFileStatus; +import org.apache.hadoop.tools.DistCpConstants; +import org.apache.hadoop.tools.DistCpOptions; +import org.apache.hadoop.tools.GlobbedCopyListing; import org.apache.hadoop.tools.DistCpOptions.FileAttribute; import org.apache.hadoop.tools.util.DistCpUtils; - +import java.io.BufferedWriter; import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; import java.util.ArrayList; import java.util.EnumSet; import java.util.List; + /** * The CopyCommitter class is DistCp's OutputCommitter implementation. It is * responsible for handling the completion/cleanup of the DistCp run. 
@@ -57,7 +67,7 @@ public class CopyCommitter extends FileOutputCommitter { private boolean syncFolder = false; private boolean overwrite = false; private boolean targetPathExists = true; - + /** * Create a output committer * @@ -77,25 +87,29 @@ public void commitJob(JobContext jobContext) throws IOException { syncFolder = conf.getBoolean(DistCpConstants.CONF_LABEL_SYNC_FOLDERS, false); overwrite = conf.getBoolean(DistCpConstants.CONF_LABEL_OVERWRITE, false); targetPathExists = conf.getBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, true); - + super.commitJob(jobContext); cleanupTempFiles(jobContext); String attributes = conf.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS); - if (attributes != null && !attributes.isEmpty()) { + if ((attributes != null) && !attributes.isEmpty()) { preserveFileAttributesForDirectories(conf); } try { - if (conf.getBoolean(DistCpConstants.CONF_LABEL_DELETE_MISSING, false)) { - deleteMissing(conf); + boolean deleteMissing = conf.getBoolean(DistCpConstants.CONF_LABEL_DELETE_MISSING, false); + String listMissingFile = conf.get(DistCpConstants.CONF_LABEL_LIST_MISSING_FILE, ""); + if (listMissingFile.equals("null")) { + listMissingFile = ""; + } + if (deleteMissing || StringUtils.isNotBlank(listMissingFile)) { + deleteAndOrListMissing(conf, deleteMissing, listMissingFile); } else if (conf.getBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, false)) { commitData(conf); } taskAttemptContext.setStatus("Commit Successful"); - } - finally { + } finally { cleanup(conf); } } @@ -130,11 +144,10 @@ private void cleanupTempFiles(JobContext context) { private void deleteAttemptTempFiles(Path targetWorkPath, FileSystem targetFS, String jobId) throws IOException { - FileStatus[] tempFiles = targetFS.globStatus( - new Path(targetWorkPath, ".distcp.tmp." + jobId.replaceAll("job","attempt") + "*")); + new Path(targetWorkPath, ".distcp.tmp." 
+ jobId.replaceAll("job", "attempt") + "*")); - if (tempFiles != null && tempFiles.length > 0) { + if ((tempFiles != null) && (tempFiles.length > 0)) { for (FileStatus file : tempFiles) { LOG.info("Cleaning up " + file.getPath()); targetFS.delete(file.getPath(), false); @@ -171,7 +184,7 @@ private void preserveFileAttributesForDirectories(Configuration conf) throws IOE Path sourceListing = new Path(conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH)); FileSystem clusterFS = sourceListing.getFileSystem(conf); SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf, - SequenceFile.Reader.file(sourceListing)); + SequenceFile.Reader.file(sourceListing)); long totalLen = clusterFS.getFileStatus(sourceListing).getLen(); Path targetRoot = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH)); @@ -185,20 +198,26 @@ private void preserveFileAttributesForDirectories(Configuration conf) throws IOE while (sourceReader.next(srcRelPath, srcFileStatus)) { // File-attributes for files are set at the time of copy, // in the map-task. - if (! srcFileStatus.isDirectory()) continue; + if (!srcFileStatus.isDirectory()) { + continue; + } Path targetFile = new Path(targetRoot.toString() + "/" + srcRelPath); + // // Skip the root folder when syncOrOverwrite is true. // - if (targetRoot.equals(targetFile) && syncOrOverwrite) continue; + if (targetRoot.equals(targetFile) && syncOrOverwrite) { + continue; + } FileSystem targetFS = targetFile.getFileSystem(conf); - DistCpUtils.preserve(targetFS, targetFile, srcFileStatus, attributes); + DistCpUtils.preserve(targetFS, targetFile, srcFileStatus, attributes); taskAttemptContext.progress(); - taskAttemptContext.setStatus("Preserving status on directory entries. [" + - sourceReader.getPosition() * 100 / totalLen + "%]"); + taskAttemptContext.setStatus( + "Preserving status on directory entries. 
[" + + (sourceReader.getPosition() * 100 / totalLen) + "%]"); } } finally { IOUtils.closeStream(sourceReader); @@ -206,11 +225,14 @@ private void preserveFileAttributesForDirectories(Configuration conf) throws IOE LOG.info("Preserved status on " + preservedEntries + " dir entries on target"); } - // This method deletes "extra" files from the target, if they're not + // This method deletes and/or lists "extra" files from the target, if they're not // available at the source. - private void deleteMissing(Configuration conf) throws IOException { - LOG.info("-delete option is enabled. About to remove entries from " + + private void deleteAndOrListMissing(Configuration conf, boolean deleteMissing, String listMissingFile) + throws IOException { + if (deleteMissing) { + LOG.info("-delete option is enabled. About to remove entries from " + "target that are missing in source"); + } // Sort the source-file listing alphabetically. Path sourceListing = new Path(conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH)); @@ -224,7 +246,19 @@ private void deleteMissing(Configuration conf) throws IOException { List targets = new ArrayList(1); Path targetFinalPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH)); targets.add(targetFinalPath); + DistCpOptions options = new DistCpOptions(targets, new Path("/NONE")); + + // set up the missings file + PrintWriter missings = null; + if (StringUtils.isNotBlank(listMissingFile)) { + LOG.info( + "-listMissingFile option is enabled. 
Creating a list of all entries from " + + "target that are missing in source in file " + conf.get(DistCpConstants.CONF_LABEL_LIST_MISSING_FILE)); + missings = new PrintWriter( + new BufferedWriter(new OutputStreamWriter(clusterFS.create(new Path(listMissingFile), true)))); + } + // // Set up options to be the same from the CopyListing.buildListing's perspective, // so to collect similar listings as when doing the copy @@ -232,15 +266,16 @@ private void deleteMissing(Configuration conf) throws IOException { options.setOverwrite(overwrite); options.setSyncFolder(syncFolder); options.setTargetPathExists(targetPathExists); - + target.buildListing(targetListing, options); + Path sortedTargetListing = DistCpUtils.sortListing(clusterFS, conf, targetListing); long totalLen = clusterFS.getFileStatus(sortedTargetListing).getLen(); SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf, - SequenceFile.Reader.file(sortedSourceListing)); + SequenceFile.Reader.file(sortedSourceListing)); SequenceFile.Reader targetReader = new SequenceFile.Reader(conf, - SequenceFile.Reader.file(sortedTargetListing)); + SequenceFile.Reader.file(sortedTargetListing)); // Walk both source and target file listings. // Delete all from target that doesn't also exist on source. @@ -255,34 +290,44 @@ private void deleteMissing(Configuration conf) throws IOException { boolean srcAvailable = sourceReader.next(srcRelPath, srcFileStatus); while (targetReader.next(trgtRelPath, trgtFileStatus)) { // Skip sources that don't exist on target. - while (srcAvailable && trgtRelPath.compareTo(srcRelPath) > 0) { + while (srcAvailable && (trgtRelPath.compareTo(srcRelPath) > 0)) { srcAvailable = sourceReader.next(srcRelPath, srcFileStatus); } - if (srcAvailable && trgtRelPath.equals(srcRelPath)) continue; + if (srcAvailable && trgtRelPath.equals(srcRelPath)) { + continue; + } // Target doesn't exist at source. Delete. 
- boolean result = (!targetFS.exists(trgtFileStatus.getPath()) || + if (deleteMissing) { + boolean result = (!targetFS.exists(trgtFileStatus.getPath()) || targetFS.delete(trgtFileStatus.getPath(), true)); - if (result) { - LOG.info("Deleted " + trgtFileStatus.getPath() + " - Missing at source"); - deletedEntries++; - } else { - throw new IOException("Unable to delete " + trgtFileStatus.getPath()); + if (result) { + LOG.info("Deleted " + trgtFileStatus.getPath() + " - Missing at source"); + deletedEntries++; + } else { + throw new IOException("Unable to delete " + trgtFileStatus.getPath()); + } + } + if (missings != null) { + missings.println(trgtFileStatus.getPath()); } taskAttemptContext.progress(); - taskAttemptContext.setStatus("Deleting missing files from target. [" + - targetReader.getPosition() * 100 / totalLen + "%]"); + taskAttemptContext.setStatus( + "Deleting missing files from target. [" + + (targetReader.getPosition() * 100 / totalLen) + "%]"); } } finally { IOUtils.closeStream(sourceReader); IOUtils.closeStream(targetReader); + if (missings != null) { + IOUtils.closeStream(missings); + } } LOG.info("Deleted " + deletedEntries + " from target: " + targets.get(0)); } private void commitData(Configuration conf) throws IOException { - Path workDir = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH)); Path finalDir = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH)); FileSystem targetFS = workDir.getFileSystem(conf); @@ -290,8 +335,9 @@ private void commitData(Configuration conf) throws IOException { LOG.info("Atomic commit enabled. Moving " + workDir + " to " + finalDir); if (targetFS.exists(finalDir) && targetFS.exists(workDir)) { LOG.error("Pre-existing final-path found at: " + finalDir); - throw new IOException("Target-path can't be committed to because it " + - "exists at " + finalDir + ". Copied data is in temp-dir: " + workDir + ". 
"); + throw new IOException( + "Target-path can't be committed to because it " + + "exists at " + finalDir + ". Copied data is in temp-dir: " + workDir + ". "); } boolean result = targetFS.rename(workDir, finalDir); diff --git a/hadoop-tools/hadoop-distcp/src/main/resources/distcp-default.xml b/hadoop-tools/hadoop-distcp/src/main/resources/distcp-default.xml index f50ddddae7286..6e1154ef1a9dd 100644 --- a/hadoop-tools/hadoop-distcp/src/main/resources/distcp-default.xml +++ b/hadoop-tools/hadoop-distcp/src/main/resources/distcp-default.xml @@ -31,16 +31,6 @@ Implementation of static input format - - mapred.job.map.memory.mb - 1024 - - - - mapred.job.reduce.memory.mb - 1024 - - mapred.reducer.new-api true diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java index d3da68ed3863d..059a0094156bf 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java @@ -19,52 +19,61 @@ package org.apache.hadoop.tools; import static org.junit.Assert.fail; - +import org.apache.commons.lang.StringUtils; import org.junit.Assert; import org.junit.Test; import org.apache.hadoop.fs.Path; import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.tools.DistCpOptions.*; +import org.apache.hadoop.tools.DistCpOptions.FileAttribute; import org.apache.hadoop.conf.Configuration; - import java.util.Iterator; import java.util.NoSuchElementException; -public class TestOptionsParser { +public class TestOptionsParser { @Test public void testParseIgnoreFailure() { - DistCpOptions options = OptionsParser.parse(new String[] { + DistCpOptions options = OptionsParser.parse( + new String[] { "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); 
Assert.assertFalse(options.shouldIgnoreFailures()); - options = OptionsParser.parse(new String[] { + options = OptionsParser.parse( + new String[] { "-i", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertTrue(options.shouldIgnoreFailures()); } @Test public void testParseOverwrite() { - DistCpOptions options = OptionsParser.parse(new String[] { + DistCpOptions options = OptionsParser.parse( + new String[] { "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertFalse(options.shouldOverwrite()); - options = OptionsParser.parse(new String[] { + options = OptionsParser.parse( + new String[] { "-overwrite", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertTrue(options.shouldOverwrite()); try { - OptionsParser.parse(new String[] { + OptionsParser.parse( + new String[] { "-update", "-overwrite", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.fail("Update and overwrite aren't allowed together"); } catch (IllegalArgumentException ignore) { } @@ -72,329 +81,407 @@ public void testParseOverwrite() { @Test public void testLogPath() { - DistCpOptions options = OptionsParser.parse(new String[] { + DistCpOptions options = OptionsParser.parse( + new String[] { "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertNull(options.getLogPath()); - options = OptionsParser.parse(new String[] { + options = OptionsParser.parse( + new String[] { "-log", "hdfs://localhost:8020/logs", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertEquals(options.getLogPath(), new Path("hdfs://localhost:8020/logs")); } @Test public void testParseBlokcing() { 
- DistCpOptions options = OptionsParser.parse(new String[] { + DistCpOptions options = OptionsParser.parse( + new String[] { "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertTrue(options.shouldBlock()); - options = OptionsParser.parse(new String[] { + options = OptionsParser.parse( + new String[] { "-async", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertFalse(options.shouldBlock()); } @Test public void testParsebandwidth() { - DistCpOptions options = OptionsParser.parse(new String[] { + DistCpOptions options = OptionsParser.parse( + new String[] { "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertEquals(options.getMapBandwidth(), DistCpConstants.DEFAULT_BANDWIDTH_MB); - options = OptionsParser.parse(new String[] { + options = OptionsParser.parse( + new String[] { "-bandwidth", "11", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertEquals(options.getMapBandwidth(), 11); } - @Test(expected=IllegalArgumentException.class) + @Test(expected = IllegalArgumentException.class) public void testParseNonPositiveBandwidth() { - OptionsParser.parse(new String[] { + OptionsParser.parse( + new String[] { "-bandwidth", "-11", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); } - @Test(expected=IllegalArgumentException.class) + @Test(expected = IllegalArgumentException.class) public void testParseZeroBandwidth() { - OptionsParser.parse(new String[] { + OptionsParser.parse( + new String[] { "-bandwidth", "0", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); } @Test public void testParseSkipCRC() { - DistCpOptions options = 
OptionsParser.parse(new String[] { + DistCpOptions options = OptionsParser.parse( + new String[] { "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertFalse(options.shouldSkipCRC()); - options = OptionsParser.parse(new String[] { + options = OptionsParser.parse( + new String[] { "-update", "-skipcrccheck", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertTrue(options.shouldSyncFolder()); Assert.assertTrue(options.shouldSkipCRC()); } @Test public void testParseAtomicCommit() { - DistCpOptions options = OptionsParser.parse(new String[] { + DistCpOptions options = OptionsParser.parse( + new String[] { "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertFalse(options.shouldAtomicCommit()); - options = OptionsParser.parse(new String[] { + options = OptionsParser.parse( + new String[] { "-atomic", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertTrue(options.shouldAtomicCommit()); try { - OptionsParser.parse(new String[] { + OptionsParser.parse( + new String[] { "-atomic", "-update", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.fail("Atomic and sync folders were allowed"); - } catch (IllegalArgumentException ignore) { } + } catch (IllegalArgumentException ignore) { + } } @Test public void testParseWorkPath() { - DistCpOptions options = OptionsParser.parse(new String[] { + DistCpOptions options = OptionsParser.parse( + new String[] { "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertNull(options.getAtomicWorkPath()); - options = OptionsParser.parse(new String[] { + options = OptionsParser.parse( + new String[] { 
"-atomic", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertNull(options.getAtomicWorkPath()); - options = OptionsParser.parse(new String[] { + options = OptionsParser.parse( + new String[] { "-atomic", "-tmp", "hdfs://localhost:8020/work", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertEquals(options.getAtomicWorkPath(), new Path("hdfs://localhost:8020/work")); try { - OptionsParser.parse(new String[] { + OptionsParser.parse( + new String[] { "-tmp", "hdfs://localhost:8020/work", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.fail("work path was allowed without -atomic switch"); - } catch (IllegalArgumentException ignore) {} + } catch (IllegalArgumentException ignore) { + } } @Test public void testParseSyncFolders() { - DistCpOptions options = OptionsParser.parse(new String[] { + DistCpOptions options = OptionsParser.parse( + new String[] { "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertFalse(options.shouldSyncFolder()); - options = OptionsParser.parse(new String[] { + options = OptionsParser.parse( + new String[] { "-update", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertTrue(options.shouldSyncFolder()); } @Test public void testParseDeleteMissing() { - DistCpOptions options = OptionsParser.parse(new String[] { + DistCpOptions options = OptionsParser.parse( + new String[] { "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertFalse(options.shouldDeleteMissing()); - options = OptionsParser.parse(new String[] { + options = OptionsParser.parse( + new String[] { "-update", "-delete", 
"hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertTrue(options.shouldSyncFolder()); Assert.assertTrue(options.shouldDeleteMissing()); - options = OptionsParser.parse(new String[] { + options = OptionsParser.parse( + new String[] { "-overwrite", "-delete", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertTrue(options.shouldOverwrite()); Assert.assertTrue(options.shouldDeleteMissing()); try { - OptionsParser.parse(new String[] { + OptionsParser.parse( + new String[] { "-atomic", "-delete", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.fail("Atomic and delete folders were allowed"); - } catch (IllegalArgumentException ignore) { } + } catch (IllegalArgumentException ignore) { + } } @Test public void testParseSSLConf() { - DistCpOptions options = OptionsParser.parse(new String[] { + DistCpOptions options = OptionsParser.parse( + new String[] { "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertNull(options.getSslConfigurationFile()); - options = OptionsParser.parse(new String[] { + options = OptionsParser.parse( + new String[] { "-mapredSslConf", "/tmp/ssl-client.xml", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertEquals(options.getSslConfigurationFile(), "/tmp/ssl-client.xml"); } @Test public void testParseMaps() { - DistCpOptions options = OptionsParser.parse(new String[] { + DistCpOptions options = OptionsParser.parse( + new String[] { "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertEquals(options.getMaxMaps(), DistCpConstants.DEFAULT_MAPS); - options = OptionsParser.parse(new String[] { + options = 
OptionsParser.parse( + new String[] { "-m", "1", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertEquals(options.getMaxMaps(), 1); - options = OptionsParser.parse(new String[] { + options = OptionsParser.parse( + new String[] { "-m", "0", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertEquals(options.getMaxMaps(), 1); try { - OptionsParser.parse(new String[] { + OptionsParser.parse( + new String[] { "-m", "hello", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.fail("Non numberic map parsed"); - } catch (IllegalArgumentException ignore) { } + } catch (IllegalArgumentException ignore) { + } try { - OptionsParser.parse(new String[] { + OptionsParser.parse( + new String[] { "-mapredXslConf", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.fail("Non numberic map parsed"); - } catch (IllegalArgumentException ignore) { } + } catch (IllegalArgumentException ignore) { + } } @Test public void testSourceListing() { - DistCpOptions options = OptionsParser.parse(new String[] { + DistCpOptions options = OptionsParser.parse( + new String[] { "-f", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertEquals(options.getSourceFileListing(), - new Path("hdfs://localhost:8020/source/first")); + new Path("hdfs://localhost:8020/source/first")); } @Test public void testSourceListingAndSourcePath() { try { - OptionsParser.parse(new String[] { + OptionsParser.parse( + new String[] { "-f", "hdfs://localhost:8020/source/first", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.fail("Both source listing & source paths allowed"); - } catch 
(IllegalArgumentException ignore) {} + } catch (IllegalArgumentException ignore) { + } } @Test public void testMissingSourceInfo() { try { - OptionsParser.parse(new String[] { - "hdfs://localhost:8020/target/"}); + OptionsParser.parse(new String[] { "hdfs://localhost:8020/target/" }); Assert.fail("Neither source listing not source paths present"); - } catch (IllegalArgumentException ignore) {} + } catch (IllegalArgumentException ignore) { + } } @Test public void testMissingTarget() { try { - OptionsParser.parse(new String[] { - "-f", "hdfs://localhost:8020/source"}); + OptionsParser.parse(new String[] { "-f", "hdfs://localhost:8020/source" }); Assert.fail("Missing target allowed"); - } catch (IllegalArgumentException ignore) {} + } catch (IllegalArgumentException ignore) { + } } @Test public void testInvalidArgs() { try { - OptionsParser.parse(new String[] { - "-m", "-f", "hdfs://localhost:8020/source"}); + OptionsParser.parse(new String[] { "-m", "-f", "hdfs://localhost:8020/source" }); Assert.fail("Missing map value"); - } catch (IllegalArgumentException ignore) {} + } catch (IllegalArgumentException ignore) { + } } @Test public void testToString() { DistCpOptions option = new DistCpOptions(new Path("abc"), new Path("xyz")); String val = "DistCpOptions{atomicCommit=false, syncFolder=false, deleteMissing=false, " + - "ignoreFailures=false, maxMaps=20, sslConfigurationFile='null', copyStrategy='uniformsize', " + - "sourceFileListing=abc, sourcePaths=null, targetPath=xyz, targetPathExists=true}"; + "ignoreFailures=false, maxMaps=20, sslConfigurationFile='null', copyStrategy='uniformsize', " + + "sourceFileListing=abc, sourcePaths=null, targetPath=xyz, targetPathExists=true, listMissingFile=null}"; Assert.assertEquals(val, option.toString()); Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(), - DistCpOptionSwitch.ATOMIC_COMMIT.name()); + DistCpOptionSwitch.ATOMIC_COMMIT.name()); } @Test public void testCopyStrategy() { - DistCpOptions options = 
OptionsParser.parse(new String[] { + DistCpOptions options = OptionsParser.parse( + new String[] { "-strategy", "dynamic", "-f", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertEquals(options.getCopyStrategy(), "dynamic"); - options = OptionsParser.parse(new String[] { + options = OptionsParser.parse( + new String[] { "-f", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertEquals(options.getCopyStrategy(), DistCpConstants.UNIFORMSIZE); } @Test public void testTargetPath() { - DistCpOptions options = OptionsParser.parse(new String[] { + DistCpOptions options = OptionsParser.parse( + new String[] { "-f", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertEquals(options.getTargetPath(), new Path("hdfs://localhost:8020/target/")); } @Test public void testPreserve() { - DistCpOptions options = OptionsParser.parse(new String[] { + DistCpOptions options = OptionsParser.parse( + new String[] { "-f", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertFalse(options.shouldPreserve(FileAttribute.BLOCKSIZE)); Assert.assertFalse(options.shouldPreserve(FileAttribute.REPLICATION)); Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION)); @@ -402,11 +489,13 @@ public void testPreserve() { Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP)); Assert.assertFalse(options.shouldPreserve(FileAttribute.CHECKSUMTYPE)); - options = OptionsParser.parse(new String[] { + options = OptionsParser.parse( + new String[] { "-p", "-f", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertTrue(options.shouldPreserve(FileAttribute.BLOCKSIZE)); 
Assert.assertTrue(options.shouldPreserve(FileAttribute.REPLICATION)); Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION)); @@ -416,10 +505,12 @@ public void testPreserve() { Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL)); Assert.assertFalse(options.shouldPreserve(FileAttribute.XATTR)); - options = OptionsParser.parse(new String[] { + options = OptionsParser.parse( + new String[] { "-p", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertTrue(options.shouldPreserve(FileAttribute.BLOCKSIZE)); Assert.assertTrue(options.shouldPreserve(FileAttribute.REPLICATION)); Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION)); @@ -429,11 +520,13 @@ public void testPreserve() { Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL)); Assert.assertFalse(options.shouldPreserve(FileAttribute.XATTR)); - options = OptionsParser.parse(new String[] { + options = OptionsParser.parse( + new String[] { "-pbr", "-f", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertTrue(options.shouldPreserve(FileAttribute.BLOCKSIZE)); Assert.assertTrue(options.shouldPreserve(FileAttribute.REPLICATION)); Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION)); @@ -443,11 +536,13 @@ public void testPreserve() { Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL)); Assert.assertFalse(options.shouldPreserve(FileAttribute.XATTR)); - options = OptionsParser.parse(new String[] { + options = OptionsParser.parse( + new String[] { "-pbrgup", "-f", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertTrue(options.shouldPreserve(FileAttribute.BLOCKSIZE)); Assert.assertTrue(options.shouldPreserve(FileAttribute.REPLICATION)); Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION)); @@ -457,11 +552,13 @@ public 
void testPreserve() { Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL)); Assert.assertFalse(options.shouldPreserve(FileAttribute.XATTR)); - options = OptionsParser.parse(new String[] { + options = OptionsParser.parse( + new String[] { "-pbrgupcax", "-f", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertTrue(options.shouldPreserve(FileAttribute.BLOCKSIZE)); Assert.assertTrue(options.shouldPreserve(FileAttribute.REPLICATION)); Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION)); @@ -471,11 +568,13 @@ public void testPreserve() { Assert.assertTrue(options.shouldPreserve(FileAttribute.ACL)); Assert.assertTrue(options.shouldPreserve(FileAttribute.XATTR)); - options = OptionsParser.parse(new String[] { + options = OptionsParser.parse( + new String[] { "-pc", "-f", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertFalse(options.shouldPreserve(FileAttribute.BLOCKSIZE)); Assert.assertFalse(options.shouldPreserve(FileAttribute.REPLICATION)); Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION)); @@ -485,11 +584,14 @@ public void testPreserve() { Assert.assertFalse(options.shouldPreserve(FileAttribute.ACL)); Assert.assertFalse(options.shouldPreserve(FileAttribute.XATTR)); - options = OptionsParser.parse(new String[] { + options = OptionsParser.parse( + new String[] { "-p", "-f", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); + int i = 0; Iterator attribIterator = options.preserveAttributes(); while (attribIterator.hasNext()) { @@ -499,20 +601,24 @@ public void testPreserve() { Assert.assertEquals(i, 6); try { - OptionsParser.parse(new String[] { + OptionsParser.parse( + new String[] { "-pabcd", "-f", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target"}); + "hdfs://localhost:8020/target" + }); 
Assert.fail("Invalid preserve attribute"); + } catch (IllegalArgumentException ignore) { + } catch (NoSuchElementException ignore) { } - catch (IllegalArgumentException ignore) {} - catch (NoSuchElementException ignore) {} - options = OptionsParser.parse(new String[] { + options = OptionsParser.parse( + new String[] { "-f", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION)); options.preserve(FileAttribute.PERMISSION); Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION)); @@ -534,29 +640,34 @@ public void testOptionsAppendToConf() { Configuration conf = new Configuration(); Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false)); Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.ATOMIC_COMMIT.getConfigLabel(), false)); - DistCpOptions options = OptionsParser.parse(new String[] { + + DistCpOptions options = OptionsParser.parse( + new String[] { "-atomic", "-i", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); options.appendToConf(conf); Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false)); Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.ATOMIC_COMMIT.getConfigLabel(), false)); Assert.assertEquals(conf.getInt(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), - DistCpConstants.DEFAULT_BANDWIDTH_MB); + DistCpConstants.DEFAULT_BANDWIDTH_MB); conf = new Configuration(); Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false)); Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.DELETE_MISSING.getConfigLabel(), false)); Assert.assertEquals(conf.get(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel()), null); - options = OptionsParser.parse(new String[] { + options = OptionsParser.parse( + new String[] { "-update", "-delete", "-pu", 
"-bandwidth", "11", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/"}); + "hdfs://localhost:8020/target/" + }); options.appendToConf(conf); Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false)); Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.DELETE_MISSING.getConfigLabel(), false)); @@ -568,40 +679,73 @@ public void testOptionsAppendToConf() { public void testAppendOption() { Configuration conf = new Configuration(); Assert.assertFalse(conf.getBoolean( - DistCpOptionSwitch.APPEND.getConfigLabel(), false)); + DistCpOptionSwitch.APPEND.getConfigLabel(), + false)); Assert.assertFalse(conf.getBoolean( - DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false)); + DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), + false)); - DistCpOptions options = OptionsParser.parse(new String[] { "-update", + DistCpOptions options = OptionsParser.parse( + new String[] { + "-update", "-append", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/" }); + "hdfs://localhost:8020/target/" + }); options.appendToConf(conf); Assert.assertTrue(conf.getBoolean( - DistCpOptionSwitch.APPEND.getConfigLabel(), false)); + DistCpOptionSwitch.APPEND.getConfigLabel(), + false)); Assert.assertTrue(conf.getBoolean( - DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false)); + DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), + false)); // make sure -append is only valid when -update is specified try { - options = OptionsParser.parse(new String[] { "-append", - "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/" }); + options = OptionsParser.parse( + new String[] { + "-append", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/" + }); fail("Append should fail if update option is not specified"); } catch (IllegalArgumentException e) { GenericTestUtils.assertExceptionContains( - "Append is valid only with update options", e); + "Append is valid only with update options", + e); } 
// make sure -append is invalid when skipCrc is specified try { - options = OptionsParser.parse(new String[] { + options = OptionsParser.parse( + new String[] { "-append", "-update", "-skipcrccheck", "hdfs://localhost:8020/source/first", - "hdfs://localhost:8020/target/" }); + "hdfs://localhost:8020/target/" + }); fail("Append should fail if skipCrc option is specified"); } catch (IllegalArgumentException e) { GenericTestUtils.assertExceptionContains( - "Append is disallowed when skipping CRC", e); + "Append is disallowed when skipping CRC", + e); } } + + @Test + public void testListMissingOption() throws Exception { + DistCpOptions options = OptionsParser.parse( + new String[] { + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/" + }); + Assert.assertTrue(StringUtils.isBlank(options.getListMissingFile())); + + options = OptionsParser.parse( + new String[] { + "-listMissingFile", + "hdfs://localhost:8020/tmp/missingInSource.txt", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/" + }); + Assert.assertTrue(StringUtils.isNotBlank(options.getListMissingFile())); + } }