Skip to content

Commit bbb07e4

Browse files
committed
HBASE-22871 Move the DirScanPool out and do not use static field
1 parent 43a0ec8 commit bbb07e4

File tree

11 files changed

+256
-231
lines changed

11 files changed

+256
-231
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@
109109
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
110110
import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
111111
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
112-
import org.apache.hadoop.hbase.master.cleaner.CleanerChore;
112+
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
113113
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
114114
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
115115
import org.apache.hadoop.hbase.master.cleaner.ReplicationBarrierCleaner;
@@ -387,6 +387,7 @@ public void run() {
387387

388388
private HbckChore hbckChore;
389389
CatalogJanitor catalogJanitorChore;
390+
private DirScanPool cleanerPool;
390391
private LogCleaner logCleaner;
391392
private HFileCleaner hfileCleaner;
392393
private ReplicationBarrierCleaner replicationBarrierCleaner;
@@ -1133,6 +1134,7 @@ private void finishActiveMasterInitialization(MonitoredTask status) throws IOExc
11331134
(System.currentTimeMillis() - masterActiveTime) / 1000.0f));
11341135
this.masterFinishedInitializationTime = System.currentTimeMillis();
11351136
configurationManager.registerObserver(this.balancer);
1137+
configurationManager.registerObserver(this.cleanerPool);
11361138
configurationManager.registerObserver(this.hfileCleaner);
11371139
configurationManager.registerObserver(this.logCleaner);
11381140
// Set master as 'initialized'.
@@ -1445,22 +1447,20 @@ private void startServiceThreads() throws IOException {
14451447
this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
14461448
startProcedureExecutor();
14471449

1448-
// Initial cleaner chore
1449-
CleanerChore.initChorePool(conf);
1450-
// Start log cleaner thread
1451-
int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 600 * 1000);
1452-
this.logCleaner =
1453-
new LogCleaner(cleanerInterval,
1454-
this, conf, getMasterWalManager().getFileSystem(),
1455-
getMasterWalManager().getOldLogDir());
1450+
// Create cleaner thread pool
1451+
cleanerPool = new DirScanPool(conf);
1452+
// Start log cleaner thread
1453+
int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 600 * 1000);
1454+
this.logCleaner = new LogCleaner(cleanerInterval, this, conf,
1455+
getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(), cleanerPool);
14561456
getChoreService().scheduleChore(logCleaner);
14571457

14581458
// start the hfile archive cleaner thread
14591459
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
14601460
Map<String, Object> params = new HashMap<>();
14611461
params.put(MASTER, this);
1462-
this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
1463-
.getFileSystem(), archiveDir, params);
1462+
this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
1463+
getMasterFileSystem().getFileSystem(), archiveDir, cleanerPool, params);
14641464
getChoreService().scheduleChore(hfileCleaner);
14651465

