Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,12 @@ protected static class Cluster {
int[] initialRegionIndexToServerIndex; //regionIndex -> serverIndex (initial cluster state)
int[] regionIndexToTableIndex; //regionIndex -> tableIndex
int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS
int[] numRegionsPerTable; // tableIndex -> region count
double[] meanRegionsPerTable; // mean region count per table
double regionSkewByTable; // skew on RS per by table
double minRegionSkewByTable; // min skew on RS per by table
double maxRegionSkewByTable; // max skew on RS per by table

int[] regionIndexToPrimaryIndex; //regionIndex -> regionIndex of the primary
boolean hasRegionReplicas = false; //whether there is regions with replicas

Expand Down Expand Up @@ -376,28 +381,37 @@ protected Cluster(

numTables = tables.size();
numRegionsPerServerPerTable = new int[numServers][numTables];
numRegionsPerTable = new int[numTables];

for (int i = 0; i < numServers; i++) {
for (int j = 0; j < numTables; j++) {
numRegionsPerServerPerTable[i][j] = 0;
}
}

for (int i = 0; i < numTables; i++) {
numRegionsPerTable[i] = 0;
}

for (int i=0; i < regionIndexToServerIndex.length; i++) {
if (regionIndexToServerIndex[i] >= 0) {
numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
numRegionsPerTable[regionIndexToTableIndex[i]]++;
}
}

numMaxRegionsPerTable = new int[numTables];
for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
for (tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) {
if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
}
}
// Avoid repeated computation for planning
meanRegionsPerTable = new double[numTables];
maxRegionSkewByTable = 0;
minRegionSkewByTable = 0;
for (int i = 0; i < numTables; i++) {
meanRegionsPerTable[i] = Double.valueOf(numRegionsPerTable[i]) / numServers;
minRegionSkewByTable += Cluster.getMinSkew(numRegionsPerTable[i], numServers);
maxRegionSkewByTable += Cluster.getMaxSkew(numRegionsPerTable[i], numServers);
}

computeRegionSkewPerTable();

for (int i = 0; i < regions.length; i ++) {
RegionInfo info = regions[i];
if (RegionReplicaUtil.isDefaultReplica(info)) {
Expand Down Expand Up @@ -522,6 +536,53 @@ public boolean serverHasTooFewRegions(int server) {
return numRegions < minLoad;
}

/**
* Return the min skew of distribution
*/
public static double getMinSkew(double total, double numServers) {
double mean = total / numServers;
// It's possible that there aren't enough regions to go around
double min;
if (numServers > total) {
min = ((numServers - total) * mean + (1 - mean) * total) ;
} else {
// Some will have 1 more than everything else.
int numHigh = (int) (total - (Math.floor(mean) * numServers));
int numLow = (int) (numServers - numHigh);
min = numHigh * (Math.ceil(mean) - mean) + numLow * (mean - Math.floor(mean));
}
return min;
}

/**
* Return the max deviation of distribution
* Compute max as if all region servers had 0 and one had the sum of all costs. This must be
* a zero sum cost for this to make sense.
*/
public static double getMaxSkew(double total, double numServers) {
double mean = total / numServers;
return (total - mean) + (numServers - 1) * mean;
}

/**
* Scale the value between 0 and 1.
*
* @param min Min value
* @param max The Max value
* @param value The value to be scaled.
* @return The scaled value.
*/
public static double scale(double min, double max, double value) {
if (max <= min || value <= min) {
return 0;
}
if ((max - min) == 0) {
return 0;
}

return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
}

/**
* Retrieves and lazily initializes a field storing the locality of
* every region/server combination
Expand Down Expand Up @@ -579,6 +640,21 @@ public int getRegionSizeMB(int region) {
return regionLoads[region].getLast().getStorefileSizeMB();
}

/**
* Recompute the region skew during init or plan of moves.
*/
private void computeRegionSkewPerTable() {
// reinitialize for recomputation
regionSkewByTable = 0;

for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
for (int tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) {
regionSkewByTable += Math.abs(aNumRegionsPerServerPerTable[tableIndex]
- meanRegionsPerTable[tableIndex]);
}
}
}

/**
* Computes and caches the locality for each region/rack combinations,
* as well as storing a mapping of region -> server and region -> rack such that server
Expand Down Expand Up @@ -834,22 +910,20 @@ void regionMoved(int region, int oldServer, int newServer) {
int tableIndex = regionIndexToTableIndex[region];
if (oldServer >= 0) {
numRegionsPerServerPerTable[oldServer][tableIndex]--;
// update regionSkewPerTable for the move from old server
regionSkewByTable +=
Math.abs(numRegionsPerServerPerTable[oldServer][tableIndex]
- meanRegionsPerTable[tableIndex])
- Math.abs(numRegionsPerServerPerTable[oldServer][tableIndex] + 1
- meanRegionsPerTable[tableIndex]);
}
numRegionsPerServerPerTable[newServer][tableIndex]++;

//check whether this caused maxRegionsPerTable in the new Server to be updated
if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex];
} else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1)
== numMaxRegionsPerTable[tableIndex]) {
//recompute maxRegionsPerTable since the previous value was coming from the old server
numMaxRegionsPerTable[tableIndex] = 0;
for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
}
}
}
// update regionSkewPerTable for the move to new server
regionSkewByTable +=
Math.abs(numRegionsPerServerPerTable[newServer][tableIndex]
- meanRegionsPerTable[tableIndex])
- Math.abs(numRegionsPerServerPerTable[newServer][tableIndex] - 1
- meanRegionsPerTable[tableIndex]);

// update for servers
int primary = regionIndexToPrimaryIndex[region];
Expand Down Expand Up @@ -1019,7 +1093,7 @@ public String toString() {
.append(Arrays.toString(serverIndicesSortedByRegionCount))
.append(", regionsPerServer=").append(Arrays.deepToString(regionsPerServer));

desc.append(", numMaxRegionsPerTable=").append(Arrays.toString(numMaxRegionsPerTable))
desc.append(", regionSkewByTable=").append(regionSkewByTable)
.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
.append(", numTables=").append(numTables).append(", numMovedRegions=")
.append(numMovedRegions).append('}');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,7 @@ public CostFunction(Configuration c) {
boolean isNeeded() {
return true;
}

float getMultiplier() {
return multiplier;
}
Expand All @@ -770,35 +771,39 @@ void setMultiplier(float m) {
this.multiplier = m;
}

/** Called once per LB invocation to give the cost function
/**
* Called once per LB invocation to give the cost function
* to initialize it's state, and perform any costly calculation.
*/
void init(Cluster cluster) {
this.cluster = cluster;
}

/** Called once per cluster Action to give the cost function
/**
* Called once per cluster Action to give the cost function
* an opportunity to update it's state. postAction() is always
* called at least once before cost() is called with the cluster
* that this action is performed on. */
* that this action is performed on.
*/
void postAction(Action action) {
switch (action.type) {
case NULL: break;
case ASSIGN_REGION:
AssignRegionAction ar = (AssignRegionAction) action;
regionMoved(ar.region, -1, ar.server);
break;
case MOVE_REGION:
MoveRegionAction mra = (MoveRegionAction) action;
regionMoved(mra.region, mra.fromServer, mra.toServer);
break;
case SWAP_REGIONS:
SwapRegionsAction a = (SwapRegionsAction) action;
regionMoved(a.fromRegion, a.fromServer, a.toServer);
regionMoved(a.toRegion, a.toServer, a.fromServer);
break;
default:
throw new RuntimeException("Uknown action:" + action.type);
case NULL:
break;
case ASSIGN_REGION:
AssignRegionAction ar = (AssignRegionAction) action;
regionMoved(ar.region, -1, ar.server);
break;
case MOVE_REGION:
MoveRegionAction mra = (MoveRegionAction) action;
regionMoved(mra.region, mra.fromServer, mra.toServer);
break;
case SWAP_REGIONS:
SwapRegionsAction a = (SwapRegionsAction) action;
regionMoved(a.fromRegion, a.fromServer, a.toServer);
regionMoved(a.toRegion, a.toServer, a.fromServer);
break;
default:
throw new RuntimeException("Uknown action:" + action.type);
}
}

Expand All @@ -822,59 +827,25 @@ protected double costFromArray(double[] stats) {
double total = getSum(stats);

double count = stats.length;
double mean = total/count;

// Compute max as if all region servers had 0 and one had the sum of all costs. This must be
// a zero sum cost for this to make sense.
double max = ((count - 1) * mean) + (total - mean);

// It's possible that there aren't enough regions to go around
double min;
if (count > total) {
min = ((count - total) * mean) + ((1 - mean) * total);
} else {
// Some will have 1 more than everything else.
int numHigh = (int) (total - (Math.floor(mean) * count));
int numLow = (int) (count - numHigh);

min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
double mean = total / count;

}
min = Math.max(0, min);
for (int i=0; i<stats.length; i++) {
for (int i = 0; i < stats.length; i++) {
double n = stats[i];
double diff = Math.abs(mean - n);
totalCost += diff;
}

double scaled = scale(min, max, totalCost);
return scaled;
return Cluster
.scale(Cluster.getMinSkew(total, count), Cluster.getMaxSkew(total, count), totalCost);
}

private double getSum(double[] stats) {
double total = 0;
for(double s:stats) {
for (double s : stats) {
total += s;
}
return total;
}

/**
* Scale the value between 0 and 1.
*
* @param min Min value
* @param max The Max value
* @param value The value to be scaled.
* @return The scaled value.
*/
protected double scale(double min, double max, double value) {
if (max <= min || value <= min) {
return 0;
}
if ((max - min) == 0) return 0;

return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
}
}

/**
Expand Down Expand Up @@ -923,7 +894,7 @@ protected double cost() {
return 1000000; // return a number much greater than any of the other cost
}

return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
return Cluster.scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
}
}

Expand Down Expand Up @@ -1031,15 +1002,7 @@ static class TableSkewCostFunction extends CostFunction {

@Override
protected double cost() {
double max = cluster.numRegions;
double min = ((double) cluster.numRegions) / cluster.numServers;
double value = 0;

for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
value += cluster.numMaxRegionsPerTable[i];
}

return scale(min, max, value);
return Cluster.scale(cluster.minRegionSkewByTable, cluster.maxRegionSkewByTable, cluster.regionSkewByTable);
}
}

Expand Down Expand Up @@ -1392,7 +1355,7 @@ protected double cost() {
for (int i = 0 ; i < costsPerGroup.length; i++) {
totalCost += costsPerGroup[i];
}
return scale(0, maxCost, totalCost);
return Cluster.scale(0, maxCost, totalCost);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,8 @@ public void testRegionAvailabilityWithRegionMoves() throws Exception {

// now move region1 from servers[0] to servers[2]
cluster.doAction(new MoveRegionAction(0, 0, 2));
// check that the numMaxRegionsPerTable for "table" has increased to 2
assertEquals(2, cluster.numMaxRegionsPerTable[0]);
// check that the regionSkewByTable for "table" has increased to 2
assertEquals(2, cluster.regionSkewByTable, 0.01);
// now repeat check whether moving region1 from servers[1] to servers[2]
// would lower availability
assertTrue(cluster.wouldLowerAvailability(hri1, servers[2]));
Expand Down