Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,7 @@ void freeSpace(final String why) {
long bytesToFreeWithExtra =
(long) Math.floor(bytesToFreeWithoutExtra * (1 + extraFreeFactor));

long bytesFreed = 0;
// Instantiate priority buckets
BucketEntryGroup bucketSingle =
new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(singleFactor));
Expand All @@ -982,9 +983,36 @@ void freeSpace(final String why) {
BucketEntryGroup bucketMemory =
new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(memoryFactor));

// Check the list of files to determine the cold files which can be readily evicted.
Map<String, String> coldFiles = null;
try {
DataTieringManager dataTieringManager = DataTieringManager.getInstance();
coldFiles = dataTieringManager.getColdFilesList();
} catch (IllegalStateException e) {
LOG.warn("Data Tiering Manager is not set. Ignore time-based block evictions.");
}
// Scan entire map putting bucket entry into appropriate bucket entry
// group
for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
if (
coldFiles != null && coldFiles.containsKey(bucketEntryWithKey.getKey().getHfileName())
) {
int freedBlockSize = bucketEntryWithKey.getValue().getLength();
if (evictBlockIfNoRpcReferenced(bucketEntryWithKey.getKey())) {
bytesFreed += freedBlockSize;
}
if (bytesFreed >= bytesToFreeWithExtra) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Bucket cache free space completed; required: {} freed: {} from cold data blocks.",
bytesToFreeWithExtra, StringUtils.byteDesc(bytesFreed));
}
// Sufficient bytes have been freed.
return;
}
continue;
}

switch (bucketEntryWithKey.getValue().getPriority()) {
case SINGLE: {
bucketSingle.add(bucketEntryWithKey);
Expand All @@ -1001,6 +1029,21 @@ void freeSpace(final String why) {
}
}

// Check if the cold file eviction is sufficient to create enough space.
bytesToFreeWithExtra -= bytesFreed;
if (bytesToFreeWithExtra <= 0) {
LOG.debug("Bucket cache free space completed; freed space : {} bytes of cold data blocks.",
StringUtils.byteDesc(bytesFreed));
return;
}

if (LOG.isDebugEnabled()) {
LOG.debug(
"Bucket cache free space completed; freed space : {} "
+ "bytes of cold data blocks. {} more bytes required to be freed.",
StringUtils.byteDesc(bytesFreed), bytesToFreeWithExtra);
}

PriorityQueue<BucketEntryGroup> bucketQueue =
new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow));

