From 46ebfc11d68be9de1fe513e371a566432184a8ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Tue, 23 Aug 2022 18:09:41 +0200 Subject: [PATCH 1/9] Centralize the concept of processors configuration --- .../upgrades/DesiredNodesUpgradeIT.java | 135 +++++++++++--- .../TransportUpdateDesiredNodesAction.java | 7 +- .../UpdateDesiredNodesRequest.java | 7 +- .../DesiredNodesSettingsValidator.java | 3 +- .../cluster/metadata/DesiredNode.java | 149 +++++++++------ .../metadata/DesiredNodeWithStatus.java | 7 +- .../cluster/metadata/DesiredNodes.java | 23 +++ .../elasticsearch/common/unit/Processors.java | 175 ++++++++++++++++++ .../common/util/concurrent/EsExecutors.java | 7 +- .../UpdateDesiredNodesRequestTests.java | 2 +- .../DesiredNodesSettingsValidatorTests.java | 4 +- .../DesiredNodeSerializationTests.java | 4 +- .../cluster/metadata/DesiredNodeTests.java | 114 +++++++++--- .../metadata/DesiredNodesTestCase.java | 10 +- .../cluster/metadata/DesiredNodesTests.java | 2 +- .../unit/ProcessorsSerializationTests.java | 31 ++++ .../common/unit/ProcessorsTests.java | 87 +++++++++ .../util/concurrent/EsExecutorsTests.java | 5 +- .../AutoscalingCalculateCapacityService.java | 2 +- .../capacity/AutoscalingCapacity.java | 34 ++-- .../FixedAutoscalingDeciderService.java | 26 +-- .../nodeinfo/AutoscalingNodeInfo.java | 10 +- .../nodeinfo/AutoscalingNodeInfoService.java | 3 +- .../FrozenExistenceDeciderService.java | 2 +- .../autoscaling/AutoscalingTestCase.java | 11 +- ...oscalingCalculateCapacityServiceTests.java | 24 ++- ...scalingCapacityWireSerializationTests.java | 12 +- .../AutoscalingDeciderResultsTests.java | 4 +- .../FixedAutoscalingDeciderServiceTests.java | 9 +- .../AutoscalingNodesInfoServiceTests.java | 17 +- .../ReactiveStorageDeciderDecisionTests.java | 8 +- 31 files changed, 738 insertions(+), 196 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/common/unit/Processors.java create mode 100644 server/src/test/java/org/elasticsearch/common/unit/ProcessorsSerializationTests.java create mode 100644 server/src/test/java/org/elasticsearch/common/unit/ProcessorsTests.java diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DesiredNodesUpgradeIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DesiredNodesUpgradeIT.java index 75ae6ab4e6c51..7c9ed27548cbb 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DesiredNodesUpgradeIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DesiredNodesUpgradeIT.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.Processors; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.xcontent.json.JsonXContent; @@ -31,37 +32,109 @@ import static org.hamcrest.Matchers.is; public class DesiredNodesUpgradeIT extends AbstractRollingTestCase { + private enum ProcessorsPrecision { + DOUBLE, + FLOAT + } + public void testUpgradeDesiredNodes() throws Exception { // Desired nodes was introduced in 8.1 if (UPGRADE_FROM_VERSION.before(Version.V_8_1_0)) { return; } + if (UPGRADE_FROM_VERSION.onOrAfter(Processors.DOUBLE_PROCESSORS_SUPPORT_VERSION)) { + assertUpgradedNodesCanReadDesiredNodes(); + } else if (UPGRADE_FROM_VERSION.onOrAfter(DesiredNode.RANGE_FLOAT_PROCESSORS_SUPPORT_VERSION)) { + assertDesiredNodesUpdatedWithRoundedUpFloatsAreIdempotent(); + } else { + assertDesiredNodesWithFloatProcessorsAreRejectedInOlderVersions(); + } + } + + private void assertUpgradedNodesCanReadDesiredNodes() throws Exception { + final int desiredNodesVersion = switch (CLUSTER_TYPE) { + case OLD -> 1; + case MIXED -> FIRST_MIXED_ROUND ? 2 : 3; + case UPGRADED -> 4; + }; + + if (CLUSTER_TYPE != ClusterType.OLD) { + final Map desiredNodes = getLatestDesiredNodes(); + final String historyId = extractValue(desiredNodes, "history_id"); + final int version = extractValue(desiredNodes, "version"); + assertThat(historyId, is(equalTo("upgrade_test"))); + assertThat(version, is(equalTo(desiredNodesVersion - 1))); + } + + addClusterNodesToDesiredNodesWithProcessorsOrProcessorRanges(desiredNodesVersion, ProcessorsPrecision.DOUBLE); + assertAllDesiredNodesAreActualized(); + } + + private void assertDesiredNodesUpdatedWithRoundedUpFloatsAreIdempotent() throws Exception { + // We define the same set of desired nodes to ensure that they are equal across all + // the test runs, otherwise we cannot guarantee an idempotent update in this test + final var desiredNodes = getNodeNames().stream() + .map( + nodeName -> new DesiredNode( + Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeName).build(), + 1238.49922909, + ByteSizeValue.ofGb(32), + ByteSizeValue.ofGb(128), + Version.CURRENT + ) + ) + .toList(); + + final int desiredNodesVersion = switch (CLUSTER_TYPE) { + case OLD -> 1; + case MIXED -> FIRST_MIXED_ROUND ? 2 : 3; + case UPGRADED -> 4; + }; + + if (CLUSTER_TYPE != ClusterType.OLD) { + updateDesiredNodes(desiredNodes, desiredNodesVersion - 1); + } + for (int i = 0; i < 2; i++) { + updateDesiredNodes(desiredNodes, desiredNodesVersion); + } + + final Map latestDesiredNodes = getLatestDesiredNodes(); + final int latestDesiredNodesVersion = extractValue(latestDesiredNodes, "version"); + assertThat(latestDesiredNodesVersion, is(equalTo(desiredNodesVersion))); + + if (CLUSTER_TYPE == ClusterType.UPGRADED) { + assertAllDesiredNodesAreActualized(); + } + } + + private void assertDesiredNodesWithFloatProcessorsAreRejectedInOlderVersions() throws Exception { switch (CLUSTER_TYPE) { case OLD -> addClusterNodesToDesiredNodesWithIntegerProcessors(1); case MIXED -> { int version = FIRST_MIXED_ROUND ? 2 : 3; - if (UPGRADE_FROM_VERSION.onOrAfter(DesiredNode.RANGE_FLOAT_PROCESSORS_SUPPORT_VERSION)) { - addClusterNodesToDesiredNodesWithFloatProcessorsOrProcessorRanges(version); - } else { - // Processor ranges or float processors are forbidden during upgrades: 8.2 -> 8.3 clusters - final var responseException = expectThrows( - ResponseException.class, - () -> addClusterNodesToDesiredNodesWithFloatProcessorsOrProcessorRanges(version) - ); - final var statusCode = responseException.getResponse().getStatusLine().getStatusCode(); - assertThat(statusCode, is(equalTo(400))); - } + // Processor ranges or float processors are forbidden during upgrades: 8.2 -> 8.3 clusters + final var responseException = expectThrows( + ResponseException.class, + () -> addClusterNodesToDesiredNodesWithProcessorsOrProcessorRanges(version, ProcessorsPrecision.FLOAT) + ); + final var statusCode = responseException.getResponse().getStatusLine().getStatusCode(); + assertThat(statusCode, is(equalTo(400))); } case UPGRADED -> { assertAllDesiredNodesAreActualized(); - addClusterNodesToDesiredNodesWithFloatProcessorsOrProcessorRanges(4); + addClusterNodesToDesiredNodesWithProcessorsOrProcessorRanges(4, ProcessorsPrecision.FLOAT); } } + getLatestDesiredNodes(); + } + + private Map getLatestDesiredNodes() throws IOException { final var getDesiredNodesRequest = new Request("GET", "/_internal/desired_nodes/_latest"); final var response = client().performRequest(getDesiredNodesRequest); assertThat(response.getStatusLine().getStatusCode(), is(equalTo(200))); + return responseAsMap(response); } private void assertAllDesiredNodesAreActualized() throws Exception { @@ -77,14 +150,15 @@ private void assertAllDesiredNodesAreActualized() throws Exception { } } - private void addClusterNodesToDesiredNodesWithFloatProcessorsOrProcessorRanges(int version) throws Exception { + private void addClusterNodesToDesiredNodesWithProcessorsOrProcessorRanges(int version, ProcessorsPrecision processorsPrecision) + throws Exception { final List nodes; if (randomBoolean()) { nodes = getNodeNames().stream() .map( nodeName -> new DesiredNode( Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeName).build(), - 0.5f, + processorsPrecision == ProcessorsPrecision.DOUBLE ? randomDoubleProcessorCount() : randomFloatProcessorCount(), ByteSizeValue.ofGb(randomIntBetween(10, 24)), ByteSizeValue.ofGb(randomIntBetween(128, 256)), Version.CURRENT @@ -92,17 +166,18 @@ private void addClusterNodesToDesiredNodesWithFloatProcessorsOrProcessorRanges(i ) .toList(); } else { - nodes = getNodeNames().stream() - .map( - nodeName -> new DesiredNode( - Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeName).build(), - new DesiredNode.ProcessorsRange(randomIntBetween(1, 10), (float) randomIntBetween(20, 30)), - ByteSizeValue.ofGb(randomIntBetween(10, 24)), - ByteSizeValue.ofGb(randomIntBetween(128, 256)), - Version.CURRENT - ) - ) - .toList(); + nodes = getNodeNames().stream().map(nodeName -> { + double minProcessors = processorsPrecision == ProcessorsPrecision.DOUBLE + ? randomDoubleProcessorCount() + : randomFloatProcessorCount(); + return new DesiredNode( + Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeName).build(), + new DesiredNode.ProcessorsRange(minProcessors, minProcessors + randomIntBetween(10, 20)), + ByteSizeValue.ofGb(randomIntBetween(10, 24)), + ByteSizeValue.ofGb(randomIntBetween(128, 256)), + Version.CURRENT + ); + }).toList(); } updateDesiredNodes(nodes, version); } @@ -123,7 +198,7 @@ private void addClusterNodesToDesiredNodesWithIntegerProcessors(int version) thr } private void updateDesiredNodes(List nodes, int version) throws IOException { - final var request = new Request("PUT", "/_internal/desired_nodes/history/" + version); + final var request = new Request("PUT", "/_internal/desired_nodes/upgrade_test/" + version); try (var builder = JsonXContent.contentBuilder()) { builder.startObject(); builder.xContentList(UpdateDesiredNodesRequest.NODES_FIELD.getPreferredName(), nodes); @@ -149,6 +224,14 @@ private List getNodeNames() throws Exception { return nodeNames; } + private double randomDoubleProcessorCount() { + return randomDoubleBetween(0.5, 512.1234, true); + } + + private float randomFloatProcessorCount() { + return randomIntBetween(1, 512) + randomFloat(); + } + @SuppressWarnings("unchecked") private static T extractValue(Map map, String path) { return (T) XContentMapValues.extractValue(path, map); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesAction.java index 9ee56e9ba8fa9..20e51727110ab 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesAction.java @@ -39,6 +39,7 @@ public class TransportUpdateDesiredNodesAction extends TransportMasterNodeAction { private static final Logger logger = LogManager.getLogger(TransportUpdateDesiredNodesAction.class); + private static final double MAX_DELTA_PROCESSORS = 7E-5; private final DesiredNodesSettingsValidator settingsValidator; private final ClusterStateTaskExecutor taskExecutor; @@ -100,7 +101,9 @@ protected void doExecute(Task task, UpdateDesiredNodesRequest request, ActionLis if (request.isCompatibleWithVersion(minNodeVersion) == false) { listener.onFailure( new IllegalArgumentException( - "Unable to use processor ranges or floating-point processors in mixed-clusters with nodes in version: " + minNodeVersion + "Unable to use processor ranges, floating-point (with greater precision) processors " + + "in mixed-clusters with nodes in version: " + + minNodeVersion ) ); return; @@ -124,7 +127,7 @@ static DesiredNodes updateDesiredNodes(DesiredNodes latestDesiredNodes, UpdateDe ); if (latestDesiredNodes != null) { - if (latestDesiredNodes.equals(proposedDesiredNodes)) { + if (latestDesiredNodes.equalsWithProcessorsCloseTo(proposedDesiredNodes, MAX_DELTA_PROCESSORS)) { return latestDesiredNodes; } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/UpdateDesiredNodesRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/UpdateDesiredNodesRequest.java index 28faab799f8f6..b4d7ffbc3d0d9 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/UpdateDesiredNodesRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/UpdateDesiredNodesRequest.java @@ -58,7 +58,11 @@ public UpdateDesiredNodesRequest(StreamInput in) throws IOException { this.historyID = in.readString(); this.version = in.readLong(); this.nodes = in.readList(DesiredNode::readFrom); - dryRun = in.getVersion().onOrAfter(DRY_RUN_VERSION) ? in.readBoolean() : false; + if (in.getVersion().onOrAfter(DRY_RUN_VERSION)) { + this.dryRun = in.readBoolean(); + } else { + this.dryRun = false; + } } @Override @@ -98,6 +102,7 @@ public boolean isCompatibleWithVersion(Version version) { if (version.onOrAfter(DesiredNode.RANGE_FLOAT_PROCESSORS_SUPPORT_VERSION)) { return true; } + return nodes.stream().allMatch(desiredNode -> desiredNode.isCompatibleWithVersion(version)); } diff --git a/server/src/main/java/org/elasticsearch/cluster/desirednodes/DesiredNodesSettingsValidator.java b/server/src/main/java/org/elasticsearch/cluster/desirednodes/DesiredNodesSettingsValidator.java index 1e726ecbb49a5..f777f88416145 100644 --- a/server/src/main/java/org/elasticsearch/cluster/desirednodes/DesiredNodesSettingsValidator.java +++ b/server/src/main/java/org/elasticsearch/cluster/desirednodes/DesiredNodesSettingsValidator.java @@ -92,7 +92,8 @@ private void validate(DesiredNode node) { int minProcessors = node.roundedDownMinProcessors(); Integer roundedUpMaxProcessors = node.roundedUpMaxProcessors(); int maxProcessors = roundedUpMaxProcessors == null ? minProcessors : roundedUpMaxProcessors; - Setting.intSetting(NODE_PROCESSORS_SETTING.getKey(), minProcessors, 1, maxProcessors, Setting.Property.NodeScope).get(settings); + Setting.doubleSetting(NODE_PROCESSORS_SETTING.getKey(), minProcessors, 1, maxProcessors, Setting.Property.NodeScope) + .get(settings); final Settings.Builder updatedSettings = Settings.builder().put(settings); updatedSettings.remove(NODE_PROCESSORS_SETTING.getKey()); settings = updatedSettings.build(); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DesiredNode.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DesiredNode.java index 1cb1552e5afa3..b76726b1a2a1a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DesiredNode.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DesiredNode.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.Processors; import org.elasticsearch.core.Nullable; import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ObjectParser; @@ -51,7 +52,7 @@ public final class DesiredNode implements Writeable, ToXContentObject, Comparabl false, (args, name) -> new DesiredNode( (Settings) args[0], - (Float) args[1], + (Processors) args[1], (ProcessorsRange) args[2], (ByteSizeValue) args[3], (ByteSizeValue) args[4], @@ -65,7 +66,12 @@ public final class DesiredNode implements Writeable, ToXContentObject, Comparabl static void configureParser(ConstructingObjectParser parser) { parser.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> Settings.fromXContent(p), SETTINGS_FIELD); - parser.declareFloat(ConstructingObjectParser.optionalConstructorArg(), PROCESSORS_FIELD); + parser.declareField( + ConstructingObjectParser.optionalConstructorArg(), + (p, c) -> Processors.fromXContent(p), + PROCESSORS_FIELD, + ObjectParser.ValueType.DOUBLE + ); parser.declareObjectOrNull( ConstructingObjectParser.optionalConstructorArg(), (p, c) -> ProcessorsRange.fromXContent(p), @@ -100,7 +106,7 @@ private static Version parseVersion(String version) { } private final Settings settings; - private final Float processors; + private final Processors processors; private final ProcessorsRange processorsRange; private final ByteSizeValue memory; private final ByteSizeValue storage; @@ -112,13 +118,13 @@ public DesiredNode(Settings settings, ProcessorsRange processorsRange, ByteSizeV this(settings, null, processorsRange, memory, storage, version); } - public DesiredNode(Settings settings, float processors, ByteSizeValue memory, ByteSizeValue storage, Version version) { - this(settings, processors, null, memory, storage, version); + public DesiredNode(Settings settings, double processors, ByteSizeValue memory, ByteSizeValue storage, Version version) { + this(settings, new Processors(processors), null, memory, storage, version); } DesiredNode( Settings settings, - Float processors, + Processors processors, ProcessorsRange processorsRange, ByteSizeValue memory, ByteSizeValue storage, @@ -147,9 +153,9 @@ public DesiredNode(Settings settings, float processors, ByteSizeValue memory, By ); } - if (processors != null && invalidNumberOfProcessors(processors)) { + if (invalidNumberOfProcessors(processors)) { throw new IllegalArgumentException( - format(Locale.ROOT, "Only a positive number of [processors] are allowed and [%f] was provided", processors) + format(Locale.ROOT, "Only a positive number of [processors] are allowed and [%f] was provided", processors.count()) ); } @@ -171,13 +177,13 @@ public DesiredNode(Settings settings, float processors, ByteSizeValue memory, By public static DesiredNode readFrom(StreamInput in) throws IOException { final var settings = Settings.readSettingsFromStream(in); - final Float processors; + final Processors processors; final ProcessorsRange processorsRange; if (in.getVersion().onOrAfter(RANGE_FLOAT_PROCESSORS_SUPPORT_VERSION)) { - processors = in.readOptionalFloat(); + processors = in.readOptionalWriteable(Processors::readFrom); processorsRange = in.readOptionalWriteable(ProcessorsRange::readFrom); } else { - processors = (float) in.readInt(); + processors = Processors.readFrom(in); processorsRange = null; } final var memory = new ByteSizeValue(in); @@ -190,13 +196,12 @@ public static DesiredNode readFrom(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { settings.writeTo(out); if (out.getVersion().onOrAfter(RANGE_FLOAT_PROCESSORS_SUPPORT_VERSION)) { - out.writeOptionalFloat(processors); + out.writeOptionalWriteable(processors); out.writeOptionalWriteable(processorsRange); } else { assert processorsRange == null; assert processors != null; - assert processorHasDecimals() == false; - out.writeInt((int) (float) processors); + processors.writeTo(out); } memory.writeTo(out); storage.writeTo(out); @@ -238,7 +243,7 @@ public Settings settings() { return settings; } - public float minProcessors() { + public Processors minProcessors() { if (processors != null) { return processors; } @@ -246,10 +251,11 @@ public float minProcessors() { } public int roundedDownMinProcessors() { - return roundDown(minProcessors()); + return minProcessors().roundDown(); } - public Float maxProcessors() { + @Nullable + public Processors maxProcessors() { if (processors != null) { return processors; } @@ -258,19 +264,16 @@ public Float maxProcessors() { } public Integer roundedUpMaxProcessors() { - if (maxProcessors() == null) { + final Processors maxProcessors = maxProcessors(); + if (maxProcessors == null) { return null; } - return roundUp(maxProcessors()); - } - - private boolean processorHasDecimals() { - return processors != null && ((int) (float) processors) != Math.ceil(processors); + return maxProcessors.roundUp(); } @Nullable - Float processors() { + Processors processors() { return processors; } @@ -303,7 +306,8 @@ public boolean isCompatibleWithVersion(Version version) { if (version.onOrAfter(RANGE_FLOAT_PROCESSORS_SUPPORT_VERSION)) { return true; } - return processorsRange == null && processorHasDecimals() == false; + + return processorsRange == null && processors.isCompatibleWithVersion(version); } @Override @@ -311,9 +315,13 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; DesiredNode that = (DesiredNode) o; - return Objects.equals(settings, that.settings) - && Objects.equals(processors, that.processors) + return equalsWithoutProcessorsSpecification(that) && Objects.equals(processorsRange, that.processorsRange) + && Objects.equals(processors, that.processors); + } + + private boolean equalsWithoutProcessorsSpecification(DesiredNode that) { + return Objects.equals(settings, that.settings) && Objects.equals(memory, that.memory) && Objects.equals(storage, that.storage) && Objects.equals(version, that.version) @@ -321,6 +329,26 @@ public boolean equals(Object o) { && Objects.equals(roles, that.roles); } + public boolean equalsWithProcessorsCloseTo(DesiredNode that, double maxError) { + return equalsWithoutProcessorsSpecification(that) + && processorsEqualsOrCloseTo(processors, that.processors, maxError) + && ProcessorsRange.equalsOrCloseTo(processorsRange, that.processorsRange, maxError); + } + + static boolean processorsEqualsOrCloseTo(Processors a, Processors b, double maxError) { + return (a == b) || (a != null && (a.equals(b) || processorsAsFloatCloseTo(a, b, maxError))); + } + + private static boolean processorsAsFloatCloseTo(Processors a, Processors b, double maxError) { + if (b == null) { + return false; + } + + float floatCount = (float) a.count(); + float otherFloatCount = (float) b.count(); + return Float.isFinite(floatCount) && Float.isFinite(otherFloatCount) && (Math.abs(floatCount - otherFloatCount) < maxError); + } + @Override public int hashCode() { return Objects.hash(settings, processors, processorsRange, memory, storage, version, externalId, roles); @@ -354,42 +382,42 @@ public String toString() { + '}'; } - private static boolean invalidNumberOfProcessors(float processors) { - return processors <= 0 || Float.isInfinite(processors) || Float.isNaN(processors); - } - - private static int roundUp(float value) { - return (int) Math.ceil(value); - } - - private static int roundDown(float value) { - return Math.max(1, (int) Math.floor(value)); + private static boolean invalidNumberOfProcessors(Processors processors) { + return processors != null && processors.count() <= 0; } - public record ProcessorsRange(float min, Float max) implements Writeable, ToXContentObject { + public record ProcessorsRange(Processors min, @Nullable Processors max) implements Writeable, ToXContentObject { private static final ParseField MIN_FIELD = new ParseField("min"); private static final ParseField MAX_FIELD = new ParseField("max"); - public static final ConstructingObjectParser PROCESSORS_PARSER = new ConstructingObjectParser<>( - "processors", + public static final ConstructingObjectParser PROCESSORS_RANGE_PARSER = new ConstructingObjectParser<>( + "processors_range", false, - (args, name) -> new ProcessorsRange((float) args[0], (Float) args[1]) + (args, name) -> new ProcessorsRange((Processors) args[0], (Processors) args[1]) ); static { - PROCESSORS_PARSER.declareFloat(ConstructingObjectParser.constructorArg(), MIN_FIELD); - PROCESSORS_PARSER.declareFloat(ConstructingObjectParser.optionalConstructorArg(), MAX_FIELD); + PROCESSORS_RANGE_PARSER.declareField( + ConstructingObjectParser.constructorArg(), + (p, c) -> Processors.fromXContent(p), + MIN_FIELD, + ObjectParser.ValueType.DOUBLE + ); + PROCESSORS_RANGE_PARSER.declareField( + ConstructingObjectParser.optionalConstructorArg(), + (p, c) -> Processors.fromXContent(p), + MAX_FIELD, + ObjectParser.ValueType.DOUBLE + ); } static ProcessorsRange fromXContent(XContentParser parser) throws IOException { - if (parser.currentToken() == XContentParser.Token.START_OBJECT) { - return PROCESSORS_PARSER.parse(parser, null); - } else { - // For BWC with nodes pre 8.3 - float processors = parser.floatValue(); - return new ProcessorsRange(processors, processors); - } + return PROCESSORS_RANGE_PARSER.parse(parser, null); + } + + public ProcessorsRange(double min, Double max) { + this(Processors.of(min), Processors.of(max)); } public ProcessorsRange { @@ -404,7 +432,7 @@ static ProcessorsRange fromXContent(XContentParser parser) throws IOException { ); } - if (max != null && invalidNumberOfProcessors(max)) { + if (invalidNumberOfProcessors(max)) { throw new IllegalArgumentException( format( Locale.ROOT, @@ -415,22 +443,21 @@ static ProcessorsRange fromXContent(XContentParser parser) throws IOException { ); } - if (max != null && min > max) { + if (max != null && min.compareTo(max) > 0) { throw new IllegalArgumentException( "min processors must be less than or equal to max processors and it was: min: " + min + " max: " + max ); } } - @Nullable private static ProcessorsRange readFrom(StreamInput in) throws IOException { - return new ProcessorsRange(in.readFloat(), in.readOptionalFloat()); + return new ProcessorsRange(Processors.readFrom(in), in.readOptionalWriteable(Processors::readFrom)); } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeFloat(min); - out.writeOptionalFloat(max); + min.writeTo(out); + out.writeOptionalWriteable(max); } @Override @@ -443,5 +470,15 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.endObject(); return builder; } + + static boolean equalsOrCloseTo(ProcessorsRange a, ProcessorsRange b, double maxError) { + return (a == b) || (a != null && a.equalsOrCloseTo(b, maxError)); + } + + boolean equalsOrCloseTo(ProcessorsRange that, double maxError) { + return that != null + && (equals(that) + || (processorsEqualsOrCloseTo(min, that.min, maxError) && processorsEqualsOrCloseTo(max, that.max, maxError))); + } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DesiredNodeWithStatus.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DesiredNodeWithStatus.java index 36dc586a1282c..94a4694b61d9f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DesiredNodeWithStatus.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DesiredNodeWithStatus.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.Processors; import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ObjectParser; import org.elasticsearch.xcontent.ParseField; @@ -38,7 +39,7 @@ public record DesiredNodeWithStatus(DesiredNode desiredNode, Status status) (args, unused) -> new DesiredNodeWithStatus( new DesiredNode( (Settings) args[0], - (Float) args[1], + (Processors) args[1], (DesiredNode.ProcessorsRange) args[2], (ByteSizeValue) args[3], (ByteSizeValue) args[4], @@ -121,6 +122,10 @@ public int compareTo(DesiredNodeWithStatus o) { return desiredNode.compareTo(o.desiredNode); } + public boolean equalsWithProcessorsCloseTo(DesiredNodeWithStatus other, double maxError) { + return other != null && status == other.status && desiredNode.equalsWithProcessorsCloseTo(other.desiredNode, maxError); + } + public enum Status { PENDING((short) 0), ACTUALIZED((short) 1); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DesiredNodes.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DesiredNodes.java index 83e69c6e4ff95..e42731522a115 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DesiredNodes.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DesiredNodes.java @@ -249,6 +249,29 @@ private static void checkForDuplicatedExternalIDs(List no } } + public boolean equalsWithProcessorsCloseTo(DesiredNodes that, double maxError) { + return that != null + && version == that.version + && Objects.equals(historyID, that.historyID) + && equalsNodesWithProcessorsCloseTo(that, maxError); + } + + public boolean equalsNodesWithProcessorsCloseTo(DesiredNodes that, double maxError) { + if (that == null || nodes.size() != that.nodes.size()) { + return false; + } + + for (Map.Entry desiredNodeEntry : nodes.entrySet()) { + final DesiredNodeWithStatus desiredNodeWithStatus = desiredNodeEntry.getValue(); + final DesiredNodeWithStatus otherDesiredNodeWithStatus = that.nodes.get(desiredNodeEntry.getKey()); + if (desiredNodeWithStatus.equalsWithProcessorsCloseTo(otherDesiredNodeWithStatus, maxError) == false) { + return false; + } + } + + return true; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/server/src/main/java/org/elasticsearch/common/unit/Processors.java b/server/src/main/java/org/elasticsearch/common/unit/Processors.java new file mode 100644 index 0000000000000..be04ddf50765a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/unit/Processors.java @@ -0,0 +1,175 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.common.unit; + +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.xcontent.ToXContentFragment; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.Locale; +import java.util.Objects; + +import static java.lang.String.format; + +public class Processors implements Writeable, Comparable, ToXContentFragment { + public static final Processors ZERO = new Processors(0.0); + public static final Processors MAX_PROCESSORS = new Processors(Double.MAX_VALUE); + + public static final Version FLOAT_PROCESSORS_SUPPORT_VERSION = Version.V_8_3_0; + public static final Version DOUBLE_PROCESSORS_SUPPORT_VERSION = Version.V_8_5_0; + static final int NUMBER_OF_DECIMAL_PLACES = 5; + private static final double MIN_REPRESENTABLE_PROCESSORS = 1E-5; + + private final double count; + + public Processors(double count) { + if (validNumberOfProcessors(count) == false) { + throw new IllegalArgumentException("processors must be a non-negative number; provided [" + count + "]"); + } + + // Avoid rounding up to MIN_REPRESENTABLE_PROCESSORS when 0 processors are used + if (count == 0.0) { + this.count = count; + } else { + this.count = Math.max( + MIN_REPRESENTABLE_PROCESSORS, + new BigDecimal(count).setScale(NUMBER_OF_DECIMAL_PLACES, RoundingMode.HALF_UP).doubleValue() + ); + } + } + + @Nullable + public static Processors of(Double count) { + if (count == null) { + return null; + } + return new Processors(count); + } + + public static Processors readFrom(StreamInput in) throws IOException { + final double processorCount; + if (in.getVersion().before(FLOAT_PROCESSORS_SUPPORT_VERSION)) { + processorCount = in.readInt(); + } else if (in.getVersion().before(DOUBLE_PROCESSORS_SUPPORT_VERSION)) { + processorCount = in.readFloat(); + } else { + processorCount = in.readDouble(); + } + return new Processors(processorCount); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + if (out.getVersion().before(FLOAT_PROCESSORS_SUPPORT_VERSION)) { + assert hasDecimals() == false; + out.writeInt((int) count); + } else if (out.getVersion().before(DOUBLE_PROCESSORS_SUPPORT_VERSION)) { + out.writeFloat((float) count); + } else { + out.writeDouble(count); + } + } + + @Nullable + public static Processors fromXContent(XContentParser parser) throws IOException { + final double count = parser.doubleValue(); + if (validNumberOfProcessors(count) == false) { + throw new IllegalArgumentException( + format(Locale.ROOT, "Only a positive number of [%s] are allowed and [%f] was provided", parser.currentName(), count) + ); + } + return new Processors(count); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.value(count); + } + + public Processors plus(Processors other) { + final double newProcessorCount = count + other.count; + if (Double.isFinite(newProcessorCount) == false) { + throw new ArithmeticException("Unable to add [" + this + "] and [" + other + "] the resulting value overflows"); + } + + return new Processors(newProcessorCount); + } + + public Processors multiply(int value) { + if (value <= 0) { + throw new IllegalArgumentException("Processors cannot be multiplied by a negative number"); + } + + final double newProcessorCount = count * value; + if (Double.isFinite(newProcessorCount) == false) { + throw new ArithmeticException("Unable to multiply [" + this + "] by [" + value + "] the resulting value overflows"); + } + + return new Processors(newProcessorCount); + } + + public double count() { + return count; + } + + public int roundUp() { + return (int) Math.ceil(count); + } + + public int roundDown() { + return Math.max(1, (int) Math.floor(count)); + } + + private static boolean validNumberOfProcessors(double processors) { + return Double.isFinite(processors) && processors >= 0.0; + } + + private boolean hasDecimals() { + return ((int) count) != Math.ceil(count); + } + + public boolean isCompatibleWithVersion(Version version) { + if (version.onOrAfter(FLOAT_PROCESSORS_SUPPORT_VERSION)) { + return true; + } + + return hasDecimals() == false; + } + + @Override + public int compareTo(Processors o) { + return Double.compare(count, o.count); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Processors that = (Processors) o; + return Double.compare(that.count, count) == 0; + } + + @Override + public int hashCode() { + return Objects.hash(count); + } + + @Override + public String toString() { + return Double.toString(count); + } +} diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index dd696f7b6b4f1..2331d0477851b 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.Processors; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.node.Node; @@ -40,7 +41,7 @@ public class EsExecutors { * available to Elasticsearch (e.g., because of CPU limits). Note that this setting accepts floating point processors. * If a rounded number is needed, always use {@link EsExecutors#allocatedProcessors(Settings)}. */ - public static final Setting NODE_PROCESSORS_SETTING = new Setting<>( + public static final Setting NODE_PROCESSORS_SETTING = new Setting<>( "node.processors", Double.toString(Runtime.getRuntime().availableProcessors()), textValue -> { @@ -60,7 +61,7 @@ public class EsExecutors { String err = "Failed to parse value [" + textValue + "] for setting [node.processors] must be <= " + maxNumberOfProcessors; throw new IllegalArgumentException(err); } - return numberOfProcessors; + return new Processors(numberOfProcessors); }, Property.NodeScope ); @@ -73,7 +74,7 @@ public class EsExecutors { * @return the number of allocated processors */ public static int allocatedProcessors(final Settings settings) { - return (int) Math.ceil(NODE_PROCESSORS_SETTING.get(settings)); + return NODE_PROCESSORS_SETTING.get(settings).roundUp(); } public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing( diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/desirednodes/UpdateDesiredNodesRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/desirednodes/UpdateDesiredNodesRequestTests.java index b387779c629fc..b9d2edb108683 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/desirednodes/UpdateDesiredNodesRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/desirednodes/UpdateDesiredNodesRequestTests.java @@ -50,7 +50,7 @@ private DesiredNode hotDesiredNode() { } else { return new DesiredNode( settings, - new DesiredNode.ProcessorsRange(1, randomBoolean() ? null : (float) 1), + new DesiredNode.ProcessorsRange(1, randomBoolean() ? null : (double) 1), ByteSizeValue.ofGb(1), ByteSizeValue.ofGb(1), Version.CURRENT diff --git a/server/src/test/java/org/elasticsearch/cluster/desirednodes/DesiredNodesSettingsValidatorTests.java b/server/src/test/java/org/elasticsearch/cluster/desirednodes/DesiredNodesSettingsValidatorTests.java index f8590cf16e4a1..5961da02a1272 100644 --- a/server/src/test/java/org/elasticsearch/cluster/desirednodes/DesiredNodesSettingsValidatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/desirednodes/DesiredNodesSettingsValidatorTests.java @@ -120,7 +120,7 @@ public void testNodeProcessorsValidation() { int desiredNodeProcessors = 128; Settings nodeSettings = Settings.builder() .put(NODE_EXTERNAL_ID_SETTING.getKey(), randomAlphaOfLength(10)) - .put(NODE_PROCESSORS_SETTING.getKey(), desiredNodeProcessors + 1) + .put(NODE_PROCESSORS_SETTING.getKey(), desiredNodeProcessors + 1.1) .build(); final List desiredNodes = List.of( new DesiredNode(nodeSettings, desiredNodeProcessors, ByteSizeValue.ofGb(1), ByteSizeValue.ofGb(1), Version.CURRENT) @@ -132,7 +132,7 @@ public void testNodeProcessorsValidation() { assertThat(exception.getSuppressed().length > 0, is(equalTo(true))); assertThat( exception.getSuppressed()[0].getMessage(), - containsString("Failed to parse value [129] for setting [node.processors] must be <= 128") + containsString("Failed to parse value [129.1] for setting [node.processors] must be <= 128.0") ); } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodeSerializationTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodeSerializationTests.java index bd10abf7cb6ca..b70754887bebc 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodeSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodeSerializationTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.Processors; import org.elasticsearch.test.AbstractSerializingTestCase; import org.elasticsearch.xcontent.XContentParser; @@ -50,7 +51,8 @@ public static DesiredNode mutateDesiredNode(DesiredNode instance) { ); case 1 -> new DesiredNode( instance.settings(), - randomValueOtherThan(instance.processors(), () -> randomFloat() + randomIntBetween(1, 128)), + randomValueOtherThan(instance.processors(), () -> new Processors(randomDouble() + randomIntBetween(1, 128))), + null, instance.memory(), instance.storage(), instance.version() diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodeTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodeTests.java index 508fd8f953f82..98e389002fc82 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodeTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodeTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.Processors; import org.elasticsearch.test.ESTestCase; import static org.elasticsearch.cluster.node.DiscoveryNodeRole.VOTING_ONLY_NODE_ROLE; @@ -22,9 +23,11 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; public class DesiredNodeTests extends ESTestCase { + public static final double MAX_ERROR = 7E-5; public void testExternalIdIsRequired() { final Settings.Builder settings = Settings.builder(); @@ -58,7 +61,7 @@ public void testNumberOfProcessorsValidation() { expectThrows( IllegalArgumentException.class, - () -> new DesiredNode(settings, randomInvalidFloatProcessor(), ByteSizeValue.ofGb(1), ByteSizeValue.ofGb(1), Version.CURRENT) + () -> new DesiredNode(settings, randomInvalidProcessor(), ByteSizeValue.ofGb(1), ByteSizeValue.ofGb(1), Version.CURRENT) ); // Processor ranges @@ -66,7 +69,7 @@ public void testNumberOfProcessorsValidation() { IllegalArgumentException.class, () -> new DesiredNode( settings, - new DesiredNode.ProcessorsRange(randomInvalidFloatProcessor(), randomFrom(random(), null, 1.0f)), + new DesiredNode.ProcessorsRange(randomInvalidProcessor(), randomFrom(random(), null, 1.0)), ByteSizeValue.ofGb(1), ByteSizeValue.ofGb(1), Version.CURRENT @@ -76,7 +79,7 @@ public void testNumberOfProcessorsValidation() { IllegalArgumentException.class, () -> new DesiredNode( settings, - new DesiredNode.ProcessorsRange(randomFloat() + 0.1f, randomInvalidFloatProcessor()), + new DesiredNode.ProcessorsRange(randomDouble() + 0.1, randomInvalidProcessor()), ByteSizeValue.ofGb(1), ByteSizeValue.ofGb(1), Version.CURRENT @@ -86,15 +89,15 @@ public void testNumberOfProcessorsValidation() { IllegalArgumentException.class, () -> new DesiredNode( settings, - new DesiredNode.ProcessorsRange(randomInvalidFloatProcessor(), randomInvalidFloatProcessor()), + new DesiredNode.ProcessorsRange(randomInvalidProcessor(), randomInvalidProcessor()), ByteSizeValue.ofGb(1), ByteSizeValue.ofGb(1), Version.CURRENT ) ); - final var lowerBound = randomFloatBetween(0.1f, 10); - final var upperBound = randomFloatBetween(0.01f, lowerBound - Math.ulp(lowerBound)); + final var lowerBound = randomDoubleBetween(0.1, 10, true); + final var upperBound = randomDoubleBetween(0.01, lowerBound - Math.ulp(lowerBound), true); expectThrows( IllegalArgumentException.class, () -> new DesiredNode( @@ -160,33 +163,33 @@ public void testNodeCPUsRoundUp() { { final var desiredNode = new DesiredNode( settings, - new DesiredNode.ProcessorsRange((float) 0.4, (float) 1.2), + new DesiredNode.ProcessorsRange(0.4, 1.2), ByteSizeValue.ofGb(1), ByteSizeValue.ofGb(1), Version.CURRENT ); - assertThat(desiredNode.minProcessors(), is(equalTo((float) 0.4))); + assertThat(desiredNode.minProcessors().count(), is(equalTo(0.4))); assertThat(desiredNode.roundedDownMinProcessors(), is(equalTo(1))); - assertThat(desiredNode.maxProcessors(), is(equalTo((float) 1.2))); + assertThat(desiredNode.maxProcessors().count(), is(equalTo(1.2))); assertThat(desiredNode.roundedUpMaxProcessors(), is(equalTo(2))); } { - final var desiredNode = new DesiredNode(settings, 1.2f, ByteSizeValue.ofGb(1), ByteSizeValue.ofGb(1), Version.CURRENT); + final var desiredNode = new DesiredNode(settings, 1.2, ByteSizeValue.ofGb(1), ByteSizeValue.ofGb(1), Version.CURRENT); - assertThat(desiredNode.minProcessors(), is(equalTo((float) 1.2))); + assertThat(desiredNode.minProcessors().count(), is(equalTo(1.2))); assertThat(desiredNode.roundedDownMinProcessors(), is(equalTo(1))); - assertThat(desiredNode.maxProcessors(), is(equalTo((float) 1.2))); + assertThat(desiredNode.maxProcessors().count(), is(equalTo(1.2))); assertThat(desiredNode.roundedUpMaxProcessors(), is(equalTo(2))); } { final var desiredNode = new DesiredNode(settings, 1024, ByteSizeValue.ofGb(1), ByteSizeValue.ofGb(1), Version.CURRENT); - assertThat(desiredNode.minProcessors(), is(equalTo((float) 1024))); + assertThat(desiredNode.minProcessors().count(), is(equalTo(1024.0))); assertThat(desiredNode.roundedDownMinProcessors(), is(equalTo(1024))); - assertThat(desiredNode.maxProcessors(), is(equalTo((float) 1024))); + assertThat(desiredNode.maxProcessors().count(), is(equalTo(1024.0))); assertThat(desiredNode.roundedUpMaxProcessors(), is(equalTo(1024))); } } @@ -197,7 +200,7 @@ public void testDesiredNodeIsCompatible() { { final var desiredNode = new DesiredNode( settings, - new DesiredNode.ProcessorsRange((float) 0.4, (float) 1.2), + new DesiredNode.ProcessorsRange(0.4, 1.2), ByteSizeValue.ofGb(1), ByteSizeValue.ofGb(1), Version.CURRENT @@ -209,7 +212,7 @@ public void testDesiredNodeIsCompatible() { { final var desiredNode = new DesiredNode( settings, - randomIntBetween(0, 10) + randomFloat(), + randomIntBetween(0, 10) + randomDouble(), ByteSizeValue.ofGb(1), ByteSizeValue.ofGb(1), Version.CURRENT @@ -225,16 +228,81 @@ public void testDesiredNodeIsCompatible() { } } - private Float randomInvalidFloatProcessor() { - return randomFrom(0.0f, -1.0f, Float.NaN, Float.POSITIVE_INFINITY, Float.NEGATIVE_INFINITY); + public void testFloatProcessorsConvertedToDoubleAreCloseToEqual() { + final double processorCount = randomNumberOfProcessors(); + final float processorCountAsFloat = (float) processorCount; + final Processors bwcProcessors = new Processors(processorCountAsFloat); + final Processors doubleProcessor = new Processors(processorCount); + assertThat(DesiredNode.processorsEqualsOrCloseTo(bwcProcessors, doubleProcessor, MAX_ERROR), is(true)); } - private float randomFloatBetween(float start, float end) { - float result = 0.0f; - while (result < start || result > end || Float.isNaN(result)) { - result = start + randomFloat() * (end - start); + public void testProcessorsAreConsideredDifferentIfTheDifferenceIsGreaterThanMaxError() { + // Ensure that (processorCount - MAX_ERROR) is at least the smallest representable processor + final double processorCount = Math.max(Math.ulp(0.0) + MAX_ERROR, randomNumberOfProcessors()); + final Processors processorsA = new Processors(processorCount + MAX_ERROR); + final Processors processorsB = new Processors(processorCount - MAX_ERROR); + assertThat(DesiredNode.processorsEqualsOrCloseTo(processorsA, processorsB, MAX_ERROR), is(false)); + assertThat(processorsA.equals(processorsB), is(false)); + } + + public void testRoundedProcessorsToFloatAreCloseToEqual() { + double processorCount = randomNumberOfProcessors(); + final Processors doubleProcessor = new Processors(processorCount); + final Processors floatProcessor = new Processors((float) doubleProcessor.count()); + assertThat(DesiredNode.processorsEqualsOrCloseTo(doubleProcessor, floatProcessor, MAX_ERROR), is(true)); + } + + public void testEqualsOrProcessorsCloseTo() { + final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), randomAlphaOfLength(10)).build(); + + final double processorCount = randomNumberOfProcessors(); + final boolean shouldBeConsideredEqual = randomBoolean(); + final double maxDifferenceBetweenProcessorCounts = shouldBeConsideredEqual ? MAX_ERROR / 2 : MAX_ERROR * 2; + final ByteSizeValue memory = ByteSizeValue.ofGb(randomIntBetween(1, 32)); + final ByteSizeValue storage = ByteSizeValue.ofGb(randomIntBetween(128, 256)); + + final DesiredNode desiredNode1; + final DesiredNode desiredNode2; + if (randomBoolean()) { + desiredNode1 = new DesiredNode( + settings, + processorCount + maxDifferenceBetweenProcessorCounts, + memory, + storage, + Version.CURRENT + ); + desiredNode2 = new DesiredNode(settings, processorCount, memory, storage, Version.CURRENT); + } else { + final Double maxProcessors = randomBoolean() ? processorCount + randomIntBetween(1, 10) : null; + + final Double maxProcessorsDesiredNode1; + if (maxProcessors != null && randomBoolean()) { + maxProcessorsDesiredNode1 = maxProcessors + maxDifferenceBetweenProcessorCounts; + } else { + maxProcessorsDesiredNode1 = maxProcessors; + } + + final DesiredNode.ProcessorsRange processorsRange1 = new DesiredNode.ProcessorsRange( + processorCount + maxDifferenceBetweenProcessorCounts, + maxProcessorsDesiredNode1 + ); + + final DesiredNode.ProcessorsRange processorsRange2 = new DesiredNode.ProcessorsRange(processorCount, maxProcessors); + + desiredNode1 = new DesiredNode(settings, processorsRange1, memory, storage, Version.CURRENT); + desiredNode2 = new DesiredNode(settings, processorsRange2, memory, storage, Version.CURRENT); } - return result; + assertThat(desiredNode1.equalsWithProcessorsCloseTo(desiredNode2, MAX_ERROR), is(shouldBeConsideredEqual)); + assertThat(desiredNode1, is(not(equalTo(desiredNode2)))); + } + + private double randomNumberOfProcessors() { + return randomDoubleBetween(Math.ulp(0.0), 512.99999999, true); + } + + private Double randomInvalidProcessor() { + // 1E-7 is rounded to 0 since we only consider up to 5 decimal places + return randomFrom(0.0, -1.0, Double.NaN, Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodesTestCase.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodesTestCase.java index ac4136a211119..532093a8cf0fb 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodesTestCase.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodesTestCase.java @@ -56,11 +56,11 @@ public static DesiredNode randomDesiredNode(Version version, Settings settings) if (randomBoolean()) { return randomDesiredNode(version, settings, randomProcessorRange()); } else { - return randomDesiredNode(version, settings, randomIntBetween(1, 256) + randomFloat()); + return randomDesiredNode(version, settings, randomNumberOfProcessors()); } } - public static DesiredNode randomDesiredNode(Version version, Settings settings, float processors) { + public static DesiredNode randomDesiredNode(Version version, Settings settings, double processors) { return new DesiredNode( addExternalIdIfMissing(settings), processors, @@ -81,7 +81,7 @@ public static DesiredNode randomDesiredNode(Version version, Settings settings, } public static DesiredNode.ProcessorsRange randomProcessorRange() { - float minProcessors = randomFloat() + randomIntBetween(1, 16); + double minProcessors = randomNumberOfProcessors(); return new DesiredNode.ProcessorsRange(minProcessors, randomBoolean() ? null : minProcessors + randomIntBetween(0, 10)); } @@ -184,4 +184,8 @@ public static DiscoveryNode newDiscoveryNode(String nodeName) { Version.CURRENT ); } + + public static double randomNumberOfProcessors() { + return randomDoubleBetween(Math.ulp(0.0), 512.99999999, true); + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodesTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodesTests.java index e9a78e932c888..41c2e7674bce9 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodesTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodesTests.java @@ -187,7 +187,7 @@ private DesiredNodeWithStatus createPendingDesiredNode() { private DesiredNode desiredNodeWithDifferentSpecsAndSameExternalId(DesiredNode desiredNode) { return new DesiredNode( desiredNode.settings(), - desiredNode.minProcessors() + randomIntBetween(1, 10), + desiredNode.minProcessors().count() + randomIntBetween(1, 10), ByteSizeValue.ofGb(desiredNode.memory().getGb() + randomIntBetween(15, 20)), ByteSizeValue.ofGb(desiredNode.storage().getGb() + randomIntBetween(1, 100)), Version.CURRENT diff --git a/server/src/test/java/org/elasticsearch/common/unit/ProcessorsSerializationTests.java b/server/src/test/java/org/elasticsearch/common/unit/ProcessorsSerializationTests.java new file mode 100644 index 0000000000000..267c8f92ed9ea --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/unit/ProcessorsSerializationTests.java @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.common.unit; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.io.IOException; + +public class ProcessorsSerializationTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return Processors::readFrom; + } + + @Override + protected Processors createTestInstance() { + return new Processors(randomDoubleBetween(Math.ulp(0.0), 512.99999999, true)); + } + + @Override + protected Processors mutateInstance(Processors instance) throws IOException { + return new Processors(instance.count() + randomDoubleBetween(0.01, 1, true)); + } +} diff --git a/server/src/test/java/org/elasticsearch/common/unit/ProcessorsTests.java b/server/src/test/java/org/elasticsearch/common/unit/ProcessorsTests.java new file mode 100644 index 0000000000000..716e9c8ddadfb --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/unit/ProcessorsTests.java @@ -0,0 +1,87 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.common.unit; + +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class ProcessorsTests extends ESTestCase { + public void testTruncatesAfterFiveDecimalPlaces() { + final double processorCount = randomNumberOfProcessors(); + + final Processors processors = new Processors(processorCount); + final String processorsString = Double.toString(processors.count()); + final int decimalPlaces = processorsString.length() - processorsString.indexOf(".") - 1; + assertThat(decimalPlaces, is(lessThanOrEqualTo(Processors.NUMBER_OF_DECIMAL_PLACES))); + } + + public void testRounding() { + { + final Processors processors = new Processors(1.2); + assertThat(processors.roundDown(), is(equalTo(1))); + assertThat(processors.roundUp(), is(equalTo(2))); + } + + { + final Processors processors = new Processors(0.1); + assertThat(processors.roundDown(), is(equalTo(1))); + assertThat(processors.roundUp(), is(equalTo(1))); + } + + { + final Processors processors = new Processors(1E-12); + assertThat(processors.roundDown(), is(equalTo(1))); + assertThat(processors.roundUp(), is(equalTo(1))); + } + } + + public void testNeverRoundsDownToZero() { + final Processors processors = new Processors(1E-12); + assertThat(processors.count(), is(greaterThan(0.0))); + } + + public void testValidation() { + expectThrows(IllegalArgumentException.class, () -> new Processors(-1.0)); + expectThrows(IllegalArgumentException.class, () -> new Processors(Double.POSITIVE_INFINITY)); + expectThrows(IllegalArgumentException.class, () -> new Processors(Double.NEGATIVE_INFINITY)); + expectThrows(IllegalArgumentException.class, () -> new Processors(Double.NaN)); + } + + public void testAddition() { + final Processors processorsA = new Processors(randomNumberOfProcessors()); + final Processors processorsB = new Processors(randomNumberOfProcessors()); + + final Processors addedProcessors = processorsA.plus(processorsB); + + assertThat(addedProcessors, is(greaterThan(processorsA))); + assertThat(addedProcessors, is(greaterThan(processorsB))); + } + + public void testOverflowAddition() { + final Processors processorsA = new Processors(Double.MAX_VALUE); + final Processors processorsB = new Processors(Double.MAX_VALUE); + + expectThrows(ArithmeticException.class, () -> processorsA.plus(processorsB)); + } + + public void testMultiplication() { + final Processors processors = new Processors(randomNumberOfProcessors()); + final Processors multipliedProcessors = processors.multiply(100); + + assertThat(multipliedProcessors, is(greaterThan(processors))); + } + + private double randomNumberOfProcessors() { + return randomDoubleBetween(Math.ulp(0.0), 512.99999999, true); + } +} diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index 15ac647a2a256..83df12d47e7bc 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.Processors; import org.elasticsearch.test.ESTestCase; import org.hamcrest.Matcher; @@ -438,7 +439,7 @@ public void testGetTasks() throws InterruptedException { } public void testNodeProcessorsBound() { - final Setting processorsSetting = EsExecutors.NODE_PROCESSORS_SETTING; + final Setting processorsSetting = EsExecutors.NODE_PROCESSORS_SETTING; final int available = Runtime.getRuntime().availableProcessors(); final double processors = randomDoubleBetween(available + Math.ulp(available), Float.MAX_VALUE, true); final Settings settings = Settings.builder().put(processorsSetting.getKey(), processors).build(); @@ -478,7 +479,7 @@ public void testNodeProcessorsIsRoundedUpWhenUsingFloats() { } public void testNodeProcessorsFloatValidation() { - final Setting processorsSetting = EsExecutors.NODE_PROCESSORS_SETTING; + final Setting processorsSetting = EsExecutors.NODE_PROCESSORS_SETTING; { final Settings settings = Settings.builder().put(processorsSetting.getKey(), 0.0).build(); diff --git a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityService.java b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityService.java index 600d0a78e1f29..4e6106d348fb2 100644 --- a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityService.java +++ b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityService.java @@ -332,7 +332,7 @@ private AutoscalingCapacity.AutoscalingResources resourcesFor(DiscoveryNode node return new AutoscalingCapacity.AutoscalingResources( storage == -1 ? ByteSizeValue.ZERO : new ByteSizeValue(storage), memoryAndProcessors.map(AutoscalingNodeInfo::memory).map(ByteSizeValue::new).orElse(ByteSizeValue.ZERO), - memoryAndProcessors.map(AutoscalingNodeInfo::processors).orElse(0f) + memoryAndProcessors.map(AutoscalingNodeInfo::processors).orElse(null) ); } diff --git a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCapacity.java b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCapacity.java index 4229df4e2155d..7681abc841a93 100644 --- a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCapacity.java +++ b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCapacity.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.Processors; import org.elasticsearch.core.Nullable; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; @@ -31,17 +32,14 @@ public class AutoscalingCapacity implements ToXContent, Writeable { public static class AutoscalingResources implements ToXContent, Writeable { private final ByteSizeValue storage; private final ByteSizeValue memory; - private final Float processors; + private final Processors processors; - public static final AutoscalingResources ZERO = new AutoscalingResources(ByteSizeValue.ZERO, ByteSizeValue.ZERO, 0.0f); + public static final AutoscalingResources ZERO = new AutoscalingResources(ByteSizeValue.ZERO, ByteSizeValue.ZERO, Processors.ZERO); - public AutoscalingResources(ByteSizeValue storage, ByteSizeValue memory, Float processors) { + public AutoscalingResources(ByteSizeValue storage, ByteSizeValue memory, Processors processors) { assert storage != null || memory != null || processors != null; this.storage = storage; this.memory = memory; - if (processors != null && processors < 0.0f) { - throw new IllegalArgumentException("[processors] must be a non-negative number; provided [" + processors + "]"); - } this.processors = processors; } @@ -49,7 +47,7 @@ public AutoscalingResources(StreamInput in) throws IOException { this.storage = in.readOptionalWriteable(ByteSizeValue::new); this.memory = in.readOptionalWriteable(ByteSizeValue::new); if (in.getVersion().onOrAfter(Version.V_8_4_0)) { - this.processors = in.readOptionalFloat(); + this.processors = in.readOptionalWriteable(Processors::readFrom); } else { this.processors = null; } @@ -66,7 +64,7 @@ public ByteSizeValue memory() { } @Nullable - public Float processors() { + public Processors processors() { return processors; } @@ -96,7 +94,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(storage); out.writeOptionalWriteable(memory); if (out.getVersion().onOrAfter(Version.V_8_4_0)) { - out.writeOptionalFloat(processors); + out.writeOptionalWriteable(processors); } } @@ -152,7 +150,7 @@ private static ByteSizeValue add(ByteSizeValue v1, ByteSizeValue v2) { return new ByteSizeValue(v1.getBytes() + v2.getBytes()); } - private static Float max(Float v1, Float v2) { + private static Processors max(Processors v1, Processors v2) { if (v1 == null) { return v2; } @@ -163,7 +161,7 @@ private static Float max(Float v1, Float v2) { return v1.compareTo(v2) < 0 ? v2 : v1; } - private static Float add(Float v1, Float v2) { + private static Processors add(Processors v1, Processors v2) { if (v1 == null) { return v2; } @@ -171,7 +169,7 @@ private static Float add(Float v1, Float v2) { return v1; } - return v1 + v2; + return v1.plus(v2); } @Override @@ -286,11 +284,11 @@ public Builder capacity(AutoscalingCapacity capacity) { return this; } - public Builder total(Long storage, Long memory, Float processors) { - return total(byteSizeValue(storage), byteSizeValue(memory), processors); + public Builder total(Long storage, Long memory, Double processors) { + return total(byteSizeValue(storage), byteSizeValue(memory), Processors.of(processors)); } - public Builder total(ByteSizeValue storage, ByteSizeValue memory, Float processors) { + public Builder total(ByteSizeValue storage, ByteSizeValue memory, Processors processors) { return total(new AutoscalingResources(storage, memory, processors)); } @@ -299,11 +297,11 @@ public Builder total(AutoscalingResources total) { return this; } - public Builder node(Long storage, Long memory, Float processors) { - return node(byteSizeValue(storage), byteSizeValue(memory), processors); + public Builder node(Long storage, Long memory, Double processors) { + return node(byteSizeValue(storage), byteSizeValue(memory), Processors.of(processors)); } - public Builder node(ByteSizeValue storage, ByteSizeValue memory, Float processors) { + public Builder node(ByteSizeValue storage, ByteSizeValue memory, Processors processors) { return node(new AutoscalingResources(storage, memory, processors)); } diff --git a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/FixedAutoscalingDeciderService.java b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/FixedAutoscalingDeciderService.java index c8feb6575e9f0..88a45076398fe 100644 --- a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/FixedAutoscalingDeciderService.java +++ b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/FixedAutoscalingDeciderService.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.Processors; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; @@ -28,7 +29,11 @@ public class FixedAutoscalingDeciderService implements AutoscalingDeciderService public static final Setting STORAGE = Setting.byteSizeSetting("storage", ByteSizeValue.ofBytes(-1)); public static final Setting MEMORY = Setting.byteSizeSetting("memory", ByteSizeValue.ofBytes(-1)); - public static final Setting PROCESSORS = Setting.floatSetting("processors", 1f, 0f); + public static final Setting PROCESSORS = new Setting<>( + "processors", + Double.toString(1.0), + textValue -> new Processors(Double.parseDouble(textValue)) + ); public static final Setting NODES = Setting.intSetting("nodes", 1, 0); @Inject @@ -47,7 +52,7 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider AutoscalingCapacity requiredCapacity; ByteSizeValue storage = STORAGE.exists(configuration) ? STORAGE.get(configuration) : null; ByteSizeValue memory = MEMORY.exists(configuration) ? MEMORY.get(configuration) : null; - Float processors = PROCESSORS.exists(configuration) ? PROCESSORS.get(configuration) : null; + Processors processors = PROCESSORS.exists(configuration) ? PROCESSORS.get(configuration) : null; if (storage != null || memory != null || processors != null) { requiredCapacity = AutoscalingCapacity.builder() .total(totalCapacity(storage, nodes), totalCapacity(memory, nodes), totalCapacity(processors, nodes)) @@ -68,9 +73,9 @@ private static ByteSizeValue totalCapacity(ByteSizeValue nodeCapacity, int nodes } } - private static Float totalCapacity(Float nodeCapacity, int nodes) { + private static Processors totalCapacity(Processors nodeCapacity, int nodes) { if (nodeCapacity != null) { - return nodeCapacity * nodes; + return nodeCapacity.multiply(nodes); } else { return null; } @@ -100,17 +105,14 @@ public static class FixedReason implements AutoscalingDeciderResult.Reason { private final ByteSizeValue storage; private final ByteSizeValue memory; - private final Float processors; + private final Processors processors; private final int nodes; - public FixedReason(ByteSizeValue storage, ByteSizeValue memory, int nodes, Float processors) { + public FixedReason(ByteSizeValue storage, ByteSizeValue memory, int nodes, Processors processors) { this.storage = storage; this.memory = memory; this.nodes = nodes; this.processors = processors; - if (processors != null && processors < 0) { - throw new IllegalArgumentException("[processors] must be a non-negative number"); - } } public FixedReason(StreamInput in) throws IOException { @@ -118,7 +120,7 @@ public FixedReason(StreamInput in) throws IOException { this.memory = in.readOptionalWriteable(ByteSizeValue::new); this.nodes = in.readInt(); if (in.getVersion().onOrAfter(Version.V_8_4_0)) { - this.processors = in.readOptionalFloat(); + this.processors = in.readOptionalWriteable(Processors::readFrom); } else { this.processors = null; } @@ -148,7 +150,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(memory); out.writeInt(nodes); if (out.getVersion().onOrAfter(Version.V_8_4_0)) { - out.writeOptionalFloat(processors); + out.writeOptionalWriteable(processors); } } @@ -159,7 +161,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("memory", memory); builder.field("nodes", nodes); if (processors != null) { - builder.field("processors", nodes); + builder.field("processors", processors); } builder.endObject(); return builder; diff --git a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodeInfo.java b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodeInfo.java index 3f84ec16cbec6..95ae3f0ddbeb8 100644 --- a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodeInfo.java +++ b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodeInfo.java @@ -7,12 +7,14 @@ package org.elasticsearch.xpack.autoscaling.capacity.nodeinfo; +import org.elasticsearch.common.unit.Processors; + /** * Record for containing memory and processors for given node * @param memory node total memory * @param processors allocated processors */ -public record AutoscalingNodeInfo(long memory, float processors) { +public record AutoscalingNodeInfo(long memory, Processors processors) { static Builder builder() { return new Builder(); @@ -20,15 +22,15 @@ static Builder builder() { static class Builder { private Long memory; - private Float processors; + private Processors processors; Builder setMemory(long memory) { this.memory = memory; return this; } - Builder setProcessors(float processors) { - this.processors = processors; + Builder setProcessors(double processors) { + this.processors = new Processors(processors); return this; } diff --git a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodeInfoService.java b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodeInfoService.java index d80fa350ccee6..9b623dfb87092 100644 --- a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodeInfoService.java +++ b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodeInfoService.java @@ -22,6 +22,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.unit.Processors; import org.elasticsearch.common.util.Maps; import org.elasticsearch.core.TimeValue; import org.elasticsearch.monitor.os.OsInfo; @@ -50,7 +51,7 @@ public class AutoscalingNodeInfoService { Setting.Property.Dynamic, Setting.Property.NodeScope ); - private static final AutoscalingNodeInfo FETCHING_SENTINEL = new AutoscalingNodeInfo(Long.MIN_VALUE, Integer.MIN_VALUE); + private static final AutoscalingNodeInfo FETCHING_SENTINEL = new AutoscalingNodeInfo(Long.MIN_VALUE, Processors.MAX_PROCESSORS); private static final Logger logger = LogManager.getLogger(AutoscalingNodeInfoService.class); diff --git a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/existence/FrozenExistenceDeciderService.java b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/existence/FrozenExistenceDeciderService.java index 6929f3f4d9198..feb6aeec128d9 100644 --- a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/existence/FrozenExistenceDeciderService.java +++ b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/existence/FrozenExistenceDeciderService.java @@ -56,7 +56,7 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider builder.total(MINIMUM_FROZEN_STORAGE, MINIMUM_FROZEN_MEMORY, null); builder.node(MINIMUM_FROZEN_STORAGE, MINIMUM_FROZEN_MEMORY, null); } else { - builder.total(0L, 0L, 0f); + builder.total(ByteSizeValue.ZERO, ByteSizeValue.ZERO, null); } return new AutoscalingDeciderResult(builder.build(), new FrozenExistenceReason(indicesNeedingFrozen)); diff --git a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/AutoscalingTestCase.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/AutoscalingTestCase.java index 24f7e52018df9..53eb4b71d71a2 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/AutoscalingTestCase.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/AutoscalingTestCase.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.Processors; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.Tuple; import org.elasticsearch.test.ESTestCase; @@ -49,7 +50,7 @@ protected static AutoscalingDeciderResult randomAutoscalingDeciderResultWithCapa randomNullableByteSizeValue(), randomNullableByteSizeValue(), randomInt(1000), - (float) randomInt(64) + randomProcessors() ) ); } @@ -77,7 +78,7 @@ protected static AutoscalingCapacity randomNullableAutoscalingCapacity() { } protected static AutoscalingCapacity.AutoscalingResources randomAutoscalingResources() { - return new AutoscalingCapacity.AutoscalingResources(randomByteSizeValue(), randomByteSizeValue(), (float) randomInt(128)); + return new AutoscalingCapacity.AutoscalingResources(randomByteSizeValue(), randomByteSizeValue(), randomProcessors()); } private static AutoscalingCapacity.AutoscalingResources randomNullValueAutoscalingResources() { @@ -96,7 +97,7 @@ public static AutoscalingCapacity.AutoscalingResources randomNullValueAutoscalin return new AutoscalingCapacity.AutoscalingResources( addStorage ? randomByteSizeValue() : null, addMemory ? randomByteSizeValue() : null, - addProcessors ? (float) randomInt(128) : null + addProcessors ? randomProcessors() : null ); } @@ -153,6 +154,10 @@ public static AutoscalingPolicy randomAutoscalingPolicyOfName(final String name) return new AutoscalingPolicy(name, randomRoles(), randomAutoscalingDeciders()); } + public static Processors randomProcessors() { + return Processors.of(randomInt(128) + randomDouble()); + } + public static AutoscalingPolicy mutateAutoscalingPolicy(final AutoscalingPolicy instance) { String name = instance.name(); SortedSet roles = instance.roles(); diff --git a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityServiceTests.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityServiceTests.java index d7d715334a2dc..7ebe08d6cb458 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityServiceTests.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityServiceTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.Processors; import org.elasticsearch.core.Tuple; import org.elasticsearch.snapshots.SnapshotShardSizeInfo; import org.elasticsearch.xpack.autoscaling.AutoscalingMetadata; @@ -83,9 +84,12 @@ public void testMultiplePoliciesFixedCapacity() { assertThat(deciderResult.requiredCapacity(), equalTo(requiredCapacity)); ByteSizeValue storage = configuration.getAsBytesSize(FixedAutoscalingDeciderService.STORAGE.getKey(), null); ByteSizeValue memory = configuration.getAsMemory(FixedAutoscalingDeciderService.MEMORY.getKey(), null); - Float processors = configuration.getAsFloat(FixedAutoscalingDeciderService.PROCESSORS.getKey(), null); + Double processors = configuration.getAsDouble(FixedAutoscalingDeciderService.PROCESSORS.getKey(), null); int nodes = FixedAutoscalingDeciderService.NODES.get(configuration); - assertThat(deciderResult.reason(), equalTo(new FixedAutoscalingDeciderService.FixedReason(storage, memory, nodes, processors))); + assertThat( + deciderResult.reason(), + equalTo(new FixedAutoscalingDeciderService.FixedReason(storage, memory, nodes, Processors.of(processors))) + ); assertThat( deciderResult.reason().summary(), equalTo("fixed storage [" + storage + "] memory [" + memory + "] processors [" + processors + "] nodes [" + nodes + "]") @@ -154,18 +158,18 @@ private SortedMap randomFixedDeciders() { private AutoscalingCapacity calculateFixedDeciderCapacity(Settings configuration) { ByteSizeValue storage = configuration.getAsBytesSize(FixedAutoscalingDeciderService.STORAGE.getKey(), null); ByteSizeValue memory = configuration.getAsBytesSize(FixedAutoscalingDeciderService.MEMORY.getKey(), null); - Float processors = configuration.getAsFloat(FixedAutoscalingDeciderService.PROCESSORS.getKey(), null); + Double processors = configuration.getAsDouble(FixedAutoscalingDeciderService.PROCESSORS.getKey(), null); int nodes = FixedAutoscalingDeciderService.NODES.get(configuration); ByteSizeValue totalStorage = storage != null ? new ByteSizeValue(storage.getBytes() * nodes) : null; ByteSizeValue totalMemory = memory != null ? new ByteSizeValue(memory.getBytes() * nodes) : null; - Float totalProcessors = processors != null ? processors * nodes : null; + Double totalProcessors = processors != null ? processors * nodes : null; if (totalStorage == null && totalMemory == null && totalProcessors == null) { return null; } else { return new AutoscalingCapacity( - new AutoscalingCapacity.AutoscalingResources(totalStorage, totalMemory, totalProcessors), - new AutoscalingCapacity.AutoscalingResources(storage, memory, processors) + new AutoscalingCapacity.AutoscalingResources(totalStorage, totalMemory, Processors.of(totalProcessors)), + new AutoscalingCapacity.AutoscalingResources(storage, memory, Processors.of(processors)) ); } } @@ -183,7 +187,7 @@ public void testContext() { state, info, snapshotShardSizeInfo, - n -> Optional.of(new AutoscalingNodeInfo(randomNonNegativeLong(), randomInt(64))), + n -> Optional.of(new AutoscalingNodeInfo(randomNonNegativeLong(), randomProcessors())), () -> {} ); @@ -208,7 +212,7 @@ public void testContext() { state, info, null, - n -> Optional.of(new AutoscalingNodeInfo(memory, randomInt(64))), + n -> Optional.of(new AutoscalingNodeInfo(memory, randomProcessors())), () -> {} ); @@ -266,7 +270,7 @@ public void testContext() { state, info, null, - n -> Optional.of(new AutoscalingNodeInfo(memory, randomInt(64))), + n -> Optional.of(new AutoscalingNodeInfo(memory, randomProcessors())), () -> {} ); @@ -312,7 +316,7 @@ public void testContext() { state, info, null, - n -> Optional.of(new AutoscalingNodeInfo(memory, randomInt(64))), + n -> Optional.of(new AutoscalingNodeInfo(memory, randomProcessors())), () -> {} ); assertThat(context.nodes(), equalTo(expectedNodes)); diff --git a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCapacityWireSerializationTests.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCapacityWireSerializationTests.java index 2053449f5d408..1e9055e9ced5f 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCapacityWireSerializationTests.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCapacityWireSerializationTests.java @@ -14,6 +14,8 @@ import java.util.Optional; +import static org.elasticsearch.xpack.autoscaling.AutoscalingTestCase.randomProcessors; + public class AutoscalingCapacityWireSerializationTests extends AbstractWireSerializingTestCase { @Override protected Writeable.Reader instanceReader() { @@ -57,7 +59,7 @@ protected AutoscalingCapacity mutateInstance(AutoscalingCapacity instance) { instance.total().memory(), hasAllMetrics && (instance.node() == null || instance.node().processors() == null) && randomBoolean() ? null - : randomIntBetween(1, 64) + Optional.ofNullable(instance.total().processors()).orElse(0f) + : Optional.ofNullable(instance.total().processors()).map(p -> p.plus(randomProcessors())).orElse(randomProcessors()) ); } } else { @@ -90,7 +92,9 @@ protected AutoscalingCapacity mutateInstance(AutoscalingCapacity instance) { && (instance.node().storage() != null || instance.node().memory() != null) && instance.node().processors() != null ? null - : randomIntBetween(1, 64) + Optional.ofNullable(instance.node().processors()).orElse(0f) + : Optional.ofNullable(instance.total().processors()) + .map(p -> p.plus(randomProcessors())) + .orElse(randomProcessors()) ); } else { ByteSizeValue newStorage = instance.total().storage() != null @@ -104,7 +108,9 @@ protected AutoscalingCapacity mutateInstance(AutoscalingCapacity instance) { newMem, randomBoolean() && (newMem != null || newStorage != null) && instance.node().processors() != null ? null : instance.total().processors() != null && randomBoolean() - ? randomIntBetween(1, 64) + Optional.ofNullable(instance.node().processors()).orElse(0f) + ? Optional.ofNullable(instance.total().processors()) + .map(p -> p.plus(randomProcessors())) + .orElse(randomProcessors()) : null ); } diff --git a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingDeciderResultsTests.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingDeciderResultsTests.java index dc6c08f530a0b..a3c014eaf01cf 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingDeciderResultsTests.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingDeciderResultsTests.java @@ -161,13 +161,13 @@ private AutoscalingCapacity randomCapacity(boolean node, boolean storage, boolea builder.total( storage ? randomLongBetween(lower, upper) : null, memory ? randomLongBetween(lower, upper) : null, - processor ? (float) randomIntBetween(lower, upper) : null + processor ? (double) randomIntBetween(lower, upper) : null ); if (node) { builder.node( storage ? randomLongBetween(lower, upper) : null, memory ? randomLongBetween(lower, upper) : null, - processor ? (float) randomIntBetween(lower, upper) : null + processor ? (double) randomIntBetween(lower, upper) : null ); } return builder.build(); diff --git a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/FixedAutoscalingDeciderServiceTests.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/FixedAutoscalingDeciderServiceTests.java index c2bf99eb4ad9a..eb5bb34ec93e6 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/FixedAutoscalingDeciderServiceTests.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/FixedAutoscalingDeciderServiceTests.java @@ -9,6 +9,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.Processors; import org.elasticsearch.xpack.autoscaling.AutoscalingTestCase; import org.hamcrest.Matchers; @@ -25,7 +26,7 @@ public void testScale() { ByteSizeValue storage = randomNullableByteSizeValue(); ByteSizeValue memory = randomNullableByteSizeValue(); - Float processors = (memory != null || storage != null) && randomBoolean() ? null : (float) randomInt(64); + Processors processors = (memory != null || storage != null) && randomBoolean() ? null : Processors.of((double) randomInt(64)); if (storage != null) { configurationBuilder.put(FixedAutoscalingDeciderService.STORAGE.getKey(), storage); } @@ -33,7 +34,7 @@ public void testScale() { configurationBuilder.put(FixedAutoscalingDeciderService.MEMORY.getKey(), memory); } if (processors != null) { - configurationBuilder.put(FixedAutoscalingDeciderService.PROCESSORS.getKey(), processors); + configurationBuilder.put(FixedAutoscalingDeciderService.PROCESSORS.getKey(), processors.count()); } verify( configurationBuilder.build(), @@ -61,7 +62,7 @@ private ByteSizeValue multiply(ByteSizeValue bytes, int nodes) { return bytes == null ? null : new ByteSizeValue(bytes.getBytes() * nodes); } - private Float multiply(Float processors, int nodes) { - return processors == null ? null : processors * nodes; + private Processors multiply(Processors processors, int nodes) { + return processors == null ? null : processors.multiply(nodes); } } diff --git a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodesInfoServiceTests.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodesInfoServiceTests.java index e5006ac12b36b..f39cc191500b6 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodesInfoServiceTests.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodesInfoServiceTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.Processors; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.TimeValue; import org.elasticsearch.monitor.os.OsInfo; @@ -137,7 +138,7 @@ public void testAddRemoveNode() { ); NodesInfoResponse responseInfo = new NodesInfoResponse( ClusterName.DEFAULT, - succeedingNodes.stream().map(n -> infoForNode(n, randomIntBetween(0, 64))).collect(Collectors.toList()), + succeedingNodes.stream().map(n -> infoForNode(n, randomIntBetween(1, 64))).collect(Collectors.toList()), List.of() ); client.respondStats(response, () -> { @@ -187,7 +188,7 @@ public void testNoLongerMaster() { ); NodesInfoResponse responseInfo = new NodesInfoResponse( ClusterName.DEFAULT, - nodes.stream().map(n -> infoForNode(n, randomIntBetween(0, 64))).collect(Collectors.toList()), + nodes.stream().map(n -> infoForNode(n, randomIntBetween(1, 64))).collect(Collectors.toList()), List.of() ); @@ -223,7 +224,7 @@ public void testStatsFails() { ); NodesInfoResponse responseInfo = new NodesInfoResponse( ClusterName.DEFAULT, - nodes.stream().map(n -> infoForNode(n, randomIntBetween(0, 64))).collect(Collectors.toList()), + nodes.stream().map(n -> infoForNode(n, randomIntBetween(1, 64))).collect(Collectors.toList()), List.of() ); @@ -248,7 +249,7 @@ public void testInfoFails() { nodes.forEach(n -> assertThat(service.snapshot().get(n).isEmpty(), is(true))); NodesInfoResponse responseInfo = new NodesInfoResponse( ClusterName.DEFAULT, - nodes.stream().map(n -> infoForNode(n, randomIntBetween(0, 64))).collect(Collectors.toList()), + nodes.stream().map(n -> infoForNode(n, randomIntBetween(1, 64))).collect(Collectors.toList()), List.of() ); @@ -271,7 +272,7 @@ public void testRestartNode() { NodesInfoResponse responseInfo = new NodesInfoResponse( ClusterName.DEFAULT, - nodes.stream().map(n -> infoForNode(n, randomIntBetween(0, 64))).collect(Collectors.toList()), + nodes.stream().map(n -> infoForNode(n, randomIntBetween(1, 64))).collect(Collectors.toList()), List.of() ); @@ -300,7 +301,7 @@ public void testRestartNode() { NodesInfoResponse restartedInfoResponse = new NodesInfoResponse( ClusterName.DEFAULT, - Sets.difference(restartedNodes, nodes).stream().map(n -> infoForNode(n, randomIntBetween(0, 64))).collect(Collectors.toList()), + Sets.difference(restartedNodes, nodes).stream().map(n -> infoForNode(n, randomIntBetween(1, 64))).collect(Collectors.toList()), List.of() ); @@ -326,7 +327,7 @@ public void testConcurrentStateUpdate() throws Exception { ); NodesInfoResponse nodesInfoResponse = new NodesInfoResponse( ClusterName.DEFAULT, - nodes.stream().map(n -> infoForNode(n, randomIntBetween(0, 64))).collect(Collectors.toList()), + nodes.stream().map(n -> infoForNode(n, randomIntBetween(1, 64))).collect(Collectors.toList()), List.of() ); @@ -402,7 +403,7 @@ public void assertMatchesResponse(Set nodes, NodesStatsResponse r equalTo( new AutoscalingNodeInfo( response.getNodesMap().get(n.getId()).getOs().getMem().getAdjustedTotal().getBytes(), - infoResponse.getNodesMap().get(n.getId()).getInfo(OsInfo.class).getAllocatedProcessors() + new Processors(infoResponse.getNodesMap().get(n.getId()).getInfo(OsInfo.class).getAllocatedProcessors()) ) ) ); diff --git a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderDecisionTests.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderDecisionTests.java index 4d13727cd268b..b14976d488979 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderDecisionTests.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderDecisionTests.java @@ -548,12 +548,8 @@ static AutoscalingCapacity randomCurrentCapacity() { boolean includeMemory = randomBoolean(); boolean includeProcessors = randomBoolean(); return AutoscalingCapacity.builder() - .total( - randomByteSizeValue(), - includeMemory ? randomByteSizeValue() : null, - includeProcessors ? (float) randomInt(64) : null - ) - .node(randomByteSizeValue(), includeMemory ? randomByteSizeValue() : null, includeProcessors ? (float) randomInt(64) : null) + .total(randomByteSizeValue(), includeMemory ? randomByteSizeValue() : null, includeProcessors ? randomProcessors() : null) + .node(randomByteSizeValue(), includeMemory ? randomByteSizeValue() : null, includeProcessors ? randomProcessors() : null) .build(); } else { return null; From 92cc59653bbd53a0485f169075a8fd3dcafe2706 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Tue, 30 Aug 2022 09:45:40 +0200 Subject: [PATCH 2/9] Update docs/changelog/89662.yaml --- docs/changelog/89662.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/89662.yaml diff --git a/docs/changelog/89662.yaml b/docs/changelog/89662.yaml new file mode 100644 index 0000000000000..8399eb1a50261 --- /dev/null +++ b/docs/changelog/89662.yaml @@ -0,0 +1,5 @@ +pr: 89662 +summary: Centralize the concept of processors configuration +area: Autoscaling +type: enhancement +issues: [] From 2aca87aba78298c9e3523971b909ff52c3b84a55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Tue, 30 Aug 2022 10:17:13 +0200 Subject: [PATCH 3/9] Include fractional processors in the OsInfo response --- .../monitor/os/EvilOsProbeTests.java | 3 ++- .../nodesinfo/SimpleNodesInfoIT.java | 6 +++-- .../common/util/concurrent/EsExecutors.java | 4 +++ .../org/elasticsearch/monitor/os/OsInfo.java | 25 +++++++++++++++---- .../org/elasticsearch/monitor/os/OsProbe.java | 5 ++-- .../elasticsearch/monitor/os/OsService.java | 2 +- .../monitor/os/OsProbeTests.java | 3 ++- .../nodesinfo/NodeInfoStreamingTests.java | 3 ++- .../nodeinfo/AutoscalingNodeInfoService.java | 2 +- .../AutoscalingNodesInfoServiceTests.java | 2 +- 10 files changed, 40 insertions(+), 15 deletions(-) diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/monitor/os/EvilOsProbeTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/monitor/os/EvilOsProbeTests.java index 71967cd25b45b..4b880fb05afe3 100644 --- a/qa/evil-tests/src/test/java/org/elasticsearch/monitor/os/EvilOsProbeTests.java +++ b/qa/evil-tests/src/test/java/org/elasticsearch/monitor/os/EvilOsProbeTests.java @@ -9,6 +9,7 @@ package org.elasticsearch.monitor.os; import org.apache.lucene.util.Constants; +import org.elasticsearch.common.unit.Processors; import org.elasticsearch.core.PathUtils; import org.elasticsearch.test.ESTestCase; @@ -24,7 +25,7 @@ public class EvilOsProbeTests extends ESTestCase { public void testOsPrettyName() throws IOException { - final OsInfo osInfo = OsProbe.getInstance().osInfo(randomLongBetween(1, 100), randomIntBetween(1, 8)); + final OsInfo osInfo = OsProbe.getInstance().osInfo(randomLongBetween(1, 100), new Processors(randomIntBetween(1, 8))); if (Constants.LINUX) { final List lines; if (Files.exists(PathUtils.get("/etc/os-release"))) { diff --git a/server/src/internalClusterTest/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java b/server/src/internalClusterTest/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java index 992fca7b635c6..73dc906e2367e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java @@ -104,8 +104,8 @@ public void testNodesInfosTotalIndexingBuffer() throws Exception { public void testAllocatedProcessors() throws Exception { List nodesIds = internalCluster().startNodes( - Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 3).build(), - Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 6).build() + Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 2.9).build(), + Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 5.9).build() ); final String node_1 = nodesIds.get(0); @@ -134,6 +134,8 @@ public void testAllocatedProcessors() throws Exception { ); assertThat(response.getNodesMap().get(server1NodeId).getInfo(OsInfo.class).getAllocatedProcessors(), equalTo(3)); + assertThat(response.getNodesMap().get(server1NodeId).getInfo(OsInfo.class).getFractionalAllocatedProcessors(), equalTo(2.9)); assertThat(response.getNodesMap().get(server2NodeId).getInfo(OsInfo.class).getAllocatedProcessors(), equalTo(6)); + assertThat(response.getNodesMap().get(server2NodeId).getInfo(OsInfo.class).getFractionalAllocatedProcessors(), equalTo(5.9)); } } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 2331d0477851b..9d25e24eac41b 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -77,6 +77,10 @@ public static int allocatedProcessors(final Settings settings) { return NODE_PROCESSORS_SETTING.get(settings).roundUp(); } + public static Processors nodeProcessors(final Settings settings) { + return NODE_PROCESSORS_SETTING.get(settings); + } + public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing( String name, ThreadFactory threadFactory, diff --git a/server/src/main/java/org/elasticsearch/monitor/os/OsInfo.java b/server/src/main/java/org/elasticsearch/monitor/os/OsInfo.java index 8685f889b96b6..021522f7226bf 100644 --- a/server/src/main/java/org/elasticsearch/monitor/os/OsInfo.java +++ b/server/src/main/java/org/elasticsearch/monitor/os/OsInfo.java @@ -8,8 +8,10 @@ package org.elasticsearch.monitor.os; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.Processors; import org.elasticsearch.core.TimeValue; import org.elasticsearch.node.ReportingService; import org.elasticsearch.xcontent.XContentBuilder; @@ -17,10 +19,11 @@ import java.io.IOException; public class OsInfo implements ReportingService.Info { + private static final Version DOUBLE_PRECISION_ALLOCATED_PROCESSORS_SUPPORT = Version.V_8_5_0; private final long refreshInterval; private final int availableProcessors; - private final int allocatedProcessors; + private final Processors allocatedProcessors; private final String name; private final String prettyName; private final String arch; @@ -29,7 +32,7 @@ public class OsInfo implements ReportingService.Info { public OsInfo( final long refreshInterval, final int availableProcessors, - final int allocatedProcessors, + final Processors allocatedProcessors, final String name, final String prettyName, final String arch, @@ -47,7 +50,11 @@ public OsInfo( public OsInfo(StreamInput in) throws IOException { this.refreshInterval = in.readLong(); this.availableProcessors = in.readInt(); - this.allocatedProcessors = in.readInt(); + if (in.getVersion().onOrAfter(DOUBLE_PRECISION_ALLOCATED_PROCESSORS_SUPPORT)) { + this.allocatedProcessors = Processors.readFrom(in); + } else { + this.allocatedProcessors = new Processors(in.readInt()); + } this.name = in.readOptionalString(); this.prettyName = in.readOptionalString(); this.arch = in.readOptionalString(); @@ -58,7 +65,11 @@ public OsInfo(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { out.writeLong(refreshInterval); out.writeInt(availableProcessors); - out.writeInt(allocatedProcessors); + if (out.getVersion().onOrAfter(DOUBLE_PRECISION_ALLOCATED_PROCESSORS_SUPPORT)) { + allocatedProcessors.writeTo(out); + } else { + out.writeInt(getAllocatedProcessors()); + } out.writeOptionalString(name); out.writeOptionalString(prettyName); out.writeOptionalString(arch); @@ -74,7 +85,11 @@ public int getAvailableProcessors() { } public int getAllocatedProcessors() { - return this.allocatedProcessors; + return allocatedProcessors.roundUp(); + } + + public double getFractionalAllocatedProcessors() { + return allocatedProcessors.count(); } public String getName() { diff --git a/server/src/main/java/org/elasticsearch/monitor/os/OsProbe.java b/server/src/main/java/org/elasticsearch/monitor/os/OsProbe.java index 7778ff7efa4a4..97b4610e240e8 100644 --- a/server/src/main/java/org/elasticsearch/monitor/os/OsProbe.java +++ b/server/src/main/java/org/elasticsearch/monitor/os/OsProbe.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.Constants; +import org.elasticsearch.common.unit.Processors; import org.elasticsearch.core.PathUtils; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.monitor.Probes; @@ -762,11 +763,11 @@ public static OsProbe getInstance() { private final Logger logger = LogManager.getLogger(getClass()); - OsInfo osInfo(long refreshInterval, int allocatedProcessors) throws IOException { + OsInfo osInfo(long refreshInterval, Processors processors) throws IOException { return new OsInfo( refreshInterval, Runtime.getRuntime().availableProcessors(), - allocatedProcessors, + processors, Constants.OS_NAME, getPrettyName(), Constants.OS_ARCH, diff --git a/server/src/main/java/org/elasticsearch/monitor/os/OsService.java b/server/src/main/java/org/elasticsearch/monitor/os/OsService.java index a8284505a5a05..4e6a51bafb125 100644 --- a/server/src/main/java/org/elasticsearch/monitor/os/OsService.java +++ b/server/src/main/java/org/elasticsearch/monitor/os/OsService.java @@ -38,7 +38,7 @@ public class OsService implements ReportingService { public OsService(Settings settings) throws IOException { this.probe = OsProbe.getInstance(); TimeValue refreshInterval = REFRESH_INTERVAL_SETTING.get(settings); - this.info = probe.osInfo(refreshInterval.millis(), EsExecutors.allocatedProcessors(settings)); + this.info = probe.osInfo(refreshInterval.millis(), EsExecutors.nodeProcessors(settings)); this.osStatsCache = new OsStatsCache(refreshInterval, probe.osStats()); logger.debug("using refresh_interval [{}]", refreshInterval); } diff --git a/server/src/test/java/org/elasticsearch/monitor/os/OsProbeTests.java b/server/src/test/java/org/elasticsearch/monitor/os/OsProbeTests.java index 83317fb61c6a6..c24d78406b31e 100644 --- a/server/src/test/java/org/elasticsearch/monitor/os/OsProbeTests.java +++ b/server/src/test/java/org/elasticsearch/monitor/os/OsProbeTests.java @@ -9,6 +9,7 @@ package org.elasticsearch.monitor.os; import org.apache.lucene.util.Constants; +import org.elasticsearch.common.unit.Processors; import org.elasticsearch.test.ESTestCase; import java.io.IOException; @@ -55,7 +56,7 @@ List readOsRelease() { } }; - final OsInfo info = osProbe.osInfo(refreshInterval, allocatedProcessors); + final OsInfo info = osProbe.osInfo(refreshInterval, new Processors(allocatedProcessors)); assertNotNull(info); assertThat(info.getRefreshInterval(), equalTo(refreshInterval)); assertThat(info.getName(), equalTo(Constants.OS_NAME)); diff --git a/server/src/test/java/org/elasticsearch/nodesinfo/NodeInfoStreamingTests.java b/server/src/test/java/org/elasticsearch/nodesinfo/NodeInfoStreamingTests.java index 086d0c90d228a..c1f4a75a1237e 100644 --- a/server/src/test/java/org/elasticsearch/nodesinfo/NodeInfoStreamingTests.java +++ b/server/src/test/java/org/elasticsearch/nodesinfo/NodeInfoStreamingTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.Processors; import org.elasticsearch.http.HttpInfo; import org.elasticsearch.ingest.IngestInfo; import org.elasticsearch.ingest.ProcessorInfo; @@ -111,7 +112,7 @@ private static NodeInfo createNodeInfo() { OsInfo osInfo = null; if (randomBoolean()) { int availableProcessors = randomIntBetween(1, 64); - int allocatedProcessors = randomIntBetween(1, availableProcessors); + Processors allocatedProcessors = new Processors(randomIntBetween(1, availableProcessors)); long refreshInterval = randomBoolean() ? -1 : randomNonNegativeLong(); String name = randomAlphaOfLengthBetween(3, 10); String arch = randomAlphaOfLengthBetween(3, 10); diff --git a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodeInfoService.java b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodeInfoService.java index 9b623dfb87092..118cd19d01b79 100644 --- a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodeInfoService.java +++ b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodeInfoService.java @@ -156,7 +156,7 @@ private void sendToMissingNodes(Function nodeLookup, Set< : "unexpected missing node when setting processors [" + nodeInfo.getNode().getEphemeralId() + "]"; builderBuilder.computeIfPresent( nodeInfo.getNode().getEphemeralId(), - (n, b) -> b.setProcessors(nodeInfo.getInfo(OsInfo.class).getAllocatedProcessors()) + (n, b) -> b.setProcessors(nodeInfo.getInfo(OsInfo.class).getFractionalAllocatedProcessors()) ); }); synchronized (mutex) { diff --git a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodesInfoServiceTests.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodesInfoServiceTests.java index f39cc191500b6..ca15d40523c7c 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodesInfoServiceTests.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodesInfoServiceTests.java @@ -446,7 +446,7 @@ private static NodeStats statsForNode(DiscoveryNode node, long memory) { } private static org.elasticsearch.action.admin.cluster.node.info.NodeInfo infoForNode(DiscoveryNode node, int processors) { - OsInfo osInfo = new OsInfo(randomLong(), processors, processors, null, null, null, null); + OsInfo osInfo = new OsInfo(randomLong(), processors, new Processors(processors), null, null, null, null); return new org.elasticsearch.action.admin.cluster.node.info.NodeInfo( Version.CURRENT, Build.CURRENT, From 45d264644bae02dd208f2317111c840d97763912 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Tue, 30 Aug 2022 10:36:03 +0200 Subject: [PATCH 4/9] Fix missing test call --- .../capacity/nodeinfo/AutoscalingNodesInfoServiceTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodesInfoServiceTests.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodesInfoServiceTests.java index ca15d40523c7c..470af871fa7cd 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodesInfoServiceTests.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodesInfoServiceTests.java @@ -403,7 +403,7 @@ public void assertMatchesResponse(Set nodes, NodesStatsResponse r equalTo( new AutoscalingNodeInfo( response.getNodesMap().get(n.getId()).getOs().getMem().getAdjustedTotal().getBytes(), - new Processors(infoResponse.getNodesMap().get(n.getId()).getInfo(OsInfo.class).getAllocatedProcessors()) + new Processors(infoResponse.getNodesMap().get(n.getId()).getInfo(OsInfo.class).getFractionalAllocatedProcessors()) ) ) ); From 7d1cb57cbf48fe15dd895c321369603cf73caf5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Wed, 31 Aug 2022 11:25:57 +0200 Subject: [PATCH 5/9] Review comments --- .../monitor/os/EvilOsProbeTests.java | 2 +- .../TransportUpdateDesiredNodesAction.java | 3 +- .../DesiredNodesSettingsValidator.java | 9 ++- .../cluster/metadata/DesiredNode.java | 31 +++----- .../metadata/DesiredNodeWithStatus.java | 4 +- .../cluster/metadata/DesiredNodes.java | 8 +-- .../elasticsearch/common/unit/Processors.java | 67 +++++++++++------- .../common/util/concurrent/EsExecutors.java | 2 +- .../org/elasticsearch/monitor/os/OsInfo.java | 4 +- .../DesiredNodeSerializationTests.java | 2 +- .../cluster/metadata/DesiredNodeTests.java | 70 +++++++------------ .../unit/ProcessorsSerializationTests.java | 4 +- .../common/unit/ProcessorsTests.java | 36 ++++++---- .../monitor/os/OsProbeTests.java | 2 +- .../nodesinfo/NodeInfoStreamingTests.java | 2 +- .../FixedAutoscalingDeciderService.java | 2 +- .../nodeinfo/AutoscalingNodeInfo.java | 2 +- .../FrozenExistenceDeciderService.java | 3 +- .../autoscaling/AutoscalingTestCase.java | 2 +- .../AutoscalingNodesInfoServiceTests.java | 4 +- 20 files changed, 129 insertions(+), 130 deletions(-) diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/monitor/os/EvilOsProbeTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/monitor/os/EvilOsProbeTests.java index 4b880fb05afe3..90c3a3e0cabb3 100644 --- a/qa/evil-tests/src/test/java/org/elasticsearch/monitor/os/EvilOsProbeTests.java +++ b/qa/evil-tests/src/test/java/org/elasticsearch/monitor/os/EvilOsProbeTests.java @@ -25,7 +25,7 @@ public class EvilOsProbeTests extends ESTestCase { public void testOsPrettyName() throws IOException { - final OsInfo osInfo = OsProbe.getInstance().osInfo(randomLongBetween(1, 100), new Processors(randomIntBetween(1, 8))); + final OsInfo osInfo = OsProbe.getInstance().osInfo(randomLongBetween(1, 100), Processors.of((double) randomIntBetween(1, 8))); if (Constants.LINUX) { final List lines; if (Files.exists(PathUtils.get("/etc/os-release"))) { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesAction.java index 20e51727110ab..b0d7000afa8ac 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesAction.java @@ -39,7 +39,6 @@ public class TransportUpdateDesiredNodesAction extends TransportMasterNodeAction { private static final Logger logger = LogManager.getLogger(TransportUpdateDesiredNodesAction.class); - private static final double MAX_DELTA_PROCESSORS = 7E-5; private final DesiredNodesSettingsValidator settingsValidator; private final ClusterStateTaskExecutor taskExecutor; @@ -127,7 +126,7 @@ static DesiredNodes updateDesiredNodes(DesiredNodes latestDesiredNodes, UpdateDe ); if (latestDesiredNodes != null) { - if (latestDesiredNodes.equalsWithProcessorsCloseTo(proposedDesiredNodes, MAX_DELTA_PROCESSORS)) { + if (latestDesiredNodes.equalsWithProcessorsCloseTo(proposedDesiredNodes)) { return latestDesiredNodes; } diff --git a/server/src/main/java/org/elasticsearch/cluster/desirednodes/DesiredNodesSettingsValidator.java b/server/src/main/java/org/elasticsearch/cluster/desirednodes/DesiredNodesSettingsValidator.java index f777f88416145..7c51af2bf7ffa 100644 --- a/server/src/main/java/org/elasticsearch/cluster/desirednodes/DesiredNodesSettingsValidator.java +++ b/server/src/main/java/org/elasticsearch/cluster/desirednodes/DesiredNodesSettingsValidator.java @@ -92,8 +92,13 @@ private void validate(DesiredNode node) { int minProcessors = node.roundedDownMinProcessors(); Integer roundedUpMaxProcessors = node.roundedUpMaxProcessors(); int maxProcessors = roundedUpMaxProcessors == null ? minProcessors : roundedUpMaxProcessors; - Setting.doubleSetting(NODE_PROCESSORS_SETTING.getKey(), minProcessors, 1, maxProcessors, Setting.Property.NodeScope) - .get(settings); + Setting.doubleSetting( + NODE_PROCESSORS_SETTING.getKey(), + minProcessors, + Double.MIN_VALUE, + maxProcessors, + Setting.Property.NodeScope + ).get(settings); final Settings.Builder updatedSettings = Settings.builder().put(settings); updatedSettings.remove(NODE_PROCESSORS_SETTING.getKey()); settings = updatedSettings.build(); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DesiredNode.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DesiredNode.java index b76726b1a2a1a..f78ccc80cca75 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DesiredNode.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DesiredNode.java @@ -119,7 +119,7 @@ public DesiredNode(Settings settings, ProcessorsRange processorsRange, ByteSizeV } public DesiredNode(Settings settings, double processors, ByteSizeValue memory, ByteSizeValue storage, Version version) { - this(settings, new Processors(processors), null, memory, storage, version); + this(settings, Processors.of(processors), null, memory, storage, version); } DesiredNode( @@ -329,24 +329,10 @@ private boolean equalsWithoutProcessorsSpecification(DesiredNode that) { && Objects.equals(roles, that.roles); } - public boolean equalsWithProcessorsCloseTo(DesiredNode that, double maxError) { + public boolean equalsWithProcessorsCloseTo(DesiredNode that) { return equalsWithoutProcessorsSpecification(that) - && processorsEqualsOrCloseTo(processors, that.processors, maxError) - && ProcessorsRange.equalsOrCloseTo(processorsRange, that.processorsRange, maxError); - } - - static boolean processorsEqualsOrCloseTo(Processors a, Processors b, double maxError) { - return (a == b) || (a != null && (a.equals(b) || processorsAsFloatCloseTo(a, b, maxError))); - } - - private static boolean processorsAsFloatCloseTo(Processors a, Processors b, double maxError) { - if (b == null) { - return false; - } - - float floatCount = (float) a.count(); - float otherFloatCount = (float) b.count(); - return Float.isFinite(floatCount) && Float.isFinite(otherFloatCount) && (Math.abs(floatCount - otherFloatCount) < maxError); + && Processors.equalsOrCloseTo(processors, that.processors) + && ProcessorsRange.equalsOrCloseTo(processorsRange, that.processorsRange); } @Override @@ -471,14 +457,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } - static boolean equalsOrCloseTo(ProcessorsRange a, ProcessorsRange b, double maxError) { - return (a == b) || (a != null && a.equalsOrCloseTo(b, maxError)); + static boolean equalsOrCloseTo(ProcessorsRange a, ProcessorsRange b) { + return (a == b) || (a != null && a.equalsOrCloseTo(b)); } - boolean equalsOrCloseTo(ProcessorsRange that, double maxError) { + boolean equalsOrCloseTo(ProcessorsRange that) { return that != null - && (equals(that) - || (processorsEqualsOrCloseTo(min, that.min, maxError) && processorsEqualsOrCloseTo(max, that.max, maxError))); + && (equals(that) || (Processors.equalsOrCloseTo(min, that.min) && Processors.equalsOrCloseTo(max, that.max))); } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DesiredNodeWithStatus.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DesiredNodeWithStatus.java index 94a4694b61d9f..13a10213b8ed2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DesiredNodeWithStatus.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DesiredNodeWithStatus.java @@ -122,8 +122,8 @@ public int compareTo(DesiredNodeWithStatus o) { return desiredNode.compareTo(o.desiredNode); } - public boolean equalsWithProcessorsCloseTo(DesiredNodeWithStatus other, double maxError) { - return other != null && status == other.status && desiredNode.equalsWithProcessorsCloseTo(other.desiredNode, maxError); + public boolean equalsWithProcessorsCloseTo(DesiredNodeWithStatus other) { + return other != null && status == other.status && desiredNode.equalsWithProcessorsCloseTo(other.desiredNode); } public enum Status { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DesiredNodes.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DesiredNodes.java index e42731522a115..b720c90a0673f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DesiredNodes.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DesiredNodes.java @@ -249,14 +249,14 @@ private static void checkForDuplicatedExternalIDs(List no } } - public boolean equalsWithProcessorsCloseTo(DesiredNodes that, double maxError) { + public boolean equalsWithProcessorsCloseTo(DesiredNodes that) { return that != null && version == that.version && Objects.equals(historyID, that.historyID) - && equalsNodesWithProcessorsCloseTo(that, maxError); + && equalsNodesWithProcessorsCloseTo(that); } - public boolean equalsNodesWithProcessorsCloseTo(DesiredNodes that, double maxError) { + public boolean equalsNodesWithProcessorsCloseTo(DesiredNodes that) { if (that == null || nodes.size() != that.nodes.size()) { return false; } @@ -264,7 +264,7 @@ public boolean equalsNodesWithProcessorsCloseTo(DesiredNodes that, double maxErr for (Map.Entry desiredNodeEntry : nodes.entrySet()) { final DesiredNodeWithStatus desiredNodeWithStatus = desiredNodeEntry.getValue(); final DesiredNodeWithStatus otherDesiredNodeWithStatus = that.nodes.get(desiredNodeEntry.getKey()); - if (desiredNodeWithStatus.equalsWithProcessorsCloseTo(otherDesiredNodeWithStatus, maxError) == false) { + if (desiredNodeWithStatus.equalsWithProcessorsCloseTo(otherDesiredNodeWithStatus) == false) { return false; } } diff --git a/server/src/main/java/org/elasticsearch/common/unit/Processors.java b/server/src/main/java/org/elasticsearch/common/unit/Processors.java index be04ddf50765a..2b8231c3cd85b 100644 --- a/server/src/main/java/org/elasticsearch/common/unit/Processors.java +++ b/server/src/main/java/org/elasticsearch/common/unit/Processors.java @@ -34,20 +34,19 @@ public class Processors implements Writeable, Comparable, ToXContent static final int NUMBER_OF_DECIMAL_PLACES = 5; private static final double MIN_REPRESENTABLE_PROCESSORS = 1E-5; - private final double count; - - public Processors(double count) { - if (validNumberOfProcessors(count) == false) { - throw new IllegalArgumentException("processors must be a non-negative number; provided [" + count + "]"); - } + private final double rawCount; + private final double roundedCount; + private Processors(double rawCount) { // Avoid rounding up to MIN_REPRESENTABLE_PROCESSORS when 0 processors are used - if (count == 0.0) { - this.count = count; + if (rawCount == 0.0) { + this.rawCount = rawCount; + this.roundedCount = rawCount; } else { - this.count = Math.max( + this.rawCount = rawCount; + this.roundedCount = Math.max( MIN_REPRESENTABLE_PROCESSORS, - new BigDecimal(count).setScale(NUMBER_OF_DECIMAL_PLACES, RoundingMode.HALF_UP).doubleValue() + new BigDecimal(rawCount).setScale(NUMBER_OF_DECIMAL_PLACES, RoundingMode.HALF_UP).doubleValue() ); } } @@ -57,6 +56,11 @@ public static Processors of(Double count) { if (count == null) { return null; } + + if (validNumberOfProcessors(count) == false) { + throw new IllegalArgumentException("processors must be a non-negative number; provided [" + count + "]"); + } + return new Processors(count); } @@ -76,11 +80,11 @@ public static Processors readFrom(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().before(FLOAT_PROCESSORS_SUPPORT_VERSION)) { assert hasDecimals() == false; - out.writeInt((int) count); + out.writeInt((int) rawCount); } else if (out.getVersion().before(DOUBLE_PROCESSORS_SUPPORT_VERSION)) { - out.writeFloat((float) count); + out.writeFloat((float) rawCount); } else { - out.writeDouble(count); + out.writeDouble(rawCount); } } @@ -97,11 +101,11 @@ public static Processors fromXContent(XContentParser parser) throws IOException @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return builder.value(count); + return builder.value(roundedCount); } public Processors plus(Processors other) { - final double newProcessorCount = count + other.count; + final double newProcessorCount = rawCount + other.rawCount; if (Double.isFinite(newProcessorCount) == false) { throw new ArithmeticException("Unable to add [" + this + "] and [" + other + "] the resulting value overflows"); } @@ -114,7 +118,7 @@ public Processors multiply(int value) { throw new IllegalArgumentException("Processors cannot be multiplied by a negative number"); } - final double newProcessorCount = count * value; + final double newProcessorCount = rawCount * value; if (Double.isFinite(newProcessorCount) == false) { throw new ArithmeticException("Unable to multiply [" + this + "] by [" + value + "] the resulting value overflows"); } @@ -123,15 +127,15 @@ public Processors multiply(int value) { } public double count() { - return count; + return roundedCount; } public int roundUp() { - return (int) Math.ceil(count); + return (int) Math.ceil(rawCount); } public int roundDown() { - return Math.max(1, (int) Math.floor(count)); + return Math.max(1, (int) Math.floor(rawCount)); } private static boolean validNumberOfProcessors(double processors) { @@ -139,7 +143,7 @@ private static boolean validNumberOfProcessors(double processors) { } private boolean hasDecimals() { - return ((int) count) != Math.ceil(count); + return ((int) rawCount) != Math.ceil(rawCount); } public boolean isCompatibleWithVersion(Version version) { @@ -152,7 +156,22 @@ public boolean isCompatibleWithVersion(Version version) { @Override public int compareTo(Processors o) { - return Double.compare(count, o.count); + return Double.compare(rawCount, o.rawCount); + } + + public static boolean equalsOrCloseTo(Processors a, Processors b) { + return (a == b) || (a != null && (a.equals(b) || a.closeToAsFloat(b))); + } + + private boolean closeToAsFloat(Processors b) { + if (b == null) { + return false; + } + + float floatCount = (float) rawCount; + float otherFloatCount = (float) b.rawCount; + float maxError = Math.max(Math.ulp(floatCount), Math.ulp(otherFloatCount)) + (float) MIN_REPRESENTABLE_PROCESSORS; + return Float.isFinite(floatCount) && Float.isFinite(otherFloatCount) && (Math.abs(floatCount - otherFloatCount) < maxError); } @Override @@ -160,16 +179,16 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Processors that = (Processors) o; - return Double.compare(that.count, count) == 0; + return Double.compare(that.rawCount, rawCount) == 0; } @Override public int hashCode() { - return Objects.hash(count); + return Objects.hash(rawCount); } @Override public String toString() { - return Double.toString(count); + return Double.toString(roundedCount); } } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 9d25e24eac41b..266f698e904af 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -61,7 +61,7 @@ public class EsExecutors { String err = "Failed to parse value [" + textValue + "] for setting [node.processors] must be <= " + maxNumberOfProcessors; throw new IllegalArgumentException(err); } - return new Processors(numberOfProcessors); + return Processors.of(numberOfProcessors); }, Property.NodeScope ); diff --git a/server/src/main/java/org/elasticsearch/monitor/os/OsInfo.java b/server/src/main/java/org/elasticsearch/monitor/os/OsInfo.java index 021522f7226bf..a7be10d87308f 100644 --- a/server/src/main/java/org/elasticsearch/monitor/os/OsInfo.java +++ b/server/src/main/java/org/elasticsearch/monitor/os/OsInfo.java @@ -53,7 +53,7 @@ public OsInfo(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(DOUBLE_PRECISION_ALLOCATED_PROCESSORS_SUPPORT)) { this.allocatedProcessors = Processors.readFrom(in); } else { - this.allocatedProcessors = new Processors(in.readInt()); + this.allocatedProcessors = Processors.of((double) in.readInt()); } this.name = in.readOptionalString(); this.prettyName = in.readOptionalString(); @@ -137,7 +137,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(Fields.VERSION, version); } builder.field(Fields.AVAILABLE_PROCESSORS, availableProcessors); - builder.field(Fields.ALLOCATED_PROCESSORS, allocatedProcessors); + builder.field(Fields.ALLOCATED_PROCESSORS, getAllocatedProcessors()); builder.endObject(); return builder; } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodeSerializationTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodeSerializationTests.java index b70754887bebc..c312f24f5079e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodeSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodeSerializationTests.java @@ -51,7 +51,7 @@ public static DesiredNode mutateDesiredNode(DesiredNode instance) { ); case 1 -> new DesiredNode( instance.settings(), - randomValueOtherThan(instance.processors(), () -> new Processors(randomDouble() + randomIntBetween(1, 128))), + randomValueOtherThan(instance.processors(), () -> Processors.of(randomDouble() + randomIntBetween(1, 128))), null, instance.memory(), instance.storage(), diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodeTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodeTests.java index 98e389002fc82..07490909e1ec9 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodeTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodeTests.java @@ -12,7 +12,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.Processors; import org.elasticsearch.test.ESTestCase; import static org.elasticsearch.cluster.node.DiscoveryNodeRole.VOTING_ONLY_NODE_ROLE; @@ -228,72 +227,55 @@ public void testDesiredNodeIsCompatible() { } } - public void testFloatProcessorsConvertedToDoubleAreCloseToEqual() { - final double processorCount = randomNumberOfProcessors(); - final float processorCountAsFloat = (float) processorCount; - final Processors bwcProcessors = new Processors(processorCountAsFloat); - final Processors doubleProcessor = new Processors(processorCount); - assertThat(DesiredNode.processorsEqualsOrCloseTo(bwcProcessors, doubleProcessor, MAX_ERROR), is(true)); - } - - public void testProcessorsAreConsideredDifferentIfTheDifferenceIsGreaterThanMaxError() { - // Ensure that (processorCount - MAX_ERROR) is at least the smallest representable processor - final double processorCount = Math.max(Math.ulp(0.0) + MAX_ERROR, randomNumberOfProcessors()); - final Processors processorsA = new Processors(processorCount + MAX_ERROR); - final Processors processorsB = new Processors(processorCount - MAX_ERROR); - assertThat(DesiredNode.processorsEqualsOrCloseTo(processorsA, processorsB, MAX_ERROR), is(false)); - assertThat(processorsA.equals(processorsB), is(false)); - } - - public void testRoundedProcessorsToFloatAreCloseToEqual() { - double processorCount = randomNumberOfProcessors(); - final Processors doubleProcessor = new Processors(processorCount); - final Processors floatProcessor = new Processors((float) doubleProcessor.count()); - assertThat(DesiredNode.processorsEqualsOrCloseTo(doubleProcessor, floatProcessor, MAX_ERROR), is(true)); - } - public void testEqualsOrProcessorsCloseTo() { final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), randomAlphaOfLength(10)).build(); + final double maxDelta = 1E-3; final double processorCount = randomNumberOfProcessors(); - final boolean shouldBeConsideredEqual = randomBoolean(); - final double maxDifferenceBetweenProcessorCounts = shouldBeConsideredEqual ? MAX_ERROR / 2 : MAX_ERROR * 2; + final boolean isEqualOrCloseTo = randomBoolean(); final ByteSizeValue memory = ByteSizeValue.ofGb(randomIntBetween(1, 32)); final ByteSizeValue storage = ByteSizeValue.ofGb(randomIntBetween(128, 256)); final DesiredNode desiredNode1; final DesiredNode desiredNode2; if (randomBoolean()) { - desiredNode1 = new DesiredNode( + desiredNode1 = new DesiredNode(settings, processorCount, memory, storage, Version.CURRENT); + desiredNode2 = new DesiredNode( settings, - processorCount + maxDifferenceBetweenProcessorCounts, + isEqualOrCloseTo ? (float) processorCount : processorCount + maxDelta, memory, storage, Version.CURRENT ); - desiredNode2 = new DesiredNode(settings, processorCount, memory, storage, Version.CURRENT); } else { - final Double maxProcessors = randomBoolean() ? processorCount + randomIntBetween(1, 10) : null; + final double desiredNodes1Min = processorCount; + final Double desiredNodes1Max = randomBoolean() ? processorCount + randomIntBetween(1, 10) : null; + final DesiredNode.ProcessorsRange desiredNodes1ProcessorsRange = new DesiredNode.ProcessorsRange( + desiredNodes1Min, + desiredNodes1Max + ); + + final double modifiedMinProcessors = isEqualOrCloseTo ? (float) desiredNodes1Min : desiredNodes1Min + maxDelta; - final Double maxProcessorsDesiredNode1; - if (maxProcessors != null && randomBoolean()) { - maxProcessorsDesiredNode1 = maxProcessors + maxDifferenceBetweenProcessorCounts; + final double desiredNodes2Min; + final Double desiredNodes2Max; + if (desiredNodes1Max != null && randomBoolean()) { + desiredNodes2Min = randomBoolean() ? desiredNodes1Min : modifiedMinProcessors; + desiredNodes2Max = isEqualOrCloseTo ? desiredNodes1Max.floatValue() : desiredNodes1Max + maxDelta; } else { - maxProcessorsDesiredNode1 = maxProcessors; + desiredNodes2Min = modifiedMinProcessors; + desiredNodes2Max = desiredNodes1Max; } - - final DesiredNode.ProcessorsRange processorsRange1 = new DesiredNode.ProcessorsRange( - processorCount + maxDifferenceBetweenProcessorCounts, - maxProcessorsDesiredNode1 + final DesiredNode.ProcessorsRange desiredNodes2ProcessorsRange = new DesiredNode.ProcessorsRange( + desiredNodes2Min, + desiredNodes2Max ); - final DesiredNode.ProcessorsRange processorsRange2 = new DesiredNode.ProcessorsRange(processorCount, maxProcessors); - - desiredNode1 = new DesiredNode(settings, processorsRange1, memory, storage, Version.CURRENT); - desiredNode2 = new DesiredNode(settings, processorsRange2, memory, storage, Version.CURRENT); + desiredNode1 = new DesiredNode(settings, desiredNodes1ProcessorsRange, memory, storage, Version.CURRENT); + desiredNode2 = new DesiredNode(settings, desiredNodes2ProcessorsRange, memory, storage, Version.CURRENT); } - assertThat(desiredNode1.equalsWithProcessorsCloseTo(desiredNode2, MAX_ERROR), is(shouldBeConsideredEqual)); + assertThat(desiredNode1.equalsWithProcessorsCloseTo(desiredNode2), is(isEqualOrCloseTo)); assertThat(desiredNode1, is(not(equalTo(desiredNode2)))); } diff --git a/server/src/test/java/org/elasticsearch/common/unit/ProcessorsSerializationTests.java b/server/src/test/java/org/elasticsearch/common/unit/ProcessorsSerializationTests.java index 267c8f92ed9ea..054ab6dd8518c 100644 --- a/server/src/test/java/org/elasticsearch/common/unit/ProcessorsSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/common/unit/ProcessorsSerializationTests.java @@ -21,11 +21,11 @@ protected Writeable.Reader instanceReader() { @Override protected Processors createTestInstance() { - return new Processors(randomDoubleBetween(Math.ulp(0.0), 512.99999999, true)); + return Processors.of(randomDoubleBetween(Math.ulp(0.0), 512.99999999, true)); } @Override protected Processors mutateInstance(Processors instance) throws IOException { - return new Processors(instance.count() + randomDoubleBetween(0.01, 1, true)); + return Processors.of(instance.count() + randomDoubleBetween(0.01, 1, true)); } } diff --git a/server/src/test/java/org/elasticsearch/common/unit/ProcessorsTests.java b/server/src/test/java/org/elasticsearch/common/unit/ProcessorsTests.java index 716e9c8ddadfb..800d43e636dee 100644 --- a/server/src/test/java/org/elasticsearch/common/unit/ProcessorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/unit/ProcessorsTests.java @@ -19,7 +19,7 @@ public class ProcessorsTests extends ESTestCase { public void testTruncatesAfterFiveDecimalPlaces() { final double processorCount = randomNumberOfProcessors(); - final Processors processors = new Processors(processorCount); + final Processors processors = Processors.of(processorCount); final String processorsString = Double.toString(processors.count()); final int decimalPlaces = processorsString.length() - processorsString.indexOf(".") - 1; assertThat(decimalPlaces, is(lessThanOrEqualTo(Processors.NUMBER_OF_DECIMAL_PLACES))); @@ -27,39 +27,39 @@ public void testTruncatesAfterFiveDecimalPlaces() { public void testRounding() { { - final Processors processors = new Processors(1.2); + final Processors processors = Processors.of(1.2); assertThat(processors.roundDown(), is(equalTo(1))); assertThat(processors.roundUp(), is(equalTo(2))); } { - final Processors processors = new Processors(0.1); + final Processors processors = Processors.of(0.1); assertThat(processors.roundDown(), is(equalTo(1))); assertThat(processors.roundUp(), is(equalTo(1))); } { - final Processors processors = new Processors(1E-12); + final Processors processors = Processors.of(1E-12); assertThat(processors.roundDown(), is(equalTo(1))); assertThat(processors.roundUp(), is(equalTo(1))); } } public void testNeverRoundsDownToZero() { - final Processors processors = new Processors(1E-12); + final Processors processors = Processors.of(1E-12); assertThat(processors.count(), is(greaterThan(0.0))); } public void testValidation() { - expectThrows(IllegalArgumentException.class, () -> new Processors(-1.0)); - expectThrows(IllegalArgumentException.class, () -> new Processors(Double.POSITIVE_INFINITY)); - expectThrows(IllegalArgumentException.class, () -> new Processors(Double.NEGATIVE_INFINITY)); - expectThrows(IllegalArgumentException.class, () -> new Processors(Double.NaN)); + expectThrows(IllegalArgumentException.class, () -> Processors.of(-1.0)); + expectThrows(IllegalArgumentException.class, () -> Processors.of(Double.POSITIVE_INFINITY)); + expectThrows(IllegalArgumentException.class, () -> Processors.of(Double.NEGATIVE_INFINITY)); + expectThrows(IllegalArgumentException.class, () -> Processors.of(Double.NaN)); } public void testAddition() { - final Processors processorsA = new Processors(randomNumberOfProcessors()); - final Processors processorsB = new Processors(randomNumberOfProcessors()); + final Processors processorsA = Processors.of(randomNumberOfProcessors()); + final Processors processorsB = Processors.of(randomNumberOfProcessors()); final Processors addedProcessors = processorsA.plus(processorsB); @@ -68,19 +68,27 @@ public void testAddition() { } public void testOverflowAddition() { - final Processors processorsA = new Processors(Double.MAX_VALUE); - final Processors processorsB = new Processors(Double.MAX_VALUE); + final Processors processorsA = Processors.of(Double.MAX_VALUE); + final Processors processorsB = Processors.of(Double.MAX_VALUE); expectThrows(ArithmeticException.class, () -> processorsA.plus(processorsB)); } public void testMultiplication() { - final Processors processors = new Processors(randomNumberOfProcessors()); + final Processors processors = Processors.of(randomNumberOfProcessors()); final Processors multipliedProcessors = processors.multiply(100); assertThat(multipliedProcessors, is(greaterThan(processors))); } + public void testFloatProcessorsConvertedToDoubleAreCloseToEqual() { + final double processorCount = randomNumberOfProcessors(); + final float processorCountAsFloat = (float) processorCount; + final Processors bwcProcessors = Processors.of((double) processorCountAsFloat); + final Processors doubleProcessor = Processors.of(processorCount); + assertThat(Processors.equalsOrCloseTo(bwcProcessors, doubleProcessor), is(true)); + } + private double randomNumberOfProcessors() { return randomDoubleBetween(Math.ulp(0.0), 512.99999999, true); } diff --git a/server/src/test/java/org/elasticsearch/monitor/os/OsProbeTests.java b/server/src/test/java/org/elasticsearch/monitor/os/OsProbeTests.java index c24d78406b31e..a485e128e8feb 100644 --- a/server/src/test/java/org/elasticsearch/monitor/os/OsProbeTests.java +++ b/server/src/test/java/org/elasticsearch/monitor/os/OsProbeTests.java @@ -56,7 +56,7 @@ List readOsRelease() { } }; - final OsInfo info = osProbe.osInfo(refreshInterval, new Processors(allocatedProcessors)); + final OsInfo info = osProbe.osInfo(refreshInterval, Processors.of((double) allocatedProcessors)); assertNotNull(info); assertThat(info.getRefreshInterval(), equalTo(refreshInterval)); assertThat(info.getName(), equalTo(Constants.OS_NAME)); diff --git a/server/src/test/java/org/elasticsearch/nodesinfo/NodeInfoStreamingTests.java b/server/src/test/java/org/elasticsearch/nodesinfo/NodeInfoStreamingTests.java index c1f4a75a1237e..ca953f76a0997 100644 --- a/server/src/test/java/org/elasticsearch/nodesinfo/NodeInfoStreamingTests.java +++ b/server/src/test/java/org/elasticsearch/nodesinfo/NodeInfoStreamingTests.java @@ -112,7 +112,7 @@ private static NodeInfo createNodeInfo() { OsInfo osInfo = null; if (randomBoolean()) { int availableProcessors = randomIntBetween(1, 64); - Processors allocatedProcessors = new Processors(randomIntBetween(1, availableProcessors)); + Processors allocatedProcessors = Processors.of((double) randomIntBetween(1, availableProcessors)); long refreshInterval = randomBoolean() ? -1 : randomNonNegativeLong(); String name = randomAlphaOfLengthBetween(3, 10); String arch = randomAlphaOfLengthBetween(3, 10); diff --git a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/FixedAutoscalingDeciderService.java b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/FixedAutoscalingDeciderService.java index 88a45076398fe..75f4493a61ae7 100644 --- a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/FixedAutoscalingDeciderService.java +++ b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/FixedAutoscalingDeciderService.java @@ -32,7 +32,7 @@ public class FixedAutoscalingDeciderService implements AutoscalingDeciderService public static final Setting PROCESSORS = new Setting<>( "processors", Double.toString(1.0), - textValue -> new Processors(Double.parseDouble(textValue)) + textValue -> Processors.of(Double.parseDouble(textValue)) ); public static final Setting NODES = Setting.intSetting("nodes", 1, 0); diff --git a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodeInfo.java b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodeInfo.java index 95ae3f0ddbeb8..38e2871fc9560 100644 --- a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodeInfo.java +++ b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodeInfo.java @@ -30,7 +30,7 @@ Builder setMemory(long memory) { } Builder setProcessors(double processors) { - this.processors = new Processors(processors); + this.processors = Processors.of(processors); return this; } diff --git a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/existence/FrozenExistenceDeciderService.java b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/existence/FrozenExistenceDeciderService.java index feb6aeec128d9..9b52162a14ca0 100644 --- a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/existence/FrozenExistenceDeciderService.java +++ b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/existence/FrozenExistenceDeciderService.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.Processors; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingCapacity; import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderContext; @@ -56,7 +57,7 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider builder.total(MINIMUM_FROZEN_STORAGE, MINIMUM_FROZEN_MEMORY, null); builder.node(MINIMUM_FROZEN_STORAGE, MINIMUM_FROZEN_MEMORY, null); } else { - builder.total(ByteSizeValue.ZERO, ByteSizeValue.ZERO, null); + builder.total(ByteSizeValue.ZERO, ByteSizeValue.ZERO, Processors.ZERO); } return new AutoscalingDeciderResult(builder.build(), new FrozenExistenceReason(indicesNeedingFrozen)); diff --git a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/AutoscalingTestCase.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/AutoscalingTestCase.java index 53eb4b71d71a2..333730bb677f9 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/AutoscalingTestCase.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/AutoscalingTestCase.java @@ -155,7 +155,7 @@ public static AutoscalingPolicy randomAutoscalingPolicyOfName(final String name) } public static Processors randomProcessors() { - return Processors.of(randomInt(128) + randomDouble()); + return Processors.of(randomDoubleBetween(Double.MIN_VALUE, 512.9999999, true)); } public static AutoscalingPolicy mutateAutoscalingPolicy(final AutoscalingPolicy instance) { diff --git a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodesInfoServiceTests.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodesInfoServiceTests.java index 470af871fa7cd..89b735f890737 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodesInfoServiceTests.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/nodeinfo/AutoscalingNodesInfoServiceTests.java @@ -403,7 +403,7 @@ public void assertMatchesResponse(Set nodes, NodesStatsResponse r equalTo( new AutoscalingNodeInfo( response.getNodesMap().get(n.getId()).getOs().getMem().getAdjustedTotal().getBytes(), - new Processors(infoResponse.getNodesMap().get(n.getId()).getInfo(OsInfo.class).getFractionalAllocatedProcessors()) + Processors.of(infoResponse.getNodesMap().get(n.getId()).getInfo(OsInfo.class).getFractionalAllocatedProcessors()) ) ) ); @@ -446,7 +446,7 @@ private static NodeStats statsForNode(DiscoveryNode node, long memory) { } private static org.elasticsearch.action.admin.cluster.node.info.NodeInfo infoForNode(DiscoveryNode node, int processors) { - OsInfo osInfo = new OsInfo(randomLong(), processors, new Processors(processors), null, null, null, null); + OsInfo osInfo = new OsInfo(randomLong(), processors, Processors.of((double) processors), null, null, null, null); return new org.elasticsearch.action.admin.cluster.node.info.NodeInfo( Version.CURRENT, Build.CURRENT, From b1b865a735beebeeea680158bf6df444cc060304 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Wed, 31 Aug 2022 12:19:29 +0200 Subject: [PATCH 6/9] Revert change --- .../elasticsearch/common/unit/Processors.java | 47 +++++++++---------- .../cluster/metadata/DesiredNodeTests.java | 2 - 2 files changed, 22 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/unit/Processors.java b/server/src/main/java/org/elasticsearch/common/unit/Processors.java index 2b8231c3cd85b..9eb4e08e8c7e7 100644 --- a/server/src/main/java/org/elasticsearch/common/unit/Processors.java +++ b/server/src/main/java/org/elasticsearch/common/unit/Processors.java @@ -34,19 +34,16 @@ public class Processors implements Writeable, Comparable, ToXContent static final int NUMBER_OF_DECIMAL_PLACES = 5; private static final double MIN_REPRESENTABLE_PROCESSORS = 1E-5; - private final double rawCount; - private final double roundedCount; + private final double count; - private Processors(double rawCount) { + private Processors(double count) { // Avoid rounding up to MIN_REPRESENTABLE_PROCESSORS when 0 processors are used - if (rawCount == 0.0) { - this.rawCount = rawCount; - this.roundedCount = rawCount; + if (count == 0.0) { + this.count = count; } else { - this.rawCount = rawCount; - this.roundedCount = Math.max( + this.count = Math.max( MIN_REPRESENTABLE_PROCESSORS, - new BigDecimal(rawCount).setScale(NUMBER_OF_DECIMAL_PLACES, RoundingMode.HALF_UP).doubleValue() + new BigDecimal(count).setScale(NUMBER_OF_DECIMAL_PLACES, RoundingMode.HALF_UP).doubleValue() ); } } @@ -80,11 +77,11 @@ public static Processors readFrom(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().before(FLOAT_PROCESSORS_SUPPORT_VERSION)) { assert hasDecimals() == false; - out.writeInt((int) rawCount); + out.writeInt((int) count); } else if (out.getVersion().before(DOUBLE_PROCESSORS_SUPPORT_VERSION)) { - out.writeFloat((float) rawCount); + out.writeFloat((float) count); } else { - out.writeDouble(rawCount); + out.writeDouble(count); } } @@ -101,11 +98,11 @@ public static Processors fromXContent(XContentParser parser) throws IOException @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return builder.value(roundedCount); + return builder.value(count); } public Processors plus(Processors other) { - final double newProcessorCount = rawCount + other.rawCount; + final double newProcessorCount = count + other.count; if (Double.isFinite(newProcessorCount) == false) { throw new ArithmeticException("Unable to add [" + this + "] and [" + other + "] the resulting value overflows"); } @@ -118,7 +115,7 @@ public Processors multiply(int value) { throw new IllegalArgumentException("Processors cannot be multiplied by a negative number"); } - final double newProcessorCount = rawCount * value; + final double newProcessorCount = count * value; if (Double.isFinite(newProcessorCount) == false) { throw new ArithmeticException("Unable to multiply [" + this + "] by [" + value + "] the resulting value overflows"); } @@ -127,15 +124,15 @@ public Processors multiply(int value) { } public double count() { - return roundedCount; + return count; } public int roundUp() { - return (int) Math.ceil(rawCount); + return (int) Math.ceil(count); } public int roundDown() { - return Math.max(1, (int) Math.floor(rawCount)); + return Math.max(1, (int) Math.floor(count)); } private static boolean validNumberOfProcessors(double processors) { @@ -143,7 +140,7 @@ private static boolean validNumberOfProcessors(double processors) { } private boolean hasDecimals() { - return ((int) rawCount) != Math.ceil(rawCount); + return ((int) count) != Math.ceil(count); } public boolean isCompatibleWithVersion(Version version) { @@ -156,7 +153,7 @@ public boolean isCompatibleWithVersion(Version version) { @Override public int compareTo(Processors o) { - return Double.compare(rawCount, o.rawCount); + return Double.compare(count, o.count); } public static boolean equalsOrCloseTo(Processors a, Processors b) { @@ -168,8 +165,8 @@ private boolean closeToAsFloat(Processors b) { return false; } - float floatCount = (float) rawCount; - float otherFloatCount = (float) b.rawCount; + float floatCount = (float) count; + float otherFloatCount = (float) b.count; float maxError = Math.max(Math.ulp(floatCount), Math.ulp(otherFloatCount)) + (float) MIN_REPRESENTABLE_PROCESSORS; return Float.isFinite(floatCount) && Float.isFinite(otherFloatCount) && (Math.abs(floatCount - otherFloatCount) < maxError); } @@ -179,16 +176,16 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Processors that = (Processors) o; - return Double.compare(that.rawCount, rawCount) == 0; + return Double.compare(that.count, count) == 0; } @Override public int hashCode() { - return Objects.hash(rawCount); + return Objects.hash(count); } @Override public String toString() { - return Double.toString(roundedCount); + return Double.toString(count); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodeTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodeTests.java index 07490909e1ec9..262bee527684d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodeTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodeTests.java @@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; public class DesiredNodeTests extends ESTestCase { @@ -276,7 +275,6 @@ public void testEqualsOrProcessorsCloseTo() { } assertThat(desiredNode1.equalsWithProcessorsCloseTo(desiredNode2), is(isEqualOrCloseTo)); - assertThat(desiredNode1, is(not(equalTo(desiredNode2)))); } private double randomNumberOfProcessors() { From 152d9b3578d9f83a390050ea7a05ba0d7e13b094 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Thu, 1 Sep 2022 16:11:44 +0200 Subject: [PATCH 7/9] Improve test --- .../org/elasticsearch/common/unit/ProcessorsTests.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/common/unit/ProcessorsTests.java b/server/src/test/java/org/elasticsearch/common/unit/ProcessorsTests.java index 800d43e636dee..2618bba1cb1a8 100644 --- a/server/src/test/java/org/elasticsearch/common/unit/ProcessorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/unit/ProcessorsTests.java @@ -13,6 +13,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; public class ProcessorsTests extends ESTestCase { @@ -21,7 +22,14 @@ public void testTruncatesAfterFiveDecimalPlaces() { final Processors processors = Processors.of(processorCount); final String processorsString = Double.toString(processors.count()); - final int decimalPlaces = processorsString.length() - processorsString.indexOf(".") - 1; + final int decimalPlaces; + if (processorsString.contains("E")) { + int exponent = Integer.parseInt(processorsString.substring(processorsString.indexOf("E") + 1)); + assertThat(exponent, is(lessThan(0))); + decimalPlaces = Math.abs(exponent); + } else { + decimalPlaces = processorsString.length() - processorsString.indexOf(".") - 1; + } assertThat(decimalPlaces, is(lessThanOrEqualTo(Processors.NUMBER_OF_DECIMAL_PLACES))); } From d8ff31ff4219d67505ae9ec150ffa57f29b63485 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Tue, 6 Sep 2022 16:41:42 +0200 Subject: [PATCH 8/9] Review comments --- .../elasticsearch/common/unit/Processors.java | 2 +- .../cluster/metadata/DesiredNodeTests.java | 4 +--- .../cluster/metadata/DesiredNodesTestCase.java | 2 +- .../unit/ProcessorsSerializationTests.java | 2 +- .../common/unit/ProcessorsTests.java | 17 ++++++++++++++++- 5 files changed, 20 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/unit/Processors.java b/server/src/main/java/org/elasticsearch/common/unit/Processors.java index 9eb4e08e8c7e7..b95bdd3615f3c 100644 --- a/server/src/main/java/org/elasticsearch/common/unit/Processors.java +++ b/server/src/main/java/org/elasticsearch/common/unit/Processors.java @@ -136,7 +136,7 @@ public int roundDown() { } private static boolean validNumberOfProcessors(double processors) { - return Double.isFinite(processors) && processors >= 0.0; + return Double.isFinite(processors) && processors > 0.0; } private boolean hasDecimals() { diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodeTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodeTests.java index 262bee527684d..5451da0c22351 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodeTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodeTests.java @@ -25,8 +25,6 @@ import static org.hamcrest.Matchers.notNullValue; public class DesiredNodeTests extends ESTestCase { - public static final double MAX_ERROR = 7E-5; - public void testExternalIdIsRequired() { final Settings.Builder settings = Settings.builder(); if (randomBoolean()) { @@ -278,7 +276,7 @@ public void testEqualsOrProcessorsCloseTo() { } private double randomNumberOfProcessors() { - return randomDoubleBetween(Math.ulp(0.0), 512.99999999, true); + return randomDoubleBetween(Double.MIN_VALUE, 512.99999999, true); } private Double randomInvalidProcessor() { diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodesTestCase.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodesTestCase.java index 532093a8cf0fb..69dc49bb0dd0f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodesTestCase.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodesTestCase.java @@ -186,6 +186,6 @@ public static DiscoveryNode newDiscoveryNode(String nodeName) { } public static double randomNumberOfProcessors() { - return randomDoubleBetween(Math.ulp(0.0), 512.99999999, true); + return randomDoubleBetween(Double.MIN_VALUE, 512.99999999, true); } } diff --git a/server/src/test/java/org/elasticsearch/common/unit/ProcessorsSerializationTests.java b/server/src/test/java/org/elasticsearch/common/unit/ProcessorsSerializationTests.java index 054ab6dd8518c..25d024f23c35b 100644 --- a/server/src/test/java/org/elasticsearch/common/unit/ProcessorsSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/common/unit/ProcessorsSerializationTests.java @@ -21,7 +21,7 @@ protected Writeable.Reader instanceReader() { @Override protected Processors createTestInstance() { - return Processors.of(randomDoubleBetween(Math.ulp(0.0), 512.99999999, true)); + return Processors.of(randomDoubleBetween(Double.MIN_VALUE, 512.99999999, true)); } @Override diff --git a/server/src/test/java/org/elasticsearch/common/unit/ProcessorsTests.java b/server/src/test/java/org/elasticsearch/common/unit/ProcessorsTests.java index 2618bba1cb1a8..1abc187e1640a 100644 --- a/server/src/test/java/org/elasticsearch/common/unit/ProcessorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/unit/ProcessorsTests.java @@ -21,7 +21,8 @@ public void testTruncatesAfterFiveDecimalPlaces() { final double processorCount = randomNumberOfProcessors(); final Processors processors = Processors.of(processorCount); - final String processorsString = Double.toString(processors.count()); + final double roundedProcessorsCount = processors.count(); + final String processorsString = Double.toString(roundedProcessorsCount); final int decimalPlaces; if (processorsString.contains("E")) { int exponent = Integer.parseInt(processorsString.substring(processorsString.indexOf("E") + 1)); @@ -31,6 +32,8 @@ public void testTruncatesAfterFiveDecimalPlaces() { decimalPlaces = processorsString.length() - processorsString.indexOf(".") - 1; } assertThat(decimalPlaces, is(lessThanOrEqualTo(Processors.NUMBER_OF_DECIMAL_PLACES))); + + assertThat(Processors.of(roundedProcessorsCount).count(), is(equalTo(roundedProcessorsCount))); } public void testRounding() { @@ -40,6 +43,18 @@ public void testRounding() { assertThat(processors.roundUp(), is(equalTo(2))); } + { + final Processors processors = Processors.of(10.1); + assertThat(processors.roundDown(), is(equalTo(10))); + assertThat(processors.roundUp(), is(equalTo(11))); + } + + { + final Processors processors = Processors.of((double) 12); + assertThat(processors.roundDown(), is(equalTo(12))); + assertThat(processors.roundUp(), is(equalTo(12))); + } + { final Processors processors = Processors.of(0.1); assertThat(processors.roundDown(), is(equalTo(1))); From 53f9d498c2ba7bf62aeb038bf29d083d3a7e2050 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Tue, 6 Sep 2022 17:31:28 +0200 Subject: [PATCH 9/9] Remove dead code --- .../cluster/metadata/DesiredNode.java | 32 ------------------- 1 file changed, 32 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DesiredNode.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DesiredNode.java index f78ccc80cca75..518df45f956c5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DesiredNode.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DesiredNode.java @@ -153,12 +153,6 @@ public DesiredNode(Settings settings, double processors, ByteSizeValue memory, B ); } - if (invalidNumberOfProcessors(processors)) { - throw new IllegalArgumentException( - format(Locale.ROOT, "Only a positive number of [processors] are allowed and [%f] was provided", processors.count()) - ); - } - if (NODE_EXTERNAL_ID_SETTING.get(settings).isBlank()) { throw new IllegalArgumentException( format(Locale.ROOT, "[%s] or [%s] is missing or empty", NODE_NAME_SETTING.getKey(), NODE_EXTERNAL_ID_SETTING.getKey()) @@ -368,10 +362,6 @@ public String toString() { + '}'; } - private static boolean invalidNumberOfProcessors(Processors processors) { - return processors != null && processors.count() <= 0; - } - public record ProcessorsRange(Processors min, @Nullable Processors max) implements Writeable, ToXContentObject { private static final ParseField MIN_FIELD = new ParseField("min"); @@ -407,28 +397,6 @@ public ProcessorsRange(double min, Double max) { } public ProcessorsRange { - if (invalidNumberOfProcessors(min)) { - throw new IllegalArgumentException( - format( - Locale.ROOT, - "Only a positive number of [%s] processors are allowed and [%f] was provided", - MIN_FIELD.getPreferredName(), - min - ) - ); - } - - if (invalidNumberOfProcessors(max)) { - throw new IllegalArgumentException( - format( - Locale.ROOT, - "Only a positive number of [%s] processors are allowed and [%f] was provided", - MAX_FIELD.getPreferredName(), - max - ) - ); - } - if (max != null && min.compareTo(max) > 0) { throw new IllegalArgumentException( "min processors must be less than or equal to max processors and it was: min: " + min + " max: " + max