Skip to content

Commit f46f9b1

Browse files
authored
Add transport action immutable state checks (#88491)
1 parent ebde65d commit f46f9b1

File tree

2 files changed

+144
-1
lines changed

2 files changed

+144
-1
lines changed

server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.cluster.block.ClusterBlockException;
2424
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2525
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
26+
import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
2627
import org.elasticsearch.cluster.node.DiscoveryNode;
2728
import org.elasticsearch.cluster.node.DiscoveryNodes;
2829
import org.elasticsearch.cluster.service.ClusterService;
@@ -43,7 +44,9 @@
4344
import org.elasticsearch.transport.TransportException;
4445
import org.elasticsearch.transport.TransportService;
4546

47+
import java.util.ArrayList;
4648
import java.util.Collections;
49+
import java.util.List;
4750
import java.util.Optional;
4851
import java.util.Set;
4952
import java.util.function.Predicate;
@@ -173,9 +176,39 @@ protected Set<String> modifiedKeys(Request request) {
173176
return Collections.emptySet();
174177
}
175178

179+
// package private for testing
180+
void validateForImmutableState(Request request, ClusterState state) {
181+
Optional<String> handlerName = reservedStateHandlerName();
182+
assert handlerName.isPresent();
183+
184+
Set<String> modified = modifiedKeys(request);
185+
List<String> errors = new ArrayList<>();
186+
187+
for (ReservedStateMetadata metadata : state.metadata().reservedStateMetadata().values()) {
188+
Set<String> conflicts = metadata.conflicts(handlerName.get(), modified);
189+
if (conflicts.isEmpty() == false) {
190+
errors.add(format("[%s] set as read-only by [%s]", String.join(", ", conflicts), metadata.namespace()));
191+
}
192+
}
193+
194+
if (errors.isEmpty() == false) {
195+
throw new IllegalArgumentException(
196+
format("Failed to process request [%s] with errors: [%s]", request, String.join(", ", errors))
197+
);
198+
}
199+
}
200+
201+
// package private for testing
202+
boolean supportsImmutableState() {
203+
return reservedStateHandlerName().isPresent();
204+
}
205+
176206
@Override
177207
protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
178208
ClusterState state = clusterService.state();
209+
if (supportsImmutableState()) {
210+
validateForImmutableState(request, state);
211+
}
179212
logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version());
180213
if (task != null) {
181214
request.setParentTask(clusterService.localNode().getId(), task.getId());

server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java

Lines changed: 111 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@
1414
import org.elasticsearch.action.ActionRequestValidationException;
1515
import org.elasticsearch.action.ActionResponse;
1616
import org.elasticsearch.action.IndicesRequest;
17+
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
1718
import org.elasticsearch.action.support.ActionFilters;
1819
import org.elasticsearch.action.support.ActionTestUtils;
1920
import org.elasticsearch.action.support.IndicesOptions;
2021
import org.elasticsearch.action.support.PlainActionFuture;
2122
import org.elasticsearch.action.support.ThreadedActionListener;
2223
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
24+
import org.elasticsearch.cluster.ClusterName;
2325
import org.elasticsearch.cluster.ClusterState;
2426
import org.elasticsearch.cluster.NotMasterException;
2527
import org.elasticsearch.cluster.block.ClusterBlock;
@@ -30,6 +32,8 @@
3032
import org.elasticsearch.cluster.metadata.IndexMetadata;
3133
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
3234
import org.elasticsearch.cluster.metadata.Metadata;
35+
import org.elasticsearch.cluster.metadata.ReservedStateHandlerMetadata;
36+
import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
3337
import org.elasticsearch.cluster.node.DiscoveryNode;
3438
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
3539
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -43,6 +47,7 @@
4347
import org.elasticsearch.discovery.MasterNotDiscoveredException;
4448
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
4549
import org.elasticsearch.node.NodeClosedException;
50+
import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
4651
import org.elasticsearch.rest.RestStatus;
4752
import org.elasticsearch.tasks.CancellableTask;
4853
import org.elasticsearch.tasks.Task;
@@ -65,6 +70,7 @@
6570
import java.util.HashSet;
6671
import java.util.Map;
6772
import java.util.Objects;
73+
import java.util.Optional;
6874
import java.util.Set;
6975
import java.util.concurrent.CountDownLatch;
7076
import java.util.concurrent.CyclicBarrier;
@@ -254,6 +260,63 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
254260
}
255261
}
256262

