
Commit 42d3d36

jhungund authored and wchevreuil committed
HBASE-28468: Integrate the data-tiering logic into cache evictions. (#5829)
Signed-off-by: Wellington Chevreuil <[email protected]>
1 parent 643c556 commit 42d3d36

File tree: 3 files changed, +253 -3 lines changed

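At a high level, this commit makes BucketCache#freeSpace consult DataTieringManager for the set of "cold" HFiles and evict their blocks before falling back to the usual priority-group eviction. Cold files only exist when time-range data tiering is enabled for a store. As orientation, here is a rough sketch of how a column family might opt in; the configuration key strings are assumptions inferred from the constants referenced in the diff (DataTieringType.TIME_RANGE, DATATIERING_HOT_DATA_AGE_KEY), not something this commit defines.

import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical setup sketch: enable TIME_RANGE data tiering on a column family so the
// new cold-file eviction pass in BucketCache#freeSpace has something to act on.
// The two key strings below are assumed names, not taken from this diff.
void createTieredTable(Admin admin) throws IOException {
  admin.createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf("tiered_table"))
    .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf"))
      .setConfiguration("hbase.hstore.datatiering.type", "TIME_RANGE") // assumed key
      .setConfiguration("hbase.hstore.datatiering.hot.data.age", "86400000") // assumed key; 1 day in ms
      .build())
    .build());
}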

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java

Lines changed: 35 additions & 1 deletion
@@ -1154,6 +1154,14 @@ void freeSpace(final String why) {
     // the cached time is recored in nanos, so we need to convert the grace period accordingly
     long orphanGracePeriodNanos = orphanBlockGracePeriod * 1000000;
     long bytesFreed = 0;
+    // Check the list of files to determine the cold files which can be readily evicted.
+    Map<String, String> coldFiles = null;
+    try {
+      DataTieringManager dataTieringManager = DataTieringManager.getInstance();
+      coldFiles = dataTieringManager.getColdFilesList();
+    } catch (IllegalStateException e) {
+      LOG.warn("Data Tiering Manager is not set. Ignore time-based block evictions.");
+    }
     // Scan entire map putting bucket entry into appropriate bucket entry
     // group
     for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
@@ -1186,6 +1194,17 @@ void freeSpace(final String why) {
          }
        }
      }
+
+      if (bytesFreed < bytesToFreeWithExtra &&
+        coldFiles != null && coldFiles.containsKey(bucketEntryWithKey.getKey().getHfileName())
+      ) {
+        int freedBlockSize = bucketEntryWithKey.getValue().getLength();
+        if (evictBlockIfNoRpcReferenced(bucketEntryWithKey.getKey())) {
+          bytesFreed += freedBlockSize;
+        }
+        continue;
+      }
+
      switch (entry.getPriority()) {
        case SINGLE: {
          bucketSingle.add(bucketEntryWithKey);
@@ -1201,6 +1220,22 @@ void freeSpace(final String why) {
        }
      }
    }
+
+    // Check if the cold file eviction is sufficient to create enough space.
+    bytesToFreeWithExtra -= bytesFreed;
+    if (bytesToFreeWithExtra <= 0) {
+      LOG.debug("Bucket cache free space completed; freed space : {} bytes of cold data blocks.",
+        StringUtils.byteDesc(bytesFreed));
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+        "Bucket cache free space completed; freed space : {} "
+          + "bytes of cold data blocks. {} more bytes required to be freed.",
+        StringUtils.byteDesc(bytesFreed), bytesToFreeWithExtra);
+    }
+
     PriorityQueue<BucketEntryGroup> bucketQueue =
       new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow));
 
@@ -1209,7 +1244,6 @@ void freeSpace(final String why) {
     bucketQueue.add(bucketMemory);
 
     int remainingBuckets = bucketQueue.size();
-
     BucketEntryGroup bucketGroup;
     while ((bucketGroup = bucketQueue.poll()) != null) {
       long overflow = bucketGroup.overflow();
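For readers stitching the hunks above back together, the new logic in freeSpace reduces to the following simplified recap (names taken from the diff; this is a paraphrase, not a verbatim excerpt): cold blocks are evicted first, and the method returns early if that alone frees enough space.

// Simplified recap of the cold-first pass added to freeSpace (paraphrased from the hunks above).
long bytesFreed = 0;
Map<String, String> coldFiles = null;
try {
  coldFiles = DataTieringManager.getInstance().getColdFilesList();
} catch (IllegalStateException e) {
  // DataTieringManager not initialised; skip time-based eviction.
}
for (Map.Entry<BlockCacheKey, BucketEntry> entryWithKey : backingMap.entrySet()) {
  if (bytesFreed < bytesToFreeWithExtra && coldFiles != null
    && coldFiles.containsKey(entryWithKey.getKey().getHfileName())) {
    int freedBlockSize = entryWithKey.getValue().getLength();
    if (evictBlockIfNoRpcReferenced(entryWithKey.getKey())) {
      bytesFreed += freedBlockSize;
    }
    continue; // cold blocks never enter the single/multi/memory groups
  }
  // ... existing grouping of remaining blocks into single/multi/memory buckets ...
}
bytesToFreeWithExtra -= bytesFreed;
if (bytesToFreeWithExtra <= 0) {
  return; // evicting cold blocks alone freed enough space
}
// ... otherwise the existing priority-queue based eviction continues as before ...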

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java

Lines changed: 40 additions & 2 deletions
@@ -20,6 +20,7 @@
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.OptionalLong;
@@ -173,12 +174,12 @@ private boolean hotDataValidator(long maxTimestamp, long hotDataAge) {
   private long getMaxTimestamp(Path hFilePath) throws DataTieringException {
     HStoreFile hStoreFile = getHStoreFile(hFilePath);
     if (hStoreFile == null) {
-      LOG.error("HStoreFile corresponding to " + hFilePath + " doesn't exist");
+      LOG.error("HStoreFile corresponding to {} doesn't exist", hFilePath);
       return Long.MAX_VALUE;
     }
     OptionalLong maxTimestamp = hStoreFile.getMaximumTimestamp();
     if (!maxTimestamp.isPresent()) {
-      LOG.error("Maximum timestamp not present for " + hFilePath);
+      LOG.error("Maximum timestamp not present for {}", hFilePath);
       return Long.MAX_VALUE;
     }
     return maxTimestamp.getAsLong();
@@ -270,4 +271,41 @@ private long getDataTieringHotDataAge(Configuration conf) {
     return Long.parseLong(
       conf.get(DATATIERING_HOT_DATA_AGE_KEY, String.valueOf(DEFAULT_DATATIERING_HOT_DATA_AGE)));
   }
+
+  /*
+   * This API traverses through the list of online regions and returns a subset of these files-names
+   * that are cold.
+   * @return List of names of files with cold data as per data-tiering logic.
+   */
+  public Map<String, String> getColdFilesList() {
+    Map<String, String> coldFiles = new HashMap<>();
+    for (HRegion r : this.onlineRegions.values()) {
+      for (HStore hStore : r.getStores()) {
+        Configuration conf = hStore.getReadOnlyConfiguration();
+        if (getDataTieringType(conf) != DataTieringType.TIME_RANGE) {
+          // Data-Tiering not enabled for the store. Just skip it.
+          continue;
+        }
+        Long hotDataAge = getDataTieringHotDataAge(conf);
+
+        for (HStoreFile hStoreFile : hStore.getStorefiles()) {
+          String hFileName =
+            hStoreFile.getFileInfo().getHFileInfo().getHFileContext().getHFileName();
+          OptionalLong maxTimestamp = hStoreFile.getMaximumTimestamp();
+          if (!maxTimestamp.isPresent()) {
+            LOG.warn("maxTimestamp missing for file: {}",
+              hStoreFile.getFileInfo().getActiveFileName());
+            continue;
+          }
+          long currentTimestamp = EnvironmentEdgeManager.getDelegate().currentTime();
+          long fileAge = currentTimestamp - maxTimestamp.getAsLong();
+          if (fileAge > hotDataAge) {
+            // Values do not matter.
+            coldFiles.put(hFileName, null);
+          }
+        }
+      }
+    }
+    return coldFiles;
+  }
 }
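The hot/cold decision in getColdFilesList above is a plain age comparison against the store's configured hot-data age. A minimal worked example of that arithmetic (all values invented for illustration):

// Worked example of the cold-file test in getColdFilesList (example values only).
long hotDataAge = 86_400_000L;               // hot-data age from configuration, e.g. one day in ms
long currentTimestamp = 1_700_172_800_000L;  // "now", as returned by EnvironmentEdgeManager
long maxTimestamp = 1_700_000_000_000L;      // newest cell timestamp recorded in the HFile
long fileAge = currentTimestamp - maxTimestamp; // 172,800,000 ms = two days
boolean cold = fileAge > hotDataAge;            // true: the file is older than the hot-data age, so it is cold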

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java

Lines changed: 178 additions & 0 deletions
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -51,7 +52,9 @@
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
 import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -247,6 +250,181 @@ public void testColdDataFiles() {
     }
   }
 
+  @Test
+  public void testPickColdDataFiles() {
+    Map<String, String> coldDataFiles = dataTieringManager.getColdFilesList();
+    assertEquals(1, coldDataFiles.size());
+    // hStoreFiles[3] is the cold file.
+    assert (coldDataFiles.containsKey(hStoreFiles.get(3).getFileInfo().getActiveFileName()));
+  }
+
+  /*
+   * Verify that two cold blocks(both) are evicted when bucket reaches its capacity. The hot file
+   * remains in the cache.
+   */
+  @Test
+  public void testBlockEvictions() throws Exception {
+    long capacitySize = 40 * 1024;
+    int writeThreads = 3;
+    int writerQLen = 64;
+    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
+
+    // Setup: Create a bucket cache with lower capacity
+    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
+
+    // Create three Cache keys with cold data files and a block with hot data.
+    // hStoreFiles.get(3) is a cold data file, while hStoreFiles.get(0) is a hot file.
+    Set<BlockCacheKey> cacheKeys = new HashSet<>();
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 8192, true, BlockType.DATA));
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA));
+
+    // Create dummy data to be cached and fill the cache completely.
+    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);
+
+    int blocksIter = 0;
+    for (BlockCacheKey key : cacheKeys) {
+      bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
+      // Ensure that the block is persisted to the file.
+      Waiter.waitFor(defaultConf, 10000, 100, () -> (bucketCache.getBackingMap().containsKey(key)));
+    }
+
+    // Verify that the bucket cache contains 3 blocks.
+    assertEquals(3, bucketCache.getBackingMap().keySet().size());
+
+    // Add an additional block into cache with hot data which should trigger the eviction
+    BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
+    CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);
+
+    bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
+    Waiter.waitFor(defaultConf, 10000, 100,
+      () -> (bucketCache.getBackingMap().containsKey(newKey)));
+
+    // Verify that the bucket cache now contains 2 hot blocks blocks only.
+    // Both cold blocks of 8KB will be evicted to make room for 1 block of 8KB + an additional
+    // space.
+    validateBlocks(bucketCache.getBackingMap().keySet(), 2, 2, 0);
+  }
+
+  /*
+   * Verify that two cold blocks(both) are evicted when bucket reaches its capacity, but one cold
+   * block remains in the cache since the required space is freed.
+   */
+  @Test
+  public void testBlockEvictionsAllColdBlocks() throws Exception {
+    long capacitySize = 40 * 1024;
+    int writeThreads = 3;
+    int writerQLen = 64;
+    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
+
+    // Setup: Create a bucket cache with lower capacity
+    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
+
+    // Create three Cache keys with three cold data blocks.
+    // hStoreFiles.get(3) is a cold data file.
+    Set<BlockCacheKey> cacheKeys = new HashSet<>();
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 8192, true, BlockType.DATA));
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 16384, true, BlockType.DATA));
+
+    // Create dummy data to be cached and fill the cache completely.
+    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);
+
+    int blocksIter = 0;
+    for (BlockCacheKey key : cacheKeys) {
+      bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
+      // Ensure that the block is persisted to the file.
+      Waiter.waitFor(defaultConf, 10000, 100, () -> (bucketCache.getBackingMap().containsKey(key)));
+    }
+
+    // Verify that the bucket cache contains 3 blocks.
+    assertEquals(3, bucketCache.getBackingMap().keySet().size());
+
+    // Add an additional block into cache with hot data which should trigger the eviction
+    BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
+    CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);
+
+    bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
+    Waiter.waitFor(defaultConf, 10000, 100,
+      () -> (bucketCache.getBackingMap().containsKey(newKey)));
+
+    // Verify that the bucket cache now contains 1 cold block and a newly added hot block.
+    validateBlocks(bucketCache.getBackingMap().keySet(), 2, 1, 1);
+  }
+
+  /*
+   * Verify that a hot block evicted along with a cold block when bucket reaches its capacity.
+   */
+  @Test
+  public void testBlockEvictionsHotBlocks() throws Exception {
+    long capacitySize = 40 * 1024;
+    int writeThreads = 3;
+    int writerQLen = 64;
+    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
+
+    // Setup: Create a bucket cache with lower capacity
+    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
+
+    // Create three Cache keys with two hot data blocks and one cold data block
+    // hStoreFiles.get(0) is a hot data file and hStoreFiles.get(3) is a cold data file.
+    Set<BlockCacheKey> cacheKeys = new HashSet<>();
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA));
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 8192, true, BlockType.DATA));
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));
+
+    // Create dummy data to be cached and fill the cache completely.
+    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);
+
+    int blocksIter = 0;
+    for (BlockCacheKey key : cacheKeys) {
+      bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
+      // Ensure that the block is persisted to the file.
+      Waiter.waitFor(defaultConf, 10000, 100, () -> (bucketCache.getBackingMap().containsKey(key)));
+    }
+
+    // Verify that the bucket cache contains 3 blocks.
+    assertEquals(3, bucketCache.getBackingMap().keySet().size());
+
+    // Add an additional block which should evict the only cold block with an additional hot block.
+    BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
+    CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);
+
+    bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
+    Waiter.waitFor(defaultConf, 10000, 100,
+      () -> (bucketCache.getBackingMap().containsKey(newKey)));
+
+    // Verify that the bucket cache now contains 2 hot blocks.
+    // Only one of the older hot blocks is retained and other one is the newly added hot block.
+    validateBlocks(bucketCache.getBackingMap().keySet(), 2, 2, 0);
+  }
+
+  private void validateBlocks(Set<BlockCacheKey> keys, int expectedTotalKeys, int expectedHotBlocks,
+    int expectedColdBlocks) {
+    int numHotBlocks = 0, numColdBlocks = 0;
+
+    assertEquals(expectedTotalKeys, keys.size());
+    int iter = 0;
+    for (BlockCacheKey key : keys) {
+      try {
+        if (dataTieringManager.isHotData(key)) {
+          numHotBlocks++;
+        } else {
+          numColdBlocks++;
+        }
+      } catch (Exception e) {
+        fail("Unexpected exception!");
+      }
+    }
+    assertEquals(expectedHotBlocks, numHotBlocks);
+    assertEquals(expectedColdBlocks, numColdBlocks);
+  }
+
   private void testDataTieringMethodWithPath(DataTieringMethodCallerWithPath caller, Path path,
     boolean expectedResult, DataTieringException exception) {
     try {
