Skip to content

Commit e86c06a

Browse files
committed
Add new setting to disable persistent tasks allocations (#29137)
This commit adds a new setting `cluster.persistent_tasks.allocation.enable` that can be used to enable or disable the allocation of persistent tasks. The setting accepts the values `all` (default) or `none`. When set to none, the persistent tasks that are created (or that must be reassigned) won't be assigned to a node but will reside in the cluster state with a no "executor node" and a reason describing why it is not assigned: ``` "assignment" : { "executor_node" : null, "explanation" : "persistent task [foo/bar] cannot be assigned [no persistent task assignments are allowed due to cluster settings]" } ```
1 parent 329a15b commit e86c06a

File tree

11 files changed

+699
-8
lines changed

11 files changed

+699
-8
lines changed

docs/reference/modules/cluster/misc.asciidoc

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,29 @@ PUT /_cluster/settings
5656
}
5757
-------------------------------
5858
// CONSOLE
59+
60+
61+
[[persistent-tasks-allocation]]
62+
==== Persistent Tasks Allocations
63+
64+
Plugins can create a kind of tasks called persistent tasks. Those tasks are
65+
usually long-live tasks and are stored in the cluster state, allowing the
66+
tasks to be revived after a full cluster restart.
67+
68+
Every time a persistent task is created, the master nodes takes care of
69+
assigning the task to a node of the cluster, and the assigned node will then
70+
pick up the task and execute it locally. The process of assigning persistent
71+
tasks to nodes is controlled by the following property, which can be updated
72+
dynamically:
73+
74+
`cluster.persistent_tasks.allocation.enable`::
75+
+
76+
--
77+
Enable or disable allocation for persistent tasks:
78+
79+
* `all` - (default) Allows persistent tasks to be assigned to nodes
80+
* `none` - No allocations are allowed for any type of persistent task
81+
82+
This setting does not affect the persistent tasks that are already being executed.
83+
Only newly created persistent tasks, or tasks that must be reassigned (after a node
84+
left the cluster, for example), are impacted by this setting.

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import org.elasticsearch.monitor.os.OsService;
8080
import org.elasticsearch.monitor.process.ProcessService;
8181
import org.elasticsearch.node.Node;
82+
import org.elasticsearch.persistent.decider.EnableAssignmentDecider;
8283
import org.elasticsearch.plugins.PluginsService;
8384
import org.elasticsearch.repositories.fs.FsRepository;
8485
import org.elasticsearch.rest.BaseRestHandler;
@@ -420,6 +421,7 @@ public void apply(Settings value, Settings current, Settings previous) {
420421
FastVectorHighlighter.SETTING_TV_HIGHLIGHT_MULTI_VALUE,
421422
Node.BREAKER_TYPE_KEY,
422423
OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING,
423-
IndexGraveyard.SETTING_MAX_TOMBSTONES
424+
IndexGraveyard.SETTING_MAX_TOMBSTONES,
425+
EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING
424426
)));
425427
}

server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import org.elasticsearch.common.settings.Settings;
3535
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
3636
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
37+
import org.elasticsearch.persistent.decider.AssignmentDecision;
38+
import org.elasticsearch.persistent.decider.EnableAssignmentDecider;
3739
import org.elasticsearch.tasks.Task;
3840

3941
import java.util.Objects;
@@ -45,12 +47,14 @@ public class PersistentTasksClusterService extends AbstractComponent implements
4547

4648
private final ClusterService clusterService;
4749
private final PersistentTasksExecutorRegistry registry;
50+
private final EnableAssignmentDecider decider;
4851

4952
public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorRegistry registry, ClusterService clusterService) {
5053
super(settings);
5154
this.clusterService = clusterService;
5255
clusterService.addListener(this);
5356
this.registry = registry;
57+
this.decider = new EnableAssignmentDecider(settings, clusterService.getClusterSettings());
5458
}
5559

