Skip to content

Commit 2b78d6e

Browse files
committed
HBASE-25818 Move StochasticLoadBalancer to hbase-balancer module
1 parent 8856f61 commit 2b78d6e

21 files changed

+115
-64
lines changed

hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/ClusterInfoProvider.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@
2222
import java.util.List;
2323
import java.util.Map;
2424
import java.util.function.Predicate;
25+
import java.util.function.Supplier;
2526
import org.apache.hadoop.conf.Configuration;
2627
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
2728
import org.apache.hadoop.hbase.ServerMetrics;
2829
import org.apache.hadoop.hbase.ServerName;
2930
import org.apache.hadoop.hbase.TableName;
3031
import org.apache.hadoop.hbase.client.RegionInfo;
3132
import org.apache.hadoop.hbase.client.TableDescriptor;
33+
import org.apache.hadoop.hbase.master.RegionPlan;
3234
import org.apache.yetus.audience.InterfaceAudience;
3335

3436
/**
@@ -78,4 +80,18 @@ List<ServerName> getOnlineServersListWithPredicator(List<ServerName> servers,
7880
* Get a snapshot of the current assignment status.
7981
*/
8082
Map<ServerName, List<RegionInfo>> getSnapShotOfAssignment(Collection<RegionInfo> regions);
83+
84+
/**
85+
* Test whether we are in off peak hour.
86+
* <p/>
87+
* For peak and off peak hours we may have different cost for the same balancing operation.
88+
* @return
89+
*/
90+
boolean isOffPeakHour();
91+
92+
/**
93+
* Record the given region plans.
94+
*/
95+
void recordBalancerDecision(List<RegionPlan> plans, double currentCost, double initCost,
96+
String initFunctionTotalCosts, Supplier<String> totalCostsPerFunc, long step);
8197
}
Lines changed: 13 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Map;
2929
import java.util.Objects;
3030
import java.util.concurrent.ThreadLocalRandom;
31+
import java.util.function.Supplier;
3132
import java.util.stream.Collectors;
3233
import org.apache.hadoop.conf.Configuration;
3334
import org.apache.hadoop.hbase.ClusterMetrics;
@@ -36,13 +37,9 @@
3637
import org.apache.hadoop.hbase.ServerMetrics;
3738
import org.apache.hadoop.hbase.ServerName;
3839
import org.apache.hadoop.hbase.TableName;
39-
import org.apache.hadoop.hbase.client.BalancerDecision;
4040
import org.apache.hadoop.hbase.client.RegionInfo;
4141
import org.apache.hadoop.hbase.master.RegionPlan;
4242
import org.apache.hadoop.hbase.master.balancer.BalancerClusterState.LocalityType;
43-
import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
44-
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
45-
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
4643
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
4744
import org.apache.hadoop.hbase.util.ReflectionUtils;
4845
import org.apache.yetus.audience.InterfaceAudience;
@@ -151,11 +148,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
151148
private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
152149
private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
153150

