Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
* Utility class to hold commonly used constants.
*/
public class DistCpConstants {

/* Default number of maps to use for DistCp */
public static final int DEFAULT_MAPS = 20;

Expand All @@ -44,6 +43,7 @@ public class DistCpConstants {
public static final String CONF_LABEL_PRESERVE_STATUS = "distcp.preserve.status";
public static final String CONF_LABEL_SYNC_FOLDERS = "distcp.sync.folders";
public static final String CONF_LABEL_DELETE_MISSING = "distcp.delete.missing.source";
public static final String CONF_LABEL_LIST_MISSING_FILE = "distcp.list.missing.source";
public static final String CONF_LABEL_SSL_CONF = "distcp.keystore.resource";
public static final String CONF_LABEL_MAX_MAPS = "distcp.max.maps";
public static final String CONF_LABEL_SOURCE_LISTING = "distcp.source.listing";
Expand All @@ -52,16 +52,12 @@ public class DistCpConstants {
public static final String CONF_LABEL_OVERWRITE = "distcp.copy.overwrite";
public static final String CONF_LABEL_APPEND = "distcp.copy.append";
public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb";

public static final String CONF_LABEL_MAX_CHUNKS_TOLERABLE =
"distcp.dynamic.max.chunks.tolerable";
public static final String CONF_LABEL_MAX_CHUNKS_IDEAL =
"distcp.dynamic.max.chunks.ideal";
public static final String CONF_LABEL_MIN_RECORDS_PER_CHUNK =
"distcp.dynamic.min.records_per_chunk";
public static final String CONF_LABEL_SPLIT_RATIO =
"distcp.dynamic.split.ratio";


public static final String CONF_LABEL_MAX_CHUNKS_TOLERABLE = "distcp.dynamic.max.chunks.tolerable";
public static final String CONF_LABEL_MAX_CHUNKS_IDEAL = "distcp.dynamic.max.chunks.ideal";
public static final String CONF_LABEL_MIN_RECORDS_PER_CHUNK = "distcp.dynamic.min.records_per_chunk";
public static final String CONF_LABEL_SPLIT_RATIO = "distcp.dynamic.split.ratio";

/* Total bytes to be copied. Updated by copylisting. Unfiltered count */
public static final String CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED = "mapred.total.bytes.expected";

Expand All @@ -86,9 +82,9 @@ public class DistCpConstants {

/* Boolean to indicate whether the target of distcp exists. */
public static final String CONF_LABEL_TARGET_PATH_EXISTS = "distcp.target.path.exists";

/**
* DistCp job id for consumers of the Disctp
* DistCp job id for consumers of the Disctp
*/
public static final String CONF_LABEL_DISTCP_JOB_ID = "distcp.job.id";

Expand All @@ -101,14 +97,12 @@ public class DistCpConstants {
/**
* Conf label for SSL Trust-store location.
*/
public static final String CONF_LABEL_SSL_TRUST_STORE_LOCATION
= "ssl.client.truststore.location";
public static final String CONF_LABEL_SSL_TRUST_STORE_LOCATION = "ssl.client.truststore.location";

/**
* Conf label for SSL Key-store location.
*/
public static final String CONF_LABEL_SSL_KEY_STORE_LOCATION
= "ssl.client.keystore.location";
public static final String CONF_LABEL_SSL_KEY_STORE_LOCATION = "ssl.client.keystore.location";

/**
* Constants for DistCp return code to shell / consumer of ToolRunner's run
Expand All @@ -119,13 +113,13 @@ public class DistCpConstants {
public static final int ACLS_NOT_SUPPORTED = -3;
public static final int XATTRS_NOT_SUPPORTED = -4;
public static final int UNKNOWN_ERROR = -999;

/**
* Constants for DistCp default values of configurable values
*/
public static final int MAX_CHUNKS_TOLERABLE_DEFAULT = 400;
public static final int MAX_CHUNKS_IDEAL_DEFAULT = 100;
public static final int MAX_CHUNKS_IDEAL_DEFAULT = 100;
public static final int MIN_RECORDS_PER_CHUNK_DEFAULT = 5;
public static final int SPLIT_RATIO_DEFAULT = 2;
public static final int SPLIT_RATIO_DEFAULT = 2;

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,17 @@
import org.apache.commons.cli.Option;
import org.apache.hadoop.conf.Configuration;


/**
* Enumeration mapping configuration keys to distcp command line
* options.
*/
public enum DistCpOptionSwitch {

/**
* Ignores any failures during copy, and continues with rest.
* Logs failures in a file
*/
IGNORE_FAILURES(DistCpConstants.CONF_LABEL_IGNORE_FAILURES,
new Option("i", false, "Ignore failures during copy")),
IGNORE_FAILURES(DistCpConstants.CONF_LABEL_IGNORE_FAILURES, new Option("i", false, "Ignore failures during copy")),

/**
* Preserves status of file/path in the target.
Expand All @@ -45,10 +44,12 @@ public enum DistCpOptionSwitch {
*
*/
PRESERVE_STATUS(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
new Option("p", true, "preserve status (rbugpcax)(replication, " +
"block-size, user, group, permission, checksum-type, ACL, XATTR). " +
"If -p is specified with no <arg>, then preserves replication, " +
"block size, user, group, permission and checksum type.")),
new Option("p",
true,
"preserve status (rbugpcax)(replication, " +
"block-size, user, group, permission, checksum-type, ACL, XATTR). " +
"If -p is specified with no <arg>, then preserves replication, " +
"block size, user, group, permission and checksum type.")),

/**
* Update target location by copying only files that are missing
Expand All @@ -57,8 +58,8 @@ public enum DistCpOptionSwitch {
* Incompatible with ATOMIC_COMMIT
*/
SYNC_FOLDERS(DistCpConstants.CONF_LABEL_SYNC_FOLDERS,
new Option("update", false, "Update target, copying only missing" +
"files or directories")),
new Option("update", false, "Update target, copying only missing" +
"files or directories")),

/**
* Deletes missing files in target that are missing from source
Expand All @@ -67,100 +68,103 @@ public enum DistCpOptionSwitch {
* Incompatible with ATOMIC_COMMIT
*/
DELETE_MISSING(DistCpConstants.CONF_LABEL_DELETE_MISSING,
new Option("delete", false, "Delete from target, " +
"files missing in source")),
new Option("delete", false, "Delete from target, " +
"files missing in source")),


/**
* Create a list of files that are missing in the source and
* exist in the target.
*/
LIST_MISSING_FILE(DistCpConstants.CONF_LABEL_LIST_MISSING_FILE,
new Option("listMissingFile", true, "Create a list of files that " +
"are missing in source and exist in target")),

/**
* Configuration file to use with hftps:// for securely copying
* files across clusters. Typically the configuration file contains
* truststore/keystore information such as location, password and type
*/
SSL_CONF(DistCpConstants.CONF_LABEL_SSL_CONF,
new Option("mapredSslConf", true, "Configuration for ssl config file" +
", to use with hftps://")),
new Option("mapredSslConf", true, "Configuration for ssl config file" +
", to use with hftps://")),

/**
* Max number of maps to use during copy. DistCp will split work
* as equally as possible among these maps
*/
MAX_MAPS(DistCpConstants.CONF_LABEL_MAX_MAPS,
new Option("m", true, "Max number of concurrent maps to use for copy")),
MAX_MAPS(DistCpConstants.CONF_LABEL_MAX_MAPS, new Option("m", true, "Max number of concurrent maps to use for copy")),

/**
* Source file listing can be provided to DistCp in a file.
* This allows DistCp to copy random list of files from source
* and copy them to target
*/
SOURCE_FILE_LISTING(DistCpConstants.CONF_LABEL_SOURCE_LISTING,
new Option("f", true, "List of files that need to be copied")),
new Option("f", true, "List of files that need to be copied")),

/**
* Copy all the source files and commit them atomically to the target
* This is typically useful in cases where there is a process
* polling for availability of a file/dir. This option is incompatible
* with SYNC_FOLDERS & DELETE_MISSING
*/
ATOMIC_COMMIT(DistCpConstants.CONF_LABEL_ATOMIC_COPY,
new Option("atomic", false, "Commit all changes or none")),
ATOMIC_COMMIT(DistCpConstants.CONF_LABEL_ATOMIC_COPY, new Option("atomic", false, "Commit all changes or none")),

/**
* Work path to be used only in conjunction in Atomic commit
*/
WORK_PATH(DistCpConstants.CONF_LABEL_WORK_PATH,
new Option("tmp", true, "Intermediate work path to be used for atomic commit")),
new Option("tmp", true, "Intermediate work path to be used for atomic commit")),

