Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -78,6 +79,7 @@
import org.apache.hadoop.hbase.PleaseRestartMasterException;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
Expand Down Expand Up @@ -121,6 +123,7 @@
import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.balancer.MaintenanceLoadBalancer;
import org.apache.hadoop.hbase.master.cleaner.CleanerChore;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
Expand Down Expand Up @@ -373,6 +376,9 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
private DirScanPool logCleanerPool;
private LogCleaner logCleaner;
private HFileCleaner hfileCleaner;
private HFileCleaner[] customCleaners;
private Path[] customPaths;
private DirScanPool customCleanerPool;
private ReplicationBarrierCleaner replicationBarrierCleaner;
private MobFileCleanerChore mobFileCleanerChore;
private MobFileCompactionChore mobFileCompactionChore;
Expand Down Expand Up @@ -1139,6 +1145,14 @@ private void finishActiveMasterInitialization(MonitoredTask status) throws IOExc
configurationManager.registerObserver(this.hfileCleaner);
configurationManager.registerObserver(this.logCleaner);
configurationManager.registerObserver(this.regionsRecoveryConfigManager);
if (this.customCleanerPool != null) {
configurationManager.registerObserver(this.customCleanerPool);
}
if (this.customCleaners != null) {
for (HFileCleaner cleaner : customCleaners) {
configurationManager.registerObserver(cleaner);
}
}
// Set master as 'initialized'.
setInitialized(true);

Expand Down Expand Up @@ -1509,8 +1523,36 @@ private void startServiceThreads() throws IOException {
logCleanerPool, params);
getChoreService().scheduleChore(logCleaner);

// start the hfile archive cleaner thread
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
if (conf.getBoolean(HFileCleaner.HFILE_CLEANER_ENABLE_CUSTOM_PATHS, false)) {
String[] paths = conf.getStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS);
if (paths != null && paths.length > 0) {
if (conf.getStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS) == null) {
Set<String> cleanerClasses = new HashSet<>();
String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
if (cleaners != null) {
Collections.addAll(cleanerClasses, cleaners);
}
conf.setStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS,
cleanerClasses.toArray(new String[cleanerClasses.size()]));
LOG.info("Custom cleaner paths: {}, plugins: {}", Arrays.asList(paths), cleanerClasses);
}
customCleanerPool = DirScanPool.getHFileCleanerScanPool(conf);
customPaths = new Path[paths.length];
customCleaners = new HFileCleaner[paths.length];
for (int i = 0; i < paths.length; i++) {
Path path = new Path(paths[i].trim());
customPaths[i] = path;
HFileCleaner cleaner =
new HFileCleaner("CustomPathHFileCleaner-" + path.getName(), cleanerInterval, this,
conf, getMasterFileSystem().getFileSystem(), new Path(archiveDir, path),
HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS, customCleanerPool, params, null);
customCleaners[i] = cleaner;
getChoreService().scheduleChore(cleaner);
}
}
}

