Skip to content

Commit cbc7a07

Browse files
committed
HBASE-22867 The ForkJoinPool in CleanerChore will spawn thousands of threads in our cluster with thousands of tables
1 parent a59f7d4 commit cbc7a07

File tree

2 files changed

+53
-63
lines changed

2 files changed

+53
-63
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java

Lines changed: 35 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -18,32 +18,33 @@
1818
package org.apache.hadoop.hbase.master.cleaner;
1919

2020
import java.io.IOException;
21-
import java.util.Collections;
21+
import java.util.ArrayList;
22+
import java.util.Arrays;
2223
import java.util.Comparator;
2324
import java.util.HashMap;
2425
import java.util.LinkedList;
2526
import java.util.List;
2627
import java.util.Map;
27-
import java.util.Optional;
28+
import java.util.concurrent.CompletableFuture;
2829
import java.util.concurrent.ExecutionException;
29-
import java.util.concurrent.RecursiveTask;
3030
import java.util.concurrent.atomic.AtomicBoolean;
31+
import java.util.function.Supplier;
32+
import java.util.stream.Collectors;
33+
3134
import org.apache.hadoop.conf.Configuration;
3235
import org.apache.hadoop.fs.FileStatus;
3336
import org.apache.hadoop.fs.FileSystem;
3437
import org.apache.hadoop.fs.Path;
3538
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
3639
import org.apache.hadoop.hbase.ScheduledChore;
3740
import org.apache.hadoop.hbase.Stoppable;
38-
import org.apache.hadoop.hbase.util.FSUtils;
3941
import org.apache.hadoop.ipc.RemoteException;
4042
import org.apache.yetus.audience.InterfaceAudience;
4143
import org.slf4j.Logger;
4244
import org.slf4j.LoggerFactory;
4345

