diff --git a/docs/changelog/88329.yaml b/docs/changelog/88329.yaml new file mode 100644 index 0000000000000..b105f533fc6ba --- /dev/null +++ b/docs/changelog/88329.yaml @@ -0,0 +1,5 @@ +pr: 88329 +summary: File Settings Service +area: Infra/Core +type: feature +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java new file mode 100644 index 0000000000000..f4a9d2993d188 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java @@ -0,0 +1,240 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.reservedstate.service; + +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata; +import org.elasticsearch.cluster.metadata.ReservedStateHandlerMetadata; +import org.elasticsearch.cluster.metadata.ReservedStateMetadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Strings; +import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction; +import org.elasticsearch.test.ESIntegTestCase; + +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING; +import static org.elasticsearch.test.NodeRoles.dataOnlyNode; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false) +public class FileSettingsServiceIT extends ESIntegTestCase { + + private AtomicLong versionCounter = new AtomicLong(1); + + private static String testJSON = """ + { + "metadata": { + "version": "%s", + "compatibility": "8.4.0" + }, + "state": { + "cluster_settings": { + "indices.recovery.max_bytes_per_sec": "50mb" + } + } + }"""; + + private static String testErrorJSON = """ + { + "metadata": { + "version": "%s", + "compatibility": "8.4.0" + }, + "state": { + "not_cluster_settings": { + "search.allow_expensive_queries": "false" + } + } + }"""; + + private void assertMasterNode(Client client, String node) { + assertThat( + client.admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(), + equalTo(node) + ); + } + + private void writeJSONFile(String node, String json) throws Exception { + long version = versionCounter.incrementAndGet(); + + FileSettingsService fileSettingsService = internalCluster().getInstance(FileSettingsService.class, node); + + Files.createDirectories(fileSettingsService.operatorSettingsDir()); + Files.write(fileSettingsService.operatorSettingsFile(), Strings.format(json, version).getBytes(StandardCharsets.UTF_8)); + } + + private CountDownLatch setupClusterStateListener(String node) { + ClusterService clusterService = internalCluster().clusterService(node); + CountDownLatch savedClusterState = new CountDownLatch(1); + clusterService.addListener(new ClusterStateListener() { + @Override + public void clusterChanged(ClusterChangedEvent event) { + ReservedStateMetadata reservedState = event.state().metadata().reservedStateMetadata().get(FileSettingsService.NAMESPACE); + if (reservedState != null) { + ReservedStateHandlerMetadata handlerMetadata = reservedState.handlers().get(ReservedClusterSettingsAction.NAME); + if (handlerMetadata == null) { + fail("Should've found cluster settings in this metadata"); + } + assertThat(handlerMetadata.keys(), contains("indices.recovery.max_bytes_per_sec")); + clusterService.removeListener(this); + savedClusterState.countDown(); + } + } + }); + + return savedClusterState; + } + + private void assertClusterStateSaveOK(CountDownLatch savedClusterState) throws Exception { + boolean awaitSuccessful = savedClusterState.await(20, TimeUnit.SECONDS); + assertTrue(awaitSuccessful); + + final ClusterStateResponse clusterStateResponse = client().admin().cluster().state(new ClusterStateRequest()).actionGet(); + + assertThat( + clusterStateResponse.getState().metadata().persistentSettings().get(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey()), + equalTo("50mb") + ); + + ClusterUpdateSettingsRequest req = new ClusterUpdateSettingsRequest().persistentSettings( + Settings.builder().put(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "1234kb") + ); + assertEquals( + "java.lang.IllegalArgumentException: Failed to process request " + + "[org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest/unset] " + + "with errors: [[indices.recovery.max_bytes_per_sec] set as read-only by [file_settings]]", + expectThrows(ExecutionException.class, () -> client().admin().cluster().updateSettings(req).get()).getMessage() + ); + } + + public void testSettingsApplied() throws Exception { + internalCluster().setBootstrapMasterNodeIndex(0); + logger.info("--> start data node / non master node"); + String dataNode = internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s")); + FileSettingsService dataFileSettingsService = internalCluster().getInstance(FileSettingsService.class, dataNode); + + assertFalse(dataFileSettingsService.watching()); + + logger.info("--> start master node"); + final String masterNode = internalCluster().startMasterOnlyNode(); + assertMasterNode(internalCluster().nonMasterClient(), masterNode); + var savedClusterState = setupClusterStateListener(masterNode); + + FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode); + + assertTrue(masterFileSettingsService.watching()); + assertFalse(dataFileSettingsService.watching()); + + writeJSONFile(masterNode, testJSON); + assertClusterStateSaveOK(savedClusterState); + } + + public void testSettingsAppliedOnStart() throws Exception { + internalCluster().setBootstrapMasterNodeIndex(0); + logger.info("--> start data node / non master node"); + String dataNode = internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s")); + FileSettingsService dataFileSettingsService = internalCluster().getInstance(FileSettingsService.class, dataNode); + + assertFalse(dataFileSettingsService.watching()); + var savedClusterState = setupClusterStateListener(dataNode); + + // In internal cluster tests, the nodes share the config directory, so when we write with the data node path + // the master will pick it up on start + writeJSONFile(dataNode, testJSON); + + logger.info("--> start master node"); + final String masterNode = internalCluster().startMasterOnlyNode(); + assertMasterNode(internalCluster().nonMasterClient(), masterNode); + + FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode); + + assertTrue(masterFileSettingsService.watching()); + assertFalse(dataFileSettingsService.watching()); + + assertClusterStateSaveOK(savedClusterState); + } + + private CountDownLatch setupClusterStateListenerForError(String node) { + ClusterService clusterService = internalCluster().clusterService(node); + CountDownLatch savedClusterState = new CountDownLatch(1); + clusterService.addListener(new ClusterStateListener() { + @Override + public void clusterChanged(ClusterChangedEvent event) { + ReservedStateMetadata reservedState = event.state().metadata().reservedStateMetadata().get(FileSettingsService.NAMESPACE); + if (reservedState != null) { + assertEquals(ReservedStateErrorMetadata.ErrorKind.PARSING, reservedState.errorMetadata().errorKind()); + assertThat(reservedState.errorMetadata().errors(), allOf(notNullValue(), hasSize(1))); + assertThat( + reservedState.errorMetadata().errors().get(0), + containsString("Missing handler definition for content key [not_cluster_settings]") + ); + clusterService.removeListener(this); + savedClusterState.countDown(); + } + } + }); + + return savedClusterState; + } + + private void assertClusterStateNotSaved(CountDownLatch savedClusterState) throws Exception { + boolean awaitSuccessful = savedClusterState.await(20, TimeUnit.SECONDS); + assertTrue(awaitSuccessful); + + final ClusterStateResponse clusterStateResponse = client().admin().cluster().state(new ClusterStateRequest()).actionGet(); + + assertThat(clusterStateResponse.getState().metadata().persistentSettings().get("search.allow_expensive_queries"), nullValue()); + + ClusterUpdateSettingsRequest req = new ClusterUpdateSettingsRequest().persistentSettings( + Settings.builder().put("search.allow_expensive_queries", "false") + ); + // This should succeed, nothing was reserved + client().admin().cluster().updateSettings(req).get(); + } + + public void testErrorSaved() throws Exception { + internalCluster().setBootstrapMasterNodeIndex(0); + logger.info("--> start data node / non master node"); + String dataNode = internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s")); + FileSettingsService dataFileSettingsService = internalCluster().getInstance(FileSettingsService.class, dataNode); + + assertFalse(dataFileSettingsService.watching()); + + logger.info("--> start master node"); + final String masterNode = internalCluster().startMasterOnlyNode(); + assertMasterNode(internalCluster().nonMasterClient(), masterNode); + var savedClusterState = setupClusterStateListenerForError(masterNode); + + FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode); + + assertTrue(masterFileSettingsService.watching()); + assertFalse(dataFileSettingsService.watching()); + + writeJSONFile(masterNode, testErrorJSON); + assertClusterStateNotSaved(savedClusterState); + } +} diff --git a/server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java b/server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java index 95e1d23fc208e..51d4fc3437c9d 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java @@ -81,8 +81,10 @@ public void testThreadNames() throws Exception { // or the ones that are occasionally come up from ESSingleNodeTestCase if (threadName.contains("[node_s_0]") // TODO: this can't possibly be right! single node and integ test are unrelated! || threadName.contains("Keep-Alive-Timer") + || threadName.contains("readiness-service") || threadName.contains("JVMCI-native") // GraalVM Compiler Thread - || threadName.contains("readiness-service")) { + || threadName.contains("file-settings-watcher") + || threadName.contains("FileSystemWatchService")) { continue; } String nodePrefix = "(" diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index bb845a5ee7b36..d3c6525432bd1 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -251,6 +251,7 @@ import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.NamedRegistry; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.TypeLiteral; @@ -274,6 +275,8 @@ import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.ActionPlugin.ActionHandler; import org.elasticsearch.plugins.interceptor.RestInterceptorActionPlugin; +import org.elasticsearch.reservedstate.ReservedClusterStateHandler; +import org.elasticsearch.reservedstate.service.ReservedClusterStateService; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; import org.elasticsearch.rest.RestHeaderDefinition; @@ -449,6 +452,7 @@ public class ActionModule extends AbstractModule { private final RequestValidators mappingRequestValidators; private final RequestValidators indicesAliasesRequestRequestValidators; private final ThreadPool threadPool; + private final ReservedClusterStateService reservedClusterStateService; public ActionModule( Settings settings, @@ -462,7 +466,9 @@ public ActionModule( CircuitBreakerService circuitBreakerService, UsageService usageService, SystemIndices systemIndices, - Tracer tracer + Tracer tracer, + ClusterService clusterService, + List> reservedStateHandlers ) { this.settings = settings; this.indexNameExpressionResolver = indexNameExpressionResolver; @@ -513,6 +519,7 @@ public ActionModule( ); restController = new RestController(headers, restInterceptor, nodeClient, circuitBreakerService, usageService, tracer); + reservedClusterStateService = new ReservedClusterStateService(clusterService, reservedStateHandlers); } public Map> getActions() { @@ -922,4 +929,8 @@ public ActionFilters getActionFilters() { public RestController getRestController() { return restController; } + + public ReservedClusterStateService getReservedClusterStateService() { + return reservedClusterStateService; + } } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 65e6fc546a353..1fbd21a4aea66 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -164,6 +164,10 @@ import org.elasticsearch.readiness.ReadinessService; import org.elasticsearch.repositories.RepositoriesModule; import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.reservedstate.ReservedClusterStateHandler; +import org.elasticsearch.reservedstate.ReservedClusterStateHandlerProvider; +import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction; +import org.elasticsearch.reservedstate.service.FileSettingsService; import org.elasticsearch.rest.RestController; import org.elasticsearch.script.ScriptContext; import org.elasticsearch.script.ScriptEngine; @@ -705,6 +709,17 @@ protected Node( ) ).toList(); + List> reservedStateHandlers = new ArrayList<>(); + + // add all reserved state handlers from server + reservedStateHandlers.add(new ReservedClusterSettingsAction(settingsModule.getClusterSettings())); + + // add all reserved state handlers from plugins + List pluginHandlers = pluginsService.loadServiceProviders( + ReservedClusterStateHandlerProvider.class + ); + pluginHandlers.forEach(h -> reservedStateHandlers.addAll(h.handlers())); + ActionModule actionModule = new ActionModule( settings, clusterModule.getIndexNameExpressionResolver(), @@ -717,7 +732,9 @@ protected Node( circuitBreakerService, usageService, systemIndices, - tracer + tracer, + clusterService, + reservedStateHandlers ); modules.add(actionModule); @@ -929,6 +946,12 @@ protected Node( ? new HealthMetadataService(clusterService, settings) : null; + FileSettingsService fileSettingsService = new FileSettingsService( + clusterService, + actionModule.getReservedClusterStateService(), + environment + ); + modules.add(b -> { b.bind(Node.class).toInstance(this); b.bind(NodeService.class).toInstance(nodeService); @@ -1017,6 +1040,7 @@ protected Node( b.bind(HealthMetadataService.class).toInstance(healthMetadataService); } b.bind(Tracer.class).toInstance(tracer); + b.bind(FileSettingsService.class).toInstance(fileSettingsService); }); if (ReadinessService.enabled(environment)) { @@ -1297,6 +1321,7 @@ public void onTimeout(TimeValue timeout) { } } + injector.getInstance(FileSettingsService.class).start(); injector.getInstance(HttpServerTransport.class).start(); if (WRITE_PORTS_FILE_SETTING.get(settings())) { @@ -1334,6 +1359,7 @@ private Node stop() { if (ReadinessService.enabled(environment)) { injector.getInstance(ReadinessService.class).stop(); } + injector.getInstance(FileSettingsService.class).stop(); injector.getInstance(ResourceWatcherService.class).close(); injector.getInstance(HttpServerTransport.class).stop(); @@ -1417,6 +1443,7 @@ public synchronized void close() throws IOException { if (ReadinessService.enabled(environment)) { toClose.add(injector.getInstance(ReadinessService.class)); } + toClose.add(injector.getInstance(FileSettingsService.class)); for (LifecycleComponent plugin : pluginLifecycleComponents) { toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")")); diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/FileSettingsService.java b/server/src/main/java/org/elasticsearch/reservedstate/service/FileSettingsService.java new file mode 100644 index 0000000000000..a8141e8f711fa --- /dev/null +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/FileSettingsService.java @@ -0,0 +1,335 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.reservedstate.service; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.env.Environment; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentParserConfiguration; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.nio.file.ClosedWatchServiceException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.concurrent.CountDownLatch; +import java.util.function.Consumer; + +import static org.elasticsearch.xcontent.XContentType.JSON; + +/** + * File based settings applier service which watches an 'operator` directory inside the config directory. + *

