diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 989739677503..c083550cda9d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -37,6 +37,7 @@
 import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -78,6 +79,7 @@
 import org.apache.hadoop.hbase.PleaseRestartMasterException;
 import org.apache.hadoop.hbase.RegionMetrics;
 import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
+import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.ServerMetrics;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -121,6 +123,7 @@
 import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
 import org.apache.hadoop.hbase.master.balancer.MaintenanceLoadBalancer;
+import org.apache.hadoop.hbase.master.cleaner.CleanerChore;
 import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
@@ -373,6 +376,9 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
   private DirScanPool logCleanerPool;
   private LogCleaner logCleaner;
   private HFileCleaner hfileCleaner;
+  private HFileCleaner[] customCleaners;
+  private Path[] customPaths;
+  private DirScanPool customCleanerPool;
   private ReplicationBarrierCleaner replicationBarrierCleaner;
   private MobFileCleanerChore mobFileCleanerChore;
   private MobFileCompactionChore mobFileCompactionChore;
@@ -1139,6 +1145,14 @@ private void finishActiveMasterInitialization(MonitoredTask status) throws IOExc
     configurationManager.registerObserver(this.hfileCleaner);
     configurationManager.registerObserver(this.logCleaner);
     configurationManager.registerObserver(this.regionsRecoveryConfigManager);
+    if (this.customCleanerPool != null) {
+      configurationManager.registerObserver(this.customCleanerPool);
+    }
+    if (this.customCleaners != null) {
+      for (HFileCleaner cleaner : customCleaners) {
+        configurationManager.registerObserver(cleaner);
+      }
+    }
     // Set master as 'initialized'.
     setInitialized(true);
@@ -1509,8 +1523,36 @@ private void startServiceThreads() throws IOException {
       logCleanerPool, params);
     getChoreService().scheduleChore(logCleaner);
-    // start the hfile archive cleaner thread
     Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
+    if (conf.getBoolean(HFileCleaner.HFILE_CLEANER_ENABLE_CUSTOM_PATHS, false)) {
+      String[] paths = conf.getStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS);
+      if (paths != null && paths.length > 0) {
+        if (conf.getStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS) == null) {
+          Set<String> cleanerClasses = new HashSet<>();
+          String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
+          if (cleaners != null) {
+            Collections.addAll(cleanerClasses, cleaners);
+          }
+          conf.setStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS,
+            cleanerClasses.toArray(new String[cleanerClasses.size()]));
+          LOG.info("Custom cleaner paths: {}, plugins: {}", Arrays.asList(paths), cleanerClasses);
+        }
+        customCleanerPool = DirScanPool.getHFileCleanerScanPool(conf);
+        customPaths = new Path[paths.length];
+        customCleaners = new HFileCleaner[paths.length];
+        for (int i = 0; i < paths.length; i++) {
+          Path path = new Path(paths[i].trim());
+          customPaths[i] = path;
+          HFileCleaner cleaner =
+            new HFileCleaner("CustomPathHFileCleaner-" + path.getName(), cleanerInterval, this,
+              conf, getMasterFileSystem().getFileSystem(), new Path(archiveDir, path),
+              HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS, customCleanerPool, params, null);
+          customCleaners[i] = cleaner;
+          getChoreService().scheduleChore(cleaner);
+        }
+      }
+    }
+    // Create archive cleaner thread pool
     hfileCleanerPool = DirScanPool.getHFileCleanerScanPool(conf);
     this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
@@ -1572,6 +1614,10 @@ protected void stopServiceThreads() {
       logCleanerPool.shutdownNow();
       logCleanerPool = null;
     }
+    if (customCleanerPool != null) {
+      customCleanerPool.shutdownNow();
+      customCleanerPool = null;
+    }
     if (maintenanceRegionServer != null) {
       maintenanceRegionServer.getRegionServer().stop(HBASE_MASTER_CLEANER_INTERVAL);
     }
@@ -1709,6 +1755,12 @@ protected void stopChores() {
     shutdownChore(snapshotQuotaChore);
     shutdownChore(logCleaner);
     shutdownChore(hfileCleaner);
+    if (customCleaners != null) {
+      for (ScheduledChore chore : customCleaners) {
+        chore.shutdown();
+      }
+      customCleaners = null;
+    }
     shutdownChore(replicationBarrierCleaner);
     shutdownChore(snapshotCleanerChore);
     shutdownChore(hbckChore);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
index 8454eae3ea57..71574bd45c11 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.master.cleaner;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -80,10 +81,11 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
   protected final Map<String, Object> params;
   private final AtomicBoolean enabled = new AtomicBoolean(true);
   protected List<T> cleanersChain;
+  protected String[] excludeDirs;
 
   public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
     FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) {
-    this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null);
+    this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null, null);
   }
 
   /**
@@ -98,7 +100,8 @@ public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Confi
    * @param params members could be used in cleaner
    */
   public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
-    FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool, Map<String, Object> params) {
+    FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool, Map<String, Object> params,
+    Path[] excludePaths) {
     super(name, s, sleepPeriod);
 
     Preconditions.checkNotNull(pool, "Chore's pool can not be null");
@@ -107,6 +110,19 @@ public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Confi
     this.oldFileDir = oldFileDir;
     this.conf = conf;
     this.params = params;
+    if (excludePaths != null && excludePaths.length > 0) {
+      excludeDirs = new String[excludePaths.length];
+      for (int i = 0; i < excludePaths.length; i++) {
+        StringBuilder dirPart = new StringBuilder(excludePaths[i].toString());
+        if (!excludePaths[i].toString().endsWith("/")) {
+          dirPart.append("/");
+        }
+        excludeDirs[i] = dirPart.toString();
+      }
+    }
+    if (excludeDirs != null) {
+      LOG.info("Cleaner {} excludes archive dir parts: {}", name, Arrays.asList(excludeDirs));
+    }
     initCleanerChain(confKey);
   }
 
@@ -420,9 +436,11 @@ private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean
       // Step.3: Start to traverse and delete the sub-directories.
       List<CompletableFuture<Boolean>> futures = new ArrayList<>();
       if (subDirs != null) {
         subDirs.forEach(subDir -> {
-          CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
-          pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture));
-          futures.add(subFuture);
+          if (!shouldExclude(subDir)) {
+            CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
+            pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture));
+            futures.add(subFuture);
+          }
         });
       }
@@ -452,11 +470,34 @@ private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean
           }
         });
     } catch (Exception e) {
-      LOG.debug("Failed to traverse and delete the path: {}", dir, e);
+      if (e instanceof FileNotFoundException) {
+        LOG.debug("Dir does not exist, {}", dir);
+      } else {
+        LOG.error("Failed to traverse and delete the path: {}", dir, e);
+      }
       result.completeExceptionally(e);
     }
   }
 
+  /**
+   * Check if a path should be excluded from cleaning.
+   */
+  private boolean shouldExclude(FileStatus f) {
+    if (!f.isDirectory()) {
+      return false;
+    }
+    if (excludeDirs != null && excludeDirs.length > 0) {
+      for (String dirPart : excludeDirs) {
+        // since we make excludeDirs end with '/',
+        // a path that contains() the dirPart should be excluded
+        if (f.getPath().toString().contains(dirPart)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
   /**
    * Perform a delete on a specified type.
    * @param deletion a delete
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java
index 6e1426985cc4..4fcbae4503a9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java
@@ -24,11 +24,12 @@
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * The thread pool used for scan directories
  */
@@ -61,6 +62,7 @@ private DirScanPool(Configuration conf, Type dirScanPoolType) {
     this.name = dirScanPoolType.name().toLowerCase();
     String poolSize = conf.get(dirScanPoolType.cleanerPoolSizeConfigName,
       dirScanPoolType.cleanerPoolSizeConfigDefault);
+    size = CleanerChore.calculatePoolSize(poolSize);
     // poolSize may be 0 or 0.0 from a careless configuration,
     // double check to make sure.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
index 45b82e9af26f..efd7ab1aa283 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
@@ -88,6 +88,18 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration con
     "hbase.regionserver.hfilecleaner.thread.check.interval.msec";
   static final long DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC = 1000L;
 
+  /** Set to true to use custom paths with an independent configuration and pool */
+  public static final String HFILE_CLEANER_ENABLE_CUSTOM_PATHS =
+    "hbase.master.hfile.cleaner.enable.custom.paths";
+
+  /** The custom paths under archive, e.g. data/default/testTable1,data/default/testTable2 */
+  public static final String HFILE_CLEANER_CUSTOM_PATHS =
+    "hbase.master.hfile.cleaner.custom.paths";
+
+  /** Plugins for the custom paths, configured like hbase.master.hfilecleaner.plugins */
+  public static final String HFILE_CLEANER_CUSTOM_PATHS_PLUGINS =
+    "hbase.master.hfilecleaner.custom.paths.plugins";
+
   private static final Logger LOG = LoggerFactory.getLogger(HFileCleaner.class);
 
   StealJobQueue<HFileDeleteTask> largeFileQueue;
@@ -117,8 +129,13 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration con
   public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
     Path directory, DirScanPool pool, Map<String, Object> params) {
     this("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
-      params);
+      params, null);
+  }
+
+  public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
+    Path directory, DirScanPool pool, Map<String, Object> params, Path[] excludePaths) {
+    this("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
+      params, excludePaths);
   }
 
   /**
@@ -134,8 +151,9 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration con
    * @param params params could be used in subclass of BaseHFileCleanerDelegate
    */
   public HFileCleaner(String name, int period, Stoppable stopper, Configuration conf, FileSystem fs,
-    Path directory, String confKey, DirScanPool pool, Map<String, Object> params) {
-    super(name, period, stopper, conf, fs, directory, confKey, pool, params);
+    Path directory, String confKey, DirScanPool pool, Map<String, Object> params,
+    Path[] excludePaths) {
+    super(name, period, stopper, conf, fs, directory, confKey, pool, params, excludePaths);
     throttlePoint =
       conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
     largeQueueInitSize =
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
index d8993b38ffef..a6083d2d5aa6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
@@ -75,7 +75,7 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate>
   public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
     Path oldLogDir, DirScanPool pool, Map<String, Object> params) {
     super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS,
-      pool, params);
+      pool, params, null);
     this.pendingDelete = new LinkedBlockingQueue<>();
     int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
     this.oldWALsCleaner = createOldWalsCleaner(size);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerClearHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerClearHFiles.java
new file mode 100644
index 000000000000..3de113b1c154
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerClearHFiles.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import static org.apache.hadoop.hbase.master.HMaster.HBASE_MASTER_CLEANER_INTERVAL;
+import static org.apache.hadoop.hbase.master.cleaner.HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category(LargeTests.class)
+public class TestCleanerClearHFiles {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestCleanerClearHFiles.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  private static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+  private static Configuration conf = TEST_UTIL.getConfiguration();
+  private static Admin admin = null;
+
+  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("CF");
+
+  private static final String TABLE1 = "table1";
+  private static final String TABLE2 = "table2";
+  private static final String DEFAULT_ARCHIVE_PREFIX = "data/default/";
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    conf.setBoolean(HFileCleaner.HFILE_CLEANER_ENABLE_CUSTOM_PATHS, true);
+    conf.setStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS, DEFAULT_ARCHIVE_PREFIX + TABLE1);
+    conf.setStrings(HFILE_CLEANER_CUSTOM_PATHS_PLUGINS, HFileLinkCleaner.class.getName());
+
+    conf.setInt(TimeToLiveHFileCleaner.TTL_CONF_KEY, 10);
+    conf.setInt(HBASE_MASTER_CLEANER_INTERVAL, 20000);
+
+    TEST_UTIL.startMiniCluster();
+    admin = TEST_UTIL.getAdmin();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testClearArchive() throws Exception {
+    DistributedFileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
+    Table table1 = createTable(TEST_UTIL, TableName.valueOf(TABLE1));
+    Table table2 = createTable(TEST_UTIL, TableName.valueOf(TABLE2));
+
+    admin.disableTable(table1.getName());
+    admin.deleteTable(table1.getName());
+    admin.disableTable(table2.getName());
+    admin.deleteTable(table2.getName());
+
+    Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
+    Path archiveTable1Path = new Path(archiveDir, DEFAULT_ARCHIVE_PREFIX + TABLE1);
+    Path archiveTable2Path = new Path(archiveDir, DEFAULT_ARCHIVE_PREFIX + TABLE2);
+
+    TEST_UTIL.waitFor(10000,
+      () -> !notExistOrEmptyDir(archiveTable1Path, fs) && !notExistOrEmptyDir(archiveTable2Path,
+        fs));
+
+    TEST_UTIL.waitFor(30000,
+      () -> notExistOrEmptyDir(archiveTable1Path, fs) && notExistOrEmptyDir(archiveTable2Path, fs));
+  }
+
+  private boolean notExistOrEmptyDir(Path dir, DistributedFileSystem fs) {
+    try {
+      return fs.listStatus(dir).length == 0;
+    } catch (Exception e) {
+      return e instanceof FileNotFoundException;
+    }
+  }
+
+  private Table createTable(HBaseTestingUtil util, TableName tableName) throws IOException {
+    TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_FAMILY).build())
+      .build();
+    return util.createTable(td, null);
+  }
+}
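
Note for reviewers trying the patch out: the three new HFileCleaner keys combine as in the sketch below. This is a minimal illustration only, assuming an ordinary HBaseConfiguration-backed site config; the "hotTable" path and the plugin list are hypothetical examples, not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;

public class CustomCleanerPathsExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Opt in: HMaster.startServiceThreads() only builds the extra chores when this is true.
    conf.setBoolean(HFileCleaner.HFILE_CLEANER_ENABLE_CUSTOM_PATHS, true);
    // Sub-paths under the archive dir; each one gets its own chore named
    // "CustomPathHFileCleaner-<dirName>". "hotTable" is a made-up table name.
    conf.setStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS, "data/default/hotTable");
    // Optional plugins for those chores; when unset, the master copies the
    // classes configured under hbase.master.hfilecleaner.plugins instead.
    conf.setStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS,
      "org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner",
      "org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner");
  }
}

All custom chores share a single extra DirScanPool; like the main cleaner pools it is registered as a configuration observer during master initialization, and the chores are torn down in stopServiceThreads()/stopChores() alongside the existing cleaners.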