From 4d0f16a94684fc6bb1b88639bbe3daad767593e2 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Tue, 23 Aug 2022 17:07:03 +0300 Subject: [PATCH 1/4] [ML] Add processor autoscaling decider This adds a processor decider to the ML autoscaling decider service. This first implementation is simple and naive. It simply computes the required processor capacity to be the max trained model deployment `threads_per_allocation` for the node, and the sum of all processors required by trained model deployments for the tier. --- .../MlAutoscalingDeciderService.java | 32 +- .../MlMemoryAutoscalingDecider.java | 3 - .../MlProcessorAutoscalingCapacity.java | 49 ++ .../MlProcessorAutoscalingDecider.java | 160 ++++++ .../MlMemoryAutoscalingDeciderTests.java | 14 - .../MlProcessorAutoscalingDeciderTests.java | 457 ++++++++++++++++++ 6 files changed, 693 insertions(+), 22 deletions(-) create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingCapacity.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDeciderTests.java diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java index 9a4c624d21ff0..691a7fc4291a2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java @@ -39,6 +39,7 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService, L private final ScaleTimer scaleTimer; private final MlMemoryAutoscalingDecider memoryDecider; + private final MlProcessorAutoscalingDecider processorDecider; private volatile boolean isMaster; @@ -66,6 +67,7 @@ public MlAutoscalingDeciderService( nodeLoadDetector, scaleTimer ); + this.processorDecider = new MlProcessorAutoscalingDecider(scaleTimer, nodeAvailabilityZoneMapper); clusterService.addLocalNodeMasterListener(this); } @@ -91,12 +93,21 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider final MlAutoscalingContext mlContext = new MlAutoscalingContext(clusterState); final NativeMemoryCapacity currentNativeMemoryCapacity = memoryDecider.currentScale(mlContext.mlNodes); final MlMemoryAutoscalingCapacity currentMemoryCapacity = memoryDecider.capacityFromNativeMemory(currentNativeMemoryCapacity); + final MlProcessorAutoscalingCapacity currentProcessorCapacity = processorDecider.computeCurrentCapacity(mlContext.mlNodes); final MlScalingReason.Builder reasonBuilder = MlScalingReason.builder(mlContext) .setCurrentMlCapacity( new AutoscalingCapacity( - new AutoscalingCapacity.AutoscalingResources(null, currentMemoryCapacity.tierSize(), null), - new AutoscalingCapacity.AutoscalingResources(null, currentMemoryCapacity.nodeSize(), null) + new AutoscalingCapacity.AutoscalingResources( + null, + currentMemoryCapacity.tierSize(), + Integer.valueOf(currentProcessorCapacity.tierProcessors()).floatValue() + ), + new AutoscalingCapacity.AutoscalingResources( + null, + currentMemoryCapacity.nodeSize(), + Integer.valueOf(currentProcessorCapacity.nodeProcessors()).floatValue() + ) ) ) .setPassedConfiguration(configuration); @@ -109,12 +120,23 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider } MlMemoryAutoscalingCapacity memoryCapacity = memoryDecider.scale(configuration, context, mlContext); - reasonBuilder.setSimpleReason(memoryCapacity.reason()); + MlProcessorAutoscalingCapacity processorCapacity = processorDecider.scale(configuration, context, mlContext); + reasonBuilder.setSimpleReason( + String.format(Locale.ROOT, "[memory_decider] %s; [processor_decider] %s", memoryCapacity.reason(), processorCapacity.reason()) + ); return new AutoscalingDeciderResult( new AutoscalingCapacity( - new AutoscalingCapacity.AutoscalingResources(null, memoryCapacity.tierSize(), null), - new AutoscalingCapacity.AutoscalingResources(null, memoryCapacity.nodeSize(), null) + new AutoscalingCapacity.AutoscalingResources( + null, + memoryCapacity.tierSize(), + Integer.valueOf(processorCapacity.tierProcessors()).floatValue() + ), + new AutoscalingCapacity.AutoscalingResources( + null, + memoryCapacity.nodeSize(), + Integer.valueOf(processorCapacity.nodeProcessors()).floatValue() + ) ), reasonBuilder.build() ); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java index c836a8a147888..cd5a53145b38f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java @@ -185,8 +185,6 @@ public MlMemoryAutoscalingCapacity scale(Settings configuration, AutoscalingDeci final List partiallyAllocatedModels = mlContext.findPartiallyAllocatedModels(); - // TODO for autoscaling by memory, we only care about if the model is allocated to at least one node (see above) - // We should do this check in our autoscaling by processor count service, which will be a separate decider for readability's sake if (mlContext.waitingAnalyticsJobs.isEmpty() == false || mlContext.waitingSnapshotUpgrades.isEmpty() == false || mlContext.waitingAnomalyJobs.isEmpty() == false @@ -257,7 +255,6 @@ public MlMemoryAutoscalingCapacity scale(Settings configuration, AutoscalingDeci if (capacity == null) { return null; } - // TODO we should remove this when we can auto-scale (down and up) via a new CPU auto-scaling decider if (modelAssignmentsRequireMoreThanHalfCpu(mlContext.modelAssignments.values(), mlContext.mlNodes)) { logger.debug("not down-scaling; model assignments require more than half of the ML tier's allocated processors"); return null; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingCapacity.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingCapacity.java new file mode 100644 index 0000000000000..daed88431d214 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingCapacity.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.ml.autoscaling; + +public record MlProcessorAutoscalingCapacity(int nodeProcessors, int tierProcessors, String reason) { + + public static Builder builder(int nodeProcessors, int tierProcessors) { + return new Builder(nodeProcessors, tierProcessors); + } + + @Override + public String toString() { + return "MlProcessorAutoscalingCapacity{" + + "nodeProcessors=" + + nodeProcessors + + ", tierProcessors=" + + tierProcessors + + ", reason='" + + reason + + '\'' + + '}'; + } + + public static class Builder { + + private int nodeProcessors; + private int tierProcessors; + private String reason; + + public Builder(int nodeProcessors, int tierProcessors) { + this.nodeProcessors = nodeProcessors; + this.tierProcessors = tierProcessors; + } + + public Builder setReason(String reason) { + this.reason = reason; + return this; + } + + MlProcessorAutoscalingCapacity build() { + return new MlProcessorAutoscalingCapacity(nodeProcessors, tierProcessors, reason); + } + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java new file mode 100644 index 0000000000000..22b9d2cad7096 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java @@ -0,0 +1,160 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.ml.autoscaling; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentElasticsearchExtension; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderContext; +import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignment; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.inference.assignment.TrainedModelAssignmentMetadata; + +import java.time.Instant; +import java.util.List; +import java.util.Locale; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import static java.time.Instant.ofEpochMilli; +import static org.elasticsearch.common.xcontent.XContentElasticsearchExtension.DEFAULT_FORMATTER; +import static org.elasticsearch.core.Strings.format; + +class MlProcessorAutoscalingDecider { + + private static final Logger logger = LogManager.getLogger(MlProcessorAutoscalingDecider.class); + + private final ScaleTimer scaleTimer; + private final NodeAvailabilityZoneMapper nodeAvailabilityZoneMapper; + + MlProcessorAutoscalingDecider(ScaleTimer scaleTimer, NodeAvailabilityZoneMapper nodeAvailabilityZoneMapper) { + this.scaleTimer = Objects.requireNonNull(scaleTimer); + this.nodeAvailabilityZoneMapper = Objects.requireNonNull(nodeAvailabilityZoneMapper); + } + + public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingDeciderContext context, MlAutoscalingContext mlContext) { + TrainedModelAssignmentMetadata trainedModelAssignmentMetadata = TrainedModelAssignmentMetadata.fromState(context.state()); + + if (hasUnsatisfiedDeployments(trainedModelAssignmentMetadata, mlContext.mlNodes)) { + logger.debug(() -> "Computing required capacity as there are partially allocated deployments"); + scaleTimer.resetScaleDownCoolDown(); + return computeRequiredCapacity(trainedModelAssignmentMetadata).setReason( + "requesting scale up as there are unsatisfied deployments" + ).build(); + } + + final MlProcessorAutoscalingCapacity currentCapacity = computeCurrentCapacity(mlContext.mlNodes); + + final MlProcessorAutoscalingCapacity requiredCapacity = computeRequiredCapacity(trainedModelAssignmentMetadata).build(); + + if (requiredCapacity.tierProcessors() == currentCapacity.tierProcessors()) { + return MlProcessorAutoscalingCapacity.builder(currentCapacity.nodeProcessors(), currentCapacity.tierProcessors()) + .setReason("passing currently perceived capacity as it is fully used") + .build(); + } + + if (MlMemoryAutoscalingDecider.modelAssignmentsRequireMoreThanHalfCpu( + trainedModelAssignmentMetadata.modelAssignments().values(), + mlContext.mlNodes + )) { + return MlProcessorAutoscalingCapacity.builder(currentCapacity.nodeProcessors(), currentCapacity.tierProcessors()) + .setReason("not scaling down as model assignments require more than half of the ML tier's allocated processors") + .build(); + } + + long msLeftToScale = scaleTimer.markDownScaleAndGetMillisLeftFromDelay(configuration); + if (msLeftToScale <= 0) { + return MlProcessorAutoscalingCapacity.builder(requiredCapacity.nodeProcessors(), requiredCapacity.tierProcessors()) + .setReason("requesting scale down as tier and/or node size could be smaller") + .build(); + } + + TimeValue downScaleDelay = MlAutoscalingDeciderService.DOWN_SCALE_DELAY.get(configuration); + logger.debug( + () -> format( + "not scaling down as the current scale down delay [%s] is not satisfied." + + " The last time scale down was detected [%s]. Calculated scaled down capacity [%s] ", + downScaleDelay.getStringRep(), + DEFAULT_FORMATTER.format(ofEpochMilli(scaleTimer.downScaleDetectedMillis())), + requiredCapacity + ) + ); + return MlProcessorAutoscalingCapacity.builder(currentCapacity.nodeProcessors(), currentCapacity.tierProcessors()) + .setReason( + String.format( + Locale.ROOT, + "Passing currently perceived capacity as down scale delay has not been satisfied; configured delay [%s] " + + "last detected scale down event [%s]. Will request scale down in approximately [%s]", + downScaleDelay.getStringRep(), + XContentElasticsearchExtension.DEFAULT_FORMATTER.format(Instant.ofEpochMilli(scaleTimer.downScaleDetectedMillis())), + TimeValue.timeValueMillis(msLeftToScale).getStringRep() + ) + ) + .build(); + } + + private boolean hasUnsatisfiedDeployments(TrainedModelAssignmentMetadata trainedModelAssignmentMetadata, List mlNodes) { + final Set mlNodeIds = mlNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); + return trainedModelAssignmentMetadata.modelAssignments() + .values() + .stream() + .anyMatch(deployment -> deployment.isSatisfied(mlNodeIds) == false); + } + + private MlProcessorAutoscalingCapacity.Builder computeRequiredCapacity(TrainedModelAssignmentMetadata trainedModelAssignmentMetadata) { + int maxThreadsPerAllocation = 0; + int processorCount = 0; + for (TrainedModelAssignment assignment : trainedModelAssignmentMetadata.modelAssignments().values()) { + int threadsPerAllocation = assignment.getTaskParams().getThreadsPerAllocation(); + maxThreadsPerAllocation = Math.max(maxThreadsPerAllocation, threadsPerAllocation); + processorCount += assignment.getTaskParams().getNumberOfAllocations() * threadsPerAllocation; + } + + final int numMlAvailabilityZones = nodeAvailabilityZoneMapper.getNumMlAvailabilityZones().orElse(1); + if (numMlAvailabilityZones > 1) { + // We assume cloud provides what we ask for tier processors for each availability zone. + // Thus we need to devide the total processor count required by the number of ML availability zones. + processorCount = (processorCount - 1) / numMlAvailabilityZones + 1; + } + processorCount = Math.max(processorCount, maxThreadsPerAllocation); + + return MlProcessorAutoscalingCapacity.builder(maxThreadsPerAllocation, processorCount); + } + + MlProcessorAutoscalingCapacity computeCurrentCapacity(List mlNodes) { + int maxNodeProcessors = 0; + int tierProcessors = 0; + for (DiscoveryNode node : mlNodes) { + int nodeProcessors = getProcessors(node); + maxNodeProcessors = Math.max(maxNodeProcessors, nodeProcessors); + tierProcessors += nodeProcessors; + } + return MlProcessorAutoscalingCapacity.builder(maxNodeProcessors, tierProcessors).build(); + } + + private int getProcessors(DiscoveryNode node) { + String allocatedProcessorsString = node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR); + if (allocatedProcessorsString == null) { + return 0; + } + try { + return Integer.parseInt(allocatedProcessorsString); + } catch (NumberFormatException e) { + assert e == null + : MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR + + " should parse because we set it internally: invalid value was [" + + allocatedProcessorsString + + "]"; + return 0; + } + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDeciderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDeciderTests.java index a048fb5c44906..de19c2f91c5d8 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDeciderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDeciderTests.java @@ -194,15 +194,6 @@ public void testScalingEdgeCase() { .incNumAssignedAnomalyDetectorJobs() .build() ); - MlScalingReason.Builder reasonBuilder = new MlScalingReason.Builder(new MlAutoscalingContext()).setPassedConfiguration( - Settings.EMPTY - ) - .setCurrentMlCapacity( - AutoscalingCapacity.builder() - .node(null, AUTO_NODE_TIERS_NO_MONITORING.get(0).v1(), null) - .total(null, AUTO_NODE_TIERS_NO_MONITORING.get(0).v1(), null) - .build() - ); MlMemoryAutoscalingDecider decider = buildDecider(); decider.setUseAuto(true); MlMemoryAutoscalingCapacity scaleUpResult = decider.checkForScaleUp( @@ -244,8 +235,6 @@ public void testScalingEdgeCase() { .incNumAssignedAnomalyDetectorJobs() .build() ); - reasonBuilder = new MlScalingReason.Builder(new MlAutoscalingContext()).setPassedConfiguration(Settings.EMPTY) - .setCurrentMlCapacity(AutoscalingCapacity.builder().node(null, 2147483648L, null).total(null, 2147483648L, null).build()); MlMemoryAutoscalingCapacity result = decider.checkForScaleDown( nodeForScaleDown, ByteSizeValue.ofMb(200).getBytes() + Job.PROCESS_MEMORY_OVERHEAD.getBytes(), @@ -956,9 +945,6 @@ public void testScaleDown() { when(nodeAvailabilityZoneMapper.getNumMlAvailabilityZones()).thenReturn(OptionalInt.of(3)); MlMemoryAutoscalingDecider decider = buildDecider(); decider.setMaxMachineMemoryPercent(25); - MlScalingReason.Builder reasonBuilder = new MlScalingReason.Builder(new MlAutoscalingContext()).setPassedConfiguration( - Settings.EMPTY - ).setCurrentMlCapacity(AutoscalingCapacity.ZERO); { // Current capacity allows for smaller node List nodeLoads = List.of( NodeLoad.builder("foo") diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDeciderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDeciderTests.java new file mode 100644 index 0000000000000..02300fcb6a5ab --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDeciderTests.java @@ -0,0 +1,457 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.ml.autoscaling; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderContext; +import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction; +import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingInfo; +import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingState; +import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignment; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.inference.assignment.TrainedModelAssignmentMetadata; +import org.junit.Before; + +import java.util.OptionalInt; +import java.util.Set; +import java.util.function.LongSupplier; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class MlProcessorAutoscalingDeciderTests extends ESTestCase { + + private ScaleTimer scaleTimer; + private NodeAvailabilityZoneMapper nodeAvailabilityZoneMapper; + + @Before + public void setup() { + scaleTimer = new ScaleTimer(System::currentTimeMillis); + nodeAvailabilityZoneMapper = mock(NodeAvailabilityZoneMapper.class); + when(nodeAvailabilityZoneMapper.getNumMlAvailabilityZones()).thenReturn(OptionalInt.empty()); + } + + public void testScale_GivenCurrentCapacityIsUsedExactly() { + String modelId1 = "model-id-1"; + String modelId2 = "model-id-2"; + + String mlNodeId1 = "ml-node-id-1"; + String mlNodeId2 = "ml-node-id-2"; + String dataNodeId = "data-node-id"; + DiscoveryNode mlNode1 = buildNode(mlNodeId1, true, 8); + DiscoveryNode mlNode2 = buildNode(mlNodeId2, true, 8); + DiscoveryNode dataNode = buildNode(dataNodeId, false, 24); + + ClusterState clusterState = ClusterState.builder(new ClusterName("test")) + .nodes(DiscoveryNodes.builder().add(mlNode1).add(mlNode2).add(dataNode).build()) + .metadata( + Metadata.builder() + .putCustom( + TrainedModelAssignmentMetadata.NAME, + TrainedModelAssignmentMetadata.Builder.empty() + .addNewAssignment( + modelId1, + TrainedModelAssignment.Builder.empty( + new StartTrainedModelDeploymentAction.TaskParams(modelId1, 42L, 3, 2, 1024, ByteSizeValue.ONE) + ).addRoutingEntry(mlNodeId1, new RoutingInfo(2, 2, RoutingState.STARTED, "")) + ) + .addNewAssignment( + modelId2, + TrainedModelAssignment.Builder.empty( + new StartTrainedModelDeploymentAction.TaskParams(modelId2, 42L, 1, 10, 1024, ByteSizeValue.ONE) + ) + .addRoutingEntry(mlNodeId1, new RoutingInfo(2, 2, RoutingState.STARTED, "")) + .addRoutingEntry(mlNodeId2, new RoutingInfo(8, 8, RoutingState.STARTED, "")) + ) + .build() + ) + .build() + ) + .build(); + + MlProcessorAutoscalingDecider decider = newDecider(); + + MlProcessorAutoscalingCapacity capacity = decider.scale( + Settings.EMPTY, + newContext(clusterState), + new MlAutoscalingContext(clusterState) + ); + + assertThat(capacity.nodeProcessors(), equalTo(8)); + assertThat(capacity.tierProcessors(), equalTo(16)); + assertThat(capacity.reason(), equalTo("passing currently perceived capacity as it is fully used")); + } + + public void testScale_GivenUnsatisfiedDeployments() { + String modelId1 = "model-id-1"; + String modelId2 = "model-id-2"; + + String mlNodeId1 = "ml-node-id-1"; + String mlNodeId2 = "ml-node-id-2"; + String dataNodeId = "data-node-id"; + DiscoveryNode mlNode1 = buildNode(mlNodeId1, true, 4); + DiscoveryNode mlNode2 = buildNode(mlNodeId2, true, 4); + DiscoveryNode dataNode = buildNode(dataNodeId, false, 24); + + ClusterState clusterState = ClusterState.builder(new ClusterName("test")) + .nodes(DiscoveryNodes.builder().add(mlNode1).add(mlNode2).add(dataNode).build()) + .metadata( + Metadata.builder() + .putCustom( + TrainedModelAssignmentMetadata.NAME, + TrainedModelAssignmentMetadata.Builder.empty() + .addNewAssignment( + modelId1, + TrainedModelAssignment.Builder.empty( + new StartTrainedModelDeploymentAction.TaskParams(modelId1, 42L, 8, 1, 1024, ByteSizeValue.ONE) + ) + ) + .addNewAssignment( + modelId2, + TrainedModelAssignment.Builder.empty( + new StartTrainedModelDeploymentAction.TaskParams(modelId2, 42L, 4, 3, 1024, ByteSizeValue.ONE) + ) + .addRoutingEntry(mlNodeId1, new RoutingInfo(1, 1, RoutingState.STARTED, "")) + .addRoutingEntry(mlNodeId2, new RoutingInfo(1, 1, RoutingState.STARTED, "")) + ) + .build() + ) + .build() + ) + .build(); + + MlProcessorAutoscalingDecider decider = newDecider(); + + MlProcessorAutoscalingCapacity capacity = decider.scale( + Settings.EMPTY, + newContext(clusterState), + new MlAutoscalingContext(clusterState) + ); + + assertThat(capacity.nodeProcessors(), equalTo(8)); + assertThat(capacity.tierProcessors(), equalTo(20)); + assertThat(capacity.reason(), equalTo("requesting scale up as there are unsatisfied deployments")); + } + + public void testScale_GivenUnsatisfiedDeployments_AndThreeMlAvailabilityZones_AndNodeProcessorsMoreThanTierProcessors() { + givenMlAvailabilityZones(3); + + String modelId1 = "model-id-1"; + String modelId2 = "model-id-2"; + + String mlNodeId1 = "ml-node-id-1"; + String mlNodeId2 = "ml-node-id-2"; + String dataNodeId = "data-node-id"; + DiscoveryNode mlNode1 = buildNode(mlNodeId1, true, 4); + DiscoveryNode mlNode2 = buildNode(mlNodeId2, true, 4); + DiscoveryNode dataNode = buildNode(dataNodeId, false, 24); + + ClusterState clusterState = ClusterState.builder(new ClusterName("test")) + .nodes(DiscoveryNodes.builder().add(mlNode1).add(mlNode2).add(dataNode).build()) + .metadata( + Metadata.builder() + .putCustom( + TrainedModelAssignmentMetadata.NAME, + TrainedModelAssignmentMetadata.Builder.empty() + .addNewAssignment( + modelId1, + TrainedModelAssignment.Builder.empty( + new StartTrainedModelDeploymentAction.TaskParams(modelId1, 42L, 8, 1, 1024, ByteSizeValue.ONE) + ) + ) + .addNewAssignment( + modelId2, + TrainedModelAssignment.Builder.empty( + new StartTrainedModelDeploymentAction.TaskParams(modelId2, 42L, 4, 3, 1024, ByteSizeValue.ONE) + ) + .addRoutingEntry(mlNodeId1, new RoutingInfo(1, 1, RoutingState.STARTED, "")) + .addRoutingEntry(mlNodeId2, new RoutingInfo(1, 1, RoutingState.STARTED, "")) + ) + .build() + ) + .build() + ) + .build(); + + MlProcessorAutoscalingDecider decider = newDecider(); + + MlProcessorAutoscalingCapacity capacity = decider.scale( + Settings.EMPTY, + newContext(clusterState), + new MlAutoscalingContext(clusterState) + ); + + assertThat(capacity.nodeProcessors(), equalTo(8)); + assertThat(capacity.tierProcessors(), equalTo(8)); + assertThat(capacity.reason(), equalTo("requesting scale up as there are unsatisfied deployments")); + } + + public void testScale_GivenUnsatisfiedDeployments_AndThreeMlAvailabilityZones_AndNodeProcessorsLessThanTierProcessors() { + givenMlAvailabilityZones(3); + + String modelId1 = "model-id-1"; + String modelId2 = "model-id-2"; + + String mlNodeId1 = "ml-node-id-1"; + String mlNodeId2 = "ml-node-id-2"; + String dataNodeId = "data-node-id"; + DiscoveryNode mlNode1 = buildNode(mlNodeId1, true, 4); + DiscoveryNode mlNode2 = buildNode(mlNodeId2, true, 4); + DiscoveryNode dataNode = buildNode(dataNodeId, false, 24); + + ClusterState clusterState = ClusterState.builder(new ClusterName("test")) + .nodes(DiscoveryNodes.builder().add(mlNode1).add(mlNode2).add(dataNode).build()) + .metadata( + Metadata.builder() + .putCustom( + TrainedModelAssignmentMetadata.NAME, + TrainedModelAssignmentMetadata.Builder.empty() + .addNewAssignment( + modelId1, + TrainedModelAssignment.Builder.empty( + new StartTrainedModelDeploymentAction.TaskParams(modelId1, 42L, 8, 1, 1024, ByteSizeValue.ONE) + ) + ) + .addNewAssignment( + modelId2, + TrainedModelAssignment.Builder.empty( + new StartTrainedModelDeploymentAction.TaskParams(modelId2, 42L, 4, 6, 1024, ByteSizeValue.ONE) + ) + .addRoutingEntry(mlNodeId1, new RoutingInfo(1, 1, RoutingState.STARTED, "")) + .addRoutingEntry(mlNodeId2, new RoutingInfo(1, 1, RoutingState.STARTED, "")) + ) + .build() + ) + .build() + ) + .build(); + + MlProcessorAutoscalingDecider decider = newDecider(); + + MlProcessorAutoscalingCapacity capacity = decider.scale( + Settings.EMPTY, + newContext(clusterState), + new MlAutoscalingContext(clusterState) + ); + + assertThat(capacity.nodeProcessors(), equalTo(8)); + assertThat(capacity.tierProcessors(), equalTo(11)); + assertThat(capacity.reason(), equalTo("requesting scale up as there are unsatisfied deployments")); + } + + public void testScale_GivenMoreThanHalfProcessorsAreUsed() { + String modelId1 = "model-id-1"; + String modelId2 = "model-id-2"; + + String mlNodeId1 = "ml-node-id-1"; + String mlNodeId2 = "ml-node-id-2"; + String dataNodeId = "data-node-id"; + DiscoveryNode mlNode1 = buildNode(mlNodeId1, true, 4); + DiscoveryNode mlNode2 = buildNode(mlNodeId2, true, 4); + DiscoveryNode dataNode = buildNode(dataNodeId, false, 24); + + ClusterState clusterState = ClusterState.builder(new ClusterName("test")) + .nodes(DiscoveryNodes.builder().add(mlNode1).add(mlNode2).add(dataNode).build()) + .metadata( + Metadata.builder() + .putCustom( + TrainedModelAssignmentMetadata.NAME, + TrainedModelAssignmentMetadata.Builder.empty() + .addNewAssignment( + modelId1, + TrainedModelAssignment.Builder.empty( + new StartTrainedModelDeploymentAction.TaskParams(modelId1, 42L, 2, 2, 1024, ByteSizeValue.ONE) + ).addRoutingEntry(mlNodeId1, new RoutingInfo(2, 2, RoutingState.STARTED, "")) + ) + .addNewAssignment( + modelId2, + TrainedModelAssignment.Builder.empty( + new StartTrainedModelDeploymentAction.TaskParams(modelId2, 42L, 1, 1, 1024, ByteSizeValue.ONE) + ).addRoutingEntry(mlNodeId2, new RoutingInfo(1, 1, RoutingState.STARTED, "")) + ) + .build() + ) + .build() + ) + .build(); + + MlProcessorAutoscalingDecider decider = newDecider(); + + MlProcessorAutoscalingCapacity capacity = decider.scale( + Settings.EMPTY, + newContext(clusterState), + new MlAutoscalingContext(clusterState) + ); + + assertThat(capacity.nodeProcessors(), equalTo(4)); + assertThat(capacity.tierProcessors(), equalTo(8)); + assertThat( + capacity.reason(), + equalTo("not scaling down as model assignments require more than half of the ML tier's allocated processors") + ); + } + + public void testScale_GivenDownScalePossible_DelayNotSatisfied() { + String modelId1 = "model-id-1"; + String modelId2 = "model-id-2"; + + String mlNodeId1 = "ml-node-id-1"; + String mlNodeId2 = "ml-node-id-2"; + String dataNodeId = "data-node-id"; + DiscoveryNode mlNode1 = buildNode(mlNodeId1, true, 8); + DiscoveryNode mlNode2 = buildNode(mlNodeId2, true, 8); + DiscoveryNode dataNode = buildNode(dataNodeId, false, 24); + + ClusterState clusterState = ClusterState.builder(new ClusterName("test")) + .nodes(DiscoveryNodes.builder().add(mlNode1).add(mlNode2).add(dataNode).build()) + .metadata( + Metadata.builder() + .putCustom( + TrainedModelAssignmentMetadata.NAME, + TrainedModelAssignmentMetadata.Builder.empty() + .addNewAssignment( + modelId1, + TrainedModelAssignment.Builder.empty( + new StartTrainedModelDeploymentAction.TaskParams(modelId1, 42L, 2, 2, 1024, ByteSizeValue.ONE) + ).addRoutingEntry(mlNodeId1, new RoutingInfo(2, 2, RoutingState.STARTED, "")) + ) + .addNewAssignment( + modelId2, + TrainedModelAssignment.Builder.empty( + new StartTrainedModelDeploymentAction.TaskParams(modelId2, 42L, 1, 1, 1024, ByteSizeValue.ONE) + ).addRoutingEntry(mlNodeId2, new RoutingInfo(1, 1, RoutingState.STARTED, "")) + ) + .build() + ) + .build() + ) + .build(); + + MlProcessorAutoscalingDecider decider = newDecider(); + scaleTimer.markScale(); + + MlProcessorAutoscalingCapacity capacity = decider.scale( + Settings.EMPTY, + newContext(clusterState), + new MlAutoscalingContext(clusterState) + ); + + assertThat(capacity.nodeProcessors(), equalTo(8)); + assertThat(capacity.tierProcessors(), equalTo(16)); + assertThat(capacity.reason(), containsString("Passing currently perceived capacity as down scale delay has not been satisfied")); + } + + public void testScale_GivenDownScalePossible_DelaySatisfied() { + String modelId1 = "model-id-1"; + String modelId2 = "model-id-2"; + + String mlNodeId1 = "ml-node-id-1"; + String mlNodeId2 = "ml-node-id-2"; + String dataNodeId = "data-node-id"; + DiscoveryNode mlNode1 = buildNode(mlNodeId1, true, 8); + DiscoveryNode mlNode2 = buildNode(mlNodeId2, true, 8); + DiscoveryNode dataNode = buildNode(dataNodeId, false, 24); + + ClusterState clusterState = ClusterState.builder(new ClusterName("test")) + .nodes(DiscoveryNodes.builder().add(mlNode1).add(mlNode2).add(dataNode).build()) + .metadata( + Metadata.builder() + .putCustom( + TrainedModelAssignmentMetadata.NAME, + TrainedModelAssignmentMetadata.Builder.empty() + .addNewAssignment( + modelId1, + TrainedModelAssignment.Builder.empty( + new StartTrainedModelDeploymentAction.TaskParams(modelId1, 42L, 2, 2, 1024, ByteSizeValue.ONE) + ).addRoutingEntry(mlNodeId1, new RoutingInfo(2, 2, RoutingState.STARTED, "")) + ) + .addNewAssignment( + modelId2, + TrainedModelAssignment.Builder.empty( + new StartTrainedModelDeploymentAction.TaskParams(modelId2, 42L, 1, 1, 1024, ByteSizeValue.ONE) + ).addRoutingEntry(mlNodeId2, new RoutingInfo(1, 1, RoutingState.STARTED, "")) + ) + .build() + ) + .build() + ) + .build(); + + TimeMachine timeMachine = new TimeMachine(); + scaleTimer = new ScaleTimer(timeMachine); + MlProcessorAutoscalingDecider decider = newDecider(); + scaleTimer.markScale(); + scaleTimer.markDownScaleAndGetMillisLeftFromDelay(Settings.EMPTY); + timeMachine.setOffset(TimeValue.timeValueHours(1)); + + MlProcessorAutoscalingCapacity capacity = decider.scale( + Settings.EMPTY, + newContext(clusterState), + new MlAutoscalingContext(clusterState) + ); + + assertThat(capacity.nodeProcessors(), equalTo(2)); + assertThat(capacity.tierProcessors(), equalTo(5)); + assertThat(capacity.reason(), containsString("requesting scale down as tier and/or node size could be smaller")); + } + + private static DiscoveryNode buildNode(String name, boolean isML, int allocatedProcessors) { + return new DiscoveryNode( + name, + name, + buildNewFakeTransportAddress(), + MapBuilder.newMapBuilder() + .put(MachineLearning.MAX_JVM_SIZE_NODE_ATTR, String.valueOf(10)) + .put(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, String.valueOf(allocatedProcessors)) + .map(), + isML ? DiscoveryNodeRole.roles() : Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE), + Version.CURRENT + ); + } + + private MlProcessorAutoscalingDecider newDecider() { + return new MlProcessorAutoscalingDecider(scaleTimer, nodeAvailabilityZoneMapper); + } + + private AutoscalingDeciderContext newContext(ClusterState clusterState) { + AutoscalingDeciderContext context = mock(AutoscalingDeciderContext.class); + when(context.state()).thenReturn(clusterState); + return context; + } + + private void givenMlAvailabilityZones(int zones) { + when(nodeAvailabilityZoneMapper.getNumMlAvailabilityZones()).thenReturn(OptionalInt.of(zones)); + } + + private static class TimeMachine implements LongSupplier { + + private long offsetMillis; + + void setOffset(TimeValue timeValue) { + this.offsetMillis = timeValue.millis(); + } + + @Override + public long getAsLong() { + return System.currentTimeMillis() + offsetMillis; + } + } +} From 40f6d1aed195c9590673b865ab959157d57cc15a Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Fri, 26 Aug 2022 13:03:44 +0300 Subject: [PATCH 2/4] Update docs/changelog/89645.yaml --- docs/changelog/89645.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/89645.yaml diff --git a/docs/changelog/89645.yaml b/docs/changelog/89645.yaml new file mode 100644 index 0000000000000..ad3ca0fea0083 --- /dev/null +++ b/docs/changelog/89645.yaml @@ -0,0 +1,5 @@ +pr: 89645 +summary: Add processor autoscaling decider +area: Machine Learning +type: enhancement +issues: [] From 8dc4ac30459f998587486ce93d2e225727e8a7e7 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Wed, 7 Sep 2022 14:43:39 +0300 Subject: [PATCH 3/4] Make use of `Processors` class --- .../xpack/ml/MachineLearning.java | 9 ++-- .../MlAutoscalingDeciderService.java | 16 ++----- .../MlMemoryAutoscalingDecider.java | 5 ++- .../MlProcessorAutoscalingCapacity.java | 12 ++--- .../MlProcessorAutoscalingDecider.java | 28 +++++++----- .../TrainedModelAssignmentRebalancer.java | 15 ++++--- .../MlProcessorAutoscalingDeciderTests.java | 45 ++++++++++--------- 7 files changed, 68 insertions(+), 62 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 67d1ea312b34d..05170e2db9bb5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -41,6 +41,7 @@ import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.Processors; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.TimeValue; import org.elasticsearch.env.Environment; @@ -766,7 +767,7 @@ public Settings additionalSettings() { Long.toString(OsProbe.getInstance().osStats().getMem().getAdjustedTotal().getBytes()) ); addMlNodeAttribute(additionalSettings, jvmSizeAttrName, Long.toString(Runtime.getRuntime().maxMemory())); - addMlNodeAttribute(additionalSettings, allocatedProcessorsAttrName, Integer.toString(getAllocatedProcessors())); + addMlNodeAttribute(additionalSettings, allocatedProcessorsAttrName, Double.toString(getAllocatedProcessors().count())); // This is not used in v8 and higher, but users are still prevented from setting it directly to avoid confusion disallowMlNodeAttributes(maxOpenJobsPerNodeNodeAttrName); } else { @@ -784,8 +785,8 @@ private void addMlNodeAttribute(Settings.Builder additionalSettings, String attr } } - private int getAllocatedProcessors() { - return EsExecutors.allocatedProcessors(settings); + private Processors getAllocatedProcessors() { + return EsExecutors.nodeProcessors(settings); } private void disallowMlNodeAttributes(String... mlNodeAttributes) { @@ -1445,7 +1446,7 @@ public List> getExecutorBuilders(Settings unused) { ScalingExecutorBuilder pytorchComms = new ScalingExecutorBuilder( NATIVE_INFERENCE_COMMS_THREAD_POOL_NAME, 3, - getAllocatedProcessors() * 3, + getAllocatedProcessors().roundUp() * 3, TimeValue.timeValueMinutes(1), false, "xpack.ml.native_inference_comms_thread_pool" diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java index 691a7fc4291a2..87c7cdaeb0de1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java @@ -101,12 +101,12 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider new AutoscalingCapacity.AutoscalingResources( null, currentMemoryCapacity.tierSize(), - Integer.valueOf(currentProcessorCapacity.tierProcessors()).floatValue() + currentProcessorCapacity.tierProcessors() ), new AutoscalingCapacity.AutoscalingResources( null, currentMemoryCapacity.nodeSize(), - Integer.valueOf(currentProcessorCapacity.nodeProcessors()).floatValue() + currentProcessorCapacity.nodeProcessors() ) ) ) @@ -127,16 +127,8 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider return new AutoscalingDeciderResult( new AutoscalingCapacity( - new AutoscalingCapacity.AutoscalingResources( - null, - memoryCapacity.tierSize(), - Integer.valueOf(processorCapacity.tierProcessors()).floatValue() - ), - new AutoscalingCapacity.AutoscalingResources( - null, - memoryCapacity.nodeSize(), - Integer.valueOf(processorCapacity.nodeProcessors()).floatValue() - ) + new AutoscalingCapacity.AutoscalingResources(null, memoryCapacity.tierSize(), processorCapacity.tierProcessors()), + new AutoscalingCapacity.AutoscalingResources(null, memoryCapacity.nodeSize(), processorCapacity.nodeProcessors()) ), reasonBuilder.build() ); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java index cd5a53145b38f..52912bdb73a33 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.Processors; import org.elasticsearch.common.xcontent.XContentElasticsearchExtension; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; @@ -255,6 +256,8 @@ public MlMemoryAutoscalingCapacity scale(Settings configuration, AutoscalingDeci if (capacity == null) { return null; } + // We should keep this check here as well as in the processor decider while cloud is not + // reacting to processor autoscaling. if (modelAssignmentsRequireMoreThanHalfCpu(mlContext.modelAssignments.values(), mlContext.mlNodes)) { logger.debug("not down-scaling; model assignments require more than half of the ML tier's allocated processors"); return null; @@ -819,7 +822,7 @@ static boolean modelAssignmentsRequireMoreThanHalfCpu(Collection { String allocatedProcessorsString = node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR); try { - return Integer.parseInt(allocatedProcessorsString); + return Processors.of(Double.parseDouble(allocatedProcessorsString)).roundUp(); } catch (NumberFormatException e) { assert e == null : MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingCapacity.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingCapacity.java index daed88431d214..652c3311ae13b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingCapacity.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingCapacity.java @@ -7,9 +7,11 @@ package org.elasticsearch.xpack.ml.autoscaling; -public record MlProcessorAutoscalingCapacity(int nodeProcessors, int tierProcessors, String reason) { +import org.elasticsearch.common.unit.Processors; - public static Builder builder(int nodeProcessors, int tierProcessors) { +public record MlProcessorAutoscalingCapacity(Processors nodeProcessors, Processors tierProcessors, String reason) { + + public static Builder builder(Processors nodeProcessors, Processors tierProcessors) { return new Builder(nodeProcessors, tierProcessors); } @@ -28,11 +30,11 @@ public String toString() { public static class Builder { - private int nodeProcessors; - private int tierProcessors; + private Processors nodeProcessors; + private Processors tierProcessors; private String reason; - public Builder(int nodeProcessors, int tierProcessors) { + public Builder(Processors nodeProcessors, Processors tierProcessors) { this.nodeProcessors = nodeProcessors; this.tierProcessors = tierProcessors; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java index 22b9d2cad7096..72b18fa2358e6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java @@ -9,6 +9,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.Processors; import org.elasticsearch.common.xcontent.XContentElasticsearchExtension; import org.elasticsearch.core.TimeValue; import org.elasticsearch.logging.LogManager; @@ -56,7 +57,7 @@ public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingD final MlProcessorAutoscalingCapacity requiredCapacity = computeRequiredCapacity(trainedModelAssignmentMetadata).build(); - if (requiredCapacity.tierProcessors() == currentCapacity.tierProcessors()) { + if (requiredCapacity.tierProcessors().roundUp() == currentCapacity.tierProcessors().roundUp()) { return MlProcessorAutoscalingCapacity.builder(currentCapacity.nodeProcessors(), currentCapacity.tierProcessors()) .setReason("passing currently perceived capacity as it is fully used") .build(); @@ -127,34 +128,39 @@ private MlProcessorAutoscalingCapacity.Builder computeRequiredCapacity(TrainedMo } processorCount = Math.max(processorCount, maxThreadsPerAllocation); - return MlProcessorAutoscalingCapacity.builder(maxThreadsPerAllocation, processorCount); + return MlProcessorAutoscalingCapacity.builder( + Processors.of(Double.valueOf(maxThreadsPerAllocation)), + Processors.of(Double.valueOf(processorCount)) + ); } MlProcessorAutoscalingCapacity computeCurrentCapacity(List mlNodes) { - int maxNodeProcessors = 0; - int tierProcessors = 0; + Processors maxNodeProcessors = Processors.ZERO; + Processors tierProcessors = Processors.ZERO; for (DiscoveryNode node : mlNodes) { - int nodeProcessors = getProcessors(node); - maxNodeProcessors = Math.max(maxNodeProcessors, nodeProcessors); - tierProcessors += nodeProcessors; + Processors nodeProcessors = getProcessors(node); + if (nodeProcessors.compareTo(maxNodeProcessors) > 0) { + maxNodeProcessors = nodeProcessors; + } + tierProcessors = tierProcessors.plus(nodeProcessors); } return MlProcessorAutoscalingCapacity.builder(maxNodeProcessors, tierProcessors).build(); } - private int getProcessors(DiscoveryNode node) { + private Processors getProcessors(DiscoveryNode node) { String allocatedProcessorsString = node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR); if (allocatedProcessorsString == null) { - return 0; + return Processors.ZERO; } try { - return Integer.parseInt(allocatedProcessorsString); + return Processors.of(Double.parseDouble(allocatedProcessorsString)); } catch (NumberFormatException e) { assert e == null : MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR + " should parse because we set it internally: invalid value was [" + allocatedProcessorsString + "]"; - return 0; + return Processors.ZERO; } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java index 209a3a1fc73ab..437313eff568a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java @@ -14,6 +14,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.Processors; import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction; import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingInfo; import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingState; @@ -28,7 +29,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.OptionalInt; import java.util.Set; import java.util.TreeMap; import java.util.stream.Collectors; @@ -87,7 +87,7 @@ AssignmentPlan computeAssignmentPlan() { // We subtract native inference memory as the planner expects available memory for // native inference including current assignments. getNodeFreeMemoryExcludingPerNodeOverheadAndNativeInference(e.getValue()), - getNodeAllocatedProcessors(e.getKey()).orElse(0) + getNodeAllocatedProcessors(e.getKey()).roundUp() ) ) .toList(); @@ -130,16 +130,17 @@ AssignmentPlan computeAssignmentPlan() { return new AssignmentPlanner(planNodes, planModels).computePlan(); } - private static OptionalInt getNodeAllocatedProcessors(DiscoveryNode node) { + private static Processors getNodeAllocatedProcessors(DiscoveryNode node) { String allocatedProcessorsString = node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR); try { - return OptionalInt.of(Integer.parseInt(allocatedProcessorsString)); + + return allocatedProcessorsString == null ? Processors.ZERO : Processors.of(Double.parseDouble(allocatedProcessorsString)); } catch (NumberFormatException e) { assert e == null : MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR + " should parse because we set it internally: invalid value was " + allocatedProcessorsString; - return OptionalInt.empty(); + return Processors.ZERO; } } @@ -238,7 +239,7 @@ private Optional explainAssignment( // But we should also check if we managed to assign a model during the rebalance for which // we check if the node has used up any of its allocated processors. boolean isPerNodeOverheadAccountedFor = load.getNumAssignedJobsAndModels() > 0 - || assignmentPlan.getRemainingNodeCores(load.getNodeId()) < getNodeAllocatedProcessors(node).orElse(0); + || assignmentPlan.getRemainingNodeCores(load.getNodeId()) < getNodeAllocatedProcessors(node).roundUp(); long requiredMemory = model.memoryBytes() + (isPerNodeOverheadAccountedFor ? 0 : MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes()); @@ -267,7 +268,7 @@ private Optional explainAssignment( "This node has insufficient allocated processors. Available processors [{}], free processors [{}], " + "processors required for each allocation of this model [{}]", new Object[] { - getNodeAllocatedProcessors(node).orElse(0), + getNodeAllocatedProcessors(node).roundUp(), assignmentPlan.getRemainingNodeCores(node.getId()), model.threadsPerAllocation() } ) diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDeciderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDeciderTests.java index 02300fcb6a5ab..19e55905158fb 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDeciderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDeciderTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.Processors; import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderContext; @@ -56,9 +57,9 @@ public void testScale_GivenCurrentCapacityIsUsedExactly() { String mlNodeId1 = "ml-node-id-1"; String mlNodeId2 = "ml-node-id-2"; String dataNodeId = "data-node-id"; - DiscoveryNode mlNode1 = buildNode(mlNodeId1, true, 8); - DiscoveryNode mlNode2 = buildNode(mlNodeId2, true, 8); - DiscoveryNode dataNode = buildNode(dataNodeId, false, 24); + DiscoveryNode mlNode1 = buildNode(mlNodeId1, true, 7.8); + DiscoveryNode mlNode2 = buildNode(mlNodeId2, true, 7.6); + DiscoveryNode dataNode = buildNode(dataNodeId, false, 24.0); ClusterState clusterState = ClusterState.builder(new ClusterName("test")) .nodes(DiscoveryNodes.builder().add(mlNode1).add(mlNode2).add(dataNode).build()) @@ -95,8 +96,8 @@ public void testScale_GivenCurrentCapacityIsUsedExactly() { new MlAutoscalingContext(clusterState) ); - assertThat(capacity.nodeProcessors(), equalTo(8)); - assertThat(capacity.tierProcessors(), equalTo(16)); + assertThat(capacity.nodeProcessors(), equalTo(Processors.of(7.8))); + assertThat(capacity.tierProcessors(), equalTo(Processors.of(15.4))); assertThat(capacity.reason(), equalTo("passing currently perceived capacity as it is fully used")); } @@ -146,8 +147,8 @@ public void testScale_GivenUnsatisfiedDeployments() { new MlAutoscalingContext(clusterState) ); - assertThat(capacity.nodeProcessors(), equalTo(8)); - assertThat(capacity.tierProcessors(), equalTo(20)); + assertThat(capacity.nodeProcessors(), equalTo(Processors.of(8.0))); + assertThat(capacity.tierProcessors(), equalTo(Processors.of(20.0))); assertThat(capacity.reason(), equalTo("requesting scale up as there are unsatisfied deployments")); } @@ -199,8 +200,8 @@ public void testScale_GivenUnsatisfiedDeployments_AndThreeMlAvailabilityZones_An new MlAutoscalingContext(clusterState) ); - assertThat(capacity.nodeProcessors(), equalTo(8)); - assertThat(capacity.tierProcessors(), equalTo(8)); + assertThat(capacity.nodeProcessors(), equalTo(Processors.of(8.0))); + assertThat(capacity.tierProcessors(), equalTo(Processors.of(8.0))); assertThat(capacity.reason(), equalTo("requesting scale up as there are unsatisfied deployments")); } @@ -252,8 +253,8 @@ public void testScale_GivenUnsatisfiedDeployments_AndThreeMlAvailabilityZones_An new MlAutoscalingContext(clusterState) ); - assertThat(capacity.nodeProcessors(), equalTo(8)); - assertThat(capacity.tierProcessors(), equalTo(11)); + assertThat(capacity.nodeProcessors(), equalTo(Processors.of(8.0))); + assertThat(capacity.tierProcessors(), equalTo(Processors.of(11.0))); assertThat(capacity.reason(), equalTo("requesting scale up as there are unsatisfied deployments")); } @@ -264,8 +265,8 @@ public void testScale_GivenMoreThanHalfProcessorsAreUsed() { String mlNodeId1 = "ml-node-id-1"; String mlNodeId2 = "ml-node-id-2"; String dataNodeId = "data-node-id"; - DiscoveryNode mlNode1 = buildNode(mlNodeId1, true, 4); - DiscoveryNode mlNode2 = buildNode(mlNodeId2, true, 4); + DiscoveryNode mlNode1 = buildNode(mlNodeId1, true, 3.8); + DiscoveryNode mlNode2 = buildNode(mlNodeId2, true, 3.8); DiscoveryNode dataNode = buildNode(dataNodeId, false, 24); ClusterState clusterState = ClusterState.builder(new ClusterName("test")) @@ -301,8 +302,8 @@ public void testScale_GivenMoreThanHalfProcessorsAreUsed() { new MlAutoscalingContext(clusterState) ); - assertThat(capacity.nodeProcessors(), equalTo(4)); - assertThat(capacity.tierProcessors(), equalTo(8)); + assertThat(capacity.nodeProcessors(), equalTo(Processors.of(3.8))); + assertThat(capacity.tierProcessors(), equalTo(Processors.of(7.6))); assertThat( capacity.reason(), equalTo("not scaling down as model assignments require more than half of the ML tier's allocated processors") @@ -316,8 +317,8 @@ public void testScale_GivenDownScalePossible_DelayNotSatisfied() { String mlNodeId1 = "ml-node-id-1"; String mlNodeId2 = "ml-node-id-2"; String dataNodeId = "data-node-id"; - DiscoveryNode mlNode1 = buildNode(mlNodeId1, true, 8); - DiscoveryNode mlNode2 = buildNode(mlNodeId2, true, 8); + DiscoveryNode mlNode1 = buildNode(mlNodeId1, true, 7.9); + DiscoveryNode mlNode2 = buildNode(mlNodeId2, true, 7.9); DiscoveryNode dataNode = buildNode(dataNodeId, false, 24); ClusterState clusterState = ClusterState.builder(new ClusterName("test")) @@ -354,8 +355,8 @@ public void testScale_GivenDownScalePossible_DelayNotSatisfied() { new MlAutoscalingContext(clusterState) ); - assertThat(capacity.nodeProcessors(), equalTo(8)); - assertThat(capacity.tierProcessors(), equalTo(16)); + assertThat(capacity.nodeProcessors(), equalTo(Processors.of(7.9))); + assertThat(capacity.tierProcessors(), equalTo(Processors.of(15.8))); assertThat(capacity.reason(), containsString("Passing currently perceived capacity as down scale delay has not been satisfied")); } @@ -408,12 +409,12 @@ public void testScale_GivenDownScalePossible_DelaySatisfied() { new MlAutoscalingContext(clusterState) ); - assertThat(capacity.nodeProcessors(), equalTo(2)); - assertThat(capacity.tierProcessors(), equalTo(5)); + assertThat(capacity.nodeProcessors(), equalTo(Processors.of(2.0))); + assertThat(capacity.tierProcessors(), equalTo(Processors.of(5.0))); assertThat(capacity.reason(), containsString("requesting scale down as tier and/or node size could be smaller")); } - private static DiscoveryNode buildNode(String name, boolean isML, int allocatedProcessors) { + private static DiscoveryNode buildNode(String name, boolean isML, double allocatedProcessors) { return new DiscoveryNode( name, name, From 0b750e47e7dc01ac2d0074570f1e8fb4fb5f5e55 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Wed, 7 Sep 2022 15:48:29 +0300 Subject: [PATCH 4/4] Do not call `Processors.of(0.0)` --- .../xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java | 3 ++- .../ml/autoscaling/MlProcessorAutoscalingDecider.java | 7 ++++--- .../assignment/TrainedModelAssignmentRebalancer.java | 4 ++-- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java index 52912bdb73a33..60f9a7cdcc45c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java @@ -822,7 +822,8 @@ static boolean modelAssignmentsRequireMoreThanHalfCpu(Collection { String allocatedProcessorsString = node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR); try { - return Processors.of(Double.parseDouble(allocatedProcessorsString)).roundUp(); + double allocatedProcessorsAsDouble = Double.parseDouble(allocatedProcessorsString); + return allocatedProcessorsAsDouble > 0 ? Processors.of(allocatedProcessorsAsDouble).roundUp() : 0; } catch (NumberFormatException e) { assert e == null : MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java index 72b18fa2358e6..b6b2c317f505d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java @@ -129,8 +129,8 @@ private MlProcessorAutoscalingCapacity.Builder computeRequiredCapacity(TrainedMo processorCount = Math.max(processorCount, maxThreadsPerAllocation); return MlProcessorAutoscalingCapacity.builder( - Processors.of(Double.valueOf(maxThreadsPerAllocation)), - Processors.of(Double.valueOf(processorCount)) + maxThreadsPerAllocation > 0 ? Processors.of(Double.valueOf(maxThreadsPerAllocation)) : Processors.ZERO, + processorCount > 0 ? Processors.of(Double.valueOf(processorCount)) : Processors.ZERO ); } @@ -153,7 +153,8 @@ private Processors getProcessors(DiscoveryNode node) { return Processors.ZERO; } try { - return Processors.of(Double.parseDouble(allocatedProcessorsString)); + double processorsAsDouble = Double.parseDouble(allocatedProcessorsString); + return processorsAsDouble > 0 ? Processors.of(processorsAsDouble) : Processors.ZERO; } catch (NumberFormatException e) { assert e == null : MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java index 437313eff568a..74a0937e03ce9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java @@ -133,8 +133,8 @@ AssignmentPlan computeAssignmentPlan() { private static Processors getNodeAllocatedProcessors(DiscoveryNode node) { String allocatedProcessorsString = node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR); try { - - return allocatedProcessorsString == null ? Processors.ZERO : Processors.of(Double.parseDouble(allocatedProcessorsString)); + double allocatedProcessorsAsDouble = allocatedProcessorsString == null ? 0.0 : Double.parseDouble(allocatedProcessorsString); + return allocatedProcessorsAsDouble > 0 ? Processors.of(allocatedProcessorsAsDouble) : Processors.ZERO; } catch (NumberFormatException e) { assert e == null : MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR