Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,26 @@ private DistCpConstants() {
"distcp.blocks.per.chunk";

public static final String CONF_LABEL_USE_ITERATOR = "distcp.use.iterator";

/**
 * Whether {@code distcp -update} should use the modification times of the
 * source and target files when deciding whether to copy a file that has
 * the same size but possibly different content.
 *
 * If the target file is perceived as older than the source, the source has
 * been updated since the last sync and is a newer version than what was
 * copied, so the copy should not be skipped.
 * {@value}
 */
public static final String CONF_LABEL_UPDATE_MOD_TIME =
"distcp.update.modification.time";

/**
* Default value for 'distcp.update.modification.time' configuration.
*/
public static final boolean CONF_LABEL_UPDATE_MOD_TIME_DEFAULT =
true;

/**
* Constants for DistCp return code to shell / consumer of ToolRunner's run
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.util.StringUtils;

import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_UPDATE_MOD_TIME_DEFAULT;

/**
* Mapper class that executes the DistCp copy operation.
* Implements the o.a.h.mapreduce.Mapper interface.
Expand Down Expand Up @@ -74,6 +76,15 @@ static enum FileAction {
OVERWRITE, // Overwrite the whole file
}

/**
 * Outcome of a checksum comparison between a source and a target file.
 */
public enum ChecksumComparison {
  /** Checksums were comparable and matched. */
  TRUE,
  /** Checksums were comparable and differed. */
  FALSE,
  /** Checksums could not be compared (e.g. differing checksum algorithms). */
  INCOMPATIBLE,
}

private static Logger LOG = LoggerFactory.getLogger(CopyMapper.class);

private Configuration conf;
Expand All @@ -85,6 +96,7 @@ static enum FileAction {
private boolean append = false;
private boolean verboseLog = false;
private boolean directWrite = false;
private boolean useModTimeToUpdate;
private EnumSet<FileAttribute> preserve = EnumSet.noneOf(FileAttribute.class);

private FileSystem targetFS = null;
Expand Down Expand Up @@ -114,6 +126,9 @@ public void setup(Context context) throws IOException, InterruptedException {
PRESERVE_STATUS.getConfigLabel()));
directWrite = conf.getBoolean(
DistCpOptionSwitch.DIRECT_WRITE.getConfigLabel(), false);
useModTimeToUpdate =
conf.getBoolean(DistCpConstants.CONF_LABEL_UPDATE_MOD_TIME,
CONF_LABEL_UPDATE_MOD_TIME_DEFAULT);

targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
Path targetFinalPath = new Path(conf.get(
Expand Down Expand Up @@ -350,13 +365,65 @@ private boolean canSkip(FileSystem sourceFS, CopyListingFileStatus source,
boolean sameLength = target.getLen() == source.getLen();
boolean sameBlockSize = source.getBlockSize() == target.getBlockSize()
|| !preserve.contains(FileAttribute.BLOCKSIZE);
// Skip the copy if a 0 size file is being copied.
if (sameLength && source.getLen() == 0) {
return true;
}
// If the src and target file have same size and block size, we would
// check if the checkCrc flag is enabled or not. If enabled, and the
// modTime comparison is enabled then return true if target file is older
// than the source file, since this indicates that the target file is
// recently updated and the source is not changed more recently than the
// update, we can skip the copy else we would copy.
// If skipCrc flag is disabled, we would check the checksum comparison
// which is an enum representing 3 values, of which if the comparison
// returns NOT_COMPATIBLE, we'll try to check modtime again, else return
// the result of checksum comparison which are compatible(true or false).
//
// Note: Different object stores can have different checksum algorithms
// resulting in no checksum comparison that results in return true
// always, having the modification time enabled can help in these
// scenarios to not incorrectly skip a copy. Refer: HADOOP-18596.

if (sameLength && sameBlockSize) {
return skipCrc ||
DistCpUtils.checksumsAreEqual(sourceFS, source.getPath(), null,
targetFS, target.getPath(), source.getLen());
} else {
return false;
if (skipCrc) {
return maybeUseModTimeToCompare(source, target);
} else {
ChecksumComparison checksumComparison = DistCpUtils
.checksumsAreEqual(sourceFS, source.getPath(), null,
targetFS, target.getPath(), source.getLen());
LOG.debug("Result of checksum comparison between src {} and target "
+ "{} : {}", source, target, checksumComparison);
if (checksumComparison.equals(ChecksumComparison.INCOMPATIBLE)) {
return maybeUseModTimeToCompare(source, target);
}
// if skipCrc is disabled and checksumComparison is compatible we
// need not check the mod time.
return checksumComparison.equals(ChecksumComparison.TRUE);
}
}
return false;
}

/**
 * Decides whether the copy of a file may be skipped based on modification
 * times, when that check is enabled; otherwise the copy is always skipped.
 *
 * When mod time comparison is enabled, the copy is skipped only if the
 * target's modification time is greater than or equal to the source's,
 * i.e. the target is at least as recent as the source, which means no new
 * change has occurred in the source since the last sync. A source that is
 * strictly newer than the target must be copied.
 *
 * @param source Source fileStatus.
 * @param target Target fileStatus.
 * @return boolean representing result of modTime check.
 */
private boolean maybeUseModTimeToCompare(
    CopyListingFileStatus source, FileStatus target) {
  if (!useModTimeToUpdate) {
    // Mod time check disabled: fall back to skipping the copy.
    return true;
  }
  // Skip only when the target is at least as recent as the source.
  return source.getModificationTime() <= target.getModificationTime();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpContext;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
import org.apache.hadoop.tools.mapred.CopyMapper;
import org.apache.hadoop.tools.mapred.UniformSizeInputFormat;
import org.apache.hadoop.util.StringUtils;

Expand Down Expand Up @@ -568,10 +569,12 @@ public static String getStringDescriptionFor(long nBytes) {
* and false otherwise.
* @throws IOException if there's an exception while retrieving checksums.
*/
public static boolean checksumsAreEqual(FileSystem sourceFS, Path source,
FileChecksum sourceChecksum,
FileSystem targetFS,
Path target, long sourceLen)
public static CopyMapper.ChecksumComparison checksumsAreEqual(
FileSystem sourceFS,
Path source,
FileChecksum sourceChecksum,
FileSystem targetFS,
Path target, long sourceLen)
throws IOException {
FileChecksum targetChecksum = null;
try {
Expand All @@ -585,8 +588,15 @@ public static boolean checksumsAreEqual(FileSystem sourceFS, Path source,
} catch (IOException e) {
LOG.error("Unable to retrieve checksum for " + source + " or " + target, e);
}
return (sourceChecksum == null || targetChecksum == null ||
sourceChecksum.equals(targetChecksum));
// If the source or target checksum is null, that means there is no
// comparison that took place and return not compatible.
// else if matched, return compatible with the matched result.
if (sourceChecksum == null || targetChecksum == null) {
return CopyMapper.ChecksumComparison.INCOMPATIBLE;
} else if (sourceChecksum.equals(targetChecksum)) {
return CopyMapper.ChecksumComparison.TRUE;
}
return CopyMapper.ChecksumComparison.FALSE;
}

/**
Expand All @@ -613,8 +623,12 @@ public static void compareFileLengthsAndChecksums(long srcLen,

//At this point, src & dest lengths are same. if length==0, we skip checksum
if ((srcLen != 0) && (!skipCrc)) {
if (!checksumsAreEqual(sourceFS, source, sourceChecksum,
targetFS, target, srcLen)) {
CopyMapper.ChecksumComparison
checksumComparison = checksumsAreEqual(sourceFS, source, sourceChecksum,
targetFS, target, srcLen);
// If Checksum comparison is false set it to false, else set to true.
boolean checksumResult = !checksumComparison.equals(CopyMapper.ChecksumComparison.FALSE);
if (!checksumResult) {
StringBuilder errorMessage =
new StringBuilder(DistCpConstants.CHECKSUM_MISMATCH_ERROR_MSG)
.append(source).append(" and ").append(target).append(".");
Expand Down
39 changes: 31 additions & 8 deletions hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
Original file line number Diff line number Diff line change
Expand Up @@ -630,14 +630,37 @@ hadoop distcp -update -numListstatusThreads 20 \
Because object stores are slow to list files, consider setting the `-numListstatusThreads` option when performing a `-update` operation
on a large directory tree (the limit is 40 threads).

When `DistCp -update` is used with object stores,
generally only the modification time and length of the individual files are compared,
not any checksums. The fact that most object stores do have valid timestamps
for directories is irrelevant; only the file timestamps are compared.
However, it is important to have the clock of the client computers close
to that of the infrastructure, so that timestamps are consistent between
the client/HDFS cluster and that of the object store. Otherwise, changed files may be
missed/copied too often.
When `DistCp -update` is used with object stores, generally only the
modification time and length of the individual files are compared; checksums
are not compared when the two stores use different checksum algorithms.

* A `distcp -update` between two object stores with different checksum
algorithms compares the modification times of the source and target files,
along with the file size, to determine whether to skip the file copy. This
behavior is controlled by the property `distcp.update.modification.time`,
which is set to true by default. If the source file was modified more
recently than the target file, it is assumed that the content has changed,
and the file should be updated.
We need to ensure that there is no clock skew between the machines.
The fact that most object stores do have valid timestamps for directories
is irrelevant; only the file timestamps are compared. However, it is
important to have the clock of the client computers close to that of the
infrastructure, so that timestamps are consistent between the client/HDFS
cluster and that of the object store. Otherwise, changed files may be
missed/copied too often.

* `distcp.update.modification.time` is only used if either of the two
stores lacks checksum validation, resulting in an incompatible checksum
comparison between the two. Even if the property is set to true, it is not
used if there is a valid checksum comparison between the two stores.

To turn off the modification time check, set this in your core-site.xml
```xml
<property>
<name>distcp.update.modification.time</name>
<value>false</value>
</property>
```

**Notes**

Expand Down
Loading