Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,25 +72,13 @@ public interface LoadBalancer extends Configurable, Stoppable, ConfigurationObse
void setClusterInfoProvider(ClusterInfoProvider provider);

/**
* Perform the major balance operation for cluster, will invoke {@link #balanceTable} to do actual
* balance. Normally not need override this method, except {@link SimpleLoadBalancer} and
* {@code RSGroupBasedLoadBalancer}
* Perform the major balance operation for cluster.
* @param loadOfAllTable region load of servers for all table
* @return a list of regions to be moved, including source and destination, or null if cluster is
* already balanced
*/
List<RegionPlan> balanceCluster(Map<TableName,
Map<ServerName, List<RegionInfo>>> loadOfAllTable) throws IOException;

/**
* Perform the major balance operation for table, all class implement of {@link LoadBalancer}
* should override this method
* @param tableName the table to be balanced
* @param loadOfOneTable region load of servers for the specific one table
* @return List of plans
*/
List<RegionPlan> balanceTable(TableName tableName,
Map<ServerName, List<RegionInfo>> loadOfOneTable);
List<RegionPlan> balanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable)
throws IOException;

/**
* Perform a Round Robin assignment of regions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -610,13 +610,44 @@ private Map<ServerName, List<RegionInfo>> toEnsumbleTableLoad(
return returnMap;
}

@Override
public abstract List<RegionPlan> balanceTable(TableName tableName,
Map<ServerName, List<RegionInfo>> loadOfOneTable);
/**
* Perform the major balance operation for table, all sub classes should override this method.
* <p/>
* Will be invoked by {@link #balanceCluster(Map)}. If
* {@link HConstants#HBASE_MASTER_LOADBALANCE_BYTABLE} is enabled, we will call this method
* multiple times, one table a time, where we will only pass in the regions for a single table
* each time. If not, we will pass in all the regions at once, and the {@code tableName} will be
* {@link HConstants#ENSEMBLE_TABLE_NAME}.
* @param tableName the table to be balanced
* @param loadOfOneTable region load of servers for the specific one table
* @return List of plans
*/
protected abstract List<RegionPlan> balanceTable(TableName tableName,
Map<ServerName, List<RegionInfo>> loadOfOneTable);

/**
* Called before actually executing balanceCluster. The sub classes could override this method to
* do some initialization work.
*/
protected void
preBalanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
}

/**
* Perform the major balance operation for cluster, will invoke
* {@link #balanceTable(TableName, Map)} to do actual balance.
* <p/>
* THIs method is marked as final which means you should not override this method. See the javadoc
* for {@link #balanceTable(TableName, Map)} for more details.
* @param loadOfAllTable region load of servers for all table
* @return a list of regions to be moved, including source and destination, or null if cluster is
* already balanced
* @see #balanceTable(TableName, Map)
*/
@Override
public List<RegionPlan>
balanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
public final synchronized List<RegionPlan>
balanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
preBalanceCluster(loadOfAllTable);
if (isByTable) {
List<RegionPlan> result = new ArrayList<>();
loadOfAllTable.forEach((tableName, loadOfOneTable) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,13 @@ void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoa
avgLoadOverall = sum / serverLoadList.size();
}

@Override
protected void
preBalanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
// We need clusterLoad of all regions on every server to achieve overall balanced
setClusterLoad(loadOfAllTable);
}

