@@ -110,8 +110,9 @@ public static Iterable<Object[]> data() {
final long capacitySize = 32 * 1024 * 1024;
final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
String ioEngineName = "offheap";
String persistencePath = null;
private String ioEngineName = "offheap";

private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility();

private static class MockedBucketCache extends BucketCache {

@@ -136,14 +137,27 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
@Before
public void setup() throws IOException {
cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
Contributor:
Why make persistencePath null?

Contributor Author:
Since persistencePath was declared as null and never reassigned anywhere else in this file, I removed the declaration and pass null directly.

constructedBlockSizes, writeThreads, writerQLen, null);
}
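
For readers skimming the thread above, the change under discussion boils down to the following before/after, reassembled from the hunks in this file (nothing here beyond what the diff already shows):

```java
// Before: persistencePath was a field that only ever held null.
String ioEngineName = "offheap";
String persistencePath = null;
// ...
cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
    constructedBlockSizes, writeThreads, writerQLen, persistencePath);

// After: the dead field is removed, null is passed directly,
// and ioEngineName is narrowed to private.
private String ioEngineName = "offheap";
// ...
cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
    constructedBlockSizes, writeThreads, writerQLen, null);
```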

@After
public void tearDown() {
cache.shutdown();
}

/**
* Test utility to create the test directory and return its path.
*
* @return path of the created directory
* @throws IOException if the directory cannot be created
*/
private Path createAndGetTestDir() throws IOException {
final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir();
HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir);
return testDir;
}


/**
* Return a random element from {@code a}.
*/
@@ -255,51 +269,128 @@ public void run() {

@Test
public void testRetrieveFromFile() throws Exception {
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
Path testDir = TEST_UTIL.getDataTestDir();
TEST_UTIL.getTestFileSystem().mkdirs(testDir);

Path testDir = createAndGetTestDir();
String ioEngineName = "file:" + testDir + "/bucket.cache";
testRetrievalUtils(testDir, ioEngineName);
int[] smallBucketSizes = new int[]{3 * 1024, 5 * 1024};
String persistencePath = testDir + "/bucket.persistence";
BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
smallBucketSizes, writeThreads, writerQLen, persistencePath);
assertFalse(new File(persistencePath).exists());
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());
HBASE_TESTING_UTILITY.cleanupTestDir();
}

@Test
public void testRetrieveFromMMap() throws Exception {
final Path testDir = createAndGetTestDir();
final String ioEngineName = "mmap:" + testDir + "/bucket.cache";
testRetrievalUtils(testDir, ioEngineName);
}

@Test
public void testRetrieveFromPMem() throws Exception {
final Path testDir = createAndGetTestDir();
final String ioEngineName = "pmem:" + testDir + "/bucket.cache";
testRetrievalUtils(testDir, ioEngineName);
int[] smallBucketSizes = new int[]{3 * 1024, 5 * 1024};
String persistencePath = testDir + "/bucket.persistence";
BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
long usedSize = bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedSize);

HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
// Add blocks
for (HFileBlockPair block : blocks) {
bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
}
for (HFileBlockPair block : blocks) {
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
smallBucketSizes, writeThreads, writerQLen, persistencePath);
assertFalse(new File(persistencePath).exists());
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());
HBASE_TESTING_UTILITY.cleanupTestDir();
}

private void testRetrievalUtils(Path testDir, String ioEngineName)
throws IOException, InterruptedException {
final String persistencePath = testDir + "/bucket.persistence";
BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
try {
long usedSize = bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedSize);
HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
for (HFileBlockPair block : blocks) {
bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
}
for (HFileBlockPair block : blocks) {
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
}
usedSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedSize);
bucketCache.shutdown();
assertTrue(new File(persistencePath).exists());
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
assertFalse(new File(persistencePath).exists());
assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
} finally {
bucketCache.shutdown();
}
usedSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedSize);
// persist cache to file
bucketCache.shutdown();
assertTrue(new File(persistencePath).exists());
}

// restore cache from file
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
assertFalse(new File(persistencePath).exists());
assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
// persist cache to file
bucketCache.shutdown();
assertTrue(new File(persistencePath).exists());
@Test
public void testRetrieveUnsupportedIOE() throws Exception {
try {
final Path testDir = createAndGetTestDir();
final String ioEngineName = testDir + "/bucket.cache";
testRetrievalUtils(testDir, ioEngineName);
Assert.fail("Should have thrown IllegalArgumentException because of unsupported IOEngine!!");
} catch (IllegalArgumentException e) {
Assert.assertEquals("Don't understand io engine name for cache- prefix with file:, " +
"files:, mmap: or offheap", e.getMessage());
}
}

// reconfig buckets sizes, the biggest bucket is small than constructedBlockSize (8k or 16k)
// so it can't restore cache from file
int[] smallBucketSizes = new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024 };
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
smallBucketSizes, writeThreads, writerQLen, persistencePath);
@Test
public void testRetrieveFromMultipleFiles() throws Exception {
final Path testDirInitial = createAndGetTestDir();
final Path newTestDir = new HBaseTestingUtility().getDataTestDir();
HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(newTestDir);
String ioEngineName = new StringBuilder("files:").append(testDirInitial)
.append("/bucket1.cache").append(FileIOEngine.FILE_DELIMITER).append(newTestDir)
.append("/bucket2.cache").toString();
testRetrievalUtils(testDirInitial, ioEngineName);
int[] smallBucketSizes = new int[]{3 * 1024, 5 * 1024};
String persistencePath = testDirInitial + "/bucket.persistence";
BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
smallBucketSizes, writeThreads, writerQLen, persistencePath);
assertFalse(new File(persistencePath).exists());
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());
HBASE_TESTING_UTILITY.cleanupTestDir();
}
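
To make the multi-file engine name concrete: assuming FileIOEngine.FILE_DELIMITER is a comma (an assumption; the constant's value is not shown in this diff), the StringBuilder above yields a string of the following shape, with the two cache files living in the two test directories:

```java
// Hypothetical directories, for illustration only.
String dir1 = "/tmp/hbase-test/dir1";
String dir2 = "/tmp/hbase-test/dir2";

// Assuming FileIOEngine.FILE_DELIMITER == ",", the engine name would be:
// "files:/tmp/hbase-test/dir1/bucket1.cache,/tmp/hbase-test/dir2/bucket2.cache"
String ioEngineName = "files:" + dir1 + "/bucket1.cache" + "," + dir2 + "/bucket2.cache";
```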

TEST_UTIL.cleanupTestDir();
@Test
public void testRetrieveFromFileWithoutPersistence() throws Exception {
BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, null);
try {
final Path testDir = createAndGetTestDir();
String ioEngineName = "file:" + testDir + "/bucket.cache";
long usedSize = bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedSize);
HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
for (HFileBlockPair block : blocks) {
bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
}
for (HFileBlockPair block : blocks) {
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
}
usedSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedSize);
bucketCache.shutdown();
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, null);
assertEquals(0, bucketCache.getAllocator().getUsedSize());
} finally {
bucketCache.shutdown();
HBASE_TESTING_UTILITY.cleanupTestDir();
}
}
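
Taken together, the per-engine tests above now share one shape: create a test directory, build an engine name, drive the persist/restore round trip through testRetrievalUtils, then verify that a cache rebuilt with bucket sizes smaller than constructedBlockSize comes back empty. Purely as an illustration of that pattern, a test for some additional engine would look like the sketch below; the "someengine:" prefix is hypothetical, and everything else reuses members already shown in this file.

```java
@Test
public void testRetrieveFromHypotheticalEngine() throws Exception {
  // "someengine:" is a made-up prefix used only to illustrate the shared pattern.
  final Path testDir = createAndGetTestDir();
  final String ioEngineName = "someengine:" + testDir + "/bucket.cache";
  // Shared helper: caches blocks, persists on shutdown, restores, and
  // asserts the restored cache reports the same used size.
  testRetrievalUtils(testDir, ioEngineName);
  // Bucket sizes smaller than constructedBlockSize cannot hold the persisted
  // blocks, so a cache rebuilt with them must start empty.
  int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
  String persistencePath = testDir + "/bucket.persistence";
  BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      smallBucketSizes, writeThreads, writerQLen, persistencePath);
  assertFalse(new File(persistencePath).exists());
  assertEquals(0, bucketCache.getAllocator().getUsedSize());
  assertEquals(0, bucketCache.backingMap.size());
  HBASE_TESTING_UTILITY.cleanupTestDir();
}
```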

@Test
@@ -322,13 +413,32 @@ public void testGetPartitionSize() throws IOException {
conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);

BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf);
constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);

validateGetPartitionSize(cache, 0.1f, 0.5f);
validateGetPartitionSize(cache, 0.7f, 0.5f);
validateGetPartitionSize(cache, 0.2f, 0.5f);
}

@Test
public void testCacheSizeCapacity() throws IOException {
// Test cache capacity (capacity / blockSize) < Integer.MAX_VALUE
validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR,
BucketCache.DEFAULT_MIN_FACTOR);
Configuration conf = HBaseConfiguration.create();
conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
try {
new BucketCache(ioEngineName, Long.MAX_VALUE, 1, constructedBlockSizes, writeThreads,
writerQLen, null, 100, conf);
Assert.fail("Should have thrown IllegalArgumentException because of large cache capacity!");
} catch (IllegalArgumentException e) {
Assert.assertEquals("Cache capacity is too large, only support 32TB now", e.getMessage());
}
}
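
A note on the "32TB" in that assertion: the inline comment above frames the constraint as capacity / blockSize < Integer.MAX_VALUE, and Integer.MAX_VALUE blocks of 16KB works out to roughly 32TB (2^31 × 2^14 = 2^45 bytes), which is presumably where the figure comes from; the exact check inside BucketCache is not shown in this diff. The test trips it by passing Long.MAX_VALUE as the capacity with a block size of 1:

```java
// Rough arithmetic behind the limit (illustration, not BucketCache source):
long maxBlocks = Integer.MAX_VALUE;      // bound on capacity / blockSize
long sixteenKb = 16L * 1024;             // 16KB block granularity
long roughCap = maxBlocks * sixteenKb;   // ~2^45 bytes, i.e. ~32TB

// With the test's parameters the ratio vastly exceeds the bound,
// so the constructor throws IllegalArgumentException:
long ratio = Long.MAX_VALUE / 1;         // 2^63 - 1 blocks >> Integer.MAX_VALUE
```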

@Test
public void testValidBucketCacheConfigs() throws IOException {
Configuration conf = HBaseConfiguration.create();
Expand All @@ -340,7 +450,7 @@ public void testValidBucketCacheConfigs() throws IOException {
conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);

BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf);
constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);

assertEquals(BucketCache.ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f,
cache.getAcceptableFactor(), 0);
@@ -408,7 +518,7 @@ private void checkConfigValues(Configuration conf, Map<String, float[]> configMa
conf.setFloat(configName, configMap.get(configName)[i]);
}
BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf);
constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i] + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
} catch (IllegalArgumentException e) {
assertFalse("Created BucketCache and expected it to succeed: " + expectSuccess[i] + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);