Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ protected static class Cluster {
int[] initialRegionIndexToServerIndex; //regionIndex -> serverIndex (initial cluster state)
int[] regionIndexToTableIndex; //regionIndex -> tableIndex
int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS
int[] regionsPerTable; // count of regions per table
double regionStDevByTable; // standard deviation of region distribution by table
int[] regionIndexToPrimaryIndex; //regionIndex -> regionIndex of the primary
boolean hasRegionReplicas = false; //whether there is regions with replicas

Expand Down Expand Up @@ -297,7 +298,7 @@ protected Cluster(
primariesOfRegionsPerHost = new int[numHosts][];
primariesOfRegionsPerRack = new int[numRacks][];

int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;
int regionIndex = 0, regionPerServerIndex = 0;

for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
if (entry.getKey() == null) {
Expand Down Expand Up @@ -383,20 +384,19 @@ protected Cluster(
}
}

regionsPerTable = new int[numTables];
for (int i = 0; i < numTables; i++) {
regionsPerTable[i] = 0;
}

for (int i=0; i < regionIndexToServerIndex.length; i++) {
if (regionIndexToServerIndex[i] >= 0) {
numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
regionsPerTable[regionIndexToTableIndex[i]]++;
}
}

numMaxRegionsPerTable = new int[numTables];
for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
for (tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) {
if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
}
}
}
recomputeRegionStDevByTable();

for (int i = 0; i < regions.length; i ++) {
RegionInfo info = regions[i];
Expand Down Expand Up @@ -513,6 +513,122 @@ private void registerRegion(RegionInfo region, int regionIndex,
}
}

/**
 * Recomputes {@code regionStDevByTable} from the current per-server, per-table region counts.
 *
 * <p>For every (server, table) cell the deviation of that cell from the table's mean regions
 * per server ({@code regionsPerTable[table] / numServers}) is squared and summed. The square
 * root of that sum divided by {@code numServers} is then scaled between the minimum and
 * maximum values reported by {@code getMinStDevForMultiVar} and
 * {@code getMaxStDevForMultiVar}, so the stored result lies in [0, 1].
 *
 * <p>This is a full pass over every server/table cell each time it is called.
 */
private void recomputeRegionStDevByTable() {
  if (numServers == 0) {
    // With no servers both bound helpers return 0 and scale() collapses to 0, so the
    // result is always 0. Returning here makes that explicit and avoids dividing by zero.
    regionStDevByTable = 0;
    return;
  }

  double sumOfSquaredDeviations = 0;
  for (int[] regionsOnServerByTable : numRegionsPerServerPerTable) {
    for (int tableIndex = 0; tableIndex < regionsOnServerByTable.length; tableIndex++) {
      // Cast instead of boxing through Double.valueOf: same value, no allocation.
      double deviation = regionsOnServerByTable[tableIndex]
        - (double) regionsPerTable[tableIndex] / numServers;
      sumOfSquaredDeviations += deviation * deviation;
    }
  }

  regionStDevByTable = scale(getMinStDevForMultiVar(regionsPerTable, numServers),
    getMaxStDevForMultiVar(regionsPerTable, numServers),
    Math.sqrt(sumOfSquaredDeviations / numServers));
}

/**
 * Returns the maximum standard deviation a distribution of {@code total} over
 * {@code numServers} servers can have.
 *
 * <p>The maximum is computed as if all region servers had 0 and one had the sum of all costs.
 * This must be a zero sum cost for this to make sense.
 *
 * @param total sum of the cost over all servers
 * @param numServers number of servers the cost is spread over
 * @return the maximum standard deviation, or 0 when {@code numServers} is 0
 */
public double getMaxStDev(double total, double numServers) {
  if (numServers == 0) {
    // Same convention as getMaxStDevForMultiVar: no servers means no spread, and it
    // keeps the division below from producing NaN.
    return 0;
  }
  double mean = total / numServers;
  // One server sits at `total`, the remaining (numServers - 1) servers sit at 0.
  return Math.sqrt(((total - mean) * (total - mean)
    + (numServers - 1) * mean * mean)
    / numServers);
}

/**
 * Returns the maximum standard deviation of a distribution of multiple variables.
 *
 * <p>For each entry of {@code total} the maximum is computed as if all region servers had 0
 * and one had the sum of all costs. This must be a zero sum cost for this to make sense.
 * The per-entry sums of squared deviations are added together before the final division by
 * {@code numServers} and square root.
 *
 * @param total per-variable totals; may be {@code null} or empty
 * @param numServers number of servers the totals are spread over
 * @return the combined maximum standard deviation, or 0 when there is nothing to measure
 */