@Override
public void onConfigurationChange(Configuration conf) {
float originSlop = slop;
Expand Down Expand Up @@ -247,7 +254,7 @@ private boolean overallNeedsBalance() {
* or null if cluster is already balanced
*/
@Override
public List<RegionPlan> balanceTable(TableName tableName,
protected List<RegionPlan> balanceTable(TableName tableName,
Map<ServerName, List<RegionInfo>> loadOfOneTable) {
long startTime = System.currentTimeMillis();

Expand Down Expand Up @@ -466,7 +473,7 @@ public List<RegionPlan> balanceTable(TableName tableName,
* max. Together with other regions left to be assigned, we distribute all regionToMove, to the RS
* that have less regions in whole cluster scope.
*/
public void balanceOverall(List<RegionPlan> regionsToReturn,
private void balanceOverall(List<RegionPlan> regionsToReturn,
Map<ServerName, BalanceInfo> serverBalanceInfo, boolean fetchFromTail,
MinMaxPriorityQueue<RegionPlan> regionsToMove, int max, int min) {
// Step 1.
Expand Down Expand Up @@ -613,15 +620,4 @@ private void addRegionPlan(final MinMaxPriorityQueue<RegionPlan> regionsToMove,
rp.setDestination(sn);
regionsToReturn.add(rp);
}

/**
* Override to invoke {@link #setClusterLoad} before balance, We need clusterLoad of all regions
* on every server to achieve overall balanced
*/
@Override
public synchronized List<RegionPlan>
balanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
setClusterLoad(loadOfAllTable);
return super.balanceCluster(loadOfAllTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,9 @@ public static void beforeAllTests() throws Exception {
}

public static class MockBalancer extends BaseLoadBalancer {
@Override
public List<RegionPlan>
balanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
return null;
}

@Override
public List<RegionPlan> balanceTable(TableName tableName,
protected List<RegionPlan> balanceTable(TableName tableName,
Map<ServerName, List<RegionInfo>> loadOfOneTable) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public synchronized void initialize() throws HBaseIOException {
}

@Override
public List<RegionPlan> balanceTable(TableName tableName,
protected List<RegionPlan> balanceTable(TableName tableName,
Map<ServerName, List<RegionInfo>> loadOfOneTable) {
// TODO. Look at is whether Stochastic loadbalancer can be integrated with this
List<RegionPlan> plans = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ private int pickMostLoadedServer(final BalancerClusterState cluster) {
* implementation. For the misplaced regions, we assign a bogus server to it and AM takes care.
*/
@Override
public List<RegionPlan> balanceTable(TableName tableName,
protected List<RegionPlan> balanceTable(TableName tableName,
Map<ServerName, List<RegionInfo>> loadOfOneTable) {
if (this.services != null) {
List<RegionPlan> regionPlans = Lists.newArrayList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,6 @@ public List<RegionPlan> balanceCluster(
return Collections.emptyList();
}

@Override
public List<RegionPlan> balanceTable(TableName tableName,
Map<ServerName, List<RegionInfo>> loadOfOneTable) {
return Collections.emptyList();
}

private Map<ServerName, List<RegionInfo>> assign(Collection<RegionInfo> regions,
List<ServerName> servers) {
// should only have 1 region server in maintenance mode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ BalanceAction nextAction(BalancerClusterState cluster) {
* should always approach the optimal state given enough steps.
*/
@Override
public synchronized List<RegionPlan> balanceTable(TableName tableName, Map<ServerName,
protected List<RegionPlan> balanceTable(TableName tableName, Map<ServerName,
List<RegionInfo>> loadOfOneTable) {
// On clusters with lots of HFileLinks or lots of reference files,
// instantiating the storefile infos can be quite expensive.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,7 @@ public void setMasterServices(MasterServices masterServices) {
}

/**
* Override to balance by RSGroup
* not invoke {@link #balanceTable(TableName, Map)}
* Balance by RSGroup.
*/
@Override
public List<RegionPlan> balanceCluster(
Expand Down Expand Up @@ -449,40 +448,6 @@ public void updateBalancerStatus(boolean status) {
internalBalancer.updateBalancerStatus(status);
}

/**
* can achieve table balanced rather than overall balanced
*/
@Override
public List<RegionPlan> balanceTable(TableName tableName,
Map<ServerName, List<RegionInfo>> loadOfOneTable) {
if (!isOnline()) {
LOG.error(RSGroupInfoManager.class.getSimpleName()
+ " is not online, unable to perform balanceTable");
return null;
}
Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfThisTable = new HashMap<>();
loadOfThisTable.put(tableName, loadOfOneTable);
Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>
correctedStateAndRegionPlans;
// Calculate correct assignments and a list of RegionPlan for mis-placed regions
try {
correctedStateAndRegionPlans = correctAssignments(loadOfThisTable);
} catch (IOException e) {
LOG.error("get correct assignments and mis-placed regions error ", e);
return null;
}
Map<TableName, Map<ServerName, List<RegionInfo>>> correctedLoadOfThisTable =
correctedStateAndRegionPlans.getFirst();
List<RegionPlan> regionPlans = correctedStateAndRegionPlans.getSecond();
List<RegionPlan> tablePlans =
this.internalBalancer.balanceTable(tableName, correctedLoadOfThisTable.get(tableName));

if (tablePlans != null) {
regionPlans.addAll(tablePlans);
}
return regionPlans;
}

private List<ServerName> getFallBackCandidates(List<ServerName> servers) {
List<ServerName> serverNames = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public class LoadBalancerPerformanceEvaluation extends AbstractHBaseTool {
private List<ServerName> servers;
private List<RegionInfo> regions;
private Map<RegionInfo, ServerName> regionServerMap;
private Map<ServerName, List<RegionInfo>> serverRegionMap;
private Map<TableName, Map<ServerName, List<RegionInfo>>> tableServerRegionMap;

// Non-default configurations.
private void setupConf() {
Expand All @@ -92,6 +92,7 @@ private void setupConf() {
}

private void generateRegionsAndServers() {
TableName tableName = TableName.valueOf("LoadBalancerPerfTable");
// regions
regions = new ArrayList<>(numRegions);
regionServerMap = new HashMap<>(numRegions);
Expand All @@ -101,7 +102,6 @@ private void generateRegionsAndServers() {

Bytes.putInt(start, 0, i);
Bytes.putInt(end, 0, i + 1);
TableName tableName = TableName.valueOf("LoadBalancerPerfTable");
RegionInfo hri = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(start)
.setEndKey(end)
Expand All @@ -114,12 +114,13 @@ private void generateRegionsAndServers() {

// servers
servers = new ArrayList<>(numServers);
serverRegionMap = new HashMap<>(numServers);
Map<ServerName, List<RegionInfo>> serverRegionMap = new HashMap<>(numServers);
for (int i = 0; i < numServers; ++i) {
ServerName sn = ServerName.valueOf("srv" + i, HConstants.DEFAULT_REGIONSERVER_PORT, i);
servers.add(sn);
serverRegionMap.put(sn, i == 0 ? regions : Collections.emptyList());
}
tableServerRegionMap = Collections.singletonMap(tableName, serverRegionMap);
}

@Override
Expand Down Expand Up @@ -174,7 +175,7 @@ protected int doWork() throws Exception {
LOG.info("Calling " + methodName);
watch.reset().start();

loadBalancer.balanceTable(HConstants.ENSEMBLE_TABLE_NAME, serverRegionMap);
loadBalancer.balanceCluster(tableServerRegionMap);
System.out.print(formatResults(methodName, watch.elapsed(TimeUnit.MILLISECONDS)));

return EXIT_SUCCESS;
Expand Down