Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ public class CacheConfig {

public static final String PREFETCH_PERSISTENCE_PATH_KEY = "hbase.prefetch.file.list.path";

/**
* Configuration key to set interval for persisting bucket cache to disk.
*/
public static final String BUCKETCACHE_PERSIST_INTERVAL_KEY =
"hbase.bucketcache.persist.intervalinmillis";

// Defaults
public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
public static final boolean DEFAULT_CACHE_DATA_ON_WRITE = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public static void persistToFile(String path) throws IOException {
throw new IOException("Error persisting prefetched HFiles set!");
}
if (!prefetchCompleted.isEmpty()) {
try (FileOutputStream fos = new FileOutputStream(prefetchedFileListPath, true)) {
try (FileOutputStream fos = new FileOutputStream(prefetchedFileListPath, false)) {
PrefetchProtoUtils.toPB(prefetchCompleted).writeDelimitedTo(fos);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY;

import java.io.File;
Expand Down Expand Up @@ -178,6 +179,7 @@ public class BucketCache implements BlockCache, HeapSize {
private final BucketCacheStats cacheStats = new BucketCacheStats();

private final String persistencePath;
static AtomicBoolean isCacheInconsistent = new AtomicBoolean(false);
private final long cacheCapacity;
/** Approximate block size */
private final long blockSize;
Expand Down Expand Up @@ -237,6 +239,8 @@ public class BucketCache implements BlockCache, HeapSize {

private String prefetchedFileListPath;

private long bucketcachePersistInterval;

private static final String FILE_VERIFY_ALGORITHM =
"hbase.bucketcache.persistent.file.integrity.check.algorithm";
private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
Expand Down Expand Up @@ -288,6 +292,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
this.queueAdditionWaitTime =
conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY);
this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000);

sanityCheckConfigs();

Expand All @@ -314,6 +319,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity);

if (ioEngine.isPersistent() && persistencePath != null) {
startBucketCachePersisterThread();
try {
retrieveFromFile(bucketSizes);
} catch (IOException ioex) {
Expand Down Expand Up @@ -370,6 +376,12 @@ protected void startWriterThreads() {
}
}

/**
 * Starts the background thread that periodically writes the bucket cache index
 * back to the persistence file whenever the cache has been marked inconsistent
 * (see {@link #setCacheInconsistent(boolean)}). The check interval is taken from
 * the "hbase.bucketcache.persist.intervalinmillis" configuration (default 1000ms).
 */
void startBucketCachePersisterThread() {
BucketCachePersister cachePersister =
new BucketCachePersister(this, bucketcachePersistInterval);
cachePersister.start();
}

/** Returns the current value of the {@code cacheEnabled} flag. */
boolean isCacheEnabled() {
return this.cacheEnabled;
}
Expand Down Expand Up @@ -597,6 +609,9 @@ void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decre
if (evictedByEvictionProcess) {
cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
}
if (ioEngine.isPersistent()) {
setCacheInconsistent(true);
}
}

/**
Expand Down Expand Up @@ -721,6 +736,14 @@ protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
});
}

/**
 * Returns true if the in-memory backing map has diverged from the last persisted
 * snapshot (set on eviction and on new entries added by the writer threads), meaning
 * the on-disk index file may be stale and should be rewritten by the persister thread.
 */
public boolean isCacheInconsistent() {
return isCacheInconsistent.get();
}

/** Marks the cache as (in)consistent with the on-disk persisted index. */
public void setCacheInconsistent(boolean setCacheInconsistent) {
isCacheInconsistent.set(setCacheInconsistent);
}

/*
* Statistics thread. Periodically output cache statistics to the log.
*/
Expand Down Expand Up @@ -1167,6 +1190,9 @@ void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws Inte
// Only add if non-null entry.
if (bucketEntries[i] != null) {
putIntoBackingMap(key, bucketEntries[i]);
if (ioEngine.isPersistent()) {
setCacheInconsistent(true);
}
}
// Always remove from ramCache even if we failed adding it to the block cache above.
boolean existed = ramCache.remove(key, re -> {
Expand Down Expand Up @@ -1216,8 +1242,7 @@ static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q,
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
justification = "false positive, try-with-resources ensures close is called.")
private void persistToFile() throws IOException {
assert !cacheEnabled;
void persistToFile() throws IOException {
if (!ioEngine.isPersistent()) {
throw new IOException("Attempt to persist non-persistent cache mappings!");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile.bucket;

import java.io.IOException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public class BucketCachePersister extends Thread {
private final BucketCache cache;
private final long intervalMillis;
private static final Logger LOG = LoggerFactory.getLogger(BucketCachePersister.class);

public BucketCachePersister(BucketCache cache, long intervalMillis) {
super("bucket-cache-persister");
this.cache = cache;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Let's set a meaningful thread name for this one.

this.intervalMillis = intervalMillis;
LOG.info("BucketCachePersister started with interval: " + intervalMillis);
}

public void run() {
while (true) {
try {
Thread.sleep(intervalMillis);
if (cache.isCacheInconsistent()) {
LOG.debug("Cache is inconsistent, persisting to disk");
cache.persistToFile();
cache.setCacheInconsistent(false);
}
} catch (IOException | InterruptedException e) {
LOG.warn("Exception in BucketCachePersister" + e.getMessage());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ public void setup() throws Exception {
}

@Test
public void testRegionClosePrefetchPersistence() throws Exception {
public void testPrefetchPersistence() throws Exception {

// Write to table and flush
TableName tableName = TableName.valueOf("table1");
byte[] row0 = Bytes.toBytes("row1");
Expand All @@ -107,8 +108,14 @@ public void testRegionClosePrefetchPersistence() throws Exception {
table.put(put1);
TEST_UTIL.flush(tableName);
} finally {
Thread.sleep(1000);
Thread.sleep(1500);
}

// Default interval for cache persistence is 1000ms; we slept 1500ms above so the
// persister thread has had at least one chance to run and both persistence files
assertTrue(new File(testDir + "/bucket.persistence").exists());
assertTrue(new File(testDir + "/prefetch.persistence").exists());

// Stop the RS
cluster.stopRegionServer(0);
LOG.info("Stopped Region Server 0.");
Expand All @@ -118,27 +125,14 @@ public void testRegionClosePrefetchPersistence() throws Exception {

// Start the RS and validate
cluster.startRegionServer();
Thread.sleep(1000);
assertFalse(new File(testDir + "/prefetch.persistence").exists());
assertFalse(new File(testDir + "/bucket.persistence").exists());
}

@Test
public void testPrefetchPersistenceNegative() throws Exception {
cluster.stopRegionServer(0);
LOG.info("Stopped Region Server 0.");
Thread.sleep(1000);
assertFalse(new File(testDir + "/prefetch.persistence").exists());
assertTrue(new File(testDir + "/bucket.persistence").exists());
cluster.startRegionServer();
Thread.sleep(1000);
assertFalse(new File(testDir + "/prefetch.persistence").exists());
assertFalse(new File(testDir + "/bucket.persistence").exists());
}

@After
public void tearDown() throws Exception {
TEST_UTIL.shutdownMiniCluster();
TEST_UTIL.cleanupDataTestDirOnTestFS(String.valueOf(testDir));
if (zkCluster != null) {
zkCluster.shutdown();
}
Expand Down
Loading