Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,12 @@ protected static class Cluster {
int[] regionIndexToServerIndex; //regionIndex -> serverIndex
int[] initialRegionIndexToServerIndex; //regionIndex -> serverIndex (initial cluster state)
int[] regionIndexToTableIndex; //regionIndex -> tableIndex
int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS
int[][] numRegionsPerServerPerTable; // serverIndex -> tableIndex -> # regions
int[] numRegionsPerTable; // tableIndex -> region count
double[] meanRegionsPerTable; // mean region count per table
double[] regionSkewByTable; // skew on RS per by table
double[] minRegionSkewByTable; // min skew on RS per by table
double[] maxRegionSkewByTable; // max skew on RS per by table
int[] regionIndexToPrimaryIndex; //regionIndex -> regionIndex of the primary
boolean hasRegionReplicas = false; //whether there is regions with replicas

Expand Down Expand Up @@ -365,7 +369,9 @@ protected Cluster(
}

numTables = tables.size();
LOG.debug("Number of tables={}", numTables);
numRegionsPerServerPerTable = new int[numServers][numTables];
numRegionsPerTable = new int[numTables];

for (int i = 0; i < numServers; i++) {
for (int j = 0; j < numTables; j++) {
Expand All @@ -376,15 +382,26 @@ protected Cluster(
for (int i=0; i < regionIndexToServerIndex.length; i++) {
if (regionIndexToServerIndex[i] >= 0) {
numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
numRegionsPerTable[regionIndexToTableIndex[i]]++;
}
}

numMaxRegionsPerTable = new int[numTables];
// Avoid repeated computation for planning
meanRegionsPerTable = new double[numTables];
regionSkewByTable = new double[numTables];
maxRegionSkewByTable = new double[numTables];
minRegionSkewByTable = new double[numTables];

for (int i = 0; i < numTables; i++) {
meanRegionsPerTable[i] = Double.valueOf(numRegionsPerTable[i]) / numServers;
minRegionSkewByTable[i] += DoubleArrayCost.getMinSkew(numRegionsPerTable[i], numServers);
maxRegionSkewByTable[i] += DoubleArrayCost.getMaxSkew(numRegionsPerTable[i], numServers);
}

for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
for (tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) {
if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
}
for (int tableIdx = 0; tableIdx < aNumRegionsPerServerPerTable.length; tableIdx++) {
regionSkewByTable[tableIdx] +=
Math.abs(aNumRegionsPerServerPerTable[tableIdx] - meanRegionsPerTable[tableIdx]);
}
}

Expand Down Expand Up @@ -824,22 +841,13 @@ void regionMoved(int region, int oldServer, int newServer) {
int tableIndex = regionIndexToTableIndex[region];
if (oldServer >= 0) {
numRegionsPerServerPerTable[oldServer][tableIndex]--;
// update regionSkewPerTable for the move from old server
regionSkewByTable[tableIndex] += getSkewChangeFor(oldServer, tableIndex, -1);
}
numRegionsPerServerPerTable[newServer][tableIndex]++;

//check whether this caused maxRegionsPerTable in the new Server to be updated
if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex];
} else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1)
== numMaxRegionsPerTable[tableIndex]) {
//recompute maxRegionsPerTable since the previous value was coming from the old server
numMaxRegionsPerTable[tableIndex] = 0;
for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
}
}
}
// update regionSkewPerTable for the move to new server
regionSkewByTable[tableIndex] += getSkewChangeFor(newServer, tableIndex, 1);

// update for servers
int primary = regionIndexToPrimaryIndex[region];
Expand Down Expand Up @@ -1011,12 +1019,20 @@ public String toString() {
.append(Arrays.toString(serverIndicesSortedByRegionCount))
.append(", regionsPerServer=").append(Arrays.deepToString(regionsPerServer));

desc.append(", numMaxRegionsPerTable=").append(Arrays.toString(numMaxRegionsPerTable))
desc.append(", regionSkewByTable=").append(Arrays.toString(regionSkewByTable))
.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
.append(", numTables=").append(numTables).append(", numMovedRegions=")
.append(numMovedRegions).append('}');
return desc.toString();
}

