Skip to content

Commit af3267d

Browse files
committed
HBASE-24984 WAL corruption due to early DBBs re-use when Durability.ASYNC_WAL is used with multi operation
1 parent 1e763d5 commit af3267d

File tree

3 files changed

+245
-20
lines changed

3 files changed

+245
-20
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,13 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
9797
private long exceptionSize = 0;
9898
private final boolean retryImmediatelySupported;
9999

100-
// This is a dirty hack to address HBASE-22539. The lowest bit is for normal rpc cleanup, and the
101-
// second bit is for WAL reference. We can only call release if both of them are zero. The reason
102-
// why we can not use a general reference counting is that, we may call cleanup multiple times in
103-
// the current implementation. We should fix this in the future.
104-
private final AtomicInteger reference = new AtomicInteger(0b01);
100+
// This is a dirty hack to address HBASE-22539. The highest bit is for rpc ref and cleanup, and
101+
// the rest of the bits are for WAL reference count. We can only call release if all of them are
102+
// zero. The reason why we can not use a general reference counting is that, we may call cleanup
103+
// multiple times in the current implementation. We should fix this in the future.
104+
// The refCount here starts as 0x80000000, is incremented for every WAL reference, and is
105+
// decremented from the WAL side on release.
106+
private final AtomicInteger reference = new AtomicInteger(0x80000000);
105107

106108
private final Span span;
107109

@@ -157,13 +159,14 @@ public void done() {
157159
span.end();
158160
}
159161

