@@ -30,6 +30,7 @@
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ByteBufferKeyOnlyKeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
@@ -978,7 +979,7 @@ public static class BlockIndexWriter implements InlineBlockWriter {
private CacheConfig cacheConf;

/** Name to use for computing cache keys */
private String nameForCaching;
private Path pathForCaching;

/** Type of encoding used for index blocks in HFile */
private HFileIndexBlockEncoder indexBlockEncoder;
@@ -995,15 +996,15 @@ public BlockIndexWriter() {
* @param cacheConf used to determine when and how a block should be cached-on-write.
*/
public BlockIndexWriter(HFileBlock.Writer blockWriter, CacheConfig cacheConf,
String nameForCaching, HFileIndexBlockEncoder indexBlockEncoder) {
if ((cacheConf == null) != (nameForCaching == null)) {
Path pathForCaching, HFileIndexBlockEncoder indexBlockEncoder) {
if ((cacheConf == null) != (pathForCaching == null)) {
throw new IllegalArgumentException(
"Block cache and file name for " + "caching must be both specified or both null");
}

this.blockWriter = blockWriter;
this.cacheConf = cacheConf;
this.nameForCaching = nameForCaching;
this.pathForCaching = pathForCaching;
this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
this.minIndexNumEntries = HFileBlockIndex.DEFAULT_MIN_INDEX_NUM_ENTRIES;
this.indexBlockEncoder =
@@ -1070,7 +1071,7 @@ public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
if (cacheConf != null) {
cacheConf.getBlockCache().ifPresent(cache -> {
HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
cache.cacheBlock(new BlockCacheKey(nameForCaching, rootLevelIndexPos, true,
cache.cacheBlock(new BlockCacheKey(pathForCaching, rootLevelIndexPos, true,
blockForCaching.getBlockType()), blockForCaching);
});
}
@@ -1162,7 +1163,7 @@ private void writeIntermediateBlock(FSDataOutputStream out, BlockIndexChunk pare
cacheConf.getBlockCache().ifPresent(cache -> {
HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
cache.cacheBlock(
new BlockCacheKey(nameForCaching, beginOffset, true, blockForCaching.getBlockType()),
new BlockCacheKey(pathForCaching, beginOffset, true, blockForCaching.getBlockType()),
blockForCaching);
});
}
@@ -79,7 +79,7 @@ public void run() {
// so we check first if the block exists on its in-memory index, if so, we just
// update the offset and move on to the next block without actually going read all
// the way to the cache.
BlockCacheKey cacheKey = new BlockCacheKey(name, offset);
BlockCacheKey cacheKey = new BlockCacheKey(path, offset, true, BlockType.DATA);
Contributor:
How do you know this is a Data block? At this point, I don't think this can be assumed.

Contributor (Author):
The constructor used earlier (BlockCacheKey(name, offset)) sets these parameters internally. Hence, I used the same parameters:

  public BlockCacheKey(String hfileName, long offset) {
    this(hfileName, offset, true, BlockType.DATA);
  }

if (cache.isAlreadyCached(cacheKey).orElse(false)) {
// Right now, isAlreadyCached is only supported by BucketCache, which should
// always cache data blocks.
@@ -1201,7 +1201,7 @@ public HFileBlock getMetaBlock(String metaBlockName, boolean cacheBlock) throws
// Check cache for block. If found return.
long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
BlockCacheKey cacheKey =
new BlockCacheKey(name, metaBlockOffset, this.isPrimaryReplicaReader(), BlockType.META);
new BlockCacheKey(path, metaBlockOffset, this.isPrimaryReplicaReader(), BlockType.META);

cacheBlock &= cacheConf.shouldCacheBlockOnRead(BlockType.META.getCategory());
HFileBlock cachedBlock =
@@ -315,7 +315,7 @@ protected void finishInit(final Configuration conf) {
// Data block index writer
boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
cacheIndexesOnWrite ? cacheConf : null, cacheIndexesOnWrite ? name : null, indexBlockEncoder);
cacheIndexesOnWrite ? cacheConf : null, cacheIndexesOnWrite ? path : null, indexBlockEncoder);
dataBlockIndexWriter.setMaxChunkSize(HFileBlockIndex.getMaxChunkSize(conf));
dataBlockIndexWriter.setMinIndexNumEntries(HFileBlockIndex.getMinIndexNumEntries(conf));
inlineBlockWriters.add(dataBlockIndexWriter);
@@ -556,7 +556,7 @@ private void doCacheOnWrite(long offset) {
cacheConf.getBlockCache().ifPresent(cache -> {
HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
try {
cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
cache.cacheBlock(new BlockCacheKey(path, offset, true, cacheFormatBlock.getBlockType()),
cacheFormatBlock, cacheConf.isInMemory(), true);
} finally {
// refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent
@@ -76,6 +76,7 @@
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.regionserver.DataTieringManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
@@ -141,8 +142,8 @@ public class BucketCache implements BlockCache, HeapSize {
/** Statistics thread */
private static final int statThreadPeriod = 5 * 60;

final static int DEFAULT_WRITER_THREADS = 3;
final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;
public final static int DEFAULT_WRITER_THREADS = 3;
public final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;

// Store/read block data
transient final IOEngine ioEngine;
@@ -935,6 +936,16 @@ void freeSpace(final String why) {
}
try {
freeInProgress = true;

// Check the list of files to determine the cold files which can be readily evicted.
Set<String> coldFiles =
DataTieringManager.getInstance().getColdDataFiles(backingMap.keySet());
if (coldFiles != null) {
for(String fileName : coldFiles) {
evictBlocksByHfileName(fileName);
}
}

Comment on lines +939 to +948
Contributor:
We can do it like this here, but can we think about modifying DataTieringManager somehow to track when we have no CF with a TIME_RANGE type? That way we could avoid these extra loops through the whole set of block keys unnecessarily.

Alternatively, we could follow the BucketEntryGroup logic starting from #987. We could define a COLD priority group and add it as the first group in the priority queue. That way we don't need this extra loop over the whole set of blocks to find out which ones are cold; we would leverage the already existing loop from line #996 for that.
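
To make the alternative concrete, here is a minimal, self-contained sketch of the idea. The types (Block, EntryGroup, a COLD enum value, the isColdFile check) are simplified stand-ins chosen for illustration, not the real BucketCache/BucketEntryGroup classes or the patch itself: cold-file blocks are routed into their own group during the single existing pass over the backing map, and that group sorts first in the priority queue, so it is drained before SINGLE/MULTI/MEMORY with no extra pre-scan of the block keys.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.function.Predicate;

// Illustrative sketch only; names and types are simplified assumptions.
public class ColdFirstEvictionSketch {

  // COLD is declared first, so it sorts ahead of the other priorities.
  enum Priority { COLD, SINGLE, MULTI, MEMORY }

  static class Block {
    final String hfileName;
    final long size;
    final Priority priority;
    Block(String hfileName, long size, Priority priority) {
      this.hfileName = hfileName;
      this.size = size;
      this.priority = priority;
    }
  }

  static class EntryGroup {
    final Priority priority;
    final Queue<Block> blocks = new ArrayDeque<>();
    EntryGroup(Priority priority) { this.priority = priority; }
    void add(Block b) { blocks.add(b); }
  }

  /** Picks blocks to evict, draining the COLD group before SINGLE/MULTI/MEMORY. */
  static List<Block> chooseEvictions(List<Block> backingMap, long bytesToFree,
      Predicate<String> isColdFile) {
    EntryGroup cold = new EntryGroup(Priority.COLD);
    EntryGroup single = new EntryGroup(Priority.SINGLE);
    EntryGroup multi = new EntryGroup(Priority.MULTI);
    EntryGroup memory = new EntryGroup(Priority.MEMORY);

    // Single pass over the backing map: cold-file blocks go into their own group,
    // everything else keeps its usual priority bucket.
    for (Block b : backingMap) {
      if (isColdFile.test(b.hfileName)) {
        cold.add(b);
      } else if (b.priority == Priority.SINGLE) {
        single.add(b);
      } else if (b.priority == Priority.MULTI) {
        multi.add(b);
      } else {
        memory.add(b);
      }
    }

    PriorityQueue<EntryGroup> queue =
      new PriorityQueue<>(Comparator.comparing((EntryGroup g) -> g.priority));
    queue.add(cold);
    queue.add(single);
    queue.add(multi);
    queue.add(memory);

    List<Block> evicted = new ArrayList<>();
    long freed = 0;
    while (freed < bytesToFree && !queue.isEmpty()) {
      EntryGroup g = queue.poll();
      while (freed < bytesToFree && !g.blocks.isEmpty()) {
        Block b = g.blocks.poll();
        evicted.add(b);
        freed += b.size;
      }
    }
    return evicted;
  }
}

Whether this nets out simpler than the pre-scan in the current patch depends on how DataTieringManager exposes the per-file cold check.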

long bytesToFreeWithoutExtra = 0;
// Calculate free byte for each bucketSizeinfo
StringBuilder msgBuffer = LOG.isDebugEnabled() ? new StringBuilder() : null;
@@ -25,22 +25,28 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.function.Function;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.regionserver.DataTieringManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
final class BucketProtoUtils {

private static final Logger LOG = LoggerFactory.getLogger(BucketProtoUtils.class);
private BucketProtoUtils() {

}
@@ -130,10 +136,30 @@ static Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCac
ConcurrentHashMap<BlockCacheKey, BucketEntry> result = new ConcurrentHashMap<>();
NavigableSet<BlockCacheKey> resultSet = new ConcurrentSkipListSet<>(Comparator
.comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset));

Map<String, Path> allFilePaths = null;
DataTieringManager dataTieringManager;
try {
dataTieringManager = DataTieringManager.getInstance();
allFilePaths = dataTieringManager.getAllFilesList();
} catch (IllegalStateException e) {
// Data-Tiering manager has not been set up.
// Ignore the error and proceed with the normal flow.
LOG.error("Error while getting DataTieringManager instance: {}", e.getMessage());
Comment on lines +146 to +148
Contributor:
Is this to be expected often? If so, let's lower the log level to DEBUG. If not, it seems the RS can still continue functioning normally, so it should rather be a WARNING than an ERROR.

Contributor (Author):
ack!

}

for (BucketCacheProtos.BackingMapEntry entry : backingMap.getEntryList()) {
BucketCacheProtos.BlockCacheKey protoKey = entry.getKey();
BlockCacheKey key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getOffset(),
protoKey.getPrimaryReplicaBlock(), fromPb(protoKey.getBlockType()));

BlockCacheKey key;
Contributor (Author):
We need to handle BlockCacheKey creation both with and without paths.


if(allFilePaths != null && allFilePaths.containsKey(protoKey.getHfilename())) {
key = new BlockCacheKey(allFilePaths.get(protoKey.getHfilename()), protoKey.getOffset(),
protoKey.getPrimaryReplicaBlock(), fromPb(protoKey.getBlockType()));
} else {
key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getOffset(),
protoKey.getPrimaryReplicaBlock(), fromPb(protoKey.getBlockType()));
}
BucketCacheProtos.BucketEntry protoValue = entry.getValue();
// TODO:We use ByteBuffAllocator.HEAP here, because we could not get the ByteBuffAllocator
// which created by RpcServer elegantly.
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.OptionalLong;
@@ -219,4 +220,24 @@ private long getDataTieringHotDataAge(Configuration conf) {
return Long.parseLong(
conf.get(DATATIERING_HOT_DATA_AGE_KEY, String.valueOf(DEFAULT_DATATIERING_HOT_DATA_AGE)));
}

/*
* This API browses through all the regions and returns a map of all file names
* pointing to their paths.
* @return Map with entries containing a mapping from filename to filepath
*/
public Map<String, Path> getAllFilesList() {
Map<String, Path> allFileList = new HashMap<>();
for (HRegion r : this.onlineRegions.values()) {
for (HStore hStore : r.getStores()) {
Configuration conf = hStore.getReadOnlyConfiguration();
for (HStoreFile hStoreFile : hStore.getStorefiles()) {
String hFileName =
hStoreFile.getFileInfo().getHFileInfo().getHFileContext().getHFileName();
allFileList.put(hFileName, hStoreFile.getPath());
}
}
}
return allFileList;
}
}
@@ -530,8 +530,8 @@ public HRegionServer(final Configuration conf) throws IOException {

regionServerAccounting = new RegionServerAccounting(conf);

blockCache = BlockCacheFactory.createBlockCache(conf);
DataTieringManager.instantiate(onlineRegions);
blockCache = BlockCacheFactory.createBlockCache(conf);
mobFileCache = new MobFileCache(conf);

rsSnapshotVerifier = new RSSnapshotVerifier(conf);
@@ -24,6 +24,7 @@

import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
@@ -145,7 +146,7 @@ private CombinedBlockCache createCombinedBlockCache() {
public void testCombinedBlockCacheStats(BlockType type, int expectedL1Miss, int expectedL2Miss)
throws Exception {
CombinedBlockCache blockCache = createCombinedBlockCache();
BlockCacheKey key = new BlockCacheKey("key1", 0, false, type);
BlockCacheKey key = new BlockCacheKey(new Path("key1"), 0, false, type);
int size = 100;
int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
byte[] byteArr = new byte[length];
@@ -17,7 +17,11 @@
*/
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_WRITER_THREADS;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;

import java.io.IOException;
@@ -42,11 +46,12 @@
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -92,13 +97,22 @@ public class TestDataTieringManager {
private static DataTieringManager dataTieringManager;
private static List<HStoreFile> hStoreFiles;

final static long capacitySize = 32 * 1024 * 1024;
final static int writeThreads = DEFAULT_WRITER_THREADS;
final static int writerQLen = DEFAULT_WRITER_QUEUE_ITEMS;
final static int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
@BeforeClass
public static void setupBeforeClass() throws Exception {
testDir = TEST_UTIL.getDataTestDir(TestDataTieringManager.class.getSimpleName());
defaultConf = TEST_UTIL.getConfiguration();
fs = HFileSystem.get(defaultConf);
BlockCache blockCache = BlockCacheFactory.createBlockCache(defaultConf);
fs.mkdirs(testDir);

BlockCache blockCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
cacheConf = new CacheConfig(defaultConf, blockCache);

setupOnlineRegions();
DataTieringManager.instantiate(testOnlineRegions);
dataTieringManager = DataTieringManager.getInstance();
@@ -218,6 +232,49 @@ public void testColdDataFiles() {
}
}

@Test
public void testAllDataFiles() {
Set<BlockCacheKey> allCachedBlocks = new HashSet<>();
for (HStoreFile file : hStoreFiles) {
allCachedBlocks.add(new BlockCacheKey(file.getPath(), 0, true, BlockType.DATA));
}
Map<String, Path> allFilePaths = dataTieringManager.getAllFilesList();
assertEquals(hStoreFiles.size(), allFilePaths.size());
}

public void testAllDataFilesAfterRestart() throws Exception {
Set<BlockCacheKey> cacheKeys = new HashSet<>();
// Create Cache keys
for (HStoreFile file : hStoreFiles) {
cacheKeys.add(new BlockCacheKey(file.getPath(), 0, true, BlockType.DATA));
}
// Create dummy data to be cached.
CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4);
BucketCache cache = (BucketCache) cacheConf.getBlockCache().get();
int blocksIter = 0;
for(BlockCacheKey key: cacheKeys) {
cache.cacheBlock(key, blocks[blocksIter++].getBlock());
// Ensure that the block is persisted to the file.
while (!cache.getBackingMap().containsKey(key)) {
Thread.sleep(100);
}
}

// shutting down the cache persists the backmap to disk.
cache.shutdown();

// create a new cache which is populated from the disk which simulates a server restart.
BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);

Set<BlockCacheKey> keySet = newBucketCache.getBackingMap().keySet();
assertEquals(hStoreFiles.size(), keySet.size());
for(BlockCacheKey key: keySet) {
assertNotNull(key.getFilePath());
}
}

private void testDataTieringMethodWithPath(DataTieringMethodCallerWithPath caller, Path path,
boolean expectedResult, DataTieringException exception) {
try {
Expand Down