// Create archive cleaner thread pool
hfileCleanerPool = DirScanPool.getHFileCleanerScanPool(conf);
this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
Expand Down Expand Up @@ -1572,6 +1614,10 @@ protected void stopServiceThreads() {
logCleanerPool.shutdownNow();
logCleanerPool = null;
}
if (customCleanerPool != null) {
customCleanerPool.shutdownNow();
customCleanerPool = null;
}
if (maintenanceRegionServer != null) {
maintenanceRegionServer.getRegionServer().stop(HBASE_MASTER_CLEANER_INTERVAL);
}
Expand Down Expand Up @@ -1709,6 +1755,12 @@ protected void stopChores() {
shutdownChore(snapshotQuotaChore);
shutdownChore(logCleaner);
shutdownChore(hfileCleaner);
if (customCleaners != null) {
for (ScheduledChore chore : customCleaners) {
chore.shutdown();
}
customCleaners = null;
}
shutdownChore(replicationBarrierCleaner);
shutdownChore(snapshotCleanerChore);
shutdownChore(hbckChore);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.master.cleaner;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -80,10 +81,11 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
protected final Map<String, Object> params;
private final AtomicBoolean enabled = new AtomicBoolean(true);
protected List<T> cleanersChain;
protected String[] excludeDirs;

/**
 * Convenience constructor for callers with no extra {@code params} and no directories to
 * exclude from cleaning; delegates to the full constructor with both optional arguments null.
 * @param name name of the chore being run
 * @param sleepPeriod the period of time to sleep between each run
 * @param s the stopper
 * @param conf configuration to use
 * @param fs handle to the FS
 * @param oldFileDir the path to the archived files
 * @param confKey configuration key for the cleaner delegate classes to instantiate
 * @param pool the thread pool used to scan directories
 */
public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
  FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) {
  // Exactly one delegation: the pre-change call (the 9-arg form without excludePaths) was a
  // leftover removed-diff line and must not remain — a constructor may contain at most one
  // explicit this(...) invocation, and it must be the first statement.
  this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null, null);
}

/**
Expand All @@ -98,7 +100,8 @@ public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Confi
* @param params members could be used in cleaner
*/
public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool, Map<String, Object> params) {
FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool, Map<String, Object> params,
Path[] excludePaths) {
super(name, s, sleepPeriod);

Preconditions.checkNotNull(pool, "Chore's pool can not be null");
Expand All @@ -107,6 +110,19 @@ public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Confi
this.oldFileDir = oldFileDir;
this.conf = conf;
this.params = params;
if (excludePaths != null && excludePaths.length > 0) {
excludeDirs = new String[excludePaths.length];
for (int i = 0; i < excludePaths.length; i++) {
StringBuilder dirPart = new StringBuilder(excludePaths[i].toString());
if (!excludePaths[i].toString().endsWith("/")) {
dirPart.append("/");
}
excludeDirs[i] = dirPart.toString();
}
}
if (excludeDirs != null) {
LOG.info("Cleaner {} exclude archive dirs' parts are: {}", name, Arrays.asList(excludeDirs));
}
initCleanerChain(confKey);
}

Expand Down Expand Up @@ -420,9 +436,11 @@ private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean
sortByConsumedSpace(subDirs);
// Submit the request of sub-directory deletion.
subDirs.forEach(subDir -> {
CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture));
futures.add(subFuture);
if (!shouldExclude(subDir)) {
CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture));
futures.add(subFuture);
}
});
}

Expand Down Expand Up @@ -452,11 +470,34 @@ private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean
}
});
} catch (Exception e) {
LOG.debug("Failed to traverse and delete the path: {}", dir, e);
if (e instanceof FileNotFoundException) {
LOG.debug("Dir dose not exist, {}", dir);
} else {
LOG.error("Failed to traverse and delete the path: {}", dir, e);
}
result.completeExceptionally(e);
}
}

/**
 * Decides whether the given entry must be skipped by the cleaner traversal.
 * <p>
 * Only directories are ever excluded; plain files always remain eligible for cleaning. A
 * directory is excluded when its full path contains one of the configured
 * {@code excludeDirs} fragments. Each fragment is normalized at construction time to end
 * with '/', so a plain substring match selects the excluded sub-tree without also matching
 * a sibling directory whose name merely shares a prefix.
 * @param f status of the file or directory under consideration
 * @return true if {@code f} is a directory lying under an excluded path, false otherwise
 */
private boolean shouldExclude(FileStatus f) {
  // Files are never excluded, and with no configured fragments nothing is.
  if (!f.isDirectory() || excludeDirs == null || excludeDirs.length == 0) {
    return false;
  }
  String fullPath = f.getPath().toString();
  for (String excluded : excludeDirs) {
    if (fullPath.contains(excluded)) {
      return true;
    }
  }
  return false;
}