private double getSkewChangeFor(int serverIndex, int tableIndex, double regionCountChange) {
double curSkew = Math.abs(numRegionsPerServerPerTable[serverIndex][tableIndex] -
meanRegionsPerTable[tableIndex]);
double oldSkew = Math.abs(numRegionsPerServerPerTable[serverIndex][tableIndex] -
regionCountChange - meanRegionsPerTable[tableIndex]);
return curSkew - oldSkew;
}
}

// slop for regions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,31 +72,14 @@ private static double computeCost(double[] stats) {
double count = stats.length;
double mean = total / count;

// Compute max as if all region servers had 0 and one had the sum of all costs. This must be
// a zero sum cost for this to make sense.
double max = ((count - 1) * mean) + (total - mean);

// It's possible that there aren't enough regions to go around
double min;
if (count > total) {
min = ((count - total) * mean) + ((1 - mean) * total);
} else {
// Some will have 1 more than everything else.
int numHigh = (int) (total - (Math.floor(mean) * count));
int numLow = (int) (count - numHigh);

min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));

}
min = Math.max(0, min);
for (int i = 0; i < stats.length; i++) {
double n = stats[i];
double diff = Math.abs(mean - n);
totalCost += diff;
}

double scaled = StochasticLoadBalancer.scale(min, max, totalCost);
return scaled;
return StochasticLoadBalancer.scale(getMinSkew(total, count),
getMaxSkew(total, count), totalCost);
}

private static double getSum(double[] stats) {
Expand All @@ -106,4 +89,34 @@ private static double getSum(double[] stats) {
}
return total;
}
}

/**
* Return the min skew of distribution
* @param total is total number of regions
*/
public static double getMinSkew(double total, double numServers) {
double mean = total / numServers;
// It's possible that there aren't enough regions to go around
double min;
if (numServers > total) {
min = ((numServers - total) * mean + (1 - mean) * total) ;
} else {
// Some will have 1 more than everything else.
int numHigh = (int) (total - (Math.floor(mean) * numServers));
int numLow = (int) (numServers - numHigh);
min = numHigh * (Math.ceil(mean) - mean) + numLow * (mean - Math.floor(mean));
}
return min;
}

/**
* Return the max deviation of distribution
* Compute max as if all region servers had 0 and one had the sum of all costs. This must be
* a zero sum cost for this to make sense.
* @param total is total number of regions
*/
public static double getMaxSkew(double total, double numServers) {
double mean = total / numServers;
return (total - mean) + (numServers - 1) * mean;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {

protected static final Random RANDOM = new Random(System.currentTimeMillis());
private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class);
public static final double COST_EPSILON = 0.0001;

Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();

Expand All @@ -140,7 +141,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private int stepsPerRegion = 800;
private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
private int numRegionLoadsToRemember = 15;
private float minCostNeedBalance = 0.05f;
private float minCostNeedBalance = 0.025f;

private List<CandidateGenerator> candidateGenerators;
private List<CostFunction> costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC
Expand Down Expand Up @@ -215,9 +216,11 @@ public synchronized void setConf(Configuration conf) {
curFunctionCosts = new double[costFunctions.size()];
tempFunctionCosts = new double[costFunctions.size()];

LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion +
", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", CostFunctions=" +
Arrays.toString(getCostFunctionNames()) + " etc.");
LOG.info(
"Loaded config; maxSteps=" + maxSteps + ", runMaxSteps=" + runMaxSteps +
", stepsPerRegion=" + stepsPerRegion +
", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable
+ ", CostFunctions=" + Arrays.toString(getCostFunctionNames()) + " etc.");
}

