
Commit 7a6faee

HBASE-22491 Separate the heap HFileBlock and offheap HFileBlock because the heap block won't need refCnt and save into prevBlocks list before shipping
1 parent a6e3d5b commit 7a6faee
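
At a glance: HFileBlock now splits into SharedMemHFileBlock (pooled, off-heap, genuinely ref-counted) and ExclusiveMemHFileBlock (heap, GC-managed, ref-counting as no-ops). A hedged sketch of the resulting contract, using only names this commit introduces; the caller itself is illustrative:

    // Illustrative only: how the two subclasses behave after this commit.
    void describe(HFileBlock block) {
      if (block.isSharedMem()) {
        // SharedMemHFileBlock: pooled off-heap buffer; retain()/release()
        // really move the refCnt, and the last release recycles the buffer.
        block.retain();
        block.release();
      } else {
        // ExclusiveMemHFileBlock: heap buffer owned by the GC; retain() and
        // release() are no-ops, and refCnt() always reports 0.
        assert block.refCnt() == 0;
      }
    }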

19 files changed, +706 -305 lines changed


hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java

Lines changed: 6 additions & 0 deletions
@@ -297,6 +297,12 @@ public void clean() {
         }
       }
     }
+    this.usedBufCount.set(0);
+    this.maxPoolSizeInfoLevelLogged = false;
+    this.poolAllocationBytes.reset();
+    this.heapAllocationBytes.reset();
+    this.lastPoolAllocationBytes = 0;
+    this.lastHeapAllocationBytes = 0;
   }

   /**
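
These additions make clean() reset the allocator's bookkeeping (used-buffer count, allocation byte counters, and the one-shot log flag) in addition to freeing the pooled buffers, so a cleaned allocator reports fresh statistics. A minimal sketch of the intent; the accessor names (getUsedBufferCount, getPoolAllocationBytes, getHeapAllocationBytes) are assumptions for illustration, not necessarily the real ByteBuffAllocator getters:

    // Hypothetical check: after clean(), all bookkeeping starts from zero.
    ByteBuffAllocator alloc = ByteBuffAllocator.create(HBaseConfiguration.create(), true);
    alloc.allocate(64 * 1024).release();        // drives the pool counters up
    alloc.clean();                              // frees pooled buffers AND resets counters
    assert alloc.getUsedBufferCount() == 0;     // usedBufCount.set(0)
    assert alloc.getPoolAllocationBytes() == 0; // poolAllocationBytes.reset()
    assert alloc.getHeapAllocationBytes() == 0; // heapAllocationBytes.reset()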
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ExclusiveMemHFileBlock.java

Lines changed: 69 additions & 0 deletions

@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * The {@link ByteBuffAllocator} no longer allocates pooled heap {@link ByteBuff}s; conversely, any
+ * off-heap {@link ByteBuff} obtained from the allocator must be a pooled one. That is to say, an
+ * exclusive-memory HFileBlock is always a heap block, and a shared-memory HFileBlock is always an
+ * off-heap block.
+ * <p>
+ * An exclusive-memory HFileBlock does nothing when retain or release is called, because its memory
+ * is reclaimed by the JVM garbage collector: even when its reference count drops to zero, there is
+ * nothing for us to deallocate.
+ * <p>
+ * @see org.apache.hadoop.hbase.io.hfile.SharedMemHFileBlock
+ */
+@InterfaceAudience.Private
+public class ExclusiveMemHFileBlock extends HFileBlock {
+
+  ExclusiveMemHFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
+      int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader,
+      long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader,
+      HFileContext fileContext) {
+    super(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, buf,
+        fillHeader, offset, nextBlockOnDiskSize, onDiskDataSizeWithHeader, fileContext,
+        ByteBuffAllocator.HEAP);
+  }
+
+  @Override
+  public int refCnt() {
+    return 0;
+  }
+
+  @Override
+  public ExclusiveMemHFileBlock retain() {
+    // do nothing
+    return this;
+  }
+
+  @Override
+  public boolean release() {
+    // do nothing
+    return false;
+  }
+
+  @Override
+  public boolean isSharedMem() {
+    return false;
+  }
+}
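
Because release() is a deliberate no-op here, read paths can track and release every block they touched (for instance the prevBlocks list named in the commit title) without special-casing heap blocks. An illustrative caller, not code from this commit:

    // Illustrative only: releasing tracked blocks is safe for both kinds.
    private final List<HFileBlock> prevBlocks = new ArrayList<>();

    void shipped() {
      for (HFileBlock block : prevBlocks) {
        // No-op (returns false) for an ExclusiveMemHFileBlock; decrements the
        // refCnt, possibly recycling the pooled buffer, for a shared block.
        block.release();
      }
      prevBlocks.clear();
    }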

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java

Lines changed: 90 additions & 76 deletions
@@ -285,7 +285,7 @@ public HFileBlock deserialize(ByteBuff buf, ByteBuffAllocator alloc)
       boolean usesChecksum = buf.get() == (byte) 1;
       long offset = buf.getLong();
       int nextBlockOnDiskSize = buf.getInt();
-      return new HFileBlock(newByteBuff, usesChecksum, offset, nextBlockOnDiskSize, null, alloc);
+      return createFromBuff(newByteBuff, usesChecksum, offset, nextBlockOnDiskSize, null, alloc);
     }

     @Override
@@ -300,28 +300,6 @@ public int getDeserializerIdentifier() {
     CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER);
   }

-  /**
-   * Copy constructor. Creates a shallow copy of {@code that}'s buffer.
-   */
-  private HFileBlock(HFileBlock that) {
-    this(that, false);
-  }
-
-  /**
-   * Copy constructor. Creates a shallow/deep copy of {@code that}'s buffer as per the boolean
-   * param.
-   */
-  private HFileBlock(HFileBlock that, boolean bufCopy) {
-    init(that.blockType, that.onDiskSizeWithoutHeader, that.uncompressedSizeWithoutHeader,
-      that.prevBlockOffset, that.offset, that.onDiskDataSizeWithHeader, that.nextBlockOnDiskSize,
-      that.fileContext, that.allocator);
-    if (bufCopy) {
-      this.buf = ByteBuff.wrap(ByteBuffer.wrap(that.buf.toBytes(0, that.buf.limit())));
-    } else {
-      this.buf = that.buf.duplicate();
-    }
-  }
-
   /**
    * Creates a new {@link HFile} block from the given fields. This constructor
    * is used only while writing blocks and caching,
@@ -336,20 +314,27 @@ private HFileBlock(HFileBlock that, boolean bufCopy) {
    * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
    * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
    * @param prevBlockOffset see {@link #prevBlockOffset}
-   * @param b block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes)
+   * @param buf block buffer with header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes)
    * @param fillHeader when true, write the first 4 header fields into passed buffer.
    * @param offset the file offset the block was read from
    * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
    * @param fileContext HFile meta data
    */
  @VisibleForTesting
  public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
-      int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer b, boolean fillHeader,
-      long offset, final int nextBlockOnDiskSize, int onDiskDataSizeWithHeader,
-      HFileContext fileContext, ByteBuffAllocator allocator) {
-    init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, offset,
-      onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext, allocator);
-    this.buf = new SingleByteBuff(b);
+      int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader,
+      long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext,
+      ByteBuffAllocator allocator) {
+    this.blockType = blockType;
+    this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
+    this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
+    this.prevBlockOffset = prevBlockOffset;
+    this.offset = offset;
+    this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
+    this.nextBlockOnDiskSize = nextBlockOnDiskSize;
+    this.fileContext = fileContext;
+    this.allocator = allocator;
+    this.buf = buf;
     if (fillHeader) {
       overwriteHeader();
     }
@@ -363,7 +348,7 @@ public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
    * to that point.
    * @param buf Has header, content, and trailing checksums if present.
    */
-  HFileBlock(ByteBuff buf, boolean usesHBaseChecksum, final long offset,
+  static HFileBlock createFromBuff(ByteBuff buf, boolean usesHBaseChecksum, final long offset,
       final int nextBlockOnDiskSize, HFileContext fileContext, ByteBuffAllocator allocator)
       throws IOException {
     buf.rewind();
@@ -374,15 +359,15 @@ public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
     final long prevBlockOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
     // This constructor is called when we deserialize a block from cache and when we read a block in
     // from the fs. fileCache is null when deserialized from cache so need to make up one.
-    HFileContextBuilder fileContextBuilder = fileContext != null?
-      new HFileContextBuilder(fileContext): new HFileContextBuilder();
+    HFileContextBuilder fileContextBuilder =
+        fileContext != null ? new HFileContextBuilder(fileContext) : new HFileContextBuilder();
     fileContextBuilder.withHBaseCheckSum(usesHBaseChecksum);
     int onDiskDataSizeWithHeader;
     if (usesHBaseChecksum) {
       byte checksumType = buf.get(Header.CHECKSUM_TYPE_INDEX);
       int bytesPerChecksum = buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX);
       onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
-      // Use the checksum type and bytes per checksum from header, not from filecontext.
+      // Use the checksum type and bytes per checksum from header, not from fileContext.
       fileContextBuilder.withChecksumType(ChecksumType.codeToType(checksumType));
       fileContextBuilder.withBytesPerCheckSum(bytesPerChecksum);
     } else {
@@ -393,29 +378,19 @@ public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
     }
     fileContext = fileContextBuilder.build();
     assert usesHBaseChecksum == fileContext.isUseHBaseChecksum();
-    init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, offset,
-      onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext, allocator);
-    this.offset = offset;
-    this.buf = buf;
-    this.buf.rewind();
-  }
-
-  /**
-   * Called from constructors.
-   */
-  private void init(BlockType blockType, int onDiskSizeWithoutHeader,
-      int uncompressedSizeWithoutHeader, long prevBlockOffset, long offset,
-      int onDiskDataSizeWithHeader, final int nextBlockOnDiskSize, HFileContext fileContext,
-      ByteBuffAllocator allocator) {
-    this.blockType = blockType;
-    this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
-    this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
-    this.prevBlockOffset = prevBlockOffset;
-    this.offset = offset;
-    this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
-    this.nextBlockOnDiskSize = nextBlockOnDiskSize;
-    this.fileContext = fileContext;
-    this.allocator = allocator;
+    return new HFileBlockBuilder()
+        .withBlockType(blockType)
+        .withOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader)
+        .withUncompressedSizeWithoutHeader(uncompressedSizeWithoutHeader)
+        .withPrevBlockOffset(prevBlockOffset)
+        .withOffset(offset)
+        .withOnDiskDataSizeWithHeader(onDiskDataSizeWithHeader)
+        .withNextBlockOnDiskSize(nextBlockOnDiskSize)
+        .withHFileContext(fileContext)
+        .withByteBuffAllocator(allocator)
+        .withByteBuff(buf.rewind())
+        .withShared(!buf.hasArray())
+        .build();
   }

   /**
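
The decisive line in the new factory is withShared(!buf.hasArray()): a ByteBuff backed by a heap array is treated as exclusive memory, anything else as shared, because the allocator only hands out pooled buffers off-heap. A self-contained illustration of that rule in plain NIO (the class name here is hypothetical):

    import java.nio.ByteBuffer;

    public class SharedMemRule {
      // Mirrors withShared(!buf.hasArray()): heap-backed buffers are
      // exclusive (GC-managed), direct buffers are shared (pooled).
      static boolean isShared(ByteBuffer buf) {
        return !buf.hasArray();
      }

      public static void main(String[] args) {
        System.out.println(isShared(ByteBuffer.allocate(1024)));       // false: heap, exclusive
        System.out.println(isShared(ByteBuffer.allocateDirect(1024))); // true: off-heap, shared
      }
    }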
@@ -639,7 +614,7 @@ public String toString() {
       .append("(").append(onDiskSizeWithoutHeader)
       .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
     }
-    String dataBegin = null;
+    String dataBegin;
     if (buf.hasArray()) {
       dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
         Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
@@ -673,7 +648,7 @@ HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException
       return this;
     }

-    HFileBlock unpacked = new HFileBlock(this);
+    HFileBlock unpacked = shallowClone(this);
     unpacked.allocateBuffer(); // allocates space for the decompressed block
     boolean succ = false;
     try {
@@ -761,10 +736,16 @@ public long heapSize() {
   }

   /**
-   * @return true to indicate the block is allocated from JVM heap, otherwise from off-heap.
+   * Will be overridden by {@link SharedMemHFileBlock} or {@link ExclusiveMemHFileBlock}. Returns
+   * true by default.
    */
-  boolean isOnHeap() {
-    return buf.hasArray();
+  public boolean isSharedMem() {
+    if (this instanceof SharedMemHFileBlock) {
+      return true;
+    } else if (this instanceof ExclusiveMemHFileBlock) {
+      return false;
+    }
+    return true;
   }

   /**
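
Note that this base-class implementation is a conservative default: ExclusiveMemHFileBlock (above) overrides isSharedMem() to return false outright, and SharedMemHFileBlock presumably returns true the same way, so the instanceof checks mainly guard plain HFileBlock instances, which are treated as shared to stay on the safe side.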
@@ -1039,8 +1020,7 @@ void writeHeaderAndData(FSDataOutputStream out) throws IOException {
         + offset);
     }
     startOffset = offset;
-
-    finishBlockAndWriteHeaderAndData((DataOutputStream) out);
+    finishBlockAndWriteHeaderAndData(out);
   }

   /**
12511231
.withIncludesMvcc(fileContext.isIncludesMvcc())
12521232
.withIncludesTags(fileContext.isIncludesTags())
12531233
.build();
1254-
return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
1255-
getUncompressedSizeWithoutHeader(), prevOffset,
1256-
cacheConf.shouldCacheCompressed(blockType.getCategory()) ? cloneOnDiskBufferWithHeader()
1257-
: cloneUncompressedBufferWithHeader(),
1258-
FILL_HEADER, startOffset, UNSET,
1259-
onDiskBlockBytesWithHeader.size() + onDiskChecksum.length, newContext,
1260-
cacheConf.getByteBuffAllocator());
1234+
// Build the HFileBlock.
1235+
HFileBlockBuilder builder = new HFileBlockBuilder();
1236+
ByteBuffer buffer;
1237+
if (cacheConf.shouldCacheCompressed(blockType.getCategory())) {
1238+
buffer = cloneOnDiskBufferWithHeader();
1239+
} else {
1240+
buffer = cloneUncompressedBufferWithHeader();
1241+
}
1242+
return builder.withBlockType(blockType)
1243+
.withOnDiskSizeWithoutHeader(getOnDiskSizeWithoutHeader())
1244+
.withUncompressedSizeWithoutHeader(getUncompressedSizeWithoutHeader())
1245+
.withPrevBlockOffset(prevOffset)
1246+
.withByteBuff(ByteBuff.wrap(buffer))
1247+
.withFillHeader(FILL_HEADER)
1248+
.withOffset(startOffset)
1249+
.withNextBlockOnDiskSize(UNSET)
1250+
.withOnDiskDataSizeWithHeader(onDiskBlockBytesWithHeader.size() + onDiskChecksum.length)
1251+
.withHFileContext(newContext)
1252+
.withByteBuffAllocator(cacheConf.getByteBuffAllocator())
1253+
.withShared(!buffer.hasArray())
1254+
.build();
12611255
}
12621256
}
12631257

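A detail worth noting: the shared flag is recomputed from the cloned buffer itself via withShared(!buffer.hasArray()), not taken from the writer's allocator, so whenever cloneOnDiskBufferWithHeader() or cloneUncompressedBufferWithHeader() yields a heap buffer the cached block is built as an exclusive one, regardless of how the original bytes were allocated.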
@@ -1781,8 +1775,8 @@ protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
       // The onDiskBlock will become the headerAndDataBuffer for this block.
       // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
       // contains the header of next block, so no need to set next block's header in it.
-      HFileBlock hFileBlock = new HFileBlock(curBlock, checksumSupport, offset,
-        nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator);
+      HFileBlock hFileBlock = createFromBuff(curBlock, checksumSupport, offset,
+        nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator);
       // Run check on uncompressed sizings.
       if (!fileContext.isCompressedOrEncrypted()) {
         hFileBlock.sanityCheckUncompressed();
@@ -1947,7 +1941,7 @@ public boolean equals(Object comparison) {
     if (comparison == null) {
       return false;
     }
-    if (comparison.getClass() != this.getClass()) {
+    if (!(comparison instanceof HFileBlock)) {
      return false;
     }

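The relaxed check is presumably required by the new class hierarchy: a plain HFileBlock, a SharedMemHFileBlock, and an ExclusiveMemHFileBlock can all represent the same logical block, and a strict getClass() comparison would never let instances of different concrete types compare equal.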
@@ -2084,7 +2078,27 @@ static String toStringHeader(ByteBuff buf) throws IOException {
       " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
   }

-  public HFileBlock deepCloneOnHeap() {
-    return new HFileBlock(this, true);
+  private static HFileBlockBuilder createBuilder(HFileBlock blk) {
+    return new HFileBlockBuilder()
+        .withBlockType(blk.blockType)
+        .withOnDiskSizeWithoutHeader(blk.onDiskSizeWithoutHeader)
+        .withUncompressedSizeWithoutHeader(blk.uncompressedSizeWithoutHeader)
+        .withPrevBlockOffset(blk.prevBlockOffset)
+        .withByteBuff(blk.buf.duplicate()) // Duplicate the buffer.
+        .withOffset(blk.offset)
+        .withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader)
+        .withNextBlockOnDiskSize(blk.nextBlockOnDiskSize)
+        .withHFileContext(blk.fileContext)
+        .withByteBuffAllocator(blk.allocator)
+        .withShared(blk.isSharedMem());
+  }
+
+  static HFileBlock shallowClone(HFileBlock blk) {
+    return createBuilder(blk).build();
+  }
+
+  static HFileBlock deepCloneOnHeap(HFileBlock blk) {
+    ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit())));
+    return createBuilder(blk).withByteBuff(deepCloned).withShared(false).build();
   }
 }
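
A hedged sketch of how a caller such as a block cache might choose between the two clone flavors added above (the method is illustrative, not code from this commit):

    // Illustrative only: make a block safe to outlive the read path.
    static HFileBlock asCacheable(HFileBlock block) {
      if (block.isSharedMem()) {
        // Backed by a pooled off-heap ByteBuff that will be recycled once
        // readers release it, so copy the bytes onto the GC-managed heap.
        return HFileBlock.deepCloneOnHeap(block);
      }
      // Exclusive (heap) blocks are already owned by the GC; no copy needed.
      return block;
    }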
