| 
18 | 18 | package org.apache.hadoop.hbase.master.cleaner;  | 
19 | 19 | 
 
  | 
20 | 20 | import java.io.IOException;  | 
21 |  | -import java.util.Collections;  | 
 | 21 | +import java.util.ArrayList;  | 
 | 22 | +import java.util.Arrays;  | 
22 | 23 | import java.util.Comparator;  | 
23 | 24 | import java.util.HashMap;  | 
24 | 25 | import java.util.LinkedList;  | 
25 | 26 | import java.util.List;  | 
26 | 27 | import java.util.Map;  | 
27 |  | -import java.util.Optional;  | 
28 |  | -import java.util.concurrent.ExecutionException;  | 
29 |  | -import java.util.concurrent.RecursiveTask;  | 
 | 28 | +import java.util.concurrent.CompletableFuture;  | 
30 | 29 | import java.util.concurrent.atomic.AtomicBoolean;  | 
 | 30 | +import java.util.stream.Collectors;  | 
 | 31 | + | 
31 | 32 | import org.apache.hadoop.conf.Configuration;  | 
32 | 33 | import org.apache.hadoop.fs.FileStatus;  | 
33 | 34 | import org.apache.hadoop.fs.FileSystem;  | 
34 | 35 | import org.apache.hadoop.fs.Path;  | 
35 | 36 | import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;  | 
36 | 37 | import org.apache.hadoop.hbase.ScheduledChore;  | 
37 | 38 | import org.apache.hadoop.hbase.Stoppable;  | 
38 |  | -import org.apache.hadoop.hbase.util.FSUtils;  | 
 | 39 | +import org.apache.hadoop.hbase.util.FutureUtils;  | 
39 | 40 | import org.apache.hadoop.ipc.RemoteException;  | 
40 | 41 | import org.apache.yetus.audience.InterfaceAudience;  | 
41 | 42 | import org.slf4j.Logger;  | 
42 | 43 | import org.slf4j.LoggerFactory;  | 
43 | 44 | 
 
  | 
44 | 45 | import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;  | 
45 | 46 | import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;  | 
46 |  | -import org.apache.hbase.thirdparty.com.google.common.base.Predicate;  | 
47 | 47 | import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;  | 
48 | 48 | import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;  | 
49 | 49 | import org.apache.hbase.thirdparty.com.google.common.collect.Lists;  | 
@@ -211,11 +211,16 @@ private void preRunCleaner() {  | 
211 | 211 |     cleanersChain.forEach(FileCleanerDelegate::preClean);  | 
212 | 212 |   }  | 
213 | 213 | 
 
  | 
214 |  | -  public Boolean runCleaner() {  | 
 | 214 | +  public boolean runCleaner() {  | 
215 | 215 |     preRunCleaner();  | 
216 |  | -    CleanerTask task = new CleanerTask(this.oldFileDir, true);  | 
217 |  | -    pool.execute(task);  | 
218 |  | -    return task.join();  | 
 | 216 | +    try {  | 
 | 217 | +      CompletableFuture<Boolean> future = new CompletableFuture<>();  | 
 | 218 | +      pool.execute(() -> traverseAndDelete(oldFileDir, true, future));  | 
 | 219 | +      return future.get();  | 
 | 220 | +    } catch (Exception e) {  | 
 | 221 | +      LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e);  | 
 | 222 | +      return false;  | 
 | 223 | +    }  | 
219 | 224 |   }  | 
220 | 225 | 
 
  | 
