|
26 | 26 | import static org.hamcrest.Matchers.hasItem; |
27 | 27 | import static org.hamcrest.Matchers.hasItems; |
28 | 28 | import static org.hamcrest.Matchers.not; |
| 29 | +import static org.junit.Assert.assertEquals; |
29 | 30 | import static org.junit.Assert.assertFalse; |
30 | 31 | import static org.junit.Assert.assertTrue; |
31 | 32 | import static org.junit.Assert.fail; |
|
35 | 36 | import java.io.IOException; |
36 | 37 | import java.util.List; |
37 | 38 | import java.util.Random; |
| 39 | +import java.util.concurrent.ScheduledThreadPoolExecutor; |
38 | 40 | import java.util.concurrent.ThreadLocalRandom; |
39 | 41 | import java.util.concurrent.TimeUnit; |
40 | 42 | import java.util.function.BiConsumer; |
@@ -120,6 +122,40 @@ public void testPrefetchSetInHCDWorks() { |
120 | 122 | assertTrue(cc.shouldPrefetchOnOpen()); |
121 | 123 | } |
122 | 124 |
|
| 125 | + @Test |
| 126 | + public void testPrefetchBlockCacheDisabled() throws Exception { |
| 127 | + ScheduledThreadPoolExecutor poolExecutor = |
| 128 | + (ScheduledThreadPoolExecutor) PrefetchExecutor.getExecutorPool(); |
| 129 | + long totalCompletedBefore = poolExecutor.getCompletedTaskCount(); |
| 130 | + long queueBefore = poolExecutor.getQueue().size(); |
| 131 | + ColumnFamilyDescriptor columnFamilyDescriptor = |
| 132 | + ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true) |
| 133 | + .setBlockCacheEnabled(false).build(); |
| 134 | + HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); |
| 135 | + CacheConfig cacheConfig = |
| 136 | + new CacheConfig(conf, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP); |
| 137 | + Path storeFile = writeStoreFile("testPrefetchBlockCacheDisabled", meta, cacheConfig); |
| 138 | + readStoreFile(storeFile, (r, o) -> { |
| 139 | + HFileBlock block = null; |
| 140 | + try { |
| 141 | + block = r.readBlock(o, -1, false, true, false, true, null, null); |
| 142 | + } catch (IOException e) { |
| 143 | + fail(e.getMessage()); |
| 144 | + } |
| 145 | + return block; |
| 146 | + }, (key, block) -> { |
| 147 | + boolean isCached = blockCache.getBlock(key, true, false, true) != null; |
| 148 | + if ( |
| 149 | + block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX |
| 150 | + || block.getBlockType() == BlockType.INTERMEDIATE_INDEX |
| 151 | + ) { |
| 152 | + assertFalse(isCached); |
| 153 | + } |
| 154 | + }, cacheConfig); |
| 155 | + assertEquals(totalCompletedBefore + queueBefore, |
| 156 | + poolExecutor.getCompletedTaskCount() + poolExecutor.getQueue().size()); |
| 157 | + } |
| 158 | + |
123 | 159 | @Test |
124 | 160 | public void testPrefetch() throws Exception { |
125 | 161 | TraceUtil.trace(() -> { |
@@ -212,8 +248,15 @@ private void readStoreFileCacheOnly(Path storeFilePath) throws Exception { |
212 | 248 | private void readStoreFile(Path storeFilePath, |
213 | 249 | BiFunction<HFile.Reader, Long, HFileBlock> readFunction, |
214 | 250 | BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception { |
| 251 | + readStoreFile(storeFilePath, readFunction, validationFunction, cacheConf); |
| 252 | + } |
| 253 | + |
| 254 | + private void readStoreFile(Path storeFilePath, |
| 255 | + BiFunction<HFile.Reader, Long, HFileBlock> readFunction, |
| 256 | + BiConsumer<BlockCacheKey, HFileBlock> validationFunction, CacheConfig cacheConfig) |
| 257 | + throws Exception { |
215 | 258 | // Open the file |
216 | | - HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf); |
| 259 | + HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf); |
217 | 260 |
|
218 | 261 | while (!reader.prefetchComplete()) { |
219 | 262 | // Sleep for a bit |
@@ -350,8 +393,13 @@ private Path writeStoreFile(String fname) throws IOException { |
350 | 393 | } |
351 | 394 |
|
352 | 395 | private Path writeStoreFile(String fname, HFileContext context) throws IOException { |
| 396 | + return writeStoreFile(fname, context, cacheConf); |
| 397 | + } |
| 398 | + |
| 399 | + private Path writeStoreFile(String fname, HFileContext context, CacheConfig cacheConfig) |
| 400 | + throws IOException { |
353 | 401 | Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname); |
354 | | - StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs) |
| 402 | + StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConfig, fs) |
355 | 403 | .withOutputDir(storeFileParentDir).withFileContext(context).build(); |
356 | 404 | Random rand = ThreadLocalRandom.current(); |
357 | 405 | final int rowLen = 32; |
|
0 commit comments