Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,21 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public final class PrefetchExecutor {
public final class PrefetchExecutor implements PropagatingConfigurationObserver {

private static final Logger LOG = LoggerFactory.getLogger(PrefetchExecutor.class);
public static final String PREFETCH_EXECUTE_DELAY = "hbase.hfile.prefetch.execute.delay";
/** Wait time in miliseconds before executing prefetch */
private static int prefetchExecuteDelay;

/** Futures for tracking block prefetch activity */
private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>();
Expand All @@ -59,6 +64,7 @@ public final class PrefetchExecutor {
prefetchDelayMillis = conf.getInt("hbase.hfile.prefetch.delay", 1000);
prefetchDelayVariation = conf.getFloat("hbase.hfile.prefetch.delay.variation", 0.2f);
int prefetchThreads = conf.getInt("hbase.hfile.thread.prefetch", 4);
prefetchExecuteDelay = conf.getInt(PREFETCH_EXECUTE_DELAY, 0);
prefetchExecutorPool = new ScheduledThreadPoolExecutor(prefetchThreads, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Expand All @@ -73,15 +79,17 @@ public Thread newThread(Runnable r) {
// TODO: We want HFile, which is where the blockcache lives, to handle
// prefetching of file blocks but the Store level is where path convention
// knowledge should be contained
private static final Pattern prefetchPathExclude =
private static final Pattern prefetchPathExclude =`
Pattern.compile("(" + Path.SEPARATOR_CHAR + HConstants.HBASE_TEMP_DIRECTORY.replace(".", "\\.")
+ Path.SEPARATOR_CHAR + ")|(" + Path.SEPARATOR_CHAR
+ HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") + Path.SEPARATOR_CHAR + ")");

public static void request(Path path, Runnable runnable) {
if (!prefetchPathExclude.matcher(path.toString()).find()) {
long delay;
if (prefetchDelayMillis > 0) {
if (prefetchExecuteDelay > 0) {
delay = prefetchExecuteDelay;
} else if (prefetchDelayMillis > 0) {
delay = (long) ((prefetchDelayMillis * (1.0f - (prefetchDelayVariation / 2)))
+ (prefetchDelayMillis * (prefetchDelayVariation / 2)
* ThreadLocalRandom.current().nextFloat()));
Expand Down Expand Up @@ -127,6 +135,28 @@ public static boolean isCompleted(Path path) {
return true;
}

private PrefetchExecutor() {
@Override
public void onConfigurationChange(Configuration conf) {
LOG.debug("PrefetchExecutor.onConfigurationChange");
PrefetchExecutor.loadConfiguration(conf);
}

@Override
public void registerChildren(ConfigurationManager manager) {
LOG.debug("PrefetchExecutor.registerChildren");
}

@Override
public void deregisterChildren(ConfigurationManager manager) {
LOG.debug("PrefetchExecutor.deregisterChildren");
}

public static int getPrefetchExecuteDelay() {
return prefetchExecuteDelay;
}

public static void loadConfiguration(Configuration conf) {
LOG.debug("PrefetchExecutor.loadConfiguration");
prefetchExecuteDelay = conf.getInt(PREFETCH_EXECUTE_DELAY, 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY;
import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.*;
import static org.apache.hadoop.hbase.regionserver.CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
Expand Down Expand Up @@ -264,6 +265,42 @@ public void testPrefetchDoesntSkipHFileLink() throws Exception {
});
}

@Test
public void testOnConfigurationChange() {
// change PREFETCH_DELAY_ENABLE_KEY from false to true
conf.setInt(PREFETCH_EXECUTE_DELAY, 2000);
PrefetchExecutor.loadConfiguration(conf);
assertTrue(getPrefetchExecuteDelay() == 2000);

// restore
conf.setInt(PREFETCH_EXECUTE_DELAY, 0);
PrefetchExecutor.loadConfiguration(conf);
assertTrue(getPrefetchExecuteDelay() == 0);
}

@Test
public void testPrefetchWithDelay() throws Exception {
conf.setInt(PREFETCH_EXECUTE_DELAY, 2000);
PrefetchExecutor.loadConfiguration(conf);

HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
.withBlockSize(DATA_BLOCK_SIZE).build();
Path storeFile = writeStoreFile("TestPrefetchWithDelay", context);
readStoreFile(storeFile);
conf.setInt(PREFETCH_EXECUTE_DELAY, 0);
}

@Test
public void testPrefetchWithDefaultDelay() throws Exception {
conf.setInt(PREFETCH_EXECUTE_DELAY, 0);
PrefetchExecutor.loadConfiguration(conf);

HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
.withBlockSize(DATA_BLOCK_SIZE).build();
Path storeFile = writeStoreFile("TestPrefetchWithDelay", context);
readStoreFile(storeFile);
}

private void testPrefetchWhenRefs(boolean compactionEnabled, Consumer<Cacheable> test)
throws Exception {
cacheConf = new CacheConfig(conf, blockCache);
Expand Down