From 40a20f5765604754f6ee3f761d81908f7b6ddd0a Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 19 Mar 2018 15:06:20 +0100 Subject: [PATCH 1/4] Add new setting to disable persistent tasks allocations This commit adds a new setting `cluster.persistent_tasks.allocation.enable` that can be used to enable or disable the allocation of persistent tasks. The setting accepts the values `all` (default) or `none`. When set to none, the persistent tasks that are created (or that must be reassigned) won't be assigned to a node but will reside in the cluster state with a no "executor node" and a reason describing why it is not assigned: ``` "assignment" : { "executor_node" : null, "explanation" : "persistent task [foo/bar] cannot be assigned [no persistent task assignments are allowed due to cluster settings]" } ``` This commit adds some AssignmentDecider and AssignmentDecision classes that are similar to the deciders/decisions classes used for shard allocations. It also adds an EnableAssignmentDecider decider that is responsible for registering the new cluster settings. The setting is checked when executing the PersistentTasksClusterService `createAssignment()` method, before delegating the creation of the assignment to the PersistentTasksExecutor. As a follow up, I think that PersistentTasksExecutor could expose a `getTaskAllocationDecider` method that would be used by the PersistentTasksClusterService to build a list of nodes that could execute the new task. Then this list of assignable nodes could be passed again to the task executor to select the exact node to execute the task. --- docs/reference/modules/cluster/misc.asciidoc | 26 +++++ .../common/settings/ClusterSettings.java | 4 +- .../PersistentTasksClusterService.java | 21 +++- .../persistent/decider/AssignmentDecider.java | 47 ++++++++ .../decider/AssignmentDecision.java | 78 +++++++++++++ .../decider/EnableAssignmentDecider.java | 98 ++++++++++++++++ .../persistent/package-info.java | 2 +- .../PersistentTasksClusterServiceTests.java | 90 ++++++++++++++- .../PersistentTasksDecidersTestCase.java | 109 ++++++++++++++++++ .../decider/AssignmentDecisionTests.java | 35 ++++++ .../decider/EnableAssignmentDeciderTests.java | 51 ++++++++ 11 files changed, 553 insertions(+), 8 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/persistent/decider/AssignmentDecider.java create mode 100644 server/src/main/java/org/elasticsearch/persistent/decider/AssignmentDecision.java create mode 100644 server/src/main/java/org/elasticsearch/persistent/decider/EnableAssignmentDecider.java create mode 100644 server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java create mode 100644 server/src/test/java/org/elasticsearch/persistent/decider/AssignmentDecisionTests.java create mode 100644 server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderTests.java diff --git a/docs/reference/modules/cluster/misc.asciidoc b/docs/reference/modules/cluster/misc.asciidoc index 3963312c0f4ea..4edcd34e00f5d 100644 --- a/docs/reference/modules/cluster/misc.asciidoc +++ b/docs/reference/modules/cluster/misc.asciidoc @@ -56,3 +56,29 @@ PUT /_cluster/settings } ------------------------------- // CONSOLE + + +[[persistent-tasks-allocation]] +==== Persistent Tasks Allocations + +Plugins can create a kind of tasks called persistent tasks. Those tasks are +usually long-live tasks and are stored in the cluster state, allowing the +tasks to be revived after a full cluster restart. + +Every time a persistent task is created, the master nodes takes care of +assigning the task to a node of the cluster, and the assigned node will then +pick up the task and execute it locally. The process of assigning persistent +tasks to nodes is controlled by the following property, which can be updated +dynamically: + +`cluster.persistent_tasks.allocation.enable`:: ++ +-- +Enable or disable allocation for persistent tasks: + +* `all` - (default) Allows persistent tasks to be assigned to nodes +* `none` - No allocations are allowed for any type of persistent task + +This setting does not affect the persistent tasks that are already being executed. +Only newly created persistent tasks, or tasks that must be reassigned (after a node +left the cluster, for example), are impacted by this setting. diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 804340d63ed11..bcfed3388e9f2 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -79,6 +79,7 @@ import org.elasticsearch.monitor.os.OsService; import org.elasticsearch.monitor.process.ProcessService; import org.elasticsearch.node.Node; +import org.elasticsearch.persistent.decider.EnableAssignmentDecider; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.rest.BaseRestHandler; @@ -420,6 +421,7 @@ public void apply(Settings value, Settings current, Settings previous) { FastVectorHighlighter.SETTING_TV_HIGHLIGHT_MULTI_VALUE, Node.BREAKER_TYPE_KEY, OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, - IndexGraveyard.SETTING_MAX_TOMBSTONES + IndexGraveyard.SETTING_MAX_TOMBSTONES, + EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING ))); } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index 9e064c3d20924..8d6b3c2e4e6d8 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -34,6 +34,9 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; +import org.elasticsearch.persistent.decider.AssignmentDecider; +import org.elasticsearch.persistent.decider.AssignmentDecision; +import org.elasticsearch.persistent.decider.EnableAssignmentDecider; import org.elasticsearch.tasks.Task; import java.util.Objects; @@ -45,12 +48,14 @@ public class PersistentTasksClusterService extends AbstractComponent implements private final ClusterService clusterService; private final PersistentTasksExecutorRegistry registry; + private final AssignmentDecider decider; public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorRegistry registry, ClusterService clusterService) { super(settings); this.clusterService = clusterService; clusterService.addListener(this); this.registry = registry; + this.decider = new EnableAssignmentDecider<>(settings, clusterService.getClusterSettings()); } /** @@ -224,6 +229,12 @@ private Assignment createAssignment(final final @Nullable Params taskParams, final ClusterState currentState) { PersistentTasksExecutor persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName); + + AssignmentDecision decision = decider.canAssign(taskName, taskParams); + if (decision.getType() == AssignmentDecision.Type.NO) { + return new Assignment(null, "persistent task [" + taskName + "] cannot be assigned [" + decision.getReason() + "]"); + } + return persistentTasksExecutor.getAssignment(taskParams, currentState); } @@ -249,7 +260,8 @@ public void onFailure(String source, Exception e) { /** * Returns true if the cluster state change(s) require to reassign some persistent tasks. It can happen in the following - * situations: a node left or is added, the routing table changed, the master node changed or the persistent tasks changed. + * situations: a node left or is added, the routing table changed, the master node changed, the metadata changed or the + * persistent tasks changed. */ boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) { final PersistentTasksCustomMetaData tasks = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); @@ -259,7 +271,12 @@ boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) { boolean masterChanged = event.previousState().nodes().isLocalNodeElectedMaster() == false; - if (persistentTasksChanged(event) || event.nodesChanged() || event.routingTableChanged() || masterChanged) { + if (persistentTasksChanged(event) + || event.nodesChanged() + || event.routingTableChanged() + || event.metaDataChanged() + || masterChanged) { + for (PersistentTask task : tasks.tasks()) { if (needsReassignment(task.getAssignment(), event.state().nodes())) { Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), event.state()); diff --git a/server/src/main/java/org/elasticsearch/persistent/decider/AssignmentDecider.java b/server/src/main/java/org/elasticsearch/persistent/decider/AssignmentDecider.java new file mode 100644 index 0000000000000..9a380efc2ad9d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/persistent/decider/AssignmentDecider.java @@ -0,0 +1,47 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.persistent.decider; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.persistent.PersistentTaskParams; + +/** + * {@link AssignmentDecider} are used to decide if a persistent + * task can be assigned to a node. + */ +public abstract class AssignmentDecider extends AbstractComponent { + + public AssignmentDecider(Settings settings) { + super(settings); + } + + /** + * Returns a {@link AssignmentDecision} whether the given persistent task can be assigned + * to any node of the cluster. The default is {@link AssignmentDecision#ALWAYS}. + * + * @param taskName the task's name + * @param taskParams the task's parameters + * @return the {@link AssignmentDecision} + */ + public AssignmentDecision canAssign(final String taskName, final @Nullable Params taskParams) { + return AssignmentDecision.ALWAYS; + } +} diff --git a/server/src/main/java/org/elasticsearch/persistent/decider/AssignmentDecision.java b/server/src/main/java/org/elasticsearch/persistent/decider/AssignmentDecision.java new file mode 100644 index 0000000000000..5434b8dc27300 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/persistent/decider/AssignmentDecision.java @@ -0,0 +1,78 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.persistent.decider; + +import java.util.Locale; +import java.util.Objects; + +/** + * {@link AssignmentDecision} represents the decision made during the process of + * assigning a persistent task to a node of the cluster. + * + * @see AssignmentDecider + */ +public final class AssignmentDecision { + + public static final AssignmentDecision ALWAYS = new AssignmentDecision(Type.YES); + public static final AssignmentDecision YES = new AssignmentDecision(Type.YES); + public static final AssignmentDecision NO = new AssignmentDecision(Type.NO); + + private final Type type; + private final String reason; + + private AssignmentDecision(final Type type) { + this(type, ""); + } + + public AssignmentDecision(final Type type, final String reason) { + this.type = Objects.requireNonNull(type); + this.reason = Objects.requireNonNull(reason); + } + + public Type getType() { + return type; + } + + public String getReason() { + return reason; + } + + @Override + public String toString() { + return "assignment decision [type=" + type + ", reason=" + reason + "]"; + } + + public enum Type { + NO(0), YES(1); + + private final int id; + + Type(int id) { + this.id = id; + } + + public int getId() { + return id; + } + + public static Type resolve(final String s) { + return Type.valueOf(s.toUpperCase(Locale.ROOT)); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/persistent/decider/EnableAssignmentDecider.java b/server/src/main/java/org/elasticsearch/persistent/decider/EnableAssignmentDecider.java new file mode 100644 index 0000000000000..c9159ffb13be7 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/persistent/decider/EnableAssignmentDecider.java @@ -0,0 +1,98 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.persistent.decider; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.persistent.PersistentTaskParams; + +import java.util.Locale; + +import static org.elasticsearch.common.settings.Setting.Property.Dynamic; +import static org.elasticsearch.common.settings.Setting.Property.NodeScope; + +/** + * This {@link AssignmentDecider} is used to allow/disallow the persistent tasks + * to be assigned to cluster nodes. + *