4446
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
4547
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
46-
import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
4748
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
4849
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
4950
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -214,8 +215,11 @@ private void preRunCleaner() {
214215
public Boolean runCleaner() {
215216
preRunCleaner();
216217
CleanerTask task = new CleanerTask(this.oldFileDir, true);
217-
pool.execute(task);
218-
return task.join();
218+
try {
219+
return pool.execute(task).get();
220+
} catch (InterruptedException | ExecutionException e) {
221+
return false;
222+
}
219223
}
220224

221225
/**
@@ -380,16 +384,14 @@ public boolean setEnabled(final boolean enabled) {
380384
}
381385

382386
private interface Action<T> {
383-
T act() throws IOException;
387+
T act() throws Exception;
384388
}
385389

386390
/**
387391
* Attempts to clean up a directory, its subdirectories, and files. Return value is true if
388392
* everything was deleted. false on partial / total failures.
389393
*/
390-
private final class CleanerTask extends RecursiveTask<Boolean> {
391-
392-
private static final long serialVersionUID = -5444212174088754172L;
394+
private final class CleanerTask implements Supplier<Boolean> {
393395

394396
private final Path dir;
395397
private final boolean root;
@@ -404,15 +406,16 @@ private final class CleanerTask extends RecursiveTask<Boolean> {
404406
}
405407

406408
@Override
407-
protected Boolean compute() {
409+
public Boolean get() {
408410
LOG.trace("Cleaning under {}", dir);
409411
List<FileStatus> subDirs;
410412
List<FileStatus> files;
411413
try {
412414
// if dir doesn't exist, we'll get null back for both of these
413415
// which will fall through to succeeding.
414-
subDirs = getFilteredStatus(FileStatus::isDirectory);
415-
files = getFilteredStatus(FileStatus::isFile);
416+
List<FileStatus> allPaths = Arrays.asList(fs.listStatus(dir));
417+
subDirs = allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());
418+
files = allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());
416419
} catch (IOException ioe) {
417420
LOG.warn("failed to get FileStatus for contents of '{}'", dir, ioe);
418421
return false;
@@ -423,16 +426,18 @@ protected Boolean compute() {
423426
allFilesDeleted = deleteAction(() -> checkAndDeleteFiles(files), "files");
424427
}
425428

429+
List<CompletableFuture<Boolean>> futures = new ArrayList<>();
426430
boolean allSubdirsDeleted = true;
427431
if (!subDirs.isEmpty()) {
428-
List<CleanerTask> tasks = Lists.newArrayListWithCapacity(subDirs.size());
429432
sortByConsumedSpace(subDirs);
430-
for (FileStatus subdir : subDirs) {
431-
CleanerTask task = new CleanerTask(subdir, false);
432-
tasks.add(task);
433-
task.fork();
434-
}
435-
allSubdirsDeleted = deleteAction(() -> getCleanResult(tasks), "subdirs");
433+
subDirs.forEach(dir -> futures.add(pool.execute(new CleanerTask(dir, false))));
434+
allSubdirsDeleted = deleteAction(() -> {
435+
boolean allDeleted = true;
436+
for (CompletableFuture<Boolean> future : futures) {
437+
allDeleted &= future.get();
438+
}
439+
return allDeleted;
440+
}, "subdirs");
436441
}
437442

438443
boolean result = allFilesDeleted && allSubdirsDeleted && isEmptyDirDeletable(dir);
@@ -444,17 +449,6 @@ protected Boolean compute() {
444449
return result;
445450
}
446451

447-
/**
448-
* Get FileStatus with filter.
449-
* @param function a filter function
450-
* @return filtered FileStatus or empty list if dir doesn't exist
451-
* @throws IOException if there's an error other than dir not existing
452-
*/
453-
private List<FileStatus> getFilteredStatus(Predicate<FileStatus> function) throws IOException {
454-
return Optional.ofNullable(FSUtils.listStatusWithStatusFilter(fs, dir,
455-
status -> function.test(status))).orElseGet(Collections::emptyList);
456-
}
457-
458452
/**
459453
* Perform a delete on a specified type.
460454
* @param deletion a delete
@@ -470,36 +464,22 @@ private boolean deleteAction(Action<Boolean> deletion, String type) {
470464
// N.B. HDFS throws this exception when we try to delete a non-empty directory, but
471465
// LocalFileSystem throws a bare IOException. So some test code will get the verbose
472466
// message below.
473-
LOG.debug("Couldn't delete '{}' yet because it isn't empty. Probably transient. " +
474-
"exception details at TRACE.", dir);
467+
LOG.debug("Couldn't delete '{}' yet because it isn't empty. Probably transient. "
468+
+ "exception details at TRACE.",
469+
dir);
475470
LOG.trace("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception);
476471
deleted = false;
477472
} catch (IOException ioe) {
478-
LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps " +
479-
"happening, use following exception when asking on mailing list.",
480-
type, dir, ioe);
473+
LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps "
474+
+ "happening, use following exception when asking on mailing list.",
475+
type, dir, ioe);
476+
deleted = false;
477+
} catch (Exception e) {
478+
LOG.info("unexpected exception: ", e);
481479
deleted = false;
482480
}
483481
LOG.trace("Finish deleting {} under {}, deleted=", type, dir, deleted);
484482
return deleted;
485483
}
486-
487-
/**
488-
* Get cleaner results of subdirs.
489-
* @param tasks subdirs cleaner tasks
490-
* @return true if all subdirs deleted successfully, false for patial/all failures
491-
* @throws IOException something happen during computation
492-
*/
493-
private boolean getCleanResult(List<CleanerTask> tasks) throws IOException {
494-
boolean cleaned = true;
495-
try {
496-
for (CleanerTask task : tasks) {
497-
cleaned &= task.get();
498-
}
499-
} catch (InterruptedException | ExecutionException e) {
500-
throw new IOException(e);
501-
}
502-
return cleaned;
503-
}
504484
}
505485
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,14 @@
1717
*/
1818
package org.apache.hadoop.hbase.master.cleaner;
1919

20-
import java.util.concurrent.ForkJoinPool;
21-
import java.util.concurrent.ForkJoinTask;
20+
import java.util.concurrent.CompletableFuture;
21+
import java.util.concurrent.LinkedBlockingQueue;
22+
import java.util.concurrent.ThreadPoolExecutor;
23+
import java.util.concurrent.TimeUnit;
24+
import java.util.function.Supplier;
25+
2226
import org.apache.hadoop.conf.Configuration;
27+
import org.apache.hadoop.hbase.DaemonThreadFactory;
2328
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
2429
import org.apache.yetus.audience.InterfaceAudience;
2530
import org.slf4j.Logger;
@@ -32,7 +37,7 @@
3237
public class DirScanPool implements ConfigurationObserver {
3338
private static final Logger LOG = LoggerFactory.getLogger(DirScanPool.class);
3439
private volatile int size;
35-
private ForkJoinPool pool;
40+
private ThreadPoolExecutor pool;
3641
private int cleanerLatch;
3742
private boolean reconfigNotification;
3843

@@ -42,11 +47,16 @@ public DirScanPool(Configuration conf) {
4247
// poolSize may be 0 or 0.0 from a careless configuration,
4348
// double check to make sure.
4449
size = size == 0 ? CleanerChore.calculatePoolSize(CleanerChore.DEFAULT_CHORE_POOL_SIZE) : size;
45-
pool = new ForkJoinPool(size);
50+
pool = initializePool(size);
4651
LOG.info("Cleaner pool size is {}", size);
4752
cleanerLatch = 0;
4853
}
4954

55+
private static ThreadPoolExecutor initializePool(int size) {
56+
return new ThreadPoolExecutor(size, size, 500, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
57+
new DaemonThreadFactory("dir-scan-pool"));
58+
}
59+
5060
/**
5161
* Checks if pool can be updated. If so, mark for update later.
5262
* @param conf configuration
@@ -73,8 +83,8 @@ synchronized void latchCountDown() {
7383
notifyAll();
7484
}
7585

76-
synchronized void execute(ForkJoinTask<?> task) {
77-
pool.execute(task);
86+
synchronized CompletableFuture<Boolean> execute(Supplier<Boolean> task) {
87+
return CompletableFuture.supplyAsync(task, pool);
7888
}
7989

8090
public synchronized void shutdownNow() {
@@ -100,8 +110,8 @@ synchronized void tryUpdatePoolSize(long timeout) {
100110
}
101111
}
102112
shutdownNow();
103-
LOG.info("Update chore's pool size from {} to {}", pool.getParallelism(), size);
104-
pool = new ForkJoinPool(size);
113+
LOG.info("Update chore's pool size from {} to {}", pool.getPoolSize(), size);
114+
pool = initializePool(size);
105115
}
106116

107117
public int getSize() {

0 commit comments

Comments
 (0)