|
22 | 22 |
|
23 | 23 | import java.io.IOException; |
24 | 24 | import java.lang.management.ManagementFactory; |
25 | | -import java.lang.management.MemoryUsage; |
| 25 | +import java.util.concurrent.Executor; |
| 26 | +import java.util.concurrent.ForkJoinPool; |
26 | 27 |
|
27 | 28 | import org.apache.commons.logging.Log; |
28 | 29 | import org.apache.commons.logging.LogFactory; |
|
45 | 46 | public class CacheConfig { |
46 | 47 | private static final Log LOG = LogFactory.getLog(CacheConfig.class.getName()); |
47 | 48 |
|
| 49 | + /** |
| 50 | + * Configuration key to cache block policy (Lru, TinyLfu). |
| 51 | + */ |
| 52 | + public static final String HFILE_BLOCK_CACHE_POLICY_KEY = "hfile.block.cache.policy"; |
| 53 | + public static final String HFILE_BLOCK_CACHE_POLICY_DEFAULT = "LRU"; |
| 54 | + |
48 | 55 | /** |
49 | 56 | * Configuration key to cache data blocks on write. There are separate |
50 | 57 | * switches for bloom blocks and non-root index blocks. |
@@ -92,19 +99,19 @@ public class CacheConfig { |
92 | 99 | * is an in-memory map that needs to be persisted across restarts. Where to store this |
93 | 100 | * in-memory state is what you supply here: e.g. <code>/tmp/bucketcache.map</code>. |
94 | 101 | */ |
95 | | - public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = |
| 102 | + public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = |
96 | 103 | "hbase.bucketcache.persistent.path"; |
97 | 104 |
|
98 | 105 | /** |
99 | 106 | * If the bucket cache is used in league with the lru on-heap block cache (meta blocks such |
100 | 107 | * as indices and blooms are kept in the lru blockcache and the data blocks in the |
101 | 108 | * bucket cache). |
102 | 109 | */ |
103 | | - public static final String BUCKET_CACHE_COMBINED_KEY = |
| 110 | + public static final String BUCKET_CACHE_COMBINED_KEY = |
104 | 111 | "hbase.bucketcache.combinedcache.enabled"; |
105 | 112 |
|
106 | 113 | public static final String BUCKET_CACHE_WRITER_THREADS_KEY = "hbase.bucketcache.writer.threads"; |
107 | | - public static final String BUCKET_CACHE_WRITER_QUEUE_KEY = |
| 114 | + public static final String BUCKET_CACHE_WRITER_QUEUE_KEY = |
108 | 115 | "hbase.bucketcache.writer.queuelength"; |
109 | 116 |
|
110 | 117 | /** |
@@ -177,6 +184,7 @@ private static enum ExternalBlockCaches { |
177 | 184 | memcached("org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache"); |
178 | 185 | // TODO(eclark): Consider more. Redis, etc. |
179 | 186 | Class<? extends BlockCache> clazz; |
| 187 | + @SuppressWarnings("unchecked") |
180 | 188 | ExternalBlockCaches(String clazzName) { |
181 | 189 | try { |
182 | 190 | clazz = (Class<? extends BlockCache>) Class.forName(clazzName); |
@@ -511,7 +519,9 @@ public boolean shouldCacheDataCompressed() { |
511 | 519 | * @return true if this {@link BlockCategory} should be compressed in blockcache, false otherwise |
512 | 520 | */ |
513 | 521 | public boolean shouldCacheCompressed(BlockCategory category) { |
514 | | - if (!isBlockCacheEnabled()) return false; |
| 522 | + if (!isBlockCacheEnabled()) { |
| 523 | + return false; |
| 524 | + } |
515 | 525 | switch (category) { |
516 | 526 | case DATA: |
517 | 527 | return this.cacheDataCompressed; |
@@ -614,28 +624,62 @@ public String toString() { |
614 | 624 | // Clear this if in tests you'd make more than one block cache instance. |
615 | 625 | @VisibleForTesting |
616 | 626 | static BlockCache GLOBAL_BLOCK_CACHE_INSTANCE; |
617 | | - private static LruBlockCache GLOBAL_L1_CACHE_INSTANCE = null; |
618 | | - private static BlockCache GLOBAL_L2_CACHE_INSTANCE = null; |
| 627 | + private static FirstLevelBlockCache GLOBAL_L1_CACHE_INSTANCE; |
| 628 | + private static BlockCache GLOBAL_L2_CACHE_INSTANCE; |
| 629 | + private static ForkJoinPool GLOBAL_FORKJOIN_POOL; |
619 | 630 |
|
620 | 631 | /** Boolean whether we have disabled the block cache entirely. */ |
621 | 632 | @VisibleForTesting |
622 | 633 | static boolean blockCacheDisabled = false; |
623 | 634 |
|
624 | 635 | /** |
625 | | - * @param c Configuration to use. |
626 | | - * @return An L1 instance. Currently an instance of LruBlockCache. |
| 636 | + * @param c Configuration to use |
| 637 | + * @return An L1 instance |
| 638 | + */ |
| 639 | + public static FirstLevelBlockCache getL1(final Configuration c) { |
| 640 | + long xmx = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); |
| 641 | + long l1CacheSize = HeapMemorySizeUtil.getFirstLevelCacheSize(c, xmx); |
| 642 | + return getL1(l1CacheSize, c); |
| 643 | + } |
| 644 | + |
| 645 | + /** |
| 646 | + * @param c Configuration to use |
| 647 | + * @param xmx Max heap memory |
| 648 | + * @return An L1 instance |
627 | 649 | */ |
628 | | - private static synchronized LruBlockCache getL1(final Configuration c) { |
| 650 | + |
| 651 | + private synchronized static FirstLevelBlockCache getL1(long cacheSize, Configuration c) { |
629 | 652 | if (GLOBAL_L1_CACHE_INSTANCE != null) return GLOBAL_L1_CACHE_INSTANCE; |
630 | | - final long lruCacheSize = HeapMemorySizeUtil.getLruCacheSize(c); |
631 | | - if (lruCacheSize < 0) { |
632 | | - blockCacheDisabled = true; |
| 653 | + if (cacheSize < 0) { |
| 654 | + return null; |
633 | 655 | } |
634 | | - if (blockCacheDisabled) return null; |
| 656 | + String policy = c.get(HFILE_BLOCK_CACHE_POLICY_KEY, HFILE_BLOCK_CACHE_POLICY_DEFAULT); |
635 | 657 | int blockSize = c.getInt(BLOCKCACHE_BLOCKSIZE_KEY, HConstants.DEFAULT_BLOCKSIZE); |
636 | | - LOG.info("Allocating LruBlockCache size=" + |
637 | | - StringUtils.byteDesc(lruCacheSize) + ", blockSize=" + StringUtils.byteDesc(blockSize)); |
638 | | - GLOBAL_L1_CACHE_INSTANCE = new LruBlockCache(lruCacheSize, blockSize, true, c); |
| 658 | + LOG.info("Allocating BlockCache size=" + |
| 659 | + StringUtils.byteDesc(cacheSize) + ", blockSize=" + StringUtils.byteDesc(blockSize)); |
| 660 | + if (policy.equalsIgnoreCase("LRU")) { |
| 661 | + GLOBAL_L1_CACHE_INSTANCE = new LruBlockCache(cacheSize, blockSize, true, c); |
| 662 | + } else if (policy.equalsIgnoreCase("TinyLFU")) { |
| 663 | + if (GLOBAL_FORKJOIN_POOL == null) { |
| 664 | + GLOBAL_FORKJOIN_POOL = new ForkJoinPool(); |
| 665 | + } |
| 666 | + Class<?> tinyLFUClass; |
| 667 | + try { |
| 668 | + tinyLFUClass = Class.forName("org.apache.hadoop.hbase.io.hfile.TinyLfuBlockCache"); |
| 669 | + GLOBAL_L1_CACHE_INSTANCE = (FirstLevelBlockCache) |
| 670 | + tinyLFUClass.getDeclaredConstructor(long.class, long.class, Executor.class, |
| 671 | + Configuration.class) |
| 672 | + .newInstance(cacheSize, blockSize, GLOBAL_FORKJOIN_POOL, c); |
| 673 | + } catch (Exception e) { |
| 674 | + throw new RuntimeException( |
| 675 | + "Unable to instantiate the TinyLfuBlockCache block cache policy." + |
| 676 | + "If you want to use TinyLFU you must build with JDK8+, run with JRE8+, and have both " + |
| 677 | + "the hbase-tinylfu-blockcache module and its dependency the caffiene library " + |
| 678 | + "installed into the classpath.", e); |
| 679 | + } |
| 680 | + } else { |
| 681 | + throw new IllegalArgumentException("Unknown block cache policy " + policy); |
| 682 | + } |
639 | 683 | return GLOBAL_L1_CACHE_INSTANCE; |
640 | 684 | } |
641 | 685 |
|
@@ -676,7 +720,7 @@ public CacheStats getL2Stats() { |
676 | 720 | } |
677 | 721 |
|
678 | 722 | private static BlockCache getExternalBlockcache(Configuration c) { |
679 | | - Class klass = null; |
| 723 | + Class<?> klass = null; |
680 | 724 |
|
681 | 725 | // Get the class, from the config. s |
682 | 726 | try { |
@@ -704,7 +748,9 @@ private static BlockCache getExternalBlockcache(Configuration c) { |
704 | 748 | private static BlockCache getBucketCache(Configuration c) { |
705 | 749 | // Check for L2. ioengine name must be non-null. |
706 | 750 | String bucketCacheIOEngineName = c.get(BUCKET_CACHE_IOENGINE_KEY, null); |
707 | | - if (bucketCacheIOEngineName == null || bucketCacheIOEngineName.length() <= 0) return null; |
| 751 | + if (bucketCacheIOEngineName == null || bucketCacheIOEngineName.length() <= 0) { |
| 752 | + return null; |
| 753 | + } |
708 | 754 |
|
709 | 755 | int blockSize = c.getInt(BLOCKCACHE_BLOCKSIZE_KEY, HConstants.DEFAULT_BLOCKSIZE); |
710 | 756 | final long bucketCacheSize = HeapMemorySizeUtil.getBucketCacheSize(c); |
@@ -762,33 +808,35 @@ private static BlockCache getBucketCache(Configuration c) { |
762 | 808 | * @return The block cache or <code>null</code>. |
763 | 809 | */ |
764 | 810 | public static synchronized BlockCache instantiateBlockCache(Configuration conf) { |
765 | | - if (GLOBAL_BLOCK_CACHE_INSTANCE != null) return GLOBAL_BLOCK_CACHE_INSTANCE; |
766 | | - if (blockCacheDisabled) return null; |
| 811 | + if (GLOBAL_BLOCK_CACHE_INSTANCE != null) { |
| 812 | + return GLOBAL_BLOCK_CACHE_INSTANCE; |
| 813 | + } |
| 814 | + if (blockCacheDisabled) { |
| 815 | + return null; |
| 816 | + } |
767 | 817 | if (conf.get(DEPRECATED_BLOCKCACHE_BLOCKSIZE_KEY) != null) { |
768 | 818 | LOG.warn("The config key " + DEPRECATED_BLOCKCACHE_BLOCKSIZE_KEY + |
769 | 819 | " is deprecated now, instead please use " + BLOCKCACHE_BLOCKSIZE_KEY +". " |
770 | 820 | + "In future release we will remove the deprecated config."); |
771 | 821 | } |
772 | | - LruBlockCache l1 = getL1(conf); |
773 | | - // blockCacheDisabled is set as a side-effect of getL1Internal(), so check it again after the call. |
774 | | - if (blockCacheDisabled) return null; |
| 822 | + FirstLevelBlockCache l1 = getL1(conf); |
775 | 823 | BlockCache l2 = getL2(conf); |
776 | 824 | if (l2 == null) { |
777 | 825 | GLOBAL_BLOCK_CACHE_INSTANCE = l1; |
778 | 826 | } else { |
779 | 827 | boolean useExternal = conf.getBoolean(EXTERNAL_BLOCKCACHE_KEY, EXTERNAL_BLOCKCACHE_DEFAULT); |
780 | | - boolean combinedWithLru = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY, |
| 828 | + boolean combinedWithL1 = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY, |
781 | 829 | DEFAULT_BUCKET_CACHE_COMBINED); |
782 | 830 | if (useExternal) { |
783 | 831 | GLOBAL_BLOCK_CACHE_INSTANCE = new InclusiveCombinedBlockCache(l1, l2); |
784 | 832 | } else { |
785 | | - if (combinedWithLru) { |
| 833 | + if (combinedWithL1) { |
786 | 834 | GLOBAL_BLOCK_CACHE_INSTANCE = new CombinedBlockCache(l1, l2); |
787 | 835 | } else { |
788 | | - // L1 and L2 are not 'combined'. They are connected via the LruBlockCache victimhandler |
789 | | - // mechanism. It is a little ugly but works according to the following: when the |
790 | | - // background eviction thread runs, blocks evicted from L1 will go to L2 AND when we get |
791 | | - // a block from the L1 cache, if not in L1, we will search L2. |
| 836 | + // L1 and L2 are not 'combined'. They are connected via the FirstLevelBlockCache |
| 837 | + // victimhandler mechanism. It is a little ugly but works according to the following: |
| 838 | + // when the background eviction thread runs, blocks evicted from L1 will go to L2 AND when |
| 839 | + // we get a block from the L1 cache, if not in L1, we will search L2. |
792 | 840 | GLOBAL_BLOCK_CACHE_INSTANCE = l1; |
793 | 841 | } |
794 | 842 | } |
|
0 commit comments