263+
class ReservedStateAction extends Action {
264+
ReservedStateAction(String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) {
265+
super(actionName, transportService, clusterService, threadPool, ThreadPool.Names.SAME);
266+
}
267+
268+
@Override
269+
protected Optional<String> reservedStateHandlerName() {
270+
return Optional.of("test_reserved_state_action");
271+
}
272+
}
273+
274+
class FakeClusterStateUpdateAction extends TransportMasterNodeAction<ClusterUpdateSettingsRequest, Response> {
275+
FakeClusterStateUpdateAction(
276+
String actionName,
277+
TransportService transportService,
278+
ClusterService clusterService,
279+
ThreadPool threadPool,
280+
String executor
281+
) {
282+
super(
283+
actionName,
284+
transportService,
285+
clusterService,
286+
threadPool,
287+
new ActionFilters(new HashSet<>()),
288+
ClusterUpdateSettingsRequest::new,
289+
TestIndexNameExpressionResolver.newInstance(),
290+
Response::new,
291+
executor
292+
);
293+
}
294+
295+
@Override
296+
protected void masterOperation(
297+
Task task,
298+
ClusterUpdateSettingsRequest request,
299+
ClusterState state,
300+
ActionListener<Response> listener
301+
) {}
302+
303+
@Override
304+
protected ClusterBlockException checkBlock(ClusterUpdateSettingsRequest request, ClusterState state) {
305+
return null;
306+
}
307+
308+
@Override
309+
protected Optional<String> reservedStateHandlerName() {
310+
return Optional.of(ReservedClusterSettingsAction.NAME);
311+
}
312+
313+
@Override
314+
protected Set<String> modifiedKeys(ClusterUpdateSettingsRequest request) {
315+
Settings allSettings = Settings.builder().put(request.persistentSettings()).put(request.transientSettings()).build();
316+
return allSettings.keySet();
317+
}
318+
}
319+
257320
public void testLocalOperationWithoutBlocks() throws ExecutionException, InterruptedException {
258321
final boolean masterOperationFailure = randomBoolean();
259322

@@ -686,7 +749,6 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
686749
indexNameExpressionResolver.concreteIndexNamesWithSystemIndexAccess(state, request)
687750
);
688751
}
689-
690752
};
691753

692754
PlainActionFuture<Response> listener = new PlainActionFuture<>();
@@ -697,6 +759,54 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
697759
assertThat(ex.getCause().getCause(), instanceOf(ClusterBlockException.class));
698760
}
699761

762+
public void testRejectImmutableConflictClusterStateUpdate() {
763+
ReservedStateHandlerMetadata hmOne = new ReservedStateHandlerMetadata(ReservedClusterSettingsAction.NAME, Set.of("a", "b"));
764+
ReservedStateHandlerMetadata hmThree = new ReservedStateHandlerMetadata(ReservedClusterSettingsAction.NAME, Set.of("e", "f"));
765+
ReservedStateMetadata omOne = ReservedStateMetadata.builder("namespace_one").putHandler(hmOne).build();
766+
ReservedStateMetadata omTwo = ReservedStateMetadata.builder("namespace_two").putHandler(hmThree).build();
767+
768+
Metadata metadata = Metadata.builder().put(omOne).put(omTwo).build();
769+
770+
ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(metadata).build();
771+
772+
Action noHandler = new Action("internal:testAction", transportService, clusterService, threadPool, ThreadPool.Names.SAME);
773+
774+
assertFalse(noHandler.supportsImmutableState());
775+
776+
noHandler = new ReservedStateAction("internal:testOpAction", transportService, clusterService, threadPool);
777+
778+
assertTrue(noHandler.supportsImmutableState());
779+
780+
// nothing should happen here, since the request doesn't touch any of the immutable state keys
781+
noHandler.validateForImmutableState(new Request(), clusterState);
782+
783+
ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest().persistentSettings(
784+
Settings.builder().put("a", "a value").build()
785+
).transientSettings(Settings.builder().put("e", "e value").build());
786+
787+
FakeClusterStateUpdateAction action = new FakeClusterStateUpdateAction(
788+
"internal:testClusterSettings",
789+
transportService,
790+
clusterService,
791+
threadPool,
792+
ThreadPool.Names.SAME
793+
);
794+
795+
assertTrue(action.supportsImmutableState());
796+
797+
assertTrue(
798+
expectThrows(IllegalArgumentException.class, () -> action.validateForImmutableState(request, clusterState)).getMessage()
799+
.contains("with errors: [[a] set as read-only by [namespace_one], " + "[e] set as read-only by [namespace_two]")
800+
);
801+
802+
ClusterUpdateSettingsRequest okRequest = new ClusterUpdateSettingsRequest().persistentSettings(
803+
Settings.builder().put("m", "m value").build()
804+
).transientSettings(Settings.builder().put("n", "n value").build());
805+
806+
// this should just work, no conflicts
807+
action.validateForImmutableState(okRequest, clusterState);
808+
}
809+
700810
private Runnable blockAllThreads(String executorName) throws Exception {
701811
final int numberOfThreads = threadPool.info(executorName).getMax();
702812
final EsThreadPoolExecutor executor = (EsThreadPoolExecutor) threadPool.executor(executorName);

0 commit comments

Comments
 (0)