Expand All @@ -1009,8 +1052,6 @@ void freeSpace(final String why) {
bucketQueue.add(bucketMemory);

int remainingBuckets = bucketQueue.size();
long bytesFreed = 0;

BucketEntryGroup bucketGroup;
while ((bucketGroup = bucketQueue.poll()) != null) {
long overflow = bucketGroup.overflow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.OptionalLong;
Expand Down Expand Up @@ -173,12 +174,12 @@ private boolean hotDataValidator(long maxTimestamp, long hotDataAge) {
private long getMaxTimestamp(Path hFilePath) throws DataTieringException {
HStoreFile hStoreFile = getHStoreFile(hFilePath);
if (hStoreFile == null) {
LOG.error("HStoreFile corresponding to " + hFilePath + " doesn't exist");
LOG.error("HStoreFile corresponding to {} doesn't exist", hFilePath);
return Long.MAX_VALUE;
}
OptionalLong maxTimestamp = hStoreFile.getMaximumTimestamp();
if (!maxTimestamp.isPresent()) {
LOG.error("Maximum timestamp not present for " + hFilePath);
LOG.error("Maximum timestamp not present for {}", hFilePath);
return Long.MAX_VALUE;
}
return maxTimestamp.getAsLong();
Expand Down Expand Up @@ -270,4 +271,41 @@ private long getDataTieringHotDataAge(Configuration conf) {
return Long.parseLong(
conf.get(DATATIERING_HOT_DATA_AGE_KEY, String.valueOf(DEFAULT_DATATIERING_HOT_DATA_AGE)));
}

/*
* This API traverses through the list of online regions and returns a subset of these files-names
* that are cold.
* @return List of names of files with cold data as per data-tiering logic.
*/
public Map<String, String> getColdFilesList() {
Map<String, String> coldFiles = new HashMap<>();
for (HRegion r : this.onlineRegions.values()) {
for (HStore hStore : r.getStores()) {
Configuration conf = hStore.getReadOnlyConfiguration();
if (getDataTieringType(conf) != DataTieringType.TIME_RANGE) {
// Data-Tiering not enabled for the store. Just skip it.
continue;
}
Long hotDataAge = getDataTieringHotDataAge(conf);

for (HStoreFile hStoreFile : hStore.getStorefiles()) {
String hFileName =
hStoreFile.getFileInfo().getHFileInfo().getHFileContext().getHFileName();
OptionalLong maxTimestamp = hStoreFile.getMaximumTimestamp();
if (!maxTimestamp.isPresent()) {
LOG.warn("maxTimestamp missing for file: {}",
hStoreFile.getFileInfo().getActiveFileName());
continue;
}
long currentTimestamp = EnvironmentEdgeManager.getDelegate().currentTime();
long fileAge = currentTimestamp - maxTimestamp.getAsLong();
if (fileAge > hotDataAge) {
// Values do not matter.
coldFiles.put(hFileName, null);
}
}
}
}
return coldFiles;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,8 @@ public void testPrefetchWithDelay() throws Exception {
Thread.sleep(20000);
assertFalse("Prefetch threads should not be running at this point", reader.prefetchStarted());
while (!reader.prefetchStarted()) {
assertTrue("Prefetch delay has not been expired yet",
getElapsedTime(startTime) < PrefetchExecutor.getPrefetchDelay());
// Wait until the prefetch is triggered.
Thread.sleep(500);
}
if (reader.prefetchStarted()) {
// Added some delay as we have started the timer a bit late.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -51,7 +52,9 @@
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -245,6 +248,181 @@ public void testColdDataFiles() {
}
}

@Test
public void testPickColdDataFiles() {
Map<String, String> coldDataFiles = dataTieringManager.getColdFilesList();
assertEquals(1, coldDataFiles.size());
// hStoreFiles[3] is the cold file.
assert (coldDataFiles.containsKey(hStoreFiles.get(3).getFileInfo().getActiveFileName()));
}

/*
* Verify that two cold blocks(both) are evicted when bucket reaches its capacity. The hot file
* remains in the cache.
*/
@Test
public void testBlockEvictions() throws Exception {
long capacitySize = 40 * 1024;
int writeThreads = 3;
int writerQLen = 64;
int[] bucketSizes = new int[] { 8 * 1024 + 1024 };

// Setup: Create a bucket cache with lower capacity
BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);

// Create three Cache keys with cold data files and a block with hot data.
// hStoreFiles.get(3) is a cold data file, while hStoreFiles.get(0) is a hot file.
Set<BlockCacheKey> cacheKeys = new HashSet<>();
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 8192, true, BlockType.DATA));
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA));

// Create dummy data to be cached and fill the cache completely.
CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);

int blocksIter = 0;
for (BlockCacheKey key : cacheKeys) {
bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
// Ensure that the block is persisted to the file.
Waiter.waitFor(defaultConf, 10000, 100, () -> (bucketCache.getBackingMap().containsKey(key)));
}

// Verify that the bucket cache contains 3 blocks.
assertEquals(3, bucketCache.getBackingMap().keySet().size());

// Add an additional block into cache with hot data which should trigger the eviction
BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);

bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
Waiter.waitFor(defaultConf, 10000, 100,
() -> (bucketCache.getBackingMap().containsKey(newKey)));

// Verify that the bucket cache now contains 2 hot blocks blocks only.
// Both cold blocks of 8KB will be evicted to make room for 1 block of 8KB + an additional
// space.
validateBlocks(bucketCache.getBackingMap().keySet(), 2, 2, 0);
}