private void loadCustomCostFunctions(Configuration conf) {
Expand Down Expand Up @@ -706,7 +709,6 @@ Cluster.Action generate(Cluster cluster) {
* Base class of StochasticLoadBalancer's Cost Functions.
*/
public abstract static class CostFunction {

private float multiplier = 0;

protected Cluster cluster;
Expand Down Expand Up @@ -763,24 +765,6 @@ protected void regionMoved(int region, int oldServer, int newServer) {
protected abstract double cost();
}

/**
* Scale the value between 0 and 1.
* @param min Min value
* @param max The Max value
* @param value The value to be scaled.
* @return The scaled value.
*/
static double scale(double min, double max, double value) {
if (max <= min || value <= min) {
return 0;
}
if ((max - min) == 0) {
return 0;
}

return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
}

/**
* Given the starting state of the regions and a potential ending state
* compute cost based upon the number of regions that have moved.
Expand Down Expand Up @@ -970,15 +954,12 @@ static class TableSkewCostFunction extends CostFunction {

@Override
protected double cost() {
double max = cluster.numRegions;
double min = ((double) cluster.numRegions) / cluster.numServers;
double value = 0;

for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
value += cluster.numMaxRegionsPerTable[i];
double cost = 0;
for (int tableIdx = 0; tableIdx < cluster.numTables; tableIdx++) {
cost += scale(cluster.minRegionSkewByTable[tableIdx],
cluster.maxRegionSkewByTable[tableIdx], cluster.regionSkewByTable[tableIdx]);
}

return scale(min, max, value);
return cost;
}
}

Expand Down Expand Up @@ -1425,4 +1406,23 @@ protected double getCostFromRl(BalancerRegionLoad rl) {
public static String composeAttributeName(String tableName, String costFunctionName) {
return tableName + TABLE_FUNCTION_SEP + costFunctionName;
}

/**
* Scale the value between 0 and 1.
* @param min Min value
* @param max The Max value
* @param value The value to be scaled.
* @return The scaled value.
*/
static double scale(double min, double max, double value) {
if (max <= min || value <= min
|| Math.abs(max - min) <= COST_EPSILON || Math.abs(value - min) <= COST_EPSILON) {
return 0;
}
if (max <= min || Math.abs(max - min) <= COST_EPSILON) {
return 0;
}

return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ public static void beforeAllTests() throws Exception {
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 0.75f);
conf.setFloat("hbase.regions.slop", 0.0f);
conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 3 * 60 * 1000);
loadBalancer = new StochasticLoadBalancer();
loadBalancer.setConf(conf);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,8 +389,8 @@ public void testRegionAvailabilityWithRegionMoves() throws Exception {

// now move region1 from servers[0] to servers[2]
cluster.doAction(new MoveRegionAction(0, 0, 2));
// check that the numMaxRegionsPerTable for "table" has increased to 2
assertEquals(2, cluster.numMaxRegionsPerTable[0]);
// check that the regionSkewByTable for "table" has increased to 2
assertEquals(2, cluster.regionSkewByTable[0], 0.01);
// now repeat check whether moving region1 from servers[1] to servers[2]
// would lower availability
assertTrue(cluster.wouldLowerAvailability(hri1, servers[2]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,14 @@ public class TestStochasticLoadBalancerBalanceCluster extends BalancerTestBase {
*/
@Test
public void testBalanceCluster() throws Exception {
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 3 * 60 * 1000); // 800 sec
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 20000000L);
loadBalancer.setConf(conf);
for (int[] mockCluster : clusterStateMocks) {
Map<ServerName, List<RegionInfo>> servers = mockClusterServers(mockCluster);
List<ServerAndLoad> list = convertToList(servers);
LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));

Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
(Map) mockClusterServersWithTables(servers);
List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public static void beforeAllTests() throws IOException {
BalancerTestBase.conf.setFloat("hbase.master.balancer.stochastic.regionCountCost", 0);
BalancerTestBase.conf.setFloat("hbase.master.balancer.stochastic.primaryRegionCountCost", 0);
BalancerTestBase.conf.setFloat("hbase.master.balancer.stochastic.tableSkewCost", 0);
BalancerTestBase.conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true);
BalancerTestBase.conf.set(StochasticLoadBalancer.COST_FUNCTIONS_COST_FUNCTIONS_KEY,
HeterogeneousRegionCountCostFunction.class.getName());
// Need to ensure test dir has been created.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public void testLargeCluster() {
int numRegionsPerServer = 80; // all servers except one
int numTables = 100;
int replication = 1;
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 6 * 60 * 1000);
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 20000000L);
loadBalancer.onConfigurationChange(conf);
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
}
}