@@ -297,6 +297,12 @@ public void clean() {
}
}
}
this.usedBufCount.set(0);
this.maxPoolSizeInfoLevelLogged = false;
this.poolAllocationBytes.reset();
this.heapAllocationBytes.reset();
this.lastPoolAllocationBytes = 0;
this.lastHeapAllocationBytes = 0;
}

/**
@@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;

import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.yetus.audience.InterfaceAudience;

/**
* The {@link ByteBuffAllocator} never allocates pooled heap {@link ByteBuff}s; conversely, any
* off-heap {@link ByteBuff} obtained from the allocator is always a pooled one. In other words, an
* exclusive memory HFileBlock must be a heap block, while a shared memory HFileBlock must be an
* off-heap block.
* <p>
* An exclusive memory HFileBlock does nothing when retain or release is called, because its memory
* is reclaimed by the JVM garbage collector; even when its reference count drops to zero, there is
* nothing for us to de-allocate.
* <p>
* @see org.apache.hadoop.hbase.io.hfile.SharedMemHFileBlock
*/
@InterfaceAudience.Private
public class ExclusiveMemHFileBlock extends HFileBlock {

ExclusiveMemHFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader,
long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader,
HFileContext fileContext) {
super(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, buf,
fillHeader, offset, nextBlockOnDiskSize, onDiskDataSizeWithHeader, fileContext,
ByteBuffAllocator.HEAP);
}

@Override
public int refCnt() {
return 0;
}

@Override
public ExclusiveMemHFileBlock retain() {
// do nothing
return this;
}

@Override
public boolean release() {
// do nothing
return false;
}

@Override
public boolean isSharedMem() {
return false;
}
}
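
Not part of this patch: a minimal caller-side sketch of the contract the class comment above describes. The class and method names below are hypothetical; the only HFileBlock methods used are isSharedMem() and release(), both of which appear in this change.

import org.apache.hadoop.hbase.io.hfile.HFileBlock;

public final class BlockReleaseExample {
  // An exclusive (heap) block ignores retain()/release() and reports refCnt() == 0,
  // so only a shared (pooled off-heap) block actually returns memory to the allocator.
  static void doneWith(HFileBlock block) {
    if (block.isSharedMem()) {
      block.release(); // hand the pooled ByteBuff back once the ref count reaches zero
    }
    // else: exclusive heap block; release() would return false and the JVM GC
    // reclaims the backing byte[] when the block becomes unreachable.
  }
}
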
@@ -285,7 +285,7 @@ public HFileBlock deserialize(ByteBuff buf, ByteBuffAllocator alloc)
boolean usesChecksum = buf.get() == (byte) 1;
long offset = buf.getLong();
int nextBlockOnDiskSize = buf.getInt();
return new HFileBlock(newByteBuff, usesChecksum, offset, nextBlockOnDiskSize, null, alloc);
return createFromBuff(newByteBuff, usesChecksum, offset, nextBlockOnDiskSize, null, alloc);
}

@Override
@@ -300,28 +300,6 @@ public int getDeserializerIdentifier() {
CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER);
}

/**
* Copy constructor. Creates a shallow copy of {@code that}'s buffer.
*/
private HFileBlock(HFileBlock that) {
this(that, false);
}

/**
* Copy constructor. Creates a shallow/deep copy of {@code that}'s buffer as per the boolean
* param.
*/
private HFileBlock(HFileBlock that, boolean bufCopy) {
init(that.blockType, that.onDiskSizeWithoutHeader, that.uncompressedSizeWithoutHeader,
that.prevBlockOffset, that.offset, that.onDiskDataSizeWithHeader, that.nextBlockOnDiskSize,
that.fileContext, that.allocator);
if (bufCopy) {
this.buf = ByteBuff.wrap(ByteBuffer.wrap(that.buf.toBytes(0, that.buf.limit())));
} else {
this.buf = that.buf.duplicate();
}
}

/**
* Creates a new {@link HFile} block from the given fields. This constructor
* is used only while writing blocks and caching,
@@ -336,20 +314,27 @@ private HFileBlock(HFileBlock that, boolean bufCopy) {
* @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
* @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
* @param prevBlockOffset see {@link #prevBlockOffset}
* @param b block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes)
* @param buf block buffer with header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes)
* @param fillHeader when true, write the first 4 header fields into passed buffer.
* @param offset the file offset the block was read from
* @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
* @param fileContext HFile meta data
*/
@VisibleForTesting
public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer b, boolean fillHeader,
long offset, final int nextBlockOnDiskSize, int onDiskDataSizeWithHeader,
HFileContext fileContext, ByteBuffAllocator allocator) {
init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, offset,
onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext, allocator);
this.buf = new SingleByteBuff(b);
int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader,
long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext,
ByteBuffAllocator allocator) {
this.blockType = blockType;
this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
this.prevBlockOffset = prevBlockOffset;
this.offset = offset;
this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
this.nextBlockOnDiskSize = nextBlockOnDiskSize;
this.fileContext = fileContext;
this.allocator = allocator;
this.buf = buf;
if (fillHeader) {
overwriteHeader();
}
@@ -363,7 +348,7 @@ public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
* to that point.
* @param buf Has header, content, and trailing checksums if present.
*/
HFileBlock(ByteBuff buf, boolean usesHBaseChecksum, final long offset,
static HFileBlock createFromBuff(ByteBuff buf, boolean usesHBaseChecksum, final long offset,
final int nextBlockOnDiskSize, HFileContext fileContext, ByteBuffAllocator allocator)
throws IOException {
buf.rewind();
@@ -374,15 +359,15 @@ public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
final long prevBlockOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
// This constructor is called when we deserialize a block from cache and when we read a block in
// from the fs. fileCache is null when deserialized from cache so need to make up one.
HFileContextBuilder fileContextBuilder = fileContext != null?
new HFileContextBuilder(fileContext): new HFileContextBuilder();
HFileContextBuilder fileContextBuilder =
fileContext != null ? new HFileContextBuilder(fileContext) : new HFileContextBuilder();
fileContextBuilder.withHBaseCheckSum(usesHBaseChecksum);
int onDiskDataSizeWithHeader;
if (usesHBaseChecksum) {
byte checksumType = buf.get(Header.CHECKSUM_TYPE_INDEX);
int bytesPerChecksum = buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX);
onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
// Use the checksum type and bytes per checksum from header, not from filecontext.
// Use the checksum type and bytes per checksum from header, not from fileContext.
fileContextBuilder.withChecksumType(ChecksumType.codeToType(checksumType));
fileContextBuilder.withBytesPerCheckSum(bytesPerChecksum);
} else {
@@ -393,29 +378,19 @@ public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
}
fileContext = fileContextBuilder.build();
assert usesHBaseChecksum == fileContext.isUseHBaseChecksum();
init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, offset,
onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext, allocator);
this.offset = offset;
this.buf = buf;
this.buf.rewind();
}

/**
* Called from constructors.
*/
private void init(BlockType blockType, int onDiskSizeWithoutHeader,
int uncompressedSizeWithoutHeader, long prevBlockOffset, long offset,
int onDiskDataSizeWithHeader, final int nextBlockOnDiskSize, HFileContext fileContext,
ByteBuffAllocator allocator) {
this.blockType = blockType;
this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
this.prevBlockOffset = prevBlockOffset;
this.offset = offset;
this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
this.nextBlockOnDiskSize = nextBlockOnDiskSize;
this.fileContext = fileContext;
this.allocator = allocator;
return new HFileBlockBuilder()
.withBlockType(blockType)
.withOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader)
.withUncompressedSizeWithoutHeader(uncompressedSizeWithoutHeader)
.withPrevBlockOffset(prevBlockOffset)
.withOffset(offset)
.withOnDiskDataSizeWithHeader(onDiskDataSizeWithHeader)
.withNextBlockOnDiskSize(nextBlockOnDiskSize)
.withHFileContext(fileContext)
.withByteBuffAllocator(allocator)
.withByteBuff(buf.rewind())
.withShared(!buf.hasArray())
.build();
}

/**
@@ -639,7 +614,7 @@ public String toString() {
.append("(").append(onDiskSizeWithoutHeader)
.append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
}
String dataBegin = null;
String dataBegin;
if (buf.hasArray()) {
dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
@@ -673,7 +648,7 @@ HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException
return this;
}

HFileBlock unpacked = new HFileBlock(this);
HFileBlock unpacked = shallowClone(this);
unpacked.allocateBuffer(); // allocates space for the decompressed block
boolean succ = false;
try {
@@ -761,10 +736,16 @@ public long heapSize() {
}

/**
* @return true to indicate the block is allocated from JVM heap, otherwise from off-heap.
* Will be overridden by {@link SharedMemHFileBlock} or {@link ExclusiveMemHFileBlock}. Returns
* true by default.
*/
boolean isOnHeap() {
return buf.hasArray();
public boolean isSharedMem() {
Review comment (Contributor):
Still we create HFileBlock only in tests? If not we could have left this API abstract only and even HFileBlock. That would have been best. May be in a follow up we can do that. Just renaming will be enough. In this jira keep it this way only. Fine.

if (this instanceof SharedMemHFileBlock) {
return true;
} else if (this instanceof ExclusiveMemHFileBlock) {
return false;
}
return true;
}

/**
@@ -1039,8 +1020,7 @@ void writeHeaderAndData(FSDataOutputStream out) throws IOException {
+ offset);
}
startOffset = offset;

finishBlockAndWriteHeaderAndData((DataOutputStream) out);
finishBlockAndWriteHeaderAndData(out);
}

/**
@@ -1251,13 +1231,27 @@ HFileBlock getBlockForCaching(CacheConfig cacheConf) {
.withIncludesMvcc(fileContext.isIncludesMvcc())
.withIncludesTags(fileContext.isIncludesTags())
.build();
return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
getUncompressedSizeWithoutHeader(), prevOffset,
cacheConf.shouldCacheCompressed(blockType.getCategory()) ? cloneOnDiskBufferWithHeader()
: cloneUncompressedBufferWithHeader(),
FILL_HEADER, startOffset, UNSET,
onDiskBlockBytesWithHeader.size() + onDiskChecksum.length, newContext,
cacheConf.getByteBuffAllocator());
// Build the HFileBlock.
HFileBlockBuilder builder = new HFileBlockBuilder();
ByteBuffer buffer;
if (cacheConf.shouldCacheCompressed(blockType.getCategory())) {
buffer = cloneOnDiskBufferWithHeader();
} else {
buffer = cloneUncompressedBufferWithHeader();
}
return builder.withBlockType(blockType)
.withOnDiskSizeWithoutHeader(getOnDiskSizeWithoutHeader())
.withUncompressedSizeWithoutHeader(getUncompressedSizeWithoutHeader())
.withPrevBlockOffset(prevOffset)
.withByteBuff(ByteBuff.wrap(buffer))
.withFillHeader(FILL_HEADER)
.withOffset(startOffset)
.withNextBlockOnDiskSize(UNSET)
.withOnDiskDataSizeWithHeader(onDiskBlockBytesWithHeader.size() + onDiskChecksum.length)
.withHFileContext(newContext)
.withByteBuffAllocator(cacheConf.getByteBuffAllocator())
.withShared(!buffer.hasArray())
.build();
}
}

@@ -1781,8 +1775,8 @@ protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
// The onDiskBlock will become the headerAndDataBuffer for this block.
// If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
// contains the header of next block, so no need to set next block's header in it.
HFileBlock hFileBlock = new HFileBlock(curBlock, checksumSupport, offset,
nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator);
HFileBlock hFileBlock = createFromBuff(curBlock, checksumSupport, offset,
nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator);
// Run check on uncompressed sizings.
if (!fileContext.isCompressedOrEncrypted()) {
hFileBlock.sanityCheckUncompressed();
@@ -1947,7 +1941,7 @@ public boolean equals(Object comparison) {
if (comparison == null) {
return false;
}
if (comparison.getClass() != this.getClass()) {
if (!(comparison instanceof HFileBlock)) {
return false;
}

@@ -2084,7 +2078,27 @@ static String toStringHeader(ByteBuff buf) throws IOException {
" onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
}

public HFileBlock deepCloneOnHeap() {
return new HFileBlock(this, true);
private static HFileBlockBuilder createBuilder(HFileBlock blk) {
return new HFileBlockBuilder()
.withBlockType(blk.blockType)
.withOnDiskSizeWithoutHeader(blk.onDiskSizeWithoutHeader)
.withUncompressedSizeWithoutHeader(blk.uncompressedSizeWithoutHeader)
.withPrevBlockOffset(blk.prevBlockOffset)
.withByteBuff(blk.buf.duplicate()) // Duplicate the buffer.
.withOffset(blk.offset)
.withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader)
.withNextBlockOnDiskSize(blk.nextBlockOnDiskSize)
.withHFileContext(blk.fileContext)
.withByteBuffAllocator(blk.allocator)
.withShared(blk.isSharedMem());
}

static HFileBlock shallowClone(HFileBlock blk) {
return createBuilder(blk).build();
}

static HFileBlock deepCloneOnHeap(HFileBlock blk) {
ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit())));
return createBuilder(blk).withByteBuff(deepCloned).withShared(false).build();
}
}
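
A short sketch, again not part of the patch, contrasting the two clone helpers above: shallowClone() duplicates the ByteBuff so the copy still references the original (possibly pooled) memory, while deepCloneOnHeap() copies the bytes into a fresh heap buffer and marks the result as non-shared. The wrapper class and method names are assumptions, and since both helpers are package-private the caller is assumed to live in org.apache.hadoop.hbase.io.hfile.

package org.apache.hadoop.hbase.io.hfile;

final class BlockCloneExample {
  // Hypothetical helper: keep a block beyond the lifetime of its pooled buffer.
  // A shared (off-heap, pooled) block is deep-copied onto the heap so the pooled
  // memory can be released; an exclusive heap block only needs a shallow clone.
  static HFileBlock cloneForLongLivedUse(HFileBlock blk) {
    return blk.isSharedMem() ? HFileBlock.deepCloneOnHeap(blk) : HFileBlock.shallowClone(blk);
  }
}
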