Skip to content

Commit 0690864

Browse files
committed
HBASE-22867 The ForkJoinPool in CleanerChore will spawn thousands of threads in our cluster with thousands table (#513)
Signed-off-by: Duo Zhang <[email protected]> Signed-off-by: Reid Chan <[email protected]>
1 parent cb0727d commit 0690864

File tree

2 files changed

+114
-129
lines changed

2 files changed

+114
-129
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java

Lines changed: 96 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -18,32 +18,32 @@
1818
package org.apache.hadoop.hbase.master.cleaner;
1919

2020
import java.io.IOException;
21-
import java.util.Collections;
21+
import java.util.ArrayList;
22+
import java.util.Arrays;
2223
import java.util.Comparator;
2324
import java.util.HashMap;
2425
import java.util.LinkedList;
2526
import java.util.List;
2627
import java.util.Map;
27-
import java.util.Optional;
28-
import java.util.concurrent.ExecutionException;
29-
import java.util.concurrent.RecursiveTask;
28+
import java.util.concurrent.CompletableFuture;
3029
import java.util.concurrent.atomic.AtomicBoolean;
30+
import java.util.stream.Collectors;
31+
3132
import org.apache.hadoop.conf.Configuration;
3233
import org.apache.hadoop.fs.FileStatus;
3334
import org.apache.hadoop.fs.FileSystem;
3435
import org.apache.hadoop.fs.Path;
3536
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
3637
import org.apache.hadoop.hbase.ScheduledChore;
3738
import org.apache.hadoop.hbase.Stoppable;
38-
import org.apache.hadoop.hbase.util.FSUtils;
39+
import org.apache.hadoop.hbase.util.FutureUtils;
3940
import org.apache.hadoop.ipc.RemoteException;
4041
import org.apache.yetus.audience.InterfaceAudience;
4142
import org.slf4j.Logger;
4243
import org.slf4j.LoggerFactory;
4344

4445
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
4546
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
46-
import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
4747
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
4848
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
4949
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -211,11 +211,16 @@ private void preRunCleaner() {
211211
cleanersChain.forEach(FileCleanerDelegate::preClean);
212212
}
213213

214-
public Boolean runCleaner() {
214+
public boolean runCleaner() {
215215
preRunCleaner();
216-
CleanerTask task = new CleanerTask(this.oldFileDir, true);
217-
pool.execute(task);
218-
return task.join();
216+
try {
217+
CompletableFuture<Boolean> future = new CompletableFuture<>();
218+
pool.execute(() -> traverseAndDelete(oldFileDir, true, future));
219+
return future.get();
220+
} catch (Exception e) {
221+
LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e);
222+
return false;
223+
}
219224
}
220225

221226
/**
@@ -360,126 +365,97 @@ public boolean setEnabled(final boolean enabled) {
360365
}
361366

362367
private interface Action<T> {
363-
T act() throws IOException;
368+
T act() throws Exception;
364369
}
365370

366371
/**
367-
* Attemps to clean up a directory, its subdirectories, and files. Return value is true if
368-
* everything was deleted. false on partial / total failures.
372+
* Attempts to clean up a directory(its subdirectories, and files) in a
373+
* {@link java.util.concurrent.ThreadPoolExecutor} concurrently. We can get the final result by
374+
* calling result.get().
369375
*/
370-
private final class CleanerTask extends RecursiveTask<Boolean> {
371-
372-
private static final long serialVersionUID = -5444212174088754172L;
373-
374-
private final Path dir;
375-
private final boolean root;
376-
377-
CleanerTask(final FileStatus dir, final boolean root) {
378-
this(dir.getPath(), root);
379-
}
380-
381-
CleanerTask(final Path dir, final boolean root) {
382-
this.dir = dir;
383-
this.root = root;
384-
}
385-
386-
@Override
387-
protected Boolean compute() {
388-
LOG.trace("Cleaning under {}", dir);
389-
List<FileStatus> subDirs;
390-
List<FileStatus> files;
391-
try {
392-
// if dir doesn't exist, we'll get null back for both of these
393-
// which will fall through to succeeding.
394-
subDirs = getFilteredStatus(FileStatus::isDirectory);
395-
files = getFilteredStatus(FileStatus::isFile);
396-
} catch (IOException ioe) {
397-
LOG.warn("failed to get FileStatus for contents of '{}'", dir, ioe);
398-
return false;
399-
}
400-
401-
boolean allFilesDeleted = true;
402-
if (!files.isEmpty()) {
403-
allFilesDeleted = deleteAction(() -> checkAndDeleteFiles(files), "files");
404-
}
405-
406-
boolean allSubdirsDeleted = true;
376+
private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean> result) {
377+
try {
378+
// Step.1: List all files under the given directory.
379+
List<FileStatus> allPaths = Arrays.asList(fs.listStatus(dir));
380+
List<FileStatus> subDirs =
381+
allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());
382+
List<FileStatus> files =
383+
allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());
384+
385+
// Step.2: Try to delete all the deletable files.
386+
boolean allFilesDeleted =
387+
files.isEmpty() || deleteAction(() -> checkAndDeleteFiles(files), "files", dir);
388+
389+
// Step.3: Start to traverse and delete the sub-directories.
390+
List<CompletableFuture<Boolean>> futures = new ArrayList<>();
407391
if (!subDirs.isEmpty()) {
408-
List<CleanerTask> tasks = Lists.newArrayListWithCapacity(subDirs.size());
409392
sortByConsumedSpace(subDirs);
410-
for (FileStatus subdir : subDirs) {
411-
CleanerTask task = new CleanerTask(subdir, false);
412-
tasks.add(task);
413-
task.fork();
414-
}
415-
allSubdirsDeleted = deleteAction(() -> getCleanResult(tasks), "subdirs");
393+
// Submit the request of sub-directory deletion.
394+
subDirs.forEach(subDir -> {
395+
CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
396+
pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture));
397+
futures.add(subFuture);
398+
});
416399
}
417400