5660
/**
@@ -224,6 +228,12 @@ private <Params extends PersistentTaskParams> Assignment createAssignment(final
224228
final @Nullable Params taskParams,
225229
final ClusterState currentState) {
226230
PersistentTasksExecutor<Params> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);
231+
232+
AssignmentDecision decision = decider.canAssign();
233+
if (decision.getType() == AssignmentDecision.Type.NO) {
234+
return new Assignment(null, "persistent task [" + taskName + "] cannot be assigned [" + decision.getReason() + "]");
235+
}
236+
227237
return persistentTasksExecutor.getAssignment(taskParams, currentState);
228238
}
229239

@@ -249,7 +259,8 @@ public void onFailure(String source, Exception e) {
249259

250260
/**
251261
* Returns true if the cluster state change(s) require to reassign some persistent tasks. It can happen in the following
252-
* situations: a node left or is added, the routing table changed, the master node changed or the persistent tasks changed.
262+
* situations: a node left or is added, the routing table changed, the master node changed, the metadata changed or the
263+
* persistent tasks changed.
253264
*/
254265
boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) {
255266
final PersistentTasksCustomMetaData tasks = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
@@ -259,7 +270,12 @@ boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) {
259270

260271
boolean masterChanged = event.previousState().nodes().isLocalNodeElectedMaster() == false;
261272

262-
if (persistentTasksChanged(event) || event.nodesChanged() || event.routingTableChanged() || masterChanged) {
273+
if (persistentTasksChanged(event)
274+
|| event.nodesChanged()
275+
|| event.routingTableChanged()
276+
|| event.metaDataChanged()
277+
|| masterChanged) {
278+
263279
for (PersistentTask<?> task : tasks.tasks()) {
264280
if (needsReassignment(task.getAssignment(), event.state().nodes())) {
265281
Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), event.state());
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.persistent.decider;
20+
21+
import java.util.Locale;
22+
import java.util.Objects;
23+
24+
/**
25+
* {@link AssignmentDecision} represents the decision made during the process of
26+
* assigning a persistent task to a node of the cluster.
27+
*
28+
* @see EnableAssignmentDecider
29+
*/
30+
public final class AssignmentDecision {
31+
32+
public static final AssignmentDecision YES = new AssignmentDecision(Type.YES, "");
33+
34+
private final Type type;
35+
private final String reason;
36+
37+
public AssignmentDecision(final Type type, final String reason) {
38+
this.type = Objects.requireNonNull(type);
39+
this.reason = Objects.requireNonNull(reason);
40+
}
41+
42+
public Type getType() {
43+
return type;
44+
}
45+
46+
public String getReason() {
47+
return reason;
48+
}
49+
50+
@Override
51+
public String toString() {
52+
return "assignment decision [type=" + type + ", reason=" + reason + "]";
53+
}
54+
55+
public enum Type {
56+
NO(0), YES(1);
57+
58+
private final int id;
59+
60+
Type(int id) {
61+
this.id = id;
62+
}
63+
64+
public int getId() {
65+
return id;
66+
}
67+
68+
public static Type resolve(final String s) {
69+
return Type.valueOf(s.toUpperCase(Locale.ROOT));
70+
}
71+
}
72+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.persistent.decider;
20+
21+
import org.elasticsearch.common.settings.ClusterSettings;
22+
import org.elasticsearch.common.settings.Setting;
23+
import org.elasticsearch.common.settings.Settings;
24+
25+
import java.util.Locale;
26+
27+
import static org.elasticsearch.common.settings.Setting.Property.Dynamic;
28+
import static org.elasticsearch.common.settings.Setting.Property.NodeScope;
29+
30+
/**
31+
* {@link EnableAssignmentDecider} is used to allow/disallow the persistent tasks
32+
* to be assigned to cluster nodes.
33+
* <p>
34+
* Allocation settings can have the following values (non-casesensitive):
35+
* <ul>
36+
* <li> <code>NONE</code> - no persistent tasks can be assigned
37+
* <li> <code>ALL</code> - all persistent tasks can be assigned to nodes
38+
* </ul>
39+
*
40+
* @see Allocation
41+
*/
42+
public class EnableAssignmentDecider {
43+
44+
public static final Setting<Allocation> CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING =
45+
new Setting<>("cluster.persistent_tasks.allocation.enable", Allocation.ALL.toString(), Allocation::fromString, Dynamic, NodeScope);
46+
47+
private volatile Allocation enableAssignment;
48+
49+
public EnableAssignmentDecider(final Settings settings, final ClusterSettings clusterSettings) {
50+
this.enableAssignment = CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.get(settings);
51+
clusterSettings.addSettingsUpdateConsumer(CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING, this::setEnableAssignment);
52+
}
53+
54+
public void setEnableAssignment(final Allocation enableAssignment) {
55+
this.enableAssignment = enableAssignment;
56+
}
57+
58+
/**
59+
* Returns a {@link AssignmentDecision} whether the given persistent task can be assigned
60+
* to a node of the cluster. The decision depends on the current value of the setting
61+
* {@link EnableAssignmentDecider#CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING}.
62+
*
63+
* @return the {@link AssignmentDecision}
64+
*/
65+
public AssignmentDecision canAssign() {
66+
if (enableAssignment == Allocation.NONE) {
67+
return new AssignmentDecision(AssignmentDecision.Type.NO, "no persistent task assignments are allowed due to cluster settings");
68+
}
69+
return AssignmentDecision.YES;
70+
}
71+
72+
/**
73+
* Allocation values or rather their string representation to be used used with
74+
* {@link EnableAssignmentDecider#CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING}
75+
* via cluster settings.
76+
*/
77+
public enum Allocation {
78+
79+
NONE,
80+
ALL;
81+
82+
public static Allocation fromString(final String strValue) {
83+
if (strValue == null) {
84+
return null;
85+
} else {
86+
String value = strValue.toUpperCase(Locale.ROOT);
87+
try {
88+
return valueOf(value);
89+
} catch (IllegalArgumentException e) {
90+
throw new IllegalArgumentException("Illegal value [" + value + "] for ["
91+
+ CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.getKey() + "]");
92+
}
93+
}
94+
}
95+
96+
@Override
97+
public String toString() {
98+
return name().toLowerCase(Locale.ROOT);
99+
}
100+
}
101+
}

server/src/main/java/org/elasticsearch/persistent/package-info.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
* task.
3131
* <p>
3232
* 2. The master node updates the {@link org.elasticsearch.persistent.PersistentTasksCustomMetaData} in the cluster state to indicate
33-
* that there is a new persistent task is running in the system.
33+
* that there is a new persistent task running in the system.
3434
* <p>
3535
* 3. The {@link org.elasticsearch.persistent.PersistentTasksNodeService} running on every node in the cluster monitors changes in
3636
* the cluster state and starts execution of all new tasks assigned to the node it is running on.

0 commit comments

Comments
 (0)