diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index 88aaff2f85381..9a648bfb09ab8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -8,7 +8,6 @@ package org.elasticsearch.cluster.routing; -import com.carrotsearch.hppc.ObjectIntHashMap; import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.CollectionUtil; @@ -19,6 +18,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus; import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator; +import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.collect.Tuple; @@ -37,9 +37,12 @@ import java.util.ListIterator; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Queue; import java.util.Set; import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; /** * {@link RoutingNodes} represents a copy the routing information contained in the {@link ClusterState cluster state}. @@ -70,7 +73,7 @@ public class RoutingNodes implements Iterable { private int relocatingShards = 0; - private final Map> nodesPerAttributeNames = new HashMap<>(); + private final Map> attributeValuesByAttribute = new HashMap<>(); private final Map recoveriesPerNode = new HashMap<>(); public RoutingNodes(ClusterState clusterState) { @@ -229,20 +232,12 @@ public RoutingNode node(String nodeId) { return nodesToShards.get(nodeId); } - public ObjectIntHashMap nodesPerAttributesCounts(String attributeName) { - ObjectIntHashMap nodesPerAttributesCounts = nodesPerAttributeNames.get(attributeName); - if (nodesPerAttributesCounts != null) { - return nodesPerAttributesCounts; - } - nodesPerAttributesCounts = new ObjectIntHashMap<>(); - for (RoutingNode routingNode : this) { - String attrValue = routingNode.node().getAttributes().get(attributeName); - if (attrValue != null) { - nodesPerAttributesCounts.addTo(attrValue, 1); - } - } - nodesPerAttributeNames.put(attributeName, nodesPerAttributesCounts); - return nodesPerAttributesCounts; + public Set getAttributeValues(String attributeName) { + // Only ever accessed on the master service thread so no need for synchronization + assert MasterService.isMasterUpdateThread() || Thread.currentThread().getName().startsWith("TEST-") + : Thread.currentThread().getName() + " should be the master service thread"; + return attributeValuesByAttribute.computeIfAbsent(attributeName, ignored -> StreamSupport.stream(this.spliterator(), false) + .map(r -> r.node().getAttributes().get(attributeName)).filter(Objects::nonNull).collect(Collectors.toSet())); } /** diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java index c8be1593a910e..8d5c2b669fb79 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java @@ -8,7 +8,6 @@ package org.elasticsearch.cluster.routing.allocation.decider; -import com.carrotsearch.hppc.ObjectIntHashMap; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; @@ -22,8 +21,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Function; -import java.util.stream.StreamSupport; +import java.util.stream.Stream; import static java.util.Collections.emptyList; import static java.util.stream.Collectors.toList; @@ -133,70 +133,67 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout } final boolean debug = allocation.debugDecision(); - IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index()); + final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index()); if (INDEX_AUTO_EXPAND_REPLICAS_SETTING.get(indexMetadata.getSettings()).expandToAllNodes()) { return YES_AUTO_EXPAND_ALL; } - int shardCount = indexMetadata.getNumberOfReplicas() + 1; // 1 for primary + final int shardCount = indexMetadata.getNumberOfReplicas() + 1; // 1 for primary for (String awarenessAttribute : awarenessAttributes) { // the node the shard exists on must be associated with an awareness attribute if (node.node().getAttributes().containsKey(awarenessAttribute) == false) { return debug ? debugNoMissingAttribute(awarenessAttribute, awarenessAttributes) : Decision.NO; } - // build attr_value -> nodes map - ObjectIntHashMap nodesPerAttribute = allocation.routingNodes().nodesPerAttributesCounts(awarenessAttribute); + final Set actualAttributeValues = allocation.routingNodes().getAttributeValues(awarenessAttribute); + final String targetAttributeValue = node.node().getAttributes().get(awarenessAttribute); + assert targetAttributeValue != null : "attribute [" + awarenessAttribute + "] missing on " + node.node(); + assert actualAttributeValues.contains(targetAttributeValue) + : "attribute [" + awarenessAttribute + "] on " + node.node() + " is not in " + actualAttributeValues; + + int shardsForTargetAttributeValue = 0; + // Will be the count of shards on nodes with attribute `awarenessAttribute` matching the one on `node`. - // build the count of shards per attribute value - ObjectIntHashMap shardPerAttribute = new ObjectIntHashMap<>(); for (ShardRouting assignedShard : allocation.routingNodes().assignedShards(shardRouting.shardId())) { if (assignedShard.started() || assignedShard.initializing()) { // Note: this also counts relocation targets as that will be the new location of the shard. // Relocation sources should not be counted as the shard is moving away - RoutingNode routingNode = allocation.routingNodes().node(assignedShard.currentNodeId()); - shardPerAttribute.addTo(routingNode.node().getAttributes().get(awarenessAttribute), 1); + final RoutingNode assignedNode = allocation.routingNodes().node(assignedShard.currentNodeId()); + if (targetAttributeValue.equals(assignedNode.node().getAttributes().get(awarenessAttribute))) { + shardsForTargetAttributeValue += 1; + } } } if (moveToNode) { if (shardRouting.assignedToNode()) { - String nodeId = shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId(); - if (node.nodeId().equals(nodeId) == false) { - // we work on different nodes, move counts around - shardPerAttribute.putOrAdd(allocation.routingNodes().node(nodeId).node().getAttributes().get(awarenessAttribute), - 0, -1); - shardPerAttribute.addTo(node.node().getAttributes().get(awarenessAttribute), 1); - } + final RoutingNode currentNode = allocation.routingNodes().node( + shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId()); + if (targetAttributeValue.equals(currentNode.node().getAttributes().get(awarenessAttribute)) == false) { + shardsForTargetAttributeValue += 1; + } // else this shard is already on a node in the same zone as the target node, so moving it doesn't change the count } else { - shardPerAttribute.addTo(node.node().getAttributes().get(awarenessAttribute), 1); + shardsForTargetAttributeValue += 1; } } - int numberOfAttributes = nodesPerAttribute.size(); - List fullValues = forcedAwarenessAttributes.get(awarenessAttribute); - if (fullValues != null) { - for (String fullValue : fullValues) { - if (shardPerAttribute.containsKey(fullValue) == false) { - numberOfAttributes++; - } - } - } - // TODO should we remove ones that are not part of full list? + final List forcedValues = forcedAwarenessAttributes.get(awarenessAttribute); + final int valueCount = forcedValues == null + ? actualAttributeValues.size() + : Math.toIntExact(Stream.concat(actualAttributeValues.stream(), forcedValues.stream()).distinct().count()); - final int currentNodeCount = shardPerAttribute.get(node.node().getAttributes().get(awarenessAttribute)); - final int maximumNodeCount = (shardCount + numberOfAttributes - 1) / numberOfAttributes; // ceil(shardCount/numberOfAttributes) - if (currentNodeCount > maximumNodeCount) { + final int maximumShardsPerAttributeValue = (shardCount + valueCount - 1) / valueCount; // ceil(shardCount/valueCount) + if (shardsForTargetAttributeValue > maximumShardsPerAttributeValue) { return debug ? debugNoTooManyCopies( shardCount, awarenessAttribute, node.node().getAttributes().get(awarenessAttribute), - numberOfAttributes, - StreamSupport.stream(nodesPerAttribute.keys().spliterator(), false).map(c -> c.value).sorted().collect(toList()), - fullValues == null ? null : fullValues.stream().sorted().collect(toList()), - currentNodeCount, - maximumNodeCount) + valueCount, + actualAttributeValues.stream().sorted().collect(toList()), + forcedValues == null ? null : forcedValues.stream().sorted().collect(toList()), + shardsForTargetAttributeValue, + maximumShardsPerAttributeValue) : Decision.NO; } } @@ -211,8 +208,8 @@ private static Decision debugNoTooManyCopies( int numberOfAttributes, List realAttributes, List forcedAttributes, - int currentNodeCount, - int maximumNodeCount) { + int actualShardCount, + int maximumShardCount) { return Decision.single(Decision.Type.NO, NAME, "there are [%d] copies of this shard and [%d] values for attribute [%s] (%s from nodes in the cluster and %s) so there " + "may be at most [%d] copies of this shard allocated to nodes with each value, but (including this copy) there " + @@ -222,8 +219,8 @@ private static Decision debugNoTooManyCopies( attributeName, realAttributes, forcedAttributes == null ? "no forced awareness" : forcedAttributes + " from forced awareness", - maximumNodeCount, - currentNodeCount, + maximumShardCount, + actualShardCount, attributeName, attributeValue); } diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index f8c400313333d..f7f04456926ba 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -173,8 +173,8 @@ ClusterState state() { return clusterStateSupplier.get(); } - private static boolean isMasterUpdateThread() { - return Thread.currentThread().getName().contains(MASTER_UPDATE_THREAD_NAME); + public static boolean isMasterUpdateThread() { + return Thread.currentThread().getName().contains('[' + MASTER_UPDATE_THREAD_NAME + ']'); } public static boolean assertNotMasterUpdateThread(String reason) {