Skip to content

Commit 9e530e1

Browse files
[ML] Maintain ml node attribute for processors as integer for BWC (#89896)
In #89645 I switched the ml node attribute for allocated processors to write a string representation of a double value. However, that means the attribute value cannot be parsed in a mixed cluster version when the node that tries to read it is before 8.5.0. This commit addresses this problem by introducing a new attribute for the version the has a string representation of the double value and keeps writing the older version of the attribute as integer. It also refactors the logic of parsing the attribute value given a node in a single place as the logic now involves BWC and the duplication is adding complexity.
1 parent 984d225 commit 9e530e1

File tree

5 files changed

+64
-55
lines changed

5 files changed

+64
-55
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -557,7 +557,11 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
557557
private static final String PRE_V8_MAX_OPEN_JOBS_NODE_ATTR = "ml.max_open_jobs";
558558
public static final String MACHINE_MEMORY_NODE_ATTR = "ml.machine_memory";
559559
public static final String MAX_JVM_SIZE_NODE_ATTR = "ml.max_jvm_size";
560-
public static final String ALLOCATED_PROCESSORS_NODE_ATTR = "ml.allocated_processors";
560+
561+
// TODO Remove if compatibility with 8.x is no longer necessary
562+
public static final String PRE_V_8_5_ALLOCATED_PROCESSORS_NODE_ATTR = "ml.allocated_processors";
563+
564+
public static final String ALLOCATED_PROCESSORS_NODE_ATTR = "ml.allocated_processors_double";
561565
public static final Setting<Integer> CONCURRENT_JOB_ALLOCATIONS = Setting.intSetting(
562566
"xpack.ml.node_concurrent_job_allocations",
563567
2,
@@ -753,6 +757,7 @@ public Settings additionalSettings() {
753757
String maxOpenJobsPerNodeNodeAttrName = "node.attr." + PRE_V8_MAX_OPEN_JOBS_NODE_ATTR;
754758
String machineMemoryAttrName = "node.attr." + MACHINE_MEMORY_NODE_ATTR;
755759
String jvmSizeAttrName = "node.attr." + MAX_JVM_SIZE_NODE_ATTR;
760+
String deprecatedAllocatedProcessorsAttrName = "node.attr." + PRE_V_8_5_ALLOCATED_PROCESSORS_NODE_ATTR;
756761
String allocatedProcessorsAttrName = "node.attr." + ALLOCATED_PROCESSORS_NODE_ATTR;
757762

758763
if (enabled == false) {
@@ -768,11 +773,22 @@ public Settings additionalSettings() {
768773
Long.toString(OsProbe.getInstance().osStats().getMem().getAdjustedTotal().getBytes())
769774
);
770775
addMlNodeAttribute(additionalSettings, jvmSizeAttrName, Long.toString(Runtime.getRuntime().maxMemory()));
776+
addMlNodeAttribute(
777+
additionalSettings,
778+
deprecatedAllocatedProcessorsAttrName,
779+
Integer.toString(EsExecutors.allocatedProcessors(settings))
780+
);
771781
addMlNodeAttribute(additionalSettings, allocatedProcessorsAttrName, Double.toString(getAllocatedProcessors().count()));
772782
// This is not used in v8 and higher, but users are still prevented from setting it directly to avoid confusion
773783
disallowMlNodeAttributes(maxOpenJobsPerNodeNodeAttrName);
774784
} else {
775-
disallowMlNodeAttributes(maxOpenJobsPerNodeNodeAttrName, machineMemoryAttrName, jvmSizeAttrName, allocatedProcessorsAttrName);
785+
disallowMlNodeAttributes(
786+
maxOpenJobsPerNodeNodeAttrName,
787+
machineMemoryAttrName,
788+
jvmSizeAttrName,
789+
deprecatedAllocatedProcessorsAttrName,
790+
allocatedProcessorsAttrName
791+
);
776792
}
777793
return additionalSettings.build();
778794
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.elasticsearch.common.Strings;
1414
import org.elasticsearch.common.settings.Settings;
1515
import org.elasticsearch.common.unit.ByteSizeValue;
16-
import org.elasticsearch.common.unit.Processors;
1716
import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
1817
import org.elasticsearch.core.Nullable;
1918
import org.elasticsearch.core.TimeValue;
@@ -30,6 +29,7 @@
3029
import org.elasticsearch.xpack.ml.job.NodeLoad;
3130
import org.elasticsearch.xpack.ml.job.NodeLoadDetector;
3231
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
32+
import org.elasticsearch.xpack.ml.utils.MlProcessors;
3333
import org.elasticsearch.xpack.ml.utils.NativeMemoryCalculator;
3434

3535
import java.time.Duration;
@@ -819,20 +819,7 @@ static boolean modelAssignmentsRequireMoreThanHalfCpu(Collection<TrainedModelAss
819819
int totalRequiredProcessors = assignments.stream()
820820
.mapToInt(t -> t.getTaskParams().getNumberOfAllocations() * t.getTaskParams().getThreadsPerAllocation())
821821
.sum();
822-
int totalMlProcessors = mlNodes.stream().mapToInt(node -> {
823-
String allocatedProcessorsString = node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR);
824-
try {
825-
double allocatedProcessorsAsDouble = Double.parseDouble(allocatedProcessorsString);
826-
return allocatedProcessorsAsDouble > 0 ? Processors.of(allocatedProcessorsAsDouble).roundUp() : 0;
827-
} catch (NumberFormatException e) {
828-
assert e == null
829-
: MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR
830-
+ " should parse because we set it internally: invalid value was ["
831-
+ allocatedProcessorsString
832-
+ "]";
833-
return 0;
834-
}
835-
}).sum();
822+
int totalMlProcessors = mlNodes.stream().mapToInt(node -> MlProcessors.get(node).roundUp()).sum();
836823
return totalRequiredProcessors * 2 > totalMlProcessors;
837824
}
838825

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
import org.elasticsearch.logging.Logger;
1717
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderContext;
1818
import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignment;
19-
import org.elasticsearch.xpack.ml.MachineLearning;
2019
import org.elasticsearch.xpack.ml.inference.assignment.TrainedModelAssignmentMetadata;
20+
import org.elasticsearch.xpack.ml.utils.MlProcessors;
2121

2222
import java.time.Instant;
2323
import java.util.List;
@@ -138,30 +138,12 @@ MlProcessorAutoscalingCapacity computeCurrentCapacity(List<DiscoveryNode> mlNode
138138
Processors maxNodeProcessors = Processors.ZERO;
139139
Processors tierProcessors = Processors.ZERO;
140140
for (DiscoveryNode node : mlNodes) {
141-
Processors nodeProcessors = getProcessors(node);
141+
Processors nodeProcessors = MlProcessors.get(node);
142142
if (nodeProcessors.compareTo(maxNodeProcessors) > 0) {
143143
maxNodeProcessors = nodeProcessors;
144144
}
145145
tierProcessors = tierProcessors.plus(nodeProcessors);
146146
}
147147
return MlProcessorAutoscalingCapacity.builder(maxNodeProcessors, tierProcessors).build();
148148
}
149-
150-
private Processors getProcessors(DiscoveryNode node) {
151-
String allocatedProcessorsString = node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR);
152-
if (allocatedProcessorsString == null) {
153-
return Processors.ZERO;
154-
}
155-
try {
156-
double processorsAsDouble = Double.parseDouble(allocatedProcessorsString);
157-
return processorsAsDouble > 0 ? Processors.of(processorsAsDouble) : Processors.ZERO;
158-
} catch (NumberFormatException e) {
159-
assert e == null
160-
: MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR
161-
+ " should parse because we set it internally: invalid value was ["
162-
+ allocatedProcessorsString
163-
+ "]";
164-
return Processors.ZERO;
165-
}
166-
}
167149
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.elasticsearch.cluster.node.DiscoveryNode;
1515
import org.elasticsearch.common.Strings;
1616
import org.elasticsearch.common.unit.ByteSizeValue;
17-
import org.elasticsearch.common.unit.Processors;
1817
import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction;
1918
import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingInfo;
2019
import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingState;
@@ -24,6 +23,7 @@
2423
import org.elasticsearch.xpack.ml.inference.assignment.planning.AssignmentPlan;
2524
import org.elasticsearch.xpack.ml.inference.assignment.planning.ZoneAwareAssignmentPlanner;
2625
import org.elasticsearch.xpack.ml.job.NodeLoad;
26+
import org.elasticsearch.xpack.ml.utils.MlProcessors;
2727

2828
import java.util.ArrayList;
2929
import java.util.Collection;
@@ -142,7 +142,7 @@ private Map<List<String>, List<AssignmentPlan.Node>> createNodesByZoneMap() {
142142
// We subtract native inference memory as the planner expects available memory for
143143
// native inference including current assignments.
144144
getNodeFreeMemoryExcludingPerNodeOverheadAndNativeInference(load),
145-
getNodeAllocatedProcessors(discoveryNode).roundUp()
145+
MlProcessors.get(discoveryNode).roundUp()
146146
)
147147
);
148148
} else {
@@ -158,20 +158,6 @@ private Map<List<String>, List<AssignmentPlan.Node>> createNodesByZoneMap() {
158158
}));
159159
}
160160

