Commit 7ade05d

HBASE-22867 The ForkJoinPool in CleanerChore will spawn thousands of threads in our cluster with thousands table
1 parent a59f7d4 commit 7ade05d
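
Note: the old implementation ran the directory scan as RecursiveTask instances on a ForkJoinPool, and each task blocked in join()/get() while waiting for its sub-directory tasks. A ForkJoinPool may start extra compensation threads whenever its workers block like that, which is presumably how the pool grew to thousands of threads on a cluster with thousands of tables. This commit switches to a fixed-size ThreadPoolExecutor plus CompletableFuture composition, so no pool thread ever waits on a child task. The sketch below is illustrative only (plain JDK, not HBase code; every name in it is invented) and shows the same shape of non-blocking recursive traversal:

// Illustrative sketch only: plain JDK, not HBase code; all names here are invented.
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BoundedTreeWalk {
  private final ExecutorService pool = Executors.newFixedThreadPool(4);

  /** Completes {@code result} with the number of plain files under {@code dir}. */
  void countFiles(File dir, CompletableFuture<Long> result) {
    try {
      File[] children = dir.listFiles();
      List<CompletableFuture<Long>> subFutures = new ArrayList<>();
      long plainFiles = 0;
      if (children != null) {
        for (File child : children) {
          if (child.isDirectory()) {
            CompletableFuture<Long> sub = new CompletableFuture<>();
            // Re-submit the child instead of fork()/join(): this thread returns immediately.
            CompletableFuture.runAsync(() -> countFiles(child, sub), pool);
            subFutures.add(sub);
          } else {
            plainFiles++;
          }
        }
      }
      long filesHere = plainFiles;
      // Combine child results in a completion callback; by then every child future is done,
      // so the join() calls below cannot block a pool thread.
      CompletableFuture.allOf(subFutures.toArray(new CompletableFuture[0]))
          .whenComplete((ignored, error) -> {
            if (error != null) {
              result.completeExceptionally(error);
            } else {
              result.complete(
                filesHere + subFutures.stream().mapToLong(CompletableFuture::join).sum());
            }
          });
    } catch (Exception e) {
      result.completeExceptionally(e);
    }
  }
}

The key property is that countFiles() returns as soon as it has submitted its children; results are combined later in a completion callback, so a small, bounded executor is enough.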

File tree: 2 files changed (+110, -127 lines)


hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java

Lines changed: 93 additions & 119 deletions
@@ -18,32 +18,32 @@
 package org.apache.hadoop.hbase.master.cleaner;
 
 import java.io.IOException;
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.RecursiveTask;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
 import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -213,9 +213,14 @@ private void preRunCleaner() {
 
   public Boolean runCleaner() {
     preRunCleaner();
-    CleanerTask task = new CleanerTask(this.oldFileDir, true);
-    pool.execute(task);
-    return task.join();
+    try {
+      CompletableFuture<Boolean> future = new CompletableFuture<>();
+      pool.runAsync(() -> traverseAndDelete(oldFileDir, true, future));
+      return future.get();
+    } catch (Exception e) {
+      LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e);
+      return false;
+    }
   }
 
   /**
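
Note: after this change the only blocking wait left on the cleaner path is the future.get() in runCleaner() above, and it runs on the chore's own thread; the DirScanPool workers merely kick off traversals and return. A minimal JDK-only sketch of that call shape (all names invented, not HBase API):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ChoreCaller {
  private final ExecutorService pool = Executors.newFixedThreadPool(2);

  boolean runOnce() {
    CompletableFuture<Boolean> done = new CompletableFuture<>();
    // The worker only starts the traversal and returns; it never waits on children.
    pool.submit(() -> traverse(done));
    try {
      return done.get(); // the calling (chore) thread is the only one that blocks
    } catch (Exception e) {
      return false;
    }
  }

  private void traverse(CompletableFuture<Boolean> done) {
    // A real traversal would complete 'done' asynchronously once all children finish.
    done.complete(true);
  }
}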
@@ -380,126 +385,95 @@ public boolean setEnabled(final boolean enabled) {
   }
 
   private interface Action<T> {
-    T act() throws IOException;
+    T act() throws Exception;
   }
 
   /**
-   * Attemps to clean up a directory, its subdirectories, and files. Return value is true if
-   * everything was deleted. false on partial / total failures.
+   * Attemps to clean up a directory, its subdirectories, and files.
    */
-  private final class CleanerTask extends RecursiveTask<Boolean> {
-
-    private static final long serialVersionUID = -5444212174088754172L;
-
-    private final Path dir;
-    private final boolean root;
-
-    CleanerTask(final FileStatus dir, final boolean root) {
-      this(dir.getPath(), root);
-    }
-
-    CleanerTask(final Path dir, final boolean root) {
-      this.dir = dir;
-      this.root = root;
-    }
-
-    @Override
-    protected Boolean compute() {
-      LOG.trace("Cleaning under {}", dir);
-      List<FileStatus> subDirs;
-      List<FileStatus> files;
-      try {
-        // if dir doesn't exist, we'll get null back for both of these
-        // which will fall through to succeeding.
-        subDirs = getFilteredStatus(FileStatus::isDirectory);
-        files = getFilteredStatus(FileStatus::isFile);
-      } catch (IOException ioe) {
-        LOG.warn("failed to get FileStatus for contents of '{}'", dir, ioe);
-        return false;
-      }
-
-      boolean allFilesDeleted = true;
-      if (!files.isEmpty()) {
-        allFilesDeleted = deleteAction(() -> checkAndDeleteFiles(files), "files");
-      }
-
-      boolean allSubdirsDeleted = true;
-      if (!subDirs.isEmpty()) {
-        List<CleanerTask> tasks = Lists.newArrayListWithCapacity(subDirs.size());
-        sortByConsumedSpace(subDirs);
-        for (FileStatus subdir : subDirs) {
-          CleanerTask task = new CleanerTask(subdir, false);
-          tasks.add(task);
-          task.fork();
-        }
-        allSubdirsDeleted = deleteAction(() -> getCleanResult(tasks), "subdirs");
-      }
-
-      boolean result = allFilesDeleted && allSubdirsDeleted && isEmptyDirDeletable(dir);
-      // if and only if files and subdirs under current dir are deleted successfully, and the empty
-      // directory can be deleted, and it is not the root dir then task will try to delete it.
-      if (result && !root) {
-        result &= deleteAction(() -> fs.delete(dir, false), "dir");
-      }
-      return result;
-    }
-
-    /**
-     * Get FileStatus with filter.
-     * @param function a filter function
-     * @return filtered FileStatus or empty list if dir doesn't exist
-     * @throws IOException if there's an error other than dir not existing
-     */
-    private List<FileStatus> getFilteredStatus(Predicate<FileStatus> function) throws IOException {
-      return Optional.ofNullable(FSUtils.listStatusWithStatusFilter(fs, dir,
-        status -> function.test(status))).orElseGet(Collections::emptyList);
-    }
-
-    /**
-     * Perform a delete on a specified type.
-     * @param deletion a delete
-     * @param type possible values are 'files', 'subdirs', 'dirs'
-     * @return true if it deleted successfully, false otherwise
-     */
-    private boolean deleteAction(Action<Boolean> deletion, String type) {
-      boolean deleted;
-      try {
-        LOG.trace("Start deleting {} under {}", type, dir);
-        deleted = deletion.act();
-      } catch (PathIsNotEmptyDirectoryException exception) {
-        // N.B. HDFS throws this exception when we try to delete a non-empty directory, but
-        // LocalFileSystem throws a bare IOException. So some test code will get the verbose
-        // message below.
-        LOG.debug("Couldn't delete '{}' yet because it isn't empty. Probably transient. " +
-          "exception details at TRACE.", dir);
-        LOG.trace("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception);
-        deleted = false;
-      } catch (IOException ioe) {
-        LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps " +
-          "happening, use following exception when asking on mailing list.",
-          type, dir, ioe);
-        deleted = false;
-      }
-      LOG.trace("Finish deleting {} under {}, deleted=", type, dir, deleted);
-      return deleted;
-    }
-
-    /**
-     * Get cleaner results of subdirs.
-     * @param tasks subdirs cleaner tasks
-     * @return true if all subdirs deleted successfully, false for patial/all failures
-     * @throws IOException something happen during computation
-     */
-    private boolean getCleanResult(List<CleanerTask> tasks) throws IOException {
-      boolean cleaned = true;
-      try {
-        for (CleanerTask task : tasks) {
-          cleaned &= task.get();
-        }
-      } catch (InterruptedException | ExecutionException e) {
-        throw new IOException(e);
-      }
-      return cleaned;
-    }
-  }
+  private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean> result) {
+    try {
+      // Step.1: List all files under the given directory.
+      List<FileStatus> allPaths = Arrays.asList(fs.listStatus(dir));
+      List<FileStatus> subDirs =
+          allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());
+      List<FileStatus> files =
+          allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());
+
+      // Step.2: Try to delete all the deletable files.
+      boolean allFilesDeleted =
+          files.isEmpty() || deleteAction(() -> checkAndDeleteFiles(files), "files", dir);
+
+      // Step.3: Start to traverse and delete the sub-directories.
+      List<CompletableFuture<Boolean>> futures = new ArrayList<>();
+      if (!subDirs.isEmpty()) {
+        sortByConsumedSpace(subDirs);
+        // Submit the request of sub-directory deletion.
+        subDirs.forEach(subDir -> {
+          CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
+          pool.runAsync(() -> traverseAndDelete(subDir.getPath(), false, subFuture));
+          futures.add(subFuture);
+        });
+      }
+
+      // Step.4: Once all sub-files & sub-directories are deleted, then can try to delete the
+      // current directory asynchronously.
+      FutureUtils.addListener(
+        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
+        (voidObj, e) -> {
+          if (e != null) {
+            result.completeExceptionally(e);
+            return;
+          }
+          try {
+            boolean allSubDirsDeleted = futures.stream().allMatch(CompletableFuture::join);
+            boolean deleted = allFilesDeleted && allSubDirsDeleted && isEmptyDirDeletable(dir);
+            if (deleted && !root) {
+              // If and only if files and sub-dirs under current dir are deleted successfully, and
+              // the empty directory can be deleted, and it is not the root dir then task will
+              // try to delete it.
+              deleted = deleteAction(() -> fs.delete(dir, false), "dir", dir);
+            }
+            result.complete(deleted);
+          } catch (Exception ie) {
+            // Must handle the inner exception here, otherwise the result may get stuck if one
+            // sub-directory get some failure.
+            result.completeExceptionally(ie);
+          }
+        });
+    } catch (Exception e) {
+      LOG.warn("Failed to traverse and delete the path: {}", dir, e);
+      result.completeExceptionally(e);
+    }
+  }
+
+  /**
+   * Perform a delete on a specified type.
+   * @param deletion a delete
+   * @param type possible values are 'files', 'subdirs', 'dirs'
+   * @return true if it deleted successfully, false otherwise
+   */
+  private boolean deleteAction(Action<Boolean> deletion, String type, Path dir) {
+    boolean deleted;
+    try {
+      LOG.trace("Start deleting {} under {}", type, dir);
+      deleted = deletion.act();
+    } catch (PathIsNotEmptyDirectoryException exception) {
+      // N.B. HDFS throws this exception when we try to delete a non-empty directory, but
+      // LocalFileSystem throws a bare IOException. So some test code will get the verbose
+      // message below.
+      LOG.debug("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception);
+      deleted = false;
+    } catch (IOException ioe) {
+      LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps "
+          + "happening, use following exception when asking on mailing list.",
+        type, dir, ioe);
+      deleted = false;
+    } catch (Exception e) {
+      LOG.info("unexpected exception: ", e);
+      deleted = false;
+    }
+    LOG.trace("Finish deleting {} under {}, deleted=", type, dir, deleted);
+    return deleted;
+  }
 }

hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java

Lines changed: 17 additions & 8 deletions
@@ -17,9 +17,13 @@
  */
 package org.apache.hadoop.hbase.master.cleaner;
 
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -32,7 +36,7 @@
 public class DirScanPool implements ConfigurationObserver {
   private static final Logger LOG = LoggerFactory.getLogger(DirScanPool.class);
   private volatile int size;
-  private ForkJoinPool pool;
+  private ThreadPoolExecutor pool;
   private int cleanerLatch;
   private boolean reconfigNotification;
 
@@ -42,11 +46,16 @@ public DirScanPool(Configuration conf) {
     // poolSize may be 0 or 0.0 from a careless configuration,
     // double check to make sure.
     size = size == 0 ? CleanerChore.calculatePoolSize(CleanerChore.DEFAULT_CHORE_POOL_SIZE) : size;
-    pool = new ForkJoinPool(size);
+    pool = initializePool(size);
     LOG.info("Cleaner pool size is {}", size);
     cleanerLatch = 0;
   }
 
+  private static ThreadPoolExecutor initializePool(int size) {
+    return new ThreadPoolExecutor(size, size, 500, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
+      new DaemonThreadFactory("dir-scan-pool"));
+  }
+
   /**
    * Checks if pool can be updated. If so, mark for update later.
    * @param conf configuration
@@ -73,8 +82,8 @@ synchronized void latchCountDown() {
     notifyAll();
   }
 
-  synchronized void execute(ForkJoinTask<?> task) {
-    pool.execute(task);
+  synchronized void runAsync(Runnable runnable) {
+    CompletableFuture.runAsync(runnable, pool);
   }
 
   public synchronized void shutdownNow() {
@@ -100,8 +109,8 @@ synchronized void tryUpdatePoolSize(long timeout) {
       }
     }
     shutdownNow();
-    LOG.info("Update chore's pool size from {} to {}", pool.getParallelism(), size);
-    pool = new ForkJoinPool(size);
+    LOG.info("Update chore's pool size from {} to {}", pool.getPoolSize(), size);
+    pool = initializePool(size);
   }
 
   public int getSize() {
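
Note on initializePool(...) above: core and maximum pool size are both the configured size and the work queue is an unbounded LinkedBlockingQueue, so surplus submissions simply queue up instead of creating new threads, which is the property the ForkJoinPool lost once its tasks started blocking. The 500-second keep-alive only matters if core threads were allowed to time out, and DaemonThreadFactory presumably marks the scan threads as daemons so they cannot block JVM shutdown. A small standalone sketch (plain JDK, invented class name) of why the thread count stays bounded:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSizeDemo {
  public static void main(String[] args) throws Exception {
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(2, 2, 500, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    for (int i = 0; i < 10_000; i++) {
      // Each submission models one directory scan request; surplus work just queues up.
      CompletableFuture.runAsync(() -> { }, pool);
    }
    System.out.println("pool size = " + pool.getPoolSize()); // never exceeds 2
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
  }
}

Running it prints a pool size of at most 2 no matter how many scans are queued.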