public double getMaxStDevForMultiVar(int[] total, double numServers) {
  boolean nothingToMeasure = total == null || total.length == 0 || numServers == 0;
  if (nothingToMeasure) {
    return 0;
  }

  double sumOfSquares = 0;
  for (int variableTotal : total) {
    double mean = variableTotal / numServers;
    // Distance from the mean of the single server that holds everything.
    double loadedServerGap = variableTotal - mean;
    sumOfSquares += loadedServerGap * loadedServerGap + (numServers - 1) * mean * mean;
  }
  return Math.sqrt(sumOfSquares / numServers);
}

/**
 * Returns the minimum standard deviation a distribution of {@code total} over
 * {@code numServers} servers can have, i.e. the deviation of the most even spread.
 *
 * @param total sum of the cost over all servers
 * @param numServers number of servers the cost is spread over
 * @return the minimum standard deviation, or 0 when {@code numServers} is 0
 */
public double getMinStDev(double total, double numServers) {
  if (numServers == 0) {
    // Same convention as getMinStDevForMultiVar: no servers means no spread, and it
    // keeps the divisions below from producing NaN.
    return 0;
  }
  double mean = total / numServers;
  // It's possible that there aren't enough regions to go around
  double min;
  if (numServers > total) {
    // `total` servers hold 1 each, the remaining (numServers - total) hold 0.
    min = Math.sqrt(((numServers - total) * mean * mean
      + (1 - mean) * (1 - mean) * total) / numServers);
  } else {
    // Some will have 1 more than everything else.
    double floorOfMean = Math.floor(mean);
    double ceilOfMean = Math.ceil(mean);
    int numHigh = (int) (total - (floorOfMean * numServers));
    int numLow = (int) (numServers - numHigh);
    min = Math.sqrt(((numHigh * (ceilOfMean - mean) * (ceilOfMean - mean))
      + (numLow * (mean - floorOfMean) * (mean - floorOfMean))) / numServers);
  }
  return min;
}

/**
 * Returns the minimum standard deviation of a distribution of multiple variables.
 *
 * <p>For each entry of {@code total} the sum of squared deviations of the most even spread
 * over {@code numServers} servers is computed. These sums are added together before the
 * final division by {@code numServers} and square root.
 *
 * @param total per-variable totals; may be {@code null} or empty
 * @param numServers number of servers the totals are spread over
 * @return the combined minimum standard deviation, or 0 when there is nothing to measure
 */
public double getMinStDevForMultiVar(int[] total, double numServers) {
  if (numServers == 0 || total == null || total.length == 0) {
    return 0;
  }

  double sumOfSquares = 0;
  for (int variableTotal : total) {
    double mean = variableTotal / numServers;
    double contribution;
    if (variableTotal < numServers) {
      // Not enough regions to go around: some servers hold 1, the rest hold 0.
      contribution = (numServers - variableTotal) * mean * mean
        + (1 - mean) * (1 - mean) * variableTotal;
    } else {
      // Some servers will hold one more than all the others.
      double floorOfMean = Math.floor(mean);
      double ceilOfMean = Math.ceil(mean);
      int numHigh = (int) (variableTotal - (floorOfMean * numServers));
      int numLow = (int) (numServers - numHigh);
      contribution = numHigh * (ceilOfMean - mean) * (ceilOfMean - mean)
        + (numLow * (mean - floorOfMean) * (mean - floorOfMean));
    }
    sumOfSquares += contribution;
  }
  return Math.sqrt(sumOfSquares / numServers);
}

/**
 * Scale the value between 0 and 1.
 *
 * <p>Returns 0 when the range is empty or inverted ({@code max <= min}) or when
 * {@code value} does not exceed {@code min}. Otherwise returns the position of
 * {@code value} within {@code [min, max]}, clamped to {@code [0, 1]}.
 *
 * @param min Min value
 * @param max The Max value
 * @param value The value to be scaled.
 * @return The scaled value.
 */
public double scale(double min, double max, double value) {
  // This guard also covers max == min, so the division below never sees a zero range.
  if (max <= min || value <= min) {
    return 0;
  }

  return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
}

/**
* Returns true iff a given server has less regions than the balanced amount
*/
Expand Down Expand Up @@ -837,19 +953,8 @@ void regionMoved(int region, int oldServer, int newServer) {
}
numRegionsPerServerPerTable[newServer][tableIndex]++;

//check whether this caused maxRegionsPerTable in the new Server to be updated
if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex];
} else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1)
== numMaxRegionsPerTable[tableIndex]) {
//recompute maxRegionsPerTable since the previous value was coming from the old server
numMaxRegionsPerTable[tableIndex] = 0;
for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
}
}
}
// recalculate stdev
recomputeRegionStDevByTable();

