Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws

private static <T extends ToXContent> void mapValuesToXContent(ParseField field, Map<String, T> map, XContentBuilder builder,
Params params) throws IOException {
if (map.isEmpty()) {
return;
}

builder.startArray(field.getPreferredName());
for (Map.Entry<String, T> entry : map.entrySet()) {
entry.getValue().toXContent(builder, params);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@

package org.elasticsearch.xpack.core.ml;

import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.persistent.PersistentTasksClusterService;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -133,6 +135,42 @@ public static Set<String> openJobIds(@Nullable PersistentTasksCustomMetaData tas
.collect(Collectors.toSet());
}

/**
* Get the job Ids of anomaly detector job tasks that do
* not have an assignment.
*
* @param tasks Persistent tasks. If null an empty set is returned.
* @param nodes The cluster nodes
* @return The job Ids of tasks to do not have an assignment.
*/
public static Set<String> unallocatedJobIds(@Nullable PersistentTasksCustomMetaData tasks,
DiscoveryNodes nodes) {
return unallocatedJobTasks(tasks, nodes).stream()
.map(task -> task.getId().substring(JOB_TASK_ID_PREFIX.length()))
.collect(Collectors.toSet());
}

/**
* The job tasks that do not have an allocation as determined by
* {@link PersistentTasksClusterService#needsReassignment(PersistentTasksCustomMetaData.Assignment, DiscoveryNodes)}
*
* @param tasks Persistent tasks. If null an empty set is returned.
* @param nodes The cluster nodes
* @return Unallocated job tasks
*/
public static Collection<PersistentTasksCustomMetaData.PersistentTask> unallocatedJobTasks(
@Nullable PersistentTasksCustomMetaData tasks,
DiscoveryNodes nodes) {
if (tasks == null) {
return Collections.emptyList();
}

return tasks.findTasks(JOB_TASK_NAME, task -> true)
.stream()
.filter(task -> PersistentTasksClusterService.needsReassignment(task.getAssignment(), nodes))
.collect(Collectors.toList());
}

/**
* The datafeed Ids of started datafeed tasks
*
Expand All @@ -151,26 +189,39 @@ public static Set<String> startedDatafeedIds(@Nullable PersistentTasksCustomMeta
}

/**
* Is there an ml anomaly detector job task for the job {@code jobId}?
* @param jobId The job id
* @param tasks Persistent tasks
* @return True if the job has a task
* Get the datafeed Ids of started datafeed tasks
* that do not have an assignment.
*
* @param tasks Persistent tasks. If null an empty set is returned.
* @param nodes The cluster nodes
* @return The job Ids of tasks to do not have an assignment.
*/
public static boolean taskExistsForJob(String jobId, PersistentTasksCustomMetaData tasks) {
return openJobIds(tasks).contains(jobId);
public static Set<String> unallocatedDatafeedIds(@Nullable PersistentTasksCustomMetaData tasks,
DiscoveryNodes nodes) {

return unallocatedDatafeedTasks(tasks, nodes).stream()
.map(task -> task.getId().substring(DATAFEED_TASK_ID_PREFIX.length()))
.collect(Collectors.toSet());
}

/**
* Read the active anomaly detector job tasks.
* Active tasks are not {@code JobState.CLOSED} or {@code JobState.FAILED}.
* The datafeed tasks that do not have an allocation as determined by
* {@link PersistentTasksClusterService#needsReassignment(PersistentTasksCustomMetaData.Assignment, DiscoveryNodes)}
*
* @param tasks Persistent tasks
* @return The job tasks excluding closed and failed jobs
* @param tasks Persistent tasks. If null an empty set is returned.
* @param nodes The cluster nodes
* @return Unallocated datafeed tasks
*/
public static List<PersistentTasksCustomMetaData.PersistentTask<?>> activeJobTasks(PersistentTasksCustomMetaData tasks) {
return tasks.findTasks(JOB_TASK_NAME, task -> true)
public static Collection<PersistentTasksCustomMetaData.PersistentTask> unallocatedDatafeedTasks(
@Nullable PersistentTasksCustomMetaData tasks,
DiscoveryNodes nodes) {
if (tasks == null) {
return Collections.emptyList();
}

return tasks.findTasks(DATAFEED_TASK_NAME, task -> true)
.stream()
.filter(task -> ((JobTaskState) task.getState()).getState().isAnyOf(JobState.CLOSED, JobState.FAILED) == false)
.filter(task -> PersistentTasksClusterService.needsReassignment(task.getAssignment(), nodes))
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

package org.elasticsearch.xpack.core.ml;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
Expand All @@ -14,12 +18,14 @@
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;

import java.net.InetAddress;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;

public class MlTasksTests extends ESTestCase {
public void testGetJobState() {
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
// A missing task is a closed job
assertEquals(JobState.CLOSED, MlTasks.getJobState("foo", tasksBuilder.build()));
// A task with no status is opening
Expand Down Expand Up @@ -52,7 +58,7 @@ public void testGetDatefeedState() {
public void testGetJobTask() {
assertNull(MlTasks.getJobTask("foo", null));

PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
tasksBuilder.addTask(MlTasks.jobTaskId("foo"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo"),
new PersistentTasksCustomMetaData.Assignment("bar", "test assignment"));

Expand All @@ -73,7 +79,7 @@ public void testGetDatafeedTask() {
}

public void testOpenJobIds() {
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
assertThat(MlTasks.openJobIds(tasksBuilder.build()), empty());

tasksBuilder.addTask(MlTasks.jobTaskId("foo-1"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo-1"),
Expand All @@ -92,7 +98,7 @@ public void testOpenJobIds_GivenNull() {
}

public void testStartedDatafeedIds() {
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
assertThat(MlTasks.openJobIds(tasksBuilder.build()), empty());

tasksBuilder.addTask(MlTasks.jobTaskId("job-1"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo-1"),
Expand All @@ -111,16 +117,48 @@ public void testStartedDatafeedIds_GivenNull() {
assertThat(MlTasks.startedDatafeedIds(null), empty());
}

public void testTaskExistsForJob() {
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
assertFalse(MlTasks.taskExistsForJob("job-1", tasksBuilder.build()));

tasksBuilder.addTask(MlTasks.jobTaskId("foo"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo"),
new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment"));
tasksBuilder.addTask(MlTasks.jobTaskId("bar"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("bar"),
public void testUnallocatedJobIds() {
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
tasksBuilder.addTask(MlTasks.jobTaskId("job_with_assignment"), MlTasks.JOB_TASK_NAME,
new OpenJobAction.JobParams("job_with_assignment"),
new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment"));
tasksBuilder.addTask(MlTasks.jobTaskId("job_without_assignment"), MlTasks.JOB_TASK_NAME,
new OpenJobAction.JobParams("job_without_assignment"),
new PersistentTasksCustomMetaData.Assignment(null, "test assignment"));
tasksBuilder.addTask(MlTasks.jobTaskId("job_without_node"), MlTasks.JOB_TASK_NAME,
new OpenJobAction.JobParams("job_without_node"),
new PersistentTasksCustomMetaData.Assignment("dead-node", "expired node"));

DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node-1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT))
.localNodeId("node-1")
.masterNodeId("node-1")
.build();

assertThat(MlTasks.unallocatedJobIds(tasksBuilder.build(), nodes),
containsInAnyOrder("job_without_assignment", "job_without_node"));
}

assertFalse(MlTasks.taskExistsForJob("job-1", tasksBuilder.build()));
assertTrue(MlTasks.taskExistsForJob("foo", tasksBuilder.build()));
public void testUnallocatedDatafeedIds() {
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
tasksBuilder.addTask(MlTasks.datafeedTaskId("datafeed_with_assignment"), MlTasks.DATAFEED_TASK_NAME,
new StartDatafeedAction.DatafeedParams("datafeed_with_assignment", 0L),
new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment"));
tasksBuilder.addTask(MlTasks.datafeedTaskId("datafeed_without_assignment"), MlTasks.DATAFEED_TASK_NAME,
new StartDatafeedAction.DatafeedParams("datafeed_without_assignment", 0L),
new PersistentTasksCustomMetaData.Assignment(null, "test assignment"));
tasksBuilder.addTask(MlTasks.datafeedTaskId("datafeed_without_node"), MlTasks.DATAFEED_TASK_NAME,
new StartDatafeedAction.DatafeedParams("datafeed_without_node", 0L),
new PersistentTasksCustomMetaData.Assignment("dead_node", "expired node"));


DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node-1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT))
.localNodeId("node-1")
.masterNodeId("node-1")
.build();

assertThat(MlTasks.unallocatedDatafeedIds(tasksBuilder.build(), nodes),
containsInAnyOrder("datafeed_without_assignment", "datafeed_without_node"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void clusterChanged(ClusterChangedEvent event) {
return;
}

mlConfigMigrator.migrateConfigsWithoutTasks(event.state(), ActionListener.wrap(
mlConfigMigrator.migrateConfigs(event.state(), ActionListener.wrap(
response -> threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(event)),
e -> {
logger.error("error migrating ml configurations", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,10 @@ static boolean mlConfigIndexIsAllocated(ClusterState clusterState) {
* False if {@link #canStartMigration(ClusterState)} returns {@code false}
* False if the job is not in the cluster state
* False if the {@link Job#isDeleting()}
* False if the job has a persistent task
* False if the job has an allocated persistent task
* True otherwise i.e. the job is present, not deleting
* and does not have a persistent task.
* and does not have a persistent task or its persistent
* task is un-allocated
*
* @param jobId The job Id
* @param clusterState The cluster state
Expand All @@ -100,15 +101,17 @@ public boolean jobIsEligibleForMigration(String jobId, ClusterState clusterState
}

PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
return MlTasks.openJobIds(persistentTasks).contains(jobId) == false;
return MlTasks.openJobIds(persistentTasks).contains(jobId) == false ||
MlTasks.unallocatedJobIds(persistentTasks, clusterState.nodes()).contains(jobId);
}

/**
* Is the datafeed a eligible for migration? Returns:
* False if {@link #canStartMigration(ClusterState)} returns {@code false}
* False if the datafeed is not in the cluster state
* False if the datafeed has a persistent task
* True otherwise i.e. the datafeed is present and does not have a persistent task.
* False if the datafeed has an allocated persistent task
* True otherwise i.e. the datafeed is present and does not have a persistent
* task or its persistent task is un-allocated
*
* @param datafeedId The datafeed Id
* @param clusterState The cluster state
Expand All @@ -125,6 +128,7 @@ public boolean datafeedIsEligibleForMigration(String datafeedId, ClusterState cl
}

PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
return MlTasks.startedDatafeedIds(persistentTasks).contains(datafeedId) == false;
return MlTasks.startedDatafeedIds(persistentTasks).contains(datafeedId) == false
|| MlTasks.unallocatedDatafeedIds(persistentTasks, clusterState.nodes()).contains(datafeedId);
}
}
Loading