Commit 8600992

HBASE-28468: Integration of time-based priority caching in eviction logic.

Time-based priority caching relies on the presence of file paths in the
block-cache key. In the case of the persistent cache, however, the file
paths are not persisted, so when the region server restarts the block-cache
keys must be repopulated with their file paths. This change addresses the
following:

1. Always populate the block-cache key with the file path during its creation.
2. Fetch the file paths corresponding to the file names of the block-cache
   keys during restarts.
3. Use the DataTieringManager APIs in the cache-full scenario to evict the
   cold file blocks.

Change-Id: Ice19bd41064c73538ee3d3755057813a531b9171
1 parent a9d3170 commit 8600992
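For context, a minimal sketch of the key change. Both constructor signatures appear in the diffs that follow; the concrete file path and file name here are hypothetical:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;

public class CacheKeySketch {
  public static void main(String[] args) {
    // Hypothetical HFile path; real paths live under the region/store directories.
    Path hFilePath = new Path("/hbase/data/default/t1/r1/cf/abc123");

    // Before: the key carried only the file name, so nothing downstream could
    // resolve the file's metadata for time-based tiering decisions.
    BlockCacheKey byName = new BlockCacheKey("abc123", 4096);

    // After: the key carries the full Path, letting the DataTieringManager
    // locate the store file and classify its blocks as hot or cold.
    BlockCacheKey byPath = new BlockCacheKey(hFilePath, 4096, true, BlockType.DATA);
  }
}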

11 files changed (+136, -18 lines)

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java

Lines changed: 7 additions & 6 deletions
@@ -30,6 +30,7 @@
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ByteBufferKeyOnlyKeyValue;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
@@ -978,7 +979,7 @@ public static class BlockIndexWriter implements InlineBlockWriter {
     private CacheConfig cacheConf;

     /** Name to use for computing cache keys */
-    private String nameForCaching;
+    private Path pathForCaching;

     /** Type of encoding used for index blocks in HFile */
     private HFileIndexBlockEncoder indexBlockEncoder;
@@ -995,15 +996,15 @@ public BlockIndexWriter() {
      * @param cacheConf used to determine when and how a block should be cached-on-write.
      */
     public BlockIndexWriter(HFileBlock.Writer blockWriter, CacheConfig cacheConf,
-      String nameForCaching, HFileIndexBlockEncoder indexBlockEncoder) {
-      if ((cacheConf == null) != (nameForCaching == null)) {
+      Path pathForCaching, HFileIndexBlockEncoder indexBlockEncoder) {
+      if ((cacheConf == null) != (pathForCaching == null)) {
         throw new IllegalArgumentException(
           "Block cache and file name for " + "caching must be both specified or both null");
       }

       this.blockWriter = blockWriter;
       this.cacheConf = cacheConf;
-      this.nameForCaching = nameForCaching;
+      this.pathForCaching = pathForCaching;
       this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
       this.minIndexNumEntries = HFileBlockIndex.DEFAULT_MIN_INDEX_NUM_ENTRIES;
       this.indexBlockEncoder =
@@ -1070,7 +1071,7 @@ public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
       if (cacheConf != null) {
         cacheConf.getBlockCache().ifPresent(cache -> {
           HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
-          cache.cacheBlock(new BlockCacheKey(nameForCaching, rootLevelIndexPos, true,
+          cache.cacheBlock(new BlockCacheKey(pathForCaching, rootLevelIndexPos, true,
             blockForCaching.getBlockType()), blockForCaching);
         });
       }
@@ -1162,7 +1163,7 @@ private void writeIntermediateBlock(FSDataOutputStream out, BlockIndexChunk pare
         cacheConf.getBlockCache().ifPresent(cache -> {
           HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
           cache.cacheBlock(
-            new BlockCacheKey(nameForCaching, beginOffset, true, blockForCaching.getBlockType()),
+            new BlockCacheKey(pathForCaching, beginOffset, true, blockForCaching.getBlockType()),
             blockForCaching);
         });
       }

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java

Lines changed: 1 addition & 1 deletion
@@ -79,7 +79,7 @@ public void run() {
         // so we check first if the block exists on its in-memory index, if so, we just
         // update the offset and move on to the next block without actually going read all
         // the way to the cache.
-        BlockCacheKey cacheKey = new BlockCacheKey(name, offset);
+        BlockCacheKey cacheKey = new BlockCacheKey(path, offset, true, BlockType.DATA);
         if (cache.isAlreadyCached(cacheKey).orElse(false)) {
           // Right now, isAlreadyCached is only supported by BucketCache, which should
           // always cache data blocks.

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java

Lines changed: 1 addition & 1 deletion
@@ -1201,7 +1201,7 @@ public HFileBlock getMetaBlock(String metaBlockName, boolean cacheBlock) throws
     // Check cache for block. If found return.
     long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
     BlockCacheKey cacheKey =
-      new BlockCacheKey(name, metaBlockOffset, this.isPrimaryReplicaReader(), BlockType.META);
+      new BlockCacheKey(path, metaBlockOffset, this.isPrimaryReplicaReader(), BlockType.META);

     cacheBlock &= cacheConf.shouldCacheBlockOnRead(BlockType.META.getCategory());
     HFileBlock cachedBlock =

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java

Lines changed: 2 additions & 2 deletions
@@ -315,7 +315,7 @@ protected void finishInit(final Configuration conf) {
     // Data block index writer
     boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
     dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
-      cacheIndexesOnWrite ? cacheConf : null, cacheIndexesOnWrite ? name : null, indexBlockEncoder);
+      cacheIndexesOnWrite ? cacheConf : null, cacheIndexesOnWrite ? path : null, indexBlockEncoder);
     dataBlockIndexWriter.setMaxChunkSize(HFileBlockIndex.getMaxChunkSize(conf));
     dataBlockIndexWriter.setMinIndexNumEntries(HFileBlockIndex.getMinIndexNumEntries(conf));
     inlineBlockWriters.add(dataBlockIndexWriter);
@@ -556,7 +556,7 @@ private void doCacheOnWrite(long offset) {
     cacheConf.getBlockCache().ifPresent(cache -> {
       HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
       try {
-        cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
+        cache.cacheBlock(new BlockCacheKey(path, offset, true, cacheFormatBlock.getBlockType()),
           cacheFormatBlock, cacheConf.isInMemory(), true);
       } finally {
         // refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java

Lines changed: 13 additions & 2 deletions
@@ -76,6 +76,7 @@
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.RefCnt;
 import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
+import org.apache.hadoop.hbase.regionserver.DataTieringManager;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.IdReadWriteLock;
@@ -141,8 +142,8 @@ public class BucketCache implements BlockCache, HeapSize {
   /** Statistics thread */
   private static final int statThreadPeriod = 5 * 60;

-  final static int DEFAULT_WRITER_THREADS = 3;
-  final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;
+  public final static int DEFAULT_WRITER_THREADS = 3;
+  public final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;

   // Store/read block data
   transient final IOEngine ioEngine;
@@ -934,6 +935,16 @@ void freeSpace(final String why) {
     }
     try {
       freeInProgress = true;
+
+      // Check the list of files to determine the cold files which can be readily evicted.
+      Set<String> coldFiles =
+        DataTieringManager.getInstance().getColdDataFiles(backingMap.keySet());
+      if (coldFiles != null) {
+        for (String fileName : coldFiles) {
+          evictBlocksByHfileName(fileName);
+        }
+      }
+
       long bytesToFreeWithoutExtra = 0;
       // Calculate free byte for each bucketSizeinfo
       StringBuilder msgBuffer = LOG.isDebugEnabled() ? new StringBuilder() : null;
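Note: getColdDataFiles() is provided by the DataTieringManager introduced in this patch series; its implementation is not part of this diff. A rough sketch of the contract the eviction hook assumes — a file is cold once its newest cell has aged past the configured hot-data age (how the max timestamp is looked up is left abstract here):

import java.util.HashSet;
import java.util.Set;
import java.util.function.ToLongFunction;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;

final class ColdFileContractSketch {
  // Assumed contract (a sketch, not the actual DataTieringManager code): a file
  // is cold when its newest cell timestamp has aged past the limit configured
  // via DATATIERING_HOT_DATA_AGE_KEY.
  static boolean isCold(long maxTimestampOfFile, long hotDataAgeMillis, long now) {
    return now - maxTimestampOfFile > hotDataAgeMillis;
  }

  // Distinct names of the cold files behind the cached keys; freeSpace() can
  // then drop every block of a cold file via evictBlocksByHfileName().
  static Set<String> coldFileNames(Set<BlockCacheKey> cachedKeys,
      ToLongFunction<BlockCacheKey> maxTimestampLookup, long hotDataAgeMillis, long now) {
    Set<String> cold = new HashSet<>();
    for (BlockCacheKey key : cachedKeys) {
      if (isCold(maxTimestampLookup.applyAsLong(key), hotDataAgeMillis, now)) {
        cold.add(key.getHfileName());
      }
    }
    return cold;
  }
}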

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java

Lines changed: 28 additions & 2 deletions
@@ -25,22 +25,28 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.function.Function;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.BlockPriority;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.regionserver.DataTieringManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;

 import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

 import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 @InterfaceAudience.Private
 final class BucketProtoUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BucketProtoUtils.class);
   private BucketProtoUtils() {

   }
@@ -130,10 +136,30 @@ static Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCac
     ConcurrentHashMap<BlockCacheKey, BucketEntry> result = new ConcurrentHashMap<>();
     NavigableSet<BlockCacheKey> resultSet = new ConcurrentSkipListSet<>(Comparator
       .comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset));
+
+    Map<String, Path> allFilePaths = null;
+    DataTieringManager dataTieringManager;
+    try {
+      dataTieringManager = DataTieringManager.getInstance();
+      allFilePaths = dataTieringManager.getAllFilesList();
+    } catch (IllegalStateException e) {
+      // The DataTieringManager has not been set up.
+      // Ignore the error and proceed with the normal flow.
+      LOG.error("Error while getting DataTieringManager instance: {}", e.getMessage());
+    }
+
     for (BucketCacheProtos.BackingMapEntry entry : backingMap.getEntryList()) {
       BucketCacheProtos.BlockCacheKey protoKey = entry.getKey();
-      BlockCacheKey key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getOffset(),
-        protoKey.getPrimaryReplicaBlock(), fromPb(protoKey.getBlockType()));
+
+      BlockCacheKey key;
+      if (allFilePaths != null && allFilePaths.containsKey(protoKey.getHfilename())) {
+        key = new BlockCacheKey(allFilePaths.get(protoKey.getHfilename()), protoKey.getOffset(),
+          protoKey.getPrimaryReplicaBlock(), fromPb(protoKey.getBlockType()));
+      } else {
+        key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getOffset(),
+          protoKey.getPrimaryReplicaBlock(), fromPb(protoKey.getBlockType()));
+      }
       BucketCacheProtos.BucketEntry protoValue = entry.getValue();
       // TODO: We use ByteBuffAllocator.HEAP here, because we could not get the ByteBuffAllocator
       // which created by RpcServer elegantly.
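The else branch above exists because a persisted entry can name a file that is no longer online, for example one compacted away before the restart. A condensed sketch of the branch (the helper and its name are ours, not part of the patch):

import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;

final class RestartKeySketch {
  static BlockCacheKey rebuildKey(Map<String, Path> allFilePaths, String hFileName, long offset,
      boolean primaryReplica, BlockType blockType) {
    Path path = allFilePaths == null ? null : allFilePaths.get(hFileName);
    return path != null
      // File still online: path-backed key, usable for time-based tiering.
      ? new BlockCacheKey(path, offset, primaryReplica, blockType)
      // File gone: name-only key, which still addresses the cached block but
      // presumably carries no path for the tiering logic to classify.
      : new BlockCacheKey(hFileName, offset, primaryReplica, blockType);
  }
}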

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java

Lines changed: 21 additions & 0 deletions
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;

+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.OptionalLong;
@@ -219,4 +220,24 @@ private long getDataTieringHotDataAge(Configuration conf) {
     return Long.parseLong(
       conf.get(DATATIERING_HOT_DATA_AGE_KEY, String.valueOf(DEFAULT_DATATIERING_HOT_DATA_AGE)));
   }
+
+  /*
+   * This API browses through all the regions and returns a map of all file names
+   * pointing to their paths.
+   * @return Map with entries containing a mapping from filename to filepath
+   */
+  public Map<String, Path> getAllFilesList() {
+    Map<String, Path> allFileList = new HashMap<>();
+    for (HRegion r : this.onlineRegions.values()) {
+      for (HStore hStore : r.getStores()) {
+        Configuration conf = hStore.getReadOnlyConfiguration();
+        for (HStoreFile hStoreFile : hStore.getStorefiles()) {
+          String hFileName =
+            hStoreFile.getFileInfo().getHFileInfo().getHFileContext().getHFileName();
+          allFileList.put(hFileName, hStoreFile.getPath());
+        }
+      }
+    }
+    return allFileList;
+  }
 }
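A short usage sketch of the new API at restart, mirroring the call in BucketProtoUtils.fromPB above (the wrapper class is ours):

import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.DataTieringManager;

final class RestartPathLookupSketch {
  // Resolve a persisted file name back to its live path, if any.
  static Path resolvePath(String hFileName) {
    // getInstance() throws IllegalStateException if instantiate() was never
    // called; that is why BucketProtoUtils.fromPB() catches and ignores it.
    Map<String, Path> nameToPath = DataTieringManager.getInstance().getAllFilesList();
    return nameToPath.get(hFileName); // null when the file is no longer online
  }
}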

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

Lines changed: 1 addition & 1 deletion
@@ -530,8 +530,8 @@ public HRegionServer(final Configuration conf) throws IOException {

     regionServerAccounting = new RegionServerAccounting(conf);

-    blockCache = BlockCacheFactory.createBlockCache(conf);
     DataTieringManager.instantiate(onlineRegions);
+    blockCache = BlockCacheFactory.createBlockCache(conf);
     mobFileCache = new MobFileCache(conf);

     rsSnapshotVerifier = new RSSnapshotVerifier(conf);

hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCombinedBlockCache.java

Lines changed: 2 additions & 1 deletion
@@ -24,6 +24,7 @@

 import java.nio.ByteBuffer;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.HConstants;
@@ -145,7 +146,7 @@ private CombinedBlockCache createCombinedBlockCache() {
   public void testCombinedBlockCacheStats(BlockType type, int expectedL1Miss, int expectedL2Miss)
     throws Exception {
     CombinedBlockCache blockCache = createCombinedBlockCache();
-    BlockCacheKey key = new BlockCacheKey("key1", 0, false, type);
+    BlockCacheKey key = new BlockCacheKey(new Path("key1"), 0, false, type);
     int size = 100;
     int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
     byte[] byteArr = new byte[length];

hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java

Lines changed: 1 addition & 0 deletions
@@ -114,6 +114,7 @@ public void testBucketCacheEvictByHFileAfterRecovery() throws Exception {
     Path testDir = TEST_UTIL.getDataTestDir();
     TEST_UTIL.getTestFileSystem().mkdirs(testDir);
     Configuration conf = HBaseConfiguration.create();
+
     // Disables the persister thread by setting its interval to MAX_VALUE
     conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
     int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
