diff --git a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java index e78b2a98383cb..24e020b3f76f4 100644 --- a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.ReservedStateMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; @@ -43,7 +44,9 @@ import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportService; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Optional; import java.util.Set; import java.util.function.Predicate; @@ -173,9 +176,39 @@ protected Set modifiedKeys(Request request) { return Collections.emptySet(); } + // package private for testing + void validateForImmutableState(Request request, ClusterState state) { + Optional handlerName = reservedStateHandlerName(); + assert handlerName.isPresent(); + + Set modified = modifiedKeys(request); + List errors = new ArrayList<>(); + + for (ReservedStateMetadata metadata : state.metadata().reservedStateMetadata().values()) { + Set conflicts = metadata.conflicts(handlerName.get(), modified); + if (conflicts.isEmpty() == false) { + errors.add(format("[%s] set as read-only by [%s]", String.join(", ", conflicts), metadata.namespace())); + } + } + + if (errors.isEmpty() == false) { + throw new IllegalArgumentException( + format("Failed to process request [%s] with errors: [%s]", request, String.join(", ", errors)) + ); + } + } + + // package private for testing + boolean supportsImmutableState() { + return reservedStateHandlerName().isPresent(); + } + @Override protected void doExecute(Task task, final Request request, ActionListener listener) { ClusterState state = clusterService.state(); + if (supportsImmutableState()) { + validateForImmutableState(request, state); + } logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version()); if (task != null) { request.setParentTask(clusterService.localNode().getId(), task.getId()); diff --git a/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java b/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java index 9770b1c42dc0f..467bd6cd887e8 100644 --- a/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java @@ -14,12 +14,14 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.block.ClusterBlock; @@ -30,6 +32,8 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ReservedStateHandlerMetadata; +import org.elasticsearch.cluster.metadata.ReservedStateMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -43,6 +47,7 @@ import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.indices.TestIndexNameExpressionResolver; import org.elasticsearch.node.NodeClosedException; +import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; @@ -65,6 +70,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; @@ -254,6 +260,63 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) } } + class ReservedStateAction extends Action { + ReservedStateAction(String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) { + super(actionName, transportService, clusterService, threadPool, ThreadPool.Names.SAME); + } + + @Override + protected Optional reservedStateHandlerName() { + return Optional.of("test_reserved_state_action"); + } + } + + class FakeClusterStateUpdateAction extends TransportMasterNodeAction { + FakeClusterStateUpdateAction( + String actionName, + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + String executor + ) { + super( + actionName, + transportService, + clusterService, + threadPool, + new ActionFilters(new HashSet<>()), + ClusterUpdateSettingsRequest::new, + TestIndexNameExpressionResolver.newInstance(), + Response::new, + executor + ); + } + + @Override + protected void masterOperation( + Task task, + ClusterUpdateSettingsRequest request, + ClusterState state, + ActionListener listener + ) {} + + @Override + protected ClusterBlockException checkBlock(ClusterUpdateSettingsRequest request, ClusterState state) { + return null; + } + + @Override + protected Optional reservedStateHandlerName() { + return Optional.of(ReservedClusterSettingsAction.NAME); + } + + @Override + protected Set modifiedKeys(ClusterUpdateSettingsRequest request) { + Settings allSettings = Settings.builder().put(request.persistentSettings()).put(request.transientSettings()).build(); + return allSettings.keySet(); + } + } + public void testLocalOperationWithoutBlocks() throws ExecutionException, InterruptedException { final boolean masterOperationFailure = randomBoolean(); @@ -686,7 +749,6 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) indexNameExpressionResolver.concreteIndexNamesWithSystemIndexAccess(state, request) ); } - }; PlainActionFuture listener = new PlainActionFuture<>(); @@ -697,6 +759,54 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) assertThat(ex.getCause().getCause(), instanceOf(ClusterBlockException.class)); } + public void testRejectImmutableConflictClusterStateUpdate() { + ReservedStateHandlerMetadata hmOne = new ReservedStateHandlerMetadata(ReservedClusterSettingsAction.NAME, Set.of("a", "b")); + ReservedStateHandlerMetadata hmThree = new ReservedStateHandlerMetadata(ReservedClusterSettingsAction.NAME, Set.of("e", "f")); + ReservedStateMetadata omOne = ReservedStateMetadata.builder("namespace_one").putHandler(hmOne).build(); + ReservedStateMetadata omTwo = ReservedStateMetadata.builder("namespace_two").putHandler(hmThree).build(); + + Metadata metadata = Metadata.builder().put(omOne).put(omTwo).build(); + + ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(metadata).build(); + + Action noHandler = new Action("internal:testAction", transportService, clusterService, threadPool, ThreadPool.Names.SAME); + + assertFalse(noHandler.supportsImmutableState()); + + noHandler = new ReservedStateAction("internal:testOpAction", transportService, clusterService, threadPool); + + assertTrue(noHandler.supportsImmutableState()); + + // nothing should happen here, since the request doesn't touch any of the immutable state keys + noHandler.validateForImmutableState(new Request(), clusterState); + + ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest().persistentSettings( + Settings.builder().put("a", "a value").build() + ).transientSettings(Settings.builder().put("e", "e value").build()); + + FakeClusterStateUpdateAction action = new FakeClusterStateUpdateAction( + "internal:testClusterSettings", + transportService, + clusterService, + threadPool, + ThreadPool.Names.SAME + ); + + assertTrue(action.supportsImmutableState()); + + assertTrue( + expectThrows(IllegalArgumentException.class, () -> action.validateForImmutableState(request, clusterState)).getMessage() + .contains("with errors: [[a] set as read-only by [namespace_one], " + "[e] set as read-only by [namespace_two]") + ); + + ClusterUpdateSettingsRequest okRequest = new ClusterUpdateSettingsRequest().persistentSettings( + Settings.builder().put("m", "m value").build() + ).transientSettings(Settings.builder().put("n", "n value").build()); + + // this should just work, no conflicts + action.validateForImmutableState(okRequest, clusterState); + } + private Runnable blockAllThreads(String executorName) throws Exception { final int numberOfThreads = threadPool.info(executorName).getMax(); final EsThreadPoolExecutor executor = (EsThreadPoolExecutor) threadPool.executor(executorName);