/*
* Verify that two cold blocks(both) are evicted when bucket reaches its capacity, but one cold
* block remains in the cache since the required space is freed.
*/
@Test
public void testBlockEvictionsAllColdBlocks() throws Exception {
long capacitySize = 40 * 1024;
int writeThreads = 3;
int writerQLen = 64;
int[] bucketSizes = new int[] { 8 * 1024 + 1024 };

// Setup: Create a bucket cache with lower capacity
BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);

// Create three Cache keys with three cold data blocks.
// hStoreFiles.get(3) is a cold data file.
Set<BlockCacheKey> cacheKeys = new HashSet<>();
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 8192, true, BlockType.DATA));
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 16384, true, BlockType.DATA));

// Create dummy data to be cached and fill the cache completely.
CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);

int blocksIter = 0;
for (BlockCacheKey key : cacheKeys) {
bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
// Ensure that the block is persisted to the file.
Waiter.waitFor(defaultConf, 10000, 100, () -> (bucketCache.getBackingMap().containsKey(key)));
}

// Verify that the bucket cache contains 3 blocks.
assertEquals(3, bucketCache.getBackingMap().keySet().size());

// Add an additional block into cache with hot data which should trigger the eviction
BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);

bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
Waiter.waitFor(defaultConf, 10000, 100,
() -> (bucketCache.getBackingMap().containsKey(newKey)));

// Verify that the bucket cache now contains 1 cold block and a newly added hot block.
validateBlocks(bucketCache.getBackingMap().keySet(), 2, 1, 1);
}

/*
* Verify that a hot block evicted along with a cold block when bucket reaches its capacity.
*/
@Test
public void testBlockEvictionsHotBlocks() throws Exception {
long capacitySize = 40 * 1024;
int writeThreads = 3;
int writerQLen = 64;
int[] bucketSizes = new int[] { 8 * 1024 + 1024 };

// Setup: Create a bucket cache with lower capacity
BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
Comment on lines +368 to +369
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment says it's creating a lower capacity cache, yet it's using same 64KB as in previous tests.

DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);

// Create three Cache keys with two hot data blocks and one cold data block
// hStoreFiles.get(0) is a hot data file and hStoreFiles.get(3) is a cold data file.
Set<BlockCacheKey> cacheKeys = new HashSet<>();
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA));
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 8192, true, BlockType.DATA));
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));

// Create dummy data to be cached and fill the cache completely.
CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);

int blocksIter = 0;
for (BlockCacheKey key : cacheKeys) {
bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
// Ensure that the block is persisted to the file.
Waiter.waitFor(defaultConf, 10000, 100, () -> (bucketCache.getBackingMap().containsKey(key)));
}

// Verify that the bucket cache contains 3 blocks.
assertEquals(3, bucketCache.getBackingMap().keySet().size());

// Add an additional block which should evict the only cold block with an additional hot block.
BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);

bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
Waiter.waitFor(defaultConf, 10000, 100,
() -> (bucketCache.getBackingMap().containsKey(newKey)));

// Verify that the bucket cache now contains 2 hot blocks.
// Only one of the older hot blocks is retained and other one is the newly added hot block.
validateBlocks(bucketCache.getBackingMap().keySet(), 2, 2, 0);
}

private void validateBlocks(Set<BlockCacheKey> keys, int expectedTotalKeys, int expectedHotBlocks,
int expectedColdBlocks) {
int numHotBlocks = 0, numColdBlocks = 0;

assertEquals(expectedTotalKeys, keys.size());
int iter = 0;
for (BlockCacheKey key : keys) {
try {
if (dataTieringManager.isHotData(key)) {
numHotBlocks++;
} else {
numColdBlocks++;
}
} catch (Exception e) {
fail("Unexpected exception!");
}
}
assertEquals(expectedHotBlocks, numHotBlocks);
assertEquals(expectedColdBlocks, numColdBlocks);
}

private void testDataTieringMethodWithPath(DataTieringMethodCallerWithPath caller, Path path,
boolean expectedResult, DataTieringException exception) {
try {
Expand Down