Skip to content

Commit cd7a509

Browse files
authored
HBASE-26311 Balancer gets stuck in cohosted replica distribution (#3724)
Signed-off-by: Huaxiang Sun <[email protected]>
1 parent 72a8846 commit cd7a509

16 files changed

+68
-81
lines changed

hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java

Lines changed: 15 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,9 @@ class BalancerClusterState {
8484
int[] regionIndexToServerIndex; // regionIndex -> serverIndex
8585
int[] initialRegionIndexToServerIndex; // regionIndex -> serverIndex (initial cluster state)
8686
int[] regionIndexToTableIndex; // regionIndex -> tableIndex
87-
int[][] numRegionsPerServerPerTable; // serverIndex -> tableIndex -> # regions
87+
int[][] numRegionsPerServerPerTable; // tableIndex -> serverIndex -> # regions
8888
int[] numRegionsPerTable; // tableIndex -> region count
8989
double[] meanRegionsPerTable; // mean region count per table
90-
double[] regionSkewByTable; // skew on RS per by table
91-
double[] minRegionSkewByTable; // min skew on RS per by table
92-
double[] maxRegionSkewByTable; // max skew on RS per by table
9390
int[] regionIndexToPrimaryIndex; // regionIndex -> regionIndex of the primary
9491
boolean hasRegionReplicas = false; // whether there are regions with replicas
9592

@@ -283,6 +280,11 @@ public String getRack(ServerName server) {
283280
regionIndex++;
284281
}
285282

283+
if (LOG.isDebugEnabled()) {
284+
for (int i = 0; i < numServers; i++) {
285+
LOG.debug("server {} has {} regions", i, regionsPerServer[i].length);
286+
}
287+
}
286288
for (int i = 0; i < serversPerHostList.size(); i++) {
287289
serversPerHost[i] = new int[serversPerHostList.get(i).size()];
288290
for (int j = 0; j < serversPerHost[i].length; j++) {
@@ -303,40 +305,29 @@ public String getRack(ServerName server) {
303305
}
304306

305307
numTables = tables.size();
306-
LOG.debug("Number of tables={}", numTables);
307-
numRegionsPerServerPerTable = new int[numServers][numTables];
308+
LOG.debug("Number of tables={}, number of hosts={}, number of racks={}", numTables,
309+
numHosts, numRacks);
310+
numRegionsPerServerPerTable = new int[numTables][numServers];
308311
numRegionsPerTable = new int[numTables];
309312

310-
for (int i = 0; i < numServers; i++) {
311-
for (int j = 0; j < numTables; j++) {
313+
for (int i = 0; i < numTables; i++) {
314+
for (int j = 0; j < numServers; j++) {
312315
numRegionsPerServerPerTable[i][j] = 0;
313316
}
314317
}
315318

316319
for (int i = 0; i < regionIndexToServerIndex.length; i++) {
317320
if (regionIndexToServerIndex[i] >= 0) {
318-
numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
321+
numRegionsPerServerPerTable[regionIndexToTableIndex[i]][regionIndexToServerIndex[i]]++;
319322
numRegionsPerTable[regionIndexToTableIndex[i]]++;
320323
}
321324
}
322325

323326
// Avoid repeated computation for planning
324327
meanRegionsPerTable = new double[numTables];
325-
regionSkewByTable = new double[numTables];
326-
maxRegionSkewByTable = new double[numTables];
327-
minRegionSkewByTable = new double[numTables];
328328

329329
for (int i = 0; i < numTables; i++) {
330330
meanRegionsPerTable[i] = Double.valueOf(numRegionsPerTable[i]) / numServers;
331-
minRegionSkewByTable[i] += DoubleArrayCost.getMinSkew(numRegionsPerTable[i], numServers);
332-
maxRegionSkewByTable[i] += DoubleArrayCost.getMaxSkew(numRegionsPerTable[i], numServers);
333-
}
334-
335-
for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
336-
for (int tableIdx = 0; tableIdx < aNumRegionsPerServerPerTable.length; tableIdx++) {
337-
regionSkewByTable[tableIdx] += Math.abs(aNumRegionsPerServerPerTable[tableIdx] -
338-
meanRegionsPerTable[tableIdx]);
339-
}
340331
}
341332

342333
for (int i = 0; i < regions.length; i++) {
@@ -684,14 +675,9 @@ void regionMoved(int region, int oldServer, int newServer) {
684675
}
685676
int tableIndex = regionIndexToTableIndex[region];
686677
if (oldServer >= 0) {
687-
numRegionsPerServerPerTable[oldServer][tableIndex]--;
688-
// update regionSkewPerTable for the move from old server
689-
regionSkewByTable[tableIndex] += getSkewChangeFor(oldServer, tableIndex, -1);
678+
numRegionsPerServerPerTable[tableIndex][oldServer]--;
690679
}
691-
numRegionsPerServerPerTable[newServer][tableIndex]++;
692-
693-
// update regionSkewPerTable for the move to new server
694-
regionSkewByTable[tableIndex] += getSkewChangeFor(newServer, tableIndex, 1);
680+
numRegionsPerServerPerTable[tableIndex][newServer]++;
695681

696682
// update for servers
697683
int primary = regionIndexToPrimaryIndex[region];
@@ -865,18 +851,9 @@ public String toString() {
865851
.append(Arrays.toString(serverIndicesSortedByRegionCount)).append(", regionsPerServer=")
866852
.append(Arrays.deepToString(regionsPerServer));
867853

868-
desc.append(", regionSkewByTable=").append(Arrays.toString(regionSkewByTable))
869-
.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
854+
desc.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
870855
.append(", numTables=").append(numTables).append(", numMovedRegions=").append(numMovedRegions)
871856
.append('}');
872857
return desc.toString();
873858
}
874-
875-
private double getSkewChangeFor(int serverIndex, int tableIndex, int regionCountChange) {
876-
double curSkew = Math.abs(numRegionsPerServerPerTable[serverIndex][tableIndex] -
877-
meanRegionsPerTable[tableIndex]);
878-
double oldSkew = Math.abs(numRegionsPerServerPerTable[serverIndex][tableIndex] -
879-
regionCountChange - meanRegionsPerTable[tableIndex]);
880-
return curSkew - oldSkew;
881-
}
882859
}

hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/DoubleArrayCost.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,17 +66,21 @@ void applyCostsChange(Consumer<double[]> consumer) {
6666
}
6767

6868
private static double computeCost(double[] stats) {
69+
if (stats == null || stats.length == 0) {
70+
return 0;
71+
}
6972
double totalCost = 0;
7073
double total = getSum(stats);
7174

7275
double count = stats.length;
7376
double mean = total / count;
74-
7577
for (int i = 0; i < stats.length; i++) {
7678
double n = stats[i];
77-
double diff = Math.abs(mean - n);
79+
double diff = (mean - n) * (mean - n);
7880
totalCost += diff;
7981
}
82+
// Skip dividing by the server count (true standard deviation): getMinSkew/getMaxSkew omit the same divisor, so it cancels out when scaling.
83+
totalCost = Math.sqrt(totalCost);
8084
return CostFunction.scale(getMinSkew(total, count),
8185
getMaxSkew(total, count), totalCost);
8286
}
@@ -94,18 +98,22 @@ private static double getSum(double[] stats) {
9498
* @param total is total number of regions
9599
*/
96100
public static double getMinSkew(double total, double numServers) {
101+
if (numServers == 0) {
102+
return 0;
103+
}
97104
double mean = total / numServers;
98105
// It's possible that there aren't enough regions to go around
99106
double min;
100107
if (numServers > total) {
101-
min = ((numServers - total) * mean + (1 - mean) * total) ;
108+
min = ((numServers - total) * mean * mean + (1 - mean) * (1 - mean) * total) ;
102109
} else {
103110
// Some will have 1 more than everything else.
104111
int numHigh = (int) (total - (Math.floor(mean) * numServers));
105112
int numLow = (int) (numServers - numHigh);
106-
min = numHigh * (Math.ceil(mean) - mean) + numLow * (mean - Math.floor(mean));
113+
min = numHigh * (Math.ceil(mean) - mean) * (Math.ceil(mean) - mean) +
114+
numLow * (mean - Math.floor(mean)) * (mean - Math.floor(mean));
107115
}
108-
return min;
116+
return Math.sqrt(min);
109117
}
110118

111119
/**
@@ -114,7 +122,10 @@ public static double getMinSkew(double total, double numServers) {
114122
* a zero sum cost for this to make sense.
115123
*/
116124
public static double getMaxSkew(double total, double numServers) {
125+
if (numServers == 0) {
126+
return 0;
127+
}
117128
double mean = total / numServers;
118-
return (total - mean) + (numServers - 1) * mean;
129+
return Math.sqrt((total - mean) * (total - mean) + (numServers - 1) * mean * mean);
119130
}
120131
}

hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveCostFunction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class MoveCostFunction extends CostFunction {
3434
static final float DEFAULT_MOVE_COST = 7;
3535
static final float DEFAULT_MOVE_COST_OFFPEAK = 3;
3636
private static final int DEFAULT_MAX_MOVES = 600;
37-
private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
37+
private static final float DEFAULT_MAX_MOVE_PERCENT = 1.0f;
3838

3939
private final float maxMovesPercent;
4040
private final ClusterInfoProvider provider;
@@ -79,4 +79,4 @@ protected double cost() {
7979

8080
return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
8181
}
82-
}
82+
}

hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionCountSkewCostFunction.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,12 @@
1919

2020
import org.apache.hadoop.conf.Configuration;
2121
import org.apache.yetus.audience.InterfaceAudience;
22-
import org.slf4j.Logger;
23-
import org.slf4j.LoggerFactory;
2422

2523
/**
2624
* Compute the cost of a potential cluster state from skew in number of regions on a cluster.
2725
*/
2826
@InterfaceAudience.Private
2927
class RegionCountSkewCostFunction extends CostFunction {
30-
31-
private static final Logger LOG = LoggerFactory.getLogger(RegionCountSkewCostFunction.class);
32-
3328
static final String REGION_COUNT_SKEW_COST_KEY =
3429
"hbase.master.balancer.stochastic.regionCountCost";
3530
static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
@@ -50,14 +45,6 @@ void prepare(BalancerClusterState cluster) {
5045
costs[i] = cluster.regionsPerServer[i].length;
5146
}
5247
});
53-
LOG.debug("{} sees a total of {} servers and {} regions.", getClass().getSimpleName(),
54-
cluster.numServers, cluster.numRegions);
55-
if (LOG.isTraceEnabled()) {
56-
for (int i = 0; i < cluster.numServers; i++) {
57-
LOG.trace("{} sees server '{}' has {} regions", getClass().getSimpleName(),
58-
cluster.servers[i], cluster.regionsPerServer[i].length);
59-
}
60-
}
6148
}
6249

6350
@Override
@@ -72,4 +59,4 @@ protected void regionMoved(int region, int oldServer, int newServer) {
7259
costs[newServer] = cluster.regionsPerServer[newServer].length;
7360
});
7461
}
75-
}
62+
}

hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/TableSkewCostFunction.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,43 @@ class TableSkewCostFunction extends CostFunction {
2929
private static final String TABLE_SKEW_COST_KEY =
3030
"hbase.master.balancer.stochastic.tableSkewCost";
3131
private static final float DEFAULT_TABLE_SKEW_COST = 35;
32+
DoubleArrayCost[] costsPerTable;
3233

3334
TableSkewCostFunction(Configuration conf) {
3435
this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
3536
}
3637

38+
@Override
39+
void prepare(BalancerClusterState cluster) {
40+
super.prepare(cluster);
41+
costsPerTable = new DoubleArrayCost[cluster.numTables];
42+
for (int tableIdx = 0; tableIdx < cluster.numTables; tableIdx++) {
43+
costsPerTable[tableIdx] = new DoubleArrayCost();
44+
costsPerTable[tableIdx].prepare(cluster.numServers);
45+
final int tableIndex = tableIdx;
46+
costsPerTable[tableIdx].applyCostsChange(costs -> {
47+
// Keep a cached deep copy for change-only recomputation
48+
for (int i = 0; i < cluster.numServers; i++) {
49+
costs[i] = cluster.numRegionsPerServerPerTable[tableIndex][i];
50+
}
51+
});
52+
}
53+
}
54+
55+
@Override
56+
protected void regionMoved(int region, int oldServer, int newServer) {
57+
int tableIdx = cluster.regionIndexToTableIndex[region];
58+
costsPerTable[tableIdx].applyCostsChange(costs -> {
59+
costs[oldServer] = cluster.numRegionsPerServerPerTable[tableIdx][oldServer];
60+
costs[newServer] = cluster.numRegionsPerServerPerTable[tableIdx][newServer];
61+
});
62+
}
63+
3764
@Override
3865
protected double cost() {
3966
double cost = 0;
4067
for (int tableIdx = 0; tableIdx < cluster.numTables; tableIdx++) {
41-
cost += scale(cluster.minRegionSkewByTable[tableIdx],
42-
cluster.maxRegionSkewByTable[tableIdx], cluster.regionSkewByTable[tableIdx]);
68+
cost += costsPerTable[tableIdx].cost();
4369
}
4470
return cost;
4571
}

hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/StochasticBalancerTestBase.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ public class StochasticBalancerTestBase extends BalancerTestBase {
4545
public static void beforeAllTests() throws Exception {
4646
conf = HBaseConfiguration.create();
4747
conf.setClass("hbase.util.ip.to.rack.determiner", MockMapping.class, DNSToSwitchMapping.class);
48-
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 0.75f);
4948
conf.setFloat("hbase.regions.slop", 0.0f);
5049
conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
5150
conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true);

hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/StochasticBalancerTestBase2.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ public class StochasticBalancerTestBase2 extends StochasticBalancerTestBase {
2424

2525
@Before
2626
public void before() {
27-
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
2827
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
2928
conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
3029
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 3 * 60 * 1000L);

hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -355,8 +355,6 @@ public void testRegionAvailabilityWithRegionMoves() throws Exception {
355355

356356
// now move region1 from servers[0] to servers[2]
357357
cluster.doAction(new MoveRegionAction(0, 0, 2));
358-
// check that the regionSkewByTable for "table" has increased to 2
359-
assertEquals(2, cluster.regionSkewByTable[0], 0.01);
360358
// now repeat check whether moving region1 from servers[1] to servers[2]
361359
// would lower availability
362360
assertTrue(cluster.wouldLowerAvailability(hri1, servers[2]));

hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDoubleArrayCost.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,6 @@ public void testComputeCost() {
6262
}
6363
costs[100] = 100;
6464
});
65-
assertEquals(0.5, cost.cost(), 0.01);
65+
assertEquals(0.0708, cost.cost(), 0.01);
6666
}
6767
}

hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -417,13 +417,13 @@ public void testMoveCost() throws Exception {
417417
cluster.setNumRegions(10000);
418418
cluster.setNumMovedRegions(250);
419419
cost = costFunction.cost();
420-
assertEquals(0.1f, cost, 0.001);
420+
assertEquals(0.025f, cost, 0.001);
421421
cluster.setNumMovedRegions(1250);
422422
cost = costFunction.cost();
423-
assertEquals(0.5f, cost, 0.001);
423+
assertEquals(0.125f, cost, 0.001);
424424
cluster.setNumMovedRegions(2500);
425425
cost = costFunction.cost();
426-
assertEquals(1.0f, cost, 0.01);
426+
assertEquals(0.25f, cost, 0.01);
427427
}
428428
}
429429

0 commit comments

Comments
 (0)