/**
* Log path where distcp output logs are written to
*/
LOG_PATH(DistCpConstants.CONF_LABEL_LOG_PATH,
new Option("log", true, "Folder on DFS where distcp execution logs are saved")),
new Option("log", true, "Folder on DFS where distcp execution logs are saved")),

/**
* Copy strategy is use. This could be dynamic or uniform size etc.
* DistCp would use an appropriate input format based on this.
*/
COPY_STRATEGY(DistCpConstants.CONF_LABEL_COPY_STRATEGY,
new Option("strategy", true, "Copy strategy to use. Default is " +
"dividing work based on file sizes")),
new Option("strategy", true, "Copy strategy to use. Default is " +
"dividing work based on file sizes")),

/**
* Skip CRC checks between source and target, when determining what
* files need to be copied.
*/
SKIP_CRC(DistCpConstants.CONF_LABEL_SKIP_CRC,
new Option("skipcrccheck", false, "Whether to skip CRC checks between " +
"source and target paths.")),
new Option("skipcrccheck", false, "Whether to skip CRC checks between " +
"source and target paths.")),

/**
* Overwrite target-files unconditionally.
*/
OVERWRITE(DistCpConstants.CONF_LABEL_OVERWRITE,
new Option("overwrite", false, "Choose to overwrite target files " +
"unconditionally, even if they exist.")),
new Option("overwrite", false, "Choose to overwrite target files " +
"unconditionally, even if they exist.")),

