88
99package org .elasticsearch .cluster .routing .allocation .decider ;
1010
11- import com .carrotsearch .hppc .ObjectIntHashMap ;
1211import org .elasticsearch .cluster .metadata .IndexMetadata ;
1312import org .elasticsearch .cluster .routing .RoutingNode ;
1413import org .elasticsearch .cluster .routing .ShardRouting ;
2221import java .util .HashMap ;
2322import java .util .List ;
2423import java .util .Map ;
24+ import java .util .Set ;
2525import java .util .function .Function ;
26- import java .util .stream .StreamSupport ;
26+ import java .util .stream .Stream ;
2727
2828import static java .util .Collections .emptyList ;
2929import static java .util .stream .Collectors .toList ;
@@ -133,70 +133,67 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout
133133 }
134134
135135 final boolean debug = allocation .debugDecision ();
136- IndexMetadata indexMetadata = allocation .metadata ().getIndexSafe (shardRouting .index ());
136+ final IndexMetadata indexMetadata = allocation .metadata ().getIndexSafe (shardRouting .index ());
137137
138138 if (INDEX_AUTO_EXPAND_REPLICAS_SETTING .get (indexMetadata .getSettings ()).expandToAllNodes ()) {
139139 return YES_AUTO_EXPAND_ALL ;
140140 }
141141
142- int shardCount = indexMetadata .getNumberOfReplicas () + 1 ; // 1 for primary
142+ final int shardCount = indexMetadata .getNumberOfReplicas () + 1 ; // 1 for primary
143143 for (String awarenessAttribute : awarenessAttributes ) {
144144 // the node the shard exists on must be associated with an awareness attribute
145145 if (node .node ().getAttributes ().containsKey (awarenessAttribute ) == false ) {
146146 return debug ? debugNoMissingAttribute (awarenessAttribute , awarenessAttributes ) : Decision .NO ;
147147 }
148148
149- // build attr_value -> nodes map
150- ObjectIntHashMap <String > nodesPerAttribute = allocation .routingNodes ().nodesPerAttributesCounts (awarenessAttribute );
149+ final Set <String > actualAttributeValues = allocation .routingNodes ().getAttributeValues (awarenessAttribute );
150+ final String targetAttributeValue = node .node ().getAttributes ().get (awarenessAttribute );
151+ assert targetAttributeValue != null : "attribute [" + awarenessAttribute + "] missing on " + node .node ();
152+ assert actualAttributeValues .contains (targetAttributeValue )
153+ : "attribute [" + awarenessAttribute + "] on " + node .node () + " is not in " + actualAttributeValues ;
154+
155+ int shardsForTargetAttributeValue = 0 ;
156+ // Will be the count of shards on nodes with attribute `awarenessAttribute` matching the one on `node`.
151157
152- // build the count of shards per attribute value
153- ObjectIntHashMap <String > shardPerAttribute = new ObjectIntHashMap <>();
154158 for (ShardRouting assignedShard : allocation .routingNodes ().assignedShards (shardRouting .shardId ())) {
155159 if (assignedShard .started () || assignedShard .initializing ()) {
156160 // Note: this also counts relocation targets as that will be the new location of the shard.
157161 // Relocation sources should not be counted as the shard is moving away
158- RoutingNode routingNode = allocation .routingNodes ().node (assignedShard .currentNodeId ());
159- shardPerAttribute .addTo (routingNode .node ().getAttributes ().get (awarenessAttribute ), 1 );
162+ final RoutingNode assignedNode = allocation .routingNodes ().node (assignedShard .currentNodeId ());
163+ if (targetAttributeValue .equals (assignedNode .node ().getAttributes ().get (awarenessAttribute ))) {
164+ shardsForTargetAttributeValue += 1 ;
165+ }
160166 }
161167 }
162168
163169 if (moveToNode ) {
164170 if (shardRouting .assignedToNode ()) {
165- String nodeId = shardRouting .relocating () ? shardRouting .relocatingNodeId () : shardRouting .currentNodeId ();
166- if (node .nodeId ().equals (nodeId ) == false ) {
167- // we work on different nodes, move counts around
168- shardPerAttribute .putOrAdd (allocation .routingNodes ().node (nodeId ).node ().getAttributes ().get (awarenessAttribute ),
169- 0 , -1 );
170- shardPerAttribute .addTo (node .node ().getAttributes ().get (awarenessAttribute ), 1 );
171- }
171+ final RoutingNode currentNode = allocation .routingNodes ().node (
172+ shardRouting .relocating () ? shardRouting .relocatingNodeId () : shardRouting .currentNodeId ());
173+ if (targetAttributeValue .equals (currentNode .node ().getAttributes ().get (awarenessAttribute )) == false ) {
174+ shardsForTargetAttributeValue += 1 ;
175+ } // else this shard is already on a node in the same zone as the target node, so moving it doesn't change the count
172176 } else {
173- shardPerAttribute . addTo ( node . node (). getAttributes (). get ( awarenessAttribute ), 1 ) ;
177+ shardsForTargetAttributeValue += 1 ;
174178 }
175179 }
176180
177- int numberOfAttributes = nodesPerAttribute .size ();
178- List <String > fullValues = forcedAwarenessAttributes .get (awarenessAttribute );
179- if (fullValues != null ) {
180- for (String fullValue : fullValues ) {
181- if (shardPerAttribute .containsKey (fullValue ) == false ) {
182- numberOfAttributes ++;
183- }
184- }
185- }
186- // TODO should we remove ones that are not part of full list?
181+ final List <String > forcedValues = forcedAwarenessAttributes .get (awarenessAttribute );
182+ final int valueCount = forcedValues == null
183+ ? actualAttributeValues .size ()
184+ : Math .toIntExact (Stream .concat (actualAttributeValues .stream (), forcedValues .stream ()).distinct ().count ());
187185
188- final int currentNodeCount = shardPerAttribute .get (node .node ().getAttributes ().get (awarenessAttribute ));
189- final int maximumNodeCount = (shardCount + numberOfAttributes - 1 ) / numberOfAttributes ; // ceil(shardCount/numberOfAttributes)
190- if (currentNodeCount > maximumNodeCount ) {
186+ final int maximumShardsPerAttributeValue = (shardCount + valueCount - 1 ) / valueCount ; // ceil(shardCount/valueCount)
187+ if (shardsForTargetAttributeValue > maximumShardsPerAttributeValue ) {
191188 return debug ? debugNoTooManyCopies (
192189 shardCount ,
193190 awarenessAttribute ,
194191 node .node ().getAttributes ().get (awarenessAttribute ),
195- numberOfAttributes ,
196- StreamSupport .stream (nodesPerAttribute . keys (). spliterator (), false ). map ( c -> c . value ).sorted ().collect (toList ()),
197- fullValues == null ? null : fullValues .stream ().sorted ().collect (toList ()),
198- currentNodeCount ,
199- maximumNodeCount )
192+ valueCount ,
193+ actualAttributeValues .stream ().sorted ().collect (toList ()),
194+ forcedValues == null ? null : forcedValues .stream ().sorted ().collect (toList ()),
195+ shardsForTargetAttributeValue ,
196+ maximumShardsPerAttributeValue )
200197 : Decision .NO ;
201198 }
202199 }
@@ -211,8 +208,8 @@ private static Decision debugNoTooManyCopies(
211208 int numberOfAttributes ,
212209 List <String > realAttributes ,
213210 List <String > forcedAttributes ,
214- int currentNodeCount ,
215- int maximumNodeCount ) {
211+ int actualShardCount ,
212+ int maximumShardCount ) {
216213 return Decision .single (Decision .Type .NO , NAME ,
217214 "there are [%d] copies of this shard and [%d] values for attribute [%s] (%s from nodes in the cluster and %s) so there " +
218215 "may be at most [%d] copies of this shard allocated to nodes with each value, but (including this copy) there " +
@@ -222,8 +219,8 @@ private static Decision debugNoTooManyCopies(
222219 attributeName ,
223220 realAttributes ,
224221 forcedAttributes == null ? "no forced awareness" : forcedAttributes + " from forced awareness" ,
225- maximumNodeCount ,
226- currentNodeCount ,
222+ maximumShardCount ,
223+ actualShardCount ,
227224 attributeName ,
228225 attributeValue );
229226 }
0 commit comments