14661466
replicationBarrierCleaner = new ReplicationBarrierCleaner(conf, this, getConnection(),
@@ -1498,7 +1498,10 @@ protected void stopServiceThreads() {
14981498
this.mobCompactThread.close();
14991499
}
15001500
super.stopServiceThreads();
1501-
CleanerChore.shutDownChorePool();
1501+
if (cleanerPool != null) {
1502+
cleanerPool.shutdownNow();
1503+
cleanerPool = null;
1504+
}
15021505

15031506
LOG.debug("Stopping service threads");
15041507

hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java

Lines changed: 22 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,15 @@
2626
import java.util.Map;
2727
import java.util.Optional;
2828
import java.util.concurrent.ExecutionException;
29-
import java.util.concurrent.ForkJoinPool;
30-
import java.util.concurrent.ForkJoinTask;
3129
import java.util.concurrent.RecursiveTask;
3230
import java.util.concurrent.atomic.AtomicBoolean;
33-
3431
import org.apache.hadoop.conf.Configuration;
3532
import org.apache.hadoop.fs.FileStatus;
3633
import org.apache.hadoop.fs.FileSystem;
3734
import org.apache.hadoop.fs.Path;
3835
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
3936
import org.apache.hadoop.hbase.ScheduledChore;
4037
import org.apache.hadoop.hbase.Stoppable;
41-
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
4238
import org.apache.hadoop.hbase.util.FSUtils;
4339
import org.apache.hadoop.ipc.RemoteException;
4440
import org.apache.yetus.audience.InterfaceAudience;
@@ -56,11 +52,8 @@
5652
* Abstract Cleaner that uses a chain of delegates to clean a directory of files
5753
* @param <T> Cleaner delegate class that is dynamically loaded from configuration
5854
*/
59-
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD",
60-
justification="Static pool will be only updated once.")
6155
@InterfaceAudience.Private
62-
public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
63-
implements ConfigurationObserver {
56+
public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore {
6457

6558
private static final Logger LOG = LoggerFactory.getLogger(CleanerChore.class);
6659
private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors();
@@ -72,84 +65,9 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
7265
* while latter will use only 1 thread for chore to scan dir.
7366
*/
7467
public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size";
75-
private static final String DEFAULT_CHORE_POOL_SIZE = "0.25";
76-
77-
private static class DirScanPool {
78-
int size;
79-
ForkJoinPool pool;
80-
int cleanerLatch;
81-
AtomicBoolean reconfigNotification;
82-
83-
DirScanPool(Configuration conf) {
84-
String poolSize = conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE);
85-
size = calculatePoolSize(poolSize);
86-
// poolSize may be 0 or 0.0 from a careless configuration,
87-
// double check to make sure.
88-
size = size == 0 ? calculatePoolSize(DEFAULT_CHORE_POOL_SIZE) : size;
89-
pool = new ForkJoinPool(size);
90-
LOG.info("Cleaner pool size is {}", size);
91-
reconfigNotification = new AtomicBoolean(false);
92-
cleanerLatch = 0;
93-
}
94-
95-
/**
96-
* Checks if pool can be updated. If so, mark for update later.
97-
* @param conf configuration
98-
*/
99-
synchronized void markUpdate(Configuration conf) {
100-
int newSize = calculatePoolSize(conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE));
101-
if (newSize == size) {
102-
LOG.trace("Size from configuration is same as previous={}, no need to update.", newSize);
103-
return;
104-
}
105-
size = newSize;
106-
// Chore is working, update it later.
107-
reconfigNotification.set(true);
108-
}
109-
110-
/**
111-
* Update pool with new size.
112-
*/
113-
synchronized void updatePool(long timeout) {
114-
long stopTime = System.currentTimeMillis() + timeout;
115-
while (cleanerLatch != 0 && timeout > 0) {
116-
try {
117-
wait(timeout);
118-
timeout = stopTime - System.currentTimeMillis();
119-
} catch (InterruptedException ie) {
120-
Thread.currentThread().interrupt();
121-
break;
122-
}
123-
}
124-
shutDownNow();
125-
LOG.info("Update chore's pool size from {} to {}", pool.getParallelism(), size);
126-
pool = new ForkJoinPool(size);
127-
}
68+
static final String DEFAULT_CHORE_POOL_SIZE = "0.25";
12869

129-
synchronized void latchCountUp() {
130-
cleanerLatch++;
131-
}
132-
133-
synchronized void latchCountDown() {
134-
cleanerLatch--;
135-
notifyAll();
136-
}
137-
138-
@SuppressWarnings("FutureReturnValueIgnored")
139-
synchronized void submit(ForkJoinTask task) {
140-
pool.submit(task);
141-
}
142-
143-
synchronized void shutDownNow() {
144-
if (pool == null || pool.isShutdown()) {
145-
return;
146-
}
147-
pool.shutdownNow();
148-
}
149-
}
150-
// It may be waste resources for each cleaner chore own its pool,
151-
// so let's make pool for all cleaner chores.
152-
private static volatile DirScanPool POOL;
70+
private final DirScanPool pool;
15371

15472
protected final FileSystem fs;
15573
private final Path oldFileDir;
@@ -158,22 +76,9 @@ synchronized void shutDownNow() {
15876
private final AtomicBoolean enabled = new AtomicBoolean(true);
15977
protected List<T> cleanersChain;
16078

161-
public static void initChorePool(Configuration conf) {
162-
if (POOL == null) {
163-
POOL = new DirScanPool(conf);
164-
}
165-
}
166-
167-
public static void shutDownChorePool() {
168-
if (POOL != null) {
169-
POOL.shutDownNow();
170-
POOL = null;
171-
}
172-
}
173-
17479
public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
175-
FileSystem fs, Path oldFileDir, String confKey) {
176-
this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, null);
80+
FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) {
81+
this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null);
17782
}
17883

