@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.io.hfile;
 
 import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
+import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR;
 import static org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer.CONTEXT_KEY;
 
 import io.opentelemetry.api.common.Attributes;
@@ -64,6 +65,7 @@
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -463,7 +465,7 @@ int getOnDiskSizeWithoutHeader() {
   }
 
   /** Returns the uncompressed size of data part (header and checksum excluded). */
-  int getUncompressedSizeWithoutHeader() {
+  public int getUncompressedSizeWithoutHeader() {
     return uncompressedSizeWithoutHeader;
   }
 
@@ -740,6 +742,10 @@ private enum State {
       BLOCK_READY
     };
 
+    private int maxSizeUnCompressed;
+
+    private BlockCompressedSizePredicator compressedSizePredicator;
+
     /** Writer state. Used to ensure the correct usage protocol. */
     private State state = State.INIT;
 
@@ -818,11 +824,11 @@ EncodingState getEncodingState() {
      */
     public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
       HFileContext fileContext) {
-      this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP);
+      this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP, fileContext.getBlocksize());
     }
 
     public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
-      HFileContext fileContext, ByteBuffAllocator allocator) {
+      HFileContext fileContext, ByteBuffAllocator allocator, int maxSizeUnCompressed) {
       if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
         throw new RuntimeException("Unsupported value of bytesPerChecksum. " + " Minimum is "
           + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is "
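With the extra parameter, a caller constructing a Writer directly might look like the following sketch. The configuration and the 64 KB cap are illustrative assumptions, not part of this change; NoOpDataBlockEncoder, HFileContextBuilder, and ByteBuffAllocator.HEAP are existing HBase types.

    // Sketch only: the 64 KB value is an arbitrary example for maxSizeUnCompressed.
    Configuration conf = HBaseConfiguration.create();
    HFileContext ctx = new HFileContextBuilder().withBlockSize(64 * 1024).build();
    HFileBlock.Writer writer = new HFileBlock.Writer(conf,
      NoOpDataBlockEncoder.INSTANCE, ctx, ByteBuffAllocator.HEAP, 64 * 1024);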
@@ -845,6 +851,10 @@ public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
       // TODO: Why fileContext saved away when we have dataBlockEncoder and/or
       // defaultDataBlockEncoder?
       this.fileContext = fileContext;
+      this.compressedSizePredicator = (BlockCompressedSizePredicator) ReflectionUtils.newInstance(
+        conf.getClass(BLOCK_COMPRESSED_SIZE_PREDICATOR, UncompressedBlockSizePredicator.class),
+        new Configuration(conf));
+      this.maxSizeUnCompressed = maxSizeUnCompressed;
     }
 
     /**
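The predicator implementation is pluggable: the constructor resolves the class named under the BLOCK_COMPRESSED_SIZE_PREDICATOR configuration key (the key's string value is defined in BlockCompressedSizePredicator and is not shown in this diff), falling back to UncompressedBlockSizePredicator. A minimal selection sketch, where CustomSizePredicator stands in for any hypothetical class implementing the interface:

    // Sketch only: conf.setClass() is the standard Hadoop Configuration API.
    Configuration conf = HBaseConfiguration.create();
    conf.setClass(BLOCK_COMPRESSED_SIZE_PREDICATOR, CustomSizePredicator.class,
      BlockCompressedSizePredicator.class);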
@@ -897,6 +907,15 @@ void ensureBlockReady() throws IOException {
       finishBlock();
     }
 
+    public boolean checkBoundariesWithPredicate() {
+      int rawBlockSize = encodedBlockSizeWritten();
+      if (rawBlockSize >= maxSizeUnCompressed) {
+        return true;
+      } else {
+        return compressedSizePredicator.shouldFinishBlock(rawBlockSize);
+      }
+    }
+
     /**
      * Finish up writing of the block. Flushes the compressing stream (if using compression), fills
      * out the header, does any compression/encryption of bytes to flush out to disk, and manages
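checkBoundariesWithPredicate() closes a block unconditionally once the raw size reaches maxSizeUnCompressed, and otherwise defers to the configured predicator. A caller-side sketch of how the HFile writer might consult it when deciding block boundaries (the call site itself is not part of this diff):

    // Sketch only: blockWriter is an HFileBlock.Writer field and
    // finishCurrentBlockAndStartNew() is a hypothetical helper.
    if (blockWriter.checkBoundariesWithPredicate()) {
      finishCurrentBlockAndStartNew();
    }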
@@ -911,6 +930,11 @@ private void finishBlock() throws IOException {
       userDataStream.flush();
       prevOffset = prevOffsetByType[blockType.getId()];
 
+      // We need to cache the unencoded/uncompressed size before changing the block state
+      int rawBlockSize = 0;
+      if (this.getEncodingState() != null) {
+        rawBlockSize = blockSizeWritten();
+      }
       // We need to set state before we can package the block up for cache-on-write. In a way, the
       // block is ready, but not yet encoded or compressed.
       state = State.BLOCK_READY;
@@ -931,13 +955,18 @@ private void finishBlock() throws IOException {
       onDiskBlockBytesWithHeader.reset();
       onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(),
         compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength());
+      // Update raw and compressed sizes in the predicator
+      compressedSizePredicator.updateLatestBlockSizes(fileContext, rawBlockSize,
+        onDiskBlockBytesWithHeader.size());
+
       // Calculate how many bytes we need for checksum on the tail of the block.
       int numBytes = (int) ChecksumUtil.numBytes(onDiskBlockBytesWithHeader.size(),
         fileContext.getBytesPerChecksum());
 
       // Put the header for the on disk bytes; header currently is unfilled-out
       putHeader(onDiskBlockBytesWithHeader, onDiskBlockBytesWithHeader.size() + numBytes,
         baosInMemory.size(), onDiskBlockBytesWithHeader.size());
+
       if (onDiskChecksum.length != numBytes) {
         onDiskChecksum = new byte[numBytes];
       }
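Taken together, the two call sites pin down the predicator contract: updateLatestBlockSizes(HFileContext, int, int) feeds it the raw and on-disk sizes of each finished block, and shouldFinishBlock(int) asks whether the block currently being written should be closed. An illustrative ratio-based implementation follows; field names and the default limit are assumptions, and the implementations actually shipped with this change are not shown in this diff.

    // Sketch only: projects the on-disk size of the current block from the
    // previous block's compression ratio.
    public class CompressionRatioPredicatorSketch implements BlockCompressedSizePredicator {
      private double ratio = 1.0; // compressed/raw ratio of the previous block
      private int blockSizeLimit = 64 * 1024; // assumed default target size

      @Override
      public void updateLatestBlockSizes(HFileContext context, int rawSize, int compressedSize) {
        this.ratio = rawSize > 0 ? (double) compressedSize / rawSize : 1.0;
        this.blockSizeLimit = context.getBlocksize();
      }

      @Override
      public boolean shouldFinishBlock(int uncompressed) {
        // Close once the projected on-disk size reaches the target block size.
        return uncompressed * ratio >= blockSizeLimit;
      }
    }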
@@ -1077,7 +1106,7 @@ int getUncompressedSizeWithoutHeader() {
     /**
      * The uncompressed size of the block data, including header size.
      */
-    int getUncompressedSizeWithHeader() {
+    public int getUncompressedSizeWithHeader() {
       expectState(State.BLOCK_READY);
       return baosInMemory.size();
     }
@@ -1101,7 +1130,7 @@ public int encodedBlockSizeWritten() {
      * block at the moment. Note that this will return zero in the "block ready" state as well.
      * @return the number of bytes written
      */
-    int blockSizeWritten() {
+    public int blockSizeWritten() {
       return state != State.WRITING ? 0 : this.getEncodingState().getUnencodedDataSizeWritten();
     }
 