221 | 226 |   /**  | 
@@ -380,126 +385,97 @@ public boolean setEnabled(final boolean enabled) {  | 
380 | 385 |   }  | 
381 | 386 | 
 
  | 
382 | 387 |   private interface Action<T> {  | 
383 |  | -    T act() throws IOException;  | 
 | 388 | +    T act() throws Exception;  | 
384 | 389 |   }  | 
385 | 390 | 
 
  | 
386 | 391 |   /**  | 
387 |  | -   * Attemps to clean up a directory, its subdirectories, and files. Return value is true if  | 
388 |  | -   * everything was deleted. false on partial / total failures.  | 
 | 392 | +   * Attempts to clean up a directory (including its subdirectories and files) concurrently in a  | 
 | 393 | +   * {@link java.util.concurrent.ThreadPoolExecutor}. The final outcome is delivered through  | 
 | 394 | +   * {@code result}; callers can obtain it by calling {@code result.get()}.  | 
389 | 395 |    */  | 
390 |  | -  private final class CleanerTask extends RecursiveTask<Boolean> {  | 
391 |  | - | 
392 |  | -    private static final long serialVersionUID = -5444212174088754172L;  | 
393 |  | - | 
394 |  | -    private final Path dir;  | 
395 |  | -    private final boolean root;  | 
396 |  | - | 
397 |  | -    CleanerTask(final FileStatus dir, final boolean root) {  | 
398 |  | -      this(dir.getPath(), root);  | 
399 |  | -    }  | 
400 |  | - | 
401 |  | -    CleanerTask(final Path dir, final boolean root) {  | 
402 |  | -      this.dir = dir;  | 
403 |  | -      this.root = root;  | 
404 |  | -    }  | 
405 |  | - | 
406 |  | -    @Override  | 
407 |  | -    protected Boolean compute() {  | 
408 |  | -      LOG.trace("Cleaning under {}", dir);  | 
409 |  | -      List<FileStatus> subDirs;  | 
410 |  | -      List<FileStatus> files;  | 
411 |  | -      try {  | 
412 |  | -        // if dir doesn't exist, we'll get null back for both of these  | 
413 |  | -        // which will fall through to succeeding.  | 
414 |  | -        subDirs = getFilteredStatus(FileStatus::isDirectory);  | 
415 |  | -        files = getFilteredStatus(FileStatus::isFile);  | 
416 |  | -      } catch (IOException ioe) {  | 
417 |  | -        LOG.warn("failed to get FileStatus for contents of '{}'", dir, ioe);  | 
418 |  | -        return false;  | 
419 |  | -      }  | 
420 |  | - | 
421 |  | -      boolean allFilesDeleted = true;  | 
422 |  | -      if (!files.isEmpty()) {  | 
423 |  | -        allFilesDeleted = deleteAction(() -> checkAndDeleteFiles(files), "files");  | 
424 |  | -      }  | 
425 |  | - | 
426 |  | -      boolean allSubdirsDeleted = true;  | 
 | 396 | +  private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean> result) {  | 
 | 397 | +    try {  | 
 | 398 | +      // Step.1: List all files and sub-directories under the given directory.  | 
 | 399 | +      List<FileStatus> allPaths = Arrays.asList(fs.listStatus(dir));  | 
 | 400 | +      List<FileStatus> subDirs =  | 
 | 401 | +          allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());  | 
 | 402 | +      List<FileStatus> files =  | 
 | 403 | +          allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());  | 
 | 404 | + | 
 | 405 | +      // Step.2: Try to delete all the deletable files.  | 
 | 406 | +      boolean allFilesDeleted =  | 
 | 407 | +          files.isEmpty() || deleteAction(() -> checkAndDeleteFiles(files), "files", dir);  | 
 | 408 | + | 
 | 409 | +      // Step.3: Start to traverse and delete the sub-directories.  | 
 | 410 | +      List<CompletableFuture<Boolean>> futures = new ArrayList<>();  | 
427 | 411 |       if (!subDirs.isEmpty()) {  | 
428 |  | -        List<CleanerTask> tasks = Lists.newArrayListWithCapacity(subDirs.size());  | 
429 | 412 |         sortByConsumedSpace(subDirs);  | 
430 |  | -        for (FileStatus subdir : subDirs) {  | 
431 |  | -          CleanerTask task = new CleanerTask(subdir, false);  | 
432 |  | -          tasks.add(task);  | 
433 |  | -          task.fork();  | 
434 |  | -        }  | 
435 |  | -        allSubdirsDeleted = deleteAction(() -> getCleanResult(tasks), "subdirs");  | 
 | 413 | +        // Submit the request of sub-directory deletion.  | 
 | 414 | +        subDirs.forEach(subDir -> {  | 
 | 415 | +          CompletableFuture<Boolean> subFuture = new CompletableFuture<>();  | 
 | 416 | +          pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture));  | 
 | 417 | +          futures.add(subFuture);  | 
 | 418 | +        });  | 
