Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,25 @@
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BalancerDecision;
import org.apache.hadoop.hbase.client.BalancerRejection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.yetus.audience.InterfaceAudience;

/**
* This is the cluster we want to balance. It provides methods to let us get the information we
* want.
*/
@InterfaceAudience.Private
public interface ClusterInfoProvider {
public interface ClusterInfoProvider extends ConfigurationObserver {

/**
* Get the configuration.
Expand Down Expand Up @@ -83,4 +87,21 @@ List<ServerName> getOnlineServersListWithPredicator(List<ServerName> servers,
* Get a snapshot of the current assignment status.
*/
Map<ServerName, List<RegionInfo>> getSnapShotOfAssignment(Collection<RegionInfo> regions);

/**
* Test whether we are in off peak hour.
* <p/>
* For peak and off peak hours we may have different cost for the same balancing operation.
*/
boolean isOffPeakHour();

/**
* Record the given balancer decision.
*/
void recordBalancerDecision(Supplier<BalancerDecision> decision);

/**
* Record the given balancer rejection.
*/
void recordBalancerRejection(Supplier<BalancerRejection> rejection);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@

import com.google.errorprone.annotations.RestrictedApi;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand All @@ -34,6 +34,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.io.CharStreams;

/**
* This is an optional Cost function designed to allow region count skew across RegionServers. A
* rule file is loaded from the local FS or HDFS before balancing. It contains lines of rules. A
Expand Down Expand Up @@ -169,14 +172,14 @@ void loadRules() {
if (line.startsWith("#")) {
continue;
}
final String[] splits = line.split(" ");
if (splits.length != 2) {
final List<String> splits = Splitter.on(' ').splitToList(line);
if (splits.size() != 2) {
throw new IOException(
"line '" + line + "' is malformated, " + "expected [regexp] [limit]. Skipping line");
}

final Pattern pattern = Pattern.compile(splits[0]);
final Integer limit = Integer.parseInt(splits[1]);
final Pattern pattern = Pattern.compile(splits.get(0));
final Integer limit = Integer.parseInt(splits.get(1));
this.limitPerRule.put(pattern, limit);
} catch (final IOException | NumberFormatException | PatternSyntaxException e) {
LOG.error("error on line: " + e);
Expand Down Expand Up @@ -209,29 +212,17 @@ private List<String> readFile(final String filename) {
private List<String> readFileFromHDFS(final String filename) throws IOException {
final Path path = new Path(filename);
final FileSystem fs = FileSystem.get(this.conf);
final BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path)));
return readLines(reader);
try (BufferedReader reader =
new BufferedReader(new InputStreamReader(fs.open(path), StandardCharsets.UTF_8))) {
return CharStreams.readLines(reader);
}
}

/**
* used to read the rule files from local FS
*/
private List<String> readFileFromLocalFS(final String filename) throws IOException {
BufferedReader reader = new BufferedReader(new FileReader(filename));
return readLines(reader);
}

private List<String> readLines(BufferedReader reader) throws IOException {
final List<String> records = new ArrayList<>();
try {
String line;
while ((line = reader.readLine()) != null) {
records.add(line);
}
} finally {
reader.close();
}
return records;
return Files.readAllLines(Paths.get(filename), StandardCharsets.UTF_8);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.master.balancer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.yetus.audience.InterfaceAudience;

/**
Expand All @@ -38,14 +37,14 @@ class MoveCostFunction extends CostFunction {
private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;

private final float maxMovesPercent;
private final OffPeakHours offPeakHours;
private final ClusterInfoProvider provider;
private final float moveCost;
private final float moveCostOffPeak;

MoveCostFunction(Configuration conf) {
MoveCostFunction(Configuration conf, ClusterInfoProvider provider) {
this.provider = provider;
// What percent of the number of regions a single run of the balancer can move.
maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
offPeakHours = OffPeakHours.getInstance(conf);
moveCost = conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST);
moveCostOffPeak = conf.getFloat(MOVE_COST_OFFPEAK_KEY, DEFAULT_MOVE_COST_OFFPEAK);
// Initialize the multiplier so that addCostFunction will add this cost function.
Expand All @@ -58,7 +57,7 @@ void prepare(BalancerClusterState cluster) {
super.prepare(cluster);
// Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
// that large benefits are need to overcome the cost of a move.
if (offPeakHours.isOffPeakHour()) {
if (provider.isOffPeakHour()) {
this.setMultiplier(moveCostOffPeak);
} else {
this.setMultiplier(moveCost);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
Expand All @@ -39,9 +40,6 @@
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -128,8 +126,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
private int numRegionLoadsToRemember = 15;
private float minCostNeedBalance = 0.05f;
private boolean isBalancerDecisionRecording = false;
private boolean isBalancerRejectionRecording = false;

private List<CandidateGenerator> candidateGenerators;
private List<CostFunction> costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC
Expand All @@ -147,11 +143,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
private RegionReplicaRackCostFunction regionReplicaRackCostFunction;

/**
* Use to add balancer decision history to ring-buffer
*/
NamedQueueRecorder namedQueueRecorder;

/**
* The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its
* default MetricsBalancer
Expand Down Expand Up @@ -233,7 +224,7 @@ protected void loadConf(Configuration conf) {
costFunctions = new ArrayList<>();
addCostFunction(new RegionCountSkewCostFunction(conf));
addCostFunction(new PrimaryRegionCountSkewCostFunction(conf));
addCostFunction(new MoveCostFunction(conf));
addCostFunction(new MoveCostFunction(conf, provider));
addCostFunction(localityCost);
addCostFunction(rackLocalityCost);
addCostFunction(new TableSkewCostFunction(conf));
Expand All @@ -249,17 +240,6 @@ protected void loadConf(Configuration conf) {
curFunctionCosts = new double[costFunctions.size()];
tempFunctionCosts = new double[costFunctions.size()];

isBalancerDecisionRecording = conf.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
isBalancerRejectionRecording =
conf.getBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED,
BaseLoadBalancer.DEFAULT_BALANCER_REJECTION_BUFFER_ENABLED);

if (this.namedQueueRecorder == null &&
(isBalancerDecisionRecording || isBalancerRejectionRecording)) {
this.namedQueueRecorder = NamedQueueRecorder.getInstance(conf);
}

LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion +
", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", CostFunctions=" +
Arrays.toString(getCostFunctionNames()) + " etc.");
Expand Down Expand Up @@ -305,6 +285,19 @@ private boolean areSomeRegionReplicasColocated(BalancerClusterState c) {
return false;
}

private String getBalanceReason(double total, double sumMultiplier) {
if (total <= 0) {
return "(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern) = " + total + " <= 0";
} else if (sumMultiplier <= 0) {
return "sumMultiplier = " + sumMultiplier + " <= 0";
} else if ((total / sumMultiplier) < minCostNeedBalance) {
return "[(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern)]/sumMultiplier = " +
(total / sumMultiplier) + " <= minCostNeedBalance(" + minCostNeedBalance + ")";
} else {
return "";
}
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
boolean needsBalance(TableName tableName, BalancerClusterState cluster) {
Expand All @@ -314,10 +307,8 @@ boolean needsBalance(TableName tableName, BalancerClusterState cluster) {
LOG.debug("Not running balancer because only " + cs.getNumServers()
+ " active regionserver(s)");
}
if (this.isBalancerRejectionRecording) {
sendRejectionReasonToRingBuffer("The number of RegionServers " +
cs.getNumServers() + " < MIN_SERVER_BALANCE(" + MIN_SERVER_BALANCE + ")", null);
}
sendRejectionReasonToRingBuffer(() -> "The number of RegionServers " + cs.getNumServers() +
" < MIN_SERVER_BALANCE(" + MIN_SERVER_BALANCE + ")", null);
return false;
}
if (areSomeRegionReplicasColocated(cluster)) {
Expand Down Expand Up @@ -346,18 +337,11 @@ boolean needsBalance(TableName tableName, BalancerClusterState cluster) {

boolean balanced = total <= 0 || sumMultiplier <= 0 ||
(sumMultiplier > 0 && (total / sumMultiplier) < minCostNeedBalance);
if(balanced && isBalancerRejectionRecording){
String reason = "";
if (total <= 0) {
reason = "(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern) = " + total + " <= 0";
} else if (sumMultiplier <= 0) {
reason = "sumMultiplier = " + sumMultiplier + " <= 0";
} else if ((total / sumMultiplier) < minCostNeedBalance) {
reason =
"[(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern)]/sumMultiplier = " + (total
/ sumMultiplier) + " <= minCostNeedBalance(" + minCostNeedBalance + ")";
}
sendRejectionReasonToRingBuffer(reason, costFunctions);
if (balanced) {
final double calculatedTotal = total;
final double calculatedMultiplier = sumMultiplier;
sendRejectionReasonToRingBuffer(() -> getBalanceReason(calculatedTotal, calculatedMultiplier),
costFunctions);
}
if (LOG.isDebugEnabled()) {
LOG.debug("{} {}; total cost={}, sum multiplier={}; cost/multiplier to need a balance is {}",
Expand Down Expand Up @@ -500,11 +484,10 @@ protected List<RegionPlan> balanceTable(TableName tableName, Map<ServerName,
return null;
}

private void sendRejectionReasonToRingBuffer(String reason, List<CostFunction> costFunctions){
if (this.isBalancerRejectionRecording){
BalancerRejection.Builder builder =
new BalancerRejection.Builder()
.setReason(reason);
private void sendRejectionReasonToRingBuffer(Supplier<String> reason,
List<CostFunction> costFunctions) {
provider.recordBalancerRejection(() -> {
BalancerRejection.Builder builder = new BalancerRejection.Builder().setReason(reason.get());
if (costFunctions != null) {
for (CostFunction c : costFunctions) {
float multiplier = c.getMultiplier();
Expand All @@ -514,29 +497,24 @@ private void sendRejectionReasonToRingBuffer(String reason, List<CostFunction> c
builder.addCostFuncInfo(c.getClass().getName(), c.cost(), c.getMultiplier());
}
}
namedQueueRecorder.addRecord(new BalancerRejectionDetails(builder.build()));
}
return builder.build();
});
}

private void sendRegionPlansToRingBuffer(List<RegionPlan> plans, double currentCost,
double initCost, String initFunctionTotalCosts, long step) {
if (this.isBalancerDecisionRecording) {
double initCost, String initFunctionTotalCosts, long step) {
provider.recordBalancerDecision(() -> {
List<String> regionPlans = new ArrayList<>();
for (RegionPlan plan : plans) {
regionPlans.add(
"table: " + plan.getRegionInfo().getTable() + " , region: " + plan.getRegionName()
+ " , source: " + plan.getSource() + " , destination: " + plan.getDestination());
regionPlans
.add("table: " + plan.getRegionInfo().getTable() + " , region: " + plan.getRegionName() +
" , source: " + plan.getSource() + " , destination: " + plan.getDestination());
}
BalancerDecision balancerDecision =
new BalancerDecision.Builder()
.setInitTotalCost(initCost)
.setInitialFunctionCosts(initFunctionTotalCosts)
.setComputedTotalCost(currentCost)
.setFinalFunctionCosts(totalCostsPerFunc())
.setComputedSteps(step)
.setRegionPlans(regionPlans).build();
namedQueueRecorder.addRecord(new BalancerDecisionDetails(balancerDecision));
}
return new BalancerDecision.Builder().setInitTotalCost(initCost)
.setInitialFunctionCosts(initFunctionTotalCosts).setComputedTotalCost(currentCost)
.setFinalFunctionCosts(totalCostsPerFunc()).setComputedSteps(step)
.setRegionPlans(regionPlans).build();
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BalancerDecision;
import org.apache.hadoop.hbase.client.BalancerRejection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;

Expand Down Expand Up @@ -80,4 +83,21 @@ public List<ServerName> getOnlineServersListWithPredicator(List<ServerName> serv
public Map<ServerName, List<RegionInfo>> getSnapShotOfAssignment(Collection<RegionInfo> regions) {
return Collections.emptyMap();
}

@Override
public boolean isOffPeakHour() {
return false;
}

@Override
public void recordBalancerDecision(Supplier<BalancerDecision> decision) {
}

@Override
public void recordBalancerRejection(Supplier<BalancerRejection> rejection) {
}

@Override
public void onConfigurationChange(Configuration conf) {
}
}
Loading