|
18 | 18 | package org.apache.hadoop.hbase.master.cleaner; |
19 | 19 |
|
20 | 20 | import java.io.IOException; |
21 | | -import java.util.Collections; |
| 21 | +import java.util.ArrayList; |
| 22 | +import java.util.Arrays; |
22 | 23 | import java.util.Comparator; |
23 | 24 | import java.util.HashMap; |
24 | 25 | import java.util.LinkedList; |
25 | 26 | import java.util.List; |
26 | 27 | import java.util.Map; |
27 | | -import java.util.Optional; |
28 | | -import java.util.concurrent.ExecutionException; |
29 | | -import java.util.concurrent.RecursiveTask; |
| 28 | +import java.util.concurrent.CompletableFuture; |
30 | 29 | import java.util.concurrent.atomic.AtomicBoolean; |
| 30 | +import java.util.stream.Collectors; |
| 31 | + |
31 | 32 | import org.apache.hadoop.conf.Configuration; |
32 | 33 | import org.apache.hadoop.fs.FileStatus; |
33 | 34 | import org.apache.hadoop.fs.FileSystem; |
34 | 35 | import org.apache.hadoop.fs.Path; |
35 | 36 | import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; |
36 | 37 | import org.apache.hadoop.hbase.ScheduledChore; |
37 | 38 | import org.apache.hadoop.hbase.Stoppable; |
38 | | -import org.apache.hadoop.hbase.util.FSUtils; |
39 | 39 | import org.apache.hadoop.ipc.RemoteException; |
40 | 40 | import org.apache.yetus.audience.InterfaceAudience; |
41 | 41 | import org.slf4j.Logger; |
42 | 42 | import org.slf4j.LoggerFactory; |
43 | 43 |
|
44 | 44 | import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; |
45 | 45 | import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; |
46 | | -import org.apache.hbase.thirdparty.com.google.common.base.Predicate; |
47 | 46 | import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; |
48 | 47 | import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; |
49 | 48 | import org.apache.hbase.thirdparty.com.google.common.collect.Lists; |
@@ -213,9 +212,14 @@ private void preRunCleaner() { |
213 | 212 |
|
214 | 213 | public Boolean runCleaner() { |
215 | 214 | preRunCleaner(); |
216 | | - CleanerTask task = new CleanerTask(this.oldFileDir, true); |
217 | | - pool.execute(task); |
218 | | - return task.join(); |
| 215 | + try { |
| 216 | + CompletableFuture<Boolean> future = new CompletableFuture<>(); |
| 217 | + pool.runAsync(() -> traverseAndDelete(oldFileDir, true, future)); // root=true: oldFileDir itself is never deleted |
| 218 | + return future.get(); // blocks until traverseAndDelete completes the future |
| 219 | + } catch (Exception e) { // traversal failure or interruption of the wait: log it and report false |
| 220 | + LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e); |
| 221 | + return false; |
| 222 | + } |
219 | 223 | } |
220 | 224 |
|
221 | 225 | /** |
@@ -380,126 +384,93 @@ public boolean setEnabled(final boolean enabled) { |
380 | 384 | } |
381 | 385 |
|
382 | 386 | private interface Action<T> { |
383 | | - T act() throws IOException; |
| 387 | + T act() throws Exception; // may throw anything; deleteAction catches and logs whatever escapes |
384 | 388 | } |
385 | 389 |
|
386 | 390 | /** |
387 | | - * Attemps to clean up a directory, its subdirectories, and files. Return value is true if |
388 | | - * everything was deleted. false on partial / total failures. |
| 391 | + * Attempts to clean up a directory, its subdirectories, and files. |
389 | 392 | */ |
390 | | - private final class CleanerTask extends RecursiveTask<Boolean> { |
391 | | - |
392 | | - private static final long serialVersionUID = -5444212174088754172L; |
393 | | - |
394 | | - private final Path dir; |
395 | | - private final boolean root; |
396 | | - |
397 | | - CleanerTask(final FileStatus dir, final boolean root) { |
398 | | - this(dir.getPath(), root); |
399 | | - } |
400 | | - |
401 | | - CleanerTask(final Path dir, final boolean root) { |
402 | | - this.dir = dir; |
403 | | - this.root = root; |
404 | | - } |
405 | | - |
406 | | - @Override |
407 | | - protected Boolean compute() { |
408 | | - LOG.trace("Cleaning under {}", dir); |
409 | | - List<FileStatus> subDirs; |
410 | | - List<FileStatus> files; |
411 | | - try { |
412 | | - // if dir doesn't exist, we'll get null back for both of these |
413 | | - // which will fall through to succeeding. |
414 | | - subDirs = getFilteredStatus(FileStatus::isDirectory); |
415 | | - files = getFilteredStatus(FileStatus::isFile); |
416 | | - } catch (IOException ioe) { |
417 | | - LOG.warn("failed to get FileStatus for contents of '{}'", dir, ioe); |
418 | | - return false; |
419 | | - } |
420 | | - |
421 | | - boolean allFilesDeleted = true; |
422 | | - if (!files.isEmpty()) { |
423 | | - allFilesDeleted = deleteAction(() -> checkAndDeleteFiles(files), "files"); |
424 | | - } |
425 | | - |
426 | | - boolean allSubdirsDeleted = true; |
| 393 | + private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean> result) { |
| 394 | + try { |
| 395 | + // Step.1: List all files and sub-directories directly under the given directory. |
| 396 | + List<FileStatus> allPaths = Arrays.asList(fs.listStatus(dir)); |
| 397 | + List<FileStatus> subDirs = |
| 398 | + allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList()); |
| 399 | + List<FileStatus> files = |
| 400 | + allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList()); |
| 401 | + |
| 402 | + // Step.2: Try to delete all the deletable files; having no files at all counts as success. |
| 403 | + boolean allFilesDeleted = |
| 404 | + files.isEmpty() || deleteAction(() -> checkAndDeleteFiles(files), "files", dir); |
| 405 | + |
| 406 | + // Step.3: Start to traverse and delete the sub-directories. |
| 407 | + List<CompletableFuture<Boolean>> futures = new ArrayList<>(); |
427 | 408 | if (!subDirs.isEmpty()) { |
428 | | - List<CleanerTask> tasks = Lists.newArrayListWithCapacity(subDirs.size()); |
429 | 409 | sortByConsumedSpace(subDirs); |
430 | | - for (FileStatus subdir : subDirs) { |
431 | | - CleanerTask task = new CleanerTask(subdir, false); |
432 | | - tasks.add(task); |
433 | | - task.fork(); |
434 | | - } |
435 | | - allSubdirsDeleted = deleteAction(() -> getCleanResult(tasks), "subdirs"); |
| 410 | + // Submit the request of sub-directory deletion. |
| 411 | + subDirs.forEach(subDir -> { |
| 412 | + CompletableFuture<Boolean> subFuture = new CompletableFuture<>(); |
| 413 | + pool.runAsync(() -> traverseAndDelete(subDir.getPath(), false, subFuture)); |
| 414 | + futures.add(subFuture); |
| 415 | + }); |
436 | 416 | } |
437 | 417 |
|
438 | | - boolean result = allFilesDeleted && allSubdirsDeleted && isEmptyDirDeletable(dir); |
439 | | - // if and only if files and subdirs under current dir are deleted successfully, and the empty |
440 | | - // directory can be deleted, and it is not the root dir then task will try to delete it. |
441 | | - if (result && !root) { |
442 | | - result &= deleteAction(() -> fs.delete(dir, false), "dir"); |
443 | | - } |
444 | | - return result; |
445 | | - } |
446 | | - |
447 | | - /** |
448 | | - * Get FileStatus with filter. |
449 | | - * @param function a filter function |
450 | | - * @return filtered FileStatus or empty list if dir doesn't exist |
451 | | - * @throws IOException if there's an error other than dir not existing |
452 | | - */ |
453 | | - private List<FileStatus> getFilteredStatus(Predicate<FileStatus> function) throws IOException { |
454 | | - return Optional.ofNullable(FSUtils.listStatusWithStatusFilter(fs, dir, |
455 | | - status -> function.test(status))).orElseGet(Collections::emptyList); |
456 | | - } |
457 | | - |
458 | | - /** |
459 | | - * Perform a delete on a specified type. |
460 | | - * @param deletion a delete |
461 | | - * @param type possible values are 'files', 'subdirs', 'dirs' |
462 | | - * @return true if it deleted successfully, false otherwise |
463 | | - */ |
464 | | - private boolean deleteAction(Action<Boolean> deletion, String type) { |
465 | | - boolean deleted; |
466 | | - try { |
467 | | - LOG.trace("Start deleting {} under {}", type, dir); |
468 | | - deleted = deletion.act(); |
469 | | - } catch (PathIsNotEmptyDirectoryException exception) { |
470 | | - // N.B. HDFS throws this exception when we try to delete a non-empty directory, but |
471 | | - // LocalFileSystem throws a bare IOException. So some test code will get the verbose |
472 | | - // message below. |
473 | | - LOG.debug("Couldn't delete '{}' yet because it isn't empty. Probably transient. " + |
474 | | - "exception details at TRACE.", dir); |
475 | | - LOG.trace("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception); |
476 | | - deleted = false; |
477 | | - } catch (IOException ioe) { |
478 | | - LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps " + |
479 | | - "happening, use following exception when asking on mailing list.", |
480 | | - type, dir, ioe); |
481 | | - deleted = false; |
482 | | - } |
483 | | - LOG.trace("Finish deleting {} under {}, deleted=", type, dir, deleted); |
484 | | - return deleted; |
| 418 | + // Step.4: Once every sub-directory future has finished, try to delete the current directory |
| 419 | + // and ALWAYS complete the result, otherwise the caller waiting on it would block forever. |
| 420 | + CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])) |
| 421 | + .whenComplete((voidObj, e) -> { |
| 422 | + if (e != null) { |
| 423 | + result.completeExceptionally(e); |
| 424 | + return; |
| 425 | + } |
| 426 | + try { |
| 427 | + boolean deleted = allFilesDeleted && futures.stream().allMatch(CompletableFuture::join); |
| 428 | + if (deleted && isEmptyDirDeletable(dir) && !root) { |
| 429 | + // If and only if files and sub-dirs under current dir are deleted successfully, and |
| 430 | + // the empty directory can be deleted, and it is not the root dir then task will |
| 431 | + // try to delete it. |
| 432 | + deleted = deleteAction(() -> fs.delete(dir, false), "dir", dir); |
| 433 | + } |
| 434 | + result.complete(deleted); |
| 435 | + } catch (Exception ie) { |
| 436 | + // Must handle the inner exception here, otherwise the result may get stuck if one |
| 437 | + // sub-directory get some failure. |
| 438 | + result.completeExceptionally(ie); |
| 439 | + } |
| 440 | + }); |
| 441 | + } catch (Exception e) { |
| 442 | + LOG.warn("failed to traverse the path: {}", dir, e); |
| 443 | + result.completeExceptionally(e); |
485 | 444 | } |
| 445 | + } |
486 | 446 |
|
487 | | - /** |
488 | | - * Get cleaner results of subdirs. |
489 | | - * @param tasks subdirs cleaner tasks |
490 | | - * @return true if all subdirs deleted successfully, false for patial/all failures |
491 | | - * @throws IOException something happen during computation |
492 | | - */ |
493 | | - private boolean getCleanResult(List<CleanerTask> tasks) throws IOException { |
494 | | - boolean cleaned = true; |
495 | | - try { |
496 | | - for (CleanerTask task : tasks) { |
497 | | - cleaned &= task.get(); |
498 | | - } |
499 | | - } catch (InterruptedException | ExecutionException e) { |
500 | | - throw new IOException(e); |
501 | | - } |
502 | | - return cleaned; |
| 447 | + /** |
| 448 | + * Run the given deletion for {@code dir}, logging (never rethrowing) any failure. |
| 449 | + * @param deletion a delete |
| 450 | + * @param type what is being deleted, used for logging only (e.g. 'files', 'dir') |
| 451 | + * @return true if it deleted successfully, false otherwise |
| 452 | + */ |
| 453 | + private boolean deleteAction(Action<Boolean> deletion, String type, Path dir) { |
| 454 | + boolean deleted; |
| 455 | + try { |
| 456 | + LOG.trace("Start deleting {} under {}", type, dir); |
| 457 | + deleted = deletion.act(); |
| 458 | + } catch (PathIsNotEmptyDirectoryException exception) { |
| 459 | + // N.B. HDFS throws this exception when we try to delete a non-empty directory, but |
| 460 | + // LocalFileSystem throws a bare IOException. So some test code will get the verbose |
| 461 | + // message below. |
| 462 | + LOG.debug("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception); |
| 463 | + deleted = false; |
| 464 | + } catch (IOException ioe) { |
| 465 | + LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps " |
| 466 | + + "happening, use following exception when asking on mailing list.", |
| 467 | + type, dir, ioe); |
| 468 | + deleted = false; |
| 469 | + } catch (Exception e) { |
| 470 | + LOG.info("unexpected exception: ", e); |
| 471 | + deleted = false; |
503 | 472 | } |
| 473 | + LOG.trace("Finish deleting {} under {}, deleted={}", type, dir, deleted); |
| 474 | + return deleted; |
504 | 475 | } |
505 | 476 | } |
0 commit comments