Commit 43e8c11

HBASE-27013 Introduce read all bytes when using pread for prefetch (#4414)

- introduce an optional flag `hfile.pread.all.bytes.enabled` so that pread reads the full requested bytes, including the next block header

Signed-off-by: Josh Elser <[email protected]>

1 parent 62dbae2 commit 43e8c11

File tree

5 files changed: +164 −7 lines

hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java

Lines changed: 7 additions & 0 deletions
@@ -1059,6 +1059,13 @@ public enum OperationStatusCode {
   public static final long HBASE_CLIENT_SCANNER_ONHEAP_BLOCK_CACHE_FIXED_SIZE_DEFAULT =
     32 * 1024 * 1024L;
 
+  /**
+   * Configuration key for setting pread must read both necessaryLen and extraLen, default is
+   * disabled. This is an optimized flag for reading HFile from blob storage.
+   */
+  public static final String HFILE_PREAD_ALL_BYTES_ENABLED_KEY = "hfile.pread.all.bytes.enabled";
+  public static final boolean HFILE_PREAD_ALL_BYTES_ENABLED_DEFAULT = false;
+
   /*
    * Minimum percentage of free heap necessary for a successful cluster startup.
    */
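For orientation, here is a minimal sketch (not part of this commit; the class name is hypothetical) of how the new key could be enabled programmatically. Setting `hfile.pread.all.bytes.enabled` to `true` in hbase-site.xml has the same effect, since both paths feed the same Configuration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class EnablePreadAllBytes {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Opt in to full-byte preads; the shipped default is false
    // (HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_DEFAULT).
    conf.setBoolean(HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_KEY, true);
    System.out.println(conf.getBoolean(HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_KEY, false));
  }
}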

hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java

Lines changed: 29 additions & 6 deletions
@@ -228,21 +228,43 @@ public static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int nec
    */
   public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
     int necessaryLen, int extraLen) throws IOException {
+    return preadWithExtra(buff, dis, position, necessaryLen, extraLen, false);
+  }
+
+  /**
+   * Read from an input stream at least <code>necessaryLen</code> and if possible,
+   * <code>extraLen</code> also if available. Analogous to
+   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and
+   * specifies a number of "extra" bytes that would be desirable but not absolutely necessary to
+   * read. If the input stream supports ByteBufferPositionedReadable, it reads to the byte buffer
+   * directly, and does not allocate a temporary byte array.
+   * @param buff ByteBuff to read into.
+   * @param dis the input stream to read from
+   * @param position the position within the stream from which to start reading
+   * @param necessaryLen the number of bytes that are absolutely necessary to read
+   * @param extraLen the number of extra bytes that would be nice to read
+   * @param readAllBytes whether we must read the necessaryLen and extraLen
+   * @return true if and only if extraLen is > 0 and reading those extra bytes was successful
+   * @throws IOException if failed to read the necessary bytes
+   */
+  public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
+    int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
     boolean preadbytebuffer = dis.hasCapability("in:preadbytebuffer");
 
     if (preadbytebuffer) {
-      return preadWithExtraDirectly(buff, dis, position, necessaryLen, extraLen);
+      return preadWithExtraDirectly(buff, dis, position, necessaryLen, extraLen, readAllBytes);
     } else {
-      return preadWithExtraOnHeap(buff, dis, position, necessaryLen, extraLen);
+      return preadWithExtraOnHeap(buff, dis, position, necessaryLen, extraLen, readAllBytes);
     }
   }
 
   private static boolean preadWithExtraOnHeap(ByteBuff buff, FSDataInputStream dis, long position,
-    int necessaryLen, int extraLen) throws IOException {
+    int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
     int remain = necessaryLen + extraLen;
     byte[] buf = new byte[remain];
     int bytesRead = 0;
-    while (bytesRead < necessaryLen) {
+    int lengthMustRead = readAllBytes ? remain : necessaryLen;
+    while (bytesRead < lengthMustRead) {
       int ret = dis.read(position + bytesRead, buf, bytesRead, remain);
       if (ret < 0) {
         throw new IOException("Premature EOF from inputStream (positional read returned " + ret
@@ -257,11 +279,12 @@ private static boolean preadWithExtraOnHeap(ByteBuff buff, FSDataInputStream dis
   }
 
   private static boolean preadWithExtraDirectly(ByteBuff buff, FSDataInputStream dis, long position,
-    int necessaryLen, int extraLen) throws IOException {
+    int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
     int remain = necessaryLen + extraLen, bytesRead = 0, idx = 0;
     ByteBuffer[] buffers = buff.nioByteBuffers();
     ByteBuffer cur = buffers[idx];
-    while (bytesRead < necessaryLen) {
+    int lengthMustRead = readAllBytes ? remain : necessaryLen;
+    while (bytesRead < lengthMustRead) {
       int ret;
       while (!cur.hasRemaining()) {
         if (++idx >= buffers.length) {
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java

Lines changed: 6 additions & 1 deletion
@@ -1349,6 +1349,8 @@ static class FSReaderImpl implements FSReader {
 
     private final Lock streamLock = new ReentrantLock();
 
+    private final boolean isPreadAllBytes;
+
     FSReaderImpl(ReaderContext readerContext, HFileContext fileContext, ByteBuffAllocator allocator,
       Configuration conf) throws IOException {
       this.fileSize = readerContext.getFileSize();
@@ -1365,6 +1367,7 @@ static class FSReaderImpl implements FSReader {
       this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
       defaultDecodingCtx = new HFileBlockDefaultDecodingContext(conf, fileContext);
       encodedBlockDecodingCtx = defaultDecodingCtx;
+      isPreadAllBytes = readerContext.isPreadAllBytes();
     }
 
     @Override
@@ -1453,7 +1456,9 @@ protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int siz
       } else {
         // Positional read. Better for random reads; or when the streamLock is already locked.
         int extraSize = peekIntoNextBlock ? hdrSize : 0;
-        if (!BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize)) {
+        if (
+          !BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize, isPreadAllBytes)
+        ) {
           // did not read the next block header.
           return false;
         }
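The change above only widens the call; the behavior switch lives in BlockIOUtils. A hedged paraphrase of the contract at this call site (all parameter names mirror `readAtOffset`'s; the wrapper class is hypothetical):

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.io.util.BlockIOUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;

public class ReadAtOffsetContractSketch {
  // Returns true only if the next block's header was also read into `dest`.
  static boolean preadBlock(ByteBuff dest, FSDataInputStream istream, long fileOffset, int size,
      int hdrSize, boolean peekIntoNextBlock, boolean isPreadAllBytes) throws IOException {
    int extraSize = peekIntoNextBlock ? hdrSize : 0;
    // isPreadAllBytes=false: a short pread may deliver only `size` bytes, the call
    // returns false, and the next header is fetched on a later read.
    // isPreadAllBytes=true: the read loop retries until size + extraSize bytes are
    // buffered, so prefetch gets the block and the next header in one round of preads.
    return BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize, isPreadAllBytes);
  }
}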

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContext.java

Lines changed: 8 additions & 0 deletions
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.io.hfile;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -39,6 +40,7 @@ public enum ReaderType {
   private final HFileSystem hfs;
   private final boolean primaryReplicaReader;
   private final ReaderType type;
+  private final boolean preadAllBytes;
 
   public ReaderContext(Path filePath, FSDataInputStreamWrapper fsdis, long fileSize,
     HFileSystem hfs, boolean primaryReplicaReader, ReaderType type) {
@@ -48,6 +50,8 @@ public ReaderContext(Path filePath, FSDataInputStreamWrapper fsdis, long fileSiz
     this.hfs = hfs;
     this.primaryReplicaReader = primaryReplicaReader;
     this.type = type;
+    this.preadAllBytes = hfs.getConf().getBoolean(HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_KEY,
+      HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_DEFAULT);
   }
 
   public Path getFilePath() {
@@ -73,4 +77,8 @@ public boolean isPrimaryReplicaReader() {
   public ReaderType getReaderType() {
     return this.type;
   }
+
+  public boolean isPreadAllBytes() {
+    return preadAllBytes;
+  }
 }
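The flag is resolved once, from the filesystem's Configuration, when the ReaderContext is constructed, so block-reader call sites need no changes. A minimal sketch mirroring the test below (the wrapper class is hypothetical; `fs`, `is`, `path`, and `fileSize` are assumed inputs):

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;

public class ReaderContextFlagSketch {
  static ReaderContext preadAllBytesContext(FileSystem fs, FSDataInputStream is, Path path,
      long fileSize) throws IOException {
    // The flag is read from this filesystem's Configuration at construction time.
    fs.getConf().setBoolean(HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_KEY, true);
    ReaderContext context = new ReaderContextBuilder()
      .withInputStreamWrapper(new FSDataInputStreamWrapper(is))
      .withReaderType(ReaderContext.ReaderType.PREAD)
      .withFileSize(fileSize)
      .withFilePath(path)
      .withFileSystem(fs)
      .build();
    // Every FSReaderImpl built from this context will use full-byte preads.
    return context;
  }
}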

hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java

Lines changed: 114 additions & 0 deletions
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.io.hfile;
 
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -28,27 +29,37 @@
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.util.BlockIOUtils;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.MultiByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
+import org.junit.rules.TestName;
 
 @Category({ IOTests.class, SmallTests.class })
 public class TestBlockIOUtils {
@@ -57,11 +68,17 @@ public class TestBlockIOUtils {
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestBlockIOUtils.class);
 
+  @Rule
+  public TestName testName = new TestName();
+
   @Rule
   public ExpectedException exception = ExpectedException.none();
 
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
+  private static final int NUM_TEST_BLOCKS = 2;
+  private static final Compression.Algorithm COMPRESSION_ALGO = Compression.Algorithm.GZ;
+
   @Test
   public void testIsByteBufferReadable() throws IOException {
     FileSystem fs = TEST_UTIL.getTestFileSystem();
@@ -92,6 +109,103 @@ public void testReadFully() throws IOException {
     assertArrayEquals(Bytes.toBytes(s), heapBuf);
   }
 
+  @Test
+  public void testPreadWithReadFullBytes() throws IOException {
+    testPreadReadFullBytesInternal(true, EnvironmentEdgeManager.currentTime());
+  }
+
+  @Test
+  public void testPreadWithoutReadFullBytes() throws IOException {
+    testPreadReadFullBytesInternal(false, EnvironmentEdgeManager.currentTime());
+  }
+
+  private void testPreadReadFullBytesInternal(boolean readAllBytes, long randomSeed)
+    throws IOException {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean(HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_KEY, readAllBytes);
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Path path = new Path(TEST_UTIL.getDataTestDirOnTestFS(), testName.getMethodName());
+    // give a fixed seed such we can see failure easily.
+    Random rand = new Random(randomSeed);
+    long totalDataBlockBytes =
+      writeBlocks(TEST_UTIL.getConfiguration(), rand, COMPRESSION_ALGO, path);
+    readDataBlocksAndVerify(fs, path, COMPRESSION_ALGO, totalDataBlockBytes);
+  }
+
+  private long writeBlocks(Configuration conf, Random rand, Compression.Algorithm compressAlgo,
+    Path path) throws IOException {
+    FileSystem fs = HFileSystem.get(conf);
+    FSDataOutputStream os = fs.create(path);
+    HFileContext meta =
+      new HFileContextBuilder().withHBaseCheckSum(true).withCompression(compressAlgo).build();
+    HFileBlock.Writer hbw = new HFileBlock.Writer(conf, null, meta);
+    long totalDataBlockBytes = 0;
+    for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
+      int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
+      if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) {
+        blockTypeOrdinal = BlockType.DATA.ordinal();
+      }
+      BlockType bt = BlockType.values()[blockTypeOrdinal];
+      DataOutputStream dos = hbw.startWriting(bt);
+      int size = rand.nextInt(500);
+      for (int j = 0; j < size; ++j) {
+        dos.writeShort(i + 1);
+        dos.writeInt(j + 1);
+      }
+
+      hbw.writeHeaderAndData(os);
+      totalDataBlockBytes += hbw.getOnDiskSizeWithHeader();
+    }
+    // append a dummy trailer and in a actual HFile it should have more data.
+    FixedFileTrailer trailer = new FixedFileTrailer(3, 3);
+    trailer.setFirstDataBlockOffset(0);
+    trailer.setLastDataBlockOffset(totalDataBlockBytes);
+    trailer.setComparatorClass(meta.getCellComparator().getClass());
+    trailer.setDataIndexCount(NUM_TEST_BLOCKS);
+    trailer.setCompressionCodec(compressAlgo);
+    trailer.serialize(os);
+    // close the stream
+    os.close();
+    return totalDataBlockBytes;
+  }
+
+  private void readDataBlocksAndVerify(FileSystem fs, Path path,
+    Compression.Algorithm compressAlgo, long totalDataBlockBytes) throws IOException {
+    FSDataInputStream is = fs.open(path);
+    HFileContext fileContext =
+      new HFileContextBuilder().withHBaseCheckSum(true).withCompression(compressAlgo).build();
+    ReaderContext context =
+      new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
+        .withReaderType(ReaderContext.ReaderType.PREAD).withFileSize(totalDataBlockBytes)
+        .withFilePath(path).withFileSystem(fs).build();
+    HFileBlock.FSReader hbr =
+      new HFileBlock.FSReaderImpl(context, fileContext, ByteBuffAllocator.HEAP, fs.getConf());
+
+    long onDiskSizeOfNextBlock = -1;
+    long offset = 0;
+    int numOfReadBlock = 0;
+    // offset and totalBytes shares the same logic in the HFilePreadReader
+    while (offset < totalDataBlockBytes) {
+      HFileBlock block = hbr.readBlockData(offset, onDiskSizeOfNextBlock, true, false, false);
+      numOfReadBlock++;
+      try {
+        onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
+        offset += block.getOnDiskSizeWithHeader();
+      } finally {
+        block.release();
+      }
+    }
+    assertEquals(totalDataBlockBytes, offset);
+    assertEquals(NUM_TEST_BLOCKS, numOfReadBlock);
+    deleteFile(fs, path);
+  }
+
+  private void deleteFile(FileSystem fs, Path path) throws IOException {
+    if (fs.exists(path)) {
+      fs.delete(path, true);
+    }
+  }
+
   @Test
   public void testReadWithExtra() throws IOException {
     FileSystem fs = TEST_UTIL.getTestFileSystem();
