|
45 | 45 |
|
46 | 46 | import java.io.FileNotFoundException; |
47 | 47 | import java.io.IOException; |
48 | | -import java.util.ArrayList; |
49 | | -import java.util.EnumSet; |
50 | | -import java.util.LinkedList; |
51 | | -import java.util.List; |
| 48 | +import java.util.*; |
| 49 | +import java.util.stream.Collectors; |
52 | 50 |
|
53 | 51 | import static org.apache.hadoop.tools.DistCpConstants.*; |
54 | 52 |
|
@@ -170,17 +168,38 @@ private void deleteAttemptTempFiles(Path targetWorkPath, |
170 | 168 | return; |
171 | 169 | } |
172 | 170 |
|
173 | | - FileStatus[] tempFiles = targetFS.globStatus( |
174 | | - new Path(targetWorkPath, ".distcp.tmp." + jobId.replaceAll("job","attempt") + "*")); |
| 171 | + String tmpFilePattern = TARGET_TMP_FILE_PREFIX + jobId.replaceAll("job","attempt") + "*"; |
| 172 | + List<FileStatus> tempFiles = listTmpFilePaths(targetFS, targetWorkPath, tmpFilePattern); |
175 | 173 |
|
176 | | - if (tempFiles != null && tempFiles.length > 0) { |
| 174 | + if (tempFiles != null && tempFiles.size() > 0) { |
177 | 175 | for (FileStatus file : tempFiles) { |
178 | 176 | LOG.info("Cleaning up " + file.getPath()); |
179 | 177 | targetFS.delete(file.getPath(), false); |
180 | 178 | } |
181 | 179 | } |
182 | 180 | } |
183 | 181 |
|
| 182 | + private List<FileStatus> listTmpFilePaths(FileSystem fileSystem, Path targetWorkPath, String pattern) throws IOException { |
| 183 | + List<Path> tmpPaths = new ArrayList<>(); |
| 184 | + tmpPaths.add(targetWorkPath); |
| 185 | + listChildrenPaths(fileSystem, targetWorkPath, tmpPaths); |
| 186 | + List<FileStatus> allTmpFiles = new ArrayList<>(tmpPaths.size()); |
| 187 | + for (Path path : tmpPaths) { |
| 188 | + FileStatus[] tmpFiles = fileSystem.globStatus(new Path(path, pattern)); |
| 189 | + allTmpFiles.addAll(Arrays.asList(tmpFiles)); |
| 190 | + } |
| 191 | + return allTmpFiles; |
| 192 | + } |
| 193 | + |
| 194 | + private void listChildrenPaths(FileSystem fileSystem, Path targetWorkPath, List<Path> paths) throws IOException { |
| 195 | + List<Path> directoryPaths = Arrays.stream(fileSystem.listStatus(targetWorkPath)) |
| 196 | + .filter(status -> status.isDirectory()).map(status -> status.getPath()).collect(Collectors.toList()); |
| 197 | + paths.addAll(directoryPaths); |
| 198 | + for (Path path : directoryPaths) { |
| 199 | + listChildrenPaths(fileSystem, path, paths); |
| 200 | + } |
| 201 | + } |
| 202 | + |
184 | 203 | /** |
185 | 204 | * Cleanup meta folder and other temporary files |
186 | 205 | * |
|
0 commit comments