418-
boolean result = allFilesDeleted && allSubdirsDeleted;
419-
// if and only if files and subdirs under current dir are deleted successfully, and
420-
// it is not the root dir, then task will try to delete it.
421-
if (result && !root) {
422-
result &= deleteAction(() -> fs.delete(dir, false), "dir");
423-
}
424-
return result;
425-
}
426-
427-
/**
428-
* Get FileStatus with filter.
429-
* @param function a filter function
430-
* @return filtered FileStatus or empty list if dir doesn't exist
431-
* @throws IOException if there's an error other than dir not existing
432-
*/
433-
private List<FileStatus> getFilteredStatus(Predicate<FileStatus> function) throws IOException {
434-
return Optional.ofNullable(FSUtils.listStatusWithStatusFilter(fs, dir,
435-
status -> function.test(status))).orElseGet(Collections::emptyList);
436-
}
437-
438-
/**
439-
* Perform a delete on a specified type.
440-
* @param deletion a delete
441-
* @param type possible values are 'files', 'subdirs', 'dirs'
442-
* @return true if it deleted successfully, false otherwise
443-
*/
444-
private boolean deleteAction(Action<Boolean> deletion, String type) {
445-
boolean deleted;
446-
try {
447-
LOG.trace("Start deleting {} under {}", type, dir);
448-
deleted = deletion.act();
449-
} catch (PathIsNotEmptyDirectoryException exception) {
450-
// N.B. HDFS throws this exception when we try to delete a non-empty directory, but
451-
// LocalFileSystem throws a bare IOException. So some test code will get the verbose
452-
// message below.
453-
LOG.debug("Couldn't delete '{}' yet because it isn't empty. Probably transient. " +
454-
"exception details at TRACE.", dir);
455-
LOG.trace("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception);
456-
deleted = false;
457-
} catch (IOException ioe) {
458-
LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps " +
459-
"happening, use following exception when asking on mailing list.",
460-
type, dir, ioe);
461-
deleted = false;
462-
}
463-
LOG.trace("Finish deleting {} under {}, deleted=", type, dir, deleted);
464-
return deleted;
401+
// Step.4: Once all sub-files & sub-directories are deleted, then can try to delete the
402+
// current directory asynchronously.
403+
FutureUtils.addListener(
404+
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
405+
(voidObj, e) -> {
406+
if (e != null) {
407+
result.completeExceptionally(e);
408+
return;
409+
}
410+
try {
411+
boolean allSubDirsDeleted = futures.stream().allMatch(CompletableFuture::join);
412+
boolean deleted = allFilesDeleted && allSubDirsDeleted;
413+
if (deleted && !root) {
414+
// If and only if files and sub-dirs under current dir are deleted successfully, and
415+
// the empty directory can be deleted, and it is not the root dir then task will
416+
// try to delete it.
417+
deleted = deleteAction(() -> fs.delete(dir, false), "dir", dir);
418+
}
419+
result.complete(deleted);
420+
} catch (Exception ie) {
421+
// Must handle the inner exception here, otherwise the result may get stuck if one
422+
// sub-directory get some failure.
423+
result.completeExceptionally(ie);
424+
}
425+
});
426+
} catch (Exception e) {
427+
LOG.debug("Failed to traverse and delete the path: {}", dir, e);
428+
result.completeExceptionally(e);
465429
}
430+
}
466431

