Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,13 @@ public synchronized void loadConf(Configuration configuration) {
}

@Override
protected List<CandidateGenerator> createCandidateGenerators() {
List<CandidateGenerator> candidateGenerators = new ArrayList<>(2);
candidateGenerators.add(GeneratorFunctionType.LOAD.ordinal(),
protected Map<Class<? extends CandidateGenerator>, CandidateGenerator>
createCandidateGenerators() {
Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators =
new HashMap<>(2);
candidateGenerators.put(CacheAwareSkewnessCandidateGenerator.class,
new CacheAwareSkewnessCandidateGenerator());
candidateGenerators.add(GeneratorFunctionType.CACHE_RATIO.ordinal(),
new CacheAwareCandidateGenerator());
candidateGenerators.put(CacheAwareCandidateGenerator.class, new CacheAwareCandidateGenerator());
return candidateGenerators;
}

Expand Down Expand Up @@ -409,8 +410,9 @@ protected void regionMoved(int region, int oldServer, int newServer) {
});
}

public final void updateWeight(double[] weights) {
weights[GeneratorFunctionType.LOAD.ordinal()] += cost();
@Override
public final void updateWeight(Map<Class<? extends CandidateGenerator>, Double> weights) {
weights.merge(LoadCandidateGenerator.class, cost(), Double::sum);
}
}

Expand Down Expand Up @@ -478,8 +480,8 @@ private int getServerWithBestCacheRatioForRegion(int region) {
}

