Skip to content

Commit dfc49ac

Browse files
committed
HBASE-25818 Move StochasticLoadBalancer to hbase-balancer module
1 parent 8856f61 commit dfc49ac

27 files changed

+200
-94
lines changed

hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/ClusterInfoProvider.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@
2222
import java.util.List;
2323
import java.util.Map;
2424
import java.util.function.Predicate;
25+
import java.util.function.Supplier;
2526
import org.apache.hadoop.conf.Configuration;
2627
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
2728
import org.apache.hadoop.hbase.ServerMetrics;
2829
import org.apache.hadoop.hbase.ServerName;
2930
import org.apache.hadoop.hbase.TableName;
3031
import org.apache.hadoop.hbase.client.RegionInfo;
3132
import org.apache.hadoop.hbase.client.TableDescriptor;
33+
import org.apache.hadoop.hbase.master.RegionPlan;
3234
import org.apache.yetus.audience.InterfaceAudience;
3335

3436
/**
@@ -78,4 +80,18 @@ List<ServerName> getOnlineServersListWithPredicator(List<ServerName> servers,
7880
* Get a snapshot of the current assignment status.
7981
*/
8082
Map<ServerName, List<RegionInfo>> getSnapShotOfAssignment(Collection<RegionInfo> regions);
83+
84+
/**
85+
* Test whether we are in off peak hour.
86+
* <p>
87+
* For peak and off peak hours we may have different cost for the same balancing operation.
88+
* @return {@code true} if we are currently in off peak hour, {@code false} otherwise
89+
*/
90+
boolean isOffPeakHour();
91+
92+
/**
93+
* Record the given region plans.
94+
*/
95+
void recordBalancerDecision(List<RegionPlan> plans, double currentCost, double initCost,
96+
String initFunctionTotalCosts, Supplier<String> totalCostsPerFunc, long step);
8197
}
Lines changed: 13 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Map;
2929
import java.util.Objects;
3030
import java.util.concurrent.ThreadLocalRandom;
31+
import java.util.function.Supplier;
3132
import java.util.stream.Collectors;
3233
import org.apache.hadoop.conf.Configuration;
3334
import org.apache.hadoop.hbase.ClusterMetrics;
@@ -36,13 +37,9 @@
3637
import org.apache.hadoop.hbase.ServerMetrics;
3738
import org.apache.hadoop.hbase.ServerName;
3839
import org.apache.hadoop.hbase.TableName;
39-
import org.apache.hadoop.hbase.client.BalancerDecision;
4040
import org.apache.hadoop.hbase.client.RegionInfo;
4141
import org.apache.hadoop.hbase.master.RegionPlan;
4242
import org.apache.hadoop.hbase.master.balancer.BalancerClusterState.LocalityType;
43-
import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
44-
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
45-
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
4643
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
4744
import org.apache.hadoop.hbase.util.ReflectionUtils;
4845
import org.apache.yetus.audience.InterfaceAudience;
@@ -151,11 +148,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
151148
private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
152149
private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
153150

