Merged

==== BucketCacheEntry.proto ====
@@ -32,6 +32,7 @@ message BucketCacheEntry {
map<int32, string> deserializers = 4;
required BackingMap backing_map = 5;
optional bytes checksum = 6;
+ map<string, bool> prefetched_files = 7;
}

message BackingMap {
@@ -71,6 +72,8 @@ message BucketEntry {
required int64 access_counter = 3;
required int32 deserialiser_index = 4;
required BlockPriority priority = 5;
+ required int64 cachedTime = 6;
+ optional int32 disk_size_with_header = 7;
}
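
Two things are worth noting in the proto changes: `prefetched_files` keys HFile names to a completion flag, so the prefetch state survives a region server restart alongside the rest of the persisted cache metadata, and `disk_size_with_header` records the exact on-disk stride of each cached block, which the prefetch loop below uses to skip over already-cached blocks. A minimal sketch of reading the persisted map back — assuming the generated `BucketCacheProtos` class and delimited serialization, and ignoring any magic-byte preamble the cache may write before the proto:

import java.io.FileInputStream;
import java.io.IOException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;

public final class PrefetchedFilesDump {
  // Sketch: list the files recorded as fully prefetched in a persisted
  // BucketCacheEntry. Assumes writeDelimitedTo()/parseDelimitedFrom() framing;
  // the real persistence file may prepend a magic marker before the proto.
  public static void main(String[] args) throws IOException {
    try (FileInputStream in = new FileInputStream(args[0])) {
      BucketCacheProtos.BucketCacheEntry entry =
        BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in);
      // map<string, bool> prefetched_files = 7 surfaces as getPrefetchedFilesMap()
      entry.getPrefetchedFilesMap()
        .forEach((file, done) -> System.out.println(file + " prefetched=" + done));
    }
  }
}
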

enum BlockPriority {
==== CacheConfig.java ====
@@ -93,8 +93,6 @@ public class CacheConfig {
public static final String DROP_BEHIND_CACHE_COMPACTION_KEY =
"hbase.hfile.drop.behind.compaction";

- public static final String PREFETCH_PERSISTENCE_PATH_KEY = "hbase.prefetch.file.list.path";

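With `PREFETCH_PERSISTENCE_PATH_KEY` gone, operators no longer configure a separate file for the prefetch list; the prefetched-files map is saved and restored with the bucket cache's own persistence file. A hedged configuration sketch using the long-standing bucket cache keys (paths and sizes are illustrative values, not taken from this diff):

import org.apache.hadoop.conf.Configuration;

public final class PrefetchPersistenceConfig {
  // Sketch: after this change, bucket cache persistence alone carries the
  // prefetch state; "hbase.prefetch.file.list.path" is no longer read.
  public static Configuration configure() {
    Configuration conf = new Configuration();
    conf.set("hbase.bucketcache.ioengine", "file:/data/hbase/bucketcache.data");
    conf.setInt("hbase.bucketcache.size", 8192); // MB, illustrative
    conf.set("hbase.bucketcache.persistent.path", "/data/hbase/bucketcache.meta");
    return conf;
  }
}
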
/**
* Configuration key to set interval for persisting bucket cache to disk.
*/
==== HFilePreadReader.java ====
@@ -18,9 +18,13 @@
package org.apache.hadoop.hbase.io.hfile;

import java.io.IOException;
+ import java.util.Optional;
+ import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+ import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
+ import org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,8 +39,14 @@ public class HFilePreadReader extends HFileReaderImpl {
public HFilePreadReader(ReaderContext context, HFileInfo fileInfo, CacheConfig cacheConf,
Configuration conf) throws IOException {
super(context, fileInfo, cacheConf, conf);
+ final MutableBoolean fileAlreadyCached = new MutableBoolean(false);
+ BucketCache.getBuckedCacheFromCacheConfig(cacheConf).ifPresent(bc -> fileAlreadyCached
+ .setValue(bc.getFullyCachedFiles().get(path.getName()) == null ? false : true));
// Prefetch file blocks upon open if requested
- if (cacheConf.shouldPrefetchOnOpen() && cacheIfCompactionsOff()) {
+ if (
+ cacheConf.shouldPrefetchOnOpen() && cacheIfCompactionsOff()
+ && !fileAlreadyCached.booleanValue()
+ ) {
PrefetchExecutor.request(path, new Runnable() {
@Override
public void run() {
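
The `MutableBoolean` above exists only so the lambda can write into an effectively-final local; the guard itself reduces to "skip scheduling when the bucket cache already recorded this file as fully cached". An equivalent formulation, sketched against the same `getFullyCachedFiles()` map (`containsKey` stands in for the diff's `get(...) == null ? false : true` test):

import java.util.Optional;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;

final class PrefetchGuard {
  // Sketch: Optional.map() avoids the mutable holder used in the constructor.
  static boolean alreadyCached(Optional<BucketCache> cache, String hfileName) {
    return cache.map(bc -> bc.getFullyCachedFiles().containsKey(hfileName))
      .orElse(false);
  }
}
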
@@ -55,12 +65,36 @@ public void run() {
if (LOG.isTraceEnabled()) {
LOG.trace("Prefetch start " + getPathOffsetEndStr(path, offset, end));
}
+ Optional<BucketCache> bucketCacheOptional =
+ BucketCache.getBuckedCacheFromCacheConfig(cacheConf);
// Don't use BlockIterator here, because it's designed to read load-on-open section.
long onDiskSizeOfNextBlock = -1;
while (offset < end) {
if (Thread.interrupted()) {
break;
}
+ // BucketCache can be persistent and resilient to restarts, so we first check whether the
+ // block exists in its in-memory index; if so, we just update the offset and move on
+ // to the next block, without actually going all the way to the cache to read it.
+ if (bucketCacheOptional.isPresent()) {
+ BucketCache cache = bucketCacheOptional.get();
+ BlockCacheKey cacheKey = new BlockCacheKey(name, offset);
+ BucketEntry entry = cache.getBackingMap().get(cacheKey);
+ if (entry != null) {
+ cacheKey = new BlockCacheKey(name, offset);
+ entry = cache.getBackingMap().get(cacheKey);
+ if (entry == null) {
+ LOG.debug("No cache key {}, we'll read and cache it", cacheKey);
+ } else {
+ offset += entry.getOnDiskSizeWithHeader();
+ LOG.debug("Found cache key {}. Skipping prefetch, the block is already cached.",
+ cacheKey);
+ continue;
+ }
+ } else {
+ LOG.debug("No entry in the backing map for cache key {}", cacheKey);
+ }
+ }
// Perhaps we got our block from cache? Unlikely as this may be, if it happens, then
// the internal-to-hfileblock thread local which holds the overread that gets the
// next header, will not have happened...so, pass in the onDiskSize gotten from the
@@ -77,12 +111,15 @@ public void run() {
block.release();
}
}
+ BucketCache.getBuckedCacheFromCacheConfig(cacheConf)
+ .ifPresent(bc -> bc.fileCacheCompleted(path.getName()));

} catch (IOException e) {
// IOExceptions are probably due to region closes (relocation, etc.)
if (LOG.isTraceEnabled()) {
LOG.trace("Prefetch " + getPathOffsetEndStr(path, offset, end), e);
}
- } catch (Exception e) {
+ } catch (Throwable e) {
// Other exceptions are interesting
LOG.warn("Prefetch " + getPathOffsetEndStr(path, offset, end), e);
} finally {
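
The heart of the prefetch loop above is the stride computation: a hit in the bucket cache's in-memory backing map yields the block's on-disk size, header included, which is exactly how far to advance before the next read; only misses fall through to an actual read-and-cache. Isolated as a sketch, using the same backing-map API, with the diff's duplicated lookup collapsed to a single get:

import java.util.Optional;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry;

final class PrefetchSkip {
  // Sketch: returns the next offset to prefetch from. If the block at `offset`
  // is already in the bucket cache's in-memory index, advance past it by its
  // on-disk size (with header); otherwise stay put so the caller reads and
  // caches the block.
  static long nextOffset(Optional<BucketCache> cache, String hfileName, long offset) {
    if (cache.isPresent()) {
      BucketEntry entry = cache.get().getBackingMap()
        .get(new BlockCacheKey(hfileName, offset));
      if (entry != null) {
        return offset + entry.getOnDiskSizeWithHeader();
      }
    }
    return offset; // not cached yet
  }
}

Once the loop drains, `fileCacheCompleted(path.getName())` records the file in the fully-cached map, which is what the constructor guard and the persisted `prefetched_files` proto field both consume.
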
==== PrefetchExecutor.java ====
@@ -17,11 +17,6 @@
*/
package org.apache.hadoop.hbase.io.hfile;

- import java.io.File;
- import java.io.FileInputStream;
- import java.io.FileOutputStream;
- import java.io.IOException;
- import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Future;
@@ -42,25 +37,19 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

- import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos;

@InterfaceAudience.Private
public final class PrefetchExecutor {

private static final Logger LOG = LoggerFactory.getLogger(PrefetchExecutor.class);

/** Futures for tracking block prefetch activity */
private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>();
- /** Set of files for which prefetch is completed */
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
- private static HashMap<String, Boolean> prefetchCompleted = new HashMap<>();
/** Executor pool shared among all HFiles for block prefetch */
private static final ScheduledExecutorService prefetchExecutorPool;
/** Delay before beginning prefetch */
private static final int prefetchDelayMillis;
/** Variation in prefetch delay times, to mitigate stampedes */
private static final float prefetchDelayVariation;
- static String prefetchedFileListPath;
static {
// Consider doing this on demand with a configuration passed in rather
// than in a static initializer.
@@ -90,13 +79,6 @@ public Thread newThread(Runnable r) {
+ HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") + Path.SEPARATOR_CHAR + ")");

public static void request(Path path, Runnable runnable) {
- if (prefetchCompleted != null) {
- if (isFilePrefetched(path.getName())) {
- LOG.info(
- "File has already been prefetched before the restart, so skipping prefetch : " + path);
- return;
- }
- }
if (!prefetchPathExclude.matcher(path.toString()).find()) {
long delay;
if (prefetchDelayMillis > 0) {
@@ -122,8 +104,9 @@ public static void request(Path path, Runnable runnable) {

public static void complete(Path path) {
prefetchFutures.remove(path);
- prefetchCompleted.put(path.getName(), true);
- LOG.debug("Prefetch completed for {}", path.getName());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Prefetch completed for {}", path.getName());
+ }
}

public static void cancel(Path path) {
@@ -134,8 +117,6 @@ public static void cancel(Path path) {
prefetchFutures.remove(path);
LOG.debug("Prefetch cancelled for {}", path);
}
- LOG.debug("Removing filename from the prefetched persistence list: {}", path.getName());
- removePrefetchedFileWhileEvict(path.getName());
}

public static boolean isCompleted(Path path) {
@@ -146,70 +127,6 @@ public static boolean isCompleted(Path path) {
return true;
}

- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
- justification = "false positive, try-with-resources ensures close is called.")
- public static void persistToFile(String path) throws IOException {
- prefetchedFileListPath = path;
- if (prefetchedFileListPath == null) {
- LOG.info("Exception while persisting prefetch!");
- throw new IOException("Error persisting prefetched HFiles set!");
- }
- if (!prefetchCompleted.isEmpty()) {
- try (FileOutputStream fos = new FileOutputStream(prefetchedFileListPath, false)) {
- PrefetchProtoUtils.toPB(prefetchCompleted).writeDelimitedTo(fos);
- }
- }
- }
-
- public static void retrieveFromFile(String path) throws IOException {
- prefetchedFileListPath = path;
- File prefetchPersistenceFile = new File(prefetchedFileListPath);
- if (!prefetchPersistenceFile.exists()) {
- LOG.warn("Prefetch persistence file does not exist!");
- return;
- }
- LOG.info("Retrieving from prefetch persistence file " + path);
- assert (prefetchedFileListPath != null);
- try (FileInputStream fis = deleteFileOnClose(prefetchPersistenceFile)) {
- PersistentPrefetchProtos.PrefetchedHfileName proto =
- PersistentPrefetchProtos.PrefetchedHfileName.parseDelimitedFrom(fis);
- Map<String, Boolean> protoPrefetchedFilesMap = proto.getPrefetchedFilesMap();
- prefetchCompleted.putAll(protoPrefetchedFilesMap);
- }
- }
-
- private static FileInputStream deleteFileOnClose(final File file) throws IOException {
- return new FileInputStream(file) {
- private File myFile;
-
- private FileInputStream init(File file) {
- myFile = file;
- return this;
- }
-
- @Override
- public void close() throws IOException {
- if (myFile == null) {
- return;
- }
-
- super.close();
- if (!myFile.delete()) {
- throw new IOException("Failed deleting persistence file " + myFile.getAbsolutePath());
- }
- myFile = null;
- }
- }.init(file);
- }
-
- public static void removePrefetchedFileWhileEvict(String hfileName) {
- prefetchCompleted.remove(hfileName);
- }
-
- public static boolean isFilePrefetched(String hfileName) {
- return prefetchCompleted.containsKey(hfileName);
- }
-
private PrefetchExecutor() {
}
}
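
Everything removed from this class — `persistToFile`, `retrieveFromFile`, the self-deleting `FileInputStream`, and the `prefetchCompleted` map itself — was a persistence channel dedicated to the prefetch list. Its replacement is the `prefetched_files` field in `BucketCacheEntry` above, written and read wherever the bucket cache already persists its backing map. The shape of that round trip, sketched with the generated proto accessors (the surrounding wiring in `BucketCache` and its proto utilities is not part of this diff):

import java.util.Map;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;

final class PrefetchStateRoundTrip {
  // Sketch: the fully-cached-files map rides inside BucketCacheEntry.
  static void persist(Map<String, Boolean> fullyCachedFiles,
      BucketCacheProtos.BucketCacheEntry.Builder builder) {
    builder.putAllPrefetchedFiles(fullyCachedFiles); // generated map-field setter
  }

  static void restore(BucketCacheProtos.BucketCacheEntry entry,
      Map<String, Boolean> fullyCachedFiles) {
    fullyCachedFiles.putAll(entry.getPrefetchedFilesMap());
  }
}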