Commit 8a6fed7
HBASE-26316 Per-table or per-CF compression codec setting overrides (#3730)
We get and retain Compressor instances in HFileBlockDefaultEncodingContext, and could in theory call Compressor#reinit when setting up the context to update compression parameters like level and buffer size, but we do not plumb the CompoundConfiguration from the Store through into the encoding context. As a consequence, codec parameters can only be tuned globally in system site conf files. Fine-grained configurability matters for algorithms like ZStandard (ZSTD), which offers more than 20 compression levels: at level 1 it is almost as fast as LZ4, while at higher levels it applies computationally expensive techniques that rival LZMA in compression ratio, at the cost of significantly reduced compression throughput. The ZSTD level that should be set for a given column family or table will vary by use case.

Signed-off-by: Viraj Jasani <[email protected]>
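For illustration, a minimal sketch of how a per-family override could be expressed through the public client API once this change is in place. The property key hbase.io.compress.zstd.level and the table/family names are assumptions for this example, not something this commit adds; check the codec implementation you deploy for the exact key it honors.

import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.Bytes;

public class ZstdLevelExample {
  // Create a table whose "info" family compresses with ZSTD at level 6 while
  // the rest of the cluster keeps the site-wide default level.
  static void createTable(Admin admin) throws IOException {
    TableDescriptorBuilder table = TableDescriptorBuilder.newBuilder(TableName.valueOf("t1"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info"))
        .setCompressionType(Compression.Algorithm.ZSTD)
        // Assumed key: family-scoped settings are merged into the store's
        // CompoundConfiguration, which this commit plumbs through to the codec.
        .setConfiguration("hbase.io.compress.zstd.level", "6")
        .build());
    admin.createTable(table.build());
  }
}

Settings applied this way travel with the table descriptor, so they should override the site configuration only for stores of that family.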
1 parent 26ab9d0 commit 8a6fed7

File tree: 59 files changed, +450 −265 lines

hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CanReinit.java

Lines changed: 31 additions & 0 deletions

@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hbase.io.compress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * This is a marker interface that indicates if a compressor or decompressor
+ * type can support reinitialization via reinit(Configuration conf).
+ */
+@InterfaceAudience.Private
+public interface CanReinit {
+
+  void reinit(Configuration conf);
+
+}
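As a usage note, a minimal caller-side sketch of the pattern this marker interface enables, assuming only the interface above and the stock Hadoop Decompressor type; the helper class is invented for illustration and is not part of this commit:

package org.apache.hadoop.hbase.io.compress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.Decompressor;

// Hypothetical helper: reapply a store's configuration to a pooled
// decompressor before use, but only when the implementation has opted in
// by implementing CanReinit.
public final class ReinitHelper {
  private ReinitHelper() {
  }

  public static Decompressor prepare(Decompressor decompressor, Configuration conf) {
    // Hadoop's Decompressor interface declares no reinit(Configuration),
    // so the capability must be signaled through the marker interface.
    if (decompressor instanceof CanReinit) {
      ((CanReinit) decompressor).reinit(conf);
    }
    return decompressor;
  }
}

The same instanceof-and-reinit check appears in HFileBlockDefaultDecodingContext further down in this commit.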

hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java

Lines changed: 0 additions & 34 deletions
@@ -22,12 +22,9 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.io.util.BlockIOUtils;
-import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionInputStream;
@@ -521,37 +518,6 @@ public static String[] getSupportedAlgorithms() {
     return ret;
   }
 
-  /**
-   * Decompresses data from the given stream using the configured compression algorithm. It will
-   * throw an exception if the dest buffer does not have enough space to hold the decompressed data.
-   * @param dest the output buffer
-   * @param bufferedBoundedStream a stream to read compressed data from, bounded to the exact amount
-   *          of compressed data
-   * @param uncompressedSize uncompressed data size, header not included
-   * @param compressAlgo compression algorithm used
-   * @throws IOException if any IO error happen
-   */
-  public static void decompress(ByteBuff dest, InputStream bufferedBoundedStream,
-      int uncompressedSize, Compression.Algorithm compressAlgo) throws IOException {
-    if (dest.remaining() < uncompressedSize) {
-      throw new IllegalArgumentException("Output buffer does not have enough space to hold "
-        + uncompressedSize + " decompressed bytes, available: " + dest.remaining());
-    }
-
-    Decompressor decompressor = null;
-    try {
-      decompressor = compressAlgo.getDecompressor();
-      try (InputStream is =
-          compressAlgo.createDecompressionStream(bufferedBoundedStream, decompressor, 0)) {
-        BlockIOUtils.readFullyWithHeapBuffer(is, dest, uncompressedSize);
-      }
-    } finally {
-      if (decompressor != null) {
-        compressAlgo.returnDecompressor(decompressor);
-      }
-    }
-  }
-
   /**
    * Load a codec implementation for an algorithm using the supplied configuration.
    * @param conf the configuration to use

hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/AbstractDataBlockEncoder.java

Lines changed: 7 additions & 4 deletions
@@ -18,6 +18,8 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ByteBufferKeyOnlyKeyValue;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
@@ -29,14 +31,15 @@
 public abstract class AbstractDataBlockEncoder implements DataBlockEncoder {
 
   @Override
-  public HFileBlockEncodingContext newDataBlockEncodingContext(
+  public HFileBlockEncodingContext newDataBlockEncodingContext(Configuration conf,
       DataBlockEncoding encoding, byte[] header, HFileContext meta) {
-    return new HFileBlockDefaultEncodingContext(encoding, header, meta);
+    return new HFileBlockDefaultEncodingContext(conf, encoding, header, meta);
   }
 
   @Override
-  public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
-    return new HFileBlockDefaultDecodingContext(meta);
+  public HFileBlockDecodingContext newDataBlockDecodingContext(Configuration conf,
+      HFileContext meta) {
+    return new HFileBlockDefaultDecodingContext(conf, meta);
   }
 
   protected void postEncoding(HFileBlockEncodingContext encodingCtx)

hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java

Lines changed: 8 additions & 2 deletions
@@ -20,6 +20,8 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
@@ -91,6 +93,8 @@ ByteBuffer decodeKeyValues(DataInputStream source, HFileBlockDecodingContext dec
   /**
    * Creates a encoder specific encoding context
    *
+   * @param conf
+   *          store configuration
    * @param encoding
    *          encoding strategy used
    * @param headerBytes
@@ -100,18 +104,20 @@ ByteBuffer decodeKeyValues(DataInputStream source, HFileBlockDecodingContext dec
    *          HFile meta data
    * @return a newly created encoding context
    */
-  HFileBlockEncodingContext newDataBlockEncodingContext(
+  HFileBlockEncodingContext newDataBlockEncodingContext(Configuration conf,
       DataBlockEncoding encoding, byte[] headerBytes, HFileContext meta);
 
   /**
    * Creates an encoder specific decoding context, which will prepare the data
    * before actual decoding
    *
+   * @param conf
+   *          store configuration
    * @param meta
    *          HFile meta data
    * @return a newly created decoding context
    */
-  HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta);
+  HFileBlockDecodingContext newDataBlockDecodingContext(Configuration conf, HFileContext meta);
 
   /**
    * An interface which enable to seek while underlying data is encoded.

hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java

Lines changed: 10 additions & 6 deletions
@@ -27,6 +27,7 @@
 import java.util.Iterator;
 import java.util.List;
 import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
@@ -57,6 +58,7 @@ public class EncodedDataBlock {
   private HFileContext meta;
 
   private final DataBlockEncoding encoding;
+  private final Configuration conf;
 
   // The is for one situation that there are some cells includes tags and others are not.
   // isTagsLenZero stores if cell tags length is zero before doing encoding since we need
@@ -68,21 +70,23 @@ public class EncodedDataBlock {
 
   /**
    * Create a buffer which will be encoded using dataBlockEncoder.
+   * @param conf store configuration
    * @param dataBlockEncoder Algorithm used for compression.
    * @param encoding encoding type used
-   * @param rawKVs
-   * @param meta
+   * @param rawKVs raw KVs
+   * @param meta hfile context
    */
-  public EncodedDataBlock(DataBlockEncoder dataBlockEncoder, DataBlockEncoding encoding,
-      byte[] rawKVs, HFileContext meta) {
+  public EncodedDataBlock(Configuration conf, DataBlockEncoder dataBlockEncoder,
+      DataBlockEncoding encoding, byte[] rawKVs, HFileContext meta) {
     Preconditions.checkNotNull(encoding,
         "Cannot create encoded data block with null encoder");
     this.dataBlockEncoder = dataBlockEncoder;
    this.encoding = encoding;
-    encodingCtx = dataBlockEncoder.newDataBlockEncodingContext(encoding,
+    encodingCtx = dataBlockEncoder.newDataBlockEncodingContext(conf, encoding,
        HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
     this.rawKVs = rawKVs;
     this.meta = meta;
+    this.conf = conf;
   }
 
   /**
@@ -115,7 +119,7 @@ public Cell next() {
         if (decompressedData == null) {
           try {
             decompressedData = dataBlockEncoder.decodeKeyValues(dis, dataBlockEncoder
-                .newDataBlockDecodingContext(meta));
+                .newDataBlockDecodingContext(conf, meta));
           } catch (IOException e) {
             throw new RuntimeException("Problem with data block encoder, " +
                 "most likely it requested more bytes than are available.", e);

hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java

Lines changed: 24 additions & 3 deletions
@@ -20,8 +20,10 @@
 import java.io.IOException;
 import java.io.InputStream;
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.ByteBuffInputStream;
 import org.apache.hadoop.hbase.io.TagCompressionContext;
+import org.apache.hadoop.hbase.io.compress.CanReinit;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.crypto.Cipher;
 import org.apache.hadoop.hbase.io.crypto.Decryptor;
@@ -30,6 +32,7 @@
 import org.apache.hadoop.hbase.io.util.BlockIOUtils;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -41,10 +44,12 @@
  */
 @InterfaceAudience.Private
 public class HFileBlockDefaultDecodingContext implements HFileBlockDecodingContext {
+  private final Configuration conf;
   private final HFileContext fileContext;
   private TagCompressionContext tagCompressionContext;
 
-  public HFileBlockDefaultDecodingContext(HFileContext fileContext) {
+  public HFileBlockDefaultDecodingContext(Configuration conf, HFileContext fileContext) {
+    this.conf = conf;
     this.fileContext = fileContext;
   }
 
@@ -87,8 +92,24 @@ public void prepareDecoding(int onDiskSizeWithoutHeader, int uncompressedSizeWit
 
     Compression.Algorithm compression = fileContext.getCompression();
     if (compression != Compression.Algorithm.NONE) {
-      Compression.decompress(blockBufferWithoutHeader, dataInputStream,
-        uncompressedSizeWithoutHeader, compression);
+      Decompressor decompressor = null;
+      try {
+        decompressor = compression.getDecompressor();
+        // Some algorithms don't return decompressors and accept null as a valid parameter for
+        // same when creating decompression streams. We can ignore these cases wrt reinit.
+        if (decompressor instanceof CanReinit) {
+          ((CanReinit)decompressor).reinit(conf);
+        }
+        try (InputStream is =
+            compression.createDecompressionStream(dataInputStream, decompressor, 0)) {
+          BlockIOUtils.readFullyWithHeapBuffer(is, blockBufferWithoutHeader,
+            uncompressedSizeWithoutHeader);
+        }
+      } finally {
+        if (decompressor != null) {
+          compression.returnDecompressor(decompressor);
+        }
+      }
     } else {
       BlockIOUtils.readFullyWithHeapBuffer(dataInputStream, blockBufferWithoutHeader,
         onDiskSizeWithoutHeader);

hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java

Lines changed: 13 additions & 3 deletions
@@ -22,6 +22,8 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.security.SecureRandom;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
 import org.apache.hadoop.hbase.io.TagCompressionContext;
 import org.apache.hadoop.hbase.io.compress.Compression;
@@ -72,18 +74,26 @@ public class HFileBlockDefaultEncodingContext implements HFileBlockEncodingConte
   private EncodingState encoderState;
 
   /**
+   * @param conf configuration
    * @param encoding encoding used
    * @param headerBytes dummy header bytes
    * @param fileContext HFile meta data
   */
-  public HFileBlockDefaultEncodingContext(DataBlockEncoding encoding, byte[] headerBytes,
-      HFileContext fileContext) {
+  public HFileBlockDefaultEncodingContext(Configuration conf, DataBlockEncoding encoding,
+      byte[] headerBytes, HFileContext fileContext) {
     this.encodingAlgo = encoding;
     this.fileContext = fileContext;
     Compression.Algorithm compressionAlgorithm =
        fileContext.getCompression() == null ? NONE : fileContext.getCompression();
     if (compressionAlgorithm != NONE) {
-      compressor = compressionAlgorithm.getCompressor();
+      if (compressor == null) {
+        compressor = compressionAlgorithm.getCompressor();
+        // Some algorithms don't return compressors and accept null as a valid parameter for
+        // same when creating compression streams. We can ignore these cases wrt reinit.
+        if (compressor != null) {
+          compressor.reinit(conf);
+        }
+      }
       compressedByteStream = new ByteArrayOutputStream();
       try {
         compressionStream =
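Note the asymmetry with the decoding path above: org.apache.hadoop.io.compress.Compressor already declares reinit(Configuration), so the encoding context can call it unconditionally, while Decompressor declares no such method, which is why the decoding side tests for the CanReinit marker. An illustrative sketch of the difference, using only the stock Hadoop interfaces:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.compress.CanReinit;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;

// Illustrative only: shows which reinit calls the stock interfaces permit.
final class ReinitAsymmetry {
  static void reinit(Compressor c, Decompressor d, Configuration conf) {
    c.reinit(conf); // declared on the Hadoop Compressor interface itself
    // d.reinit(conf) would not compile: Decompressor has no reinit method,
    // hence the CanReinit check in HFileBlockDefaultDecodingContext.
    if (d instanceof CanReinit) {
      ((CanReinit) d).reinit(conf);
    }
  }
}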

hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/HadoopCompressor.java

Lines changed: 2 additions & 1 deletion
@@ -19,6 +19,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.compress.CanReinit;
 import org.apache.hadoop.hbase.io.compress.CompressionUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -30,7 +31,7 @@
  */
 @InterfaceAudience.Private
 public abstract class HadoopCompressor<T extends Compressor>
-  implements org.apache.hadoop.io.compress.Compressor {
+  implements CanReinit, org.apache.hadoop.io.compress.Compressor {
 
   protected static final Logger LOG = LoggerFactory.getLogger(HadoopCompressor.class);
   protected T compressor;

hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestHFileCompressionLz4.java

Lines changed: 8 additions & 2 deletions
@@ -18,7 +18,9 @@
 package org.apache.hadoop.hbase.io.compress.aircompressor;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.HFileTestBase;
 import org.apache.hadoop.hbase.testclassification.IOTests;
@@ -35,17 +37,21 @@ public class TestHFileCompressionLz4 extends HFileTestBase {
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestHFileCompressionLz4.class);
 
+  private static Configuration conf;
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    Configuration conf = TEST_UTIL.getConfiguration();
+    conf = TEST_UTIL.getConfiguration();
     conf.set(Compression.LZ4_CODEC_CLASS_KEY, Lz4Codec.class.getCanonicalName());
     Compression.Algorithm.LZ4.reload(conf);
     HFileTestBase.setUpBeforeClass();
   }
 
   @Test
   public void test() throws Exception {
-    doTest(Compression.Algorithm.LZ4);
+    Path path = new Path(TEST_UTIL.getDataTestDir(),
+      HBaseTestingUtil.getRandomUUID().toString() + ".hfile");
+    doTest(conf, path, Compression.Algorithm.LZ4);
   }
 
 }

hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestHFileCompressionLzo.java

Lines changed: 8 additions & 2 deletions
@@ -18,7 +18,9 @@
 package org.apache.hadoop.hbase.io.compress.aircompressor;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.HFileTestBase;
 import org.apache.hadoop.hbase.testclassification.IOTests;
@@ -35,17 +37,21 @@ public class TestHFileCompressionLzo extends HFileTestBase {
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestHFileCompressionLzo.class);
 
+  private static Configuration conf;
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    Configuration conf = TEST_UTIL.getConfiguration();
+    conf = TEST_UTIL.getConfiguration();
     conf.set(Compression.LZO_CODEC_CLASS_KEY, LzoCodec.class.getCanonicalName());
     Compression.Algorithm.LZO.reload(conf);
     HFileTestBase.setUpBeforeClass();
   }
 
   @Test
   public void test() throws Exception {
-    doTest(Compression.Algorithm.LZO);
+    Path path = new Path(TEST_UTIL.getDataTestDir(),
+      HBaseTestingUtil.getRandomUUID().toString() + ".hfile");
+    doTest(conf, path, Compression.Algorithm.LZO);
   }
 
 }