154-
/**
155-
* Use to add balancer decision history to ring-buffer
156-
*/
157-
NamedQueueRecorder namedQueueRecorder;
158-
159151
/**
160152
* The constructor that passes a MetricsStochasticBalancer to BaseLoadBalancer to replace its
161153
* default MetricsBalancer
@@ -205,7 +197,7 @@ public synchronized void setConf(Configuration conf) {
205197
costFunctions = new ArrayList<>();
206198
addCostFunction(new RegionCountSkewCostFunction(conf));
207199
addCostFunction(new PrimaryRegionCountSkewCostFunction(conf));
208-
addCostFunction(new MoveCostFunction(conf));
200+
addCostFunction(new MoveCostFunction(conf, () -> provider));
209201
addCostFunction(localityCost);
210202
addCostFunction(rackLocalityCost);
211203
addCostFunction(new TableSkewCostFunction(conf));
@@ -221,13 +213,6 @@ public synchronized void setConf(Configuration conf) {
221213
curFunctionCosts= new Double[costFunctions.size()];
222214
tempFunctionCosts= new Double[costFunctions.size()];
223215

224-
boolean isBalancerDecisionRecording = getConf()
225-
.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
226-
BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
227-
if (this.namedQueueRecorder == null && isBalancerDecisionRecording) {
228-
this.namedQueueRecorder = NamedQueueRecorder.getInstance(getConf());
229-
}
230-
231216
LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion +
232217
", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", CostFunctions=" +
233218
Arrays.toString(getCostFunctionNames()) + " etc.");
@@ -483,24 +468,9 @@ public synchronized List<RegionPlan> balanceTable(TableName tableName, Map<Serve
483468
}
484469

485470
private void sendRegionPlansToRingBuffer(List<RegionPlan> plans, double currentCost,
486-
double initCost, String initFunctionTotalCosts, long step) {
487-
if (this.namedQueueRecorder != null) {
488-
List<String> regionPlans = new ArrayList<>();
489-
for (RegionPlan plan : plans) {
490-
regionPlans.add(
491-
"table: " + plan.getRegionInfo().getTable() + " , region: " + plan.getRegionName()
492-
+ " , source: " + plan.getSource() + " , destination: " + plan.getDestination());
493-
}
494-
BalancerDecision balancerDecision =
495-
new BalancerDecision.Builder()
496-
.setInitTotalCost(initCost)
497-
.setInitialFunctionCosts(initFunctionTotalCosts)
498-
.setComputedTotalCost(currentCost)
499-
.setFinalFunctionCosts(totalCostsPerFunc())
500-
.setComputedSteps(step)
501-
.setRegionPlans(regionPlans).build();
502-
namedQueueRecorder.addRecord(new BalancerDecisionDetails(balancerDecision));
503-
}
471+
double initCost, String initFunctionTotalCosts, long step) {
472+
provider.recordBalancerDecision(plans, currentCost, initCost, initFunctionTotalCosts,
473+
this::totalCostsPerFunc, step);
504474
}
505475

506476
/**
@@ -837,10 +807,15 @@ static class MoveCostFunction extends CostFunction {
837807

838808
private final float maxMovesPercent;
839809
private final Configuration conf;
810+
// we call setConf before setClusterInfoProvider, so when we create MoveCostFunction, the
811+
// provider is still null, thus here we need to provide a supplier instead of passing the
812+
// provider directly.
813+
private final Supplier<ClusterInfoProvider> provider;
840814

841-
MoveCostFunction(Configuration conf) {
815+
MoveCostFunction(Configuration conf, Supplier<ClusterInfoProvider> provider) {
842816
super(conf);
843817
this.conf = conf;
818+
this.provider = provider;
844819
// What percent of the number of regions a single run of the balancer can move.
845820
maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
846821

@@ -853,7 +828,8 @@ static class MoveCostFunction extends CostFunction {
853828
protected double cost() {
854829
// Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
855830
// that large benefits are needed to overcome the cost of a move.
856-
if (OffPeakHours.getInstance(conf).isOffPeakHour()) {
831+
ClusterInfoProvider p = provider.get();
832+
if (p != null && p.isOffPeakHour()) {
857833
this.setMultiplier(conf.getFloat(MOVE_COST_OFFPEAK_KEY, DEFAULT_MOVE_COST_OFFPEAK));
858834
} else {
859835
this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.master.balancer;
19+
20+
import java.io.IOException;
21+
import java.util.Collection;
22+
import java.util.Collections;
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.function.Predicate;
26+
import java.util.function.Supplier;
27+
import org.apache.hadoop.conf.Configuration;
28+
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
29+
import org.apache.hadoop.hbase.ServerMetrics;
30+
import org.apache.hadoop.hbase.ServerName;
31+
import org.apache.hadoop.hbase.TableName;
32+
import org.apache.hadoop.hbase.client.RegionInfo;
33+
import org.apache.hadoop.hbase.client.TableDescriptor;
34+
import org.apache.hadoop.hbase.master.RegionPlan;
35+
36+
/**
 * A do-nothing {@link ClusterInfoProvider} implementation.
 * <p>
 * Every query method returns a fixed neutral value (an empty list or map, {@code null},
 * {@code 0}, {@code false}, or a newly constructed {@link HDFSBlocksDistribution}), and
 * {@link #recordBalancerDecision} has an empty body. Code that needs real behavior for a
 * particular method can subclass this and override only that method.
 */