17984
/**
@@ -184,14 +89,15 @@ public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Confi
18489
* @param fs handle to the FS
18590
* @param oldFileDir the path to the archived files
18691
* @param confKey configuration key for the classes to instantiate
92+
* @param pool the thread pool used to scan directories
18793
* @param params members could be used in cleaner
18894
*/
18995
public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
190-
FileSystem fs, Path oldFileDir, String confKey, Map<String, Object> params) {
96+
FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool, Map<String, Object> params) {
19197
super(name, s, sleepPeriod);
19298

193-
Preconditions.checkNotNull(POOL, "Chore's pool isn't initialized, please call"
194-
+ "CleanerChore.initChorePool(Configuration) before new a cleaner chore.");
99+
Preconditions.checkNotNull(pool, "Chore's pool can not be null");
100+
this.pool = pool;
195101
this.fs = fs;
196102
this.oldFileDir = oldFileDir;
197103
this.conf = conf;
@@ -255,11 +161,6 @@ private void initCleanerChain(String confKey) {
255161
}
256162
}
257163

258-
@Override
259-
public void onConfigurationChange(Configuration conf) {
260-
POOL.markUpdate(conf);
261-
}
262-
263164
/**
264165
* A utility method to create new instances of LogCleanerDelegate based on the class name of the
265166
* LogCleanerDelegate.
@@ -287,22 +188,20 @@ private T newFileCleaner(String className, Configuration conf) {
287188
protected void chore() {
288189
if (getEnabled()) {
289190
try {
290-
POOL.latchCountUp();
191+
pool.latchCountUp();
291192
if (runCleaner()) {
292193
LOG.trace("Cleaned all WALs under {}", oldFileDir);
293194
} else {
294195
LOG.trace("WALs outstanding under {}", oldFileDir);
295196
}
296197
} finally {
297-
POOL.latchCountDown();
198+
pool.latchCountDown();
298199
}
299200
// After each cleaner chore, checks if received reconfigure notification while cleaning.
300201
// First in cleaner turns off notification, to avoid another cleaner updating pool again.
301-
if (POOL.reconfigNotification.compareAndSet(true, false)) {
302-
// This cleaner is waiting for other cleaners finishing their jobs.
303-
// To avoid missing next chore, only wait 0.8 * period, then shutdown.
304-
POOL.updatePool((long) (0.8 * getTimeUnit().toMillis(getPeriod())));
305-
}
202+
// This cleaner is waiting for other cleaners finishing their jobs.
203+
// To avoid missing next chore, only wait 0.8 * period, then shutdown.
204+
pool.tryUpdatePoolSize((long) (0.8 * getTimeUnit().toMillis(getPeriod())));
306205
} else {
307206
LOG.trace("Cleaner chore disabled! Not cleaning.");
308207
}
@@ -315,7 +214,7 @@ private void preRunCleaner() {
315214
public Boolean runCleaner() {
316215
preRunCleaner();
317216
CleanerTask task = new CleanerTask(this.oldFileDir, true);
318-
POOL.submit(task);
217+
pool.execute(task);
319218
return task.join();
320219
}
321220

@@ -467,7 +366,7 @@ public synchronized void cleanup() {
467366

468367
@VisibleForTesting
469368
int getChorePoolSize() {
470-
return POOL.size;
369+
return pool.getSize();
471370
}
472371

473372
/**
@@ -485,10 +384,13 @@ private interface Action<T> {
485384
}
486385

487386
/**
488-
* Attemps to clean up a directory, its subdirectories, and files.
489-
* Return value is true if everything was deleted. false on partial / total failures.
387+
* Attemps to clean up a directory, its subdirectories, and files. Return value is true if
388+
* everything was deleted. false on partial / total failures.
490389
*/
491-
private class CleanerTask extends RecursiveTask<Boolean> {
390+
private final class CleanerTask extends RecursiveTask<Boolean> {
391+
392+
private static final long serialVersionUID = -5444212174088754172L;
393+
492394
private final Path dir;
493395
private final boolean root;
494396

0 commit comments

Comments
 (0)