160-
private void release(int mask) {
162+
@Override
163+
public void cleanup() {
161164
for (;;) {
162165
int ref = reference.get();
163-
if ((ref & mask) == 0) {
166+
if ((ref & 0x80000000) == 0) {
164167
return;
165168
}
166-
int nextRef = ref & (~mask);
169+
int nextRef = ref & 0x7fffffff;
167170
if (reference.compareAndSet(ref, nextRef)) {
168171
if (nextRef == 0) {
169172
if (this.reqCleanup != null) {
@@ -175,23 +178,20 @@ private void release(int mask) {
175178
}
176179
}
177180

178-
@Override
179-
public void cleanup() {
180-
release(0b01);
181+
public void retainByWAL() {
182+
reference.incrementAndGet();
181183
}
182184

183-
public void retainByWAL() {
184-
for (;;) {
185-
int ref = reference.get();
186-
int nextRef = ref | 0b10;
187-
if (reference.compareAndSet(ref, nextRef)) {
188-
return;
185+
public void releaseByWAL() {
186+
// Using decrementAndGet to release a WAL reference works in both cases, i.e. whether the
187+
// highest bit (cleanup) is 1 or 0. We will be decrementing a negative or a positive value
188+
// respectively in these 2 cases, but the logic works the same way.
189+
if (reference.decrementAndGet() == 0) {
190+
if (this.reqCleanup != null) {
191+
this.reqCleanup.run();
189192
}
190193
}
191-
}
192194

193-
public void releaseByWAL() {
194-
release(0b10);
195195
}
196196

197197
@Override
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.wal;
19+
20+
import java.io.IOException;
21+
import java.util.List;
22+
23+
import org.apache.hadoop.conf.Configuration;
24+
import org.apache.hadoop.fs.FileSystem;
25+
import org.apache.hadoop.fs.Path;
26+
import org.apache.hadoop.hbase.HBaseClassTestRule;
27+
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
28+
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
29+
import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
30+
import org.apache.hadoop.hbase.regionserver.HRegion;
31+
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
32+
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
33+
import org.apache.hadoop.hbase.testclassification.MediumTests;
34+
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
35+
import org.apache.hadoop.hbase.util.CommonFSUtils;
36+
import org.junit.AfterClass;
37+
import org.junit.BeforeClass;
38+
import org.junit.ClassRule;
39+
import org.junit.experimental.categories.Category;
40+
41+
@Category({ RegionServerTests.class, MediumTests.class })
42+
public class TestFSHLogCorruptionWithMultiPutDueToDanglingByteBuffer
43+
extends WALCorruptionWithMultiPutDueToDanglingByteBufferTestBase {
44+
45+
@ClassRule
46+
public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule
47+
.forClass(TestFSHLogCorruptionWithMultiPutDueToDanglingByteBuffer.class);
48+
49+
public static final class PauseWAL extends FSHLog {
50+
51+
private int testTableWalAppendsCount = 0;
52+
53+
public PauseWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
54+
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
55+
String prefix, String suffix) throws IOException {
56+
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
57+
}
58+
59+
@Override
60+
protected void atHeadOfRingBufferEventHandlerAppend() {
61+
// Let the 1st Append go through. The write thread will wait for this to go through before
62+
// calling further put()
63+
if (ARRIVE != null) { // Means appends as part of puts in testcase
64+
// Sleep for a second so that the RS handler thread can put all the mini batch WAL appends
65+
// onto the ring buffer.
66+
if (testTableWalAppendsCount == 0) {
67+
try {
68+
Thread.sleep(1000);
69+
} catch (InterruptedException e) {
70+
}
71+
}
72+
// Let the first minibatch write go through. When the 2nd one comes, notify the waiting test
73+
// case so it can do further batch puts, and make this WAL append thread pause.
74+
if (testTableWalAppendsCount == 1) {
75+
ARRIVE.countDown();
76+
try {
77+
RESUME.await();
78+
} catch (InterruptedException e) {
79+
}
80+
}
81+
testTableWalAppendsCount++;
82+
}
83+
}
84+
}
85+
86+
public static final class PauseWALProvider extends AbstractFSWALProvider<PauseWAL> {
87+
88+
@Override
89+
protected PauseWAL createWAL() throws IOException {
90+
return new PauseWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
91+
getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
92+
conf, listeners, true, logPrefix,
93+
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
94+
}
95+
96+
@Override
97+
protected void doInit(Configuration conf) throws IOException {
98+
}
99+
}
100+
101+
@BeforeClass
102+
public static void setUp() throws Exception {
103+
UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, PauseWALProvider.class,
104+
WALProvider.class);
105+
UTIL.getConfiguration().setInt(HRegion.HBASE_REGIONSERVER_MINIBATCH_SIZE, 1);
106+
UTIL.getConfiguration().set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY,
107+
SimpleRpcServer.class.getName());
108+
UTIL.getConfiguration().setInt(ByteBuffAllocator.MAX_BUFFER_COUNT_KEY, 1);
109+
UTIL.getConfiguration().setInt(ByteBuffAllocator.BUFFER_SIZE_KEY, 1024);
110+
UTIL.getConfiguration().setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 500);
111+
UTIL.startMiniCluster(1);
112+
UTIL.createTable(TABLE_NAME, CF);
113+
UTIL.waitTableAvailable(TABLE_NAME);
114+
}
115+
116+
@AfterClass
117+
public static void tearDown() throws Exception {
118+
UTIL.shutdownMiniCluster();
119+
}
120+
}
121+
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.wal;
19+
20+
import static org.junit.Assert.assertEquals;
21+
22+
import java.util.ArrayList;
23+
import java.util.List;
24+
import java.util.concurrent.CountDownLatch;
25+
26+
import org.apache.hadoop.hbase.HBaseTestingUtility;
27+
import org.apache.hadoop.hbase.TableName;
28+
import org.apache.hadoop.hbase.client.Durability;
29+
import org.apache.hadoop.hbase.client.Get;
30+
import org.apache.hadoop.hbase.client.Put;
31+
import org.apache.hadoop.hbase.client.Result;
32+
import org.apache.hadoop.hbase.client.Table;
33+
import org.apache.hadoop.hbase.util.Bytes;
34+
import org.junit.Test;
35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
37+
38+
public abstract class WALCorruptionWithMultiPutDueToDanglingByteBufferTestBase {
39+
40+
private static final Logger LOG = LoggerFactory
41+
.getLogger(WALCorruptionWithMultiPutDueToDanglingByteBufferTestBase.class);
42+
43+
protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
44+
45+
protected static CountDownLatch ARRIVE;
46+
47+
protected static CountDownLatch RESUME;
48+
49+
protected static TableName TABLE_NAME = TableName
50+
.valueOf("WALCorruptionWithMultiPutDueToDanglingByteBufferTestBase");
51+
52+
protected static byte[] CF = Bytes.toBytes("cf");
53+
54+
protected static byte[] CQ = Bytes.toBytes("cq");
55+
56+
private byte[] getBytes(String prefix, int index) {
57+
return Bytes.toBytes(String.format("%s-%08d", prefix, index));
58+
}
59+
60+
@Test
61+
public void test() throws Exception {
62+
LOG.info("Stop WAL appending...");
63+
ARRIVE = new CountDownLatch(1);
64+
RESUME = new CountDownLatch(1);
65+
try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) {
66+
LOG.info("Put totally 100 rows in batches of 5 with " + Durability.ASYNC_WAL + "...");
67+
int batchSize = 5;
68+
List<Put> puts = new ArrayList<>(batchSize);
69+
for (int i = 1; i <= 100; i++) {
70+
Put p = new Put(getBytes("row", i)).addColumn(CF, CQ, getBytes("value", i))
71+
.setDurability(Durability.ASYNC_WAL);
72+
puts.add(p);
73+
if (i % batchSize == 0) {
74+
table.put(puts);
75+
LOG.info("Wrote batch of {} rows from row {}", batchSize,
76+
Bytes.toString(puts.get(0).getRow()));
77+
puts.clear();
78+
// Wait for a few of the minibatches in the 1st batch of puts to go through the WAL write.
79+
// The WAL write will then pause.
80+
if (ARRIVE != null) {
81+
ARRIVE.await();
82+
ARRIVE = null;
83+
}
84+
}
85+
}
86+
LOG.info("Resume WAL appending...");
87+
RESUME.countDown();
88+
LOG.info("Put a single row to force a WAL sync...");
89+
table.put(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("value")));
90+
LOG.info("Abort the only region server");
91+
UTIL.getMiniHBaseCluster().abortRegionServer(0);
92+
LOG.info("Start a new region server");
93+
UTIL.getMiniHBaseCluster().startRegionServerAndWait(30000);
94+
UTIL.waitTableAvailable(TABLE_NAME);
95+
LOG.info("Check if all rows are still valid");
96+
for (int i = 1; i <= 100; i++) {
97+
Result result = table.get(new Get(getBytes("row", i)));
98+
assertEquals(Bytes.toString(getBytes("value", i)), Bytes.toString(result.getValue(CF, CQ)));
99+
}
100+
Result result = table.get(new Get(Bytes.toBytes("row")));
101+
assertEquals("value", Bytes.toString(result.getValue(CF, CQ)));
102+
}
103+
}
104+
}

0 commit comments

Comments
 (0)