public class DummyClusterInfoProvider implements ClusterInfoProvider {
37+
38+
@Override
39+
public List<RegionInfo> getAssignedRegions() {
40+
return Collections.emptyList();
41+
}
42+
43+
@Override
44+
public TableDescriptor getTableDescriptor(TableName tableName) throws IOException {
45+
// This dummy has no descriptors to offer; callers always receive null.
return null;
46+
}
47+
48+
@Override
49+
public int getNumberOfTables() throws IOException {
50+
return 0;
51+
}
52+
53+
@Override
54+
public HDFSBlocksDistribution computeHDFSBlocksDistribution(Configuration conf,
55+
TableDescriptor tableDescriptor, RegionInfo regionInfo) throws IOException {
56+
// All arguments are ignored; a fresh distribution object is returned on every call.
return new HDFSBlocksDistribution();
57+
}
58+
59+
@Override
60+
public boolean hasRegionReplica(Collection<RegionInfo> regions) throws IOException {
61+
return false;
62+
}
63+
64+
@Override
65+
public List<ServerName> getOnlineServersListWithPredicator(List<ServerName> servers,
66+
Predicate<ServerMetrics> filter) {
67+
// Neither the server list nor the filter is consulted.
return Collections.emptyList();
68+
}
69+
70+
@Override
71+
public Map<ServerName, List<RegionInfo>> getSnapShotOfAssignment(Collection<RegionInfo> regions) {
72+
return Collections.emptyMap();
73+
}
74+
75+
@Override
76+
public boolean isOffPeakHour() {
77+
// Always reports "not off peak".
return false;
78+
}
79+
80+
@Override
81+
public void recordBalancerDecision(List<RegionPlan> plans, double currentCost, double initCost,
82+
String initFunctionTotalCosts, Supplier<String> totalCostsPerFunc, long step) {
// No-op: nothing is recorded and the totalCostsPerFunc supplier is never invoked.
83+
}
84+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.junit.Assert.assertNotNull;
2121
import static org.junit.Assert.assertNull;
22+
import static org.mockito.Mockito.mock;
2223

2324
import java.util.List;
2425
import java.util.Map;
@@ -48,6 +49,7 @@ public static void beforeAllTests() throws Exception {
4849
conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
4950
loadBalancer = new StochasticLoadBalancer();
5051
loadBalancer.setConf(conf);
52+
loadBalancer.setClusterInfoProvider(new DummyClusterInfoProvider());
5153
}
5254

5355
protected void testWithCluster(int numNodes, int numRegions, int numRegionsPerServer,

hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,7 @@ public static void beforeAllTests() throws Exception {
9292
conf.setClass("hbase.util.ip.to.rack.determiner", MockMapping.class, DNSToSwitchMapping.class);
9393
loadBalancer = new MockBalancer();
9494
loadBalancer.setConf(conf);
95-
ClusterInfoProvider provider = mock(ClusterInfoProvider.class);
96-
loadBalancer.setClusterInfoProvider(provider);
95+
loadBalancer.setClusterInfoProvider(new DummyClusterInfoProvider());
9796

9897
// Set up the rack topologies (5 machines per rack)
9998
rackManager = mock(RackManager.class);

hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionHDFSBlockLocationFinder.java

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.Map;
3535
import java.util.Random;
3636
import java.util.function.Predicate;
37+
import java.util.function.Supplier;
3738
import org.apache.hadoop.conf.Configuration;
3839
import org.apache.hadoop.hbase.ClusterMetrics;
3940
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -47,6 +48,7 @@
4748
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
4849
import org.apache.hadoop.hbase.client.TableDescriptor;
4950
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
51+
import org.apache.hadoop.hbase.master.RegionPlan;
5052
import org.apache.hadoop.hbase.testclassification.MasterTests;
5153
import org.apache.hadoop.hbase.testclassification.SmallTests;
5254
import org.apache.hadoop.hbase.util.Bytes;
@@ -97,7 +99,7 @@ public static void setUpBeforeClass() {
9799
@Before
98100
public void setUp() {
99101
finder = new RegionHDFSBlockLocationFinder();
100-
finder.setClusterInfoProvider(new ClusterInfoProvider() {
102+
finder.setClusterInfoProvider(new DummyClusterInfoProvider() {
101103

102104
@Override
103105
public TableDescriptor getTableDescriptor(TableName tableName) throws IOException {
@@ -114,28 +116,6 @@ public HDFSBlocksDistribution computeHDFSBlocksDistribution(Configuration conf,
114116
TableDescriptor tableDescriptor, RegionInfo regionInfo) throws IOException {
115117
return generate(regionInfo);
116118
}
117-
118-
@Override
119-
public boolean hasRegionReplica(Collection<RegionInfo> regions) throws IOException {
120-
return false;
121-
}
122-
123-
@Override
124-
public List<ServerName> getOnlineServersListWithPredicator(List<ServerName> servers,
125-
Predicate<ServerMetrics> filter) {
126-
return Collections.emptyList();
127-
}
128-
129-
@Override
130-
public Map<ServerName, List<RegionInfo>>
131-
getSnapShotOfAssignment(Collection<RegionInfo> regions) {
132-
return Collections.emptyMap();
133-
}
134-
135-
@Override
136-
public int getNumberOfTables() {
137-
return 0;
138-
}
139119
});
140120
}
141121

0 commit comments

Comments
 (0)