436 | 419 |       }  | 
437 | 420 | 
 
  | 
438 |  | -      boolean result = allFilesDeleted && allSubdirsDeleted && isEmptyDirDeletable(dir);  | 
439 |  | -      // if and only if files and subdirs under current dir are deleted successfully, and the empty  | 
440 |  | -      // directory can be deleted, and it is not the root dir then task will try to delete it.  | 
441 |  | -      if (result && !root) {  | 
442 |  | -        result &= deleteAction(() -> fs.delete(dir, false), "dir");  | 
443 |  | -      }  | 
444 |  | -      return result;  | 
445 |  | -    }  | 
446 |  | - | 
447 |  | -    /**  | 
448 |  | -     * Get FileStatus with filter.  | 
449 |  | -     * @param function a filter function  | 
450 |  | -     * @return filtered FileStatus or empty list if dir doesn't exist  | 
451 |  | -     * @throws IOException if there's an error other than dir not existing  | 
452 |  | -     */  | 
453 |  | -    private List<FileStatus> getFilteredStatus(Predicate<FileStatus> function) throws IOException {  | 
454 |  | -      return Optional.ofNullable(FSUtils.listStatusWithStatusFilter(fs, dir,  | 
455 |  | -        status -> function.test(status))).orElseGet(Collections::emptyList);  | 
456 |  | -    }  | 
457 |  | - | 
458 |  | -    /**  | 
459 |  | -     * Perform a delete on a specified type.  | 
460 |  | -     * @param deletion a delete  | 
461 |  | -     * @param type possible values are 'files', 'subdirs', 'dirs'  | 
462 |  | -     * @return true if it deleted successfully, false otherwise  | 
463 |  | -     */  | 
464 |  | -    private boolean deleteAction(Action<Boolean> deletion, String type) {  | 
465 |  | -      boolean deleted;  | 
466 |  | -      try {  | 
467 |  | -        LOG.trace("Start deleting {} under {}", type, dir);  | 
468 |  | -        deleted = deletion.act();  | 
469 |  | -      } catch (PathIsNotEmptyDirectoryException exception) {  | 
470 |  | -        // N.B. HDFS throws this exception when we try to delete a non-empty directory, but  | 
471 |  | -        // LocalFileSystem throws a bare IOException. So some test code will get the verbose  | 
472 |  | -        // message below.  | 
473 |  | -        LOG.debug("Couldn't delete '{}' yet because it isn't empty. Probably transient. " +  | 
474 |  | -            "exception details at TRACE.", dir);  | 
475 |  | -        LOG.trace("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception);  | 
476 |  | -        deleted = false;  | 
477 |  | -      } catch (IOException ioe) {  | 
478 |  | -        LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps " +  | 
479 |  | -                  "happening, use following exception when asking on mailing list.",  | 
480 |  | -                  type, dir, ioe);  | 
481 |  | -        deleted = false;  | 
482 |  | -      }  | 
483 |  | -      LOG.trace("Finish deleting {} under {}, deleted=", type, dir, deleted);  | 
484 |  | -      return deleted;  | 
 | 421 | +      // Step.4: Once all sub-files & sub-directories are deleted, we can try to delete the  | 
 | 422 | +      // current directory asynchronously.  | 
 | 423 | +      FutureUtils.addListener(  | 
 | 424 | +        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),  | 
 | 425 | +        (voidObj, e) -> {  | 
 | 426 | +          if (e != null) {  | 
 | 427 | +            result.completeExceptionally(e);  | 
 | 428 | +            return;  | 
 | 429 | +          }  | 
 | 430 | +          try {  | 
 | 431 | +            boolean allSubDirsDeleted = futures.stream().allMatch(CompletableFuture::join);  | 
 | 432 | +            boolean deleted = allFilesDeleted && allSubDirsDeleted && isEmptyDirDeletable(dir);  | 
 | 433 | +            if (deleted && !root) {  | 
 | 434 | +              // If and only if files and sub-dirs under current dir are deleted successfully, and  | 
 | 435 | +              // the empty directory can be deleted, and it is not the root dir then task will  | 
 | 436 | +              // try to delete it.  | 
 | 437 | +              deleted = deleteAction(() -> fs.delete(dir, false), "dir", dir);  | 
 | 438 | +            }  | 
 | 439 | +            result.complete(deleted);  | 
 | 440 | +          } catch (Exception ie) {  | 
 | 441 | +            // Must handle the inner exception here, otherwise the result may never complete if  | 
 | 442 | +            // one sub-directory hits a failure.  | 
 | 443 | +            result.completeExceptionally(ie);  | 
 | 444 | +          }  | 
 | 445 | +        });  | 
 | 446 | +    } catch (Exception e) {  | 
 | 447 | +      LOG.warn("Failed to traverse and delete the path: {}", dir, e);  | 
 | 448 | +      result.completeExceptionally(e);  | 
485 | 449 |     }  | 
 | 450 | +  }  | 