154-
/**
155-
* Use to add balancer decision history to ring-buffer
156-
*/
157-
NamedQueueRecorder namedQueueRecorder;
158-
159151
/**
160152
* The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its
161153
* default MetricsBalancer
@@ -205,7 +197,7 @@ public synchronized void setConf(Configuration conf) {
205197
costFunctions = new ArrayList<>();
206198
addCostFunction(new RegionCountSkewCostFunction(conf));
207199
addCostFunction(new PrimaryRegionCountSkewCostFunction(conf));
208-
addCostFunction(new MoveCostFunction(conf));
200+
addCostFunction(new MoveCostFunction(conf, () -> provider));
209201
addCostFunction(localityCost);
210202
addCostFunction(rackLocalityCost);
211203
addCostFunction(new TableSkewCostFunction(conf));
@@ -221,13 +213,6 @@ public synchronized void setConf(Configuration conf) {
221213
curFunctionCosts= new Double[costFunctions.size()];
222214
tempFunctionCosts= new Double[costFunctions.size()];
223215

224-
boolean isBalancerDecisionRecording = getConf()
225-
.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
226-
BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
227-
if (this.namedQueueRecorder == null && isBalancerDecisionRecording) {
228-
this.namedQueueRecorder = NamedQueueRecorder.getInstance(getConf());
229-
}
230-
231216
LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion +
232217
", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", CostFunctions=" +
233218
Arrays.toString(getCostFunctionNames()) + " etc.");
@@ -483,24 +468,9 @@ public synchronized List<RegionPlan> balanceTable(TableName tableName, Map<Serve
483468
}
484469

485470
private void sendRegionPlansToRingBuffer(List<RegionPlan> plans, double currentCost,
486-
double initCost, String initFunctionTotalCosts, long step) {
487-
if (this.namedQueueRecorder != null) {
488-
List<String> regionPlans = new ArrayList<>();
489-
for (RegionPlan plan : plans) {
490-
regionPlans.add(
491-
"table: " + plan.getRegionInfo().getTable() + " , region: " + plan.getRegionName()
492-
+ " , source: " + plan.getSource() + " , destination: " + plan.getDestination());
493-
}
494-
BalancerDecision balancerDecision =
495-
new BalancerDecision.Builder()
496-
.setInitTotalCost(initCost)
497-
.setInitialFunctionCosts(initFunctionTotalCosts)
498-
.setComputedTotalCost(currentCost)
499-
.setFinalFunctionCosts(totalCostsPerFunc())
500-
.setComputedSteps(step)
501-
.setRegionPlans(regionPlans).build();
502-
namedQueueRecorder.addRecord(new BalancerDecisionDetails(balancerDecision));
503-
}
471+
double initCost, String initFunctionTotalCosts, long step) {
472+
provider.recordBalancerDecision(plans, currentCost, initCost, initFunctionTotalCosts,
473+
this::totalCostsPerFunc, step);
504474
}
505475

506476
/**
@@ -837,10 +807,15 @@ static class MoveCostFunction extends CostFunction {
837807

838808
private final float maxMovesPercent;
839809
private final Configuration conf;
810+
// we call setConf before setClusterInfoProvider, so when we create MoveCostFunction, the
811+
// provider is still null, thus here we need to provide a supplier instead of passing the
812+
// provider directly.
813+
private final Supplier<ClusterInfoProvider> provider;
840814

841-
MoveCostFunction(Configuration conf) {
815+
MoveCostFunction(Configuration conf, Supplier<ClusterInfoProvider> provider) {
842816
super(conf);
843817
this.conf = conf;
818+
this.provider = provider;
844819
// What percent of the number of regions a single run of the balancer can move.
845820
maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
846821

@@ -853,7 +828,8 @@ static class MoveCostFunction extends CostFunction {
853828
protected double cost() {
854829
// Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
855830
// that large benefits are need to overcome the cost of a move.
856-
if (OffPeakHours.getInstance(conf).isOffPeakHour()) {
831+
ClusterInfoProvider p = provider.get();
832+
if (p != null && p.isOffPeakHour()) {
857833
this.setMultiplier(conf.getFloat(MOVE_COST_OFFPEAK_KEY, DEFAULT_MOVE_COST_OFFPEAK));
858834
} else {
859835
this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.junit.Assert.assertNotNull;
2121
import static org.junit.Assert.assertNull;
22+
import static org.mockito.Mockito.mock;
2223

2324
import java.util.List;
2425
import java.util.Map;
@@ -48,6 +49,7 @@ public static void beforeAllTests() throws Exception {
4849
conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
4950
loadBalancer = new StochasticLoadBalancer();
5051
loadBalancer.setConf(conf);
52+
loadBalancer.setClusterInfoProvider(mock(ClusterInfoProvider.class));
5153
}
5254

5355
protected void testWithCluster(int numNodes, int numRegions, int numRegionsPerServer,

hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionHDFSBlockLocationFinder.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.Map;
3535
import java.util.Random;
3636
import java.util.function.Predicate;
37+
import java.util.function.Supplier;
3738
import org.apache.hadoop.conf.Configuration;
3839
import org.apache.hadoop.hbase.ClusterMetrics;
3940
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -47,6 +48,7 @@
4748
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
4849
import org.apache.hadoop.hbase.client.TableDescriptor;
4950
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
51+
import org.apache.hadoop.hbase.master.RegionPlan;
5052
import org.apache.hadoop.hbase.testclassification.MasterTests;
5153
import org.apache.hadoop.hbase.testclassification.SmallTests;
5254
import org.apache.hadoop.hbase.util.Bytes;
@@ -136,6 +138,17 @@ public List<ServerName> getOnlineServersListWithPredicator(List<ServerName> serv
136138
public int getNumberOfTables() {
137139
return 0;
138140
}
141+
142+
@Override
143+
public boolean isOffPeakHour() {
144+
return false;
145+
}
146+
147+
@Override
148+
public void recordBalancerDecision(List<RegionPlan> plans, double currentCost,
149+
double initCost, String initFunctionTotalCosts, Supplier<String> totalCostsPerFunc,
150+
long step) {
151+
}
139152
});
140153
}
141154

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import java.util.Map;
3333
import java.util.Queue;
3434
import java.util.Set;
35-
import java.util.TimeZone;
3635
import java.util.TreeMap;
3736
import org.apache.hadoop.conf.Configuration;
3837
import org.apache.hadoop.hbase.ClusterMetrics;
@@ -45,13 +44,11 @@
4544
import org.apache.hadoop.hbase.Size;
4645
import org.apache.hadoop.hbase.TableName;
4746
import org.apache.hadoop.hbase.client.RegionInfo;
48-
import org.apache.hadoop.hbase.master.MockNoopMasterServices;
4947
import org.apache.hadoop.hbase.master.RegionPlan;
5048
import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.ServerLocalityCostFunction;
5149
import org.apache.hadoop.hbase.testclassification.MasterTests;
5250
import org.apache.hadoop.hbase.testclassification.MediumTests;
5351
import org.apache.hadoop.hbase.util.Bytes;
54-
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
5552
import org.junit.ClassRule;
5653
import org.junit.Test;
5754
import org.junit.experimental.categories.Category;
@@ -196,7 +193,6 @@ public void testCPRequestCost() {
196193
assertEquals(2, regionsMoveFromServerA.size());
197194
assertEquals(2, targetServers.size());
198195
assertTrue(regionsOnServerA.containsAll(regionsMoveFromServerA));
199-
assertNull(loadBalancer.namedQueueRecorder);
200196
// reset config
201197
conf.setFloat("hbase.master.balancer.stochastic.cpRequestCost", 5f);
202198
loadBalancer.setConf(conf);
@@ -233,7 +229,6 @@ public void testKeepRegionLoad() throws Exception {
233229
String regionNameAsString = RegionInfo.getRegionNameAsString(Bytes.toBytes(REGION_KEY));
234230
assertTrue(loadBalancer.loads.get(regionNameAsString) != null);
235231
assertTrue(loadBalancer.loads.get(regionNameAsString).size() == 15);
236-
assertNull(loadBalancer.namedQueueRecorder);
237232

238233
Queue<BalancerRegionLoad> loads = loadBalancer.loads.get(regionNameAsString);
239234
int i = 0;
@@ -273,8 +268,6 @@ public void testNeedBalance() {
273268

274269
@Test
275270
public void testLocalityCost() throws Exception {
276-
Configuration conf = HBaseConfiguration.create();
277-
MockNoopMasterServices master = new MockNoopMasterServices();
278271
StochasticLoadBalancer.CostFunction
279272
costFunction = new ServerLocalityCostFunction(conf);
280273

@@ -286,40 +279,33 @@ public void testLocalityCost() throws Exception {
286279
double expected = 1 - expectedLocalities[test];
287280
assertEquals(expected, cost, 0.001);
288281
}
289-
assertNull(loadBalancer.namedQueueRecorder);
290282
}
291283

292284
@Test
293285
public void testMoveCostMultiplier() throws Exception {
294286
Configuration conf = HBaseConfiguration.create();
295-
StochasticLoadBalancer.CostFunction
296-
costFunction = new StochasticLoadBalancer.MoveCostFunction(conf);
287+
ClusterInfoProvider provider = mock(ClusterInfoProvider.class);
288+
StochasticLoadBalancer.CostFunction costFunction =
289+
new StochasticLoadBalancer.MoveCostFunction(conf, () -> provider);
290+
when(provider.isOffPeakHour()).thenReturn(false);
297291
BalancerClusterState cluster = mockCluster(clusterStateMocks[0]);
298292
costFunction.init(cluster);
299293
costFunction.cost();
300294
assertEquals(StochasticLoadBalancer.MoveCostFunction.DEFAULT_MOVE_COST,
301295
costFunction.getMultiplier(), 0.01);
302296

303297
// In offpeak hours, the multiplier of move cost should be lower
304-
conf.setInt("hbase.offpeak.start.hour",0);
305-
conf.setInt("hbase.offpeak.end.hour",23);
306-
// Set a fixed time which hour is 15, so it will always in offpeak
307-
// See HBASE-24898 for more info of the calculation here
308-
long deltaFor15 = TimeZone.getDefault().getRawOffset() - 28800000;
309-
long timeFor15 = 1597907081000L - deltaFor15;
310-
EnvironmentEdgeManager.injectEdge(() -> timeFor15);
311-
costFunction = new StochasticLoadBalancer.MoveCostFunction(conf);
312-
costFunction.init(cluster);
298+
when(provider.isOffPeakHour()).thenReturn(true);
313299
costFunction.cost();
314-
assertEquals(StochasticLoadBalancer.MoveCostFunction.DEFAULT_MOVE_COST_OFFPEAK
315-
, costFunction.getMultiplier(), 0.01);
300+
assertEquals(StochasticLoadBalancer.MoveCostFunction.DEFAULT_MOVE_COST_OFFPEAK,
301+
costFunction.getMultiplier(), 0.01);
316302
}
317303

318304
@Test
319305
public void testMoveCost() throws Exception {
320306
Configuration conf = HBaseConfiguration.create();
321-
StochasticLoadBalancer.CostFunction
322-
costFunction = new StochasticLoadBalancer.MoveCostFunction(conf);
307+
StochasticLoadBalancer.CostFunction costFunction =
308+
new StochasticLoadBalancer.MoveCostFunction(conf, () -> null);
323309
for (int[] mockCluster : clusterStateMocks) {
324310
BalancerClusterState cluster = mockCluster(mockCluster);
325311
costFunction.init(cluster);

0 commit comments

Comments
 (0)