/**
* Perform a delete on a specified type.
* @param deletion a delete
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
* The thread pool used for scan directories
*/
Expand Down Expand Up @@ -61,6 +62,7 @@ private DirScanPool(Configuration conf, Type dirScanPoolType) {
this.name = dirScanPoolType.name().toLowerCase();
String poolSize = conf.get(dirScanPoolType.cleanerPoolSizeConfigName,
dirScanPoolType.cleanerPoolSizeConfigDefault);

size = CleanerChore.calculatePoolSize(poolSize);
// poolSize may be 0 or 0.0 from a careless configuration,
// double check to make sure.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,18 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration con
"hbase.regionserver.hfilecleaner.thread.check.interval.msec";
static final long DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC = 1000L;

/** Set true if want to use custom paths of independent configuration and pool */
public static final String HFILE_CLEANER_ENABLE_CUSTOM_PATHS =
"hbase.master.hfile.cleaner.enable.custom.paths";

/** The custom paths under archive, e.g. data/default/testTable1,data/default/testTable2 */
public static final String HFILE_CLEANER_CUSTOM_PATHS =
"hbase.master.hfile.cleaner.custom.paths";

/** Configures like hbase.master.hfilecleaner.plugins */
public static final String HFILE_CLEANER_CUSTOM_PATHS_PLUGINS =
"hbase.master.hfilecleaner.custom.paths.plugins";

private static final Logger LOG = LoggerFactory.getLogger(HFileCleaner.class);

StealJobQueue<HFileDeleteTask> largeFileQueue;
Expand Down Expand Up @@ -117,8 +129,13 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration con
/**
 * Convenience constructor with the default chore name and no exclusion list: delegates to the
 * full constructor using the standard "HFileCleaner" name, the master cleaner-plugins
 * configuration key, and null {@code excludePaths}.
 * @param period the period of time to sleep between each run
 * @param stopper the stopper
 * @param conf configuration to use
 * @param fs handle to the FS
 * @param directory directory to be cleaned
 * @param pool the thread pool used to scan directories
 * @param params params could be used in subclass of BaseHFileCleanerDelegate
 */
public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
  Path directory, DirScanPool pool, Map<String, Object> params) {
  // The stale pre-change argument line ("params);", without excludePaths) was a leftover
  // removed-diff line; only the new delegation with the trailing null is kept.
  this("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
    params, null);
}

/**
 * Convenience constructor with the default chore name and an explicit exclusion list:
 * delegates to the full constructor using the standard "HFileCleaner" name and the master
 * cleaner-plugins configuration key.
 * @param period the period of time to sleep between each run
 * @param stopper the stopper
 * @param conf configuration to use
 * @param fs handle to the FS
 * @param directory directory to be cleaned
 * @param pool the thread pool used to scan directories
 * @param params params could be used in subclass of BaseHFileCleanerDelegate
 * @param excludePaths directories under {@code directory} that this cleaner must not touch;
 *   may be null when nothing is excluded
 */
public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
Path directory, DirScanPool pool, Map<String, Object> params, Path[] excludePaths) {
this("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
params, excludePaths);
}

/**
Expand All @@ -134,8 +151,9 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration con
* @param params params could be used in subclass of BaseHFileCleanerDelegate
*/
public HFileCleaner(String name, int period, Stoppable stopper, Configuration conf, FileSystem fs,
Path directory, String confKey, DirScanPool pool, Map<String, Object> params) {
super(name, period, stopper, conf, fs, directory, confKey, pool, params);
Path directory, String confKey, DirScanPool pool, Map<String, Object> params,
Path[] excludePaths) {
super(name, period, stopper, conf, fs, directory, confKey, pool, params, excludePaths);
throttlePoint =
conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
largeQueueInitSize =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate>
public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
Path oldLogDir, DirScanPool pool, Map<String, Object> params) {
super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS,
pool, params);
pool, params, null);
this.pendingDelete = new LinkedBlockingQueue<>();
int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
this.oldWALsCleaner = createOldWalsCleaner(size);
Expand Down
Loading