486 | 451 | 
 
  | 
487 |  | -    /**  | 
488 |  | -     * Get cleaner results of subdirs.  | 
489 |  | -     * @param tasks subdirs cleaner tasks  | 
490 |  | -     * @return true if all subdirs deleted successfully, false for patial/all failures  | 
491 |  | -     * @throws IOException something happen during computation  | 
492 |  | -     */  | 
493 |  | -    private boolean getCleanResult(List<CleanerTask> tasks) throws IOException {  | 
494 |  | -      boolean cleaned = true;  | 
495 |  | -      try {  | 
496 |  | -        for (CleanerTask task : tasks) {  | 
497 |  | -          cleaned &= task.get();  | 
498 |  | -        }  | 
499 |  | -      } catch (InterruptedException | ExecutionException e) {  | 
500 |  | -        throw new IOException(e);  | 
501 |  | -      }  | 
502 |  | -      return cleaned;  | 
 | 452 | +  /**  | 
 | 453 | +   * Runs a single deletion step, logging rather than propagating any exception.  | 
 | 454 | +   * @param deletion the delete to run  | 
 | 455 | +   * @param type what is being deleted; callers pass 'files' or 'dir'  | 
 | 456 | +   * @param dir the directory being cleaned, used only in log messages  | 
 | 457 | +   * @return true if it deleted successfully, false otherwise  | 
 | 458 | +   */  | 
 | 459 | +  private boolean deleteAction(Action<Boolean> deletion, String type, Path dir) {  | 
 | 460 | +    boolean deleted;  | 
 | 461 | +    try {  | 
 | 462 | +      LOG.trace("Start deleting {} under {}", type, dir);  | 
 | 463 | +      deleted = deletion.act();  | 
 | 464 | +    } catch (PathIsNotEmptyDirectoryException exception) {  | 
 | 465 | +      // N.B. HDFS throws this exception when we try to delete a non-empty directory, but  | 
 | 466 | +      // LocalFileSystem throws a bare IOException, so some tests get the verbose message below.  | 
 | 467 | +      LOG.debug("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception);  | 
 | 468 | +      deleted = false;  | 
 | 469 | +    } catch (IOException ioe) {  | 
 | 470 | +      LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps "  | 
 | 471 | +          + "happening, use following exception when asking on mailing list.",  | 
 | 472 | +        type, dir, ioe);  | 
 | 473 | +      deleted = false;  | 
 | 474 | +    } catch (Exception e) {  | 
 | 475 | +      LOG.info("unexpected exception: ", e);  | 
 | 476 | +      deleted = false;  | 
503 | 477 |     }  | 
 | 478 | +    LOG.trace("Finish deleting {} under {}, deleted={}", type, dir, deleted);  | 
 | 479 | +    return deleted;  | 
504 | 480 |   }  | 
505 | 481 | }  | 
0 commit comments