Skip to content

Commit e066951

Browse files
authored
HBASE-24849 Branch-1 Backport : HBASE-24665 MultiWAL : Avoid rolling of ALL WALs when one of the WAL needs a roll (#2194)
Signed-off-by: Reid Chan <[email protected]>
1 parent fb25a7d commit e066951

File tree

2 files changed

+194
-28
lines changed

2 files changed

+194
-28
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java

Lines changed: 80 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.hadoop.hbase.Server;
3333
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
3434
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
35+
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
3536
import org.apache.hadoop.hbase.wal.WAL;
3637
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
3738
import org.apache.hadoop.hbase.util.Bytes;
@@ -56,23 +57,27 @@ public class LogRoller extends HasThread {
5657
private static final Log LOG = LogFactory.getLog(LogRoller.class);
5758
private final ReentrantLock rollLock = new ReentrantLock();
5859
private final AtomicBoolean rollLog = new AtomicBoolean(false);
59-
private final ConcurrentHashMap<WAL, Boolean> walNeedsRoll =
60-
new ConcurrentHashMap<WAL, Boolean>();
60+
private final ConcurrentHashMap<WAL, RollController> wals =
61+
new ConcurrentHashMap<WAL, RollController>();
6162
private final Server server;
6263
protected final RegionServerServices services;
63-
private volatile long lastrolltime = System.currentTimeMillis();
6464
// Period to roll log.
65-
private final long rollperiod;
65+
private final long rollPeriod;
6666
private final int threadWakeFrequency;
6767
// The interval to check low replication on hlog's pipeline
68-
private long checkLowReplicationInterval;
68+
private final long checkLowReplicationInterval;
6969

7070
public void addWAL(final WAL wal) {
71-
if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
71+
if (null == wals.putIfAbsent(wal, new RollController(wal))) {
7272
wal.registerWALActionsListener(new WALActionsListener.Base() {
7373
@Override
7474
public void logRollRequested(WALActionsListener.RollRequestReason reason) {
75-
walNeedsRoll.put(wal, Boolean.TRUE);
75+
RollController controller = wals.get(wal);
76+
if (controller == null) {
77+
wals.putIfAbsent(wal, new RollController(wal));
78+
controller = wals.get(wal);
79+
}
80+
controller.requestRoll();
7681
// TODO logs will contend with each other here, replace with e.g. DelayedQueue
7782
synchronized(rollLog) {
7883
rollLog.set(true);
@@ -84,8 +89,8 @@ public void logRollRequested(WALActionsListener.RollRequestReason reason) {
8489
}
8590

8691
public void requestRollAll() {
87-
for (WAL wal : walNeedsRoll.keySet()) {
88-
walNeedsRoll.put(wal, Boolean.TRUE);
92+
for (RollController controller : wals.values()) {
93+
controller.requestRoll();
8994
}
9095
synchronized(rollLog) {
9196
rollLog.set(true);
@@ -98,7 +103,7 @@ public LogRoller(final Server server, final RegionServerServices services) {
98103
super("LogRoller");
99104
this.server = server;
100105
this.services = services;
101-
this.rollperiod = this.server.getConfiguration().
106+
this.rollPeriod = this.server.getConfiguration().
102107
getLong("hbase.regionserver.logroll.period", 3600000);
103108
this.threadWakeFrequency = this.server.getConfiguration().
104109
getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
@@ -120,9 +125,9 @@ public void interrupt() {
120125
*/
121126
void checkLowReplication(long now) {
122127
try {
123-
for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
128+
for (Entry<WAL, RollController> entry : wals.entrySet()) {
124129
WAL wal = entry.getKey();
125-
boolean neeRollAlready = entry.getValue();
130+
boolean neeRollAlready = entry.getValue().needsRoll(now);
126131
if(wal instanceof FSHLog && !neeRollAlready) {
127132
FSHLog hlog = (FSHLog)wal;
128133
if ((now - hlog.getLastTimeCheckLowReplication())
@@ -139,11 +144,16 @@ void checkLowReplication(long now) {
139144
@Override
140145
public void run() {
141146
while (!server.isStopped()) {
142-
long now = System.currentTimeMillis();
147+
long now = EnvironmentEdgeManager.currentTime();
143148
checkLowReplication(now);
144-
boolean periodic = false;
145149
if (!rollLog.get()) {
146-
periodic = (now - this.lastrolltime) > this.rollperiod;
150+
boolean periodic = false;
151+
for (RollController controller : wals.values()) {
152+
if (controller.needsPeriodicRoll(now)) {
153+
periodic = true;
154+
break;
155+
}
156+
}
147157
if (!periodic) {
148158
synchronized (rollLog) {
149159
try {
@@ -156,23 +166,24 @@ public void run() {
156166
}
157167
continue;
158168
}
159-
// Time for periodic roll
160-
if (LOG.isDebugEnabled()) {
161-
LOG.debug("Wal roll period " + this.rollperiod + "ms elapsed");
162-
}
163-
} else if (LOG.isDebugEnabled()) {
164-
LOG.debug("WAL roll requested");
165169
}
166170
rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
167171
try {
168-
this.lastrolltime = now;
169-
for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
172+
for (Entry<WAL, RollController> entry : wals.entrySet()) {
170173
final WAL wal = entry.getKey();
174+
RollController controller = entry.getValue();
175+
if (controller.isRollRequested()) {
176+
// WAL roll requested, fall through
177+
LOG.debug("WAL " + wal + " roll requested");
178+
} else if (controller.needsPeriodicRoll(now)) {
179+
// Time for periodic roll, fall through
180+
LOG.debug("WAL " + wal + " roll period " + this.rollPeriod + "ms elapsed");
181+
} else {
182+
continue;
183+
}
171184
// Force the roll if the logroll.period is elapsed or if a roll was requested.
172185
// The returned value is an array of actual region names.
173-
final byte [][] regionsToFlush = wal.rollWriter(periodic ||
174-
entry.getValue().booleanValue());
175-
walNeedsRoll.put(wal, Boolean.FALSE);
186+
final byte [][] regionsToFlush = controller.rollWal(now);
176187
if (regionsToFlush != null) {
177188
for (byte [] r: regionsToFlush) scheduleFlush(r);
178189
}
@@ -229,11 +240,52 @@ private void scheduleFlush(final byte [] encodedRegionName) {
229240
*/
230241
@VisibleForTesting
231242
public boolean walRollFinished() {
232-
for (boolean needRoll : walNeedsRoll.values()) {
233-
if (needRoll) {
243+
long now = EnvironmentEdgeManager.currentTime();
244+
for (RollController controller : wals.values()) {
245+
if (controller.needsRoll(now)) {
234246
return false;
235247
}
236248
}
237249
return true;
238250
}
251+
252+
253+
/**
254+
* Independently control the roll of each wal. When use multiwal,
255+
* can avoid all wal roll together. see HBASE-24665 for detail
256+
*/
257+
protected class RollController {
258+
private final WAL wal;
259+
private final AtomicBoolean rollRequest;
260+
private long lastRollTime;
261+
262+
RollController(WAL wal) {
263+
this.wal = wal;
264+
this.rollRequest = new AtomicBoolean(false);
265+
this.lastRollTime = EnvironmentEdgeManager.currentTime();
266+
}
267+
268+
public void requestRoll() {
269+
this.rollRequest.set(true);
270+
}
271+
272+
public byte[][] rollWal(long now) throws IOException {
273+
this.lastRollTime = now;
274+
byte[][] regionsToFlush = wal.rollWriter(true);
275+
this.rollRequest.set(false);
276+
return regionsToFlush;
277+
}
278+
279+
public boolean isRollRequested() {
280+
return rollRequest.get();
281+
}
282+
283+
public boolean needsPeriodicRoll(long now) {
284+
return (now - this.lastRollTime) > rollPeriod;
285+
}
286+
287+
public boolean needsRoll(long now) {
288+
return isRollRequested() || needsPeriodicRoll(now);
289+
}
290+
}
239291
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/**
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package org.apache.hadoop.hbase.regionserver;
21+
22+
import static org.junit.Assert.assertEquals;
23+
import static org.junit.Assert.assertNotEquals;
24+
25+
import java.util.HashMap;
26+
import java.util.Iterator;
27+
import java.util.Map;
28+
29+
import org.apache.hadoop.conf.Configuration;
30+
import org.apache.hadoop.fs.FileSystem;
31+
import org.apache.hadoop.fs.Path;
32+
import org.apache.hadoop.hbase.HBaseTestingUtility;
33+
import org.apache.hadoop.hbase.HConstants;
34+
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
35+
import org.apache.hadoop.hbase.testclassification.MediumTests;
36+
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
37+
import org.junit.After;
38+
import org.junit.Before;
39+
import org.junit.Test;
40+
import org.junit.experimental.categories.Category;
41+
import org.mockito.Mockito;
42+
43+
@Category({RegionServerTests.class, MediumTests.class})
44+
public class TestLogRoller {
45+
46+
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
47+
48+
private static final int LOG_ROLL_PERIOD = 20 * 1000;
49+
private static final String LOG_DIR = "WALs";
50+
private static final String ARCHIVE_DIR = "archiveWALs";
51+
private static final String WAL_PREFIX = "test-log-roller";
52+
private static Configuration CONF;
53+
private static LogRoller ROLLER;
54+
private static Path ROOT_DIR;
55+
private static FileSystem FS;
56+
57+
@Before
58+
public void setup() throws Exception {
59+
CONF = TEST_UTIL.getConfiguration();
60+
CONF.setInt("hbase.regionserver.logroll.period", LOG_ROLL_PERIOD);
61+
CONF.setInt(HConstants.THREAD_WAKE_FREQUENCY, 300);
62+
ROOT_DIR = TEST_UTIL.getRandomDir();
63+
FS = FileSystem.get(CONF);
64+
HRegionServer server = Mockito.mock(HRegionServer.class);
65+
Mockito.when(server.getConfiguration()).thenReturn(CONF);
66+
RegionServerServices services = Mockito.mock(RegionServerServices.class);
67+
ROLLER = new LogRoller(server, services);
68+
ROLLER.start();
69+
}
70+
71+
@After
72+
public void tearDown() throws Exception {
73+
ROLLER.interrupt();
74+
FS.close();
75+
TEST_UTIL.shutdownMiniCluster();
76+
}
77+
78+
/**
79+
* verify that each wal roll separately
80+
*/
81+
@Test
82+
public void testRequestRollWithMultiWal() throws Exception {
83+
// add multiple wal
84+
Map<FSHLog, Path> wals = new HashMap<FSHLog, Path>();
85+
for (int i = 1; i <= 3; i++) {
86+
FSHLog wal = new FSHLog(FS, ROOT_DIR, LOG_DIR, ARCHIVE_DIR, CONF, null,
87+
true, WAL_PREFIX, "." + i);
88+
wal.rollWriter(true);
89+
wals.put(wal, wal.getCurrentFileName());
90+
ROLLER.addWAL(wal);
91+
Thread.sleep(1000);
92+
}
93+
94+
// request roll
95+
Iterator<Map.Entry<FSHLog, Path>> it = wals.entrySet().iterator();
96+
Map.Entry<FSHLog, Path> walEntry = it.next();
97+
walEntry.getKey().requestLogRoll();
98+
Thread.sleep(5000);
99+
100+
assertNotEquals(walEntry.getValue(), walEntry.getKey().getCurrentFileName());
101+
walEntry.setValue(walEntry.getKey().getCurrentFileName());
102+
while (it.hasNext()) {
103+
walEntry = it.next();
104+
assertEquals(walEntry.getValue(), walEntry.getKey().getCurrentFileName());
105+
}
106+
107+
// period roll
108+
Thread.sleep(LOG_ROLL_PERIOD + 5000);
109+
for (Map.Entry<FSHLog, Path> entry : wals.entrySet()) {
110+
assertNotEquals(entry.getValue(), entry.getKey().getCurrentFileName());
111+
entry.getKey().close();
112+
}
113+
}
114+
}

0 commit comments

Comments
 (0)