Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.elasticsearch.cluster.routing;

import com.carrotsearch.hppc.ObjectIntHashMap;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.CollectionUtil;
Expand All @@ -19,6 +18,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.collect.Tuple;
Expand All @@ -37,9 +37,12 @@
import java.util.ListIterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
* {@link RoutingNodes} represents a copy of the routing information contained in the {@link ClusterState cluster state}.
Expand Down Expand Up @@ -70,7 +73,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {

private int relocatingShards = 0;

private final Map<String, ObjectIntHashMap<String>> nodesPerAttributeNames = new HashMap<>();
private final Map<String, Set<String>> attributeValuesByAttribute = new HashMap<>();
private final Map<String, Recoveries> recoveriesPerNode = new HashMap<>();

public RoutingNodes(ClusterState clusterState) {
Expand Down Expand Up @@ -229,20 +232,12 @@ public RoutingNode node(String nodeId) {
return nodesToShards.get(nodeId);
}

public ObjectIntHashMap<String> nodesPerAttributesCounts(String attributeName) {
ObjectIntHashMap<String> nodesPerAttributesCounts = nodesPerAttributeNames.get(attributeName);
if (nodesPerAttributesCounts != null) {
return nodesPerAttributesCounts;
}
nodesPerAttributesCounts = new ObjectIntHashMap<>();
for (RoutingNode routingNode : this) {
String attrValue = routingNode.node().getAttributes().get(attributeName);
if (attrValue != null) {
nodesPerAttributesCounts.addTo(attrValue, 1);
}
}
nodesPerAttributeNames.put(attributeName, nodesPerAttributesCounts);
return nodesPerAttributesCounts;
/**
 * Returns the distinct values of the given node attribute across the routing nodes of this instance.
 * <p>
 * The result is computed on first request for an attribute name and cached in
 * {@code attributeValuesByAttribute}; subsequent calls for the same name return the cached set.
 * Nodes that do not define the attribute contribute nothing to the result.
 *
 * @param attributeName the name of the node attribute to look up
 * @return the set of non-null values of {@code attributeName} found on the routing nodes
 */
public Set<String> getAttributeValues(String attributeName) {
    // Only ever accessed on the master service thread so no need for synchronization
    assert MasterService.isMasterUpdateThread() || Thread.currentThread().getName().startsWith("TEST-")
        : Thread.currentThread().getName() + " should be the master service thread";

    Set<String> cachedValues = attributeValuesByAttribute.get(attributeName);
    if (cachedValues == null) {
        // First request for this attribute: collect every non-null value and remember it.
        cachedValues = StreamSupport.stream(spliterator(), false)
            .map(routingNode -> routingNode.node().getAttributes().get(attributeName))
            .filter(Objects::nonNull)
            .collect(Collectors.toSet());
        attributeValuesByAttribute.put(attributeName, cachedValues);
    }
    return cachedValues;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.elasticsearch.cluster.routing.allocation.decider;

import com.carrotsearch.hppc.ObjectIntHashMap;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand All @@ -22,8 +21,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.StreamSupport;
import java.util.stream.Stream;

import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
Expand Down Expand Up @@ -133,70 +133,67 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout
}

final boolean debug = allocation.debugDecision();
IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index());
final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index());

if (INDEX_AUTO_EXPAND_REPLICAS_SETTING.get(indexMetadata.getSettings()).expandToAllNodes()) {
return YES_AUTO_EXPAND_ALL;
}

int shardCount = indexMetadata.getNumberOfReplicas() + 1; // 1 for primary
final int shardCount = indexMetadata.getNumberOfReplicas() + 1; // 1 for primary
for (String awarenessAttribute : awarenessAttributes) {
// the node the shard exists on must be associated with an awareness attribute
if (node.node().getAttributes().containsKey(awarenessAttribute) == false) {
return debug ? debugNoMissingAttribute(awarenessAttribute, awarenessAttributes) : Decision.NO;
}

// build attr_value -> nodes map
ObjectIntHashMap<String> nodesPerAttribute = allocation.routingNodes().nodesPerAttributesCounts(awarenessAttribute);
final Set<String> actualAttributeValues = allocation.routingNodes().getAttributeValues(awarenessAttribute);
final String targetAttributeValue = node.node().getAttributes().get(awarenessAttribute);
assert targetAttributeValue != null : "attribute [" + awarenessAttribute + "] missing on " + node.node();
assert actualAttributeValues.contains(targetAttributeValue)
: "attribute [" + awarenessAttribute + "] on " + node.node() + " is not in " + actualAttributeValues;

int shardsForTargetAttributeValue = 0;
// Will be the count of shards on nodes with attribute `awarenessAttribute` matching the one on `node`.

// build the count of shards per attribute value
ObjectIntHashMap<String> shardPerAttribute = new ObjectIntHashMap<>();
for (ShardRouting assignedShard : allocation.routingNodes().assignedShards(shardRouting.shardId())) {
if (assignedShard.started() || assignedShard.initializing()) {
// Note: this also counts relocation targets as that will be the new location of the shard.
// Relocation sources should not be counted as the shard is moving away
RoutingNode routingNode = allocation.routingNodes().node(assignedShard.currentNodeId());
shardPerAttribute.addTo(routingNode.node().getAttributes().get(awarenessAttribute), 1);
final RoutingNode assignedNode = allocation.routingNodes().node(assignedShard.currentNodeId());
if (targetAttributeValue.equals(assignedNode.node().getAttributes().get(awarenessAttribute))) {
shardsForTargetAttributeValue += 1;
}
}
}

if (moveToNode) {
if (shardRouting.assignedToNode()) {
String nodeId = shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId();
if (node.nodeId().equals(nodeId) == false) {
// we work on different nodes, move counts around
shardPerAttribute.putOrAdd(allocation.routingNodes().node(nodeId).node().getAttributes().get(awarenessAttribute),
0, -1);
shardPerAttribute.addTo(node.node().getAttributes().get(awarenessAttribute), 1);
}
final RoutingNode currentNode = allocation.routingNodes().node(
shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId());
if (targetAttributeValue.equals(currentNode.node().getAttributes().get(awarenessAttribute)) == false) {
shardsForTargetAttributeValue += 1;
} // else this shard is already on a node in the same zone as the target node, so moving it doesn't change the count
} else {
shardPerAttribute.addTo(node.node().getAttributes().get(awarenessAttribute), 1);
shardsForTargetAttributeValue += 1;
}
}

int numberOfAttributes = nodesPerAttribute.size();
List<String> fullValues = forcedAwarenessAttributes.get(awarenessAttribute);
if (fullValues != null) {
for (String fullValue : fullValues) {
if (shardPerAttribute.containsKey(fullValue) == false) {
numberOfAttributes++;
}
}
}
// TODO should we remove ones that are not part of full list?
final List<String> forcedValues = forcedAwarenessAttributes.get(awarenessAttribute);
final int valueCount = forcedValues == null
? actualAttributeValues.size()
: Math.toIntExact(Stream.concat(actualAttributeValues.stream(), forcedValues.stream()).distinct().count());

final int currentNodeCount = shardPerAttribute.get(node.node().getAttributes().get(awarenessAttribute));
final int maximumNodeCount = (shardCount + numberOfAttributes - 1) / numberOfAttributes; // ceil(shardCount/numberOfAttributes)
if (currentNodeCount > maximumNodeCount) {
final int maximumShardsPerAttributeValue = (shardCount + valueCount - 1) / valueCount; // ceil(shardCount/valueCount)
if (shardsForTargetAttributeValue > maximumShardsPerAttributeValue) {
return debug ? debugNoTooManyCopies(
shardCount,
awarenessAttribute,
node.node().getAttributes().get(awarenessAttribute),
numberOfAttributes,
StreamSupport.stream(nodesPerAttribute.keys().spliterator(), false).map(c -> c.value).sorted().collect(toList()),
fullValues == null ? null : fullValues.stream().sorted().collect(toList()),
currentNodeCount,
maximumNodeCount)
valueCount,
actualAttributeValues.stream().sorted().collect(toList()),
forcedValues == null ? null : forcedValues.stream().sorted().collect(toList()),
shardsForTargetAttributeValue,
maximumShardsPerAttributeValue)
: Decision.NO;
}
}
Expand All @@ -211,8 +208,8 @@ private static Decision debugNoTooManyCopies(
int numberOfAttributes,
List<String> realAttributes,
List<String> forcedAttributes,
int currentNodeCount,
int maximumNodeCount) {
int actualShardCount,
int maximumShardCount) {
return Decision.single(Decision.Type.NO, NAME,
"there are [%d] copies of this shard and [%d] values for attribute [%s] (%s from nodes in the cluster and %s) so there " +
"may be at most [%d] copies of this shard allocated to nodes with each value, but (including this copy) there " +
Expand All @@ -222,8 +219,8 @@ private static Decision debugNoTooManyCopies(
attributeName,
realAttributes,
forcedAttributes == null ? "no forced awareness" : forcedAttributes + " from forced awareness",
maximumNodeCount,
currentNodeCount,
maximumShardCount,
actualShardCount,
attributeName,
attributeValue);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ ClusterState state() {
return clusterStateSupplier.get();
}

private static boolean isMasterUpdateThread() {
return Thread.currentThread().getName().contains(MASTER_UPDATE_THREAD_NAME);
/**
 * Tells whether the calling thread is a master update thread, judged by its name.
 *
 * @return {@code true} if the current thread's name contains {@code MASTER_UPDATE_THREAD_NAME}
 *         enclosed in square brackets, {@code false} otherwise
 */
public static boolean isMasterUpdateThread() {
    // Match the bracketed form so that a name merely containing the bare text does not qualify.
    final String bracketedName = "[" + MASTER_UPDATE_THREAD_NAME + "]";
    final String currentThreadName = Thread.currentThread().getName();
    return currentThreadName.contains(bracketedName);
}

public static boolean assertNotMasterUpdateThread(String reason) {
Expand Down