@@ -172,7 +172,7 @@ static class Header {
* {@link #onDiskSizeWithoutHeader} when using HDFS checksum.
* @see Writer#putHeader(byte[], int, int, int, int)
*/
private int onDiskDataSizeWithHeader;
private final int onDiskDataSizeWithHeader;
// End of Block Header fields.

/**
@@ -188,13 +188,15 @@ static class Header {
* ByteBuffer-like API across multiple ByteBuffers reading from a cache such as BucketCache. So,
* we have this ByteBuff type. Unfortunately, it is spread all about HFileBlock. Would be good if
* could be confined to cache-use only but hard-to-do.
* <p>
* NOTE: this ByteBuff includes the HFileBlock header and data, but excludes the checksum.
*/
private ByteBuff buf;
private ByteBuff bufWithoutChecksum;

/**
* Metadata that holds meta information on the hfileblock.
*/
private HFileContext fileContext;
private final HFileContext fileContext;

/**
* The offset of this block in the file. Populated by the reader for convenience of access. This
@@ -296,6 +298,8 @@ public int getDeserializerIdentifier() {
CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER);
}

private final int totalChecksumBytes;

/**
* Creates a new {@link HFile} block from the given fields. This constructor is used only while
* writing blocks and caching, and is sitting in a byte buffer and we want to stuff the block into
@@ -332,11 +336,12 @@ public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
this.nextBlockOnDiskSize = nextBlockOnDiskSize;
this.fileContext = fileContext;
this.allocator = allocator;
this.buf = buf;
this.bufWithoutChecksum = buf;
if (fillHeader) {
overwriteHeader();
}
this.buf.rewind();
this.bufWithoutChecksum.rewind();
this.totalChecksumBytes = computeTotalChecksumBytes();
}

/**
@@ -411,12 +416,12 @@ public BlockType getBlockType() {

@Override
public int refCnt() {
return buf.refCnt();
return bufWithoutChecksum.refCnt();
}

@Override
public HFileBlock retain() {
buf.retain();
bufWithoutChecksum.retain();
return this;
}

@@ -426,7 +431,7 @@ public HFileBlock retain() {
*/
@Override
public boolean release() {
return buf.release();
return bufWithoutChecksum.release();
}

/**
@@ -441,7 +446,7 @@ public HFileBlock touch() {

@Override
public HFileBlock touch(Object hint) {
buf.touch(hint);
bufWithoutChecksum.touch(hint);
return this;
}

@@ -451,7 +456,7 @@ short getDataBlockEncodingId() {
throw new IllegalArgumentException("Querying encoder ID of a block " + "of type other than "
+ BlockType.ENCODED_DATA + ": " + blockType);
}
return buf.getShort(headerSize());
return bufWithoutChecksum.getShort(headerSize());
}

/** Returns the on-disk size of header + data part + checksum. */
@@ -479,15 +484,15 @@ long getPrevBlockOffset() {
* side-effect.
*/
private void overwriteHeader() {
buf.rewind();
blockType.write(buf);
buf.putInt(onDiskSizeWithoutHeader);
buf.putInt(uncompressedSizeWithoutHeader);
buf.putLong(prevBlockOffset);
bufWithoutChecksum.rewind();
blockType.write(bufWithoutChecksum);
bufWithoutChecksum.putInt(onDiskSizeWithoutHeader);
bufWithoutChecksum.putInt(uncompressedSizeWithoutHeader);
bufWithoutChecksum.putLong(prevBlockOffset);
if (this.fileContext.isUseHBaseChecksum()) {
buf.put(fileContext.getChecksumType().getCode());
buf.putInt(fileContext.getBytesPerChecksum());
buf.putInt(onDiskDataSizeWithHeader);
bufWithoutChecksum.put(fileContext.getChecksumType().getCode());
bufWithoutChecksum.putInt(fileContext.getBytesPerChecksum());
bufWithoutChecksum.putInt(onDiskDataSizeWithHeader);
}
}
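For reference, a minimal sketch (not part of the patch) of the header layout that overwriteHeader() writes when HBase checksums are enabled. The offsets follow from the sequence of puts above and match the get(24)/getInt(25) reads in toString() below; the header ends at byte 33 (HConstants.HFILEBLOCK_HEADER_SIZE).

```java
// Hedged sketch: reading back the fields at the offsets written above.
ByteBuff b = bufWithoutChecksum;
// bytes 0..7: block type magic, written by blockType.write(b)
int onDiskSizeWithoutHeader = b.getInt(8);
int uncompressedSizeWithoutHeader = b.getInt(12);
long prevBlockOffset = b.getLong(16);
byte checksumTypeCode = b.get(24);
int bytesPerChecksum = b.getInt(25);
int onDiskDataSizeWithHeader = b.getInt(29); // header ends at byte 33
// The data section follows immediately; checksum bytes are never in this buffer.
```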

@@ -507,11 +512,12 @@ public ByteBuff getBufferWithoutHeader() {
* in {@link CompoundBloomFilter} to avoid object creation on every Bloom filter lookup, but has
* to be used with caution. Buffer holds header, block content, and any follow-on checksums if
* present.
* @return the buffer of this block for read-only operations
* @return the buffer of this block for read-only operations; the buffer includes the header but
* not the checksum.
*/
public ByteBuff getBufferReadOnly() {
// TODO: ByteBuf does not support asReadOnlyBuffer(). Fix.
ByteBuff dup = this.buf.duplicate();
ByteBuff dup = this.bufWithoutChecksum.duplicate();
assert dup.position() == 0;
return dup;
}
@@ -545,7 +551,7 @@ private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
*/
void sanityCheck() throws IOException {
// Duplicate so no side-effects
ByteBuff dup = this.buf.duplicate().rewind();
ByteBuff dup = this.bufWithoutChecksum.duplicate().rewind();
sanityCheckAssertion(BlockType.read(dup), blockType);

sanityCheckAssertion(dup.getInt(), onDiskSizeWithoutHeader, "onDiskSizeWithoutHeader");
@@ -588,18 +594,19 @@ public String toString() {
.append(", prevBlockOffset=").append(prevBlockOffset).append(", isUseHBaseChecksum=")
.append(fileContext.isUseHBaseChecksum());
if (fileContext.isUseHBaseChecksum()) {
sb.append(", checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
.append(", bytesPerChecksum=").append(this.buf.getInt(24 + 1))
sb.append(", checksumType=").append(ChecksumType.codeToType(this.bufWithoutChecksum.get(24)))
.append(", bytesPerChecksum=").append(this.bufWithoutChecksum.getInt(24 + 1))
.append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
} else {
sb.append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader).append("(")
.append(onDiskSizeWithoutHeader).append("+")
.append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
}
String dataBegin;
if (buf.hasArray()) {
dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
if (bufWithoutChecksum.hasArray()) {
dataBegin = Bytes.toStringBinary(bufWithoutChecksum.array(),
bufWithoutChecksum.arrayOffset() + headerSize(),
Math.min(32, bufWithoutChecksum.limit() - bufWithoutChecksum.arrayOffset() - headerSize()));
} else {
ByteBuff bufWithoutHeader = getBufferWithoutHeader();
byte[] dataBeginBytes =
@@ -609,8 +616,8 @@ public String toString() {
}
sb.append(", getOnDiskSizeWithHeader=").append(getOnDiskSizeWithHeader())
.append(", totalChecksumBytes=").append(totalChecksumBytes()).append(", isUnpacked=")
.append(isUnpacked()).append(", buf=[").append(buf).append("]").append(", dataBeginsWith=")
.append(dataBegin).append(", fileContext=").append(fileContext)
.append(isUnpacked()).append(", buf=[").append(bufWithoutChecksum).append("]")
.append(", dataBeginsWith=").append(dataBegin).append(", fileContext=").append(fileContext)
.append(", nextBlockOnDiskSize=").append(nextBlockOnDiskSize).append("]");
return sb.toString();
}
@@ -639,7 +646,7 @@ HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException
: reader.getDefaultBlockDecodingContext();
// Create a duplicated buffer without the header part.
int headerSize = this.headerSize();
ByteBuff dup = this.buf.duplicate();
ByteBuff dup = this.bufWithoutChecksum.duplicate();
dup.position(headerSize);
dup = dup.slice();
// Decode the dup into unpacked#buf
@@ -662,7 +669,7 @@ private ByteBuff allocateBufferForUnpacking() {
int headerSize = headerSize();
int capacityNeeded = headerSize + uncompressedSizeWithoutHeader;

ByteBuff source = buf.duplicate();
ByteBuff source = bufWithoutChecksum.duplicate();
ByteBuff newBuf = allocator.allocate(capacityNeeded);

// Copy header bytes into newBuf.
@@ -681,7 +688,7 @@ public boolean isUnpacked() {
public boolean isUnpacked() {
final int headerSize = headerSize();
final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader;
final int bufCapacity = buf.remaining();
final int bufCapacity = bufWithoutChecksum.remaining();
return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
}

@@ -697,9 +704,9 @@ long getOffset() {
return offset;
}

/** Returns a byte stream reading the data + checksum of this block */
/** Returns a byte stream reading the data (excluding header and checksum) of this block */
DataInputStream getByteStream() {
ByteBuff dup = this.buf.duplicate();
ByteBuff dup = this.bufWithoutChecksum.duplicate();
dup.position(this.headerSize());
return new DataInputStream(new ByteBuffInputStream(dup));
}
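A hedged usage sketch (`block` is a hypothetical HFileBlock in hand): because the duplicate is positioned past the header and the backing buffer carries no checksum tail, the stream yields exactly the block payload.

```java
// Sketch only: the stream covers data bytes, with no header and no checksums.
DataInputStream in = block.getByteStream();
int payloadBytes = in.available(); // remaining data bytes of this block
```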
@@ -708,9 +715,9 @@ DataInputStream getByteStream() {
public long heapSize() {
long size = FIXED_OVERHEAD;
size += fileContext.heapSize();
if (buf != null) {
if (bufWithoutChecksum != null) {
// Deep overhead of the byte buffer. Needs to be aligned separately.
size += ClassSize.align(buf.capacity() + MULTI_BYTE_BUFFER_HEAP_SIZE);
size += ClassSize.align(bufWithoutChecksum.capacity() + MULTI_BYTE_BUFFER_HEAP_SIZE);
}
return ClassSize.align(size);
}
@@ -1848,17 +1855,17 @@ void sanityCheckUncompressed() throws IOException {
// Cacheable implementation
@Override
public int getSerializedLength() {
if (buf != null) {
if (bufWithoutChecksum != null) {
// Include extra bytes for block metadata.
return this.buf.limit() + BLOCK_METADATA_SPACE;
return this.bufWithoutChecksum.limit() + BLOCK_METADATA_SPACE;
}
return 0;
}

// Cacheable implementation
@Override
public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) {
this.buf.get(destination, 0, getSerializedLength() - BLOCK_METADATA_SPACE);
this.bufWithoutChecksum.get(destination, 0, getSerializedLength() - BLOCK_METADATA_SPACE);
destination = addMetaData(destination, includeNextBlockMetadata);

// Make it ready for reading. flip sets position to zero and limit to current position which
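A hedged sketch of the cache write path these two methods form (`block` is hypothetical; per the comment above, serialize() flips the destination itself):

```java
// Sketch only: serializing a block for the cache.
ByteBuffer dest = ByteBuffer.allocate(block.getSerializedLength());
block.serialize(dest, true); // buffer bytes (header + data, no checksums) + metadata
// dest is now readable from position 0: | header + data | block metadata |
```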
@@ -1904,7 +1911,7 @@ public int hashCode() {
result = result * 31 + onDiskSizeWithoutHeader;
result = result * 31 + (int) (prevBlockOffset ^ (prevBlockOffset >>> 32));
result = result * 31 + uncompressedSizeWithoutHeader;
result = result * 31 + buf.hashCode();
result = result * 31 + bufWithoutChecksum.hashCode();
return result;
}

@@ -1942,8 +1949,8 @@ public boolean equals(Object comparison) {
return false;
}
if (
ByteBuff.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
castedComparison.buf.limit()) != 0
ByteBuff.compareTo(this.bufWithoutChecksum, 0, this.bufWithoutChecksum.limit(),
castedComparison.bufWithoutChecksum, 0, castedComparison.bufWithoutChecksum.limit()) != 0
) {
return false;
}
@@ -1971,10 +1978,17 @@ int getOnDiskDataSizeWithHeader() {
}

/**
* Calculate the number of bytes required to store all the checksums for this block. Each checksum
* value is a 4 byte integer.
* Return the number of bytes required to store all the checksums for this block. Each checksum
* value is a 4-byte integer. <br/>
* NOTE: the ByteBuffs returned by {@link HFileBlock#getBufferWithoutHeader()} and
* {@link HFileBlock#getBufferReadOnly()}, as well as the DataInputStream returned by
* {@link HFileBlock#getByteStream()}, do not include checksum data.
*/
int totalChecksumBytes() {
return totalChecksumBytes;
}

private int computeTotalChecksumBytes() {
// If the hfile block has minorVersion 0, then there are no checksum
// data to validate. Similarly, a zero value in this.bytesPerChecksum
// indicates that cached blocks do not have checksum data because
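As a sketch of what the cached value amounts to (the chunk arithmetic is approximated here; HBase's ChecksumUtil owns the real calculation, and the zero cases follow the comment above):

```java
// Hedged sketch of the per-block checksum byte count now computed once.
static int checksumBytes(int onDiskDataSizeWithHeader, int bytesPerChecksum) {
  if (bytesPerChecksum == 0) {
    return 0; // minor version 0, or cached blocks without checksum data
  }
  int chunks = (onDiskDataSizeWithHeader + bytesPerChecksum - 1) / bytesPerChecksum;
  return chunks * Integer.BYTES; // each checksum value is a 4-byte integer
}
```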
@@ -2071,7 +2085,8 @@ private static HFileBlock shallowClone(HFileBlock blk, ByteBuff newBuf) {
}

static HFileBlock deepCloneOnHeap(HFileBlock blk) {
ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit())));
ByteBuff deepCloned = ByteBuff
.wrap(ByteBuffer.wrap(blk.bufWithoutChecksum.toBytes(0, blk.bufWithoutChecksum.limit())));
return createBuilder(blk, deepCloned).build();
}
}
@@ -883,10 +883,10 @@ public DataInputStream readRootIndex(HFileBlock blk, final int numEntries) throws IOException {
*/
public void readMultiLevelIndexRoot(HFileBlock blk, final int numEntries) throws IOException {
DataInputStream in = readRootIndex(blk, numEntries);
// after reading the root index the checksum bytes have to
// be subtracted to know if the mid key exists.
int checkSumBytes = blk.totalChecksumBytes();
if ((in.available() - checkSumBytes) < MID_KEY_METADATA_SIZE) {
// HFileBlock.getByteStream() returns a byte stream for reading the data (excluding checksum)
// of the root index block, so after reading the root index there is no need to subtract the
// checksum bytes.
if (in.available() < MID_KEY_METADATA_SIZE) {
// No mid-key metadata available.
return;
}
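The arithmetic change in brief (a sketch, using MID_KEY_METADATA_SIZE as above):

```java
// Before: the stream covered data + checksums, so the tail had to be excluded:
//   boolean hasMidKey = (in.available() - blk.totalChecksumBytes()) >= MID_KEY_METADATA_SIZE;
// After: the stream already ends at the index data, so the check is direct:
boolean hasMidKey = in.available() >= MID_KEY_METADATA_SIZE;
```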
@@ -204,10 +204,10 @@ public void initRootIndex(HFileBlock blk, int numEntries, CellComparator comparator,

private void init(HFileBlock blk, int numEntries) throws IOException {
DataInputStream in = readRootIndex(blk, numEntries);
// after reading the root index the checksum bytes have to
// be subtracted to know if the mid key exists.
int checkSumBytes = blk.totalChecksumBytes();
if ((in.available() - checkSumBytes) < MID_KEY_METADATA_SIZE) {
// HFileBlock.getByteStream() returns a byte stream for reading the data (excluding checksum)
// of the root index block, so after reading the root index there is no need to subtract the
// checksum bytes.
if (in.available() < MID_KEY_METADATA_SIZE) {
// No mid-key metadata available.
return;
}