Commit 2938673

[ML] Add setting to scale the processor count used in the model assignment planner (#98296)
Adds the ml.allocated_processors_scale setting, which is used to scale the value of the ml.allocated_processors_double node attribute. This setting influences the number of model allocations that can fit on a node.
1 parent d0b2f65 commit 2938673
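
As a rough, hypothetical illustration of the behaviour described above (the class and numbers below are illustrative, not part of this commit): dividing a node's advertised processor count by ml.allocated_processors_scale shrinks the capacity the assignment planner sees, so fewer model allocations fit on the node.

    // Hypothetical values, for illustration only.
    public class AllocatedProcessorsScaleExample {
        public static void main(String[] args) {
            double allocatedProcessors = 16.0; // value advertised via the ml.allocated_processors_double node attribute
            int scale = 2;                     // ml.allocated_processors_scale
            int threadsPerAllocation = 2;      // per-deployment parameter

            double effectiveProcessors = allocatedProcessors / scale;                    // 8.0
            int allocationsThatFit = (int) (effectiveProcessors / threadsPerAllocation); // 4 instead of 8
            System.out.println("effective processors: " + effectiveProcessors);
            System.out.println("allocations that fit: " + allocationsThatFit);
        }
    }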

File tree

12 files changed: +188, -54 lines changed

docs/changelog/98296.yaml

Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
+pr: 98296
+summary: Add setting to scale the processor count used in the model assignment planner
+area: Machine Learning
+type: enhancement
+issues: []

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 20 additions & 0 deletions

@@ -583,6 +583,26 @@ public void loadExtensions(ExtensionLoader loader) {
 
     public static final String ALLOCATED_PROCESSORS_NODE_ATTR = "ml.allocated_processors_double";
 
+    /**
+     * For the NLP model assignment planner.
+     * The {@link #ALLOCATED_PROCESSORS_NODE_ATTR} attribute may be
+     * measured in hyper-threaded or virtual cores when the user
+     * would like the planner to consider logical cores.
+     *
+     * ALLOCATED_PROCESSORS_NODE_ATTR is divided by this setting,
+     * the default value of 1 means the attribute is unchanged, a value
+     * of 2 accounts for hyper-threaded cores with 2 threads per core.
+     * Increasing this setting above 1 reduces the number of model
+     * allocations that can be deployed on a node.
+     */
+    public static final Setting<Integer> ALLOCATED_PROCESSORS_SCALE = Setting.intSetting(
+        "ml.allocated_processors_scale",
+        1,
+        1,
+        Property.OperatorDynamic,
+        Property.NodeScope
+    );
+
     public static final String ML_CONFIG_VERSION_NODE_ATTR = MlConfigVersion.ML_CONFIG_VERSION_NODE_ATTR;
 
     public static final Setting<Integer> CONCURRENT_JOB_ALLOCATIONS = Setting.intSetting(
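
A minimal sketch of how the new operator-dynamic, node-scope setting can be read through the standard Setting API, assuming the x-pack ML plugin classes are on the classpath; the class name and values below are illustrative, not part of this commit. Per the intSetting definition above, the value defaults to 1 and values below 1 are rejected.

    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.xpack.ml.MachineLearning;

    public class ReadAllocatedProcessorsScale {
        public static void main(String[] args) {
            // Hypothetical node settings; on a real node the value comes from elasticsearch.yml
            // or an operator update, since the setting is OperatorDynamic and NodeScope.
            Settings nodeSettings = Settings.builder()
                .put("ml.allocated_processors_scale", 2) // e.g. 2 hyper-threads per physical core
                .build();

            int scale = MachineLearning.ALLOCATED_PROCESSORS_SCALE.get(nodeSettings); // 2; defaults to 1 when unset
            System.out.println("allocated processors scale: " + scale);
        }
    }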

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java

Lines changed: 4 additions & 1 deletion

@@ -94,7 +94,10 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
         final MlAutoscalingContext mlContext = new MlAutoscalingContext(clusterState);
         final NativeMemoryCapacity currentNativeMemoryCapacity = memoryDecider.currentScale(mlContext.mlNodes);
         final MlMemoryAutoscalingCapacity currentMemoryCapacity = memoryDecider.capacityFromNativeMemory(currentNativeMemoryCapacity);
-        final MlProcessorAutoscalingCapacity currentProcessorCapacity = processorDecider.computeCurrentCapacity(mlContext.mlNodes);
+        final MlProcessorAutoscalingCapacity currentProcessorCapacity = processorDecider.computeCurrentCapacity(
+            mlContext.mlNodes,
+            configuration
+        );
 
         final MlScalingReason.Builder reasonBuilder = MlScalingReason.builder(mlContext)
             .setCurrentMlCapacity(

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTracker.java

Lines changed: 3 additions & 1 deletion

@@ -81,7 +81,9 @@ public static void getMlAutoscalingStats(
         long modelMemoryAvailableFirstNode = mlNodes.length > 0
             ? NativeMemoryCalculator.allowedBytesForMl(clusterState.nodes().get(mlNodes[0]), settings).orElse(0L)
             : 0L;
-        int processorsAvailableFirstNode = mlNodes.length > 0 ? MlProcessors.get(clusterState.nodes().get(mlNodes[0])).roundDown() : 0;
+        int processorsAvailableFirstNode = mlNodes.length > 0
+            ? MlProcessors.get(clusterState.nodes().get(mlNodes[0]), settings).roundDown()
+            : 0;
 
         // Todo: MAX_LOW_PRIORITY_MODELS_PER_NODE not checked yet
         int maxOpenJobsPerNode = MAX_OPEN_JOBS_PER_NODE.get(settings);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java

Lines changed: 7 additions & 3 deletions

@@ -258,7 +258,7 @@ public MlMemoryAutoscalingCapacity scale(Settings configuration, AutoscalingDeci
         }
         // We should keep this check here as well as in the processor decider while cloud is not
         // reacting to processor autoscaling.
-        if (modelAssignmentsRequireMoreThanHalfCpu(mlContext.modelAssignments.values(), mlContext.mlNodes)) {
+        if (modelAssignmentsRequireMoreThanHalfCpu(mlContext.modelAssignments.values(), mlContext.mlNodes, configuration)) {
             logger.debug("not down-scaling; model assignments require more than half of the ML tier's allocated processors");
             return null;
         }
@@ -815,11 +815,15 @@ static MlMemoryAutoscalingCapacity ensureScaleDown(
         return newCapacity;
     }
 
-    static boolean modelAssignmentsRequireMoreThanHalfCpu(Collection<TrainedModelAssignment> assignments, List<DiscoveryNode> mlNodes) {
+    static boolean modelAssignmentsRequireMoreThanHalfCpu(
+        Collection<TrainedModelAssignment> assignments,
+        List<DiscoveryNode> mlNodes,
+        Settings settings
+    ) {
         int totalRequiredProcessors = assignments.stream()
             .mapToInt(t -> t.getTaskParams().getNumberOfAllocations() * t.getTaskParams().getThreadsPerAllocation())
             .sum();
-        int totalMlProcessors = mlNodes.stream().mapToInt(node -> MlProcessors.get(node).roundUp()).sum();
+        int totalMlProcessors = mlNodes.stream().mapToInt(node -> MlProcessors.get(node, settings).roundUp()).sum();
         return totalRequiredProcessors * 2 > totalMlProcessors;
     }
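
The guard above keeps the ML tier from scaling down while deployed models need more than half of its processors. A hedged illustration with hypothetical numbers (not taken from this commit) of how a scale of 2 tightens that check:

    // Hypothetical numbers, for illustration only.
    public class HalfCpuGuardExample {
        public static void main(String[] args) {
            int totalRequiredProcessors = 10;   // sum of numberOfAllocations * threadsPerAllocation
            int totalMlProcessorsUnscaled = 32; // e.g. two 16-processor nodes with scale = 1
            int totalMlProcessorsScaled = 16;   // the same nodes with ml.allocated_processors_scale = 2

            System.out.println(totalRequiredProcessors * 2 > totalMlProcessorsUnscaled); // false: down-scaling allowed
            System.out.println(totalRequiredProcessors * 2 > totalMlProcessorsScaled);   // true: down-scaling blocked
        }
    }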

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java

Lines changed: 5 additions & 4 deletions

@@ -52,7 +52,7 @@ public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingD
             ).build();
         }
 
-        final MlProcessorAutoscalingCapacity currentCapacity = computeCurrentCapacity(mlContext.mlNodes);
+        final MlProcessorAutoscalingCapacity currentCapacity = computeCurrentCapacity(mlContext.mlNodes, configuration);
 
         final MlProcessorAutoscalingCapacity requiredCapacity = computeRequiredCapacity(trainedModelAssignmentMetadata).build();
 
@@ -64,7 +64,8 @@ public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingD
 
         if (MlMemoryAutoscalingDecider.modelAssignmentsRequireMoreThanHalfCpu(
             trainedModelAssignmentMetadata.allAssignments().values(),
-            mlContext.mlNodes
+            mlContext.mlNodes,
+            configuration
         )) {
             return MlProcessorAutoscalingCapacity.builder(currentCapacity.nodeProcessors(), currentCapacity.tierProcessors())
                 .setReason("not scaling down as model assignments require more than half of the ML tier's allocated processors")
@@ -136,11 +137,11 @@ private MlProcessorAutoscalingCapacity.Builder computeRequiredCapacity(TrainedMo
         );
     }
 
-    MlProcessorAutoscalingCapacity computeCurrentCapacity(List<DiscoveryNode> mlNodes) {
+    MlProcessorAutoscalingCapacity computeCurrentCapacity(List<DiscoveryNode> mlNodes, Settings settings) {
         Processors maxNodeProcessors = Processors.ZERO;
         Processors tierProcessors = Processors.ZERO;
         for (DiscoveryNode node : mlNodes) {
-            Processors nodeProcessors = MlProcessors.get(node);
+            Processors nodeProcessors = MlProcessors.get(node, settings);
             if (nodeProcessors.compareTo(maxNodeProcessors) > 0) {
                 maxNodeProcessors = nodeProcessors;
             }

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterService.java

Lines changed: 1 addition & 1 deletion

@@ -488,7 +488,7 @@ private TrainedModelAssignmentMetadata.Builder rebalanceAssignments(
             nodeAvailabilityZoneMapper.buildMlNodesByAvailabilityZone(currentState),
             modelToAdd
         );
-        TrainedModelAssignmentMetadata.Builder rebalanced = rebalancer.rebalance();
+        TrainedModelAssignmentMetadata.Builder rebalanced = rebalancer.rebalance(clusterService.getSettings());
         if (modelToAdd.isPresent()) {
            checkModelIsFullyAllocatedIfScalingIsNotPossible(modelToAdd.get().getDeploymentId(), rebalanced, nodes);
         }

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java

Lines changed: 17 additions & 14 deletions

@@ -13,6 +13,7 @@
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction;
 import org.elasticsearch.xpack.core.ml.inference.assignment.Priority;
@@ -63,7 +64,7 @@ class TrainedModelAssignmentRebalancer {
         this.deploymentToAdd = Objects.requireNonNull(deploymentToAdd);
     }
 
-    TrainedModelAssignmentMetadata.Builder rebalance() throws Exception {
+    TrainedModelAssignmentMetadata.Builder rebalance(Settings settings) {
         if (deploymentToAdd.isPresent() && currentMetadata.hasDeployment(deploymentToAdd.get().getDeploymentId())) {
             throw new ResourceAlreadyExistsException(
                 "[{}] assignment for deployment with model [{}] already exists",
@@ -77,8 +78,8 @@ TrainedModelAssignmentMetadata.Builder rebalance() throws Exception {
             return TrainedModelAssignmentMetadata.Builder.fromMetadata(currentMetadata);
         }
 
-        AssignmentPlan assignmentPlan = computeAssignmentPlan();
-        return buildAssignmentsFromPlan(assignmentPlan);
+        AssignmentPlan assignmentPlan = computeAssignmentPlan(settings);
+        return buildAssignmentsFromPlan(assignmentPlan, settings);
     }
 
     private boolean areAllModelsSatisfiedAndNoOutdatedRoutingEntries() {
@@ -91,8 +92,8 @@ private boolean areAllModelsSatisfiedAndNoOutdatedRoutingEntries() {
         return true;
     }
 
-    AssignmentPlan computeAssignmentPlan() {
-        final Map<List<String>, List<AssignmentPlan.Node>> nodesByZone = createNodesByZoneMap();
+    AssignmentPlan computeAssignmentPlan(Settings settings) {
+        final Map<List<String>, List<AssignmentPlan.Node>> nodesByZone = createNodesByZoneMap(settings);
         final Set<String> assignableNodeIds = nodesByZone.values()
             .stream()
             .flatMap(List::stream)
@@ -270,7 +271,7 @@ private Map<String, Integer> findFittingAssignments(
         return fittingAssignments;
     }
 
-    private Map<List<String>, List<AssignmentPlan.Node>> createNodesByZoneMap() {
+    private Map<List<String>, List<AssignmentPlan.Node>> createNodesByZoneMap(Settings settings) {
         return mlNodesByZone.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> {
             Collection<DiscoveryNode> discoveryNodes = e.getValue();
             List<AssignmentPlan.Node> nodes = new ArrayList<>();
@@ -284,7 +285,7 @@ private Map<List<String>, List<AssignmentPlan.Node>> createNodesByZoneMap() {
                         // We subtract native inference memory as the planner expects available memory for
                         // native inference including current assignments.
                         getNodeFreeMemoryExcludingPerNodeOverheadAndNativeInference(load),
-                        MlProcessors.get(discoveryNode).roundUp()
+                        MlProcessors.get(discoveryNode, settings).roundUp()
                     )
                 );
             } else {
@@ -304,7 +305,7 @@ private static long getNodeFreeMemoryExcludingPerNodeOverheadAndNativeInference(
         return load.getFreeMemoryExcludingPerNodeOverhead() - load.getAssignedNativeInferenceMemory();
     }
 
-    private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(AssignmentPlan assignmentPlan) {
+    private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(AssignmentPlan assignmentPlan, Settings settings) {
         TrainedModelAssignmentMetadata.Builder builder = TrainedModelAssignmentMetadata.Builder.empty();
         for (AssignmentPlan.Deployment deployment : assignmentPlan.models()) {
             TrainedModelAssignment existingAssignment = currentMetadata.getDeploymentAssignment(deployment.id());
@@ -342,7 +343,7 @@ private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(Assignme
             }
             assignmentBuilder.calculateAndSetAssignmentState();
 
-            explainAssignments(assignmentPlan, nodeLoads, deployment).ifPresent(assignmentBuilder::setReason);
+            explainAssignments(assignmentPlan, nodeLoads, deployment, settings).ifPresent(assignmentBuilder::setReason);
             builder.addNewAssignment(deployment.id(), assignmentBuilder);
         }
         return builder;
@@ -351,7 +352,8 @@ private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(Assignme
     private Optional<String> explainAssignments(
         AssignmentPlan assignmentPlan,
         Map<DiscoveryNode, NodeLoad> nodeLoads,
-        AssignmentPlan.Deployment deployment
+        AssignmentPlan.Deployment deployment,
+        Settings settings
     ) {
         if (assignmentPlan.satisfiesAllocations(deployment)) {
             return Optional.empty();
@@ -363,7 +365,7 @@ private Optional<String> explainAssignments(
 
         Map<String, String> nodeToReason = new TreeMap<>();
         for (Map.Entry<DiscoveryNode, NodeLoad> nodeAndLoad : nodeLoads.entrySet()) {
-            Optional<String> reason = explainAssignment(assignmentPlan, nodeAndLoad.getKey(), nodeAndLoad.getValue(), deployment);
+            Optional<String> reason = explainAssignment(assignmentPlan, nodeAndLoad.getKey(), nodeAndLoad.getValue(), deployment, settings);
             reason.ifPresent(s -> nodeToReason.put(nodeAndLoad.getKey().getId(), s));
         }
 
@@ -382,7 +384,8 @@ private Optional<String> explainAssignment(
         AssignmentPlan assignmentPlan,
         DiscoveryNode node,
         NodeLoad load,
-        AssignmentPlan.Deployment deployment
+        AssignmentPlan.Deployment deployment,
+        Settings settings
     ) {
         if (Strings.isNullOrEmpty(load.getError()) == false) {
             return Optional.of(load.getError());
@@ -395,7 +398,7 @@ private Optional<String> explainAssignment(
             // But we should also check if we managed to assign a model during the rebalance for which
             // we check if the node has used up any of its allocated processors.
             boolean isPerNodeOverheadAccountedFor = load.getNumAssignedJobsAndModels() > 0
-                || assignmentPlan.getRemainingNodeCores(load.getNodeId()) < MlProcessors.get(node).roundUp();
+                || assignmentPlan.getRemainingNodeCores(load.getNodeId()) < MlProcessors.get(node, settings).roundUp();
             long requiredMemory = deployment.memoryBytes() + (isPerNodeOverheadAccountedFor
                 ? 0
                 : MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes());
@@ -424,7 +427,7 @@ private Optional<String> explainAssignment(
                     "This node has insufficient allocated processors. Available processors [{}], free processors [{}], "
                         + "processors required for each allocation of this model [{}]",
                     new Object[] {
-                        MlProcessors.get(node).roundUp(),
+                        MlProcessors.get(node, settings).roundUp(),
                         assignmentPlan.getRemainingNodeCores(node.getId()),
                         deployment.threadsPerAllocation() }
                 )

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/MlProcessors.java

Lines changed: 15 additions & 2 deletions

@@ -8,14 +8,15 @@
 package org.elasticsearch.xpack.ml.utils;
 
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.Processors;
 import org.elasticsearch.xpack.ml.MachineLearning;
 
 public final class MlProcessors {
 
     private MlProcessors() {}
 
-    public static Processors get(DiscoveryNode node) {
+    public static Processors get(DiscoveryNode node, Settings settings) {
         // Try getting the most modern setting, and if that's null then instead get the older setting. (If both are null then return zero.)
         String allocatedProcessorsString = node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR);
         if (allocatedProcessorsString == null) {
@@ -26,7 +27,19 @@ public static Processors get(DiscoveryNode node) {
         }
         try {
             double processorsAsDouble = Double.parseDouble(allocatedProcessorsString);
-            return processorsAsDouble > 0 ? Processors.of(processorsAsDouble) : Processors.ZERO;
+            if (processorsAsDouble <= 0) {
+                return Processors.ZERO;
+            }
+
+            Integer scale = null;
+            if (settings != null) {
+                scale = MachineLearning.ALLOCATED_PROCESSORS_SCALE.get(settings);
+            }
+            if (scale != null) {
+                processorsAsDouble = processorsAsDouble / scale;
+            }
+            return Processors.of(processorsAsDouble);
+
         } catch (NumberFormatException e) {
             assert e == null
                 : MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR
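
A standalone sketch that mirrors the scaling logic added above, useful for seeing the arithmetic in isolation; it is not the actual MlProcessors class, and the attribute values are hypothetical.

    public final class MlProcessorsScaleSketch {

        // Mirrors the flow above: parse the node attribute, guard against non-positive values,
        // then divide by the scale when one is configured.
        static double effectiveProcessors(String allocatedProcessorsAttr, Integer scale) {
            double processors = Double.parseDouble(allocatedProcessorsAttr);
            if (processors <= 0) {
                return 0.0;
            }
            if (scale != null) {
                processors = processors / scale;
            }
            return processors;
        }

        public static void main(String[] args) {
            System.out.println(effectiveProcessors("16.0", null)); // 16.0 -- no scale configured
            System.out.println(effectiveProcessors("16.0", 2));    // 8.0  -- hyper-threaded cores counted once
        }
    }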

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDeciderTests.java

Lines changed: 6 additions & 3 deletions

@@ -1079,7 +1079,8 @@ public void testCpuModelAssignmentRequirements() {
                        )
                    ).build()
                ),
-                withMlNodes("ml_node_1", "ml_node_2")
+                withMlNodes("ml_node_1", "ml_node_2"),
+                Settings.EMPTY
            )
        );
        assertTrue(
@@ -1110,7 +1111,8 @@ public void testCpuModelAssignmentRequirements() {
                        )
                    ).build()
                ),
-                withMlNodes("ml_node_1", "ml_node_2")
+                withMlNodes("ml_node_1", "ml_node_2"),
+                Settings.EMPTY
            )
        );
        assertFalse(
@@ -1141,7 +1143,8 @@ public void testCpuModelAssignmentRequirements() {
                        )
                    ).build()
                ),
-                withMlNodes("ml_node_1", "ml_node_2", "ml_node_3", "ml_node_4")
+                withMlNodes("ml_node_1", "ml_node_2", "ml_node_3", "ml_node_4"),
+                Settings.EMPTY
            )
        );
    }
