|
22 | 22 |
|
23 | 23 | import java.io.IOException; |
24 | 24 | import java.lang.management.ManagementFactory; |
25 | | -import java.lang.management.MemoryUsage; |
| 25 | +import java.util.concurrent.Executor; |
| 26 | +import java.util.concurrent.ForkJoinPool; |
26 | 27 |
|
27 | 28 | import org.apache.commons.logging.Log; |
28 | 29 | import org.apache.commons.logging.LogFactory; |
|
45 | 46 | public class CacheConfig { |
46 | 47 | private static final Log LOG = LogFactory.getLog(CacheConfig.class.getName()); |
47 | 48 |
|
| 49 | + /** |
| 50 | + * Configuration key to cache block policy (Lru, TinyLfu). |
| 51 | + */ |
| 52 | + public static final String HFILE_BLOCK_CACHE_POLICY_KEY = "hfile.block.cache.policy"; |
| 53 | + public static final String HFILE_BLOCK_CACHE_POLICY_DEFAULT = "LRU"; |
| 54 | + |
48 | 55 | /** |
49 | 56 | * Configuration key to cache data blocks on write. There are separate |
50 | 57 | * switches for bloom blocks and non-root index blocks. |
@@ -92,19 +99,19 @@ public class CacheConfig { |
92 | 99 | * is an in-memory map that needs to be persisted across restarts. Where to store this |
93 | 100 | * in-memory state is what you supply here: e.g. <code>/tmp/bucketcache.map</code>. |
94 | 101 | */ |
95 | | - public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = |
| 102 | + public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = |
96 | 103 | "hbase.bucketcache.persistent.path"; |
97 | 104 |
|
98 | 105 | /** |
99 | 106 | * If the bucket cache is used in league with the lru on-heap block cache (meta blocks such |
100 | 107 | * as indices and blooms are kept in the lru blockcache and the data blocks in the |
101 | 108 | * bucket cache). |
102 | 109 | */ |
103 | | - public static final String BUCKET_CACHE_COMBINED_KEY = |
| 110 | + public static final String BUCKET_CACHE_COMBINED_KEY = |
104 | 111 | "hbase.bucketcache.combinedcache.enabled"; |
105 | 112 |
|
106 | 113 | public static final String BUCKET_CACHE_WRITER_THREADS_KEY = "hbase.bucketcache.writer.threads"; |
107 | | - public static final String BUCKET_CACHE_WRITER_QUEUE_KEY = |
| 114 | + public static final String BUCKET_CACHE_WRITER_QUEUE_KEY = |
108 | 115 | "hbase.bucketcache.writer.queuelength"; |
109 | 116 |
|
110 | 117 | /** |
@@ -149,6 +156,7 @@ private static enum ExternalBlockCaches { |
149 | 156 | memcached("org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache"); |
150 | 157 | // TODO(eclark): Consider more. Redis, etc. |
151 | 158 | Class<? extends BlockCache> clazz; |
| 159 | + @SuppressWarnings("unchecked") |
152 | 160 | ExternalBlockCaches(String clazzName) { |
153 | 161 | try { |
154 | 162 | clazz = (Class<? extends BlockCache>) Class.forName(clazzName); |
@@ -443,7 +451,9 @@ public boolean shouldCacheDataCompressed() { |
443 | 451 | * @return true if this {@link BlockCategory} should be compressed in blockcache, false otherwise |
444 | 452 | */ |
445 | 453 | public boolean shouldCacheCompressed(BlockCategory category) { |
446 | | - if (!isBlockCacheEnabled()) return false; |
| 454 | + if (!isBlockCacheEnabled()) { |
| 455 | + return false; |
| 456 | + } |
447 | 457 | switch (category) { |
448 | 458 | case DATA: |
449 | 459 | return this.cacheDataCompressed; |
@@ -525,28 +535,62 @@ public String toString() { |
525 | 535 | // Clear this if in tests you'd make more than one block cache instance. |
526 | 536 | @VisibleForTesting |
527 | 537 | static BlockCache GLOBAL_BLOCK_CACHE_INSTANCE; |
528 | | - private static LruBlockCache GLOBAL_L1_CACHE_INSTANCE = null; |
529 | | - private static BlockCache GLOBAL_L2_CACHE_INSTANCE = null; |
| 538 | + private static FirstLevelBlockCache GLOBAL_L1_CACHE_INSTANCE; |
| 539 | + private static BlockCache GLOBAL_L2_CACHE_INSTANCE; |
| 540 | + private static ForkJoinPool GLOBAL_FORKJOIN_POOL; |
530 | 541 |
|
531 | 542 | /** Boolean whether we have disabled the block cache entirely. */ |
532 | 543 | @VisibleForTesting |
533 | 544 | static boolean blockCacheDisabled = false; |
534 | 545 |
|
535 | 546 | /** |
536 | | - * @param c Configuration to use. |
537 | | - * @return An L1 instance. Currently an instance of LruBlockCache. |
| 547 | + * @param c Configuration to use |
| 548 | + * @return An L1 instance |
| 549 | + */ |
| 550 | + public static FirstLevelBlockCache getL1(final Configuration c) { |
| 551 | + long xmx = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); |
| 552 | + long l1CacheSize = HeapMemorySizeUtil.getFirstLevelCacheSize(c, xmx); |
| 553 | + return getL1(l1CacheSize, c); |
| 554 | + } |
| 555 | + |
| 556 | + /** |
| 557 | + * @param c Configuration to use |
| 558 | + * @param xmx Max heap memory |
| 559 | + * @return An L1 instance |
538 | 560 | */ |
539 | | - private static synchronized LruBlockCache getL1(final Configuration c) { |
| 561 | + |
| 562 | + private synchronized static FirstLevelBlockCache getL1(long cacheSize, Configuration c) { |
540 | 563 | if (GLOBAL_L1_CACHE_INSTANCE != null) return GLOBAL_L1_CACHE_INSTANCE; |
541 | | - final long lruCacheSize = HeapMemorySizeUtil.getLruCacheSize(c); |
542 | | - if (lruCacheSize < 0) { |
543 | | - blockCacheDisabled = true; |
| 564 | + if (cacheSize < 0) { |
| 565 | + return null; |
544 | 566 | } |
545 | | - if (blockCacheDisabled) return null; |
| 567 | + String policy = c.get(HFILE_BLOCK_CACHE_POLICY_KEY, HFILE_BLOCK_CACHE_POLICY_DEFAULT); |
546 | 568 | int blockSize = c.getInt(BLOCKCACHE_BLOCKSIZE_KEY, HConstants.DEFAULT_BLOCKSIZE); |
547 | | - LOG.info("Allocating LruBlockCache size=" + |
548 | | - StringUtils.byteDesc(lruCacheSize) + ", blockSize=" + StringUtils.byteDesc(blockSize)); |
549 | | - GLOBAL_L1_CACHE_INSTANCE = new LruBlockCache(lruCacheSize, blockSize, true, c); |
| 569 | + LOG.info("Allocating BlockCache size=" + |
| 570 | + StringUtils.byteDesc(cacheSize) + ", blockSize=" + StringUtils.byteDesc(blockSize)); |
| 571 | + if (policy.equalsIgnoreCase("LRU")) { |
| 572 | + GLOBAL_L1_CACHE_INSTANCE = new LruBlockCache(cacheSize, blockSize, true, c); |
| 573 | + } else if (policy.equalsIgnoreCase("TinyLFU")) { |
| 574 | + if (GLOBAL_FORKJOIN_POOL == null) { |
| 575 | + GLOBAL_FORKJOIN_POOL = new ForkJoinPool(); |
| 576 | + } |
| 577 | + Class<?> tinyLFUClass; |
| 578 | + try { |
| 579 | + tinyLFUClass = Class.forName("org.apache.hadoop.hbase.io.hfile.TinyLfuBlockCache"); |
| 580 | + GLOBAL_L1_CACHE_INSTANCE = (FirstLevelBlockCache) |
| 581 | + tinyLFUClass.getDeclaredConstructor(long.class, long.class, Executor.class, |
| 582 | + Configuration.class) |
| 583 | + .newInstance(cacheSize, blockSize, GLOBAL_FORKJOIN_POOL, c); |
| 584 | + } catch (Exception e) { |
| 585 | + throw new RuntimeException( |
| 586 | + "Unable to instantiate the TinyLfuBlockCache block cache policy." + |
| 587 | + "If you want to use TinyLFU you must build with JDK8+, run with JRE8+, and have both " + |
| 588 | + "the hbase-tinylfu-blockcache module and its dependency the caffiene library " + |
| 589 | + "installed into the classpath.", e); |
| 590 | + } |
| 591 | + } else { |
| 592 | + throw new IllegalArgumentException("Unknown block cache policy " + policy); |
| 593 | + } |
550 | 594 | return GLOBAL_L1_CACHE_INSTANCE; |
551 | 595 | } |
552 | 596 |
|
@@ -587,7 +631,7 @@ public CacheStats getL2Stats() { |
587 | 631 | } |
588 | 632 |
|
589 | 633 | private static BlockCache getExternalBlockcache(Configuration c) { |
590 | | - Class klass = null; |
| 634 | + Class<?> klass = null; |
591 | 635 |
|
592 | 636 | // Get the class, from the config. s |
593 | 637 | try { |
@@ -615,7 +659,9 @@ private static BlockCache getExternalBlockcache(Configuration c) { |
615 | 659 | private static BlockCache getBucketCache(Configuration c) { |
616 | 660 | // Check for L2. ioengine name must be non-null. |
617 | 661 | String bucketCacheIOEngineName = c.get(BUCKET_CACHE_IOENGINE_KEY, null); |
618 | | - if (bucketCacheIOEngineName == null || bucketCacheIOEngineName.length() <= 0) return null; |
| 662 | + if (bucketCacheIOEngineName == null || bucketCacheIOEngineName.length() <= 0) { |
| 663 | + return null; |
| 664 | + } |
619 | 665 |
|
620 | 666 | int blockSize = c.getInt(BLOCKCACHE_BLOCKSIZE_KEY, HConstants.DEFAULT_BLOCKSIZE); |
621 | 667 | final long bucketCacheSize = HeapMemorySizeUtil.getBucketCacheSize(c); |
@@ -673,28 +719,30 @@ private static BlockCache getBucketCache(Configuration c) { |
673 | 719 | * @return The block cache or <code>null</code>. |
674 | 720 | */ |
675 | 721 | public static synchronized BlockCache instantiateBlockCache(Configuration conf) { |
676 | | - if (GLOBAL_BLOCK_CACHE_INSTANCE != null) return GLOBAL_BLOCK_CACHE_INSTANCE; |
677 | | - if (blockCacheDisabled) return null; |
678 | | - LruBlockCache l1 = getL1(conf); |
679 | | - // blockCacheDisabled is set as a side-effect of getL1Internal(), so check it again after the call. |
680 | | - if (blockCacheDisabled) return null; |
| 722 | + if (GLOBAL_BLOCK_CACHE_INSTANCE != null) { |
| 723 | + return GLOBAL_BLOCK_CACHE_INSTANCE; |
| 724 | + } |
| 725 | + if (blockCacheDisabled) { |
| 726 | + return null; |
| 727 | + } |
| 728 | + FirstLevelBlockCache l1 = getL1(conf); |
681 | 729 | BlockCache l2 = getL2(conf); |
682 | 730 | if (l2 == null) { |
683 | 731 | GLOBAL_BLOCK_CACHE_INSTANCE = l1; |
684 | 732 | } else { |
685 | 733 | boolean useExternal = conf.getBoolean(EXTERNAL_BLOCKCACHE_KEY, EXTERNAL_BLOCKCACHE_DEFAULT); |
686 | | - boolean combinedWithLru = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY, |
| 734 | + boolean combinedWithL1 = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY, |
687 | 735 | DEFAULT_BUCKET_CACHE_COMBINED); |
688 | 736 | if (useExternal) { |
689 | 737 | GLOBAL_BLOCK_CACHE_INSTANCE = new InclusiveCombinedBlockCache(l1, l2); |
690 | 738 | } else { |
691 | | - if (combinedWithLru) { |
| 739 | + if (combinedWithL1) { |
692 | 740 | GLOBAL_BLOCK_CACHE_INSTANCE = new CombinedBlockCache(l1, l2); |
693 | 741 | } else { |
694 | | - // L1 and L2 are not 'combined'. They are connected via the LruBlockCache victimhandler |
695 | | - // mechanism. It is a little ugly but works according to the following: when the |
696 | | - // background eviction thread runs, blocks evicted from L1 will go to L2 AND when we get |
697 | | - // a block from the L1 cache, if not in L1, we will search L2. |
| 742 | + // L1 and L2 are not 'combined'. They are connected via the FirstLevelBlockCache |
| 743 | + // victimhandler mechanism. It is a little ugly but works according to the following: |
| 744 | + // when the background eviction thread runs, blocks evicted from L1 will go to L2 AND when |
| 745 | + // we get a block from the L1 cache, if not in L1, we will search L2. |
698 | 746 | GLOBAL_BLOCK_CACHE_INSTANCE = l1; |
699 | 747 | } |
700 | 748 | } |
|
0 commit comments