467-
/**
468-
* Get cleaner results of subdirs.
469-
* @param tasks subdirs cleaner tasks
470-
* @return true if all subdirs deleted successfully, false for patial/all failures
471-
* @throws IOException something happen during computation
472-
*/
473-
private boolean getCleanResult(List<CleanerTask> tasks) throws IOException {
474-
boolean cleaned = true;
475-
try {
476-
for (CleanerTask task : tasks) {
477-
cleaned &= task.get();
478-
}
479-
} catch (InterruptedException | ExecutionException e) {
480-
throw new IOException(e);
481-
}
482-
return cleaned;
432+
/**
433+
* Perform a delete on a specified type.
434+
* @param deletion a delete
435+
* @param type possible values are 'files', 'subdirs', 'dirs'
436+
* @return true if it deleted successfully, false otherwise
437+
*/
438+
private boolean deleteAction(Action<Boolean> deletion, String type, Path dir) {
439+
boolean deleted;
440+
try {
441+
LOG.trace("Start deleting {} under {}", type, dir);
442+
deleted = deletion.act();
443+
} catch (PathIsNotEmptyDirectoryException exception) {
444+
// N.B. HDFS throws this exception when we try to delete a non-empty directory, but
445+
// LocalFileSystem throws a bare IOException. So some test code will get the verbose
446+
// message below.
447+
LOG.debug("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception);
448+
deleted = false;
449+
} catch (IOException ioe) {
450+
LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps "
451+
+ "happening, use following exception when asking on mailing list.",
452+
type, dir, ioe);
453+
deleted = false;
454+
} catch (Exception e) {
455+
LOG.info("unexpected exception: ", e);
456+
deleted = false;
483457
}
458+
LOG.trace("Finish deleting {} under {}, deleted=", type, dir, deleted);
459+
return deleted;
484460
}
485461
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@
1717
*/
1818
package org.apache.hadoop.hbase.master.cleaner;
1919

20-
import java.util.concurrent.ForkJoinPool;
21-
import java.util.concurrent.ForkJoinTask;
20+
import java.util.concurrent.LinkedBlockingQueue;
21+
import java.util.concurrent.ThreadPoolExecutor;
22+
import java.util.concurrent.TimeUnit;
23+
2224
import org.apache.hadoop.conf.Configuration;
25+
import org.apache.hadoop.hbase.DaemonThreadFactory;
2326
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
2427
import org.apache.yetus.audience.InterfaceAudience;
2528
import org.slf4j.Logger;
@@ -32,7 +35,7 @@
3235
public class DirScanPool implements ConfigurationObserver {
3336
private static final Logger LOG = LoggerFactory.getLogger(DirScanPool.class);
3437
private volatile int size;
35-
private ForkJoinPool pool;
38+
private final ThreadPoolExecutor pool;
3639
private int cleanerLatch;
3740
private boolean reconfigNotification;
3841

@@ -42,11 +45,18 @@ public DirScanPool(Configuration conf) {
4245
// poolSize may be 0 or 0.0 from a careless configuration,
4346
// double check to make sure.
4447
size = size == 0 ? CleanerChore.calculatePoolSize(CleanerChore.DEFAULT_CHORE_POOL_SIZE) : size;
45-
pool = new ForkJoinPool(size);
48+
pool = initializePool(size);
4649
LOG.info("Cleaner pool size is {}", size);
4750
cleanerLatch = 0;
4851
}
4952

53+
private static ThreadPoolExecutor initializePool(int size) {
54+
ThreadPoolExecutor executor = new ThreadPoolExecutor(size, size, 1, TimeUnit.MINUTES,
55+
new LinkedBlockingQueue<>(), new DaemonThreadFactory("dir-scan-pool"));
56+
executor.allowCoreThreadTimeOut(true);
57+
return executor;
58+
}
59+
5060
/**
5161
* Checks if pool can be updated. If so, mark for update later.
5262
* @param conf configuration
@@ -73,8 +83,8 @@ synchronized void latchCountDown() {
7383
notifyAll();
7484
}
7585

76-
synchronized void execute(ForkJoinTask<?> task) {
77-
pool.execute(task);
86+
synchronized void execute(Runnable runnable) {
87+
pool.execute(runnable);
7888
}
7989

8090
public synchronized void shutdownNow() {
@@ -99,9 +109,8 @@ synchronized void tryUpdatePoolSize(long timeout) {
99109
break;
100110
}
101111
}
102-
shutdownNow();
103-
LOG.info("Update chore's pool size from {} to {}", pool.getParallelism(), size);
104-
pool = new ForkJoinPool(size);
112+
LOG.info("Update chore's pool size from {} to {}", pool.getPoolSize(), size);
113+
pool.setCorePoolSize(size);
105114
}
106115

107116
public int getSize() {

0 commit comments

Comments
 (0)