From ebdc56501ce16124f7ca7391dc47576859592438 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Tue, 2 Aug 2022 12:18:28 +0100 Subject: [PATCH 01/10] HBASE-27264 Add options to consider compressed size when delimiting blocks during hfile writes --- .../hadoop/hbase/io/hfile/HFileBlock.java | 43 ++++++++++-- .../hbase/io/hfile/HFileWriterImpl.java | 8 ++- .../hbase/regionserver/TestHStoreFile.java | 67 +++++++++++++++++++ 3 files changed, 113 insertions(+), 5 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index fe2e0c7fab9a..0e5ea46008ce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.encoding.EncodedDataBlock; import org.apache.hadoop.hbase.io.encoding.EncodingState; import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; @@ -239,6 +240,10 @@ static class Header { static final byte[] DUMMY_HEADER_NO_CHECKSUM = new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM]; + public static final String BLOCK_SIZE_LIMIT_COMPRESSED = "hbase.block.size.limit.compressed"; + + public static final String MAX_BLOCK_SIZE_COMPRESSED = "hbase.block.size.max.compressed"; + /** * Used deserializing blocks from Cache. * ++++++++++++++ @@ -454,7 +459,7 @@ int getOnDiskSizeWithoutHeader() { } /** Returns the uncompressed size of data part (header and checksum excluded). */ - int getUncompressedSizeWithoutHeader() { + public int getUncompressedSizeWithoutHeader() { return uncompressedSizeWithoutHeader; } @@ -729,6 +734,12 @@ private enum State { BLOCK_READY } + private boolean sizeLimitCompressed; + + private int maxSizeCompressed; + + private int adjustedBlockSize; + /** Writer state. Used to ensure the correct usage protocol. */ private State state = State.INIT; @@ -807,11 +818,12 @@ EncodingState getEncodingState() { */ public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) { - this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP); + this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP, false, fileContext.getBlocksize()); } + public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder, - HFileContext fileContext, ByteBuffAllocator allocator) { + HFileContext fileContext, ByteBuffAllocator allocator, boolean sizeLimitcompleted, int maxSizeCompressed) { if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) { throw new RuntimeException("Unsupported value of bytesPerChecksum. " + " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " @@ -834,6 +846,8 @@ public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder, // TODO: Why fileContext saved away when we have dataBlockEncoder and/or // defaultDataBlockEncoder? 
this.fileContext = fileContext; + this.sizeLimitCompressed = sizeLimitcompleted; + this.maxSizeCompressed = maxSizeCompressed; } /** @@ -886,6 +900,27 @@ void ensureBlockReady() throws IOException { finishBlock(); } + public boolean shouldFinishBlock() throws IOException { + int uncompressedBlockSize = blockSizeWritten(); + if (uncompressedBlockSize >= fileContext.getBlocksize()) { + if (sizeLimitCompressed && uncompressedBlockSize < maxSizeCompressed) { + //In order to avoid excessive compression size calculations, we do it only once when + //the uncompressed size has reached BLOCKSIZE. We then use this compression size to + //calculate the compression rate, and adjust the block size limit by this ratio. + if (adjustedBlockSize == 0) { + int compressedSize = EncodedDataBlock.getCompressedSize(fileContext.getCompression(), + fileContext.getCompression().getCompressor(), baosInMemory.getBuffer(), 0, + baosInMemory.size()); + adjustedBlockSize = uncompressedBlockSize/compressedSize; + adjustedBlockSize *= fileContext.getBlocksize(); + } + return uncompressedBlockSize >= adjustedBlockSize; + } + return true; + } + return false; + } + /** * Finish up writing of the block. Flushes the compressing stream (if using compression), fills * out the header, does any compression/encryption of bytes to flush out to disk, and manages @@ -1066,7 +1101,7 @@ int getUncompressedSizeWithoutHeader() { /** * The uncompressed size of the block data, including header size. */ - int getUncompressedSizeWithHeader() { + public int getUncompressedSizeWithHeader() { expectState(State.BLOCK_READY); return baosInMemory.size(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java index 80e333050c6b..c0b28b495a7c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.io.hfile; +import static org.apache.hadoop.hbase.io.hfile.HFileBlock.BLOCK_SIZE_LIMIT_COMPRESSED; +import static org.apache.hadoop.hbase.io.hfile.HFileBlock.MAX_BLOCK_SIZE_COMPRESSED; + import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; @@ -292,7 +295,9 @@ protected void finishInit(final Configuration conf) { throw new IllegalStateException("finishInit called twice"); } blockWriter = - new HFileBlock.Writer(conf, blockEncoder, hFileContext, cacheConf.getByteBuffAllocator()); + new HFileBlock.Writer(conf, blockEncoder, hFileContext, cacheConf.getByteBuffAllocator(), + conf.getBoolean(BLOCK_SIZE_LIMIT_COMPRESSED, false), + conf.getInt(MAX_BLOCK_SIZE_COMPRESSED, hFileContext.getBlocksize()*10)); // Data block index writer boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite(); dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter, @@ -319,6 +324,7 @@ protected void checkBlockBoundary() throws IOException { shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= hFileContext.getBlocksize() || blockWriter.blockSizeWritten() >= hFileContext.getBlocksize(); } + shouldFinishBlock &= blockWriter.shouldFinishBlock(); if (shouldFinishBlock) { finishBlock(); writeInlineBlocks(false); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java index 7eff766c0b25..95b55e94ab0d 
100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.HFileLink; +import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory; @@ -189,6 +190,25 @@ public static void writeStoreFile(final StoreFileWriter writer, byte[] fam, byte } } + public static void writeLargeStoreFile(final StoreFileWriter writer, byte[] fam, byte[] qualifier, + int rounds) + throws IOException { + long now = EnvironmentEdgeManager.currentTime(); + try { + for(int i=0; i= BLOCKSIZE_SMALL); + } + assertEquals(blockCount, 100); + } + } From 1ff2efa333e33ad8b2c480b19df28119d55254ee Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Tue, 2 Aug 2022 17:37:52 +0100 Subject: [PATCH 02/10] fixing UT and spotless error --- .../hadoop/hbase/io/hfile/HFileBlock.java | 19 ++++++---- .../hbase/io/hfile/HFileWriterImpl.java | 11 +++--- .../hbase/regionserver/TestHStoreFile.java | 36 +++++++------------ 3 files changed, 31 insertions(+), 35 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 0e5ea46008ce..876c440ffd30 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -734,6 +734,10 @@ private enum State { BLOCK_READY } + public boolean isSizeLimitCompressed() { + return sizeLimitCompressed; + } + private boolean sizeLimitCompressed; private int maxSizeCompressed; @@ -818,12 +822,13 @@ EncodingState getEncodingState() { */ public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) { - this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP, false, fileContext.getBlocksize()); + this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP, false, + fileContext.getBlocksize()); } - public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder, - HFileContext fileContext, ByteBuffAllocator allocator, boolean sizeLimitcompleted, int maxSizeCompressed) { + HFileContext fileContext, ByteBuffAllocator allocator, boolean sizeLimitcompleted, + int maxSizeCompressed) { if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) { throw new RuntimeException("Unsupported value of bytesPerChecksum. " + " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " @@ -904,14 +909,14 @@ public boolean shouldFinishBlock() throws IOException { int uncompressedBlockSize = blockSizeWritten(); if (uncompressedBlockSize >= fileContext.getBlocksize()) { if (sizeLimitCompressed && uncompressedBlockSize < maxSizeCompressed) { - //In order to avoid excessive compression size calculations, we do it only once when - //the uncompressed size has reached BLOCKSIZE. We then use this compression size to - //calculate the compression rate, and adjust the block size limit by this ratio. + // In order to avoid excessive compression size calculations, we do it only once when + // the uncompressed size has reached BLOCKSIZE. 
We then use this compression size to + // calculate the compression rate, and adjust the block size limit by this ratio. if (adjustedBlockSize == 0) { int compressedSize = EncodedDataBlock.getCompressedSize(fileContext.getCompression(), fileContext.getCompression().getCompressor(), baosInMemory.getBuffer(), 0, baosInMemory.size()); - adjustedBlockSize = uncompressedBlockSize/compressedSize; + adjustedBlockSize = uncompressedBlockSize / compressedSize; adjustedBlockSize *= fileContext.getBlocksize(); } return uncompressedBlockSize >= adjustedBlockSize; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java index c0b28b495a7c..1c6e48300dfa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java @@ -294,10 +294,9 @@ protected void finishInit(final Configuration conf) { if (blockWriter != null) { throw new IllegalStateException("finishInit called twice"); } - blockWriter = - new HFileBlock.Writer(conf, blockEncoder, hFileContext, cacheConf.getByteBuffAllocator(), - conf.getBoolean(BLOCK_SIZE_LIMIT_COMPRESSED, false), - conf.getInt(MAX_BLOCK_SIZE_COMPRESSED, hFileContext.getBlocksize()*10)); + blockWriter = new HFileBlock.Writer(conf, blockEncoder, hFileContext, + cacheConf.getByteBuffAllocator(), conf.getBoolean(BLOCK_SIZE_LIMIT_COMPRESSED, false), + conf.getInt(MAX_BLOCK_SIZE_COMPRESSED, hFileContext.getBlocksize() * 10)); // Data block index writer boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite(); dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter, @@ -324,7 +323,9 @@ protected void checkBlockBoundary() throws IOException { shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= hFileContext.getBlocksize() || blockWriter.blockSizeWritten() >= hFileContext.getBlocksize(); } - shouldFinishBlock &= blockWriter.shouldFinishBlock(); + if (blockWriter.isSizeLimitCompressed()) { + shouldFinishBlock &= blockWriter.shouldFinishBlock(); + } if (shouldFinishBlock) { finishBlock(); writeInlineBlocks(false); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java index 95b55e94ab0d..8c8f82966cc3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java @@ -191,15 +191,14 @@ public static void writeStoreFile(final StoreFileWriter writer, byte[] fam, byte } public static void writeLargeStoreFile(final StoreFileWriter writer, byte[] fam, byte[] qualifier, - int rounds) - throws IOException { + int rounds) throws IOException { long now = EnvironmentEdgeManager.currentTime(); try { - for(int i=0; i= BLOCKSIZE_SMALL); From 17171a37d29ed7c607af4a775736838f36c9a197 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Mon, 8 Aug 2022 16:29:14 +0100 Subject: [PATCH 03/10] changing to recalculate adjusted size on every block closure --- .../main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 876c440ffd30..28db1a42979f 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -912,7 +912,7 @@ public boolean shouldFinishBlock() throws IOException { // In order to avoid excessive compression size calculations, we do it only once when // the uncompressed size has reached BLOCKSIZE. We then use this compression size to // calculate the compression rate, and adjust the block size limit by this ratio. - if (adjustedBlockSize == 0) { + if (adjustedBlockSize == 0 || uncompressedBlockSize >= adjustedBlockSize) { int compressedSize = EncodedDataBlock.getCompressedSize(fileContext.getCompression(), fileContext.getCompression().getCompressor(), baosInMemory.getBuffer(), 0, baosInMemory.size()); From a328d9b9c95735fb8980dc9fa8f9d87e100d8af1 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Tue, 9 Aug 2022 11:22:30 +0100 Subject: [PATCH 04/10] addressing review comments --- .../org/apache/hadoop/hbase/regionserver/TestHStoreFile.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java index 8c8f82966cc3..3dbae728f27d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java @@ -1244,7 +1244,7 @@ public void testDataBlockSizeCompressed() throws Exception { block = fReader.readBlock(offset, -1, /* cacheBlock */ false, /* pread */ false, /* isCompaction */ false, /* updateCacheMetrics */ false, null, null); offset += block.getOnDiskSizeWithHeader(); - blockCount += 1; + blockCount++; assertTrue(block.getUncompressedSizeWithoutHeader() >= BLOCKSIZE_SMALL); } assertEquals(blockCount, 100); From 694631c023be94b5bca2c1429f5a5cb8100bdf37 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Tue, 9 Aug 2022 14:09:24 +0100 Subject: [PATCH 05/10] renaming config properties --- .../hadoop/hbase/io/hfile/HFileBlock.java | 20 +++++++++---------- .../hbase/io/hfile/HFileWriterImpl.java | 10 +++++----- .../hbase/regionserver/TestHStoreFile.java | 3 ++- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 28db1a42979f..1c448b767540 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -240,9 +240,9 @@ static class Header { static final byte[] DUMMY_HEADER_NO_CHECKSUM = new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM]; - public static final String BLOCK_SIZE_LIMIT_COMPRESSED = "hbase.block.size.limit.compressed"; + public static final String BLOCK_DELIMIT_COMPRESSED = "hbase.block.delimit.compressed"; - public static final String MAX_BLOCK_SIZE_COMPRESSED = "hbase.block.size.max.compressed"; + public static final String MAX_BLOCK_SIZE_UNCOMPRESSED = "hbase.block.max.size.uncompressed"; /** * Used deserializing blocks from Cache. 
@@ -734,13 +734,13 @@ private enum State { BLOCK_READY } - public boolean isSizeLimitCompressed() { - return sizeLimitCompressed; + public boolean isDelimitByCompressedSize() { + return delimitByCompressedSize; } - private boolean sizeLimitCompressed; + private boolean delimitByCompressedSize; - private int maxSizeCompressed; + private int maxSizeUnCompressed; private int adjustedBlockSize; @@ -828,7 +828,7 @@ public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder, public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext, ByteBuffAllocator allocator, boolean sizeLimitcompleted, - int maxSizeCompressed) { + int maxSizeUnCompressed) { if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) { throw new RuntimeException("Unsupported value of bytesPerChecksum. " + " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " @@ -851,8 +851,8 @@ public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder, // TODO: Why fileContext saved away when we have dataBlockEncoder and/or // defaultDataBlockEncoder? this.fileContext = fileContext; - this.sizeLimitCompressed = sizeLimitcompleted; - this.maxSizeCompressed = maxSizeCompressed; + this.delimitByCompressedSize = sizeLimitcompleted; + this.maxSizeUnCompressed = maxSizeUnCompressed; } /** @@ -908,7 +908,7 @@ void ensureBlockReady() throws IOException { public boolean shouldFinishBlock() throws IOException { int uncompressedBlockSize = blockSizeWritten(); if (uncompressedBlockSize >= fileContext.getBlocksize()) { - if (sizeLimitCompressed && uncompressedBlockSize < maxSizeCompressed) { + if (delimitByCompressedSize && uncompressedBlockSize < maxSizeUnCompressed) { // In order to avoid excessive compression size calculations, we do it only once when // the uncompressed size has reached BLOCKSIZE. We then use this compression size to // calculate the compression rate, and adjust the block size limit by this ratio. 
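(Illustrative aside, not part of the patch: at this point in the series the behaviour is toggled by the two properties defined above. A minimal writer-side sketch, assuming a 64 KB block size and the property names as of this revision — later patches in this series replace the boolean toggle with a pluggable predicator class — could look like the following.)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;

public class CompressedDelimitExample {
  public static Configuration enableCompressedDelimiting() {
    Configuration conf = HBaseConfiguration.create();
    // Finish data blocks based on predicted compressed size rather than raw size alone.
    conf.setBoolean(HFileBlock.BLOCK_DELIMIT_COMPRESSED, true);
    // Hard ceiling on the uncompressed block size; the patch defaults this to
    // 10x the configured block size (a 64 KB block size is assumed here).
    conf.setInt(HFileBlock.MAX_BLOCK_SIZE_UNCOMPRESSED, 10 * 64 * 1024);
    return conf;
  }
}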
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java index 1c6e48300dfa..70b8b69eb444 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java @@ -17,8 +17,8 @@ */ package org.apache.hadoop.hbase.io.hfile; -import static org.apache.hadoop.hbase.io.hfile.HFileBlock.BLOCK_SIZE_LIMIT_COMPRESSED; -import static org.apache.hadoop.hbase.io.hfile.HFileBlock.MAX_BLOCK_SIZE_COMPRESSED; +import static org.apache.hadoop.hbase.io.hfile.HFileBlock.BLOCK_DELIMIT_COMPRESSED; +import static org.apache.hadoop.hbase.io.hfile.HFileBlock.MAX_BLOCK_SIZE_UNCOMPRESSED; import java.io.DataOutput; import java.io.DataOutputStream; @@ -295,8 +295,8 @@ protected void finishInit(final Configuration conf) { throw new IllegalStateException("finishInit called twice"); } blockWriter = new HFileBlock.Writer(conf, blockEncoder, hFileContext, - cacheConf.getByteBuffAllocator(), conf.getBoolean(BLOCK_SIZE_LIMIT_COMPRESSED, false), - conf.getInt(MAX_BLOCK_SIZE_COMPRESSED, hFileContext.getBlocksize() * 10)); + cacheConf.getByteBuffAllocator(), conf.getBoolean(BLOCK_DELIMIT_COMPRESSED, false), + conf.getInt(MAX_BLOCK_SIZE_UNCOMPRESSED, hFileContext.getBlocksize() * 10)); // Data block index writer boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite(); dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter, @@ -323,7 +323,7 @@ protected void checkBlockBoundary() throws IOException { shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= hFileContext.getBlocksize() || blockWriter.blockSizeWritten() >= hFileContext.getBlocksize(); } - if (blockWriter.isSizeLimitCompressed()) { + if (blockWriter.isDelimitByCompressedSize()) { shouldFinishBlock &= blockWriter.shouldFinishBlock(); } if (shouldFinishBlock) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java index 3dbae728f27d..c87d4e1620a9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.apache.hadoop.hbase.io.hfile.HFileBlock.BLOCK_DELIMIT_COMPRESSED; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -1217,7 +1218,7 @@ public void testDataBlockSizeCompressed() throws Exception { Path dir = new Path(new Path(this.testDir, "7e0102"), "familyname"); Path path = new Path(dir, "1234567890"); DataBlockEncoding dataBlockEncoderAlgo = DataBlockEncoding.FAST_DIFF; - conf.setBoolean("hbase.block.size.limit.compressed", true); + conf.setBoolean(BLOCK_DELIMIT_COMPRESSED, true); cacheConf = new CacheConfig(conf); HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).withChecksumType(CKTYPE) From 6918d65cdc4afb93e25600c2b55de8f274c3b7c9 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Thu, 11 Aug 2022 17:09:05 +0100 Subject: [PATCH 06/10] Making the adjust size calculation pluggable --- .../hfile/BlockCompressedSizePredicator.java | 49 +++++++++++++++ .../hadoop/hbase/io/hfile/HFileBlock.java | 35 ++++------- 
.../hbase/io/hfile/HFileWriterImpl.java | 7 +-- ...reviousBlockCompressionRatePredicator.java | 63 +++++++++++++++++++ .../UncompressedBlockSizePredicator.java | 48 ++++++++++++++ .../hbase/regionserver/TestHStoreFile.java | 23 +++++-- 6 files changed, 192 insertions(+), 33 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCompressedSizePredicator.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PreviousBlockCompressionRatePredicator.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/UncompressedBlockSizePredicator.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCompressedSizePredicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCompressedSizePredicator.java new file mode 100644 index 000000000000..7520360600a9 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCompressedSizePredicator.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import org.apache.hadoop.hbase.io.ByteArrayOutputStream; +import org.apache.yetus.audience.InterfaceAudience; + +import java.io.IOException; + +/** + * Allows for defining different compression rate predicates in implementing classes. Useful + * when compression is in place, and we want to define block size based on the compressed size, + * rather than the default behaviour that considers the uncompressed size only. + * + * Since we don't actually know the compressed size until we actual apply compression in the block + * byte buffer, we need to "predicate" this compression rate and minimize compression execution to + * avoid excessive resources usage. + */ +@InterfaceAudience.Private +public interface BlockCompressedSizePredicator { + + String BLOCK_COMPRESSED_SIZE_PREDICATOR = "hbase.block.compressed.size.predicator"; + + /** + * Calculates an adjusted block size limit based on a compression rate predicate. + * @param context the meta file information for the current file. + * @param uncompressedBlockSize the total uncompressed size read for the block so far. + * @param blockContent The byte array containing the block content so far. + * @return the adjusted block size limit based on a compression rate predicate. 
+ * @throws IOException + */ + int calculateCompressionSizeLimit(HFileContext context, int uncompressedBlockSize, + ByteArrayOutputStream blockContent) throws IOException; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 1c448b767540..821fb8aff151 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.io.hfile; import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP; +import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR; import java.io.DataInputStream; import java.io.DataOutput; @@ -56,6 +57,7 @@ import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -240,8 +242,6 @@ static class Header { static final byte[] DUMMY_HEADER_NO_CHECKSUM = new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM]; - public static final String BLOCK_DELIMIT_COMPRESSED = "hbase.block.delimit.compressed"; - public static final String MAX_BLOCK_SIZE_UNCOMPRESSED = "hbase.block.max.size.uncompressed"; /** @@ -734,15 +734,9 @@ private enum State { BLOCK_READY } - public boolean isDelimitByCompressedSize() { - return delimitByCompressedSize; - } - - private boolean delimitByCompressedSize; - private int maxSizeUnCompressed; - private int adjustedBlockSize; + private BlockCompressedSizePredicator compressedSizePredicator; /** Writer state. Used to ensure the correct usage protocol. */ private State state = State.INIT; @@ -822,12 +816,11 @@ EncodingState getEncodingState() { */ public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) { - this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP, false, - fileContext.getBlocksize()); + this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP, fileContext.getBlocksize()); } public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder, - HFileContext fileContext, ByteBuffAllocator allocator, boolean sizeLimitcompleted, + HFileContext fileContext, ByteBuffAllocator allocator, int maxSizeUnCompressed) { if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) { throw new RuntimeException("Unsupported value of bytesPerChecksum. " + " Minimum is " @@ -851,7 +844,9 @@ public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder, // TODO: Why fileContext saved away when we have dataBlockEncoder and/or // defaultDataBlockEncoder? this.fileContext = fileContext; - this.delimitByCompressedSize = sizeLimitcompleted; + this.compressedSizePredicator = (BlockCompressedSizePredicator) ReflectionUtils. 
+ newInstance(conf.getClass(BLOCK_COMPRESSED_SIZE_PREDICATOR, + UncompressedBlockSizePredicator.class), new Configuration(conf)); this.maxSizeUnCompressed = maxSizeUnCompressed; } @@ -908,17 +903,9 @@ void ensureBlockReady() throws IOException { public boolean shouldFinishBlock() throws IOException { int uncompressedBlockSize = blockSizeWritten(); if (uncompressedBlockSize >= fileContext.getBlocksize()) { - if (delimitByCompressedSize && uncompressedBlockSize < maxSizeUnCompressed) { - // In order to avoid excessive compression size calculations, we do it only once when - // the uncompressed size has reached BLOCKSIZE. We then use this compression size to - // calculate the compression rate, and adjust the block size limit by this ratio. - if (adjustedBlockSize == 0 || uncompressedBlockSize >= adjustedBlockSize) { - int compressedSize = EncodedDataBlock.getCompressedSize(fileContext.getCompression(), - fileContext.getCompression().getCompressor(), baosInMemory.getBuffer(), 0, - baosInMemory.size()); - adjustedBlockSize = uncompressedBlockSize / compressedSize; - adjustedBlockSize *= fileContext.getBlocksize(); - } + if (uncompressedBlockSize < maxSizeUnCompressed) { + int adjustedBlockSize = compressedSizePredicator. + calculateCompressionSizeLimit(fileContext, uncompressedBlockSize, baosInMemory); return uncompressedBlockSize >= adjustedBlockSize; } return true; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java index 70b8b69eb444..a994e4c4c1a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.io.hfile; -import static org.apache.hadoop.hbase.io.hfile.HFileBlock.BLOCK_DELIMIT_COMPRESSED; import static org.apache.hadoop.hbase.io.hfile.HFileBlock.MAX_BLOCK_SIZE_UNCOMPRESSED; import java.io.DataOutput; @@ -295,7 +294,7 @@ protected void finishInit(final Configuration conf) { throw new IllegalStateException("finishInit called twice"); } blockWriter = new HFileBlock.Writer(conf, blockEncoder, hFileContext, - cacheConf.getByteBuffAllocator(), conf.getBoolean(BLOCK_DELIMIT_COMPRESSED, false), + cacheConf.getByteBuffAllocator(), conf.getInt(MAX_BLOCK_SIZE_UNCOMPRESSED, hFileContext.getBlocksize() * 10)); // Data block index writer boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite(); @@ -323,9 +322,7 @@ protected void checkBlockBoundary() throws IOException { shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= hFileContext.getBlocksize() || blockWriter.blockSizeWritten() >= hFileContext.getBlocksize(); } - if (blockWriter.isDelimitByCompressedSize()) { - shouldFinishBlock &= blockWriter.shouldFinishBlock(); - } + shouldFinishBlock &= blockWriter.shouldFinishBlock(); if (shouldFinishBlock) { finishBlock(); writeInlineBlocks(false); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PreviousBlockCompressionRatePredicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PreviousBlockCompressionRatePredicator.java new file mode 100644 index 000000000000..a37a9556595c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PreviousBlockCompressionRatePredicator.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import org.apache.hadoop.hbase.io.ByteArrayOutputStream; +import org.apache.hadoop.hbase.io.encoding.EncodedDataBlock; +import org.apache.yetus.audience.InterfaceAudience; + +import java.io.IOException; + +/** + * This BlockCompressedSizePredicator implementation adjusts the block size limit based on the + * compression rate of the block contents read so far. For the first block, adjusted size would be + * zero, so it performs a compression of current block contents and calculate compression rate and + * adjusted size. For subsequent blocks, it only performs this calculation once the previous block + * adjusted size has been reached, and the block is about to be closed. + */ +@InterfaceAudience.Private +public class PreviousBlockCompressionRatePredicator implements BlockCompressedSizePredicator { + + int adjustedBlockSize; + + /** + * Calculates an adjusted block size limit based on the compression rate of current block + * contents. This calculation is only performed if this is the first block, otherwise, if the + * adjusted size from previous block has been reached by the current one. + * @param context the meta file information for the current file. + * @param uncompressedBlockSize the total uncompressed size read for the block so far. + * @param contents The byte array containing the block content so far. + * @return the adjusted block size limit based on block compression rate. + * @throws IOException + */ + @Override + public int calculateCompressionSizeLimit(HFileContext context, int uncompressedBlockSize, + ByteArrayOutputStream contents) throws IOException { + // In order to avoid excessive compression size calculations, we do it only once when + // the uncompressed size has reached BLOCKSIZE. We then use this compression size to + // calculate the compression rate, and adjust the block size limit by this ratio. + if (adjustedBlockSize == 0 || uncompressedBlockSize >= adjustedBlockSize) { + int compressedSize = EncodedDataBlock.getCompressedSize(context.getCompression(), + context.getCompression().getCompressor(), contents.getBuffer(), 0, + contents.size()); + adjustedBlockSize = uncompressedBlockSize / compressedSize; + adjustedBlockSize *= context.getBlocksize(); + } + return adjustedBlockSize; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/UncompressedBlockSizePredicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/UncompressedBlockSizePredicator.java new file mode 100644 index 000000000000..0339fed57714 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/UncompressedBlockSizePredicator.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import org.apache.hadoop.hbase.io.ByteArrayOutputStream; +import org.apache.yetus.audience.InterfaceAudience; + +import java.io.IOException; + +/** + * This BlockCompressedSizePredicator implementation doesn't actually performs any predicate + * and simply return the configured BLOCK_SIZE value, without any adjustments. This is the default + * implementation if hbase.block.compressed.size.predicator property is not defined. + */ +@InterfaceAudience.Private +public class UncompressedBlockSizePredicator implements BlockCompressedSizePredicator { + + /** + * Returns the configured BLOCK_SIZE as the block size limit, without applying any compression + * rate adjustments. + * @param context the meta file information for the current file. + * @param uncompressedBlockSize the total uncompressed size read for the block so far. + * @param blockContent The byte array containing the block content so far. + * @return the configured BLOCK_SIZE as the block size limit, without applying any compression + * rate adjustments. + * @throws IOException + */ + @Override + public int calculateCompressionSizeLimit(HFileContext context, int uncompressedBlockSize, + ByteArrayOutputStream blockContent) throws IOException { + return context.getBlocksize(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java index c87d4e1620a9..27aad6e339e8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver; -import static org.apache.hadoop.hbase.io.hfile.HFileBlock.BLOCK_DELIMIT_COMPRESSED; +import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -40,6 +40,8 @@ import java.util.OptionalLong; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -76,8 +78,10 @@ import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; import org.apache.hadoop.hbase.io.hfile.HFileInfo; import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.hbase.io.hfile.PreviousBlockCompressionRatePredicator; import org.apache.hadoop.hbase.io.hfile.ReaderContext; import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder; +import 
org.apache.hadoop.hbase.io.hfile.UncompressedBlockSizePredicator; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -1215,10 +1219,21 @@ public void testDataBlockSizeEncoded() throws Exception { @Test public void testDataBlockSizeCompressed() throws Exception { + conf.set(BLOCK_COMPRESSED_SIZE_PREDICATOR, PreviousBlockCompressionRatePredicator.class.getName()); + testDataBlockSizeWithCompressionRatePredicator(100, s -> s >= BLOCKSIZE_SMALL); + } + + @Test + public void testDataBlockSizeUnCompressed() throws Exception { + conf.set(BLOCK_COMPRESSED_SIZE_PREDICATOR, UncompressedBlockSizePredicator.class.getName()); + testDataBlockSizeWithCompressionRatePredicator(200, s -> s < BLOCKSIZE_SMALL); + } + + private void testDataBlockSizeWithCompressionRatePredicator(int expectedBlockCount, + Function validation) throws Exception { Path dir = new Path(new Path(this.testDir, "7e0102"), "familyname"); Path path = new Path(dir, "1234567890"); DataBlockEncoding dataBlockEncoderAlgo = DataBlockEncoding.FAST_DIFF; - conf.setBoolean(BLOCK_DELIMIT_COMPRESSED, true); cacheConf = new CacheConfig(conf); HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).withChecksumType(CKTYPE) @@ -1246,9 +1261,9 @@ public void testDataBlockSizeCompressed() throws Exception { /* isCompaction */ false, /* updateCacheMetrics */ false, null, null); offset += block.getOnDiskSizeWithHeader(); blockCount++; - assertTrue(block.getUncompressedSizeWithoutHeader() >= BLOCKSIZE_SMALL); + assertTrue(validation.apply(block.getUncompressedSizeWithoutHeader())); } - assertEquals(blockCount, 100); + assertEquals(blockCount, expectedBlockCount); } } From b9e191fd89b79895e5b8e709d795ef1aed9d3530 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Sat, 13 Aug 2022 23:36:24 +0100 Subject: [PATCH 07/10] Addressing Ankit's suggestions --- .../hfile/BlockCompressedSizePredicator.java | 16 ++++++++--- .../hadoop/hbase/io/hfile/HFileBlock.java | 9 +++--- .../hbase/io/hfile/HFileWriterImpl.java | 2 +- ...reviousBlockCompressionRatePredicator.java | 28 +++++++++++-------- .../UncompressedBlockSizePredicator.java | 15 +++++++--- .../hbase/regionserver/TestHStoreFile.java | 14 ++++++---- 6 files changed, 53 insertions(+), 31 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCompressedSizePredicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCompressedSizePredicator.java index 7520360600a9..c3defdf55c37 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCompressedSizePredicator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCompressedSizePredicator.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.io.hfile; -import org.apache.hadoop.hbase.io.ByteArrayOutputStream; import org.apache.yetus.audience.InterfaceAudience; import java.io.IOException; @@ -36,14 +35,23 @@ public interface BlockCompressedSizePredicator { String BLOCK_COMPRESSED_SIZE_PREDICATOR = "hbase.block.compressed.size.predicator"; + String MAX_BLOCK_SIZE_UNCOMPRESSED = "hbase.block.max.size.uncompressed"; + /** * Calculates an adjusted block size limit based on a compression rate predicate. * @param context the meta file information for the current file. * @param uncompressedBlockSize the total uncompressed size read for the block so far. 
- * @param blockContent The byte array containing the block content so far. * @return the adjusted block size limit based on a compression rate predicate. * @throws IOException */ - int calculateCompressionSizeLimit(HFileContext context, int uncompressedBlockSize, - ByteArrayOutputStream blockContent) throws IOException; + int calculateCompressionSizeLimit(HFileContext context, int uncompressedBlockSize) + throws IOException; + + /** + * Updates the predicator with both compressed and uncompressed sizes of latest block written. + * To be called once the block is finshed and flushed to disk after compression. + * @param uncompressed the uncompressed size of last block written. + * @param compressed the compressed size of last block written. + */ + void updateLatestBlockSizes(int uncompressed, int compressed); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 9c173e5afd9e..ca519f5988c4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -52,7 +52,6 @@ import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; -import org.apache.hadoop.hbase.io.encoding.EncodedDataBlock; import org.apache.hadoop.hbase.io.encoding.EncodingState; import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; @@ -254,8 +253,6 @@ static class Header { static final byte[] DUMMY_HEADER_NO_CHECKSUM = new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM]; - public static final String MAX_BLOCK_SIZE_UNCOMPRESSED = "hbase.block.max.size.uncompressed"; - /** * Used deserializing blocks from Cache. * ++++++++++++++ @@ -915,11 +912,12 @@ void ensureBlockReady() throws IOException { } public boolean shouldFinishBlock() throws IOException { - int uncompressedBlockSize = blockSizeWritten(); +// int uncompressedBlockSize = blockSizeWritten(); + int uncompressedBlockSize = baosInMemory.size(); if (uncompressedBlockSize >= fileContext.getBlocksize()) { if (uncompressedBlockSize < maxSizeUnCompressed) { int adjustedBlockSize = compressedSizePredicator. 
- calculateCompressionSizeLimit(fileContext, uncompressedBlockSize, baosInMemory); + calculateCompressionSizeLimit(fileContext, uncompressedBlockSize); return uncompressedBlockSize >= adjustedBlockSize; } return true; @@ -1010,6 +1008,7 @@ private void putHeader(ByteBuff buff, int onDiskSize, int uncompressedSize, private void putHeader(ByteArrayOutputStream dest, int onDiskSize, int uncompressedSize, int onDiskDataSize) { putHeader(dest.getBuffer(), 0, onDiskSize, uncompressedSize, onDiskDataSize); + compressedSizePredicator.updateLatestBlockSizes(uncompressedSize, onDiskSize); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java index a994e4c4c1a2..86951a1eb1ed 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hbase.io.hfile; -import static org.apache.hadoop.hbase.io.hfile.HFileBlock.MAX_BLOCK_SIZE_UNCOMPRESSED; +import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.MAX_BLOCK_SIZE_UNCOMPRESSED; import java.io.DataOutput; import java.io.DataOutputStream; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PreviousBlockCompressionRatePredicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PreviousBlockCompressionRatePredicator.java index a37a9556595c..2b67f591a8ab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PreviousBlockCompressionRatePredicator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PreviousBlockCompressionRatePredicator.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hbase.io.hfile; -import org.apache.hadoop.hbase.io.ByteArrayOutputStream; -import org.apache.hadoop.hbase.io.encoding.EncodedDataBlock; import org.apache.yetus.audience.InterfaceAudience; import java.io.IOException; @@ -33,7 +31,8 @@ @InterfaceAudience.Private public class PreviousBlockCompressionRatePredicator implements BlockCompressedSizePredicator { - int adjustedBlockSize; + private int adjustedBlockSize; + private int compressionRatio = 1; /** * Calculates an adjusted block size limit based on the compression rate of current block @@ -41,23 +40,28 @@ public class PreviousBlockCompressionRatePredicator implements BlockCompressedSi * adjusted size from previous block has been reached by the current one. * @param context the meta file information for the current file. * @param uncompressedBlockSize the total uncompressed size read for the block so far. - * @param contents The byte array containing the block content so far. * @return the adjusted block size limit based on block compression rate. * @throws IOException */ @Override - public int calculateCompressionSizeLimit(HFileContext context, int uncompressedBlockSize, - ByteArrayOutputStream contents) throws IOException { + public int calculateCompressionSizeLimit(HFileContext context, int uncompressedBlockSize) + throws IOException { // In order to avoid excessive compression size calculations, we do it only once when // the uncompressed size has reached BLOCKSIZE. We then use this compression size to // calculate the compression rate, and adjust the block size limit by this ratio. 
- if (adjustedBlockSize == 0 || uncompressedBlockSize >= adjustedBlockSize) { - int compressedSize = EncodedDataBlock.getCompressedSize(context.getCompression(), - context.getCompression().getCompressor(), contents.getBuffer(), 0, - contents.size()); - adjustedBlockSize = uncompressedBlockSize / compressedSize; - adjustedBlockSize *= context.getBlocksize(); + if (uncompressedBlockSize >= adjustedBlockSize) { + adjustedBlockSize = context.getBlocksize() * compressionRatio; } return adjustedBlockSize; } + + /** + * Recalculates compression rate for the last block. + * @param uncompressed the uncompressed size of last block written. + * @param compressed the compressed size of last block written. + */ + @Override + public void updateLatestBlockSizes(int uncompressed, int compressed) { + compressionRatio = uncompressed/compressed; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/UncompressedBlockSizePredicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/UncompressedBlockSizePredicator.java index 0339fed57714..1ea6824bf347 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/UncompressedBlockSizePredicator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/UncompressedBlockSizePredicator.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.io.hfile; -import org.apache.hadoop.hbase.io.ByteArrayOutputStream; import org.apache.yetus.audience.InterfaceAudience; import java.io.IOException; @@ -35,14 +34,22 @@ public class UncompressedBlockSizePredicator implements BlockCompressedSizePredi * rate adjustments. * @param context the meta file information for the current file. * @param uncompressedBlockSize the total uncompressed size read for the block so far. - * @param blockContent The byte array containing the block content so far. * @return the configured BLOCK_SIZE as the block size limit, without applying any compression * rate adjustments. * @throws IOException */ @Override - public int calculateCompressionSizeLimit(HFileContext context, int uncompressedBlockSize, - ByteArrayOutputStream blockContent) throws IOException { + public int calculateCompressionSizeLimit(HFileContext context, int uncompressedBlockSize) + throws IOException { return context.getBlocksize(); } + + /** + * Empty implementation. Does nothing. + * @param uncompressed the uncompressed size of last block written. + * @param compressed the compressed size of last block written. 
+ */ + @Override + public void updateLatestBlockSizes(int uncompressed, int compressed) {} + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java index 27aad6e339e8..53858ea98812 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java @@ -40,6 +40,7 @@ import java.util.OptionalLong; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; import java.util.function.Function; import org.apache.hadoop.conf.Configuration; @@ -1220,17 +1221,18 @@ public void testDataBlockSizeEncoded() throws Exception { @Test public void testDataBlockSizeCompressed() throws Exception { conf.set(BLOCK_COMPRESSED_SIZE_PREDICATOR, PreviousBlockCompressionRatePredicator.class.getName()); - testDataBlockSizeWithCompressionRatePredicator(100, s -> s >= BLOCKSIZE_SMALL); + testDataBlockSizeWithCompressionRatePredicator(11, (s,c) -> + (c > 2 && c < 11) ? s >= BLOCKSIZE_SMALL*10 : true ); } @Test public void testDataBlockSizeUnCompressed() throws Exception { conf.set(BLOCK_COMPRESSED_SIZE_PREDICATOR, UncompressedBlockSizePredicator.class.getName()); - testDataBlockSizeWithCompressionRatePredicator(200, s -> s < BLOCKSIZE_SMALL); + testDataBlockSizeWithCompressionRatePredicator(100, (s,c) -> s < BLOCKSIZE_SMALL*10); } private void testDataBlockSizeWithCompressionRatePredicator(int expectedBlockCount, - Function validation) throws Exception { + BiFunction validation) throws Exception { Path dir = new Path(new Path(this.testDir, "7e0102"), "familyname"); Path path = new Path(dir, "1234567890"); DataBlockEncoding dataBlockEncoderAlgo = DataBlockEncoding.FAST_DIFF; @@ -1261,9 +1263,11 @@ private void testDataBlockSizeWithCompressionRatePredicator(int expectedBlockCou /* isCompaction */ false, /* updateCacheMetrics */ false, null, null); offset += block.getOnDiskSizeWithHeader(); blockCount++; - assertTrue(validation.apply(block.getUncompressedSizeWithoutHeader())); + System.out.println(">>>> " + block.getUncompressedSizeWithoutHeader()); + System.out.println(">>>> " + blockCount); + assertTrue(validation.apply(block.getUncompressedSizeWithoutHeader(), blockCount)); } - assertEquals(blockCount, expectedBlockCount); + assertEquals(expectedBlockCount, blockCount); } } From 80b95f349ae468816a921f62648907196163a42d Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Mon, 15 Aug 2022 15:17:29 +0100 Subject: [PATCH 08/10] Fixing UT failures; Fixing spotless errors; Addressing Ankit's comments --- .../hfile/BlockCompressedSizePredicator.java | 42 ++++++++-------- .../hadoop/hbase/io/hfile/HFileBlock.java | 40 +++++++-------- .../hbase/io/hfile/HFileWriterImpl.java | 8 +-- ...reviousBlockCompressionRatePredicator.java | 49 +++++++++---------- .../UncompressedBlockSizePredicator.java | 32 +++++------- .../hbase/regionserver/TestHStoreFile.java | 15 +++--- 6 files changed, 87 insertions(+), 99 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCompressedSizePredicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCompressedSizePredicator.java index c3defdf55c37..a90e04fe5ad8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCompressedSizePredicator.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCompressedSizePredicator.java @@ -19,16 +19,19 @@ import org.apache.yetus.audience.InterfaceAudience; -import java.io.IOException; - /** - * Allows for defining different compression rate predicates in implementing classes. Useful + * Allows for defining different compression rate predicates on its implementing classes. Useful * when compression is in place, and we want to define block size based on the compressed size, - * rather than the default behaviour that considers the uncompressed size only. - * - * Since we don't actually know the compressed size until we actual apply compression in the block - * byte buffer, we need to "predicate" this compression rate and minimize compression execution to - * avoid excessive resources usage. + * rather than the default behaviour that considers the uncompressed size only. Since we don't + * actually know the compressed size until we actual apply compression in the block byte buffer, we + * need to "predicate" this compression rate and minimize compression execution to avoid excessive + * resources usage. Different approaches for predicating the compressed block size can be defined by + * implementing classes. The updateLatestBlockSizes allows for updating uncompressed + * and compressed size values, and is called during block finishing (when we finally apply + * compression on the block data). Final block size predicate logic is implemented in + * shouldFinishBlock, which is called by the block writer once uncompressed size has + * reached the configured BLOCK size, and additional checks should be applied to decide if the block + * can be finished. */ @InterfaceAudience.Private public interface BlockCompressedSizePredicator { @@ -38,20 +41,19 @@ public interface BlockCompressedSizePredicator { String MAX_BLOCK_SIZE_UNCOMPRESSED = "hbase.block.max.size.uncompressed"; /** - * Calculates an adjusted block size limit based on a compression rate predicate. - * @param context the meta file information for the current file. - * @param uncompressedBlockSize the total uncompressed size read for the block so far. - * @return the adjusted block size limit based on a compression rate predicate. - * @throws IOException + * Updates the predicator with both compressed and uncompressed sizes of latest block written. To + * be called once the block is finshed and flushed to disk after compression. + * @param context the HFileContext containg the configured max block size. + * @param uncompressed the uncompressed size of last block written. + * @param compressed the compressed size of last block written. */ - int calculateCompressionSizeLimit(HFileContext context, int uncompressedBlockSize) - throws IOException; + void updateLatestBlockSizes(HFileContext context, int uncompressed, int compressed); /** - * Updates the predicator with both compressed and uncompressed sizes of latest block written. - * To be called once the block is finshed and flushed to disk after compression. - * @param uncompressed the uncompressed size of last block written. - * @param compressed the compressed size of last block written. + * Decides if the block should be finished based on the comparison of its uncompressed size + * against an adjusted size based on a predicated compression factor. + * @param uncompressed true if the block should be finished. 
n */ - void updateLatestBlockSizes(int uncompressed, int compressed); + boolean shouldFinishBlock(int uncompressed); + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index ca519f5988c4..8e04580874fe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -18,12 +18,9 @@ package org.apache.hadoop.hbase.io.hfile; import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP; - import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR; - import static org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer.CONTEXT_KEY; - import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.api.trace.Span; @@ -831,8 +828,7 @@ public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder, } public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder, - HFileContext fileContext, ByteBuffAllocator allocator, - int maxSizeUnCompressed) { + HFileContext fileContext, ByteBuffAllocator allocator, int maxSizeUnCompressed) { if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) { throw new RuntimeException("Unsupported value of bytesPerChecksum. " + " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " @@ -855,9 +851,9 @@ public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder, // TODO: Why fileContext saved away when we have dataBlockEncoder and/or // defaultDataBlockEncoder? this.fileContext = fileContext; - this.compressedSizePredicator = (BlockCompressedSizePredicator) ReflectionUtils. - newInstance(conf.getClass(BLOCK_COMPRESSED_SIZE_PREDICATOR, - UncompressedBlockSizePredicator.class), new Configuration(conf)); + this.compressedSizePredicator = (BlockCompressedSizePredicator) ReflectionUtils.newInstance( + conf.getClass(BLOCK_COMPRESSED_SIZE_PREDICATOR, UncompressedBlockSizePredicator.class), + new Configuration(conf)); this.maxSizeUnCompressed = maxSizeUnCompressed; } @@ -911,18 +907,13 @@ void ensureBlockReady() throws IOException { finishBlock(); } - public boolean shouldFinishBlock() throws IOException { -// int uncompressedBlockSize = blockSizeWritten(); - int uncompressedBlockSize = baosInMemory.size(); - if (uncompressedBlockSize >= fileContext.getBlocksize()) { - if (uncompressedBlockSize < maxSizeUnCompressed) { - int adjustedBlockSize = compressedSizePredicator. - calculateCompressionSizeLimit(fileContext, uncompressedBlockSize); - return uncompressedBlockSize >= adjustedBlockSize; - } + public boolean checkBoundariesWithPredicate() { + int rawBlockSize = encodedBlockSizeWritten(); + if (rawBlockSize >= maxSizeUnCompressed) { return true; + } else { + return compressedSizePredicator.shouldFinishBlock(rawBlockSize); } - return false; } /** @@ -939,6 +930,11 @@ private void finishBlock() throws IOException { userDataStream.flush(); prevOffset = prevOffsetByType[blockType.getId()]; + // We need to cache the unencoded/uncompressed size before changing the block state + int rawBlockSize = 0; + if (this.getEncodingState() != null) { + rawBlockSize = blockSizeWritten(); + } // We need to set state before we can package the block up for cache-on-write. In a way, the // block is ready, but not yet encoded or compressed. 
state = State.BLOCK_READY; @@ -959,6 +955,10 @@ private void finishBlock() throws IOException { onDiskBlockBytesWithHeader.reset(); onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(), compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength()); + // Update raw and compressed sizes in the predicate + compressedSizePredicator.updateLatestBlockSizes(fileContext, rawBlockSize, + onDiskBlockBytesWithHeader.size()); + // Calculate how many bytes we need for checksum on the tail of the block. int numBytes = (int) ChecksumUtil.numBytes(onDiskBlockBytesWithHeader.size(), fileContext.getBytesPerChecksum()); @@ -966,6 +966,7 @@ private void finishBlock() throws IOException { // Put the header for the on disk bytes; header currently is unfilled-out putHeader(onDiskBlockBytesWithHeader, onDiskBlockBytesWithHeader.size() + numBytes, baosInMemory.size(), onDiskBlockBytesWithHeader.size()); + if (onDiskChecksum.length != numBytes) { onDiskChecksum = new byte[numBytes]; } @@ -1008,7 +1009,6 @@ private void putHeader(ByteBuff buff, int onDiskSize, int uncompressedSize, private void putHeader(ByteArrayOutputStream dest, int onDiskSize, int uncompressedSize, int onDiskDataSize) { putHeader(dest.getBuffer(), 0, onDiskSize, uncompressedSize, onDiskDataSize); - compressedSizePredicator.updateLatestBlockSizes(uncompressedSize, onDiskSize); } /** @@ -1130,7 +1130,7 @@ public int encodedBlockSizeWritten() { * block at the moment. Note that this will return zero in the "block ready" state as well. * @return the number of bytes written */ - int blockSizeWritten() { + public int blockSizeWritten() { return state != State.WRITING ? 0 : this.getEncodingState().getUnencodedDataSizeWritten(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java index 86951a1eb1ed..d58be5fd1ced 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java @@ -293,9 +293,9 @@ protected void finishInit(final Configuration conf) { if (blockWriter != null) { throw new IllegalStateException("finishInit called twice"); } - blockWriter = new HFileBlock.Writer(conf, blockEncoder, hFileContext, - cacheConf.getByteBuffAllocator(), - conf.getInt(MAX_BLOCK_SIZE_UNCOMPRESSED, hFileContext.getBlocksize() * 10)); + blockWriter = + new HFileBlock.Writer(conf, blockEncoder, hFileContext, cacheConf.getByteBuffAllocator(), + conf.getInt(MAX_BLOCK_SIZE_UNCOMPRESSED, hFileContext.getBlocksize() * 10)); // Data block index writer boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite(); dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter, @@ -322,7 +322,7 @@ protected void checkBlockBoundary() throws IOException { shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= hFileContext.getBlocksize() || blockWriter.blockSizeWritten() >= hFileContext.getBlocksize(); } - shouldFinishBlock &= blockWriter.shouldFinishBlock(); + shouldFinishBlock &= blockWriter.checkBoundariesWithPredicate(); if (shouldFinishBlock) { finishBlock(); writeInlineBlocks(false); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PreviousBlockCompressionRatePredicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PreviousBlockCompressionRatePredicator.java index 2b67f591a8ab..be0ee3bb9a77 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PreviousBlockCompressionRatePredicator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PreviousBlockCompressionRatePredicator.java @@ -19,49 +19,44 @@ import org.apache.yetus.audience.InterfaceAudience; -import java.io.IOException; - /** * This BlockCompressedSizePredicator implementation adjusts the block size limit based on the * compression rate of the block contents read so far. For the first block, adjusted size would be * zero, so it performs a compression of current block contents and calculate compression rate and - * adjusted size. For subsequent blocks, it only performs this calculation once the previous block - * adjusted size has been reached, and the block is about to be closed. + * adjusted size. For subsequent blocks, the decision on whether the block should be finished will + * be based on the compression rate calculated for the previous block. */ @InterfaceAudience.Private public class PreviousBlockCompressionRatePredicator implements BlockCompressedSizePredicator { - + private int adjustedBlockSize; private int compressionRatio = 1; + private int configuredMaxBlockSize; /** - * Calculates an adjusted block size limit based on the compression rate of current block - * contents. This calculation is only performed if this is the first block, otherwise, if the - * adjusted size from previous block has been reached by the current one. - * @param context the meta file information for the current file. - * @param uncompressedBlockSize the total uncompressed size read for the block so far. - * @return the adjusted block size limit based on block compression rate. - * @throws IOException + * Recalculates compression rate for the last block and adjusts the block size limit as: + * BLOCK_SIZE * (uncompressed/compressed). + * @param context HFileContext containing the configured max block size. + * @param uncompressed the uncompressed size of last block written. + * @param compressed the compressed size of last block written. */ - @Override - public int calculateCompressionSizeLimit(HFileContext context, int uncompressedBlockSize) - throws IOException { - // In order to avoid excessive compression size calculations, we do it only once when - // the uncompressed size has reached BLOCKSIZE. We then use this compression size to - // calculate the compression rate, and adjust the block size limit by this ratio. - if (uncompressedBlockSize >= adjustedBlockSize) { - adjustedBlockSize = context.getBlocksize() * compressionRatio; - } - return adjustedBlockSize; + @Override + public void updateLatestBlockSizes(HFileContext context, int uncompressed, int compressed) { + configuredMaxBlockSize = context.getBlocksize(); + compressionRatio = uncompressed / compressed; + adjustedBlockSize = context.getBlocksize() * compressionRatio; } /** - * Recalculates compression rate for the last block. - * @param uncompressed the uncompressed size of last block written. - * @param compressed the compressed size of last block written. + * Returns true if the passed uncompressed size is larger than the limit calculated by + * updateLatestBlockSizes. + * @param uncompressed the uncompressed size of the block so far.
*/ @Override - public void updateLatestBlockSizes(int uncompressed, int compressed) { - compressionRatio = uncompressed/compressed; + public boolean shouldFinishBlock(int uncompressed) { + if (uncompressed >= configuredMaxBlockSize) { + return uncompressed >= adjustedBlockSize; + } + return false; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/UncompressedBlockSizePredicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/UncompressedBlockSizePredicator.java index 1ea6824bf347..c259375a97de 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/UncompressedBlockSizePredicator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/UncompressedBlockSizePredicator.java @@ -19,37 +19,31 @@ import org.apache.yetus.audience.InterfaceAudience; -import java.io.IOException; - /** - * This BlockCompressedSizePredicator implementation doesn't actually performs any predicate - * and simply return the configured BLOCK_SIZE value, without any adjustments. This is the default - * implementation if hbase.block.compressed.size.predicator property is not defined. + * This BlockCompressedSizePredicator implementation doesn't actually perform any prediction and + * simply returns true on shouldFinishBlock. This is the default implementation + * if hbase.block.compressed.size.predicator property is not defined. */ @InterfaceAudience.Private public class UncompressedBlockSizePredicator implements BlockCompressedSizePredicator { /** - * Returns the configured BLOCK_SIZE as the block size limit, without applying any compression - * rate adjustments. - * @param context the meta file information for the current file. - * @param uncompressedBlockSize the total uncompressed size read for the block so far. - * @return the configured BLOCK_SIZE as the block size limit, without applying any compression - * rate adjustments. - * @throws IOException + * Empty implementation. Does nothing. + * @param uncompressed the uncompressed size of last block written. + * @param compressed the compressed size of last block written. */ @Override - public int calculateCompressionSizeLimit(HFileContext context, int uncompressedBlockSize) - throws IOException { - return context.getBlocksize(); + public void updateLatestBlockSizes(HFileContext context, int uncompressed, int compressed) { } /** - * Empty implementation. Does nothing. - * @param uncompressed the uncompressed size of last block written. - * @param compressed the compressed size of last block written. + * Dummy implementation that always returns true. This means we will only be considering the + * block uncompressed size for deciding when to finish a block. + * @param uncompressed the uncompressed size of the block so far.
n */ @Override - public void updateLatestBlockSizes(int uncompressed, int compressed) {} + public boolean shouldFinishBlock(int uncompressed) { + return true; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java index 53858ea98812..d71b33e82d5a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java @@ -41,8 +41,6 @@ import java.util.TreeSet; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; -import java.util.function.Function; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -1220,19 +1218,20 @@ public void testDataBlockSizeEncoded() throws Exception { @Test public void testDataBlockSizeCompressed() throws Exception { - conf.set(BLOCK_COMPRESSED_SIZE_PREDICATOR, PreviousBlockCompressionRatePredicator.class.getName()); - testDataBlockSizeWithCompressionRatePredicator(11, (s,c) -> - (c > 2 && c < 11) ? s >= BLOCKSIZE_SMALL*10 : true ); + conf.set(BLOCK_COMPRESSED_SIZE_PREDICATOR, + PreviousBlockCompressionRatePredicator.class.getName()); + testDataBlockSizeWithCompressionRatePredicator(11, + (s, c) -> (c > 1 && c < 11) ? s >= BLOCKSIZE_SMALL * 10 : true); } @Test public void testDataBlockSizeUnCompressed() throws Exception { conf.set(BLOCK_COMPRESSED_SIZE_PREDICATOR, UncompressedBlockSizePredicator.class.getName()); - testDataBlockSizeWithCompressionRatePredicator(100, (s,c) -> s < BLOCKSIZE_SMALL*10); + testDataBlockSizeWithCompressionRatePredicator(200, (s, c) -> s < BLOCKSIZE_SMALL * 10); } private void testDataBlockSizeWithCompressionRatePredicator(int expectedBlockCount, - BiFunction validation) throws Exception { + BiFunction validation) throws Exception { Path dir = new Path(new Path(this.testDir, "7e0102"), "familyname"); Path path = new Path(dir, "1234567890"); DataBlockEncoding dataBlockEncoderAlgo = DataBlockEncoding.FAST_DIFF; @@ -1263,8 +1262,6 @@ private void testDataBlockSizeWithCompressionRatePredicator(int expectedBlockCou /* isCompaction */ false, /* updateCacheMetrics */ false, null, null); offset += block.getOnDiskSizeWithHeader(); blockCount++; - System.out.println(">>>> " + block.getUncompressedSizeWithoutHeader()); - System.out.println(">>>> " + blockCount); assertTrue(validation.apply(block.getUncompressedSizeWithoutHeader(), blockCount)); } assertEquals(expectedBlockCount, blockCount); From 5a1c552c1cd0e571bf4e84018595d38bbf657ebc Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Mon, 15 Aug 2022 15:19:52 +0100 Subject: [PATCH 09/10] reverting uneeded modifier change --- .../main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 8e04580874fe..128e55a87415 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -465,7 +465,7 @@ int getOnDiskSizeWithoutHeader() { } /** Returns the uncompressed size of data part (header and checksum excluded). 
*/ - public int getUncompressedSizeWithoutHeader() { + int getUncompressedSizeWithoutHeader() { return uncompressedSizeWithoutHeader; } From 4136f57f2c6ca6b197f429bb51098b66be2023d2 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Mon, 15 Aug 2022 15:30:28 +0100 Subject: [PATCH 10/10] reverting reverting uneeded modifier change --- .../main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 128e55a87415..8e04580874fe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -465,7 +465,7 @@ int getOnDiskSizeWithoutHeader() { } /** Returns the uncompressed size of data part (header and checksum excluded). */ - int getUncompressedSizeWithoutHeader() { + public int getUncompressedSizeWithoutHeader() { return uncompressedSizeWithoutHeader; }
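
Note on usage: the series above makes the new behaviour opt-in and purely configuration driven. The sketch below shows how a writer might be switched to the compression-rate based boundary check; the two property names, the PreviousBlockCompressionRatePredicator class and the 10x fallback come from the patches, while the surrounding setup (a plain HBaseConfiguration and an assumed 64 KB block size) is only illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.PreviousBlockCompressionRatePredicator;

Configuration conf = HBaseConfiguration.create();
// Cut blocks based on the compression rate observed for the previous block,
// instead of the raw uncompressed size only.
conf.set("hbase.block.compressed.size.predicator",
  PreviousBlockCompressionRatePredicator.class.getName());
// Hard ceiling on the uncompressed block size; when unset, HFileWriterImpl
// falls back to 10 x the configured block size (here assumed to be 64 KB).
conf.setInt("hbase.block.max.size.uncompressed", 10 * 64 * 1024);

Because the predicator is resolved through reflection from that property, other strategies can be plugged in the same way. The following hypothetical class is shown only to illustrate the two-method contract of BlockCompressedSizePredicator; it is not part of this change. It predicts the on-disk size from the last observed ratio and finishes the block once that prediction reaches the configured block size.

package org.apache.hadoop.hbase.io.hfile;

public class LastRatioPredicator implements BlockCompressedSizePredicator {
  private double ratio = 1.0;
  // Overwritten with the configured block size on the first finished block.
  private int blockSize = 64 * 1024;

  @Override
  public void updateLatestBlockSizes(HFileContext context, int uncompressed, int compressed) {
    blockSize = context.getBlocksize();
    ratio = compressed > 0 ? (double) compressed / uncompressed : 1.0;
  }

  @Override
  public boolean shouldFinishBlock(int uncompressed) {
    // Finish once the predicted compressed size reaches the configured block size.
    return uncompressed * ratio >= blockSize;
  }
}

With the default UncompressedBlockSizePredicator left in place, behaviour is unchanged: shouldFinishBlock always returns true and blocks keep being delimited by their uncompressed size alone.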