@Override
public final void updateWeight(double[] weights) {
weights[GeneratorFunctionType.CACHE_RATIO.ordinal()] += cost();
public void updateWeight(Map<Class<? extends CandidateGenerator>, Double> weights) {
weights.merge(LoadCandidateGenerator.class, cost(), Double::sum);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.master.balancer;

import java.util.Map;
import org.apache.yetus.audience.InterfaceAudience;

/**
Expand Down Expand Up @@ -91,8 +92,8 @@ protected void regionMoved(int region, int oldServer, int newServer) {
* Called once per init or after postAction.
* @param weights the weights for every generator.
*/
public void updateWeight(double[] weights) {
weights[StochasticLoadBalancer.GeneratorType.RANDOM.ordinal()] += cost();
public void updateWeight(Map<Class<? extends CandidateGenerator>, Double> weights) {
weights.merge(RandomCandidateGenerator.class, cost(), Double::sum);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,20 @@ public void setFavoredNodesManager(FavoredNodesManager fnm) {
}

@Override
protected List<CandidateGenerator> createCandidateGenerators() {
List<CandidateGenerator> fnPickers = new ArrayList<>(2);
fnPickers.add(new FavoredNodeLoadPicker());
fnPickers.add(new FavoredNodeLocalityPicker());
protected Map<Class<? extends CandidateGenerator>, CandidateGenerator>
createCandidateGenerators() {
Map<Class<? extends CandidateGenerator>, CandidateGenerator> fnPickers = new HashMap<>(2);
fnPickers.put(FavoredNodeLoadPicker.class, new FavoredNodeLoadPicker());
fnPickers.put(FavoredNodeLocalityPicker.class, new FavoredNodeLocalityPicker());
return fnPickers;
}

/** Returns any candidate generator in random */
@Override
protected CandidateGenerator getRandomGenerator() {
return candidateGenerators.get(ThreadLocalRandom.current().nextInt(candidateGenerators.size()));
Class<? extends CandidateGenerator> clazz = shuffledGeneratorClasses.get()
.get(ThreadLocalRandom.current().nextInt(candidateGenerators.size()));
return candidateGenerators.get(clazz);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.master.balancer;

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.master.balancer.BalancerClusterState.LocalityType;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -89,7 +90,7 @@ private double getWeightedLocality(int region, int entity) {
}

@Override
public final void updateWeight(double[] weights) {
weights[StochasticLoadBalancer.GeneratorType.LOCALITY.ordinal()] += cost();
public final void updateWeight(Map<Class<? extends CandidateGenerator>, Double> weights) {
weights.merge(LocalityBasedCandidateGenerator.class, cost(), Double::sum);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.master.balancer;

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;

Expand Down Expand Up @@ -61,7 +62,7 @@ protected void regionMoved(int region, int oldServer, int newServer) {
}

@Override
public final void updateWeight(double[] weights) {
weights[StochasticLoadBalancer.GeneratorType.LOAD.ordinal()] += cost();
public final void updateWeight(Map<Class<? extends CandidateGenerator>, Double> weights) {
weights.merge(LoadCandidateGenerator.class, cost(), Double::sum);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.master.balancer;

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.agrona.collections.Hashing;
import org.agrona.collections.Int2IntCounterMap;
Expand Down Expand Up @@ -73,8 +74,8 @@ protected double cost() {
}

@Override
public final void updateWeight(double[] weights) {
weights[StochasticLoadBalancer.GeneratorType.RACK.ordinal()] += cost();
public final void updateWeight(Map<Class<? extends CandidateGenerator>, Double> weights) {
weights.merge(RegionReplicaRackCandidateGenerator.class, cost(), Double::sum);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
Expand All @@ -48,6 +51,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Suppliers;

/**
* <p>
* This is a best effort load balancer. Given a Cost function F(C) =&gt; x It will randomly try and
Expand Down Expand Up @@ -147,7 +153,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private double curOverallCost = 0d;
private double[] tempFunctionCosts;
private double[] curFunctionCosts;
private double[] weightsOfGenerators;

// Keep locality based picker and cost function to alert them
// when new services are offered
Expand All @@ -157,14 +162,16 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
private RegionReplicaRackCostFunction regionReplicaRackCostFunction;

protected List<CandidateGenerator> candidateGenerators;

public enum GeneratorType {
RANDOM,
LOAD,
LOCALITY,
RACK
}
private final Map<Class<? extends CandidateGenerator>, Double> weightsOfGenerators =
new HashMap<>();
protected Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators;
protected final Supplier<List<Class<? extends CandidateGenerator>>> shuffledGeneratorClasses =
Suppliers.memoizeWithExpiration(() -> {
List<Class<? extends CandidateGenerator>> shuffled =
new ArrayList<>(candidateGenerators.keySet());
Collections.shuffle(shuffled);
return shuffled;
}, 5, TimeUnit.SECONDS);

/**
* The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its
Expand Down Expand Up @@ -213,16 +220,20 @@ private void loadCustomCostFunctions(Configuration conf) {

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
List<CandidateGenerator> getCandidateGenerators() {
Map<Class<? extends CandidateGenerator>, CandidateGenerator> getCandidateGenerators() {
return this.candidateGenerators;
}

protected List<CandidateGenerator> createCandidateGenerators() {
List<CandidateGenerator> candidateGenerators = new ArrayList<CandidateGenerator>(4);
candidateGenerators.add(GeneratorType.RANDOM.ordinal(), new RandomCandidateGenerator());
candidateGenerators.add(GeneratorType.LOAD.ordinal(), new LoadCandidateGenerator());
candidateGenerators.add(GeneratorType.LOCALITY.ordinal(), localityCandidateGenerator);
candidateGenerators.add(GeneratorType.RACK.ordinal(),
protected Map<Class<? extends CandidateGenerator>, CandidateGenerator>
createCandidateGenerators() {
Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators =
new HashMap<>(5);
candidateGenerators.put(RandomCandidateGenerator.class, new RandomCandidateGenerator());
candidateGenerators.put(LoadCandidateGenerator.class, new LoadCandidateGenerator());
candidateGenerators.put(LocalityBasedCandidateGenerator.class, localityCandidateGenerator);
candidateGenerators.put(RegionReplicaCandidateGenerator.class,
new RegionReplicaCandidateGenerator());
candidateGenerators.put(RegionReplicaRackCandidateGenerator.class,
new RegionReplicaRackCandidateGenerator());
return candidateGenerators;
}
Expand Down Expand Up @@ -409,34 +420,54 @@ boolean needsBalance(TableName tableName, BalancerClusterState cluster) {

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
BalanceAction nextAction(BalancerClusterState cluster) {
return getRandomGenerator().generate(cluster);
Pair<CandidateGenerator, BalanceAction> nextAction(BalancerClusterState cluster) {
CandidateGenerator generator = getRandomGenerator();
return Pair.newPair(generator, generator.generate(cluster));
}

/**
* Select the candidate generator to use based on the cost of cost functions. The chance of
* selecting a candidate generator is propotional to the share of cost of all cost functions among
* all cost functions that benefit from it.
* selecting a candidate generator is proportional to the share of cost of all cost functions
* among all cost functions that benefit from it.
*/
protected CandidateGenerator getRandomGenerator() {
double sum = 0;
for (int i = 0; i < weightsOfGenerators.length; i++) {
sum += weightsOfGenerators[i];
weightsOfGenerators[i] = sum;
}
if (sum == 0) {
return candidateGenerators.get(0);
Preconditions.checkState(!candidateGenerators.isEmpty(), "No candidate generators available.");
List<Class<? extends CandidateGenerator>> generatorClasses = shuffledGeneratorClasses.get();
List<Double> partialSums = new ArrayList<>(generatorClasses.size());
double sum = 0.0;
for (Class<? extends CandidateGenerator> clazz : generatorClasses) {
double weight = weightsOfGenerators.getOrDefault(clazz, 0.0);
sum += weight;
partialSums.add(sum);
}
for (int i = 0; i < weightsOfGenerators.length; i++) {
weightsOfGenerators[i] /= sum;

// If the sum of all weights is zero, fall back to any generator
if (sum == 0.0) {
return pickAnyGenerator(generatorClasses);
}

double rand = ThreadLocalRandom.current().nextDouble();
for (int i = 0; i < weightsOfGenerators.length; i++) {
if (rand <= weightsOfGenerators[i]) {
return candidateGenerators.get(i);
// Normalize partial sums so that the last one should be exactly 1.0
for (int i = 0; i < partialSums.size(); i++) {
partialSums.set(i, partialSums.get(i) / sum);
}

// Generate a random number and pick the first generator whose partial sum is >= rand
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This weighted selection algorithm is pretty awkward to follow if you don't expect it. Can you isolate it within its own method so that the calling method is easier to read and the implementation could be picked up for reuse later. Also, it becomes testable independently, if such a thing is testable.

Copy link
Contributor Author

@rmdmattingly rmdmattingly Jan 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, I had some trouble understanding exactly what the goals were when working through the original code. I want to flag that this weighted selection algorithm is a pretty faithful attempt at maintaining the original approach — but with maps rather than arrays, some extra comments to try to explain the steps more explicitly, explicitly throwing rather than fetching a nonexistent item if generators are empty, falling back to random generators rather than some magic importance on the last in the list, etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I can clean this up a bit with the precondition and maybe pulling random generator picking out into its own method — but the logic necessary for picking a random generator is basically the entirety of this. Unless we want to make a static method with a ton of arguments just for testing's sake, and honestly I think this is really well tested by the current suite; if you don't pick the right generators at the right time, then the balancer doesn't work well at all

for (int i = 0; i < partialSums.size(); i++) {
if (rand <= partialSums.get(i)) {
return candidateGenerators.get(generatorClasses.get(i));
}
}
return candidateGenerators.get(candidateGenerators.size() - 1);

// Fallback: if for some reason we didn't return above, return any generator
return pickAnyGenerator(generatorClasses);
}

private CandidateGenerator
pickAnyGenerator(List<Class<? extends CandidateGenerator>> generatorClasses) {
Class<? extends CandidateGenerator> randomClass =
generatorClasses.get(ThreadLocalRandom.current().nextInt(candidateGenerators.size()));
return candidateGenerators.get(randomClass);
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
Expand Down Expand Up @@ -521,22 +552,27 @@ protected List<RegionPlan> balanceTable(TableName tableName,
final String initFunctionTotalCosts = totalCostsPerFunc();
// Perform a stochastic walk to see if we can get a good fit.
long step;

Map<Class<? extends CandidateGenerator>, Long> generatorToStepCount = new HashMap<>();
Map<Class<? extends CandidateGenerator>, Long> generatorToApprovedActionCount = new HashMap<>();
for (step = 0; step < computedMaxSteps; step++) {
BalanceAction action = nextAction(cluster);
Pair<CandidateGenerator, BalanceAction> nextAction = nextAction(cluster);
CandidateGenerator generator = nextAction.getFirst();
BalanceAction action = nextAction.getSecond();

if (action.getType() == BalanceAction.Type.NULL) {
continue;
}

cluster.doAction(action);
updateCostsAndWeightsWithAction(cluster, action);
generatorToStepCount.merge(generator.getClass(), 1L, Long::sum);

newCost = computeCost(cluster, currentCost);

// Should this be kept?
if (newCost < currentCost) {
currentCost = newCost;
generatorToApprovedActionCount.merge(generator.getClass(), 1L, Long::sum);

// save for JMX
curOverallCost = currentCost;
Expand All @@ -555,6 +591,15 @@ protected List<RegionPlan> balanceTable(TableName tableName,
}
long endTime = EnvironmentEdgeManager.currentTime();

StringJoiner joiner = new StringJoiner("\n");
joiner.add("CandidateGenerator activity summary:");
generatorToStepCount.forEach((generator, count) -> {
long approvals = generatorToApprovedActionCount.getOrDefault(generator, 0L);
joiner.add(String.format(" - %s: %d steps, %d approvals", generator.getSimpleName(), count,
approvals));
});
LOG.debug(joiner.toString());

metricsBalancer.balanceCluster(endTime - startTime);

if (initCost > currentCost) {
Expand Down Expand Up @@ -747,8 +792,10 @@ private void updateRegionLoad() {
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
void initCosts(BalancerClusterState cluster) {
// Initialize the weights of generator every time
weightsOfGenerators = new double[this.candidateGenerators.size()];
weightsOfGenerators.clear();
for (Class<? extends CandidateGenerator> clazz : candidateGenerators.keySet()) {
weightsOfGenerators.put(clazz, 0.0);
}
for (CostFunction c : costFunctions) {
c.prepare(cluster);
c.updateWeight(weightsOfGenerators);
Expand All @@ -762,8 +809,8 @@ void initCosts(BalancerClusterState cluster) {
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
void updateCostsAndWeightsWithAction(BalancerClusterState cluster, BalanceAction action) {
// Reset all the weights to 0
for (int i = 0; i < weightsOfGenerators.length; i++) {
weightsOfGenerators[i] = 0;
for (Class<? extends CandidateGenerator> clazz : candidateGenerators.keySet()) {
weightsOfGenerators.put(clazz, 0.0);
}
for (CostFunction c : costFunctions) {
if (c.isNeeded()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ public void testCostAfterUndoAction() {
loadBalancer.initCosts(cluster);
for (int i = 0; i != runs; ++i) {
final double expectedCost = loadBalancer.computeCost(cluster, Double.MAX_VALUE);
BalanceAction action = loadBalancer.nextAction(cluster);
BalanceAction action = loadBalancer.nextAction(cluster).getSecond();
cluster.doAction(action);
loadBalancer.updateCostsAndWeightsWithAction(cluster, action);
BalanceAction undoAction = action.undoAction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class TestStochasticLoadBalancerBalanceCluster extends StochasticBalancer
*/
@Test
public void testBalanceCluster() throws Exception {
setMaxRunTime(Duration.ofMillis(1500));
setMaxRunTime(Duration.ofMillis(2500));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I was a little ambitious with the duration required here (in #6597 I drastically reduced balancer test suite runtime), and this test is much less flexible because it doesn't follow the standard balancer test framework. I could bite off a larger refactor, but I'm not sure it's worth it. So, anyway, I saw some flappiness in this test locally that was resolved by just giving a little bit more time for plan generation since it expects the plan to be perfect in one shot

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you'll find that the upstream execution needs quite a lot more time than local runs. Congratulations on your first flakey test!

loadBalancer.onConfigurationChange(conf);
for (int[] mockCluster : clusterStateMocks) {
Map<ServerName, List<RegionInfo>> servers = mockClusterServers(mockCluster);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void testRegionReplicationOnMidClusterWithRacks() {
public void testRegionReplicationOnLargeClusterWithRacks() {
conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true);
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 100000000L);
setMaxRunTime(Duration.ofSeconds(5));
setMaxRunTime(Duration.ofSeconds(10));
loadBalancer.onConfigurationChange(conf);
int numNodes = 100;
int numRegions = numNodes * 30;
Expand Down
Loading