Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ enum Type {
ASSIGN_REGION,
MOVE_REGION,
SWAP_REGIONS,
MOVE_BATCH,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our conditional candidate generators can frequently see pretty far into the future if they've gone to the trouble of deriving one very opinionated move anyway. So this is a nice way to represent multiple regions moves triggered by one candidate generation iteration

NULL,
}

Expand All @@ -51,6 +52,10 @@ Type getType() {
return type;
}

long getStepCount() {
return 1;
}

@Override
public String toString() {
return type + ":";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,26 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.agrona.collections.Hashing;
import org.agrona.collections.Int2IntCounterMap;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Supplier;
import org.apache.hbase.thirdparty.com.google.common.base.Suppliers;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;

/**
* An efficient array based implementation similar to ClusterState for keeping the status of the
* cluster in terms of region assignment and distribution. LoadBalancers, such as
Expand Down Expand Up @@ -122,6 +129,14 @@ class BalancerClusterState {
// Maps regionName -> oldServerName -> cache ratio of the region on the old server
Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap;

private Supplier<List<Integer>> shuffledServerIndicesSupplier =
Suppliers.memoizeWithExpiration(() -> {
Collection<Integer> serverIndices = serversToIndex.values();
List<Integer> shuffledServerIndices = new ArrayList<>(serverIndices);
Collections.shuffle(shuffledServerIndices);
return shuffledServerIndices;
}, 5, TimeUnit.SECONDS);

static class DefaultRackManager extends RackManager {
@Override
public String getRack(ServerName server) {
Expand Down Expand Up @@ -705,7 +720,41 @@ enum LocalityType {
RACK
}

public void doAction(BalanceAction action) {
public List<RegionPlan> convertActionToPlans(BalanceAction action) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RegionPlans are a more straightforward interface than BalanceActions, because you don't have to do all of this switch nonsense. So the new RegionPlanConditional interface isn't concerned with BalanceActions — it's just working with RegionInfo and RegionPlan objects, for example:
isViolatingServer(RegionPlan regionPlan, Set<RegionInfo> destinationRegions)

All this to say, this was a nice method to introduce so that we could convert BalanceActions to RegionPlans as necessary for conditional evaluations, and without altering the current BalancerClusterState in-place via doAction

switch (action.getType()) {
case NULL:
break;
case ASSIGN_REGION:
// FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings
assert action instanceof AssignRegionAction : action.getClass();
AssignRegionAction ar = (AssignRegionAction) action;
return ImmutableList.of(regionMoved(ar.getRegion(), -1, ar.getServer()));
case MOVE_REGION:
assert action instanceof MoveRegionAction : action.getClass();
MoveRegionAction mra = (MoveRegionAction) action;
return ImmutableList
.of(regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer()));
case SWAP_REGIONS:
assert action instanceof SwapRegionsAction : action.getClass();
SwapRegionsAction a = (SwapRegionsAction) action;
return ImmutableList.of(regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer()),
regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer()));
case MOVE_BATCH:
assert action instanceof MoveBatchAction : action.getClass();
MoveBatchAction mba = (MoveBatchAction) action;
List<RegionPlan> mbRegionPlans = new ArrayList<>();
for (MoveRegionAction moveRegionAction : mba.getMoveActions()) {
mbRegionPlans.add(regionMoved(moveRegionAction.getRegion(),
moveRegionAction.getFromServer(), moveRegionAction.getToServer()));
}
return mbRegionPlans;
default:
throw new RuntimeException("Unknown action:" + action.getType());
}
return Collections.emptyList();
}

public List<RegionPlan> doAction(BalanceAction action) {
switch (action.getType()) {
case NULL:
break;
Expand All @@ -715,30 +764,47 @@ public void doAction(BalanceAction action) {
AssignRegionAction ar = (AssignRegionAction) action;
regionsPerServer[ar.getServer()] =
addRegion(regionsPerServer[ar.getServer()], ar.getRegion());
regionMoved(ar.getRegion(), -1, ar.getServer());
break;
return ImmutableList.of(regionMoved(ar.getRegion(), -1, ar.getServer()));
case MOVE_REGION:
assert action instanceof MoveRegionAction : action.getClass();
MoveRegionAction mra = (MoveRegionAction) action;
regionsPerServer[mra.getFromServer()] =
removeRegion(regionsPerServer[mra.getFromServer()], mra.getRegion());
regionsPerServer[mra.getToServer()] =
addRegion(regionsPerServer[mra.getToServer()], mra.getRegion());
regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer());
break;
return ImmutableList
.of(regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer()));
case SWAP_REGIONS:
assert action instanceof SwapRegionsAction : action.getClass();
SwapRegionsAction a = (SwapRegionsAction) action;
regionsPerServer[a.getFromServer()] =
replaceRegion(regionsPerServer[a.getFromServer()], a.getFromRegion(), a.getToRegion());
regionsPerServer[a.getToServer()] =
replaceRegion(regionsPerServer[a.getToServer()], a.getToRegion(), a.getFromRegion());
regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer());
regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer());
break;
return ImmutableList.of(regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer()),
regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer()));
case MOVE_BATCH:
assert action instanceof MoveBatchAction : action.getClass();
MoveBatchAction mba = (MoveBatchAction) action;
List<RegionPlan> mbRegionPlans = new ArrayList<>();
for (int serverIndex : mba.getServerToRegionsToRemove().keySet()) {
Set<Integer> regionsToRemove = mba.getServerToRegionsToRemove().get(serverIndex);
regionsPerServer[serverIndex] =
removeRegions(regionsPerServer[serverIndex], regionsToRemove);
}
for (int serverIndex : mba.getServerToRegionsToAdd().keySet()) {
Set<Integer> regionsToAdd = mba.getServerToRegionsToAdd().get(serverIndex);
regionsPerServer[serverIndex] = addRegions(regionsPerServer[serverIndex], regionsToAdd);
}
for (MoveRegionAction moveRegionAction : mba.getMoveActions()) {
mbRegionPlans.add(regionMoved(moveRegionAction.getRegion(),
moveRegionAction.getFromServer(), moveRegionAction.getToServer()));
}
return mbRegionPlans;
default:
throw new RuntimeException("Uknown action:" + action.getType());
throw new RuntimeException("Unknown action:" + action.getType());
}
return Collections.emptyList();
}

/**
Expand Down Expand Up @@ -822,7 +888,7 @@ void doAssignRegion(RegionInfo regionInfo, ServerName serverName) {
doAction(new AssignRegionAction(region, server));
}

void regionMoved(int region, int oldServer, int newServer) {
RegionPlan regionMoved(int region, int oldServer, int newServer) {
regionIndexToServerIndex[region] = newServer;
if (initialRegionIndexToServerIndex[region] == newServer) {
numMovedRegions--; // region moved back to original location
Expand Down Expand Up @@ -853,6 +919,11 @@ void regionMoved(int region, int oldServer, int newServer) {
updateForLocation(serverIndexToRackIndex, regionsPerRack, colocatedReplicaCountsPerRack,
oldServer, newServer, primary, region);
}

// old server name can be null
ServerName oldServerName = oldServer == -1 ? null : servers[oldServer];

return new RegionPlan(regions[region], oldServerName, servers[newServer]);
}

/**
Expand Down Expand Up @@ -899,6 +970,48 @@ int[] addRegion(int[] regions, int regionIndex) {
return newRegions;
}

int[] removeRegions(int[] regions, Set<Integer> regionIndicesToRemove) {
// Calculate the size of the new regions array
int newSize = regions.length - regionIndicesToRemove.size();
if (newSize < 0) {
throw new IllegalStateException(
"Region indices mismatch: more regions to remove than in the regions array");
}
Comment on lines +976 to +979
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These methods make it easier to add/remove many indices from the BCS. This is nice for the MOVE_BATCH balance action, that I've justified for candidate generation performance reasons in another comment here.

Also, the nicer error messaging here is a good win imo. Previously you'd just hit ArrayIndexOutOfBoundExceptions, or worse — erroneous moves — when you fumbled the state management of your mutable BalancerClusterState. This should help anyone down the road if they're debugging a custom conditional implementation


int[] newRegions = new int[newSize];
int newIndex = 0;

// Copy only the regions not in the removal set
for (int region : regions) {
if (!regionIndicesToRemove.contains(region)) {
newRegions[newIndex++] = region;
}
}

// If the newIndex is smaller than newSize, some regions were missing from the input array
if (newIndex != newSize) {
throw new IllegalStateException("Region indices mismatch: some regions in the removal "
+ "set were not found in the regions array");
}

return newRegions;
}

int[] addRegions(int[] regions, Set<Integer> regionIndicesToAdd) {
int[] newRegions = new int[regions.length + regionIndicesToAdd.size()];

// Copy the existing regions to the new array
System.arraycopy(regions, 0, newRegions, 0, regions.length);

// Add the new regions at the end of the array
int newIndex = regions.length;
for (int regionIndex : regionIndicesToAdd) {
newRegions[newIndex++] = regionIndex;
}

return newRegions;
}

int[] addRegionSorted(int[] regions, int regionIndex) {
int[] newRegions = new int[regions.length + 1];
int i = 0;
Expand Down Expand Up @@ -998,6 +1111,10 @@ void setNumMovedRegions(int numMovedRegions) {
this.numMovedRegions = numMovedRegions;
}

List<Integer> getShuffledServerIndices() {
return shuffledServerIndicesSupplier.get();
}

@Override
public String toString() {
StringBuilder desc = new StringBuilder("Cluster={servers=[");
Expand Down
Loading