diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java index a7ae8b4d1a5a..03237c30a255 100644 --- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.master.RackManager; import org.apache.hadoop.hbase.net.Address; +import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -115,6 +116,13 @@ class BalancerClusterState { // Maps localityType -> region -> [server|rack]Index with highest locality private int[][] regionsToMostLocalEntities; + // Maps region -> serverIndex -> prefetch ratio of a region on a server + private Map, Float> regionIndexServerIndexPrefetchRatio; + // Maps region -> serverIndex with best prefect ratio + private int[] regionServerIndexWithBestPrefetchRatio; + // Historical region server prefetch ratio + Map> historicalRegionServerPrefetchRatio; + static class DefaultRackManager extends RackManager { @Override public String getRack(ServerName server) { @@ -125,13 +133,20 @@ public String getRack(ServerName server) { BalancerClusterState(Map> clusterState, Map> loads, RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager) { - this(null, clusterState, loads, regionFinder, rackManager); + this(null, clusterState, loads, regionFinder, rackManager, null); + } + + BalancerClusterState(Map> clusterState, + Map> loads, RegionHDFSBlockLocationFinder regionFinder, + RackManager rackManager, Map> oldRegionServerPrefetchRatio) { + this(null, clusterState, loads, regionFinder, rackManager, oldRegionServerPrefetchRatio); } @SuppressWarnings("unchecked") BalancerClusterState(Collection unassignedRegions, Map> clusterState, Map> loads, - RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager) { + RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager, + Map> oldRegionServerPrefetchRatio) { if (unassignedRegions == null) { unassignedRegions = Collections.emptyList(); } @@ -145,6 +160,8 @@ public String getRack(ServerName server) { tables = new ArrayList<>(); this.rackManager = rackManager != null ? rackManager : new DefaultRackManager(); + this.historicalRegionServerPrefetchRatio = oldRegionServerPrefetchRatio; + numRegions = 0; List> serversPerHostList = new ArrayList<>(); @@ -553,6 +570,127 @@ enum LocalityType { RACK } + /** + * Returns prefetch ratio weighted to the size of the region in MB.* + * @param region Region ID + * @param server Server ID + * @return Weighted prefetch ratio of a region on a region server + */ + public float getOrComputeWeightedPrefetchRatio(int region, int server) { + return getRegionSizeMB(region) * getOrComputeRegionPrefetchRatio(region, server); + } + + /** + * Returns prefetch ratio of a region on a region server. This method also finds out the + * historical prefetch ratio of this region if it was ever hosted on this region server. + * @param region Region ID + * @param regionServerIndex Region Server ID + * @return Prefetch ratio of given region on the given region server + */ + protected float getRegionServerPrefetchRatio(int region, int regionServerIndex) { + // Cost this server has from RegionLoad + float prefetchRatio = 0.0f; + + // Get the prefetch ratio if the region is currently hosted on this server + for (int regionIndex : regionsPerServer[regionServerIndex]) { + if (region != regionIndex) { + continue; + } + Deque regionLoadList = regionLoads[regionIndex]; + + // The region is currently hosted on this region server. Now, get the prefetch cache ratio + // for this region on this region server + prefetchRatio = + regionLoadList == null ? 0.0f : regionLoadList.getLast().getPrefetchCacheRatio(); + + return prefetchRatio; + } + + // Region is not currently hosted on this server. Check if the region was prefetched on this + // server earlier. This can happen when the server was shutdown and the cache was persisted. + // Seartch using the index name and server name and not the index id and server id as these ids + // may change when a server is marked as dead or a new server is added. + String regionNameAsString = regions[region].getRegionNameAsString(); + Address serverAddress = servers[regionServerIndex].getAddress(); + if ( + historicalRegionServerPrefetchRatio != null + && historicalRegionServerPrefetchRatio.containsKey(regionNameAsString) + ) { + Map serverPrefetchRatio = + historicalRegionServerPrefetchRatio.get(regionNameAsString); + if (serverPrefetchRatio.containsKey(serverAddress)) { + prefetchRatio = serverPrefetchRatio.get(serverAddress); + + // The old prefetch cache ratio has been accounted for and hence, clear up this information + historicalRegionServerPrefetchRatio.remove(regionNameAsString, serverPrefetchRatio); + } + } + return prefetchRatio; + } + + /** + * Compute the prefetch ratios of all the regions on all the servers. This even includes the + * historical prefetch ratio of a region if it was historically hosted on some other region server + * before being moved to the currently hosting region server. + */ + private void computeRegionServerPrefetchRatio() { + regionIndexServerIndexPrefetchRatio = new HashMap<>(); + regionServerIndexWithBestPrefetchRatio = new int[numRegions]; + + for (int region = 0; region < numRegions; region++) { + float bestPrefetchRatio = 0.0f; + int serverWithBestPrefetchRatio = 0; + for (int server = 0; server < numServers; server++) { + float prefetchRatio = getRegionServerPrefetchRatio(region, server); + if (prefetchRatio > 0.0f || server == regionIndexToServerIndex[region]) { + // Record the prefetch ratio of a region on a region server only if it is greater than 0 + // which means either the region is currently hosted or was previously hosted on the given + // region server. We don't need to record a prefetch ratio of 0 as it does not add any + // value to the balancer decisions. + Pair regionServerPair = new Pair<>(region, server); + regionIndexServerIndexPrefetchRatio.put(regionServerPair, prefetchRatio); + } + if (prefetchRatio > bestPrefetchRatio) { + serverWithBestPrefetchRatio = server; + bestPrefetchRatio = prefetchRatio; + } else { + // If the server currently hosting the region has equal prefetch ratio to a historical + // prefetch ratio, consider the current server to continue hosting the region + if (prefetchRatio == bestPrefetchRatio && server == regionIndexToServerIndex[region]) { + // If two servers have the same prefetch ratio, the server currently hosting the + // region should retain the region + serverWithBestPrefetchRatio = server; + } + } + } + regionServerIndexWithBestPrefetchRatio[region] = serverWithBestPrefetchRatio; + } + } + + private float getOrComputeRegionPrefetchRatio(int region, int server) { + if ( + regionServerIndexWithBestPrefetchRatio == null + || regionIndexServerIndexPrefetchRatio.isEmpty() + ) { + computeRegionServerPrefetchRatio(); + } + + Pair regionServerPair = new Pair<>(region, server); + return regionIndexServerIndexPrefetchRatio.containsKey(regionServerPair) + ? regionIndexServerIndexPrefetchRatio.get(regionServerPair) + : 0.0f; + } + + public int[] getOrComputeServerWithBestPrefetchRatio() { + if ( + regionIndexServerIndexPrefetchRatio.isEmpty() + || regionServerIndexWithBestPrefetchRatio == null + ) { + computeRegionServerPrefetchRatio(); + } + return regionServerIndexWithBestPrefetchRatio; + } + public void doAction(BalanceAction action) { switch (action.getType()) { case NULL: diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerRegionLoad.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerRegionLoad.java index ffb36cb8ca1a..4728854d323e 100644 --- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerRegionLoad.java +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerRegionLoad.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.master.balancer; import org.apache.hadoop.hbase.RegionMetrics; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Size; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -34,6 +35,8 @@ class BalancerRegionLoad { private final long writeRequestsCount; private final int memStoreSizeMB; private final int storefileSizeMB; + private final float prefetchCacheRatio; + private final ServerName serverName; BalancerRegionLoad(RegionMetrics regionMetrics) { readRequestsCount = regionMetrics.getReadRequestCount(); @@ -41,6 +44,8 @@ class BalancerRegionLoad { writeRequestsCount = regionMetrics.getWriteRequestCount(); memStoreSizeMB = (int) regionMetrics.getMemStoreSize().get(Size.Unit.MEGABYTE); storefileSizeMB = (int) regionMetrics.getStoreFileSize().get(Size.Unit.MEGABYTE); + prefetchCacheRatio = regionMetrics.getPrefetchCacheRatio(); + serverName = regionMetrics.getServerName(); } public long getReadRequestsCount() { @@ -62,4 +67,12 @@ public int getMemStoreSizeMB() { public int getStorefileSizeMB() { return storefileSizeMB; } + + public float getPrefetchCacheRatio() { + return prefetchCacheRatio; + } + + public ServerName getServerName() { + return serverName; + } } diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index a4560cc595a2..54516868a0a0 100644 --- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -232,7 +232,8 @@ private BalancerClusterState createCluster(List servers, clusterState.put(server, Collections.emptyList()); } } - return new BalancerClusterState(regions, clusterState, null, this.regionFinder, rackManager); + return new BalancerClusterState(regions, clusterState, null, this.regionFinder, rackManager, + null); } private List findIdleServers(List servers) { diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/PrefetchBasedCandidateGenerator.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/PrefetchBasedCandidateGenerator.java new file mode 100644 index 000000000000..6d7c7ed5b88c --- /dev/null +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/PrefetchBasedCandidateGenerator.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import java.util.concurrent.ThreadLocalRandom; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +class PrefetchBasedCandidateGenerator extends CandidateGenerator { + private static float PREFETCH_RATIO_DIFF_FACTOR = 1.25f; + + @Override + BalanceAction generate(BalancerClusterState cluster) { + // iterate through regions until you find one that is not on ideal host + // start from a random point to avoid always balance the regions in front + if (cluster.numRegions > 0) { + int startRegionIndex = ThreadLocalRandom.current().nextInt(cluster.numRegions); + int toServerIndex; + for (int i = 0; i < cluster.numRegions; i++) { + int region = (startRegionIndex + i) % cluster.numRegions; + int currentServerIndex = cluster.regionIndexToServerIndex[region]; + float currentPrefetchRatio = + cluster.getOrComputeWeightedPrefetchRatio(region, currentServerIndex); + + // Check if there is a server with a better historical prefetch ratio + toServerIndex = pickOtherRandomServer(cluster, currentServerIndex); + float toServerPrefetchRatio = + cluster.getOrComputeWeightedPrefetchRatio(region, toServerIndex); + + // If the prefetch ratio on the target server is significantly higher, move the region. + if ( + currentPrefetchRatio > 0 + && (toServerPrefetchRatio / currentPrefetchRatio) > PREFETCH_RATIO_DIFF_FACTOR + ) { + return getAction(currentServerIndex, region, toServerIndex, -1); + } + } + } + return BalanceAction.NULL_ACTION; + } +} diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/PrefetchCacheCostFunction.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/PrefetchCacheCostFunction.java new file mode 100644 index 000000000000..7e7778597842 --- /dev/null +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/PrefetchCacheCostFunction.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Compute the cost of a potential cluster configuration based on the number of HFile's already + * cached in the bucket cache + */ +@InterfaceAudience.Private +public class PrefetchCacheCostFunction extends CostFunction { + private static final String PREFETCH_CACHE_COST_KEY = + "hbase.master.balancer.stochastic.prefetchCacheCost"; + private static final float DEFAULT_PREFETCH_COST = 500; + + private String prefetchedFileListPath; + private double prefetchRatio; + private double bestPrefetchRatio; + + /** + * The prefetch cache cost function is enabled only when the prefetch file list persistence is + * enabled by setting the parameter PREFETCH_PERSISTENCE_PATH_KEY. The prefetch file list + * persistence is disabled by default. The prefetch cache cost function is also disabled if the + * multiplier is set to 0. The prefetch cache ratio function would be most relevant for non-hdfs + * deployments, which then makes locality irrelevant. In those cases, prefetch and region skewness + * would be competing to prevail over the final balancer decision. + * @param conf Cluster configuration + */ + PrefetchCacheCostFunction(Configuration conf) { + prefetchedFileListPath = conf.get(HConstants.PREFETCH_PERSISTENCE_PATH_KEY); + this.setMultiplier(prefetchedFileListPath == null + ? 0.0f + : conf.getFloat(PREFETCH_CACHE_COST_KEY, DEFAULT_PREFETCH_COST)); + prefetchRatio = 0.0; + bestPrefetchRatio = 0.0; + } + + @Override + void prepare(BalancerClusterState cluster) { + super.prepare(cluster); + prefetchRatio = 0.0; + bestPrefetchRatio = 0.0; + + for (int region = 0; region < cluster.numRegions; region++) { + prefetchRatio += + cluster.getOrComputeWeightedPrefetchRatio(region, cluster.regionIndexToServerIndex[region]); + bestPrefetchRatio += cluster.getOrComputeWeightedPrefetchRatio(region, + cluster.getOrComputeServerWithBestPrefetchRatio()[region]); + } + prefetchRatio = bestPrefetchRatio == 0.0 ? 1.0 : prefetchRatio / bestPrefetchRatio; + } + + @Override + protected double cost() { + return scale(0, 1, (1 - prefetchRatio)); + } + + @Override + protected void regionMoved(int region, int oldServer, int newServer) { + double oldServerPrefetch = cluster.getOrComputeWeightedPrefetchRatio(region, oldServer); + double newServerPrefetch = cluster.getOrComputeWeightedPrefetchRatio(region, newServer); + double prefetchDelta = newServerPrefetch - oldServerPrefetch; + double normalizeDelta = bestPrefetchRatio == 0.0 ? 0.0 : prefetchDelta / bestPrefetchRatio; + prefetchRatio += normalizeDelta; + } + + @Override + public final void updateWeight(double[] weights) { + weights[StochasticLoadBalancer.GeneratorType.PREFETCH.ordinal()] += cost(); + } +} diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java index edf049e8a718..bbe85c6af0bc 100644 --- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.master.RackManager; import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; @@ -59,6 +60,7 @@ *
  • Data Locality
  • *
  • Memstore Sizes
  • *
  • Storefile Sizes
  • + *
  • Prefetch
  • * *

    * Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost best @@ -72,6 +74,7 @@ *

  • hbase.master.balancer.stochastic.localityCost
  • *
  • hbase.master.balancer.stochastic.memstoreSizeCost
  • *
  • hbase.master.balancer.stochastic.storefileSizeCost
  • + *
  • hbase.master.balancer.stochastic.prefetchCacheCost
  • * *

    * You can also add custom Cost function by setting the the following configuration value: @@ -129,6 +132,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { Map> loads = new HashMap<>(); + // Map of old prefetch ratio (region name ---> old server name ---> old prefetch ratio) + Map> historicRegionServerPrefetchRatio = + new HashMap>(); + // values are defaults private int maxSteps = DEFAULT_MAX_STEPS; private boolean runMaxSteps = DEFAULT_RUN_MAX_STEPS; @@ -153,6 +160,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { private RackLocalityCostFunction rackLocalityCost; private RegionReplicaHostCostFunction regionReplicaHostCostFunction; private RegionReplicaRackCostFunction regionReplicaRackCostFunction; + private PrefetchCacheCostFunction prefetchCacheCost; + private PrefetchBasedCandidateGenerator prefetchCandidateGenerator; protected List candidateGenerators; @@ -160,7 +169,8 @@ public enum GeneratorType { RANDOM, LOAD, LOCALITY, - RACK + RACK, + PREFETCH } /** @@ -221,6 +231,7 @@ protected List createCandidateGenerators() { candidateGenerators.add(GeneratorType.LOCALITY.ordinal(), localityCandidateGenerator); candidateGenerators.add(GeneratorType.RACK.ordinal(), new RegionReplicaRackCandidateGenerator()); + candidateGenerators.add(GeneratorType.PREFETCH.ordinal(), prefetchCandidateGenerator); return candidateGenerators; } @@ -237,6 +248,8 @@ protected void loadConf(Configuration conf) { localityCandidateGenerator = new LocalityBasedCandidateGenerator(); localityCost = new ServerLocalityCostFunction(conf); rackLocalityCost = new RackLocalityCostFunction(conf); + prefetchCacheCost = new PrefetchCacheCostFunction(conf); + prefetchCandidateGenerator = new PrefetchBasedCandidateGenerator(); this.candidateGenerators = createCandidateGenerators(); @@ -256,6 +269,7 @@ protected void loadConf(Configuration conf) { addCostFunction(new WriteRequestCostFunction(conf)); addCostFunction(new MemStoreSizeCostFunction(conf)); addCostFunction(new StoreFileCostFunction(conf)); + addCostFunction(prefetchCacheCost); loadCustomCostFunctions(conf); curFunctionCosts = new double[costFunctions.size()]; @@ -290,8 +304,8 @@ private void updateBalancerTableLoadInfo(TableName tableName, if ((this.localityCost != null) || (this.rackLocalityCost != null)) { finder = this.regionFinder; } - BalancerClusterState cluster = - new BalancerClusterState(loadOfOneTable, loads, finder, rackManager); + BalancerClusterState cluster = new BalancerClusterState(loadOfOneTable, loads, finder, + rackManager, historicRegionServerPrefetchRatio); initCosts(cluster); curOverallCost = computeCost(cluster, Double.MAX_VALUE); @@ -459,8 +473,8 @@ protected List balanceTable(TableName tableName, // The clusterState that is given to this method contains the state // of all the regions in the table(s) (that's true today) // Keep track of servers to iterate through them. - BalancerClusterState cluster = - new BalancerClusterState(loadOfOneTable, loads, finder, rackManager); + BalancerClusterState cluster = new BalancerClusterState(loadOfOneTable, loads, finder, + rackManager, historicRegionServerPrefetchRatio); long startTime = EnvironmentEdgeManager.currentTime(); @@ -711,6 +725,9 @@ private List createRegionPlans(BalancerClusterState cluster) { private void updateRegionLoad() { // We create a new hashmap so that regions that are no longer there are removed. // However we temporarily need the old loads so we can use them to keep the rolling average. + // The old prefetch ratio of a region on a region server is stored for finding out if the region + // was prefetched on the old server before moving it to the new server because of the server + // crash procedure. Map> oldLoads = loads; loads = new HashMap<>(); @@ -718,10 +735,23 @@ private void updateRegionLoad() { sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> { String regionNameAsString = RegionInfo.getRegionNameAsString(regionName); Deque rLoads = oldLoads.get(regionNameAsString); + ServerName oldServerName = null; + float oldPrefetchRatio = 0.0f; if (rLoads == null) { rLoads = new ArrayDeque<>(numRegionLoadsToRemember + 1); - } else if (rLoads.size() >= numRegionLoadsToRemember) { - rLoads.remove(); + } else { + // Get the old server name and the prefetch ratio for this region on this server + oldServerName = rLoads.getLast().getServerName(); + oldPrefetchRatio = rLoads.getLast().getPrefetchCacheRatio(); + if (rLoads.size() >= numRegionLoadsToRemember) { + rLoads.remove(); + } + } + if (oldServerName != null && !ServerName.isSameAddress(oldServerName, rm.getServerName())) { + // Record the old region server prefetch ratio + Map serverPrefetchRatio = new HashMap<>(); + serverPrefetchRatio.put(oldServerName.getAddress(), oldPrefetchRatio); + historicRegionServerPrefetchRatio.put(regionNameAsString, serverPrefetchRatio); } rLoads.add(new BalancerRegionLoad(rm)); loads.put(regionNameAsString, rLoads); @@ -740,6 +770,12 @@ void initCosts(BalancerClusterState cluster) { } } + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") + List getCostFunctions() { + return costFunctions; + } + /** * Update both the costs of costfunctions and the weights of candidate generators */ diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/LoadBalancerPerformanceEvaluation.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/LoadBalancerPerformanceEvaluation.java index 3a435e140989..80b3750d6d91 100644 --- a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/LoadBalancerPerformanceEvaluation.java +++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/LoadBalancerPerformanceEvaluation.java @@ -86,6 +86,7 @@ public class LoadBalancerPerformanceEvaluation extends AbstractHBaseTool { private void setupConf() { conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, loadBalancerClazz, LoadBalancer.class); + conf.set(HConstants.PREFETCH_PERSISTENCE_PATH_KEY, "/tmp/prefetch_persistence"); loadBalancer = LoadBalancerFactory.getLoadBalancer(conf); } diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestPrefetchCacheCostLoadBalancerFunction.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestPrefetchCacheCostLoadBalancerFunction.java new file mode 100644 index 000000000000..575def6f6b9d --- /dev/null +++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestPrefetchCacheCostLoadBalancerFunction.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.net.Address; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestPrefetchCacheCostLoadBalancerFunction extends StochasticBalancerTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestPrefetchCacheCostLoadBalancerFunction.class); + + // Mapping of prefetch test -> expected prefetch + private final float[] expectedPrefetch = { 0.0f, 0.0f, 0.5f, 1.0f, 0.0f, 0.572f, 0.0f, 0.075f }; + + /** + * Data set to testPrefetchCost: [test][0][0] = mapping of server to number of regions it hosts + * [test][region + 1][0] = server that region is hosted on [test][region + 1][server + 1] = + * prefetch of that region on server + */ + private final int[][][] clusterRegionPrefetchMocks = new int[][][] { + // Test 1: each region is entirely on server that hosts it + // Cost of moving the regions in this case should be high as the regions are fully prefetched + // on the server they are currently hosted on + new int[][] { new int[] { 2, 1, 1 }, // Server 0 has 2, server 1 has 1 and server 2 has 1 + // region(s) hosted respectively + new int[] { 2, 0, 0, 100 }, // region 0 is hosted and prefetched only on server 2 + new int[] { 0, 100, 0, 0 }, // region 1 is hosted and prefetched only on server 0 + new int[] { 0, 100, 0, 0 }, // region 2 is hosted and prefetched only on server 0 + new int[] { 1, 0, 100, 0 }, // region 3 is hosted and prefetched only on server 1 + }, + + // Test 2: each region is prefetched completely on the server it is currently hosted on, + // but it was also prefetched on some other server historically + // Cost of moving the regions in this case should be high as the regions are fully prefetched + // on the server they are currently hosted on. Although, the regions were previously hosted and + // prefetched on some other server, since they are completely prefetched on the new server, + // there is no need to move the regions back to the previously hosting cluster + new int[][] { new int[] { 1, 2, 1 }, // Server 0 has 1, server 1 has 2 and server 2 has 1 + // region(s) hosted respectively + new int[] { 0, 100, 0, 100 }, // region 0 is hosted and currently prefetched on server 0, + // but previously prefetched completely on server 2 + new int[] { 1, 100, 100, 0 }, // region 1 is hosted and currently prefetched on server 1, + // but previously prefetched completely on server 0 + new int[] { 1, 0, 100, 100 }, // region 2 is hosted and currently prefetched on server 1, + // but previously prefetched on server 2 + new int[] { 2, 0, 100, 100 }, // region 3 is hosted and currently prefetched on server 2, + // but previously prefetched on server 1 + }, + + // Test 3: The regions were hosted and fully prefetched on a server but later moved to other + // because of server crash procedure. The regions are partially prefetched on the server they + // are currently hosted on + new int[][] { new int[] { 1, 2, 1 }, new int[] { 0, 50, 0, 100 }, // Region 0 is currently + // hosted and partially + // prefetched on + // server 0, but was fully + // prefetched on server 2 + // previously + new int[] { 1, 100, 50, 0 }, // Region 1 is currently hosted and partially prefetched on + // server 1, but was fully prefetched on server 0 previously + new int[] { 1, 0, 50, 100 }, // Region 2 is currently hosted and partially prefetched on + // server 1, but was fully prefetched on server 2 previously + new int[] { 2, 0, 100, 50 }, // Region 3 is currently hosted and partially prefetched on + // server 2, but was fully prefetched on server 1 previously + }, + + // Test 4: The regions were hosted and fully prefetched on a server, but later moved to other + // server because of server crash procedure. The regions are not at all prefetched on the server + // they are currently hosted on + new int[][] { new int[] { 1, 1, 2 }, new int[] { 0, 0, 0, 100 }, // Region 0 is currently hosted + // but not prefetched on server + // 0, + // but was fully prefetched on + // server 2 previously + new int[] { 2, 100, 0, 0 }, // Region 1 is currently hosted but not prefetched on server 2, + // but was fully prefetched on server 0 previously + new int[] { 1, 0, 0, 100 }, // Region 2 is currently hosted but not prefetched on server 1, + // but was fully prefetched on server 2 previously + new int[] { 2, 0, 100, 0 }, // Region 3 is currently hosted but not prefetched on server 2, + // but was fully prefetched on server 1 previously + }, + + // Test 5: The regions were partially prefetched on old servers, before moving to the new server + // where also, they are partially prefetched + new int[][] { new int[] { 2, 1, 1 }, new int[] { 1, 50, 50, 0 }, // Region 0 is hosted and + // partially prefetched on + // server 1, but + // was previously hosted and + // partially prefetched on + // server 0 + new int[] { 2, 0, 50, 50 }, // Region 1 is hosted and partially prefetched on server 2, but + // was previously hosted and partially prefetched on server 1 + new int[] { 0, 50, 0, 50 }, // Region 2 is hosted and partially prefetched on server 0, but + // was previously hosted and partially prefetched on server 2 + new int[] { 0, 50, 50, 0 }, // Region 3 is hosted and partially prefetched on server 0, but + // was previously hosted and partially prefetched on server 1 + }, + + // Test 6: The regions are less prefetched on the new servers as compared to what they were + // prefetched on the server before they were moved to the new servers + new int[][] { new int[] { 1, 2, 1 }, new int[] { 0, 30, 70, 0 }, // Region 0 is hosted and + // prefetched 30% on server 0, + // but was + // previously hosted and + // prefetched 70% on server 1 + new int[] { 2, 70, 0, 30 }, // Region 1 is hosted and prefetched 30% on server 2, but was + // previously hosted and prefetched 70% on server 0 + new int[] { 1, 70, 30, 0 }, // Region 2 is hosted and prefetched 30% on server 1, but was + // previously hosted and prefetched 70% on server 0 + new int[] { 1, 0, 30, 70 }, // Region 3 is hosted and prefetched 30% on server 1, but was + // previously hosted and prefetched 70% on server 3 + }, + + // Test 7: The regions are more prefetched on the new servers as compared to what they were + // prefetched on the server before they were moved to the new servers + new int[][] { new int[] { 2, 1, 1 }, new int[] { 2, 0, 20, 80 }, // Region 0 is hosted and 80% + // prefetched on server 2, but + // was + // previously hosted and 20% + // prefetched on server 1 + new int[] { 2, 20, 0, 80 }, // Region 1 is hosted and 80% prefetched on server 2, but was + // previously hosted and 20% prefetched on server 0 + new int[] { 1, 20, 80, 0 }, // Region 2 is hosted and 80% prefetched on server 1, but was + // previously hosted and 20% prefetched on server 0 + new int[] { 0, 80, 20, 0 }, // Region 3 is hosted and 80% prefetched on server 0, but was + // previously hosted and 20% prefetched on server 1 + }, + + // Test 8: The regions are randomly assigned to the server with some regions historically + // hosted on other region servers + new int[][] { new int[] { 1, 2, 1 }, new int[] { 1, 0, 34, 58 }, // Region 0 is hosted and + // partially prefetched on + // server 1, + // but was previously hosted + // and partially prefetched on + // server 2 + // current prefetch < + // historical prefetch + new int[] { 2, 78, 0, 100 }, // Region 1 is hosted and fully prefetched on server 2, + // but was previously hosted and partially prefetched on server 0 + // current prefetch > historical prefetch + new int[] { 1, 66, 66, 0 }, // Region 2 is hosted and partially prefetched on server 1, + // but was previously hosted and partially prefetched on server 0 + // current prefetch == historical prefetch + new int[] { 0, 96, 0, 0 }, // Region 3 is hosted and partially prefetched on server 0 + // No historical prefetch + }, }; + + private static Configuration storedConfiguration; + + @BeforeClass + public static void saveInitialConfiguration() { + storedConfiguration = new Configuration(conf); + } + + @Before + public void beforeEachTest() { + conf = new Configuration(storedConfiguration); + loadBalancer.onConfigurationChange(conf); + } + + @Test + public void testVerifyPrefetchCostFunctionEnabled() { + conf.set(HConstants.PREFETCH_PERSISTENCE_PATH_KEY, "/tmp/prefetch.persistence"); + + StochasticLoadBalancer lb = new StochasticLoadBalancer(); + lb.loadConf(conf); + + assertTrue(Arrays.asList(lb.getCostFunctionNames()) + .contains(PrefetchCacheCostFunction.class.getSimpleName())); + } + + @Test + public void testVerifyPrefetchCostFunctionDisabledByNoPersistencePathKey() { + assertFalse(Arrays.asList(loadBalancer.getCostFunctionNames()) + .contains(PrefetchCacheCostFunction.class.getSimpleName())); + } + + @Test + public void testVerifyPrefetchCostFunctionDisabledByNoMultiplier() { + conf.set(HConstants.PREFETCH_PERSISTENCE_PATH_KEY, "/tmp/prefetch.persistence"); + conf.setFloat("hbase.master.balancer.stochastic.prefetchCacheCost", 0.0f); + + assertFalse(Arrays.asList(loadBalancer.getCostFunctionNames()) + .contains(PrefetchCacheCostFunction.class.getSimpleName())); + } + + @Test + public void testPrefetchCost() { + conf.set(HConstants.PREFETCH_PERSISTENCE_PATH_KEY, "/tmp/prefetch.persistence"); + CostFunction costFunction = new PrefetchCacheCostFunction(conf); + + for (int test = 0; test < clusterRegionPrefetchMocks.length; test++) { + int[][] clusterRegionLocations = clusterRegionPrefetchMocks[test]; + TestPrefetchCacheCostLoadBalancerFunction.MockClusterForPrefetch cluster = + new TestPrefetchCacheCostLoadBalancerFunction.MockClusterForPrefetch( + clusterRegionLocations); + costFunction.prepare(cluster); + double cost = costFunction.cost(); + assertEquals(expectedPrefetch[test], cost, 0.01); + } + } + + private class MockClusterForPrefetch extends BalancerClusterState { + private final int[][] regionServerPrefetch; // [region][server] = prefetch percent + + public MockClusterForPrefetch(int[][] regionsArray) { + // regions[0] is an array where index = serverIndex and value = number of regions + super(mockClusterServers(regionsArray[0], 1), null, null, null, null); + regionServerPrefetch = new int[regionsArray.length - 1][]; + Map> historicalPrefetchRatio = new HashMap<>(); + for (int i = 1; i < regionsArray.length; i++) { + int regionIndex = i - 1; + regionServerPrefetch[regionIndex] = new int[regionsArray[i].length - 1]; + regionIndexToServerIndex[regionIndex] = regionsArray[i][0]; + for (int j = 1; j < regionsArray[i].length; j++) { + int serverIndex = j - 1; + regionServerPrefetch[regionIndex][serverIndex] = regionsArray[i][j]; + if (regionsArray[i][j] > 0 && serverIndex != regionsArray[i][0]) { + // This is the historical prefetch value + Map historicalPrefetch = new HashMap<>(); + historicalPrefetch.put(servers[serverIndex].getAddress(), (float) regionsArray[i][j]); + historicalPrefetchRatio.put(regions[regionIndex].getRegionNameAsString(), + historicalPrefetch); + } + } + } + historicalRegionServerPrefetchRatio = historicalPrefetchRatio; + } + + @Override + public int getRegionSizeMB(int region) { + return 1; + } + + @Override + protected float getRegionServerPrefetchRatio(int region, int regionServerIndex) { + float prefetchRatio = 0.0f; + + // Get the prefetch cache ratio if the region is currently hosted on this server + if (regionServerIndex == regionIndexToServerIndex[region]) { + return regionServerPrefetch[region][regionServerIndex]; + } + + // Region is not currently hosted on this server. Check if the region was prefetched on this + // server earlier. This can happen when the server was shutdown and the cache was persisted. + // Search using the index name and server name and not the index id and server id as these + // ids may change when a server is marked as dead or a new server is added. + String regionNameAsString = regions[region].getRegionNameAsString(); + Address serverAddress = servers[regionServerIndex].getAddress(); + if ( + historicalRegionServerPrefetchRatio != null + && historicalRegionServerPrefetchRatio.containsKey(regionNameAsString) + ) { + Map serverPrefetchRatio = + historicalRegionServerPrefetchRatio.get(regionNameAsString); + if (serverPrefetchRatio.containsKey(serverAddress)) { + prefetchRatio = serverPrefetchRatio.get(serverAddress); + // The old prefetch cache ratio has been accounted for and hence, clear up this + // information + // as it is not needed anymore + historicalRegionServerPrefetchRatio.remove(regionNameAsString, serverPrefetchRatio); + } + } + return prefetchRatio; + } + } +} diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java index 21f3a3b66c9a..0848f6c97784 100644 --- a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java +++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java @@ -213,6 +213,7 @@ public void testKeepRegionLoad() throws Exception { when(rl.getWriteRequestCount()).thenReturn(0L); when(rl.getMemStoreSize()).thenReturn(Size.ZERO); when(rl.getStoreFileSize()).thenReturn(new Size(i, Size.Unit.MEGABYTE)); + when(rl.getPrefetchCacheRatio()).thenReturn(0.0f); Map regionLoadMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); regionLoadMap.put(Bytes.toBytes(REGION_KEY), rl); @@ -317,6 +318,7 @@ public void testUpdateStochasticCostsIfBalanceNotRan() { @Test public void testNeedBalance() { conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 1.0f); + conf.set(HConstants.PREFETCH_PERSISTENCE_PATH_KEY, "/tmp/prefetch_persistence"); conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, -1f); conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", false); conf.setLong("hbase.master.balancer.stochastic.maxSteps", 5000L); @@ -603,6 +605,8 @@ public void testAdditionalCostFunction() { @Test public void testDefaultCostFunctionList() { + conf.set(HConstants.PREFETCH_PERSISTENCE_PATH_KEY, "/tmp/prefetch_persistence"); + loadBalancer.loadConf(conf); List expected = Arrays.asList(RegionCountSkewCostFunction.class.getSimpleName(), PrimaryRegionCountSkewCostFunction.class.getSimpleName(), MoveCostFunction.class.getSimpleName(), RackLocalityCostFunction.class.getSimpleName(), @@ -611,7 +615,8 @@ public void testDefaultCostFunctionList() { RegionReplicaRackCostFunction.class.getSimpleName(), ReadRequestCostFunction.class.getSimpleName(), CPRequestCostFunction.class.getSimpleName(), WriteRequestCostFunction.class.getSimpleName(), - MemStoreSizeCostFunction.class.getSimpleName(), StoreFileCostFunction.class.getSimpleName()); + MemStoreSizeCostFunction.class.getSimpleName(), StoreFileCostFunction.class.getSimpleName(), + PrefetchCacheCostFunction.class.getSimpleName()); List actual = Arrays.asList(loadBalancer.getCostFunctionNames()); assertTrue("ExpectedCostFunctions: " + expected + " ActualCostFunctions: " + actual, @@ -623,7 +628,6 @@ private boolean needsBalanceIdleRegion(int[] cluster) { && Arrays.stream(cluster).anyMatch(x -> x < 1); } - // This mock allows us to test the LocalityCostFunction private class MockCluster extends BalancerClusterState { private int[][] localities = null; // [region][server] = percent of blocks diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerBalanceCluster.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerBalanceCluster.java index 2a7b8afccc38..16c8ef625883 100644 --- a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerBalanceCluster.java +++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerBalanceCluster.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; @@ -51,6 +52,7 @@ public class TestStochasticLoadBalancerBalanceCluster extends StochasticBalancer */ @Test public void testBalanceCluster() throws Exception { + conf.set(HConstants.PREFETCH_PERSISTENCE_PATH_KEY, "/tmp/prefetch_persistence"); loadBalancer.onConfigurationChange(conf); for (int[] mockCluster : clusterStateMocks) { Map> servers = mockClusterServers(mockCluster); diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerLargeCluster.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerLargeCluster.java index 620d610288f7..e02728a7e32c 100644 --- a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerLargeCluster.java +++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerLargeCluster.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.master.balancer; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.junit.ClassRule; @@ -39,6 +40,7 @@ public void testLargeCluster() { int numTables = 100; int replication = 1; conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 6 * 60 * 1000); + conf.set(HConstants.PREFETCH_PERSISTENCE_PATH_KEY, "/tmp/prefetch_persistence"); loadBalancer.onConfigurationChange(conf); testWithClusterWithIteration(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true); diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplicaHighReplication.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplicaHighReplication.java index a58b8e162968..aac7247afd24 100644 --- a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplicaHighReplication.java +++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplicaHighReplication.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.master.balancer; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.junit.ClassRule; @@ -36,6 +37,7 @@ public class TestStochasticLoadBalancerRegionReplicaHighReplication public void testRegionReplicasOnMidClusterHighReplication() { conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 4000000L); conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec + conf.set(HConstants.PREFETCH_PERSISTENCE_PATH_KEY, "/tmp/prefetch_persistence"); loadBalancer.onConfigurationChange(conf); int numNodes = 40; int numRegions = 6 * numNodes; diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerSmallCluster.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerSmallCluster.java index 831c9e932727..925de0e06240 100644 --- a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerSmallCluster.java +++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerSmallCluster.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.master.balancer; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.junit.ClassRule; @@ -38,6 +39,7 @@ public void testSmallCluster() { int numRegionsPerServer = 40; // all servers except one int replication = 1; int numTables = 10; + conf.set(HConstants.PREFETCH_PERSISTENCE_PATH_KEY, "/tmp/prefetch_persistence"); testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true); } @@ -48,6 +50,7 @@ public void testSmallCluster2() { int numRegionsPerServer = 40; // all servers except one int replication = 1; int numTables = 10; + conf.set(HConstants.PREFETCH_PERSISTENCE_PATH_KEY, "/tmp/prefetch_persistence"); testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true); } @@ -58,6 +61,7 @@ public void testSmallCluster3() { int numRegionsPerServer = 1; // all servers except one int replication = 1; int numTables = 10; + conf.set(HConstants.PREFETCH_PERSISTENCE_PATH_KEY, "/tmp/prefetch_persistence"); // fails because of max moves testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, false, false); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionMetrics.java index 47b36a7a1516..285aa50b1143 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionMetrics.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionMetrics.java @@ -138,4 +138,10 @@ default String getNameAsString() { /** Returns the compaction state of this region */ CompactionState getCompactionState(); + + /** Returns the ratio of files already in cache */ + float getPrefetchCacheRatio(); + + /** Returns the name of the server hosting this region */ + ServerName getServerName(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionMetricsBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionMetricsBuilder.java index 43b3a17aac17..2a34ba227c06 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionMetricsBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionMetricsBuilder.java @@ -80,7 +80,8 @@ public static RegionMetrics toRegionMetrics(ClusterStatusProtos.RegionLoad regio ClusterStatusProtos.StoreSequenceId::getSequenceId))) .setUncompressedStoreFileSize( new Size(regionLoadPB.getStoreUncompressedSizeMB(), Size.Unit.MEGABYTE)) - .build(); + .setPrefetchCacheRatio(regionLoadPB.getPrefetchCacheRatio()) + .setServerName(ProtobufUtil.toServerName(regionLoadPB.getServerName())).build(); } private static List @@ -120,7 +121,8 @@ public static ClusterStatusProtos.RegionLoad toRegionLoad(RegionMetrics regionMe .addAllStoreCompleteSequenceId(toStoreSequenceId(regionMetrics.getStoreSequenceId())) .setStoreUncompressedSizeMB( (int) regionMetrics.getUncompressedStoreFileSize().get(Size.Unit.MEGABYTE)) - .build(); + .setPrefetchCacheRatio(regionMetrics.getPrefetchCacheRatio()) + .setServerName(ProtobufUtil.toServerName(regionMetrics.getServerName())).build(); } public static RegionMetricsBuilder newBuilder(byte[] name) { @@ -154,6 +156,8 @@ public static RegionMetricsBuilder newBuilder(byte[] name) { private long blocksLocalWithSsdWeight; private long blocksTotalWeight; private CompactionState compactionState; + private float prefetchCacheRatio; + private ServerName serverName; private RegionMetricsBuilder(byte[] name) { this.name = name; @@ -289,6 +293,16 @@ public RegionMetricsBuilder setCompactionState(CompactionState compactionState) return this; } + public RegionMetricsBuilder setPrefetchCacheRatio(float prefetchCacheRatio) { + this.prefetchCacheRatio = prefetchCacheRatio; + return this; + } + + public RegionMetricsBuilder setServerName(ServerName serverName) { + this.serverName = serverName; + return this; + } + public RegionMetrics build() { return new RegionMetricsImpl(name, storeCount, storeFileCount, storeRefCount, maxCompactedStoreFileRefCount, compactingCellCount, compactedCellCount, storeFileSize, @@ -296,7 +310,7 @@ public RegionMetrics build() { uncompressedStoreFileSize, writeRequestCount, readRequestCount, cpRequestCount, filteredReadRequestCount, completedSequenceId, storeSequenceIds, dataLocality, lastMajorCompactionTimestamp, dataLocalityForSsd, blocksLocalWeight, blocksLocalWithSsdWeight, - blocksTotalWeight, compactionState); + blocksTotalWeight, compactionState, prefetchCacheRatio, serverName); } private static class RegionMetricsImpl implements RegionMetrics { @@ -327,6 +341,8 @@ private static class RegionMetricsImpl implements RegionMetrics { private final long blocksLocalWithSsdWeight; private final long blocksTotalWeight; private final CompactionState compactionState; + private final float prefetchCacheRatio; + private final ServerName serverName; RegionMetricsImpl(byte[] name, int storeCount, int storeFileCount, int storeRefCount, int maxCompactedStoreFileRefCount, final long compactingCellCount, long compactedCellCount, @@ -336,7 +352,7 @@ private static class RegionMetricsImpl implements RegionMetrics { long filteredReadRequestCount, long completedSequenceId, Map storeSequenceIds, float dataLocality, long lastMajorCompactionTimestamp, float dataLocalityForSsd, long blocksLocalWeight, long blocksLocalWithSsdWeight, long blocksTotalWeight, - CompactionState compactionState) { + CompactionState compactionState, float prefetchCacheRatio, ServerName serverName) { this.name = Preconditions.checkNotNull(name); this.storeCount = storeCount; this.storeFileCount = storeFileCount; @@ -364,6 +380,8 @@ private static class RegionMetricsImpl implements RegionMetrics { this.blocksLocalWithSsdWeight = blocksLocalWithSsdWeight; this.blocksTotalWeight = blocksTotalWeight; this.compactionState = compactionState; + this.prefetchCacheRatio = prefetchCacheRatio; + this.serverName = serverName; } @Override @@ -501,6 +519,16 @@ public CompactionState getCompactionState() { return compactionState; } + @Override + public float getPrefetchCacheRatio() { + return prefetchCacheRatio; + } + + @Override + public ServerName getServerName() { + return serverName; + } + @Override public String toString() { StringBuilder sb = @@ -541,6 +569,8 @@ public String toString() { Strings.appendKeyValue(sb, "blocksLocalWithSsdWeight", blocksLocalWithSsdWeight); Strings.appendKeyValue(sb, "blocksTotalWeight", blocksTotalWeight); Strings.appendKeyValue(sb, "compactionState", compactionState); + Strings.appendKeyValue(sb, "prefetchCacheRatio", this.getPrefetchCacheRatio()); + Strings.appendKeyValue(sb, "serverName", this.getServerName()); return sb.toString(); } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index d56607653fca..22002c5bed3d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1321,6 +1321,11 @@ public enum OperationStatusCode { // sits in hbase-server, there were no other go! Can we move the cache implementation to // hbase-common? + /** + * Prefetch persistence path key + */ + public static final String PREFETCH_PERSISTENCE_PATH_KEY = "hbase.prefetch.file.list.path"; + /** * Current ioengine options in include: heap, offheap and file:PATH (where PATH is the path to the * file that will host the file-based cache. See BucketCache#getIOEngineFromName() for list of diff --git a/hbase-protocol-shaded/src/main/protobuf/server/ClusterStatus.proto b/hbase-protocol-shaded/src/main/protobuf/server/ClusterStatus.proto index 28cc5a865c23..e86f463c7fc6 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/ClusterStatus.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/ClusterStatus.proto @@ -177,6 +177,12 @@ message RegionLoad { MAJOR = 2; MAJOR_AND_MINOR = 3; } + + /** The prefetch cache ratio for region */ + optional float prefetch_cache_ratio = 28; + + /** The server hosting this region */ + optional ServerName server_name = 29; } message UserLoad { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index 15c64c03d5e5..57f91fa19f44 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -93,8 +93,6 @@ public class CacheConfig { public static final String DROP_BEHIND_CACHE_COMPACTION_KEY = "hbase.hfile.drop.behind.compaction"; - public static final String PREFETCH_PERSISTENCE_PATH_KEY = "hbase.prefetch.file.list.path"; - /** * Configuration key to set interval for persisting bucket cache to disk. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index b4ab66d238e4..d3f9da9d829a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.io.hfile.bucket; import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY; -import static org.apache.hadoop.hbase.io.hfile.CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY; import java.io.File; import java.io.FileInputStream; @@ -52,6 +51,7 @@ import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.io.ByteBuffAllocator; @@ -289,9 +289,9 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR); this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR); this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR); + this.prefetchedFileListPath = conf.get(HConstants.PREFETCH_PERSISTENCE_PATH_KEY); this.queueAdditionWaitTime = conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME); - this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY); this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000); sanityCheckConfigs(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 5240df5c62a1..124ab3031f77 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -106,9 +106,11 @@ import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; import org.apache.hadoop.hbase.executor.ExecutorType; import org.apache.hadoop.hbase.http.InfoServer; +import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory; import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor; import org.apache.hadoop.hbase.io.util.MemorySizeUtil; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.ipc.RpcClient; @@ -1530,6 +1532,7 @@ RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, long totalStaticBloomSize = 0L; long totalCompactingKVs = 0L; long currentCompactedKVs = 0L; + int filesAlreadyPrefetched = 0; List storeList = r.getStores(); stores += storeList.size(); for (HStore store : storeList) { @@ -1551,6 +1554,23 @@ RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, rootLevelIndexSize += store.getStorefilesRootLevelIndexSize(); totalStaticIndexSize += store.getTotalStaticIndexSize(); totalStaticBloomSize += store.getTotalStaticBloomSize(); + Collection filesInStore = store.getStorefiles(); + if (!filesInStore.isEmpty()) { + for (HStoreFile hStoreFile : filesInStore) { + String tempFileName = hStoreFile.getPath().getName(); + // PrefetchExecutor adds the encoded HFile name if the file has been prefetched. Every + // store file in this region is then checked if it has been prefetched. If the storefile + // is a link, then it's format is . The comparison here would fail if + // this format is used. In order for this comparison to success in such case, the HFile + // name needs to be extracted from the HFileLink + if (HFileLink.isHFileLink(tempFileName)) { + tempFileName = HFileLink.getReferencedHFileName(tempFileName); + } + if (PrefetchExecutor.isFilePrefetched(tempFileName)) { + filesAlreadyPrefetched++; + } + } + } } int unitMB = 1024 * 1024; @@ -1570,6 +1590,10 @@ RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, long blocksTotalWeight = hdfsBd.getUniqueBlocksTotalWeight(); long blocksLocalWeight = hdfsBd.getBlocksLocalWeight(serverName.getHostname()); long blocksLocalWithSsdWeight = hdfsBd.getBlocksLocalWithSsdWeight(serverName.getHostname()); + + float ratioOfFilesAlreadyCached = + (storefiles == 0) ? 0 : filesAlreadyPrefetched / ((float) storefiles); + if (regionLoadBldr == null) { regionLoadBldr = RegionLoad.newBuilder(); } @@ -1593,7 +1617,9 @@ RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, .setDataLocalityForSsd(dataLocalityForSsd).setBlocksLocalWeight(blocksLocalWeight) .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight).setBlocksTotalWeight(blocksTotalWeight) .setCompactionState(ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState())) - .setLastMajorCompactionTs(r.getOldestHfileTs(true)); + .setLastMajorCompactionTs(r.getOldestHfileTs(true)) + .setPrefetchCacheRatio(ratioOfFilesAlreadyCached) + .setServerName(ProtobufUtil.toServerName(serverName)); r.setCompleteSequenceId(regionLoadBldr); return regionLoadBldr.build(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerMetrics.java index 8bcf3e600f88..f0c7cb72f863 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerMetrics.java @@ -28,6 +28,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; @@ -58,6 +59,8 @@ public void testRegionLoadAggregation() { metrics.getRegionMetrics().values().stream().mapToLong(v -> v.getReadRequestCount()).sum()); assertEquals(100, metrics.getRegionMetrics().values().stream().mapToLong(v -> v.getCpRequestCount()).sum()); + assertEquals(1.0, metrics.getRegionMetrics().values().stream() + .mapToDouble(v -> v.getPrefetchCacheRatio()).sum(), 0); assertEquals(300, metrics.getRegionMetrics().values().stream() .mapToLong(v -> v.getFilteredReadRequestCount()).sum()); } @@ -95,16 +98,18 @@ private ClusterStatusProtos.ServerLoad createServerLoadProto() { .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.ENCODED_REGION_NAME) .setValue(ByteString.copyFromUtf8("QWERTYUIOP")).build(); - ClusterStatusProtos.RegionLoad rlOne = - ClusterStatusProtos.RegionLoad.newBuilder().setRegionSpecifier(rSpecOne).setStores(10) - .setStorefiles(101).setStoreUncompressedSizeMB(106).setStorefileSizeMB(520) - .setFilteredReadRequestsCount(100).setStorefileIndexSizeKB(42).setRootIndexSizeKB(201) - .setReadRequestsCount(Integer.MAX_VALUE).setWriteRequestsCount(Integer.MAX_VALUE).build(); + ClusterStatusProtos.RegionLoad rlOne = ClusterStatusProtos.RegionLoad.newBuilder() + .setRegionSpecifier(rSpecOne).setStores(10).setStorefiles(101).setStoreUncompressedSizeMB(106) + .setStorefileSizeMB(520).setFilteredReadRequestsCount(100).setStorefileIndexSizeKB(42) + .setRootIndexSizeKB(201).setReadRequestsCount(Integer.MAX_VALUE) + .setWriteRequestsCount(Integer.MAX_VALUE).setPrefetchCacheRatio(0.0f) + .setServerName(ProtobufUtil.toServerName(new ServerName("localhost1", 1, 1))).build(); ClusterStatusProtos.RegionLoad rlTwo = ClusterStatusProtos.RegionLoad.newBuilder() .setRegionSpecifier(rSpecTwo).setStores(3).setStorefiles(13).setStoreUncompressedSizeMB(23) .setStorefileSizeMB(300).setFilteredReadRequestsCount(200).setStorefileIndexSizeKB(40) .setRootIndexSizeKB(303).setReadRequestsCount(Integer.MAX_VALUE) - .setWriteRequestsCount(Integer.MAX_VALUE).setCpRequestsCount(100).build(); + .setWriteRequestsCount(Integer.MAX_VALUE).setCpRequestsCount(100).setPrefetchCacheRatio(1.0f) + .setServerName(ProtobufUtil.toServerName(new ServerName("localhost2", 1, 1))).build(); ClusterStatusProtos.ServerLoad sl = ClusterStatusProtos.ServerLoad.newBuilder() .addRegionLoads(rlOne).addRegionLoads(rlTwo).build(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java index a0e052e5ea36..8d8237ad0503 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.SingleProcessHBaseCluster; import org.apache.hadoop.hbase.StartTestingClusterOption; import org.apache.hadoop.hbase.TableName; @@ -75,7 +76,7 @@ public void setup() throws Exception { conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:" + testDir + "/bucket.cache"); conf.setInt("hbase.bucketcache.size", 400); conf.set("hbase.bucketcache.persistent.path", testDir + "/bucket.persistence"); - conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, testDir + "/prefetch.persistence"); + conf.set(HConstants.PREFETCH_PERSISTENCE_PATH_KEY, testDir + "/prefetch.persistence"); zkCluster = TEST_UTIL.startMiniZKCluster(); cluster = TEST_UTIL.startMiniHBaseCluster(option); assertEquals(2, cluster.getRegionServerThreads().size()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java index 9e90d0e229c6..5b8d305e6087 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; @@ -84,7 +85,7 @@ public Configuration setupBucketCacheConfig(long bucketCachePersistInterval) thr } public BucketCache setupBucketCache(Configuration conf) throws IOException { - conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, (testDir + "/prefetch.persistence")); + conf.set(HConstants.PREFETCH_PERSISTENCE_PATH_KEY, (testDir + "/prefetch.persistence")); BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", 60 * 1000, conf); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java index 771ab0158f61..2fb7117d6fb6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java @@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; @@ -107,7 +108,7 @@ public void setup() throws IOException { testDir = TEST_UTIL.getDataTestDir(); TEST_UTIL.getTestFileSystem().mkdirs(testDir); prefetchPersistencePath = testDir + "/prefetch.persistence"; - conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, prefetchPersistencePath); + conf.set(HConstants.PREFETCH_PERSISTENCE_PATH_KEY, prefetchPersistencePath); fs = HFileSystem.get(conf); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryChore.java index 594db4d7c303..f0a72f9a3d87 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryChore.java @@ -541,6 +541,16 @@ public long getBlocksTotalWeight() { public CompactionState getCompactionState() { return null; } + + @Override + public float getPrefetchCacheRatio() { + return 0.0f; + } + + @Override + public ServerName getServerName() { + return null; + } }; return regionMetrics; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBalancerRejection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBalancerRejection.java index c5d8223479e8..c1c2c62cc48c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBalancerRejection.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBalancerRejection.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.LogEntry; @@ -75,6 +76,7 @@ public void testBalancerRejections() throws Exception { try { // enabled balancer rejection recording conf.setBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED, true); + conf.set(HConstants.PREFETCH_PERSISTENCE_PATH_KEY, "/tmp/prefetch_persistence"); conf.set(StochasticLoadBalancer.COST_FUNCTIONS_COST_FUNCTIONS_KEY, MockCostFunction.class.getName()); MasterServices services = mock(MasterServices.class); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestPrefetchCacheCostBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestPrefetchCacheCostBalancer.java new file mode 100644 index 000000000000..68a3f36d56a7 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestPrefetchCacheCostBalancer.java @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Random; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerMetrics; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.SingleProcessHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.io.ByteBuffAllocator; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test prefetchCacheCostFunction balancer function + */ +@Category({ MasterTests.class, MediumTests.class }) +public class TestPrefetchCacheCostBalancer extends StochasticBalancerTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestPrefetchCacheCostBalancer.class); + + private static HBaseTestingUtil TEST_UTIL; + + private static final int REGION_SERVERS = 3; + + private static final int REGION_NUM = REGION_SERVERS * 50; + + private Admin admin; + + private SingleProcessHBaseCluster cluster; + + private Configuration conf; + + private enum FunctionCostKeys { + REGIONCOUNTCOST("hbase.master.balancer.stochastic.regionCountCost"), + PRIMARYREGIONCOUNTCOST("hbase.master.balancer.stochastic.primaryRegionCountCost"), + MOVECOST("hbase.master.balancer.stochastic.moveCost"), + LOCALITYCOST("hbase.master.balancer.stochastic.localityCost"), + RACKLOCALITYCOST("hbase.master.balancer.stochastic.rackLocalityCost"), + TABLESKEWCOST("hbase.master.balancer.stochastic.tableSkewCost"), + REGIONREPLICAHOSTCOSTKEY("hbase.master.balancer.stochastic.regionReplicaHostCostKey"), + REGIONREPLICARACKCOSTKEY("hbase.master.balancer.stochastic.regionReplicaRackCostKey"), + READREQUESTCOST("hbase.master.balancer.stochastic.readRequestCost"), + CPREQUESTCOST("hbase.master.balancer.stochastic.cpRequestCost"), + WRITEREQUESTCOST("hbase.master.balancer.stochastic.writeRequestCost"), + MEMSTORESIZECOST("hbase.master.balancer.stochastic.memstoreSizeCost"), + STOREFILESIZECOST("hbase.master.balancer.stochastic.storefileSizeCost"); + + private final String costKey; + + FunctionCostKeys(String key) { + this.costKey = key; + } + + public String getValue() { + return costKey; + } + } + + @Before + public void setup() throws Exception { + TEST_UTIL = new HBaseTestingUtil(); + conf = TEST_UTIL.getConfiguration(); + Path testDir = TEST_UTIL.getDataTestDir(); + + // Enable prefetch persistence which will enable prefetch cache cost function + Path p = new Path(testDir, "bc.txt"); + FileSystem fs = FileSystem.get(this.conf); + fs.create(p).close(); + // Must use file based bucket cache here. + conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "file:" + p); + String prefetchPersistencePath = testDir + "/prefetch.persistence"; + conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true); + conf.set(HConstants.PREFETCH_PERSISTENCE_PATH_KEY, prefetchPersistencePath); + // Must use the ByteBuffAllocator here + conf.setBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true); + conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.1f); + conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); + // 32MB for BucketCache. + conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 32); + } + + @After + public void cleanup() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + TEST_UTIL.cleanupTestDir(); + } + + // Verify whether the given cost function is enabled/disabled. A function is enabled if the + // cost key is set to a positive number and disabled if it is set to 0 + private void verifyCostFunctionState(Configuration myConf, String costFunctionName, + boolean isEnabled) { + float costKeyValue = myConf.getFloat(costFunctionName, Float.MAX_VALUE); + assertEquals(isEnabled, costKeyValue > 0.0f); + } + + @Test + public void testOnlyPrefetchCacheCostFunctionDisabled() throws Exception { + // Test strategy + // 1. Turn off the prefetch cache cost function by setting the parameter + // hbase.master.balancer.stochastic.prefetchCacheCost to 0 + // 2. Create the cluster + // 3. Find a region server to shutdown and restart + // 4. Assert that the region server identified in 3. has more than 1 regions assigned + // 5. Shutdown the region server + // 6. Get the number of regions assigned to the other region server and assert that it matched + // the total number of regions in the cluster + // 7. Start the region server identified in 3. + // 8. Trigger the balancer + // 9. Assert that some regions are assigned to the region server identified in 3. + + // Disable the prefetch cache cost function + conf.setFloat("hbase.master.balancer.stochastic.prefetchCacheCost", 0.0f); + loadBalancer.loadConf(conf); + + TEST_UTIL.startMiniCluster(REGION_SERVERS); + TEST_UTIL.getDFSCluster().waitClusterUp(); + + cluster = TEST_UTIL.getHBaseCluster(); + admin = TEST_UTIL.getAdmin(); + admin.balancerSwitch(false, true); + TableName tableName = TableName.valueOf("testTablePrefetchCacheCostFunctionDisabled"); + TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build(); + admin.createTable(tableDescriptor, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), REGION_NUM); + TEST_UTIL.waitTableAvailable(tableName); + TEST_UTIL.loadTable(admin.getConnection().getTable(tableName), HConstants.CATALOG_FAMILY); + admin.flush(tableName); + TEST_UTIL.compact(true); + + // Validate that all the other cost functions are enabled + Arrays.stream(FunctionCostKeys.values()) + .forEach(functionCostKey -> verifyCostFunctionState(admin.getConfiguration(), + functionCostKey.getValue(), true)); + + // Validate that the prefetch cache cost function is disabled + verifyCostFunctionState(admin.getConfiguration(), + "hbase.master.balancer.stochastic.prefetchCacheCost", false); + + // Run the balancer and wait until all the region movement is finished + admin.balancerSwitch(true, true); + assertTrue("Balancer did not run", admin.balance()); + admin.balance(); + TEST_UTIL.waitUntilNoRegionsInTransition(120000); + + Map ssmap = cluster.getClusterMetrics().getLiveServerMetrics(); + assertEquals(REGION_SERVERS, ssmap.size()); + + // Get the name of the region server to shutdown and restart + ServerName serverName = cluster.getClusterMetrics().getServersName().get(REGION_SERVERS - 1); + ServerMetrics sm = ssmap.get(serverName); + // Verify that some regions are assigned to this region server + assertTrue(0.0f != sm.getRegionMetrics().size()); + + // Shutdown the region server and wait for the regions hosted by this server to be reassigned + // to other region servers + cluster.stopRegionServer(serverName); + cluster.waitForRegionServerToStop(serverName, 1000); + // Compact the table so that all the regions are reassigned to the running region servers + TEST_UTIL.compact(true); + TEST_UTIL.waitUntilNoRegionsInTransition(12000); + + ssmap = cluster.getClusterMetrics().getLiveServerMetrics(); + assertEquals(REGION_SERVERS - 1, ssmap.size()); + sm = ssmap.get(serverName); + // Validate that no server metrics is found for the non-active server + assertNull(sm); + + // Restart the region server and run balancer and validate that some regions are reassigned to + // this region server + cluster.startRegionServer(serverName.getHostname(), serverName.getPort()); + // Get the name of the region server + cluster.waitForRegionServerToStart(serverName.getHostname(), serverName.getPort(), 1000); + admin.balance(); + TEST_UTIL.waitUntilNoRegionsInTransition(12000); + ssmap = cluster.getClusterMetrics().getLiveServerMetrics(); + assertEquals(REGION_SERVERS, ssmap.size()); + + ServerName newServerName = cluster.getClusterMetrics().getServersName().get(REGION_SERVERS - 1); + // Verify that the same region server has been started + assertTrue(ServerName.isSameAddress(serverName, newServerName)); + sm = ssmap.get(newServerName); + + assertNotNull(sm); + assertTrue(sm.getRegionMetrics().size() > 0); + } + + @Test + public void testOnlyPrefetchCacheCostFunctionEnabled() throws Exception { + // Test strategy + // 1. Turn off all other cost functions. NOTE: Please add to the list of cost functions that + // need + // to be turned off when a new function is added + // 2. Create a cluster only with prefetchCacheCostFunction enabled + // 3. Find a regionserver to shutdown and restart + // 4. Assert that the region server identified in 3. has more than 1 regions assigned + // 5. Shutdown the region server identified in 3. + // 6. Get the number of regions assigned to the other region servers and assert that it matches + // the total number of regions in the cluster + // 7. Start the region server identified in 3. + // 8. Trigger the balancer + // 9. Assert that no regions are assigned to the region server identified in 3. + + Arrays.stream(FunctionCostKeys.values()) + .forEach(functionCostKey -> conf.setFloat(functionCostKey.getValue(), 0.0f)); + loadBalancer.loadConf(conf); + + TEST_UTIL.startMiniCluster(REGION_SERVERS); + TEST_UTIL.getDFSCluster().waitClusterUp(); + cluster = TEST_UTIL.getHBaseCluster(); + admin = TEST_UTIL.getAdmin(); + admin.balancerSwitch(false, true); + TableName tableName = TableName.valueOf("testTableOnlyPrefetchCacheCostFunctionEnabled"); + TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build(); + admin.createTable(tableDescriptor, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), REGION_NUM); + TEST_UTIL.waitTableAvailable(tableName); + TEST_UTIL.loadTable(admin.getConnection().getTable(tableName), HConstants.CATALOG_FAMILY); + admin.flush(tableName); + TEST_UTIL.compact(true); + + // Validate that all the other cost functions are disabled + Arrays.stream(FunctionCostKeys.values()) + .forEach(functionCostKey -> verifyCostFunctionState(admin.getConfiguration(), + functionCostKey.getValue(), false)); + + verifyCostFunctionState(admin.getConfiguration(), + "hbase.master.balancer.stochastic.prefetchCacheCost", true); + + // Run balancer and wait until all the region have been moved + // started. + admin.balancerSwitch(true, true); + assertTrue("Balancer did not run", admin.balance()); + admin.balance(); + TEST_UTIL.waitUntilNoRegionsInTransition(120000); + + Map ssmap = cluster.getClusterMetrics().getLiveServerMetrics(); + + assertEquals(REGION_SERVERS, ssmap.size()); + + // Shutdown the last server. This is because the server id for an inactive server is reassigned + // to the next running server. As soon as this server is restarted, it is assigned the next + // available + // server id. In our case, we want to track the same server and hence, it's safe to restart the + // last server in the list + ServerName serverName = cluster.getClusterMetrics().getServersName().get(REGION_SERVERS - 1); + ServerMetrics sm = ssmap.get(serverName); + assertTrue(0 != sm.getRegionMetrics().size()); + + cluster.stopRegionServer(serverName); + cluster.waitForRegionServerToStop(serverName, 1000); + TEST_UTIL.compact(true); + TEST_UTIL.waitUntilNoRegionsInTransition(12000); + ssmap = cluster.getClusterMetrics().getLiveServerMetrics(); + assertEquals(REGION_SERVERS - 1, ssmap.size()); + sm = ssmap.get(serverName); + assertNull(sm); + + // Restart the region server + cluster.startRegionServer(serverName.getHostname(), serverName.getPort()); + cluster.waitForRegionServerToStart(serverName.getHostname(), serverName.getPort(), 1000); + admin.balance(); + TEST_UTIL.waitUntilNoRegionsInTransition(120000); + ssmap = cluster.getClusterMetrics().getLiveServerMetrics(); + assertEquals(REGION_SERVERS, ssmap.size()); + + ServerName newServerName = cluster.getClusterMetrics().getServersName().get(REGION_SERVERS - 1); + // Verify that the same region server has been started + assertTrue(ServerName.isSameAddress(serverName, newServerName)); + sm = ssmap.get(newServerName); + + assertNotNull(sm); + assertEquals(0, sm.getRegionMetrics().size()); + } + + @Test + public void testStressTestWithOnlyPrefetchCacheCostFunctionEnabled() throws Exception { + // Test the prefetch cache cost returned by the cost function when random servers are + // restarted and only the PrefetchCacheCostFunction is enabled. Ensure that the prefetch cost + // returned by the cost function is always between 0 and 1. + + // Disable all other cost functions + Arrays.stream(FunctionCostKeys.values()) + .forEach(functionCostKey -> conf.setFloat(functionCostKey.getValue(), 0.0f)); + loadBalancer.loadConf(conf); + + TEST_UTIL.startMiniCluster(REGION_SERVERS); + TEST_UTIL.getDFSCluster().waitClusterUp(); + cluster = TEST_UTIL.getHBaseCluster(); + admin = TEST_UTIL.getAdmin(); + admin.balancerSwitch(false, true); + TableName tableName = TableName.valueOf("testTableOnlyPrefetchCacheCostFunctionEnabled"); + TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build(); + admin.createTable(tableDescriptor, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), REGION_NUM); + TEST_UTIL.waitTableAvailable(tableName); + TEST_UTIL.loadTable(admin.getConnection().getTable(tableName), HConstants.CATALOG_FAMILY); + admin.flush(tableName); + TEST_UTIL.compact(true); + + // Verify that all the other cost functions except the PrefetchCacheCostFunction are disabled + Arrays.stream(FunctionCostKeys.values()) + .forEach(functionCostKey -> verifyCostFunctionState(admin.getConfiguration(), + functionCostKey.getValue(), false)); + + verifyCostFunctionState(admin.getConfiguration(), + "hbase.master.balancer.stochastic.prefetchCacheCost", true); + + admin.balancerSwitch(true, true); + admin.balance(); + TEST_UTIL.waitUntilNoRegionsInTransition(120000); + + Random rand = new Random(); + for (int i = 0; i < 5; i++) { + int randomServerID = rand.nextInt(REGION_SERVERS); + ServerName sn = cluster.getClusterMetrics().getServersName().get(randomServerID); + cluster.stopRegionServer(sn); + cluster.waitForRegionServerToStop(sn, 1000); + TEST_UTIL.compact(true); + TEST_UTIL.waitUntilNoRegionsInTransition(12000); + cluster.startRegionServer(sn.getHostname(), sn.getPort()); + cluster.waitForRegionServerToStart(sn.getHostname(), sn.getPort(), 1000); + admin.balance(); + // Verify that the same server was restarted + verifyServerActive(sn); + assertEquals(REGION_SERVERS, cluster.getClusterMetrics().getLiveServerMetrics().size()); + validatePrefetchCacheCost(loadBalancer.getCostFunctions()); + } + } + + private void verifyServerActive(ServerName serverName) throws Exception { + // The server id of the region server may change post restart. The only way to ensure that the + // same server has been restarted is by searching for the server address (host:port) in the + // active server list + boolean found = false; + for (ServerName sname : cluster.getClusterMetrics().getServersName()) { + if (ServerName.isSameAddress(sname, serverName)) { + found = true; + break; + } + } + assertTrue(found); + } + + private void validatePrefetchCacheCost(List cf) { + for (CostFunction c : cf) { + if (c.getMultiplier() > 0.0f) { + assertTrue(c.cost() >= 0.0 && c.cost() <= 1.0); + } + } + } +}