+ * The service expects that the operator directory will contain a single JSON file with all the settings that + * need to be applied to the cluster state. The name of the file is fixed to be settings.json. The operator + * directory name can be configured by setting the 'path.config.operator_directory' in the node properties. + *

+ * The {@link FileSettingsService} is active always, but enabled only on the current master node. We register + * the service as a listener to cluster state changes, so that we can enable the file watcher thread when this + * node becomes a master node. + */ +public class FileSettingsService extends AbstractLifecycleComponent implements ClusterStateListener { + private static final Logger logger = LogManager.getLogger(FileSettingsService.class); + + private static final String SETTINGS_FILE_NAME = "settings.json"; + static final String NAMESPACE = "file_settings"; + + private final ClusterService clusterService; + private final ReservedClusterStateService stateService; + private final Path operatorSettingsDir; + + private WatchService watchService; // null; + private CountDownLatch watcherThreadLatch; + + private volatile FileUpdateState fileUpdateState = null; + private volatile WatchKey settingsDirWatchKey = null; + + private volatile boolean active = false; + private volatile boolean initialState = true; + + public static final String OPERATOR_DIRECTORY = "operator"; + + /** + * Constructs the {@link FileSettingsService} + * + * @param clusterService so we can register ourselves as a cluster state change listener + * @param stateService an instance of the immutable cluster state controller, so we can perform the cluster state changes + * @param environment we need the environment to pull the location of the config and operator directories + */ + public FileSettingsService(ClusterService clusterService, ReservedClusterStateService stateService, Environment environment) { + this.clusterService = clusterService; + this.stateService = stateService; + this.operatorSettingsDir = environment.configFile().toAbsolutePath().resolve(OPERATOR_DIRECTORY); + } + + // package private for testing + Path operatorSettingsDir() { + return operatorSettingsDir; + } + + // package private for testing + Path operatorSettingsFile() { + return operatorSettingsDir().resolve(SETTINGS_FILE_NAME); + } + + // platform independent way to tell if a file changed + // we compare the file modified timestamp, the absolute path (symlinks), and file id on the system + boolean watchedFileChanged(Path path) throws IOException { + if (Files.exists(path) == false) { + return false; + } + + FileUpdateState previousUpdateState = fileUpdateState; + + BasicFileAttributes attr = Files.readAttributes(path, BasicFileAttributes.class); + fileUpdateState = new FileUpdateState(attr.lastModifiedTime().toMillis(), path.toRealPath().toString(), attr.fileKey()); + + return (previousUpdateState == null || previousUpdateState.equals(fileUpdateState) == false); + } + + @Override + protected void doStart() { + // We start the file watcher when we know we are master from a cluster state change notification. + // We need the additional active flag, since cluster state can change after we've shutdown the service + // causing the watcher to start again. + this.active = Files.exists(operatorSettingsDir().getParent()); + startIfMaster(clusterService.state()); + clusterService.addListener(this); + } + + @Override + protected void doStop() { + this.active = false; + logger.debug("Stopping file settings service"); + stopWatcher(); + } + + @Override + protected void doClose() {} + + private boolean currentNodeMaster(ClusterState clusterState) { + return clusterState.nodes().getLocalNodeId().equals(clusterState.nodes().getMasterNodeId()); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + ClusterState clusterState = event.state(); + startIfMaster(clusterState); + } + + private void startIfMaster(ClusterState clusterState) { + setWatching(currentNodeMaster(clusterState), initialState); + initialState = false; + } + + private void setWatching(boolean watching, boolean initialState) { + if (watching) { + startWatcher(initialState); + } else { + stopWatcher(); + } + } + + // package private for testing + boolean watching() { + return this.watchService != null; + } + + synchronized void startWatcher(boolean onStartup) { + if (watching() || active == false) { + // already watching or inactive, nothing to do + return; + } + + logger.info("starting file settings watcher ..."); + + Path settingsDir = operatorSettingsDir(); + + /* + * We essentially watch for two things: + * - the creation of the operator directory (if it doesn't exist), symlink changes to the operator directory + * - any changes to files inside the operator directory if it exists, filtering for settings.json + */ + try { + this.watchService = operatorSettingsDir().getParent().getFileSystem().newWatchService(); + if (Files.exists(settingsDir)) { + Path settingsFilePath = operatorSettingsFile(); + if (Files.exists(settingsFilePath)) { + logger.debug("found initial operator settings file [{}], applying...", settingsFilePath); + // we make a distinction here for startup, so that if we had operator settings before the node started + // we would fail startup. + processFileSettings(settingsFilePath, (e) -> { + if (onStartup) { + throw new FileSettingsStartupException("Error applying operator settings", e); + } else { + logger.error("Error processing operator settings json file", e); + } + }).await(); + } + settingsDirWatchKey = enableSettingsWatcher(settingsDirWatchKey, settingsDir); + } else { + logger.debug("operator settings directory [{}] not found, will watch for its creation...", settingsDir); + } + // We watch the config directory always, even if initially we had an operator directory + // it can be deleted and created later. The config directory never goes away, we only + // register it once for watching. + enableSettingsWatcher(null, operatorSettingsDir().getParent()); + } catch (Exception e) { + if (watchService != null) { + try { + this.watchService.close(); + } catch (Exception ignore) {} finally { + this.watchService = null; + } + } + + throw new IllegalStateException("unable to launch a new watch service", e); + } + + this.watcherThreadLatch = new CountDownLatch(1); + + new Thread(() -> { + try { + logger.info("file settings service up and running [tid={}]", Thread.currentThread().getId()); + + WatchKey key; + while ((key = watchService.take()) != null) { + /** + * Reading and interpreting watch service events can vary from platform to platform. E.g: + * MacOS symlink delete and set (rm -rf operator && ln -s /file_settings/ operator): + * ENTRY_MODIFY:operator + * ENTRY_CREATE:settings.json + * ENTRY_MODIFY:settings.json + * Linux in Docker symlink delete and set (rm -rf operator && ln -s /file_settings/ operator): + * ENTRY_CREATE:operator + * Windows + * ENTRY_CREATE:operator + * ENTRY_MODIFY:operator + * After we get an indication that something has changed, we check the timestamp, file id, + * real path of our desired file. + */ + if (Files.exists(settingsDir)) { + try { + Path path = operatorSettingsFile(); + + if (logger.isDebugEnabled()) { + key.pollEvents().stream().forEach(e -> logger.debug("{}:{}", e.kind().toString(), e.context().toString())); + } + + key.pollEvents(); + key.reset(); + + // We re-register the settings directory watch key, because we don't know + // if the file name maps to the same native file system file id. Symlinks + // are one potential cause of inconsistency here, since their handling by + // the WatchService is platform dependent. + settingsDirWatchKey = enableSettingsWatcher(settingsDirWatchKey, settingsDir); + + if (watchedFileChanged(path)) { + processFileSettings(path, (e) -> logger.error("Error processing operator settings json file", e)).await(); + } + } catch (IOException e) { + logger.warn("encountered I/O error while watching file settings", e); + } + } else { + key.pollEvents(); + key.reset(); + } + } + } catch (ClosedWatchServiceException | InterruptedException expected) { + logger.info("shutting down watcher thread"); + } catch (Exception e) { + logger.error("shutting down watcher thread with exception", e); + } finally { + watcherThreadLatch.countDown(); + } + }, "elasticsearch[file-settings-watcher]").start(); + } + + synchronized void stopWatcher() { + logger.debug("stopping watcher ..."); + if (watching()) { + try { + if (settingsDirWatchKey != null) { + settingsDirWatchKey.cancel(); + settingsDirWatchKey = null; + } + fileUpdateState = null; + watchService.close(); + if (watcherThreadLatch != null) { + watcherThreadLatch.await(); + } + } catch (IOException e) { + logger.warn("encountered exception while closing watch service", e); + } catch (InterruptedException interruptedException) { + logger.info("interrupted while closing the watch service", interruptedException); + } finally { + watchService = null; + logger.info("watcher service stopped"); + } + } else { + logger.debug("file settings service already stopped"); + } + } + + private WatchKey enableSettingsWatcher(WatchKey previousKey, Path settingsDir) throws IOException { + if (previousKey != null) { + previousKey.cancel(); + } + return settingsDir.register( + watchService, + StandardWatchEventKinds.ENTRY_MODIFY, + StandardWatchEventKinds.ENTRY_CREATE, + StandardWatchEventKinds.ENTRY_DELETE + ); + } + + CountDownLatch processFileSettings(Path path, Consumer errorHandler) throws IOException { + CountDownLatch waitForCompletion = new CountDownLatch(1); + logger.info("processing path [{}] for [{}]", path, NAMESPACE); + try (var json = new BufferedInputStream(Files.newInputStream(path))) { + try (XContentParser parser = JSON.xContent().createParser(XContentParserConfiguration.EMPTY, json)) { + stateService.process(NAMESPACE, parser, (e) -> { + if (e != null) { + errorHandler.accept(e); + } + waitForCompletion.countDown(); + }); + } + } + + return waitForCompletion; + } + + /** + * Holds information about the last known state of the file we watched. We use this + * class to determine if a file has been changed. + */ + record FileUpdateState(long timestamp, String path, Object fileKey) {} + + /** + * Error subclass that is thrown when we encounter a fatal error while applying + * the operator cluster state at Elasticsearch boot time. + */ + public static class FileSettingsStartupException extends RuntimeException { + public FileSettingsStartupException(String message, Throwable t) { + super(message, t); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java index 1c9bffc269de7..96b5d5597f4cd 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java @@ -104,9 +104,11 @@ public void process(String namespace, XContentParser parser, Consumer } catch (Exception e) { ErrorState errorState = new ErrorState(namespace, -1L, e, ReservedStateErrorMetadata.ErrorKind.PARSING); saveErrorState(errorState); - logger.error("error processing state change request for [{}] with the following errors [{}]", namespace, errorState); + logger.debug("error processing state change request for [{}] with the following errors [{}]", namespace, errorState); - errorListener.accept(new IllegalStateException("Error processing state change request for " + namespace, e)); + errorListener.accept( + new IllegalStateException("Error processing state change request for " + namespace + ", errors: " + errorState, e) + ); return; } @@ -123,7 +125,7 @@ public void process(String namespace, XContentParser parser, Consumer */ public void process(String namespace, ReservedStateChunk reservedStateChunk, Consumer errorListener) { Map reservedState = reservedStateChunk.state(); - ReservedStateVersion reservedStateVersion = reservedStateChunk.metadata(); + final ReservedStateVersion reservedStateVersion = reservedStateChunk.metadata(); LinkedHashSet orderedHandlers; try { @@ -137,9 +139,11 @@ public void process(String namespace, ReservedStateChunk reservedStateChunk, Con ); saveErrorState(errorState); - logger.error("error processing state change request for [{}] with the following errors [{}]", namespace, errorState); + logger.debug("error processing state change request for [{}] with the following errors [{}]", namespace, errorState); - errorListener.accept(new IllegalStateException("Error processing state change request for " + namespace, e)); + errorListener.accept( + new IllegalStateException("Error processing state change request for " + namespace + ", errors: " + errorState, e) + ); return; } @@ -166,8 +170,11 @@ public void onResponse(ActionResponse.Empty empty) { @Override public void onFailure(Exception e) { - logger.error("Failed to apply reserved cluster state", e); - errorListener.accept(e); + // Don't spam the logs on repeated errors + if (isNewError(existingMetadata, reservedStateVersion.version())) { + logger.debug("Failed to apply reserved cluster state", e); + errorListener.accept(e); + } } } ), @@ -209,13 +216,35 @@ static boolean checkMetadataVersion( return true; } - private void saveErrorState(ErrorState state) { + // package private for testing + static boolean isNewError(ReservedStateMetadata existingMetadata, Long newStateVersion) { + return (existingMetadata == null + || existingMetadata.errorMetadata() == null + || existingMetadata.errorMetadata().version() < newStateVersion); + } + + private void saveErrorState(ErrorState errorState) { + ClusterState clusterState = clusterService.state(); + ReservedStateMetadata existingMetadata = clusterState.metadata().reservedStateMetadata().get(errorState.namespace()); + + if (isNewError(existingMetadata, errorState.version()) == false) { + logger.info( + () -> format( + "Not updating error state because version [%s] is less or equal to the last state error version [%s]", + errorState.version(), + existingMetadata.errorMetadata().version() + ) + ); + + return; + } + clusterService.submitStateUpdateTask( - "reserved cluster state update error for [ " + state.namespace() + "]", - new ReservedStateErrorTask(state, new ActionListener<>() { + "reserved cluster state update error for [ " + errorState.namespace() + "]", + new ReservedStateErrorTask(errorState, new ActionListener<>() { @Override public void onResponse(ActionResponse.Empty empty) { - logger.info("Successfully applied new reserved error state for namespace [{}]", state.namespace()); + logger.info("Successfully applied new reserved error state for namespace [{}]", errorState.namespace()); } @Override diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java index 421378e8ec60a..0631aee59cf6e 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java @@ -98,27 +98,18 @@ protected ClusterState execute(final ClusterState currentState) { if (errors.isEmpty() == false) { // Check if we had previous error metadata with version information, don't spam with cluster state updates, if the // version hasn't been updated. - logger.error("Error processing state change request for [{}] with the following errors [{}]", namespace, errors); - if (existingMetadata != null - && existingMetadata.errorMetadata() != null - && existingMetadata.errorMetadata().version() >= reservedStateVersion.version()) { - - logger.info( - () -> format( - "Not updating error state because version [%s] is less or equal to the last state error version [%s]", - reservedStateVersion.version(), - existingMetadata.errorMetadata().version() - ) - ); - - return currentState; - } + logger.debug("Error processing state change request for [{}] with the following errors [{}]", namespace, errors); - errorReporter.accept( - new ErrorState(namespace, reservedStateVersion.version(), errors, ReservedStateErrorMetadata.ErrorKind.VALIDATION) + var errorState = new ErrorState( + namespace, + reservedStateVersion.version(), + errors, + ReservedStateErrorMetadata.ErrorKind.VALIDATION ); - throw new IllegalStateException("Error processing state change request for " + namespace); + errorReporter.accept(errorState); + + throw new IllegalStateException("Error processing state change request for " + namespace + ", errors: " + errorState); } // remove the last error if we had previously encountered any diff --git a/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java b/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java index ebc869a3aba2c..c9cbcf497cef1 100644 --- a/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java +++ b/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; @@ -49,6 +50,7 @@ import static org.elasticsearch.rest.RestRequest.Method.GET; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.startsWith; +import static org.mockito.Mockito.mock; public class ActionModuleTests extends ESTestCase { public void testSetupActionsContainsKnownBuiltin() { @@ -117,7 +119,9 @@ public void testSetupRestHandlerContainsKnownBuiltin() { null, usageService, null, - null + null, + mock(ClusterService.class), + List.of() ); actionModule.initRestHandlers(null); // At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail @@ -174,7 +178,9 @@ public String getName() { null, usageService, null, - null + null, + mock(ClusterService.class), + List.of() ); Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.initRestHandlers(null)); assertThat(e.getMessage(), startsWith("Cannot replace existing handler for [/] for method: GET")); @@ -224,7 +230,9 @@ public List getRestHandlers( null, usageService, null, - null + null, + mock(ClusterService.class), + List.of() ); actionModule.initRestHandlers(null); // At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail @@ -269,7 +277,9 @@ public void test3rdPartyHandlerIsNotInstalled() { null, usageService, null, - null + null, + mock(ClusterService.class), + List.of() ) ); assertThat( diff --git a/server/src/test/java/org/elasticsearch/reservedstate/service/FileSettingsServiceTests.java b/server/src/test/java/org/elasticsearch/reservedstate/service/FileSettingsServiceTests.java new file mode 100644 index 0000000000000..9db5bba768a3b --- /dev/null +++ b/server/src/test/java/org/elasticsearch/reservedstate/service/FileSettingsServiceTests.java @@ -0,0 +1,220 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.reservedstate.service; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RerouteService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.Environment; +import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xcontent.XContentParser; +import org.junit.After; +import org.junit.Before; +import org.mockito.Mockito; +import org.mockito.stubbing.Answer; + +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.FileTime; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.hasToString; +import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class FileSettingsServiceTests extends ESTestCase { + private Environment env; + private ClusterService clusterService; + private FileSettingsService fileSettingsService; + private ThreadPool threadpool; + + @Before + public void setUp() throws Exception { + super.setUp(); + + threadpool = new TestThreadPool("file_settings_service_tests"); + + clusterService = spy( + new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadpool, + null + ) + ); + + final DiscoveryNode localNode = new DiscoveryNode("node", buildNewFakeTransportAddress(), Version.CURRENT); + final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId())) + .build(); + doAnswer((Answer) invocation -> clusterState).when(clusterService).state(); + + clusterService.setRerouteService(mock(RerouteService.class)); + env = newEnvironment(Settings.EMPTY); + + Files.createDirectories(env.configFile()); + + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + + ReservedClusterStateService controller = new ReservedClusterStateService( + clusterService, + List.of(new ReservedClusterSettingsAction(clusterSettings)) + ); + + fileSettingsService = new FileSettingsService(clusterService, controller, env); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + threadpool.shutdownNow(); + } + + public void testOperatorDirName() { + Path operatorPath = fileSettingsService.operatorSettingsDir(); + assertTrue(operatorPath.startsWith(env.configFile())); + assertTrue(operatorPath.endsWith("operator")); + + Path operatorSettingsFile = fileSettingsService.operatorSettingsFile(); + assertTrue(operatorSettingsFile.startsWith(operatorPath)); + assertTrue(operatorSettingsFile.endsWith("settings.json")); + } + + public void testWatchedFile() throws Exception { + Path tmpFile = createTempFile(); + Path tmpFile1 = createTempFile(); + Path otherFile = tmpFile.getParent().resolve("other.json"); + // we return false on non-existent paths, we don't remember state + assertFalse(fileSettingsService.watchedFileChanged(otherFile)); + + // we remember the previous state + assertTrue(fileSettingsService.watchedFileChanged(tmpFile)); + assertFalse(fileSettingsService.watchedFileChanged(tmpFile)); + + // we modify the timestamp of the file, it should trigger a change + Instant now = LocalDateTime.now(ZoneId.systemDefault()).toInstant(ZoneOffset.ofHours(0)); + Files.setLastModifiedTime(tmpFile, FileTime.from(now)); + + assertTrue(fileSettingsService.watchedFileChanged(tmpFile)); + assertFalse(fileSettingsService.watchedFileChanged(tmpFile)); + + // we change to another real file, it should be changed + assertTrue(fileSettingsService.watchedFileChanged(tmpFile1)); + assertFalse(fileSettingsService.watchedFileChanged(tmpFile1)); + } + + public void testStartStop() { + fileSettingsService.start(); + assertTrue(fileSettingsService.watching()); + fileSettingsService.stop(); + assertFalse(fileSettingsService.watching()); + fileSettingsService.close(); + } + + public void testCallsProcessing() throws Exception { + FileSettingsService service = spy(fileSettingsService); + CountDownLatch processFileLatch = new CountDownLatch(1); + + doAnswer((Answer) invocation -> { + processFileLatch.countDown(); + return null; + }).when(service).processFileSettings(any(), any()); + + service.start(); + assertTrue(service.watching()); + + Files.createDirectories(service.operatorSettingsDir()); + + Files.write(service.operatorSettingsFile(), "{}".getBytes(StandardCharsets.UTF_8)); + + // we need to wait a bit, on MacOS it may take up to 10 seconds for the Java watcher service to notice the file, + // on Linux is instantaneous. Windows is instantaneous too. + processFileLatch.await(30, TimeUnit.SECONDS); + + verify(service, Mockito.atLeast(1)).watchedFileChanged(any()); + verify(service, times(1)).processFileSettings(any(), any()); + + service.stop(); + assertFalse(service.watching()); + service.close(); + } + + @SuppressWarnings("unchecked") + public void testInitialFile() throws Exception { + ReservedClusterStateService stateService = mock(ReservedClusterStateService.class); + + doAnswer((Answer) invocation -> { + ((Consumer) invocation.getArgument(2)).accept(new IllegalStateException("Some exception")); + return null; + }).when(stateService).process(any(), (XContentParser) any(), any()); + + FileSettingsService service = spy(new FileSettingsService(clusterService, stateService, env)); + + Files.createDirectories(service.operatorSettingsDir()); + + // contents of the JSON don't matter, we just need a file to exist + Files.write(service.operatorSettingsFile(), "{}".getBytes(StandardCharsets.UTF_8)); + + Exception startupException = expectThrows(IllegalStateException.class, () -> service.start()); + assertThat( + startupException.getCause(), + allOf( + instanceOf(FileSettingsService.FileSettingsStartupException.class), + hasToString( + "org.elasticsearch.reservedstate.service.FileSettingsService$FileSettingsStartupException: " + + "Error applying operator settings" + ) + ) + ); + + verify(service, times(1)).processFileSettings(any(), any()); + + service.stop(); + + clearInvocations(service); + + // Let's check that if we didn't throw an error that everything works + doAnswer((Answer) invocation -> { + ((Consumer) invocation.getArgument(2)).accept(null); + return null; + }).when(stateService).process(any(), (XContentParser) any(), any()); + + service.start(); + service.startWatcher(true); + + verify(service, times(1)).processFileSettings(any(), any()); + + service.stop(); + service.close(); + } +} diff --git a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java index eaf55cdb7c8c3..478ca01f2de96 100644 --- a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java @@ -43,6 +43,7 @@ import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; @@ -88,7 +89,7 @@ public void testOperatorController() throws IOException { controller.process("operator", parser, (e) -> x.set(e)); assertTrue(x.get() instanceof IllegalStateException); - assertEquals("Error processing state change request for operator", x.get().getMessage()); + assertThat(x.get().getMessage(), containsString("Error processing state change request for operator")); } testJSON = """ @@ -278,6 +279,27 @@ public Map fromXContent(XContentParser parser) throws IOExceptio } }; + ReservedStateHandlerMetadata hmOne = new ReservedStateHandlerMetadata("one", Set.of("a", "b")); + ReservedStateErrorMetadata emOne = new ReservedStateErrorMetadata( + 1L, + ReservedStateErrorMetadata.ErrorKind.VALIDATION, + List.of("Test error 1", "Test error 2") + ); + + final ReservedStateMetadata operatorMetadata = ReservedStateMetadata.builder("namespace_one") + .errorMetadata(emOne) + .version(1L) + .putHandler(hmOne) + .build(); + + Metadata metadata = Metadata.builder().put(operatorMetadata).build(); + ClusterState state = ClusterState.builder(new ClusterName("test")).metadata(metadata).build(); + + assertFalse(ReservedClusterStateService.isNewError(operatorMetadata, 1L)); + assertFalse(ReservedClusterStateService.isNewError(operatorMetadata, 0L)); + assertTrue(ReservedClusterStateService.isNewError(operatorMetadata, 2L)); + assertTrue(ReservedClusterStateService.isNewError(null, 0L)); + // We submit a task with two handler, one will cause an exception, the other will create a new state. // When we fail to update the metadata because of version, we ensure that the returned state is equal to the // original state by pointer reference to avoid cluster state update task to run. @@ -286,7 +308,7 @@ public Map fromXContent(XContentParser parser) throws IOExceptio new ReservedStateChunk(Map.of("one", "two", "maker", "three"), new ReservedStateVersion(1L, Version.CURRENT)), Map.of(exceptionThrower.name(), exceptionThrower, newStateMaker.name(), newStateMaker), List.of(exceptionThrower.name(), newStateMaker.name()), - (errorState) -> {}, + (errorState) -> { assertFalse(ReservedClusterStateService.isNewError(operatorMetadata, errorState.version())); }, new ActionListener<>() { @Override public void onResponse(ActionResponse.Empty empty) {} @@ -296,25 +318,11 @@ public void onFailure(Exception e) {} } ); - ReservedStateHandlerMetadata hmOne = new ReservedStateHandlerMetadata("one", Set.of("a", "b")); - ReservedStateErrorMetadata emOne = new ReservedStateErrorMetadata( - 1L, - ReservedStateErrorMetadata.ErrorKind.VALIDATION, - List.of("Test error 1", "Test error 2") - ); - - ReservedStateMetadata operatorMetadata = ReservedStateMetadata.builder("namespace_one") - .errorMetadata(emOne) - .version(1L) - .putHandler(hmOne) - .build(); - - Metadata metadata = Metadata.builder().put(operatorMetadata).build(); - ClusterState state = ClusterState.builder(new ClusterName("test")).metadata(metadata).build(); - // We exit on duplicate errors before we update the cluster state error metadata - // The reference == ensures we return the same object as the current state to avoid publishing no-op state update - assertTrue(state == task.execute(state)); + assertThat( + expectThrows(IllegalStateException.class, () -> task.execute(state)).getMessage(), + containsString("Error processing state change request for namespace_one") + ); emOne = new ReservedStateErrorMetadata( 0L, @@ -323,15 +331,19 @@ public void onFailure(Exception e) {} ); // If we are writing with older error metadata, we should get proper IllegalStateException - operatorMetadata = ReservedStateMetadata.builder("namespace_one").errorMetadata(emOne).version(0L).putHandler(hmOne).build(); + ReservedStateMetadata opMetadata = ReservedStateMetadata.builder("namespace_one") + .errorMetadata(emOne) + .version(0L) + .putHandler(hmOne) + .build(); - metadata = Metadata.builder().put(operatorMetadata).build(); + metadata = Metadata.builder().put(opMetadata).build(); ClusterState newState = ClusterState.builder(new ClusterName("test")).metadata(metadata).build(); // We exit on duplicate errors before we update the cluster state error metadata - assertEquals( - "Error processing state change request for namespace_one", - expectThrows(IllegalStateException.class, () -> task.execute(newState)).getMessage() + assertThat( + expectThrows(IllegalStateException.class, () -> task.execute(newState)).getMessage(), + containsString("Error processing state change request for namespace_one") ); } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/ReservedLifecycleStateServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/ReservedLifecycleStateServiceTests.java index 8fa8abcdd2aa7..b9e25163c8cb8 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/ReservedLifecycleStateServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/ReservedLifecycleStateServiceTests.java @@ -65,6 +65,7 @@ import java.util.function.Consumer; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; @@ -345,7 +346,7 @@ public void testOperatorControllerFromJSONContent() throws IOException { controller.process("operator", parser, (e) -> x.set(e)); assertTrue(x.get() instanceof IllegalStateException); - assertEquals("Error processing state change request for operator", x.get().getMessage()); + assertThat(x.get().getMessage(), containsString("Error processing state change request for operator")); } Client client = mock(Client.class); @@ -410,7 +411,7 @@ public void testOperatorControllerWithPluginPackage() { controller.process("operator", pack, (e) -> x.set(e)); assertTrue(x.get() instanceof IllegalStateException); - assertEquals("Error processing state change request for operator", x.get().getMessage()); + assertThat(x.get().getMessage(), containsString("Error processing state change request for operator")); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java index d5896f6355cb5..8f5e156a1cfd9 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java @@ -780,7 +780,9 @@ public void testSecurityRestHandlerInterceptorCanBeInstalled() throws IllegalAcc null, usageService, null, - Tracer.NOOP + Tracer.NOOP, + mock(ClusterService.class), + List.of() ); actionModule.initRestHandlers(null);