161-
private static Processors getNodeAllocatedProcessors(DiscoveryNode node) {
162-
String allocatedProcessorsString = node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR);
163-
try {
164-
double allocatedProcessorsAsDouble = allocatedProcessorsString == null ? 0.0 : Double.parseDouble(allocatedProcessorsString);
165-
return allocatedProcessorsAsDouble > 0 ? Processors.of(allocatedProcessorsAsDouble) : Processors.ZERO;
166-
} catch (NumberFormatException e) {
167-
assert e == null
168-
: MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR
169-
+ " should parse because we set it internally: invalid value was "
170-
+ allocatedProcessorsString;
171-
return Processors.ZERO;
172-
}
173-
}
174-
175161
private static long getNodeFreeMemoryExcludingPerNodeOverheadAndNativeInference(NodeLoad load) {
176162
return load.getFreeMemoryExcludingPerNodeOverhead() - load.getAssignedNativeInferenceMemory();
177163
}
@@ -267,7 +253,7 @@ private Optional<String> explainAssignment(
267253
// But we should also check if we managed to assign a model during the rebalance for which
268254
// we check if the node has used up any of its allocated processors.
269255
boolean isPerNodeOverheadAccountedFor = load.getNumAssignedJobsAndModels() > 0
270-
|| assignmentPlan.getRemainingNodeCores(load.getNodeId()) < getNodeAllocatedProcessors(node).roundUp();
256+
|| assignmentPlan.getRemainingNodeCores(load.getNodeId()) < MlProcessors.get(node).roundUp();
271257
long requiredMemory = model.memoryBytes() + (isPerNodeOverheadAccountedFor
272258
? 0
273259
: MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes());
@@ -296,7 +282,7 @@ private Optional<String> explainAssignment(
296282
"This node has insufficient allocated processors. Available processors [{}], free processors [{}], "
297283
+ "processors required for each allocation of this model [{}]",
298284
new Object[] {
299-
getNodeAllocatedProcessors(node).roundUp(),
285+
MlProcessors.get(node).roundUp(),
300286
assignmentPlan.getRemainingNodeCores(node.getId()),
301287
model.threadsPerAllocation() }
302288
)
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.ml.utils;
9+
10+
import org.elasticsearch.Version;
11+
import org.elasticsearch.cluster.node.DiscoveryNode;
12+
import org.elasticsearch.common.unit.Processors;
13+
import org.elasticsearch.xpack.ml.MachineLearning;
14+
15+
public final class MlProcessors {
16+
17+
private MlProcessors() {}
18+
19+
public static Processors get(DiscoveryNode node) {
20+
String allocatedProcessorsString = node.getVersion().onOrAfter(Version.V_8_5_0)
21+
? node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR)
22+
: node.getAttributes().get(MachineLearning.PRE_V_8_5_ALLOCATED_PROCESSORS_NODE_ATTR);
23+
if (allocatedProcessorsString == null) {
24+
return Processors.ZERO;
25+
}
26+
try {
27+
double processorsAsDouble = Double.parseDouble(allocatedProcessorsString);
28+
return processorsAsDouble > 0 ? Processors.of(processorsAsDouble) : Processors.ZERO;
29+
} catch (NumberFormatException e) {
30+
assert e == null
31+
: MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR
32+
+ " should parse because we set it internally: invalid value was ["
33+
+ allocatedProcessorsString
34+
+ "]";
35+
return Processors.ZERO;
36+
}
37+
}
38+
}

0 commit comments

Comments
 (0)