Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
aa22468
HBASE-27389 Add cost function in balancer to consider the cost of bui…
ragarkar Sep 23, 2022
54bacff
HBASE-27389 Add cost function in balancer to consider the cost of bui…
ragarkar Sep 23, 2022
5c201ed
HBASE-27389 Add cost function in balancer to consider the cost of bui…
ragarkar Jan 18, 2023
1669025
Merge branch 'master' into prefetchcost
ragarkar Jan 18, 2023
a53a3fc
HBASE-27389 Add cost function in balancer to consider the cost of bui…
ragarkar Jan 18, 2023
826db46
HBASE-27389 Add cost function in balancer to consider the cost of bui…
ragarkar Mar 2, 2023
ab870c0
HBASE-27389 Add cost function in balancer to consider the cost of bui…
ragarkar Mar 2, 2023
a8dbcef
HBASE-27389 Add cost function in balancer to consider the cost of bui…
ragarkar Mar 2, 2023
8645e93
HBASE-27389 Add cost function in balancer to consider the cost of bui…
ragarkar Mar 3, 2023
3fcaa56
HBASE-27389 Add cost function in balancer to consider the cost of bui…
ragarkar Mar 24, 2023
e70cc66
Merge branch 'master' into prefetchcost
ragarkar Mar 24, 2023
af0181b
HBASE-27389 Add cost function in balancer to consider the cost of bui…
ragarkar Mar 24, 2023
cf7b3d2
HBASE-27389 Add cost function in balancer to consider the cost of bui…
ragarkar Mar 24, 2023
1a7d728
HBASE-27389 Add cost function in balancer to consider the cost of bui…
ragarkar Mar 27, 2023
160f484
HBASE-27389 Add cost function in balancer to consider the cost of bui…
ragarkar Mar 31, 2023
55e55ea
HBASE-27389 Add cost function in balancer to consider the cost of bui…
ragarkar Mar 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -115,6 +116,13 @@ class BalancerClusterState {
// Maps localityType -> region -> [server|rack]Index with highest locality
private int[][] regionsToMostLocalEntities;

// Maps (regionIndex, serverIndex) pair -> prefetch ratio of that region on that server
private Map<Pair<Integer, Integer>, Float> regionIndexServerIndexPrefetchRatio;
// Maps region -> serverIndex with best prefetch ratio
private int[] regionServerIndexWithBestPrefetchRatio;
// Historical region server prefetch ratio
Map<String, Map<Address, Float>> historicalRegionServerPrefetchRatio;

static class DefaultRackManager extends RackManager {
@Override
public String getRack(ServerName server) {
Expand All @@ -125,13 +133,20 @@ public String getRack(ServerName server) {
BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
RackManager rackManager) {
this(null, clusterState, loads, regionFinder, rackManager);
this(null, clusterState, loads, regionFinder, rackManager, null);
}

/**
 * Convenience constructor that delegates to the full constructor, passing {@code null} for the
 * collection of unassigned regions and forwarding every other argument unchanged.
 * @param clusterState                 regions keyed by the server they are assigned to
 * @param loads                        region load history, forwarded as-is
 * @param regionFinder                 block location finder, forwarded as-is
 * @param rackManager                  rack manager, forwarded as-is
 * @param oldRegionServerPrefetchRatio historical prefetch ratios (region name -> server address
 *                                     -> ratio), forwarded as-is; other constructors in this
 *                                     class pass {@code null} here
 */
BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
RackManager rackManager, Map<String, Map<Address, Float>> oldRegionServerPrefetchRatio) {
this(null, clusterState, loads, regionFinder, rackManager, oldRegionServerPrefetchRatio);
}

@SuppressWarnings("unchecked")
BalancerClusterState(Collection<RegionInfo> unassignedRegions,
Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads,
RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager) {
RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager,
Map<String, Map<Address, Float>> oldRegionServerPrefetchRatio) {
if (unassignedRegions == null) {
unassignedRegions = Collections.emptyList();
}
Expand All @@ -145,6 +160,8 @@ public String getRack(ServerName server) {
tables = new ArrayList<>();
this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();

this.historicalRegionServerPrefetchRatio = oldRegionServerPrefetchRatio;

numRegions = 0;

List<List<Integer>> serversPerHostList = new ArrayList<>();
Expand Down Expand Up @@ -553,6 +570,127 @@ enum LocalityType {
RACK
}

/**
 * Returns the prefetch ratio of a region on a server, weighted by the size of the region in MB
 * (as reported by {@code getRegionSizeMB}).
 * @param region Region ID
 * @param server Server ID
 * @return Region size in MB multiplied by the prefetch ratio of the region on the server
 */
public float getOrComputeWeightedPrefetchRatio(int region, int server) {
  // Start from the region size, then scale it by the (possibly cached) prefetch ratio.
  float weightedRatio = getRegionSizeMB(region);
  weightedRatio *= getOrComputeRegionPrefetchRatio(region, server);
  return weightedRatio;
}

/**
 * Returns the prefetch ratio of a region on a region server. If the region is currently hosted on
 * the server, the ratio is taken from the most recent region load. Otherwise the historical
 * prefetch ratios are consulted, keyed by region name and server address.
 * <p>
 * Side effect: when a historical ratio is found for the region on the given server, the region's
 * entry is removed from {@code historicalRegionServerPrefetchRatio}.
 * @param region            Region ID
 * @param regionServerIndex Region Server ID
 * @return Prefetch ratio of the given region on the given region server, or 0 if none is known
 */
protected float getRegionServerPrefetchRatio(int region, int regionServerIndex) {
  // Get the prefetch ratio if the region is currently hosted on this server
  for (int regionIndex : regionsPerServer[regionServerIndex]) {
    if (region != regionIndex) {
      continue;
    }
    Deque<BalancerRegionLoad> regionLoadList = regionLoads[regionIndex];

    // The region is currently hosted on this region server. A missing or empty load history
    // means no prefetch information has been reported yet; getLast() would throw on an empty
    // deque, so treat both cases as a ratio of 0.
    if (regionLoadList == null || regionLoadList.isEmpty()) {
      return 0.0f;
    }
    return regionLoadList.getLast().getPrefetchCacheRatio();
  }

  // Region is not currently hosted on this server. Check if the region was prefetched on this
  // server earlier. This can happen when the server was shutdown and the cache was persisted.
  // Search using the region name and server address and not the region index and server index,
  // as these indexes may change when a server is marked as dead or a new server is added.
  String regionNameAsString = regions[region].getRegionNameAsString();
  Address serverAddress = servers[regionServerIndex].getAddress();
  if (historicalRegionServerPrefetchRatio == null) {
    return 0.0f;
  }

  // Single lookups instead of containsKey + get; a null value is treated the same as "absent".
  Map<Address, Float> serverPrefetchRatio =
    historicalRegionServerPrefetchRatio.get(regionNameAsString);
  if (serverPrefetchRatio == null) {
    return 0.0f;
  }
  Float historicalRatio = serverPrefetchRatio.get(serverAddress);
  if (historicalRatio == null) {
    return 0.0f;
  }

  // The old prefetch cache ratio has been accounted for and hence, clear up this information.
  // NOTE(review): this drops the historical ratios of this region for ALL servers, not only
  // for serverAddress - confirm that a region is expected to have a single historical server.
  historicalRegionServerPrefetchRatio.remove(regionNameAsString, serverPrefetchRatio);
  return historicalRatio;
}

/**
 * Computes the prefetch ratio of every region on every server and caches the result. For each
 * region this also records the index of the server with the best prefetch ratio. The ratios
 * come from {@code getRegionServerPrefetchRatio}, so they include historical ratios of a region
 * on servers other than the one currently hosting it.
 */
private void computeRegionServerPrefetchRatio() {
  regionIndexServerIndexPrefetchRatio = new HashMap<>();
  regionServerIndexWithBestPrefetchRatio = new int[numRegions];

  for (int regionIdx = 0; regionIdx < numRegions; regionIdx++) {
    int bestServerIdx = 0;
    float bestRatio = 0.0f;
    for (int serverIdx = 0; serverIdx < numServers; serverIdx++) {
      final float ratio = getRegionServerPrefetchRatio(regionIdx, serverIdx);
      final boolean isCurrentHost = serverIdx == regionIndexToServerIndex[regionIdx];

      // Cache the ratio for the current host unconditionally, and for any other server only
      // when it is positive. A ratio of 0 on a non-hosting server is not stored; lookups
      // treat a missing entry as 0.
      if (isCurrentHost || ratio > 0.0f) {
        Pair<Integer, Integer> key = new Pair<>(regionIdx, serverIdx);
        regionIndexServerIndexPrefetchRatio.put(key, ratio);
      }

      if (ratio > bestRatio) {
        bestRatio = ratio;
        bestServerIdx = serverIdx;
      } else if (isCurrentHost && ratio == bestRatio) {
        // On a tie, the server currently hosting the region retains it.
        bestServerIdx = serverIdx;
      }
    }
    regionServerIndexWithBestPrefetchRatio[regionIdx] = bestServerIdx;
  }
}

/**
 * Looks up the cached prefetch ratio of a region on a server, computing the cache first when it
 * has not been built yet or is empty.
 * @param region Region ID
 * @param server Server ID
 * @return Cached prefetch ratio, or 0 when no entry exists for the (region, server) pair
 */
private float getOrComputeRegionPrefetchRatio(int region, int server) {
  final boolean cachePopulated = regionServerIndexWithBestPrefetchRatio != null
    && !regionIndexServerIndexPrefetchRatio.isEmpty();
  if (!cachePopulated) {
    computeRegionServerPrefetchRatio();
  }

  Pair<Integer, Integer> key = new Pair<>(region, server);
  Float cachedRatio = regionIndexServerIndexPrefetchRatio.get(key);
  return cachedRatio == null ? 0.0f : cachedRatio;
}

/**
 * Returns, for every region index, the index of the server with the best prefetch ratio. The
 * underlying cache is computed first when it has not been built yet or is empty.
 * @return Array indexed by region index holding the server index with the best prefetch ratio
 */
public int[] getOrComputeServerWithBestPrefetchRatio() {
  // Both fields are only assigned by computeRegionServerPrefetchRatio(), so they are null until
  // the first computation. Check for null before calling isEmpty() to avoid an NPE when this
  // method is the first accessor invoked.
  if (
    regionServerIndexWithBestPrefetchRatio == null
      || regionIndexServerIndexPrefetchRatio == null
      || regionIndexServerIndexPrefetchRatio.isEmpty()
  ) {
    computeRegionServerPrefetchRatio();
  }
  return regionServerIndexWithBestPrefetchRatio;
}

public void doAction(BalanceAction action) {
switch (action.getType()) {
case NULL:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.balancer;

import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Size;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
Expand All @@ -34,13 +35,17 @@ class BalancerRegionLoad {
private final long writeRequestsCount;
private final int memStoreSizeMB;
private final int storefileSizeMB;
private final float prefetchCacheRatio;
private final ServerName serverName;

BalancerRegionLoad(RegionMetrics regionMetrics) {
readRequestsCount = regionMetrics.getReadRequestCount();
cpRequestsCount = regionMetrics.getCpRequestCount();
writeRequestsCount = regionMetrics.getWriteRequestCount();
memStoreSizeMB = (int) regionMetrics.getMemStoreSize().get(Size.Unit.MEGABYTE);
storefileSizeMB = (int) regionMetrics.getStoreFileSize().get(Size.Unit.MEGABYTE);
prefetchCacheRatio = regionMetrics.getPrefetchCacheRatio();
serverName = regionMetrics.getServerName();
}

public long getReadRequestsCount() {
Expand All @@ -62,4 +67,12 @@ public int getMemStoreSizeMB() {
/** Returns the store file size in MB captured at construction time. */
public int getStorefileSizeMB() {
  return this.storefileSizeMB;
}

/** Returns the prefetch cache ratio captured from the region metrics at construction time. */
public float getPrefetchCacheRatio() {
  return this.prefetchCacheRatio;
}

/** Returns the server name captured from the region metrics at construction time. */
public ServerName getServerName() {
  return this.serverName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ private BalancerClusterState createCluster(List<ServerName> servers,
clusterState.put(server, Collections.emptyList());
}
}
return new BalancerClusterState(regions, clusterState, null, this.regionFinder, rackManager);
return new BalancerClusterState(regions, clusterState, null, this.regionFinder, rackManager,
null);
}

private List<ServerName> findIdleServers(List<ServerName> servers) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;

import java.util.concurrent.ThreadLocalRandom;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Candidate generator that proposes moving a region to another server when the region's
 * size-weighted prefetch ratio on that server is significantly higher than on the server
 * currently hosting it.
 */
@InterfaceAudience.Private
class PrefetchBasedCandidateGenerator extends CandidateGenerator {
  /**
   * Minimum factor by which the weighted prefetch ratio on the target server must exceed the one
   * on the current server before a move is proposed.
   */
  private static final float PREFETCH_RATIO_DIFF_FACTOR = 1.25f;

  /**
   * Walks over all regions, starting from a random region so that the regions at the front are
   * not always favoured, and returns a move action for the first region whose weighted prefetch
   * ratio on a randomly picked other server is more than {@code PREFETCH_RATIO_DIFF_FACTOR}
   * times its ratio on the current server.
   * @param cluster cluster state to generate the action for
   * @return a move action, or {@link BalanceAction#NULL_ACTION} when no region qualifies
   */
  @Override
  BalanceAction generate(BalancerClusterState cluster) {
    if (cluster.numRegions <= 0) {
      return BalanceAction.NULL_ACTION;
    }

    int startRegionIndex = ThreadLocalRandom.current().nextInt(cluster.numRegions);
    for (int i = 0; i < cluster.numRegions; i++) {
      int region = (startRegionIndex + i) % cluster.numRegions;
      int currentServerIndex = cluster.regionIndexToServerIndex[region];
      float currentPrefetchRatio =
        cluster.getOrComputeWeightedPrefetchRatio(region, currentServerIndex);

      // Pick a random other server and look up the region's weighted prefetch ratio there.
      int toServerIndex = pickOtherRandomServer(cluster, currentServerIndex);
      float toServerPrefetchRatio =
        cluster.getOrComputeWeightedPrefetchRatio(region, toServerIndex);

      // If the prefetch ratio on the target server is significantly higher, move the region.
      // NOTE(review): a region whose current weighted ratio is 0 is never moved by this
      // generator, even if the target server has a positive ratio - confirm this is intended.
      if (
        currentPrefetchRatio > 0
          && (toServerPrefetchRatio / currentPrefetchRatio) > PREFETCH_RATIO_DIFF_FACTOR
      ) {
        return getAction(currentServerIndex, region, toServerIndex, -1);
      }
    }
    return BalanceAction.NULL_ACTION;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Cost function driven by region prefetch ratios. The cost is derived from the sum of the
 * size-weighted prefetch ratios of all regions on their current servers, relative to the same sum
 * taken on the servers with the best prefetch ratio for each region.
 */
@InterfaceAudience.Private
public class PrefetchCacheCostFunction extends CostFunction {
  private static final String PREFETCH_CACHE_COST_KEY =
    "hbase.master.balancer.stochastic.prefetchCacheCost";
  private static final float DEFAULT_PREFETCH_COST = 500;

  private String prefetchedFileListPath;
  // Current weighted prefetch total, normalised by bestPrefetchRatio.
  private double prefetchRatio;
  // Weighted prefetch total when every region sits on its best server.
  private double bestPrefetchRatio;

  /**
   * Creates the cost function. The multiplier is 0 (function disabled) unless the prefetch file
   * list persistence is enabled by setting PREFETCH_PERSISTENCE_PATH_KEY; that persistence is
   * disabled by default. When enabled, the multiplier is read from the configuration and may
   * itself be set to 0 to disable the function. The prefetch cache ratio function would be most
   * relevant for non-hdfs deployments, which then makes locality irrelevant. In those cases,
   * prefetch and region skewness would be competing to prevail over the final balancer decision.
   * @param conf Cluster configuration
   */
  PrefetchCacheCostFunction(Configuration conf) {
    prefetchedFileListPath = conf.get(HConstants.PREFETCH_PERSISTENCE_PATH_KEY);
    if (prefetchedFileListPath == null) {
      this.setMultiplier(0.0f);
    } else {
      this.setMultiplier(conf.getFloat(PREFETCH_CACHE_COST_KEY, DEFAULT_PREFETCH_COST));
    }
    prefetchRatio = 0.0;
    bestPrefetchRatio = 0.0;
  }

  /**
   * Recomputes the current and best weighted prefetch totals for the given cluster and stores
   * the current total normalised by the best one (1.0 when the best total is 0).
   */
  @Override
  void prepare(BalancerClusterState cluster) {
    super.prepare(cluster);

    double currentTotal = 0.0;
    double bestTotal = 0.0;
    for (int region = 0; region < cluster.numRegions; region++) {
      int hostingServer = cluster.regionIndexToServerIndex[region];
      currentTotal += cluster.getOrComputeWeightedPrefetchRatio(region, hostingServer);

      int bestServer = cluster.getOrComputeServerWithBestPrefetchRatio()[region];
      bestTotal += cluster.getOrComputeWeightedPrefetchRatio(region, bestServer);
    }

    bestPrefetchRatio = bestTotal;
    if (bestTotal == 0.0) {
      prefetchRatio = 1.0;
    } else {
      prefetchRatio = currentTotal / bestTotal;
    }
  }

  /** Returns the scaled cost; a normalised prefetch ratio of 1 yields the lowest cost. */
  @Override
  protected double cost() {
    double missingPrefetch = 1 - prefetchRatio;
    return scale(0, 1, missingPrefetch);
  }

  /**
   * Incrementally adjusts the normalised prefetch ratio by the change in the region's weighted
   * prefetch ratio between the old and the new server.
   */
  @Override
  protected void regionMoved(int region, int oldServer, int newServer) {
    double weightedOnOld = cluster.getOrComputeWeightedPrefetchRatio(region, oldServer);
    double weightedOnNew = cluster.getOrComputeWeightedPrefetchRatio(region, newServer);
    double delta = weightedOnNew - weightedOnOld;

    double normalizedDelta;
    if (bestPrefetchRatio == 0.0) {
      // Nothing to normalise against, so the move contributes no change.
      normalizedDelta = 0.0;
    } else {
      normalizedDelta = delta / bestPrefetchRatio;
    }
    prefetchRatio += normalizedDelta;
  }

  /** Adds this function's cost to the weight slot of the PREFETCH generator type. */
  @Override
  public final void updateWeight(double[] weights) {
    int slot = StochasticLoadBalancer.GeneratorType.PREFETCH.ordinal();
    weights[slot] += cost();
  }
}
Loading