Skip to content

Commit 1612b9e

Browse files
authored
HBASE-23340 HMaster /hbase/replication/rs session expiry (HBase replication is enabled by default even though we don't use it) prevents the LogCleaner from cleaning oldWALs, which results in oldWALs growing too large (more than 2TB) (#2779)
Signed-off-by: Duo Zhang <[email protected]> Signed-off-by: Pankaj Kumar <[email protected]>
1 parent 478c4e8 commit 1612b9e

File tree

5 files changed

+28
-15
lines changed

5 files changed

+28
-15
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1319,17 +1319,17 @@ private void startServiceThreads() throws IOException {
13191319

13201320
// Create cleaner thread pool
13211321
cleanerPool = new DirScanPool(conf);
1322+
Map<String, Object> params = new HashMap<>();
1323+
params.put(MASTER, this);
13221324
// Start log cleaner thread
13231325
int cleanerInterval =
13241326
conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
13251327
this.logCleaner = new LogCleaner(cleanerInterval, this, conf,
1326-
getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(), cleanerPool);
1328+
getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(), cleanerPool, params);
13271329
getChoreService().scheduleChore(logCleaner);
13281330

13291331
// start the hfile archive cleaner thread
13301332
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
1331-
Map<String, Object> params = new HashMap<>();
1332-
params.put(MASTER, this);
13331333
this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
13341334
getMasterFileSystem().getFileSystem(), archiveDir, cleanerPool, params);
13351335
getChoreService().scheduleChore(hfileCleaner);

hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.io.IOException;
2323
import java.util.ArrayList;
2424
import java.util.List;
25+
import java.util.Map;
2526
import java.util.concurrent.CountDownLatch;
2627
import java.util.concurrent.LinkedBlockingQueue;
2728
import java.util.concurrent.TimeUnit;
@@ -72,9 +73,9 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate>
7273
* @param pool the thread pool used to scan directories
7374
*/
7475
public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
75-
Path oldLogDir, DirScanPool pool) {
76+
Path oldLogDir, DirScanPool pool, Map<String, Object> params) {
7677
super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS,
77-
pool);
78+
pool, params);
7879
this.pendingDelete = new LinkedBlockingQueue<>();
7980
int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
8081
this.oldWALsCleaner = createOldWalsCleaner(size);

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,19 @@
1919

2020
import java.io.IOException;
2121
import java.util.Collections;
22+
import java.util.Map;
2223
import java.util.Set;
2324
import org.apache.hadoop.conf.Configuration;
2425
import org.apache.hadoop.fs.FileStatus;
2526
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
27+
import org.apache.hadoop.hbase.master.HMaster;
2628
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
2729
import org.apache.hadoop.hbase.replication.ReplicationException;
2830
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
2931
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
3032
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
3133
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
34+
import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
3235
import org.apache.yetus.audience.InterfaceAudience;
3336
import org.slf4j.Logger;
3437
import org.slf4j.LoggerFactory;
@@ -43,7 +46,8 @@
4346
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
4447
public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
4548
private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogCleaner.class);
46-
private ZKWatcher zkw;
49+
private ZKWatcher zkw = null;
50+
private boolean shareZK = false;
4751
private ReplicationQueueStorage queueStorage;
4852
private boolean stopped = false;
4953
private Set<String> wals;
@@ -92,12 +96,20 @@ public boolean apply(FileStatus file) {
9296
}
9397

9498
@Override
95-
public void setConf(Configuration config) {
96-
// Make my own Configuration. Then I'll have my own connection to zk that
97-
// I can close myself when comes time.
98-
Configuration conf = new Configuration(config);
99+
public void init(Map<String, Object> params) {
100+
super.init(params);
99101
try {
100-
setConf(conf, new ZKWatcher(conf, "replicationLogCleaner", null));
102+
if (MapUtils.isNotEmpty(params)) {
103+
Object master = params.get(HMaster.MASTER);
104+
if (master != null && master instanceof HMaster) {
105+
zkw = ((HMaster) master).getZooKeeper();
106+
shareZK = true;
107+
}
108+
}
109+
if (zkw == null) {
110+
zkw = new ZKWatcher(getConf(), "replicationLogCleaner", null);
111+
}
112+
this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
101113
} catch (IOException e) {
102114
LOG.error("Error while configuring " + this.getClass().getName(), e);
103115
}
@@ -118,7 +130,7 @@ public void setConf(Configuration conf, ZKWatcher zk) {
118130
public void stop(String why) {
119131
if (this.stopped) return;
120132
this.stopped = true;
121-
if (this.zkw != null) {
133+
if (!shareZK && this.zkw != null) {
122134
LOG.info("Stopping " + this.zkw);
123135
this.zkw.close();
124136
}

hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ public void testLogCleaning() throws Exception {
200200
// 10 procedure WALs
201201
assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);
202202

203-
LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL);
203+
LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL, null);
204204
cleaner.chore();
205205

206206
// In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which
@@ -297,7 +297,7 @@ public void testOnConfigurationChange() throws Exception {
297297
Server server = new DummyServer();
298298

299299
FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
300-
LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL);
300+
LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL, null);
301301
int size = cleaner.getSizeOfCleaners();
302302
assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
303303
cleaner.getCleanerThreadTimeoutMsec());

hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public void stop(String why) {
7272
public boolean isStopped() {
7373
return stopped;
7474
}
75-
}, conf, fs, globalWALArchiveDir, cleanerPool);
75+
}, conf, fs, globalWALArchiveDir, cleanerPool, null);
7676
choreService.scheduleChore(logCleaner);
7777
}
7878

0 commit comments

Comments
 (0)