APPEND(DistCpConstants.CONF_LABEL_APPEND,
new Option("append", false,
"Reuse existing data in target files and append new data to them if possible")),
new Option("append", false,
"Reuse existing data in target files and append new data to them if possible")),

/**
* Should DisctpExecution be blocking
*/
BLOCKING("",
new Option("async", false, "Should distcp execution be blocking")),
BLOCKING("", new Option("async", false, "Should distcp execution be blocking")),

FILE_LIMIT("",
new Option("filelimit", true, "(Deprecated!) Limit number of files " +
"copied to <= n")),
FILE_LIMIT("", new Option("filelimit", true, "(Deprecated!) Limit number of files " +
"copied to <= n")),

SIZE_LIMIT("",
new Option("sizelimit", true, "(Deprecated!) Limit number of files " +
"copied to <= n bytes")),
SIZE_LIMIT("", new Option("sizelimit", true, "(Deprecated!) Limit number of files " +
"copied to <= n bytes")),

/**
* Specify bandwidth per map in MB
*/
BANDWIDTH(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
new Option("bandwidth", true, "Specify bandwidth per map in MB"));
BANDWIDTH(DistCpConstants.CONF_LABEL_BANDWIDTH_MB, new Option("bandwidth", true, "Specify bandwidth per map in MB"));

private final String confLabel;
private final Option option;
Expand Down Expand Up @@ -196,9 +200,9 @@ public String getSwitch() {

@Override
public String toString() {
return super.name() + " {" +
"confLabel='" + confLabel + '\'' +
", option=" + option + '}';
return super.name() + " {" +
"confLabel='" + confLabel + '\'' +
", option=" + option + '}';
}

/**
Expand Down
Loading