From 9f5b03c0e1f7a2ceb2d653fdccb8d0916e1c870a Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 8 Mar 2021 10:38:06 +0000 Subject: [PATCH 1/2] Reduce state size in awareness allocation decider Today the awareness allocation decider computes the number of nodes and shards in each zone, even though it only needs to know the names of the zones and the number of shards in the current zone. This commit drops the unnecessary state in the computation. --- .../cluster/routing/RoutingNodes.java | 27 ++++---- .../decider/AwarenessAllocationDecider.java | 65 +++++++++---------- .../cluster/service/MasterService.java | 4 +- 3 files changed, 44 insertions(+), 52 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index 88aaff2f85381..9a648bfb09ab8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -8,7 +8,6 @@ package org.elasticsearch.cluster.routing; -import com.carrotsearch.hppc.ObjectIntHashMap; import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.CollectionUtil; @@ -19,6 +18,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus; import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator; +import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.collect.Tuple; @@ -37,9 +37,12 @@ import java.util.ListIterator; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Queue; import java.util.Set; import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; /** * {@link RoutingNodes} represents a copy the routing information contained in the {@link ClusterState cluster state}. @@ -70,7 +73,7 @@ public class RoutingNodes implements Iterable { private int relocatingShards = 0; - private final Map> nodesPerAttributeNames = new HashMap<>(); + private final Map> attributeValuesByAttribute = new HashMap<>(); private final Map recoveriesPerNode = new HashMap<>(); public RoutingNodes(ClusterState clusterState) { @@ -229,20 +232,12 @@ public RoutingNode node(String nodeId) { return nodesToShards.get(nodeId); } - public ObjectIntHashMap nodesPerAttributesCounts(String attributeName) { - ObjectIntHashMap nodesPerAttributesCounts = nodesPerAttributeNames.get(attributeName); - if (nodesPerAttributesCounts != null) { - return nodesPerAttributesCounts; - } - nodesPerAttributesCounts = new ObjectIntHashMap<>(); - for (RoutingNode routingNode : this) { - String attrValue = routingNode.node().getAttributes().get(attributeName); - if (attrValue != null) { - nodesPerAttributesCounts.addTo(attrValue, 1); - } - } - nodesPerAttributeNames.put(attributeName, nodesPerAttributesCounts); - return nodesPerAttributesCounts; + public Set getAttributeValues(String attributeName) { + // Only ever accessed on the master service thread so no need for synchronization + assert MasterService.isMasterUpdateThread() || Thread.currentThread().getName().startsWith("TEST-") + : Thread.currentThread().getName() + " should be the master service thread"; + return attributeValuesByAttribute.computeIfAbsent(attributeName, ignored -> StreamSupport.stream(this.spliterator(), false) + .map(r -> r.node().getAttributes().get(attributeName)).filter(Objects::nonNull).collect(Collectors.toSet())); } /** diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java index c8be1593a910e..62661665c0f53 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java @@ -8,7 +8,6 @@ package org.elasticsearch.cluster.routing.allocation.decider; -import com.carrotsearch.hppc.ObjectIntHashMap; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; @@ -22,8 +21,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Function; -import java.util.stream.StreamSupport; +import java.util.stream.Stream; import static java.util.Collections.emptyList; import static java.util.stream.Collectors.toList; @@ -146,57 +146,54 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout return debug ? debugNoMissingAttribute(awarenessAttribute, awarenessAttributes) : Decision.NO; } - // build attr_value -> nodes map - ObjectIntHashMap nodesPerAttribute = allocation.routingNodes().nodesPerAttributesCounts(awarenessAttribute); + final Set actualAttributeValues = allocation.routingNodes().getAttributeValues(awarenessAttribute); + final String currentAttributeValue = node.node().getAttributes().get(awarenessAttribute); + assert currentAttributeValue != null : "attribute [" + awarenessAttribute + "] missing on " + node.node(); + assert actualAttributeValues.contains(currentAttributeValue) + : "attribute [" + awarenessAttribute + "] on " + node.node() + " is not in " + actualAttributeValues; + + int shardsForCurrentAttributeValue = 0; + // Will be the count of shards on nodes with attribute `awarenessAttribute` matching the one on `node`. - // build the count of shards per attribute value - ObjectIntHashMap shardPerAttribute = new ObjectIntHashMap<>(); for (ShardRouting assignedShard : allocation.routingNodes().assignedShards(shardRouting.shardId())) { if (assignedShard.started() || assignedShard.initializing()) { // Note: this also counts relocation targets as that will be the new location of the shard. // Relocation sources should not be counted as the shard is moving away - RoutingNode routingNode = allocation.routingNodes().node(assignedShard.currentNodeId()); - shardPerAttribute.addTo(routingNode.node().getAttributes().get(awarenessAttribute), 1); + final RoutingNode assignedNode = allocation.routingNodes().node(assignedShard.currentNodeId()); + if (currentAttributeValue.equals(assignedNode.node().getAttributes().get(awarenessAttribute))) { + shardsForCurrentAttributeValue += 1; + } } } if (moveToNode) { if (shardRouting.assignedToNode()) { - String nodeId = shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId(); - if (node.nodeId().equals(nodeId) == false) { - // we work on different nodes, move counts around - shardPerAttribute.putOrAdd(allocation.routingNodes().node(nodeId).node().getAttributes().get(awarenessAttribute), - 0, -1); - shardPerAttribute.addTo(node.node().getAttributes().get(awarenessAttribute), 1); - } + final RoutingNode currentNode = allocation.routingNodes().node( + shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId()); + if (currentAttributeValue.equals(currentNode.node().getAttributes().get(awarenessAttribute)) == false) { + shardsForCurrentAttributeValue += 1; + } // else this shard is already on a node in the same zone as the target node, so moving it doesn't change the count } else { - shardPerAttribute.addTo(node.node().getAttributes().get(awarenessAttribute), 1); + shardsForCurrentAttributeValue += 1; } } - int numberOfAttributes = nodesPerAttribute.size(); - List fullValues = forcedAwarenessAttributes.get(awarenessAttribute); - if (fullValues != null) { - for (String fullValue : fullValues) { - if (shardPerAttribute.containsKey(fullValue) == false) { - numberOfAttributes++; - } - } - } - // TODO should we remove ones that are not part of full list? + final List forcedValues = forcedAwarenessAttributes.get(awarenessAttribute); + final int valueCount = forcedValues == null + ? actualAttributeValues.size() + : Math.toIntExact(Stream.concat(actualAttributeValues.stream(), forcedValues.stream()).distinct().count()); - final int currentNodeCount = shardPerAttribute.get(node.node().getAttributes().get(awarenessAttribute)); - final int maximumNodeCount = (shardCount + numberOfAttributes - 1) / numberOfAttributes; // ceil(shardCount/numberOfAttributes) - if (currentNodeCount > maximumNodeCount) { + final int maximumShardsPerAttributeValue = (shardCount + valueCount - 1) / valueCount; // ceil(shardCount/valueCount) + if (shardsForCurrentAttributeValue > maximumShardsPerAttributeValue) { return debug ? debugNoTooManyCopies( shardCount, awarenessAttribute, node.node().getAttributes().get(awarenessAttribute), - numberOfAttributes, - StreamSupport.stream(nodesPerAttribute.keys().spliterator(), false).map(c -> c.value).sorted().collect(toList()), - fullValues == null ? null : fullValues.stream().sorted().collect(toList()), - currentNodeCount, - maximumNodeCount) + valueCount, + actualAttributeValues.stream().sorted().collect(toList()), + forcedValues == null ? null : forcedValues.stream().sorted().collect(toList()), + shardsForCurrentAttributeValue, + maximumShardsPerAttributeValue) : Decision.NO; } } diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index f8c400313333d..f7f04456926ba 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -173,8 +173,8 @@ ClusterState state() { return clusterStateSupplier.get(); } - private static boolean isMasterUpdateThread() { - return Thread.currentThread().getName().contains(MASTER_UPDATE_THREAD_NAME); + public static boolean isMasterUpdateThread() { + return Thread.currentThread().getName().contains('[' + MASTER_UPDATE_THREAD_NAME + ']'); } public static boolean assertNotMasterUpdateThread(String reason) { From d9703a093706521b9f05d2ebac86eb09903a3103 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 16 Mar 2021 10:39:46 +0000 Subject: [PATCH 2/2] Various renaming --- .../decider/AwarenessAllocationDecider.java | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java index 62661665c0f53..8d5c2b669fb79 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java @@ -133,13 +133,13 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout } final boolean debug = allocation.debugDecision(); - IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index()); + final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index()); if (INDEX_AUTO_EXPAND_REPLICAS_SETTING.get(indexMetadata.getSettings()).expandToAllNodes()) { return YES_AUTO_EXPAND_ALL; } - int shardCount = indexMetadata.getNumberOfReplicas() + 1; // 1 for primary + final int shardCount = indexMetadata.getNumberOfReplicas() + 1; // 1 for primary for (String awarenessAttribute : awarenessAttributes) { // the node the shard exists on must be associated with an awareness attribute if (node.node().getAttributes().containsKey(awarenessAttribute) == false) { @@ -147,12 +147,12 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout } final Set actualAttributeValues = allocation.routingNodes().getAttributeValues(awarenessAttribute); - final String currentAttributeValue = node.node().getAttributes().get(awarenessAttribute); - assert currentAttributeValue != null : "attribute [" + awarenessAttribute + "] missing on " + node.node(); - assert actualAttributeValues.contains(currentAttributeValue) + final String targetAttributeValue = node.node().getAttributes().get(awarenessAttribute); + assert targetAttributeValue != null : "attribute [" + awarenessAttribute + "] missing on " + node.node(); + assert actualAttributeValues.contains(targetAttributeValue) : "attribute [" + awarenessAttribute + "] on " + node.node() + " is not in " + actualAttributeValues; - int shardsForCurrentAttributeValue = 0; + int shardsForTargetAttributeValue = 0; // Will be the count of shards on nodes with attribute `awarenessAttribute` matching the one on `node`. for (ShardRouting assignedShard : allocation.routingNodes().assignedShards(shardRouting.shardId())) { @@ -160,8 +160,8 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout // Note: this also counts relocation targets as that will be the new location of the shard. // Relocation sources should not be counted as the shard is moving away final RoutingNode assignedNode = allocation.routingNodes().node(assignedShard.currentNodeId()); - if (currentAttributeValue.equals(assignedNode.node().getAttributes().get(awarenessAttribute))) { - shardsForCurrentAttributeValue += 1; + if (targetAttributeValue.equals(assignedNode.node().getAttributes().get(awarenessAttribute))) { + shardsForTargetAttributeValue += 1; } } } @@ -170,11 +170,11 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout if (shardRouting.assignedToNode()) { final RoutingNode currentNode = allocation.routingNodes().node( shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId()); - if (currentAttributeValue.equals(currentNode.node().getAttributes().get(awarenessAttribute)) == false) { - shardsForCurrentAttributeValue += 1; + if (targetAttributeValue.equals(currentNode.node().getAttributes().get(awarenessAttribute)) == false) { + shardsForTargetAttributeValue += 1; } // else this shard is already on a node in the same zone as the target node, so moving it doesn't change the count } else { - shardsForCurrentAttributeValue += 1; + shardsForTargetAttributeValue += 1; } } @@ -184,7 +184,7 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout : Math.toIntExact(Stream.concat(actualAttributeValues.stream(), forcedValues.stream()).distinct().count()); final int maximumShardsPerAttributeValue = (shardCount + valueCount - 1) / valueCount; // ceil(shardCount/valueCount) - if (shardsForCurrentAttributeValue > maximumShardsPerAttributeValue) { + if (shardsForTargetAttributeValue > maximumShardsPerAttributeValue) { return debug ? debugNoTooManyCopies( shardCount, awarenessAttribute, @@ -192,7 +192,7 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout valueCount, actualAttributeValues.stream().sorted().collect(toList()), forcedValues == null ? null : forcedValues.stream().sorted().collect(toList()), - shardsForCurrentAttributeValue, + shardsForTargetAttributeValue, maximumShardsPerAttributeValue) : Decision.NO; } @@ -208,8 +208,8 @@ private static Decision debugNoTooManyCopies( int numberOfAttributes, List realAttributes, List forcedAttributes, - int currentNodeCount, - int maximumNodeCount) { + int actualShardCount, + int maximumShardCount) { return Decision.single(Decision.Type.NO, NAME, "there are [%d] copies of this shard and [%d] values for attribute [%s] (%s from nodes in the cluster and %s) so there " + "may be at most [%d] copies of this shard allocated to nodes with each value, but (including this copy) there " + @@ -219,8 +219,8 @@ private static Decision debugNoTooManyCopies( attributeName, realAttributes, forcedAttributes == null ? "no forced awareness" : forcedAttributes + " from forced awareness", - maximumNodeCount, - currentNodeCount, + maximumShardCount, + actualShardCount, attributeName, attributeValue); }