// update for servers
int primary = regionIndexToPrimaryIndex[region];
Expand Down Expand Up @@ -1019,7 +1124,7 @@ public String toString() {
.append(Arrays.toString(serverIndicesSortedByRegionCount))
.append(", regionsPerServer=").append(Arrays.deepToString(regionsPerServer));

desc.append(", numMaxRegionsPerTable=").append(Arrays.toString(numMaxRegionsPerTable))
desc.append(", regionStDevByTable=").append(regionStDevByTable)
.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
.append(", numTables=").append(numTables).append(", numMovedRegions=")
.append(numMovedRegions).append('}');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -817,31 +817,14 @@ protected double costFromArray(double[] stats) {
double count = stats.length;
double mean = total/count;

// Compute max as if all region servers had 0 and one had the sum of all costs. This must be
// a zero sum cost for this to make sense.
double max = ((count - 1) * mean) + (total - mean);

// It's possible that there aren't enough regions to go around
double min;
if (count > total) {
min = ((count - total) * mean) + ((1 - mean) * total);
} else {
// Some will have 1 more than everything else.
int numHigh = (int) (total - (Math.floor(mean) * count));
int numLow = (int) (count - numHigh);

min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));

}
min = Math.max(0, min);
for (int i=0; i<stats.length; i++) {
double n = stats[i];
double diff = Math.abs(mean - n);
double diff = (mean - n) * (mean - n);
totalCost += diff;
}

double scaled = scale(min, max, totalCost);
return scaled;
return cluster.scale(cluster.getMinStDev(total, count), cluster.getMaxStDev(total, count),
Math.sqrt(totalCost / count));
}

private double getSum(double[] stats) {
Expand All @@ -851,23 +834,6 @@ private double getSum(double[] stats) {
}
return total;
}

/**
* Scale the value between 0 and 1.
*
* @param min Min value
* @param max The Max value
* @param value The value to be scaled.
* @return The scaled value.
*/
protected double scale(double min, double max, double value) {
if (max <= min || value <= min) {
return 0;
}
if ((max - min) == 0) return 0;

return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
}
}

/**
Expand Down Expand Up @@ -916,7 +882,7 @@ protected double cost() {
return 1000000; // return a number much greater than any of the other cost
}

return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
return cluster.scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
}
}

Expand Down Expand Up @@ -1024,15 +990,7 @@ static class TableSkewCostFunction extends CostFunction {

@Override
protected double cost() {
double max = cluster.numRegions;
double min = ((double) cluster.numRegions) / cluster.numServers;
double value = 0;

for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
value += cluster.numMaxRegionsPerTable[i];
}

return scale(min, max, value);
return cluster.regionStDevByTable;
}
}

Expand Down Expand Up @@ -1385,7 +1343,7 @@ protected double cost() {
for (int i = 0 ; i < costsPerGroup.length; i++) {
totalCost += costsPerGroup[i];
}
return scale(0, maxCost, totalCost);
return cluster.scale(0, maxCost, totalCost);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,8 @@ public void testRegionAvailabilityWithRegionMoves() throws Exception {

// now move region1 from servers[0] to servers[2]
cluster.doAction(new MoveRegionAction(0, 0, 2));
// check that the numMaxRegionsPerTable for "table" has increased to 2
assertEquals(2, cluster.numMaxRegionsPerTable[0]);
// check that the regionStDevByTable for "table" has increased
assertEquals(0.5773502691896257, cluster.regionStDevByTable, 0.01);
// now repeat check whether moving region1 from servers[1] to servers[2]
// would lower availability
assertTrue(cluster.wouldLowerAvailability(hri1, servers[2]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ public void testTableSkewCost() {
BaseLoadBalancer.Cluster cluster = mockCluster(mockCluster);
costFunction.init(cluster);
double cost = costFunction.cost();
assertTrue(cost >= 0);
assertTrue(cost >= -0.01);
assertTrue(cost <= 1.01);
}
}
Expand Down Expand Up @@ -470,7 +470,7 @@ public void testCostFromArray() {
statThree[i] = (0);
statThree[i+100] = 100;
}
assertEquals(0.5, costFunction.costFromArray(statThree), 0.01);
assertEquals(0.0708881205008336, costFunction.costFromArray(statThree), 0.01);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public void testLargeCluster() {
int numRegionsPerServer = 80; // all servers except one
int numTables = 100;
int replication = 1;

// we need to capture the outlier and generate a move
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
}
}