diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java index 02fbc12e85c7..386db44f8768 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java @@ -31,6 +31,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.conf.ConfigurationManager; +import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.yetus.audience.InterfaceAudience; @@ -38,9 +40,12 @@ import org.slf4j.LoggerFactory; @InterfaceAudience.Private -public final class PrefetchExecutor { +public final class PrefetchExecutor implements PropagatingConfigurationObserver { private static final Logger LOG = LoggerFactory.getLogger(PrefetchExecutor.class); + public static final String PREFETCH_EXECUTE_DELAY = "hbase.hfile.prefetch.execute.delay"; + /** Wait time in miliseconds before executing prefetch */ + private static int prefetchExecuteDelay; /** Futures for tracking block prefetch activity */ private static final Map> prefetchFutures = new ConcurrentSkipListMap<>(); @@ -59,6 +64,7 @@ public final class PrefetchExecutor { prefetchDelayMillis = conf.getInt("hbase.hfile.prefetch.delay", 1000); prefetchDelayVariation = conf.getFloat("hbase.hfile.prefetch.delay.variation", 0.2f); int prefetchThreads = conf.getInt("hbase.hfile.thread.prefetch", 4); + prefetchExecuteDelay = conf.getInt(PREFETCH_EXECUTE_DELAY, 0); prefetchExecutorPool = new ScheduledThreadPoolExecutor(prefetchThreads, new ThreadFactory() { @Override public Thread newThread(Runnable r) { @@ -73,7 +79,7 @@ public Thread newThread(Runnable r) { // TODO: We want HFile, which is where the blockcache lives, to handle // prefetching of file blocks but the Store level is where path convention // knowledge should be contained - private static final Pattern prefetchPathExclude = + private static final Pattern prefetchPathExclude =` Pattern.compile("(" + Path.SEPARATOR_CHAR + HConstants.HBASE_TEMP_DIRECTORY.replace(".", "\\.") + Path.SEPARATOR_CHAR + ")|(" + Path.SEPARATOR_CHAR + HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") + Path.SEPARATOR_CHAR + ")"); @@ -81,7 +87,9 @@ public Thread newThread(Runnable r) { public static void request(Path path, Runnable runnable) { if (!prefetchPathExclude.matcher(path.toString()).find()) { long delay; - if (prefetchDelayMillis > 0) { + if (prefetchExecuteDelay > 0) { + delay = prefetchExecuteDelay; + } else if (prefetchDelayMillis > 0) { delay = (long) ((prefetchDelayMillis * (1.0f - (prefetchDelayVariation / 2))) + (prefetchDelayMillis * (prefetchDelayVariation / 2) * ThreadLocalRandom.current().nextFloat())); @@ -127,6 +135,28 @@ public static boolean isCompleted(Path path) { return true; } - private PrefetchExecutor() { + @Override + public void onConfigurationChange(Configuration conf) { + LOG.debug("PrefetchExecutor.onConfigurationChange"); + PrefetchExecutor.loadConfiguration(conf); + } + + @Override + public void registerChildren(ConfigurationManager manager) { + LOG.debug("PrefetchExecutor.registerChildren"); + } + + @Override + public void deregisterChildren(ConfigurationManager manager) { + LOG.debug("PrefetchExecutor.deregisterChildren"); + } + + public static int getPrefetchExecuteDelay() { + return prefetchExecuteDelay; + } + + public static void loadConfiguration(Configuration conf) { + LOG.debug("PrefetchExecutor.loadConfiguration"); + prefetchExecuteDelay = conf.getInt(PREFETCH_EXECUTE_DELAY, 0); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java index cdf9faf2490b..7ab64dc9000d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId; import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY; +import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.*; import static org.apache.hadoop.hbase.regionserver.CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; @@ -264,6 +265,42 @@ public void testPrefetchDoesntSkipHFileLink() throws Exception { }); } + @Test + public void testOnConfigurationChange() { + // change PREFETCH_DELAY_ENABLE_KEY from false to true + conf.setInt(PREFETCH_EXECUTE_DELAY, 2000); + PrefetchExecutor.loadConfiguration(conf); + assertTrue(getPrefetchExecuteDelay() == 2000); + + // restore + conf.setInt(PREFETCH_EXECUTE_DELAY, 0); + PrefetchExecutor.loadConfiguration(conf); + assertTrue(getPrefetchExecuteDelay() == 0); + } + + @Test + public void testPrefetchWithDelay() throws Exception { + conf.setInt(PREFETCH_EXECUTE_DELAY, 2000); + PrefetchExecutor.loadConfiguration(conf); + + HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ) + .withBlockSize(DATA_BLOCK_SIZE).build(); + Path storeFile = writeStoreFile("TestPrefetchWithDelay", context); + readStoreFile(storeFile); + conf.setInt(PREFETCH_EXECUTE_DELAY, 0); + } + + @Test + public void testPrefetchWithDefaultDelay() throws Exception { + conf.setInt(PREFETCH_EXECUTE_DELAY, 0); + PrefetchExecutor.loadConfiguration(conf); + + HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ) + .withBlockSize(DATA_BLOCK_SIZE).build(); + Path storeFile = writeStoreFile("TestPrefetchWithDelay", context); + readStoreFile(storeFile); + } + private void testPrefetchWhenRefs(boolean compactionEnabled, Consumer test) throws Exception { cacheConf = new CacheConfig(conf, blockCache);