-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Operator/autoscaling #89708
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Operator/autoscaling #89708
Changes from all commits
59a4deb
c903228
0a87ab8
25c0fa1
5f177cf
637d63f
6eb0fda
ed30627
12fdcea
f4e39b6
c652c66
782ded9
d93d715
0018f75
b14b182
5801ad4
944b8fd
700b558
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| pr: 89708 | ||
| summary: Operator/autoscaling | ||
| area: Infra/Core | ||
| type: enhancement | ||
| issues: [] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,229 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
|
|
||
| package org.elasticsearch.xpack.autoscaling; | ||
|
|
||
| import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; | ||
| import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; | ||
| import org.elasticsearch.cluster.ClusterChangedEvent; | ||
| import org.elasticsearch.cluster.ClusterStateListener; | ||
| import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata; | ||
| import org.elasticsearch.cluster.metadata.ReservedStateHandlerMetadata; | ||
| import org.elasticsearch.cluster.metadata.ReservedStateMetadata; | ||
| import org.elasticsearch.cluster.service.ClusterService; | ||
| import org.elasticsearch.core.Strings; | ||
| import org.elasticsearch.core.Tuple; | ||
| import org.elasticsearch.reservedstate.service.FileSettingsService; | ||
| import org.elasticsearch.xcontent.XContentParserConfiguration; | ||
| import org.elasticsearch.xpack.autoscaling.action.PutAutoscalingPolicyAction; | ||
| import org.elasticsearch.xpack.autoscaling.action.ReservedAutoscalingPolicyAction; | ||
|
|
||
| import java.io.ByteArrayInputStream; | ||
| import java.nio.charset.StandardCharsets; | ||
| import java.nio.file.Files; | ||
| import java.nio.file.Path; | ||
| import java.nio.file.StandardCopyOption; | ||
| import java.util.concurrent.CountDownLatch; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
|
|
||
| import static org.elasticsearch.xcontent.XContentType.JSON; | ||
| import static org.hamcrest.Matchers.allOf; | ||
| import static org.hamcrest.Matchers.containsInAnyOrder; | ||
| import static org.hamcrest.Matchers.containsString; | ||
| import static org.hamcrest.Matchers.hasSize; | ||
| import static org.hamcrest.Matchers.notNullValue; | ||
|
|
||
| /** | ||
| * Tests that file settings service can properly add autoscaling policies and detect REST clashes | ||
| * with the reserved policies. | ||
| */ | ||
| public class AutoscalingFileSettingsIT extends AutoscalingIntegTestCase { | ||
|
|
||
| private static AtomicLong versionCounter = new AtomicLong(1); | ||
|
|
||
| private static String testJSON = """ | ||
| { | ||
| "metadata": { | ||
| "version": "%s", | ||
| "compatibility": "8.4.0" | ||
| }, | ||
| "state": { | ||
| "autoscaling": { | ||
| "my_autoscaling_policy": { | ||
| "roles" : [ "data_hot" ], | ||
| "deciders": { | ||
| "fixed": { | ||
| } | ||
| } | ||
| }, | ||
| "my_autoscaling_policy_1": { | ||
| "roles" : [ "data_warm" ], | ||
| "deciders": { | ||
| "fixed": { | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
| }"""; | ||
|
|
||
| private static String testErrorJSON = """ | ||
| { | ||
| "metadata": { | ||
| "version": "%s", | ||
| "compatibility": "8.4.0" | ||
| }, | ||
| "state": { | ||
| "autoscaling": { | ||
| "my_autoscaling_policy_bad": { | ||
| "roles" : [ "data_warm" ], | ||
| "deciders": { | ||
| "undecided": { | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
| }"""; | ||
|
|
||
| private void writeJSONFile(String node, String json) throws Exception { | ||
| long version = versionCounter.incrementAndGet(); | ||
|
|
||
| FileSettingsService fileSettingsService = internalCluster().getInstance(FileSettingsService.class, node); | ||
| assertTrue(fileSettingsService.watching()); | ||
|
|
||
| Files.deleteIfExists(fileSettingsService.operatorSettingsFile()); | ||
|
|
||
| Files.createDirectories(fileSettingsService.operatorSettingsDir()); | ||
| Path tempFilePath = createTempFile(); | ||
|
|
||
| logger.info("--> writing JSON config to node {} with path {}", node, tempFilePath); | ||
| Files.write(tempFilePath, Strings.format(json, version).getBytes(StandardCharsets.UTF_8)); | ||
| Files.move(tempFilePath, fileSettingsService.operatorSettingsFile(), StandardCopyOption.ATOMIC_MOVE); | ||
| } | ||
|
|
||
| private Tuple<CountDownLatch, AtomicLong> setupClusterStateListener(String node) { | ||
| ClusterService clusterService = internalCluster().clusterService(node); | ||
| CountDownLatch savedClusterState = new CountDownLatch(1); | ||
| AtomicLong metadataVersion = new AtomicLong(-1); | ||
| clusterService.addListener(new ClusterStateListener() { | ||
| @Override | ||
| public void clusterChanged(ClusterChangedEvent event) { | ||
| ReservedStateMetadata reservedState = event.state().metadata().reservedStateMetadata().get(FileSettingsService.NAMESPACE); | ||
| if (reservedState != null) { | ||
| ReservedStateHandlerMetadata handlerMetadata = reservedState.handlers().get(ReservedAutoscalingPolicyAction.NAME); | ||
| if (handlerMetadata != null && handlerMetadata.keys().contains("my_autoscaling_policy")) { | ||
| clusterService.removeListener(this); | ||
| metadataVersion.set(event.state().metadata().version()); | ||
| savedClusterState.countDown(); | ||
| } | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| return new Tuple<>(savedClusterState, metadataVersion); | ||
| } | ||
|
|
||
| private void assertPoliciesSaveOK(CountDownLatch savedClusterState, AtomicLong metadataVersion) throws Exception { | ||
| boolean awaitSuccessful = savedClusterState.await(20, TimeUnit.SECONDS); | ||
| assertTrue(awaitSuccessful); | ||
|
|
||
| final ClusterStateResponse clusterStateResponse = client().admin() | ||
| .cluster() | ||
| .state(new ClusterStateRequest().waitForMetadataVersion(metadataVersion.get())) | ||
| .actionGet(); | ||
|
|
||
| ReservedStateMetadata reservedState = clusterStateResponse.getState() | ||
| .metadata() | ||
| .reservedStateMetadata() | ||
| .get(FileSettingsService.NAMESPACE); | ||
|
|
||
| ReservedStateHandlerMetadata handlerMetadata = reservedState.handlers().get(ReservedAutoscalingPolicyAction.NAME); | ||
|
|
||
| assertThat(handlerMetadata.keys(), allOf(notNullValue(), containsInAnyOrder("my_autoscaling_policy", "my_autoscaling_policy_1"))); | ||
|
|
||
| // Try using the REST API to update the my_autoscaling_policy policy | ||
| // This should fail, we have reserved certain autoscaling policies in operator mode | ||
| assertEquals( | ||
| "Failed to process request [org.elasticsearch.xpack.autoscaling.action.PutAutoscalingPolicyAction$Request/unset] " | ||
| + "with errors: [[my_autoscaling_policy] set as read-only by [file_settings]]", | ||
| expectThrows( | ||
| IllegalArgumentException.class, | ||
| () -> client().execute(PutAutoscalingPolicyAction.INSTANCE, sampleRestRequest("my_autoscaling_policy")).actionGet() | ||
| ).getMessage() | ||
| ); | ||
| } | ||
|
|
||
| public void testPoliciesApplied() throws Exception { | ||
| ensureGreen(); | ||
|
|
||
| var savedClusterState = setupClusterStateListener(internalCluster().getMasterName()); | ||
| writeJSONFile(internalCluster().getMasterName(), testJSON); | ||
|
|
||
| assertPoliciesSaveOK(savedClusterState.v1(), savedClusterState.v2()); | ||
| } | ||
|
|
||
| private Tuple<CountDownLatch, AtomicLong> setupClusterStateListenerForError(String node) { | ||
| ClusterService clusterService = internalCluster().clusterService(node); | ||
| CountDownLatch savedClusterState = new CountDownLatch(1); | ||
| AtomicLong metadataVersion = new AtomicLong(-1); | ||
| clusterService.addListener(new ClusterStateListener() { | ||
| @Override | ||
| public void clusterChanged(ClusterChangedEvent event) { | ||
| ReservedStateMetadata reservedState = event.state().metadata().reservedStateMetadata().get(FileSettingsService.NAMESPACE); | ||
| if (reservedState != null && reservedState.errorMetadata() != null) { | ||
| clusterService.removeListener(this); | ||
| metadataVersion.set(event.state().metadata().version()); | ||
| savedClusterState.countDown(); | ||
| assertEquals(ReservedStateErrorMetadata.ErrorKind.VALIDATION, reservedState.errorMetadata().errorKind()); | ||
| assertThat(reservedState.errorMetadata().errors(), allOf(notNullValue(), hasSize(1))); | ||
| assertThat( | ||
| reservedState.errorMetadata().errors().get(0), | ||
| containsString("java.lang.IllegalArgumentException: unknown decider [undecided]") | ||
| ); | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| return new Tuple<>(savedClusterState, metadataVersion); | ||
| } | ||
|
|
||
| private void assertPoliciesNotSaved(CountDownLatch savedClusterState, AtomicLong metadataVersion) throws Exception { | ||
| boolean awaitSuccessful = savedClusterState.await(20, TimeUnit.SECONDS); | ||
| assertTrue(awaitSuccessful); | ||
|
|
||
| // This should succeed, nothing was reserved | ||
| client().execute(PutAutoscalingPolicyAction.INSTANCE, sampleRestRequest("my_autoscaling_policy_bad")).actionGet(); | ||
| } | ||
|
|
||
| public void testErrorSaved() throws Exception { | ||
| ensureGreen(); | ||
| var savedClusterState = setupClusterStateListenerForError(internalCluster().getMasterName()); | ||
|
|
||
| writeJSONFile(internalCluster().getMasterName(), testErrorJSON); | ||
| assertPoliciesNotSaved(savedClusterState.v1(), savedClusterState.v2()); | ||
| } | ||
|
|
||
| private PutAutoscalingPolicyAction.Request sampleRestRequest(String name) throws Exception { | ||
| var json = """ | ||
| { | ||
| "roles" : [ "data_cold" ], | ||
| "deciders": { | ||
| "fixed": { | ||
| } | ||
| } | ||
| }"""; | ||
|
|
||
| try ( | ||
| var bis = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); | ||
| var parser = JSON.xContent().createParser(XContentParserConfiguration.EMPTY, bis) | ||
| ) { | ||
| return PutAutoscalingPolicyAction.Request.parse(parser, name); | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,38 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
|
|
||
| package org.elasticsearch.xpack.autoscaling; | ||
|
|
||
| import org.elasticsearch.reservedstate.ReservedClusterStateHandler; | ||
| import org.elasticsearch.reservedstate.ReservedClusterStateHandlerProvider; | ||
|
|
||
| import java.util.Collection; | ||
|
|
||
| /** | ||
| * Mock autoscaling provider implementation for the {@link ReservedClusterStateHandlerProvider} service interface | ||
| * <p> | ||
| * This class is a test version of the {@link ReservedAutoscalingStateHandlerProvider}. When we load handler providers through | ||
| * our custom SPI interface, we must match the plugin type exactly. With MockNode, when we run | ||
| * {@link org.elasticsearch.test.ESIntegTestCase} test cases, the version of the {@link Autoscaling} plugin | ||
| * is {@link LocalStateAutoscaling}, therefore we need to provide a test version of this class. | ||
| */ | ||
| public class LocalStateReservedAutoscalingStateHandlerProvider implements ReservedClusterStateHandlerProvider { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mock version of the SPI handler provider so that we can write Java integration tests. It implements equals and hashcode so we deduplicate the plugin, as it can be discovered multiple times in the MockNode because all plugins are loaded by the same classloader.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we not fix that in |
||
| private final LocalStateAutoscaling plugin; | ||
|
|
||
| public LocalStateReservedAutoscalingStateHandlerProvider() { | ||
| throw new IllegalStateException("Provider must be constructed using PluginsService"); | ||
| } | ||
|
|
||
| public LocalStateReservedAutoscalingStateHandlerProvider(LocalStateAutoscaling plugin) { | ||
| this.plugin = plugin; | ||
| } | ||
|
|
||
| @Override | ||
| public Collection<ReservedClusterStateHandler<?>> handlers() { | ||
| return plugin.testPlugin().reservedClusterStateHandlers(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| # | ||
| # Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| # or more contributor license agreements. Licensed under the Elastic License | ||
| # 2.0; you may not use this file except in compliance with the Elastic License | ||
| # 2.0. | ||
| # | ||
|
|
||
| org.elasticsearch.xpack.autoscaling.LocalStateReservedAutoscalingStateHandlerProvider |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,6 +31,7 @@ | |
| import org.elasticsearch.plugins.ExtensiblePlugin; | ||
| import org.elasticsearch.plugins.Plugin; | ||
| import org.elasticsearch.repositories.RepositoriesService; | ||
| import org.elasticsearch.reservedstate.ReservedClusterStateHandler; | ||
| import org.elasticsearch.rest.RestController; | ||
| import org.elasticsearch.rest.RestHandler; | ||
| import org.elasticsearch.script.ScriptService; | ||
|
|
@@ -43,6 +44,7 @@ | |
| import org.elasticsearch.xpack.autoscaling.action.GetAutoscalingCapacityAction; | ||
| import org.elasticsearch.xpack.autoscaling.action.GetAutoscalingPolicyAction; | ||
| import org.elasticsearch.xpack.autoscaling.action.PutAutoscalingPolicyAction; | ||
| import org.elasticsearch.xpack.autoscaling.action.ReservedAutoscalingPolicyAction; | ||
| import org.elasticsearch.xpack.autoscaling.action.TransportDeleteAutoscalingPolicyAction; | ||
| import org.elasticsearch.xpack.autoscaling.action.TransportGetAutoscalingCapacityAction; | ||
| import org.elasticsearch.xpack.autoscaling.action.TransportGetAutoscalingPolicyAction; | ||
|
|
@@ -92,6 +94,7 @@ public class Autoscaling extends Plugin implements ActionPlugin, ExtensiblePlugi | |
| private final SetOnce<ClusterService> clusterServiceHolder = new SetOnce<>(); | ||
| private final SetOnce<AllocationDeciders> allocationDeciders = new SetOnce<>(); | ||
| private final AutoscalingLicenseChecker autoscalingLicenseChecker; | ||
| private final SetOnce<ReservedAutoscalingPolicyAction> reservedAutoscalingPolicyAction = new SetOnce<>(); | ||
|
|
||
| public Autoscaling() { | ||
| this(new AutoscalingLicenseChecker()); | ||
|
|
@@ -121,6 +124,7 @@ public Collection<Object> createComponents( | |
| this.clusterServiceHolder.set(clusterService); | ||
| this.allocationDeciders.set(allocationDeciders); | ||
| var capacityServiceHolder = new AutoscalingCalculateCapacityService.Holder(this); | ||
| this.reservedAutoscalingPolicyAction.set(new ReservedAutoscalingPolicyAction(capacityServiceHolder)); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we not simply defer the construction of this until
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I avoided doing that because then I'd have to also keep a local reference to the |
||
| return List.of(capacityServiceHolder, autoscalingLicenseChecker, new AutoscalingNodeInfoService(clusterService, client)); | ||
| } | ||
|
|
||
|
|
@@ -224,4 +228,8 @@ public Collection<AutoscalingDeciderService> deciders() { | |
| public Set<AutoscalingDeciderService> createDeciderServices() { | ||
| return autoscalingExtensions.stream().flatMap(p -> p.deciders().stream()).collect(Collectors.toSet()); | ||
| } | ||
|
|
||
| public Collection<ReservedClusterStateHandler<?>> reservedClusterStateHandlers() { | ||
| return Set.of(reservedAutoscalingPolicyAction.get()); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
|
|
||
| package org.elasticsearch.xpack.autoscaling; | ||
|
|
||
| import org.elasticsearch.reservedstate.ReservedClusterStateHandler; | ||
| import org.elasticsearch.reservedstate.ReservedClusterStateHandlerProvider; | ||
|
|
||
| import java.util.Collection; | ||
|
|
||
| /** | ||
| * Autoscaling provider implementation for the {@link ReservedClusterStateHandlerProvider} service interface | ||
| */ | ||
| public class ReservedAutoscalingStateHandlerProvider implements ReservedClusterStateHandlerProvider { | ||
| private final Autoscaling plugin; | ||
|
|
||
| public ReservedAutoscalingStateHandlerProvider() { | ||
| throw new IllegalStateException("Provider must be constructed using PluginsService"); | ||
| } | ||
|
|
||
| public ReservedAutoscalingStateHandlerProvider(Autoscaling plugin) { | ||
| this.plugin = plugin; | ||
| } | ||
|
|
||
| @Override | ||
| public Collection<ReservedClusterStateHandler<?>> handlers() { | ||
| return plugin.reservedClusterStateHandlers(); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a clone of the regular plugin SPI state handler provider for integration test purposes.