diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
index 592c19c866cf..d6ff22e17a7a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ByteBufferKeyOnlyKeyValue;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
@@ -978,7 +979,7 @@ public static class BlockIndexWriter implements InlineBlockWriter {
     private CacheConfig cacheConf;
 
     /** Name to use for computing cache keys */
-    private String nameForCaching;
+    private Path pathForCaching;
 
     /** Type of encoding used for index blocks in HFile */
     private HFileIndexBlockEncoder indexBlockEncoder;
@@ -995,15 +996,15 @@ public BlockIndexWriter() {
      * @param cacheConf used to determine when and how a block should be cached-on-write.
      */
     public BlockIndexWriter(HFileBlock.Writer blockWriter, CacheConfig cacheConf,
-      String nameForCaching, HFileIndexBlockEncoder indexBlockEncoder) {
-      if ((cacheConf == null) != (nameForCaching == null)) {
+      Path pathForCaching, HFileIndexBlockEncoder indexBlockEncoder) {
+      if ((cacheConf == null) != (pathForCaching == null)) {
         throw new IllegalArgumentException(
           "Block cache and file name for " + "caching must be both specified or both null");
       }
 
       this.blockWriter = blockWriter;
       this.cacheConf = cacheConf;
-      this.nameForCaching = nameForCaching;
+      this.pathForCaching = pathForCaching;
       this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
       this.minIndexNumEntries = HFileBlockIndex.DEFAULT_MIN_INDEX_NUM_ENTRIES;
       this.indexBlockEncoder =
@@ -1070,7 +1071,7 @@ public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
       if (cacheConf != null) {
         cacheConf.getBlockCache().ifPresent(cache -> {
           HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
-          cache.cacheBlock(new BlockCacheKey(nameForCaching, rootLevelIndexPos, true,
+          cache.cacheBlock(new BlockCacheKey(pathForCaching, rootLevelIndexPos, true,
             blockForCaching.getBlockType()), blockForCaching);
         });
       }
@@ -1162,7 +1163,7 @@ private void writeIntermediateBlock(FSDataOutputStream out, BlockIndexChunk pare
         cacheConf.getBlockCache().ifPresent(cache -> {
           HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
           cache.cacheBlock(
-            new BlockCacheKey(nameForCaching, beginOffset, true, blockForCaching.getBlockType()),
+            new BlockCacheKey(pathForCaching, beginOffset, true, blockForCaching.getBlockType()),
             blockForCaching);
         });
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
index 6063ffe68891..6d491a3aa4bb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
@@ -79,7 +79,7 @@ public void run() {
           // so we check first if the block exists on its in-memory index, if so, we just
           // update the offset and move on to the next block without actually going read all
           // the way to the cache.
-          BlockCacheKey cacheKey = new BlockCacheKey(name, offset);
+          BlockCacheKey cacheKey = new BlockCacheKey(path, offset, true, BlockType.DATA);
           if (cache.isAlreadyCached(cacheKey).orElse(false)) {
             // Right now, isAlreadyCached is only supported by BucketCache, which should
             // always cache data blocks.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index e0f27af71458..e2eeb7a281ba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -1201,7 +1201,7 @@ public HFileBlock getMetaBlock(String metaBlockName, boolean cacheBlock) throws
     // Check cache for block. If found return.
     long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
     BlockCacheKey cacheKey =
-      new BlockCacheKey(name, metaBlockOffset, this.isPrimaryReplicaReader(), BlockType.META);
+      new BlockCacheKey(path, metaBlockOffset, this.isPrimaryReplicaReader(), BlockType.META);
 
     cacheBlock &= cacheConf.shouldCacheBlockOnRead(BlockType.META.getCategory());
     HFileBlock cachedBlock =
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
index d2dfaf62106a..786797c43908 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
@@ -315,7 +315,7 @@ protected void finishInit(final Configuration conf) {
     // Data block index writer
     boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
     dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
-      cacheIndexesOnWrite ? cacheConf : null, cacheIndexesOnWrite ? name : null, indexBlockEncoder);
+      cacheIndexesOnWrite ? cacheConf : null, cacheIndexesOnWrite ? path : null, indexBlockEncoder);
     dataBlockIndexWriter.setMaxChunkSize(HFileBlockIndex.getMaxChunkSize(conf));
     dataBlockIndexWriter.setMinIndexNumEntries(HFileBlockIndex.getMinIndexNumEntries(conf));
     inlineBlockWriters.add(dataBlockIndexWriter);
@@ -556,7 +556,7 @@ private void doCacheOnWrite(long offset) {
     cacheConf.getBlockCache().ifPresent(cache -> {
       HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
       try {
-        cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
+        cache.cacheBlock(new BlockCacheKey(path, offset, true, cacheFormatBlock.getBlockType()),
           cacheFormatBlock, cacheConf.isInMemory(), true);
       } finally {
         // refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 71bfc757e51e..cc58d17c3b21 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -76,6 +76,7 @@
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.RefCnt;
 import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
+import org.apache.hadoop.hbase.regionserver.DataTieringManager;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.IdReadWriteLock;
@@ -141,8 +142,8 @@ public class BucketCache implements BlockCache, HeapSize {
   /** Statistics thread */
   private static final int statThreadPeriod = 5 * 60;
 
-  final static int DEFAULT_WRITER_THREADS = 3;
-  final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;
+  public final static int DEFAULT_WRITER_THREADS = 3;
+  public final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;
 
   // Store/read block data
   transient final IOEngine ioEngine;
@@ -935,6 +936,16 @@ void freeSpace(final String why) {
     }
     try {
       freeInProgress = true;
+
+      // Check the list of files to determine the cold files that can be readily evicted.
+      Set<String> coldFiles =
+        DataTieringManager.getInstance().getColdDataFiles(backingMap.keySet());
+      if (coldFiles != null) {
+        for (String fileName : coldFiles) {
+          evictBlocksByHfileName(fileName);
+        }
+      }
+
       long bytesToFreeWithoutExtra = 0;
       // Calculate free byte for each bucketSizeinfo
       StringBuilder msgBuffer = LOG.isDebugEnabled() ? new StringBuilder() : null;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
index 4b42414fb9c5..44dc74929547 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
@@ -25,6 +25,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.function.Function;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
@@ -32,15 +33,20 @@
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.regionserver.DataTieringManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @InterfaceAudience.Private
 final class BucketProtoUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BucketProtoUtils.class);
   private BucketProtoUtils() {
 
   }
@@ -130,10 +136,30 @@ static Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>>
     ConcurrentHashMap<BlockCacheKey, BucketEntry> result = new ConcurrentHashMap<>();
     NavigableSet<BlockCacheKey> resultSet = new ConcurrentSkipListSet<>(Comparator
       .comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset));
+
+    Map<String, Path> allFilePaths = null;
+    DataTieringManager dataTieringManager;
+    try {
+      dataTieringManager = DataTieringManager.getInstance();
+      allFilePaths = dataTieringManager.getAllFilesList();
+    } catch (IllegalStateException e) {
+      // DataTieringManager has not been set up.
+      // Ignore the error and proceed with the normal flow.
+      LOG.warn("Error while getting DataTieringManager instance: {}", e.getMessage());
+    }
+
     for (BucketCacheProtos.BackingMapEntry entry : backingMap.getEntryList()) {
       BucketCacheProtos.BlockCacheKey protoKey = entry.getKey();
-      BlockCacheKey key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getOffset(),
-        protoKey.getPrimaryReplicaBlock(), fromPb(protoKey.getBlockType()));
+
+      BlockCacheKey key;
+
+      if (allFilePaths != null && allFilePaths.containsKey(protoKey.getHfilename())) {
+        key = new BlockCacheKey(allFilePaths.get(protoKey.getHfilename()), protoKey.getOffset(),
+          protoKey.getPrimaryReplicaBlock(), fromPb(protoKey.getBlockType()));
+      } else {
+        key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getOffset(),
+          protoKey.getPrimaryReplicaBlock(), fromPb(protoKey.getBlockType()));
+      }
       BucketCacheProtos.BucketEntry protoValue = entry.getValue();
       // TODO:We use ByteBuffAllocator.HEAP here, because we could not get the ByteBuffAllocator
       // which created by RpcServer elegantly.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java
index 0bc04ddc428b..28f0ddeebca2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.OptionalLong;
@@ -219,4 +220,23 @@ private long getDataTieringHotDataAge(Configuration conf) {
     return Long.parseLong(
       conf.get(DATATIERING_HOT_DATA_AGE_KEY, String.valueOf(DEFAULT_DATATIERING_HOT_DATA_AGE)));
   }
+
+  /**
+   * Browses through all the online regions and returns a map of all store file names
+   * pointing to their paths.
+   * @return a map from each store file name to its file path
+   */
+  public Map<String, Path> getAllFilesList() {
+    Map<String, Path> allFileList = new HashMap<>();
+    for (HRegion r : this.onlineRegions.values()) {
+      for (HStore hStore : r.getStores()) {
+        for (HStoreFile hStoreFile : hStore.getStorefiles()) {
+          String hFileName =
+            hStoreFile.getFileInfo().getHFileInfo().getHFileContext().getHFileName();
+          allFileList.put(hFileName, hStoreFile.getPath());
+        }
+      }
+    }
+    return allFileList;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index e4da74f78cf2..f55d35e69cac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -530,8 +530,8 @@ public HRegionServer(final Configuration conf) throws IOException {
 
     regionServerAccounting = new RegionServerAccounting(conf);
 
-    blockCache = BlockCacheFactory.createBlockCache(conf);
     DataTieringManager.instantiate(onlineRegions);
+    blockCache = BlockCacheFactory.createBlockCache(conf);
     mobFileCache = new MobFileCache(conf);
 
     rsSnapshotVerifier = new RSSnapshotVerifier(conf);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCombinedBlockCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCombinedBlockCache.java
index b9bca1ba6b4e..413e3607345c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCombinedBlockCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCombinedBlockCache.java
@@ -24,6 +24,7 @@
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.HConstants;
@@ -145,7 +146,7 @@ private CombinedBlockCache createCombinedBlockCache() {
   public void testCombinedBlockCacheStats(BlockType type, int expectedL1Miss, int expectedL2Miss)
     throws Exception {
     CombinedBlockCache blockCache = createCombinedBlockCache();
-    BlockCacheKey key = new BlockCacheKey("key1", 0, false, type);
+    BlockCacheKey key = new BlockCacheKey(new Path("key1"), 0, false, type);
     int size = 100;
     int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
     byte[] byteArr = new byte[length];
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
index afb5862a8a46..91497bbd3675 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
@@ -17,7 +17,11 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_WRITER_THREADS;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
@@ -42,11 +46,12 @@
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
-import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -92,13 +97,22 @@ public class TestDataTieringManager {
   private static DataTieringManager dataTieringManager;
   private static List<HStoreFile> hStoreFiles;
+  final static long capacitySize = 32 * 1024 * 1024;
+  final static int writeThreads = DEFAULT_WRITER_THREADS;
+  final static int writerQLen = DEFAULT_WRITER_QUEUE_ITEMS;
+  final static int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
 
   @BeforeClass
   public static void setupBeforeClass() throws Exception {
     testDir = TEST_UTIL.getDataTestDir(TestDataTieringManager.class.getSimpleName());
     defaultConf = TEST_UTIL.getConfiguration();
     fs = HFileSystem.get(defaultConf);
-    BlockCache blockCache = BlockCacheFactory.createBlockCache(defaultConf);
+    fs.mkdirs(testDir);
+
+    BlockCache blockCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
     cacheConf = new CacheConfig(defaultConf, blockCache);
+
     setupOnlineRegions();
     DataTieringManager.instantiate(testOnlineRegions);
     dataTieringManager = DataTieringManager.getInstance();
@@ -218,6 +232,50 @@ public void testColdDataFiles() {
     }
   }
 
+  @Test
+  public void testAllDataFiles() {
+    Set<BlockCacheKey> allCachedBlocks = new HashSet<>();
+    for (HStoreFile file : hStoreFiles) {
+      allCachedBlocks.add(new BlockCacheKey(file.getPath(), 0, true, BlockType.DATA));
+    }
+    Map<String, Path> allFilePaths = dataTieringManager.getAllFilesList();
+    assertEquals(hStoreFiles.size(), allFilePaths.size());
+  }
+
+  @Test
+  public void testAllDataFilesAfterRestart() throws Exception {
+    Set<BlockCacheKey> cacheKeys = new HashSet<>();
+    // Create the cache keys.
+    for (HStoreFile file : hStoreFiles) {
+      cacheKeys.add(new BlockCacheKey(file.getPath(), 0, true, BlockType.DATA));
+    }
+    // Create dummy data to be cached.
+    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4);
+    BucketCache cache = (BucketCache) cacheConf.getBlockCache().get();
+    int blocksIter = 0;
+    for (BlockCacheKey key : cacheKeys) {
+      cache.cacheBlock(key, blocks[blocksIter++].getBlock());
+      // Ensure that the block is persisted to the file.
+      while (!cache.getBackingMap().containsKey(key)) {
+        Thread.sleep(100);
+      }
+    }
+
+    // Shutting down the cache persists the backing map to disk.
+    cache.shutdown();
+
+    // Create a new cache populated from the disk, which simulates a server restart.
+    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
+
+    Set<BlockCacheKey> keySet = newBucketCache.getBackingMap().keySet();
+    assertEquals(hStoreFiles.size(), keySet.size());
+    for (BlockCacheKey key : keySet) {
+      assertNotNull(key.getFilePath());
+    }
+  }
+
   private void testDataTieringMethodWithPath(DataTieringMethodCallerWithPath caller, Path path,
     boolean expectedResult, DataTieringException exception) {
     try {