Commit ca44233

We were fully flushing the deflater at the end of every value (FULL_FLUSH). This was very conservative: it allows each value to be decompressed individually (so it is resilient to corruption), but it seriously hurts the compression ratio. Meanwhile our custom dictionary scheme accumulates strings over the whole file, so we were being conservative in value compression in a way our custom scheme defeats anyway. Instead, we can also let the Deflater build its dictionary over all values in the WAL, by using SYNC_FLUSH. Add more unit tests.
1 parent 3822a59 commit ca44233
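
For context on the flush-mode change, here is a small standalone java.util.zip sketch (not part of this commit) that pushes the same value repeatedly through one Deflater, once per flush mode. With FULL_FLUSH the compressor's history window is discarded after every value, so repeats never help each other; with SYNC_FLUSH the window carries over and later values compress down to a handful of bytes. The class name, buffer sizes, and random seed are illustrative only.

import java.util.Random;
import java.util.zip.Deflater;

public class FlushModeSketch {

  // Compresses one value with the given flush mode and returns the bytes produced.
  // Assumes buf is large enough to hold the flushed output of a single value.
  static int compressValue(Deflater deflater, byte[] value, byte[] buf, int flushMode) {
    deflater.setInput(value);
    int total = 0;
    int n;
    while ((n = deflater.deflate(buf, total, buf.length - total)) > 0) {
      total += n;
    }
    // Flush to a byte boundary so the value can be framed on its own.
    // SYNC_FLUSH keeps the history window; FULL_FLUSH throws it away.
    total += deflater.deflate(buf, total, buf.length - total, flushMode);
    return total;
  }

  public static void main(String[] args) {
    byte[] value = new byte[1024];
    new Random(42).nextBytes(value); // incompressible on its own, but repeated below

    for (int flushMode : new int[] { Deflater.FULL_FLUSH, Deflater.SYNC_FLUSH }) {
      Deflater deflater = new Deflater(Deflater.BEST_SPEED);
      byte[] buf = new byte[8192];
      long totalBytes = 0;
      for (int i = 0; i < 100; i++) {
        totalBytes += compressValue(deflater, value, buf, flushMode);
      }
      deflater.end();
      String mode = flushMode == Deflater.SYNC_FLUSH ? "SYNC_FLUSH" : "FULL_FLUSH";
      System.out.println(mode + ": " + totalBytes + " total bytes for 100 copies of the same 1 KB value");
    }
  }
}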

File tree

5 files changed: +177, -53 lines
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java

Lines changed: 31 additions & 8 deletions

@@ -23,6 +23,7 @@
 import java.util.EnumMap;
 import java.util.Map;
 import java.util.zip.Deflater;
+import java.util.zip.Inflater;
 
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -35,21 +36,45 @@
 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
 public class CompressionContext {
 
-  static final String ENABLE_WAL_TAGS_COMPRESSION =
+  public static final String ENABLE_WAL_TAGS_COMPRESSION =
     "hbase.regionserver.wal.tags.enablecompression";
 
-  static final String ENABLE_WAL_VALUE_COMPRESSION =
+  public static final String ENABLE_WAL_VALUE_COMPRESSION =
     "hbase.regionserver.wal.value.enablecompression";
 
   public enum DictionaryIndex {
     REGION, TABLE, FAMILY, QUALIFIER, ROW
   }
 
+  static class ValueCompressor {
+    final Deflater deflater;
+    final Inflater inflater;
+
+    public ValueCompressor() {
+      deflater = new Deflater();
+      deflater.setLevel(Deflater.BEST_SPEED);
+      inflater = new Inflater();
+    }
+
+    public Deflater getDeflater() {
+      return deflater;
+    }
+
+    public Inflater getInflater() {
+      return inflater;
+    }
+
+    public void clear() {
+      deflater.reset();
+      inflater.reset();
+    }
+  };
+
   private final Map<DictionaryIndex, Dictionary> dictionaries =
     new EnumMap<>(DictionaryIndex.class);
   // Context used for compressing tags
   TagCompressionContext tagCompressionContext = null;
-  Deflater valueCompressor = null;
+  ValueCompressor valueCompressor = null;
 
   public CompressionContext(Class<? extends Dictionary> dictType, boolean recoveredEdits,
     boolean hasTagCompression, boolean hasValueCompression)
@@ -77,9 +102,7 @@ public CompressionContext(Class<? extends Dictionary> dictType, boolean recovere
       tagCompressionContext = new TagCompressionContext(dictType, Short.MAX_VALUE);
     }
     if (hasValueCompression) {
-      valueCompressor = new Deflater();
-      // Optimize for encoding speed
-      valueCompressor.setLevel(Deflater.BEST_SPEED);
+      valueCompressor = new ValueCompressor();
     }
   }
 
@@ -94,7 +117,7 @@ public Dictionary getDictionary(Enum dictIndex) {
     return dictionaries.get(dictIndex);
  }
 
-  public Deflater getValueCompressor() {
+  public ValueCompressor getValueCompressor() {
    return valueCompressor;
  }
 
@@ -106,7 +129,7 @@ void clear() {
       tagCompressionContext.clear();
     }
     if (valueCompressor != null) {
-      valueCompressor.reset();
+      valueCompressor.clear();
     }
   }

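The two configuration keys above are made public so tests outside this class (the new test classes at the end of this commit) can reference them. A hedged usage sketch, mirroring what those tests do; the class and method names here are hypothetical, not from the commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;

public class WalValueCompressionConfig {

  // Returns a Configuration with WAL compression and the new WAL value
  // compression turned on -- the same two flags the new tests below set.
  public static Configuration enable() {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
    // "hbase.regionserver.wal.value.enablecompression"
    conf.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
    return conf;
  }
}
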
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java

Lines changed: 8 additions & 8 deletions

@@ -42,11 +42,12 @@
 import org.apache.hadoop.hbase.io.util.Dictionary;
 import org.apache.hadoop.hbase.io.util.StreamUtils;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.regionserver.wal.CompressionContext.ValueCompressor;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
 import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 
@@ -272,18 +273,18 @@ public void write(Cell cell) throws IOException {
   }
 
   public static void writeCompressedValue(OutputStream out, byte[] valueArray, int offset,
-      int vlength, Deflater deflater) throws IOException {
+      int vlength, ValueCompressor valueCompressor) throws IOException {
     byte[] buffer = new byte[4096];
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    deflater.reset();
+    Deflater deflater = valueCompressor.getDeflater();
     deflater.setInput(valueArray, offset, vlength);
     boolean finished = false;
     do {
       int bytesOut = deflater.deflate(buffer);
       if (bytesOut > 0) {
         baos.write(buffer, 0, bytesOut);
       } else {
-        bytesOut = deflater.deflate(buffer, 0, buffer.length, Deflater.FULL_FLUSH);
+        bytesOut = deflater.deflate(buffer, 0, buffer.length, Deflater.SYNC_FLUSH);
         baos.write(buffer, 0, bytesOut);
         finished = true;
       }
@@ -351,7 +352,7 @@ protected Cell parseCell() throws IOException {
       type = (byte)(type & 0x7f);
       pos = Bytes.putByte(backingArray, pos, type);
       int valLen = typeValLen - 1;
-      readCompressedValue(in, backingArray, pos, valLen);
+      readCompressedValue(in, backingArray, pos, valLen, compression.getValueCompressor());
       pos += valLen;
     } else {
       pos = Bytes.putByte(backingArray, pos, type);
@@ -400,17 +401,16 @@ private static void checkLength(int len, int max) throws IOException {
   }
 
   public static void readCompressedValue(InputStream in, byte[] outArray, int outOffset,
-      int expectedLength) throws IOException {
+      int expectedLength, ValueCompressor valueCompressor) throws IOException {
     int compressedLength = StreamUtils.readRawVarint32(in);
     byte[] buffer = new byte[compressedLength];
     IOUtils.readFully(in, buffer, 0, compressedLength);
-    Inflater inflater = new Inflater();
+    Inflater inflater = valueCompressor.getInflater();
     inflater.setInput(buffer);
     int remaining = expectedLength;
     do {
       try {
         int inflatedBytes = inflater.inflate(outArray, outOffset, remaining);
-        Preconditions.checkState(inflatedBytes > 0, "Inflater state error");
         outOffset += inflatedBytes;
         remaining -= inflatedBytes;
       } catch (DataFormatException e) {

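A standalone sketch (not HBase code) of the write/read pattern above: one long-lived Deflater frames each value with SYNC_FLUSH, and one long-lived Inflater reads the values back in order. Because SYNC_FLUSH preserves the compressor's history, creating a fresh Inflater per value (as the old code did) would no longer decode later values; that is why the Inflater now lives in the shared ValueCompressor. Names, values, and sizes below are illustrative only.

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class ValueRoundTripSketch {

  public static void main(String[] args) throws DataFormatException {
    byte[][] values = {
      "value-one-value-one-value-one".getBytes(StandardCharsets.UTF_8),
      "value-two-value-two-value-two".getBytes(StandardCharsets.UTF_8)
    };

    // One compressor and one decompressor for the whole "WAL", as ValueCompressor now holds.
    Deflater deflater = new Deflater(Deflater.BEST_SPEED);
    Inflater inflater = new Inflater();
    byte[] buffer = new byte[4096];

    // Write side: each value becomes one length-framed chunk of compressed bytes.
    byte[][] compressed = new byte[values.length][];
    for (int i = 0; i < values.length; i++) {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      deflater.setInput(values[i]);
      int n;
      while ((n = deflater.deflate(buffer)) > 0) {
        baos.write(buffer, 0, n);
      }
      // SYNC_FLUSH drains the value to a byte boundary but keeps the history window.
      n = deflater.deflate(buffer, 0, buffer.length, Deflater.SYNC_FLUSH);
      baos.write(buffer, 0, n);
      compressed[i] = baos.toByteArray();
    }

    // Read side: the same Inflater consumes the chunks in order, since later
    // chunks may back-reference data produced by earlier ones.
    for (int i = 0; i < values.length; i++) {
      byte[] out = new byte[values[i].length]; // expected length is known, as in the WAL
      inflater.setInput(compressed[i]);
      int off = 0;
      while (off < out.length) {
        off += inflater.inflate(out, off, out.length - off);
      }
      System.out.println(new String(out, StandardCharsets.UTF_8)); // prints the original value
    }
  }
}
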
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java

Lines changed: 48 additions & 37 deletions

@@ -26,6 +26,7 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ArrayBackedTag;
 import org.apache.hadoop.hbase.ByteBufferKeyValue;
@@ -74,14 +75,20 @@ public void testValueCompressionEnabled() throws Exception {
   @Test
   public void testValueCompression() throws Exception {
     final byte[] row_1 = Bytes.toBytes("row_1");
-    final byte[] value_1 = new byte[512];
+    final byte[] value_1 = new byte[WALCellCodec.VALUE_COMPRESS_THRESHOLD];
     Bytes.zero(value_1);
     final byte[] row_2 = Bytes.toBytes("row_2");
     final byte[] value_2 = new byte[Bytes.SIZEOF_LONG];
     Bytes.random(value_2);
     final byte[] row_3 = Bytes.toBytes("row_3");
-    final byte[] value_3 = new byte[1024];
+    final byte[] value_3 = new byte[WALCellCodec.VALUE_COMPRESS_THRESHOLD];
     Bytes.random(value_3);
+    final byte[] row_4 = Bytes.toBytes("row_4");
+    final byte[] value_4 = new byte[WALCellCodec.VALUE_COMPRESS_THRESHOLD * 4];
+    fillBytes(value_4, Bytes.toBytes("DEADBEEF"));
+    final byte[] row_5 = Bytes.toBytes("row_5");
+    final byte[] value_5 = new byte[WALCellCodec.VALUE_COMPRESS_THRESHOLD * 2];
+    fillBytes(value_5, Bytes.toBytes("CAFEBABE"));
 
     Configuration conf = new Configuration(false);
     WALCellCodec codec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class,
@@ -91,6 +98,8 @@ public void testValueCompression() throws Exception {
     encoder.write(createKV(row_1, value_1, 0));
     encoder.write(createKV(row_2, value_2, 0));
     encoder.write(createKV(row_3, value_3, 0));
+    encoder.write(createKV(row_4, value_4, 0));
+    encoder.write(createKV(row_5, value_5, 0));
     encoder.flush();
 
     try (InputStream is = new ByteArrayInputStream(bos.toByteArray())) {
@@ -113,55 +122,57 @@ public void testValueCompression() throws Exception {
         kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
       assertTrue(Bytes.equals(value_3, 0, value_3.length,
         kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
+      decoder.advance();
+      kv = (KeyValue) decoder.current();
+      assertTrue(Bytes.equals(row_4, 0, row_4.length,
+        kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
+      assertTrue(Bytes.equals(value_4, 0, value_4.length,
+        kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
+      decoder.advance();
+      kv = (KeyValue) decoder.current();
+      assertTrue(Bytes.equals(row_5, 0, row_5.length,
+        kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
+      assertTrue(Bytes.equals(value_5, 0, value_5.length,
+        kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
+    }
+  }
+
+  static void fillBytes(byte[] buffer, byte[] fill) {
+    int offset = 0;
+    int remaining = buffer.length;
+    while (remaining > 0) {
+      int len = remaining < fill.length ? remaining : fill.length;
+      System.arraycopy(fill, 0, buffer, offset, len);
+      offset += len;
+      remaining -= len;
     }
   }
 
   @Test
   public void testValueCompressionCompatibility() throws Exception {
-    final byte[] row_1 = Bytes.toBytes("row_1");
-    final byte[] value_1 = new byte[512];
-    Bytes.zero(value_1);
-    final byte[] row_2 = Bytes.toBytes("row_2");
-    final byte[] value_2 = new byte[Bytes.SIZEOF_LONG];
-    Bytes.random(value_2);
-    final byte[] row_3 = Bytes.toBytes("row_3");
-    final byte[] value_3 = new byte[1024];
-    Bytes.random(value_3);
-
+    final byte[] key = Bytes.toBytes("myRow");
+    final byte[] value = Bytes.toBytes("myValue");
     Configuration conf = new Configuration(false);
     WALCellCodec outCodec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class,
       false, false, false));
     ByteArrayOutputStream bos = new ByteArrayOutputStream();
     Encoder encoder = outCodec.getEncoder(bos);
-    encoder.write(createKV(row_1, value_1, 0));
-    encoder.write(createKV(row_2, value_2, 0));
-    encoder.write(createKV(row_3, value_3, 0));
+    for (int i = 0; i < 10; i++) {
+      encoder.write(createOffheapKV(key, value, 0));
+    }
     encoder.flush();
-
-    byte[] encodedCells = bos.toByteArray();
-
     WALCellCodec inCodec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class,
       false, false, true));
-    try (InputStream is = new ByteArrayInputStream(encodedCells)) {
+    try (InputStream is = new ByteArrayInputStream(bos.toByteArray())) {
       Decoder decoder = inCodec.getDecoder(is);
-      decoder.advance();
-      KeyValue kv = (KeyValue) decoder.current();
-      assertTrue(Bytes.equals(row_1, 0, row_1.length,
-        kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
-      assertTrue(Bytes.equals(value_1, 0, value_1.length,
-        kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
-      decoder.advance();
-      kv = (KeyValue) decoder.current();
-      assertTrue(Bytes.equals(row_2, 0, row_2.length,
-        kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
-      assertTrue(Bytes.equals(value_2, 0, value_2.length,
-        kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
-      decoder.advance();
-      kv = (KeyValue) decoder.current();
-      assertTrue(Bytes.equals(row_3, 0, row_3.length,
-        kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
-      assertTrue(Bytes.equals(value_3, 0, value_3.length,
-        kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
+      for (int i = 0; i < 10; i++) {
+        decoder.advance();
+        KeyValue kv = (KeyValue) decoder.current();
+        assertTrue(Bytes.equals(key, 0, key.length,
+          kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
+        assertTrue(Bytes.equals(value, 0, value.length,
+          kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
+      }
     }
   }

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayValueCompression.java (new file)

Lines changed: 46 additions & 0 deletions

@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Enables compression and runs the TestWALReplay tests.
+ */
+@Category({ RegionServerTests.class, LargeTests.class })
+public class TestWALReplayValueCompression extends TestWALReplay {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestWALReplayValueCompression.class);
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
+    conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+    conf.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
+    TestWALReplay.setUpBeforeClass();
+  }
+}
hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitValueCompression.java (new file)

Lines changed: 44 additions & 0 deletions

@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+@Category({RegionServerTests.class, LargeTests.class})
+public class TestWALSplitValueCompression extends TestWALSplit {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestWALSplitValueCompression.class);
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TestWALSplit.setUpBeforeClass();
+    TEST_UTIL.getConfiguration()
+      .setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+    TEST_UTIL.getConfiguration()
+      .setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
+  }
+}
