Skip to content

Commit ee618d5

Browse files
committed
HBASE-27637 A zero-length value would cause the value compressor to read nothing and not advance the position of the InputStream (apache#5025)
Signed-off-by: Xiaolin Ha <[email protected]> Signed-off-by: Andrew Purtell <[email protected]> (cherry picked from commit 2bbe036)
1 parent 1fb311f commit ee618d5

File tree

4 files changed

+46
-33
lines changed

4 files changed

+46
-33
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.lang.reflect.InvocationTargetException;
2626
import java.util.EnumMap;
2727
import java.util.Map;
28+
import org.apache.commons.io.IOUtils;
2829
import org.apache.hadoop.conf.Configuration;
2930
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
3031
import org.apache.hadoop.hbase.io.BoundedDelegatingInputStream;
@@ -105,7 +106,7 @@ public byte[] compress(byte[] valueArray, int valueOffset, int valueLength) thro
105106
return compressed;
106107
}
107108

108-
public int decompress(InputStream in, int inLength, byte[] outArray, int outOffset,
109+
public void decompress(InputStream in, int inLength, byte[] outArray, int outOffset,
109110
int outLength) throws IOException {
110111

111112
// Our input is a sequence of bounded byte ranges (call them segments), with
@@ -122,11 +123,16 @@ public int decompress(InputStream in, int inLength, byte[] outArray, int outOffs
122123
} else {
123124
lowerIn.setDelegate(in, inLength);
124125
}
125-
126-
// Caller must handle short reads.
127-
// With current Hadoop compression codecs all 'outLength' bytes are read in here, so not
128-
// an issue for now.
129-
return compressedIn.read(outArray, outOffset, outLength);
126+
if (outLength == 0) {
127+
// The BufferedInputStream will return early without reading anything if outLength == 0,
128+
// but in fact for an empty value, the compressed output still contains some metadata so the
129+
// compressed size is not 0, so here we need to manually skip inLength bytes; otherwise the
130+
// next read on this stream will start from an invalid position and cause critical problem,
131+
// such as data loss when splitting wal or replicating wal.
132+
IOUtils.skipFully(in, inLength);
133+
} else {
134+
IOUtils.readFully(compressedIn, outArray, outOffset, outLength);
135+
}
130136
}
131137

132138
public void clear() {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -382,13 +382,9 @@ private static void checkLength(int len, int max) throws IOException {
382382
private void readCompressedValue(InputStream in, byte[] outArray, int outOffset,
383383
int expectedLength) throws IOException {
384384
int compressedLen = StreamUtils.readRawVarint32(in);
385-
int read = compression.getValueCompressor().decompress(in, compressedLen, outArray, outOffset,
385+
compression.getValueCompressor().decompress(in, compressedLen, outArray, outOffset,
386386
expectedLength);
387-
if (read != expectedLength) {
388-
throw new IOException("ValueCompressor state error: short read");
389-
}
390387
}
391-
392388
}
393389

394390
public static class EnsureKvEncoder extends BaseEncoder {

hbase-server/src/test/java/org/apache/hadoop/hbase/wal/CompressedWALTestBase.java

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package org.apache.hadoop.hbase.wal;
1919

20+
import static org.hamcrest.MatcherAssert.assertThat;
21+
import static org.hamcrest.Matchers.hasSize;
2022
import static org.junit.Assert.assertEquals;
2123
import static org.junit.Assert.assertTrue;
2224

@@ -26,8 +28,9 @@
2628
import java.util.TreeMap;
2729
import org.apache.hadoop.fs.Path;
2830
import org.apache.hadoop.hbase.Cell;
31+
import org.apache.hadoop.hbase.CellBuilderFactory;
32+
import org.apache.hadoop.hbase.CellBuilderType;
2933
import org.apache.hadoop.hbase.HBaseTestingUtility;
30-
import org.apache.hadoop.hbase.KeyValue;
3134
import org.apache.hadoop.hbase.TableName;
3235
import org.apache.hadoop.hbase.client.RegionInfo;
3336
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
@@ -82,33 +85,42 @@ public void doTest(TableName tableName) throws Exception {
8285

8386
for (int i = 0; i < total; i++) {
8487
WALEdit kvs = new WALEdit();
85-
kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
88+
kvs.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
89+
.setRow(row).setFamily(family).setQualifier(Bytes.toBytes(i)).setValue(value).build());
90+
kvs.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
91+
.setType(Cell.Type.DeleteFamily).setRow(row).setFamily(family).build());
8692
wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
8793
System.currentTimeMillis(), mvcc, scopes), kvs);
94+
wal.sync();
8895
}
89-
wal.sync();
9096
final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
9197
wals.shutdown();
9298

9399
// Confirm the WAL can be read back
94-
WAL.Reader reader = wals.createReader(TEST_UTIL.getTestFileSystem(), walPath);
95-
int count = 0;
96-
WAL.Entry entry = new WAL.Entry();
97-
while (reader.next(entry) != null) {
98-
count++;
99-
List<Cell> cells = entry.getEdit().getCells();
100-
assertTrue("Should be one KV per WALEdit", cells.size() == 1);
101-
for (Cell cell : cells) {
102-
assertTrue("Incorrect row", Bytes.equals(cell.getRowArray(), cell.getRowOffset(),
103-
cell.getRowLength(), row, 0, row.length));
104-
assertTrue("Incorrect family", Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(),
105-
cell.getFamilyLength(), family, 0, family.length));
106-
assertTrue("Incorrect value", Bytes.equals(cell.getValueArray(), cell.getValueOffset(),
107-
cell.getValueLength(), value, 0, value.length));
100+
try (WAL.Reader reader = wals.createReader(TEST_UTIL.getTestFileSystem(), walPath)) {
101+
int count = 0;
102+
WAL.Entry entry = new WAL.Entry();
103+
while (reader.next(entry) != null) {
104+
count++;
105+
List<Cell> cells = entry.getEdit().getCells();
106+
assertThat("Should be two KVs per WALEdit", cells, hasSize(2));
107+
Cell putCell = cells.get(0);
108+
assertEquals(Cell.Type.Put, putCell.getType());
109+
assertTrue("Incorrect row", Bytes.equals(putCell.getRowArray(), putCell.getRowOffset(),
110+
putCell.getRowLength(), row, 0, row.length));
111+
assertTrue("Incorrect family", Bytes.equals(putCell.getFamilyArray(),
112+
putCell.getFamilyOffset(), putCell.getFamilyLength(), family, 0, family.length));
113+
assertTrue("Incorrect value", Bytes.equals(putCell.getValueArray(),
114+
putCell.getValueOffset(), putCell.getValueLength(), value, 0, value.length));
115+
116+
Cell deleteCell = cells.get(1);
117+
assertEquals(Cell.Type.DeleteFamily, deleteCell.getType());
118+
assertTrue("Incorrect row", Bytes.equals(deleteCell.getRowArray(),
119+
deleteCell.getRowOffset(), deleteCell.getRowLength(), row, 0, row.length));
120+
assertTrue("Incorrect family", Bytes.equals(deleteCell.getFamilyArray(),
121+
deleteCell.getFamilyOffset(), deleteCell.getFamilyLength(), family, 0, family.length));
108122
}
123+
assertEquals("Should have read back as many KVs as written", total, count);
109124
}
110-
assertEquals("Should have read back as many KVs as written", total, count);
111-
reader.close();
112125
}
113-
114126
}

hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestCompressedWALValueCompression.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public class TestCompressedWALValueCompression extends CompressedWALTestBase {
4545
public static final HBaseClassTestRule CLASS_RULE =
4646
HBaseClassTestRule.forClass(TestCompressedWALValueCompression.class);
4747

48-
@Parameters
48+
@Parameters(name = "{index}: compression={0}")
4949
public static List<Object[]> params() {
5050
return HBaseTestingUtility.COMPRESSION_ALGORITHMS_PARAMETERIZED;
5151
}
@@ -78,5 +78,4 @@ public void test() throws Exception {
7878
TableName tableName = TableName.valueOf(name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_"));
7979
doTest(tableName);
8080
}
81-
8281
}

0 commit comments

Comments
 (0)