From 5a5aa30195d4711e2fd176fcce9526a05e5d6296 Mon Sep 17 00:00:00 2001
From: Min Zhao
Date: Thu, 3 Jun 2021 16:05:06 +0800
Subject: [PATCH] HADOOP-17742. fix distcp fail when copying to ftp filesystem

---
Review notes (text in this section is not part of the commit):
 - CopyCommitter keeps its explicit java.util imports. The helpers use plain
   loops, so neither a wildcard import nor java.util.stream is needed and
   the import hunk is gone.
 - listTmpFilePaths and listChildrenPaths declare their element types
   (List<FileStatus>, List<Path>).
 - A null globStatus result and a missing targetWorkPath are treated as
   "no temp files" instead of failing the cleanup.
 - Indentation and line breaks of the context lines were reconstructed, and
   the index hashes are those of the earlier revision of this patch; run
   `git apply --check` before applying.

 .../apache/hadoop/tools/DistCpConstants.java  |  7 ++
 .../hadoop/tools/mapred/CopyCommitter.java    | 64 +++++++++++++++++--
 .../mapred/RetriableFileCopyCommand.java      |  2 +-
 3 files changed, 69 insertions(+), 4 deletions(-)

diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
index 25815687c2973..0dac471d79208 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
@@ -191,4 +191,11 @@ private DistCpConstants() {
 
   public static final String CLASS_INSTANTIATION_ERROR_MSG =
           "Unable to instantiate ";
+
+  /**
+   * Name prefix of the temporary files that RetriableFileCopyCommand creates
+   * and that CopyCommitter globs for when cleaning up an attempt.
+   * Note: no leading dot, unlike the ".distcp.tmp." literal this replaces.
+   */
+  public static final String TARGET_TMP_FILE_PREFIX = "distcp.tmp.";
 }
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
index 139bd08fd7abc..dbad816d517f5 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
@@ -170,10 +170,12 @@ private void deleteAttemptTempFiles(Path targetWorkPath,
       return;
     }
 
-    FileStatus[] tempFiles = targetFS.globStatus(
-        new Path(targetWorkPath, ".distcp.tmp." + jobId.replaceAll("job","attempt") + "*"));
+    String tmpFilePattern =
+        TARGET_TMP_FILE_PREFIX + jobId.replaceAll("job", "attempt") + "*";
+    List<FileStatus> tempFiles =
+        listTmpFilePaths(targetFS, targetWorkPath, tmpFilePattern);
 
-    if (tempFiles != null && tempFiles.length > 0) {
+    if (!tempFiles.isEmpty()) {
       for (FileStatus file : tempFiles) {
         LOG.info("Cleaning up " + file.getPath());
         targetFS.delete(file.getPath(), false);
@@ -181,6 +183,62 @@ private void deleteAttemptTempFiles(Path targetWorkPath,
     }
   }
 
+  /**
+   * Collect every entry matching {@code pattern} in {@code targetWorkPath}
+   * and in each directory below it.
+   *
+   * @param fileSystem file system to search
+   * @param targetWorkPath root directory of the search
+   * @param pattern glob for the entry name, applied in each directory
+   * @return the matching entries; empty, never null, when nothing matches
+   * @throws IOException if listing or globbing fails
+   */
+  private List<FileStatus> listTmpFilePaths(FileSystem fileSystem,
+      Path targetWorkPath, String pattern) throws IOException {
+    List<Path> searchDirs = new ArrayList<>();
+    searchDirs.add(targetWorkPath);
+    listChildrenPaths(fileSystem, targetWorkPath, searchDirs);
+
+    List<FileStatus> allTmpFiles = new ArrayList<>();
+    for (Path dir : searchDirs) {
+      FileStatus[] matches = fileSystem.globStatus(new Path(dir, pattern));
+      // The code this replaces null-checked the globStatus result; keep it.
+      if (matches == null) {
+        continue;
+      }
+      for (FileStatus match : matches) {
+        allTmpFiles.add(match);
+      }
+    }
+    return allTmpFiles;
+  }
+
+  /**
+   * Append every directory below {@code targetWorkPath}, at any depth, to
+   * {@code paths}.
+   *
+   * @param fileSystem file system to list
+   * @param targetWorkPath directory whose descendants are collected
+   * @param paths receives the directories found
+   * @throws IOException if a listing fails for another reason than absence
+   */
+  private void listChildrenPaths(FileSystem fileSystem, Path targetWorkPath,
+      List<Path> paths) throws IOException {
+    FileStatus[] children;
+    try {
+      children = fileSystem.listStatus(targetWorkPath);
+    } catch (FileNotFoundException e) {
+      // A missing directory holds no temp files; do not fail the cleanup.
+      return;
+    }
+    for (FileStatus child : children) {
+      if (child.isDirectory()) {
+        paths.add(child.getPath());
+        listChildrenPaths(fileSystem, child.getPath(), paths);
+      }
+    }
+  }
+
   /**
    * Cleanup meta folder and other temporary files
    *
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
index cde160c965485..0d43a56a1160d 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
@@ -263,7 +263,7 @@ private Path getTempFile(Path target, Mapper.Context context) {
 
     Path root = target.equals(targetWorkPath) ? targetWorkPath.getParent()
         : targetWorkPath;
-    Path tempFile = new Path(root, ".distcp.tmp." +
+    Path tempFile = new Path(root, DistCpConstants.TARGET_TMP_FILE_PREFIX +
         context.getTaskAttemptID().toString() +
         "." + String.valueOf(System.currentTimeMillis()));
     LOG.info("Creating temp file: {}", tempFile);