
Commit 873f898

HBASE-27646 Should not use pread when prefetching in HFilePreadReader (#5063) (#5122)
Signed-off-by: Bryan Beaudreault <[email protected]>
1 parent 81dffda commit 873f898
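What the change does, in brief: prefetch walks an HFile from offset 0 up to the load-on-open section, a purely sequential access pattern, yet the old code issued every block read as a positional read (pread). This commit gives the prefetch thread its own STREAM-typed reader and passes pread=false, so reads go through the stream's seek/read path and benefit from sequential readahead. A minimal sketch of the two read styles on a raw Hadoop FSDataInputStream (the path and buffer size are hypothetical; this is illustration, not HBase code):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PreadVsStreamSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    try (FSDataInputStream in = fs.open(new Path("/tmp/example.hfile"))) {
      byte[] buf = new byte[4096];

      // Positional read (pread): stateless, leaves the stream position alone.
      // Good for concurrent random access, but each call stands on its own,
      // so the filesystem's sequential readahead never kicks in.
      in.readFully(0L, buf, 0, buf.length);

      // Streaming read: advances the stream position, so consecutive reads
      // are recognizably sequential, the pattern a whole-file prefetch has.
      in.seek(0L);
      in.readFully(buf, 0, buf.length);
    }
  }
}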

5 files changed: 102 additions & 3 deletions

hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java

Lines changed: 9 additions & 0 deletions
@@ -97,6 +97,8 @@ private static class ReadStatistics {
   private Boolean instanceOfCanUnbuffer = null;
   private CanUnbuffer unbuffer = null;
 
+  protected Path readerPath;
+
   public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
     this(fs, path, false, -1L);
   }
@@ -127,6 +129,9 @@ private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path, boolea
     // Initially we are going to read the tail block. Open the reader w/FS checksum.
     this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
     this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
+    this.readerPath = this.stream.getWrappedStream() instanceof FileLink.FileLinkInputStream
+      ? ((FileLink.FileLinkInputStream) this.stream.getWrappedStream()).getCurrentPath()
+      : path;
     setStreamOptions(stream);
   }
 
@@ -342,4 +347,8 @@ public void unbuffer() {
       }
     }
   }
+
+  public Path getReaderPath() {
+    return readerPath;
+  }
 }
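The intent of readerPath: record the file that is actually being read. When the wrapped stream is a FileLink.FileLinkInputStream, that is the currently resolved link target; otherwise it is simply the path the wrapper was opened with. A hedged usage sketch (the path is hypothetical):

FileSystem fs = FileSystem.get(new Configuration());
Path hfilePath = new Path("/hbase/data/ns/tbl/region/cf/storefile"); // hypothetical
FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fs, hfilePath);
// Plain file: getReaderPath() is just hfilePath. When the wrapper was opened
// through a FileLink (see the link-aware constructor in the hunk above), it
// is the link's currently resolved target instead, so a second reader opened
// on it reads the same underlying file.
Path resolved = wrapper.getReaderPath();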

hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java

Lines changed: 5 additions & 1 deletion
@@ -83,7 +83,7 @@ public class FileLink {
    * FileLink InputStream that handles the switch between the original path and the alternative
    * locations, when the file is moved.
    */
-  private static class FileLinkInputStream extends InputStream
+  protected static class FileLinkInputStream extends InputStream
     implements Seekable, PositionedReadable, CanSetDropBehind, CanSetReadahead, CanUnbuffer {
     private FSDataInputStream in = null;
     private Path currentPath = null;
@@ -286,6 +286,10 @@ public void setReadahead(Long readahead) throws IOException, UnsupportedOperatio
     public void setDropBehind(Boolean dropCache) throws IOException, UnsupportedOperationException {
       in.setDropBehind(dropCache);
     }
+
+    public Path getCurrentPath() {
+      return currentPath;
+    }
   }
 
   private Path[] locations = null;

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java

Lines changed: 18 additions & 2 deletions
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +42,15 @@ public HFilePreadReader(ReaderContext context, HFileInfo fileInfo, CacheConfig c
       public void run() {
         long offset = 0;
         long end = 0;
+        HFile.Reader prefetchStreamReader = null;
         try {
+          ReaderContext streamReaderContext = ReaderContextBuilder.newBuilder(context)
+            .withReaderType(ReaderContext.ReaderType.STREAM)
+            .withInputStreamWrapper(new FSDataInputStreamWrapper(context.getFileSystem(),
+              context.getInputStreamWrapper().getReaderPath()))
+            .build();
+          prefetchStreamReader =
+            new HFileStreamReader(streamReaderContext, fileInfo, cacheConf, conf);
           end = getTrailer().getLoadOnOpenDataOffset();
           if (LOG.isTraceEnabled()) {
             LOG.trace("Prefetch start " + getPathOffsetEndStr(path, offset, end));
@@ -56,8 +65,8 @@ public void run() {
             // the internal-to-hfileblock thread local which holds the overread that gets the
             // next header, will not have happened...so, pass in the onDiskSize gotten from the
             // cached block. This 'optimization' triggers extremely rarely I'd say.
-            HFileBlock block = readBlock(offset, onDiskSizeOfNextBlock, /* cacheBlock= */true,
-              /* pread= */true, false, false, null, null, true);
+            HFileBlock block = prefetchStreamReader.readBlock(offset, onDiskSizeOfNextBlock,
+              /* cacheBlock= */true, /* pread= */false, false, false, null, null, true);
             try {
               onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
               offset += block.getOnDiskSizeWithHeader();
@@ -77,6 +86,13 @@ public void run() {
           // Other exceptions are interesting
           LOG.warn("Prefetch " + getPathOffsetEndStr(path, offset, end), e);
         } finally {
+          if (prefetchStreamReader != null) {
+            try {
+              prefetchStreamReader.close(false);
+            } catch (IOException e) {
+              LOG.warn("Close prefetch stream reader failed, path: " + path, e);
+            }
+          }
           PrefetchExecutor.complete(path);
         }
       }

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContextBuilder.java

Lines changed: 12 additions & 0 deletions
@@ -43,6 +43,18 @@ public class ReaderContextBuilder {
   public ReaderContextBuilder() {
   }
 
+  public static ReaderContextBuilder newBuilder(ReaderContext readerContext) {
+    return new ReaderContextBuilder(readerContext);
+  }
+
+  private ReaderContextBuilder(ReaderContext readerContext) {
+    this.filePath = readerContext.getFilePath();
+    this.fsdis = readerContext.getInputStreamWrapper();
+    this.fileSize = readerContext.getFileSize();
+    this.hfs = readerContext.getFileSystem();
+    this.type = readerContext.getReaderType();
+  }
+
   public ReaderContextBuilder withFilePath(Path filePath) {
     this.filePath = filePath;
     return this;
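A hedged usage sketch of the new copy factory: clone an existing context and override a single attribute; everything else (path, stream wrapper, file size, filesystem) carries over via the copy constructor above.

ReaderContext streamContext = ReaderContextBuilder.newBuilder(context)
  .withReaderType(ReaderContext.ReaderType.STREAM)
  .build();

Note that the HFilePreadReader call site above additionally swaps in a fresh FSDataInputStreamWrapper via withInputStreamWrapper(...), so the cloned STREAM context does not share an input stream with the original pread reader.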

hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java

Lines changed: 58 additions & 0 deletions
@@ -56,16 +56,20 @@
 import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.hadoop.hbase.regionserver.TestHStoreFile;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -252,6 +256,14 @@ public void testPrefetchDoesntSkipRefs() throws Exception {
     });
   }
 
+  @Test
+  public void testPrefetchDoesntSkipHFileLink() throws Exception {
+    testPrefetchWhenHFileLink(c -> {
+      boolean isCached = c != null;
+      assertTrue(isCached);
+    });
+  }
+
   private void testPrefetchWhenRefs(boolean compactionEnabled, Consumer<Cacheable> test)
     throws Exception {
     cacheConf = new CacheConfig(conf, blockCache);
@@ -287,6 +299,52 @@ private void testPrefetchWhenRefs(boolean compactionEnabled, Consumer<Cacheable>
     }
   }
 
+  private void testPrefetchWhenHFileLink(Consumer<Cacheable> test) throws Exception {
+    cacheConf = new CacheConfig(conf, blockCache);
+    HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
+    Path testDir = TEST_UTIL.getDataTestDir("testPrefetchWhenHFileLink");
+    final RegionInfo hri =
+      RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchWhenHFileLink")).build();
+    // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
+    Configuration testConf = new Configuration(this.conf);
+    CommonFSUtils.setRootDir(testConf, testDir);
+    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs,
+      CommonFSUtils.getTableDir(testDir, hri.getTable()), hri);
+
+    // Make a store file and write data to it.
+    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
+      .withFilePath(regionFs.createTempName()).withFileContext(context).build();
+    TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("testPrefetchWhenHFileLink"),
+      Bytes.toBytes("testPrefetchWhenHFileLink"));
+
+    Path storeFilePath = regionFs.commitStoreFile("cf", writer.getPath());
+    Path dstPath = new Path(regionFs.getTableDir(), new Path("test-region", "cf"));
+    HFileLink.create(testConf, this.fs, dstPath, hri, storeFilePath.getName());
+    Path linkFilePath =
+      new Path(dstPath, HFileLink.createHFileLinkName(hri, storeFilePath.getName()));
+
+    // Try to open store file from link
+    StoreFileInfo storeFileInfo = new StoreFileInfo(testConf, this.fs, linkFilePath, true);
+    HStoreFile hsf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf);
+    assertTrue(storeFileInfo.isLink());
+
+    hsf.initReader();
+    HFile.Reader reader = hsf.getReader().getHFileReader();
+    while (!reader.prefetchComplete()) {
+      // Sleep for a bit
+      Thread.sleep(1000);
+    }
+    long offset = 0;
+    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
+      HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null, true);
+      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
+      if (block.getBlockType() == BlockType.DATA) {
+        test.accept(blockCache.getBlock(blockCacheKey, true, false, true));
+      }
+      offset += block.getOnDiskSizeWithHeader();
+    }
+  }
+
   private Path writeStoreFile(String fname) throws IOException {
     HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
     return writeStoreFile(fname, meta);
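The new test exercises the getReaderPath() resolution end to end: it commits a store file, creates an HFileLink to it, opens the store file through the link, waits for prefetch to finish, and then asserts that every DATA block is already in the block cache. Reading through a link is the interesting case because the prefetch thread opens its second reader by path, and for a link that path has to point at the actual file behind the link rather than the link name.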