+ * Allocation settings can have the following values (non-casesensitive): + *

    + *
  • NONE - no persistent tasks can be assigned + *
  • ALL - all persistent tasks can be assigned to nodes + *
+ * + * @see Allocation + */ +public class EnableAssignmentDecider extends AssignmentDecider { + + public static final Setting CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING = + new Setting<>("cluster.persistent_tasks.allocation.enable", Allocation.ALL.toString(), Allocation::fromString, Dynamic, NodeScope); + + private volatile Allocation enableAssignment; + + public EnableAssignmentDecider(final Settings settings, final ClusterSettings clusterSettings) { + super(settings); + this.enableAssignment = CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING, this::setEnableAssignment); + } + + public void setEnableAssignment(final Allocation enableAssignment) { + this.enableAssignment = enableAssignment; + } + + @Override + public AssignmentDecision canAssign(final String taskName, final @Nullable Params taskParams) { + if (enableAssignment == Allocation.NONE) { + return new AssignmentDecision(AssignmentDecision.Type.NO, "no persistent task assignments are allowed due to cluster settings"); + } + return AssignmentDecision.YES; + } + + /** + * Allocation values or rather their string representation to be used used with + * {@link EnableAssignmentDecider#CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING} + * via cluster settings. + */ + public enum Allocation { + + NONE, + ALL; + + public static Allocation fromString(final String strValue) { + if (strValue == null) { + return null; + } else { + String value = strValue.toUpperCase(Locale.ROOT); + try { + return valueOf(value); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Illegal value [" + value + "] for [" + + CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.getKey() + "]"); + } + } + } + + @Override + public String toString() { + return name().toLowerCase(Locale.ROOT); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/persistent/package-info.java b/server/src/main/java/org/elasticsearch/persistent/package-info.java index f948e3ace448e..3e71716e60643 100644 --- a/server/src/main/java/org/elasticsearch/persistent/package-info.java +++ b/server/src/main/java/org/elasticsearch/persistent/package-info.java @@ -30,7 +30,7 @@ * task. *

* 2. The master node updates the {@link org.elasticsearch.persistent.PersistentTasksCustomMetaData} in the cluster state to indicate - * that there is a new persistent task is running in the system. + * that there is a new persistent task running in the system. *

* 3. The {@link org.elasticsearch.persistent.PersistentTasksNodeService} running on every node in the cluster monitors changes in * the cluster state and starts execution of all new tasks assigned to the node it is running on. diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java index e470c5028aa8f..916fdee213695 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java @@ -36,9 +36,16 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; +import org.elasticsearch.persistent.decider.EnableAssignmentDecider; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; import java.util.ArrayList; import java.util.Arrays; @@ -52,14 +59,41 @@ import static org.elasticsearch.persistent.PersistentTasksClusterService.needsReassignment; import static org.elasticsearch.persistent.PersistentTasksClusterService.persistentTasksChanged; import static org.elasticsearch.persistent.PersistentTasksExecutor.NO_NODE_FOUND; +import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; -import static org.mockito.Mockito.mock; public class PersistentTasksClusterServiceTests extends ESTestCase { + /** Needed by {@link ClusterService} **/ + private static ThreadPool threadPool; + /** Needed by {@link PersistentTasksClusterService} **/ + private ClusterService clusterService; + + @BeforeClass + public static void setUpThreadPool() { + threadPool = new TestThreadPool(PersistentTasksClusterServiceTests.class.getSimpleName()); + } + + @Before + public void setUp() throws Exception { + super.setUp(); + clusterService = createClusterService(threadPool); + } + + @AfterClass + public static void tearDownThreadPool() throws Exception { + terminate(threadPool); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + clusterService.close(); + } + public void testReassignmentRequired() { final PersistentTasksClusterService service = createService((params, clusterState) -> "never_assign".equals(((TestParams) params).getTestParam()) ? NO_NODE_FOUND : randomNodeAssignment(clusterState.nodes()) @@ -81,6 +115,55 @@ public void testReassignmentRequired() { } } + public void testReassignmentRequiredOnMetadataChanges() { + EnableAssignmentDecider.Allocation allocation = randomFrom(EnableAssignmentDecider.Allocation.values()); + + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("_node", buildNewFakeTransportAddress(), Version.CURRENT)) + .localNodeId("_node") + .masterNodeId("_node") + .build(); + + boolean unassigned = randomBoolean(); + PersistentTasksCustomMetaData tasks = PersistentTasksCustomMetaData.builder() + .addTask("_task_1", TestPersistentTasksExecutor.NAME, null, new Assignment(unassigned ? null : "_node", "_reason")) + .build(); + + MetaData metaData = MetaData.builder() + .putCustom(PersistentTasksCustomMetaData.TYPE, tasks) + .persistentSettings(Settings.builder() + .put(EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.getKey(), allocation.toString()) + .build()) + .build(); + + ClusterState previous = ClusterState.builder(new ClusterName("_name")) + .nodes(nodes) + .metaData(metaData) + .build(); + + ClusterState current; + + final boolean changed = randomBoolean(); + if (changed) { + allocation = randomValueOtherThan(allocation, () -> randomFrom(EnableAssignmentDecider.Allocation.values())); + + current = ClusterState.builder(previous) + .metaData(MetaData.builder(previous.metaData()) + .persistentSettings(Settings.builder() + .put(EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.getKey(), allocation.toString()) + .build()) + .build()) + .build(); + } else { + current = ClusterState.builder(previous).build(); + } + + final ClusterChangedEvent event = new ClusterChangedEvent("test", current, previous); + + final PersistentTasksClusterService service = createService((params, clusterState) -> randomNodeAssignment(clusterState.nodes())); + assertThat(dumpEvent(event), service.shouldReassignPersistentTasks(event), equalTo(changed && unassigned)); + } + public void testReassignTasksWithNoTasks() { ClusterState clusterState = initialState(); assertThat(reassign(clusterState).metaData().custom(PersistentTasksCustomMetaData.TYPE), nullValue()); @@ -527,7 +610,6 @@ private DiscoveryNode newNode(String nodeId) { Version.CURRENT); } - private ClusterState initialState() { MetaData.Builder metaData = MetaData.builder(); RoutingTable.Builder routingTable = RoutingTable.builder(); @@ -558,7 +640,7 @@ private void changeRoutingTable(MetaData.Builder metaData, RoutingTable.Builder } /** Creates a PersistentTasksClusterService with a single PersistentTasksExecutor implemented by a BiFunction **/ - static

PersistentTasksClusterService createService(final BiFunction fn) { + private

PersistentTasksClusterService createService(final BiFunction fn) { PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(Settings.EMPTY, singleton(new PersistentTasksExecutor

(Settings.EMPTY, TestPersistentTasksExecutor.NAME, null) { @Override @@ -571,6 +653,6 @@ protected void nodeOperation(AllocatedPersistentTask task, P params, Task.Status throw new UnsupportedOperationException(); } })); - return new PersistentTasksClusterService(Settings.EMPTY, registry, mock(ClusterService.class)); + return new PersistentTasksClusterService(Settings.EMPTY, registry, clusterService); } } diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java new file mode 100644 index 0000000000000..3adf5f170786a --- /dev/null +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java @@ -0,0 +1,109 @@ +package org.elasticsearch.persistent; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import java.util.function.Predicate; + +import static java.util.Collections.emptyList; +import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; + +public abstract class PersistentTasksDecidersTestCase extends ESTestCase { + + /** Needed by {@link ClusterService} **/ + private static ThreadPool threadPool; + /** Needed by {@link PersistentTasksClusterService} **/ + private ClusterService clusterService; + + private PersistentTasksClusterService persistentTasksClusterService; + + @BeforeClass + public static void setUpThreadPool() { + threadPool = new TestThreadPool(getTestClass().getSimpleName()); + } + + @Before + public void setUp() throws Exception { + super.setUp(); + clusterService = createClusterService(threadPool); + PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(clusterService.getSettings(), emptyList()) { + @Override + public PersistentTasksExecutor getPersistentTaskExecutorSafe(String taskName) { + return new PersistentTasksExecutor(clusterService.getSettings(), taskName, null) { + @Override + protected void nodeOperation(AllocatedPersistentTask task, Params params, Task.Status status) { + logger.debug("Executing task {}", task); + } + }; + } + }; + persistentTasksClusterService = new PersistentTasksClusterService(clusterService.getSettings(), registry, clusterService); + } + + @AfterClass + public static void tearDownThreadPool() throws Exception { + terminate(threadPool); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + clusterService.close(); + } + + protected ClusterState reassign(final ClusterState clusterState) { + return persistentTasksClusterService.reassignTasks(clusterState); + } + + protected static ClusterState createClusterStateWithTasks(final Settings settings, final int nbNodes, final int nbTasks) { + DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(); + for (int i = 0; i < nbNodes; i++) { + nodes.add(new DiscoveryNode("_node_" + i, buildNewFakeTransportAddress(), Version.CURRENT)); + } + + PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder(); + for (int i = 0; i < nbTasks; i++) { + tasks.addTask("_task_" + i, "test", null, new PersistentTasksCustomMetaData.Assignment(null, "initialized")); + } + + MetaData metaData = MetaData.builder() + .putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build()) + .persistentSettings(settings) + .build(); + + return ClusterState.builder(ClusterName.DEFAULT).nodes(nodes).metaData(metaData).build(); + } + + /** Asserts that the given cluster state contains nbTasks tasks that are assigned **/ + protected static void assertNbAssignedTasks(final long nbTasks, final ClusterState clusterState) { + assertPersistentTasks(nbTasks, clusterState, PersistentTasksCustomMetaData.PersistentTask::isAssigned); + } + + /** Asserts that the given cluster state contains nbTasks tasks that are NOT assigned **/ + protected static void assertNbUnassignedTasks(final long nbTasks, final ClusterState clusterState) { + assertPersistentTasks(nbTasks, clusterState, task -> task.isAssigned() == false); + } + + /** Asserts that the cluster state contains nbTasks tasks that verify the given predicate **/ + protected static void assertPersistentTasks(final long nbTasks, + final ClusterState clusterState, + final Predicate predicate) { + PersistentTasksCustomMetaData tasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); + assertNotNull("Persistent tasks must be not null", tasks); + assertEquals(nbTasks, tasks.tasks().stream().filter(predicate).count()); + } +} diff --git a/server/src/test/java/org/elasticsearch/persistent/decider/AssignmentDecisionTests.java b/server/src/test/java/org/elasticsearch/persistent/decider/AssignmentDecisionTests.java new file mode 100644 index 0000000000000..51b08a52bd81f --- /dev/null +++ b/server/src/test/java/org/elasticsearch/persistent/decider/AssignmentDecisionTests.java @@ -0,0 +1,35 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.persistent.decider; + +import org.elasticsearch.test.ESTestCase; + +public class AssignmentDecisionTests extends ESTestCase { + + public void testConstantsTypes() { + assertEquals(AssignmentDecision.Type.YES, AssignmentDecision.ALWAYS.getType()); + assertEquals(AssignmentDecision.Type.YES, AssignmentDecision.YES.getType()); + assertEquals(AssignmentDecision.Type.NO, AssignmentDecision.NO.getType()); + } + + public void testResolveFromType() { + final AssignmentDecision.Type expected = randomFrom(AssignmentDecision.Type.values()); + assertEquals(expected, AssignmentDecision.Type.resolve(expected.toString())); + } +} diff --git a/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderTests.java b/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderTests.java new file mode 100644 index 0000000000000..7c8b2ba764c47 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderTests.java @@ -0,0 +1,51 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.persistent.decider; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.persistent.PersistentTasksDecidersTestCase; + +public class EnableAssignmentDeciderTests extends PersistentTasksDecidersTestCase { + + public void testAllocationValues() { + final String all = randomFrom("all", "All", "ALL"); + assertEquals(EnableAssignmentDecider.Allocation.ALL, EnableAssignmentDecider.Allocation.fromString(all)); + + final String none = randomFrom("none", "None", "NONE"); + assertEquals(EnableAssignmentDecider.Allocation.NONE, EnableAssignmentDecider.Allocation.fromString(none)); + } + + public void testEnableAssignment() { + final int nbTasks = randomIntBetween(1, 10); + final int nbNodes = randomIntBetween(1, 5); + final EnableAssignmentDecider.Allocation allocation = randomFrom(EnableAssignmentDecider.Allocation.values()); + + Settings settings = Settings.builder() + .put(EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.getKey(), allocation.toString()) + .build(); + + ClusterState clusterState = reassign(createClusterStateWithTasks(settings, nbNodes, nbTasks)); + if (allocation == EnableAssignmentDecider.Allocation.ALL) { + assertNbAssignedTasks(nbTasks, clusterState); + } else { + assertNbUnassignedTasks(nbTasks, clusterState); + } + } +} From 8d91188986606ec24d3baa772bff986eba33244a Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 19 Mar 2018 15:37:55 +0100 Subject: [PATCH 2/4] Fix license header --- .../PersistentTasksDecidersTestCase.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java index 3adf5f170786a..d7fe2f535d771 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java @@ -1,3 +1,21 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.elasticsearch.persistent; import org.elasticsearch.Version; From 69d93ac88f16c10dd4dbec8ac9c31689ba30b07e Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 21 Mar 2018 15:09:23 +0100 Subject: [PATCH 3/4] Apply feedback --- .../PersistentTasksClusterService.java | 7 +- .../persistent/decider/AssignmentDecider.java | 47 ----- .../decider/AssignmentDecision.java | 10 +- .../decider/EnableAssignmentDecider.java | 17 +- .../decider/AssignmentDecisionTests.java | 2 - .../decider/EnableAssignmentDeciderIT.java | 173 ++++++++++++++++++ 6 files changed, 188 insertions(+), 68 deletions(-) delete mode 100644 server/src/main/java/org/elasticsearch/persistent/decider/AssignmentDecider.java create mode 100644 server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index 8d6b3c2e4e6d8..cf44556ee5ddc 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -34,7 +34,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; -import org.elasticsearch.persistent.decider.AssignmentDecider; import org.elasticsearch.persistent.decider.AssignmentDecision; import org.elasticsearch.persistent.decider.EnableAssignmentDecider; import org.elasticsearch.tasks.Task; @@ -48,14 +47,14 @@ public class PersistentTasksClusterService extends AbstractComponent implements private final ClusterService clusterService; private final PersistentTasksExecutorRegistry registry; - private final AssignmentDecider decider; + private final EnableAssignmentDecider decider; public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorRegistry registry, ClusterService clusterService) { super(settings); this.clusterService = clusterService; clusterService.addListener(this); this.registry = registry; - this.decider = new EnableAssignmentDecider<>(settings, clusterService.getClusterSettings()); + this.decider = new EnableAssignmentDecider(settings, clusterService.getClusterSettings()); } /** @@ -230,7 +229,7 @@ private Assignment createAssignment(final final ClusterState currentState) { PersistentTasksExecutor persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName); - AssignmentDecision decision = decider.canAssign(taskName, taskParams); + AssignmentDecision decision = decider.canAssign(); if (decision.getType() == AssignmentDecision.Type.NO) { return new Assignment(null, "persistent task [" + taskName + "] cannot be assigned [" + decision.getReason() + "]"); } diff --git a/server/src/main/java/org/elasticsearch/persistent/decider/AssignmentDecider.java b/server/src/main/java/org/elasticsearch/persistent/decider/AssignmentDecider.java deleted file mode 100644 index 9a380efc2ad9d..0000000000000 --- a/server/src/main/java/org/elasticsearch/persistent/decider/AssignmentDecider.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.persistent.decider; - -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.persistent.PersistentTaskParams; - -/** - * {@link AssignmentDecider} are used to decide if a persistent - * task can be assigned to a node. - */ -public abstract class AssignmentDecider extends AbstractComponent { - - public AssignmentDecider(Settings settings) { - super(settings); - } - - /** - * Returns a {@link AssignmentDecision} whether the given persistent task can be assigned - * to any node of the cluster. The default is {@link AssignmentDecision#ALWAYS}. - * - * @param taskName the task's name - * @param taskParams the task's parameters - * @return the {@link AssignmentDecision} - */ - public AssignmentDecision canAssign(final String taskName, final @Nullable Params taskParams) { - return AssignmentDecision.ALWAYS; - } -} diff --git a/server/src/main/java/org/elasticsearch/persistent/decider/AssignmentDecision.java b/server/src/main/java/org/elasticsearch/persistent/decider/AssignmentDecision.java index 5434b8dc27300..eb8f851a68dab 100644 --- a/server/src/main/java/org/elasticsearch/persistent/decider/AssignmentDecision.java +++ b/server/src/main/java/org/elasticsearch/persistent/decider/AssignmentDecision.java @@ -25,21 +25,15 @@ * {@link AssignmentDecision} represents the decision made during the process of * assigning a persistent task to a node of the cluster. * - * @see AssignmentDecider + * @see EnableAssignmentDecider */ public final class AssignmentDecision { - public static final AssignmentDecision ALWAYS = new AssignmentDecision(Type.YES); - public static final AssignmentDecision YES = new AssignmentDecision(Type.YES); - public static final AssignmentDecision NO = new AssignmentDecision(Type.NO); + public static final AssignmentDecision YES = new AssignmentDecision(Type.YES, ""); private final Type type; private final String reason; - private AssignmentDecision(final Type type) { - this(type, ""); - } - public AssignmentDecision(final Type type, final String reason) { this.type = Objects.requireNonNull(type); this.reason = Objects.requireNonNull(reason); diff --git a/server/src/main/java/org/elasticsearch/persistent/decider/EnableAssignmentDecider.java b/server/src/main/java/org/elasticsearch/persistent/decider/EnableAssignmentDecider.java index c9159ffb13be7..525e1379a4098 100644 --- a/server/src/main/java/org/elasticsearch/persistent/decider/EnableAssignmentDecider.java +++ b/server/src/main/java/org/elasticsearch/persistent/decider/EnableAssignmentDecider.java @@ -18,11 +18,9 @@ */ package org.elasticsearch.persistent.decider; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.persistent.PersistentTaskParams; import java.util.Locale; @@ -30,7 +28,7 @@ import static org.elasticsearch.common.settings.Setting.Property.NodeScope; /** - * This {@link AssignmentDecider} is used to allow/disallow the persistent tasks + * {@link EnableAssignmentDecider} is used to allow/disallow the persistent tasks * to be assigned to cluster nodes. *

* Allocation settings can have the following values (non-casesensitive): @@ -41,7 +39,7 @@ * * @see Allocation */ -public class EnableAssignmentDecider extends AssignmentDecider { +public class EnableAssignmentDecider { public static final Setting CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING = new Setting<>("cluster.persistent_tasks.allocation.enable", Allocation.ALL.toString(), Allocation::fromString, Dynamic, NodeScope); @@ -49,7 +47,6 @@ public class EnableAssignmentDecider extend private volatile Allocation enableAssignment; public EnableAssignmentDecider(final Settings settings, final ClusterSettings clusterSettings) { - super(settings); this.enableAssignment = CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.get(settings); clusterSettings.addSettingsUpdateConsumer(CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING, this::setEnableAssignment); } @@ -58,8 +55,14 @@ public void setEnableAssignment(final Allocation enableAssignment) { this.enableAssignment = enableAssignment; } - @Override - public AssignmentDecision canAssign(final String taskName, final @Nullable Params taskParams) { + /** + * Returns a {@link AssignmentDecision} whether the given persistent task can be assigned + * to a node of the cluster. The decision depends on the current value of the setting + * {@link EnableAssignmentDecider#CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING}. + * + * @return the {@link AssignmentDecision} + */ + public AssignmentDecision canAssign() { if (enableAssignment == Allocation.NONE) { return new AssignmentDecision(AssignmentDecision.Type.NO, "no persistent task assignments are allowed due to cluster settings"); } diff --git a/server/src/test/java/org/elasticsearch/persistent/decider/AssignmentDecisionTests.java b/server/src/test/java/org/elasticsearch/persistent/decider/AssignmentDecisionTests.java index 51b08a52bd81f..3fa580e726a83 100644 --- a/server/src/test/java/org/elasticsearch/persistent/decider/AssignmentDecisionTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/decider/AssignmentDecisionTests.java @@ -23,9 +23,7 @@ public class AssignmentDecisionTests extends ESTestCase { public void testConstantsTypes() { - assertEquals(AssignmentDecision.Type.YES, AssignmentDecision.ALWAYS.getType()); assertEquals(AssignmentDecision.Type.YES, AssignmentDecision.YES.getType()); - assertEquals(AssignmentDecision.Type.NO, AssignmentDecision.NO.getType()); } public void testResolveFromType() { diff --git a/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java b/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java new file mode 100644 index 0000000000000..15d12fb1ce932 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java @@ -0,0 +1,173 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.persistent.decider; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.persistent.PersistentTaskParams; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; +import org.elasticsearch.persistent.PersistentTasksService; +import org.elasticsearch.persistent.TestPersistentTasksPlugin; +import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; +import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; + +import java.util.Collection; +import java.util.concurrent.CountDownLatch; + +import static java.util.Collections.singletonList; +import static org.elasticsearch.persistent.decider.EnableAssignmentDecider.Allocation; +import static org.elasticsearch.persistent.decider.EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; + +@ESIntegTestCase.ClusterScope(minNumDataNodes = 1) +public class EnableAssignmentDeciderIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return singletonList(TestPersistentTasksPlugin.class); + } + + @Override + protected Collection> transportClientPlugins() { + return nodePlugins(); + } + + @Override + protected boolean ignoreExternalCluster() { + return true; + } + + /** + * Test that the {@link EnableAssignmentDecider#CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING} setting correctly + * prevents persistent tasks to be assigned after a cluster restart. + */ + public void testEnableAssignmentAfterRestart() throws Exception { + final int numberOfTasks = randomIntBetween(1, 10); + logger.trace("creating {} persistent tasks", numberOfTasks); + + final CountDownLatch latch = new CountDownLatch(numberOfTasks); + for (int i = 0; i < numberOfTasks; i++) { + PersistentTasksService service = internalCluster().getInstance(PersistentTasksService.class); + service.startPersistentTask("task_" + i, TestPersistentTasksExecutor.NAME, randomTaskParams(), + new ActionListener>() { + @Override + public void onResponse(PersistentTask task) { + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + latch.countDown(); + } + }); + } + latch.await(); + + ClusterService clusterService = internalCluster().clusterService(internalCluster().getMasterName()); + PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + assertEquals(numberOfTasks, tasks.tasks().stream().filter(t -> TestPersistentTasksExecutor.NAME.equals(t.getTaskName())).count()); + + logger.trace("waiting for the tasks to be running"); + assertBusy(() -> { + ListTasksResponse listTasks = client().admin().cluster().prepareListTasks() + .setActions(TestPersistentTasksExecutor.NAME + "[c]") + .get(); + assertThat(listTasks.getTasks().size(), equalTo(numberOfTasks)); + }); + + try { + logger.trace("disable persistent tasks assignment"); + disablePersistentTasksAssignment(); + + logger.trace("restart the cluster"); + internalCluster().fullRestart(); + ensureYellow(); + + logger.trace("persistent tasks assignment is still disabled"); + assertEnableAssignmentSetting(Allocation.NONE); + + logger.trace("persistent tasks are not assigned"); + tasks = internalCluster().clusterService().state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + assertEquals(numberOfTasks, tasks.tasks().stream() + .filter(t -> TestPersistentTasksExecutor.NAME.equals(t.getTaskName())) + .filter(t -> t.isAssigned() == false) + .count()); + + ListTasksResponse runningTasks = client().admin().cluster().prepareListTasks() + .setActions(TestPersistentTasksExecutor.NAME + "[c]") + .get(); + assertThat(runningTasks.getTasks().size(), equalTo(0)); + + logger.trace("enable persistent tasks assignment"); + if (randomBoolean()) { + enablePersistentTasksAssignment(); + } else { + resetPersistentTasksAssignment(); + } + + assertBusy(() -> { + ListTasksResponse listTasks = client().admin().cluster().prepareListTasks() + .setActions(TestPersistentTasksExecutor.NAME + "[c]") + .get(); + assertThat(listTasks.getTasks().size(), equalTo(numberOfTasks)); + }); + + } finally { + resetPersistentTasksAssignment(); + } + } + + private void assertEnableAssignmentSetting(final Allocation expected) { + ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().clear().setMetaData(true).get(); + Settings settings = clusterStateResponse.getState().getMetaData().settings(); + + String value = settings.get(CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.getKey()); + assertThat(Allocation.fromString(value), equalTo(expected)); + } + + private void disablePersistentTasksAssignment() { + Settings.Builder settings = Settings.builder().put(CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.getKey(), Allocation.NONE); + assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings)); + } + + private void enablePersistentTasksAssignment() { + Settings.Builder settings = Settings.builder().put(CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.getKey(), Allocation.ALL); + assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings)); + } + + private void resetPersistentTasksAssignment() { + Settings.Builder settings = Settings.builder().putNull(CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.getKey()); + assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings)); + } + + /** Returns a random task parameter **/ + private static PersistentTaskParams randomTaskParams() { + if (randomBoolean()) { + return null; + } + return new TestParams(randomAlphaOfLength(10)); + } +} From 3648a2e9df95314801a0182781730e5933812330 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 21 Mar 2018 15:54:14 +0100 Subject: [PATCH 4/4] Fix test --- .../persistent/PersistentTasksDecidersTestCase.java | 11 +++++++++-- .../decider/EnableAssignmentDeciderTests.java | 3 ++- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java index d7fe2f535d771..356e518198c52 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; @@ -87,7 +88,14 @@ protected ClusterState reassign(final ClusterState clusterState) { return persistentTasksClusterService.reassignTasks(clusterState); } - protected static ClusterState createClusterStateWithTasks(final Settings settings, final int nbNodes, final int nbTasks) { + protected void updateSettings(final Settings settings) { + ClusterSettings clusterSettings = clusterService.getClusterSettings(); + Settings.Builder updated = Settings.builder(); + clusterSettings.updateDynamicSettings(settings, updated, Settings.builder(), getTestClass().getName()); + clusterSettings.applySettings(updated.build()); + } + + protected static ClusterState createClusterStateWithTasks(final int nbNodes, final int nbTasks) { DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(); for (int i = 0; i < nbNodes; i++) { nodes.add(new DiscoveryNode("_node_" + i, buildNewFakeTransportAddress(), Version.CURRENT)); @@ -100,7 +108,6 @@ protected static ClusterState createClusterStateWithTasks(final Settings setting MetaData metaData = MetaData.builder() .putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build()) - .persistentSettings(settings) .build(); return ClusterState.builder(ClusterName.DEFAULT).nodes(nodes).metaData(metaData).build(); diff --git a/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderTests.java b/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderTests.java index 7c8b2ba764c47..7aedde1ab9b60 100644 --- a/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderTests.java @@ -40,8 +40,9 @@ public void testEnableAssignment() { Settings settings = Settings.builder() .put(EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.getKey(), allocation.toString()) .build(); + updateSettings(settings); - ClusterState clusterState = reassign(createClusterStateWithTasks(settings, nbNodes, nbTasks)); + ClusterState clusterState = reassign(createClusterStateWithTasks(nbNodes, nbTasks)); if (allocation == EnableAssignmentDecider.Allocation.ALL) { assertNbAssignedTasks(nbTasks, clusterState); } else {