Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/89662.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 89662
summary: Centralize the concept of processors configuration
area: Autoscaling
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.monitor.os;

import org.apache.lucene.util.Constants;
import org.elasticsearch.common.unit.Processors;
import org.elasticsearch.core.PathUtils;
import org.elasticsearch.test.ESTestCase;

Expand All @@ -24,7 +25,7 @@
public class EvilOsProbeTests extends ESTestCase {

public void testOsPrettyName() throws IOException {
final OsInfo osInfo = OsProbe.getInstance().osInfo(randomLongBetween(1, 100), randomIntBetween(1, 8));
final OsInfo osInfo = OsProbe.getInstance().osInfo(randomLongBetween(1, 100), Processors.of((double) randomIntBetween(1, 8)));
if (Constants.LINUX) {
final List<String> lines;
if (Files.exists(PathUtils.get("/etc/os-release"))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.Processors;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.xcontent.json.JsonXContent;

Expand All @@ -31,37 +32,109 @@
import static org.hamcrest.Matchers.is;

public class DesiredNodesUpgradeIT extends AbstractRollingTestCase {
private enum ProcessorsPrecision {
DOUBLE,
FLOAT
}

public void testUpgradeDesiredNodes() throws Exception {
// Desired nodes was introduced in 8.1
if (UPGRADE_FROM_VERSION.before(Version.V_8_1_0)) {
return;
}

if (UPGRADE_FROM_VERSION.onOrAfter(Processors.DOUBLE_PROCESSORS_SUPPORT_VERSION)) {
assertUpgradedNodesCanReadDesiredNodes();
} else if (UPGRADE_FROM_VERSION.onOrAfter(DesiredNode.RANGE_FLOAT_PROCESSORS_SUPPORT_VERSION)) {
assertDesiredNodesUpdatedWithRoundedUpFloatsAreIdempotent();
} else {
assertDesiredNodesWithFloatProcessorsAreRejectedInOlderVersions();
}
}

private void assertUpgradedNodesCanReadDesiredNodes() throws Exception {
final int desiredNodesVersion = switch (CLUSTER_TYPE) {
case OLD -> 1;
case MIXED -> FIRST_MIXED_ROUND ? 2 : 3;
case UPGRADED -> 4;
};

if (CLUSTER_TYPE != ClusterType.OLD) {
final Map<String, Object> desiredNodes = getLatestDesiredNodes();
final String historyId = extractValue(desiredNodes, "history_id");
final int version = extractValue(desiredNodes, "version");
assertThat(historyId, is(equalTo("upgrade_test")));
assertThat(version, is(equalTo(desiredNodesVersion - 1)));
}

addClusterNodesToDesiredNodesWithProcessorsOrProcessorRanges(desiredNodesVersion, ProcessorsPrecision.DOUBLE);
assertAllDesiredNodesAreActualized();
}

private void assertDesiredNodesUpdatedWithRoundedUpFloatsAreIdempotent() throws Exception {
// We define the same set of desired nodes to ensure that they are equal across all
// the test runs, otherwise we cannot guarantee an idempotent update in this test
final var desiredNodes = getNodeNames().stream()
.map(
nodeName -> new DesiredNode(
Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeName).build(),
1238.49922909,
ByteSizeValue.ofGb(32),
ByteSizeValue.ofGb(128),
Version.CURRENT
)
)
.toList();

final int desiredNodesVersion = switch (CLUSTER_TYPE) {
case OLD -> 1;
case MIXED -> FIRST_MIXED_ROUND ? 2 : 3;
case UPGRADED -> 4;
};

if (CLUSTER_TYPE != ClusterType.OLD) {
updateDesiredNodes(desiredNodes, desiredNodesVersion - 1);
}
for (int i = 0; i < 2; i++) {
updateDesiredNodes(desiredNodes, desiredNodesVersion);
}

final Map<String, Object> latestDesiredNodes = getLatestDesiredNodes();
final int latestDesiredNodesVersion = extractValue(latestDesiredNodes, "version");
assertThat(latestDesiredNodesVersion, is(equalTo(desiredNodesVersion)));

if (CLUSTER_TYPE == ClusterType.UPGRADED) {
assertAllDesiredNodesAreActualized();
}
}

private void assertDesiredNodesWithFloatProcessorsAreRejectedInOlderVersions() throws Exception {
switch (CLUSTER_TYPE) {
case OLD -> addClusterNodesToDesiredNodesWithIntegerProcessors(1);
case MIXED -> {
int version = FIRST_MIXED_ROUND ? 2 : 3;
if (UPGRADE_FROM_VERSION.onOrAfter(DesiredNode.RANGE_FLOAT_PROCESSORS_SUPPORT_VERSION)) {
addClusterNodesToDesiredNodesWithFloatProcessorsOrProcessorRanges(version);
} else {
// Processor ranges or float processors are forbidden during upgrades: 8.2 -> 8.3 clusters
final var responseException = expectThrows(
ResponseException.class,
() -> addClusterNodesToDesiredNodesWithFloatProcessorsOrProcessorRanges(version)
);
final var statusCode = responseException.getResponse().getStatusLine().getStatusCode();
assertThat(statusCode, is(equalTo(400)));
}
// Processor ranges or float processors are forbidden during upgrades: 8.2 -> 8.3 clusters
final var responseException = expectThrows(
ResponseException.class,
() -> addClusterNodesToDesiredNodesWithProcessorsOrProcessorRanges(version, ProcessorsPrecision.FLOAT)
);
final var statusCode = responseException.getResponse().getStatusLine().getStatusCode();
assertThat(statusCode, is(equalTo(400)));
}
case UPGRADED -> {
assertAllDesiredNodesAreActualized();
addClusterNodesToDesiredNodesWithFloatProcessorsOrProcessorRanges(4);
addClusterNodesToDesiredNodesWithProcessorsOrProcessorRanges(4, ProcessorsPrecision.FLOAT);
}
}

getLatestDesiredNodes();
}

private Map<String, Object> getLatestDesiredNodes() throws IOException {
final var getDesiredNodesRequest = new Request("GET", "/_internal/desired_nodes/_latest");
final var response = client().performRequest(getDesiredNodesRequest);
assertThat(response.getStatusLine().getStatusCode(), is(equalTo(200)));
return responseAsMap(response);
}

private void assertAllDesiredNodesAreActualized() throws Exception {
Expand All @@ -77,32 +150,34 @@ private void assertAllDesiredNodesAreActualized() throws Exception {
}
}

private void addClusterNodesToDesiredNodesWithFloatProcessorsOrProcessorRanges(int version) throws Exception {
private void addClusterNodesToDesiredNodesWithProcessorsOrProcessorRanges(int version, ProcessorsPrecision processorsPrecision)
throws Exception {
final List<DesiredNode> nodes;
if (randomBoolean()) {
nodes = getNodeNames().stream()
.map(
nodeName -> new DesiredNode(
Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeName).build(),
0.5f,
processorsPrecision == ProcessorsPrecision.DOUBLE ? randomDoubleProcessorCount() : randomFloatProcessorCount(),
ByteSizeValue.ofGb(randomIntBetween(10, 24)),
ByteSizeValue.ofGb(randomIntBetween(128, 256)),
Version.CURRENT
)
)
.toList();
} else {
nodes = getNodeNames().stream()
.map(
nodeName -> new DesiredNode(
Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeName).build(),
new DesiredNode.ProcessorsRange(randomIntBetween(1, 10), (float) randomIntBetween(20, 30)),
ByteSizeValue.ofGb(randomIntBetween(10, 24)),
ByteSizeValue.ofGb(randomIntBetween(128, 256)),
Version.CURRENT
)
)
.toList();
nodes = getNodeNames().stream().map(nodeName -> {
double minProcessors = processorsPrecision == ProcessorsPrecision.DOUBLE
? randomDoubleProcessorCount()
: randomFloatProcessorCount();
return new DesiredNode(
Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeName).build(),
new DesiredNode.ProcessorsRange(minProcessors, minProcessors + randomIntBetween(10, 20)),
ByteSizeValue.ofGb(randomIntBetween(10, 24)),
ByteSizeValue.ofGb(randomIntBetween(128, 256)),
Version.CURRENT
);
}).toList();
}
updateDesiredNodes(nodes, version);
}
Expand All @@ -123,7 +198,7 @@ private void addClusterNodesToDesiredNodesWithIntegerProcessors(int version) thr
}

private void updateDesiredNodes(List<DesiredNode> nodes, int version) throws IOException {
final var request = new Request("PUT", "/_internal/desired_nodes/history/" + version);
final var request = new Request("PUT", "/_internal/desired_nodes/upgrade_test/" + version);
try (var builder = JsonXContent.contentBuilder()) {
builder.startObject();
builder.xContentList(UpdateDesiredNodesRequest.NODES_FIELD.getPreferredName(), nodes);
Expand All @@ -149,6 +224,14 @@ private List<String> getNodeNames() throws Exception {
return nodeNames;
}

private double randomDoubleProcessorCount() {
return randomDoubleBetween(0.5, 512.1234, true);
}

private float randomFloatProcessorCount() {
return randomIntBetween(1, 512) + randomFloat();
}

@SuppressWarnings("unchecked")
private static <T> T extractValue(Map<String, Object> map, String path) {
return (T) XContentMapValues.extractValue(path, map);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ public void testNodesInfosTotalIndexingBuffer() throws Exception {

public void testAllocatedProcessors() throws Exception {
List<String> nodesIds = internalCluster().startNodes(
Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 3).build(),
Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 6).build()
Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 2.9).build(),
Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 5.9).build()
);

final String node_1 = nodesIds.get(0);
Expand Down Expand Up @@ -134,6 +134,8 @@ public void testAllocatedProcessors() throws Exception {
);

assertThat(response.getNodesMap().get(server1NodeId).getInfo(OsInfo.class).getAllocatedProcessors(), equalTo(3));
assertThat(response.getNodesMap().get(server1NodeId).getInfo(OsInfo.class).getFractionalAllocatedProcessors(), equalTo(2.9));
assertThat(response.getNodesMap().get(server2NodeId).getInfo(OsInfo.class).getAllocatedProcessors(), equalTo(6));
assertThat(response.getNodesMap().get(server2NodeId).getInfo(OsInfo.class).getFractionalAllocatedProcessors(), equalTo(5.9));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ protected void doExecute(Task task, UpdateDesiredNodesRequest request, ActionLis
if (request.isCompatibleWithVersion(minNodeVersion) == false) {
listener.onFailure(
new IllegalArgumentException(
"Unable to use processor ranges or floating-point processors in mixed-clusters with nodes in version: " + minNodeVersion
"Unable to use processor ranges, floating-point (with greater precision) processors "
+ "in mixed-clusters with nodes in version: "
+ minNodeVersion
)
);
return;
Expand All @@ -124,7 +126,7 @@ static DesiredNodes updateDesiredNodes(DesiredNodes latestDesiredNodes, UpdateDe
);

if (latestDesiredNodes != null) {
if (latestDesiredNodes.equals(proposedDesiredNodes)) {
if (latestDesiredNodes.equalsWithProcessorsCloseTo(proposedDesiredNodes)) {
return latestDesiredNodes;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ public UpdateDesiredNodesRequest(StreamInput in) throws IOException {
this.historyID = in.readString();
this.version = in.readLong();
this.nodes = in.readList(DesiredNode::readFrom);
dryRun = in.getVersion().onOrAfter(DRY_RUN_VERSION) ? in.readBoolean() : false;
if (in.getVersion().onOrAfter(DRY_RUN_VERSION)) {
this.dryRun = in.readBoolean();
} else {
this.dryRun = false;
}
}

@Override
Expand Down Expand Up @@ -98,6 +102,7 @@ public boolean isCompatibleWithVersion(Version version) {
if (version.onOrAfter(DesiredNode.RANGE_FLOAT_PROCESSORS_SUPPORT_VERSION)) {
return true;
}

return nodes.stream().allMatch(desiredNode -> desiredNode.isCompatibleWithVersion(version));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,13 @@ private void validate(DesiredNode node) {
int minProcessors = node.roundedDownMinProcessors();
Integer roundedUpMaxProcessors = node.roundedUpMaxProcessors();
int maxProcessors = roundedUpMaxProcessors == null ? minProcessors : roundedUpMaxProcessors;
Setting.intSetting(NODE_PROCESSORS_SETTING.getKey(), minProcessors, 1, maxProcessors, Setting.Property.NodeScope).get(settings);
Setting.doubleSetting(
NODE_PROCESSORS_SETTING.getKey(),
minProcessors,
Double.MIN_VALUE,
maxProcessors,
Setting.Property.NodeScope
).get(settings);
final Settings.Builder updatedSettings = Settings.builder().put(settings);
updatedSettings.remove(NODE_PROCESSORS_SETTING.getKey());
settings = updatedSettings.build();
Expand Down
Loading