Skip to content

Commit ef91aca

Browse files
committed
[ML] Migrate unallocated jobs and datafeeds (elastic#37430)
Migrate ml job and datafeed config of open jobs and update the parameters of the persistent tasks as they become unallocated during a rolling upgrade. Block allocation of ml persistent tasks until the configs are migrated.
1 parent ade684c commit ef91aca

File tree

13 files changed

+586
-182
lines changed

13 files changed

+586
-182
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
178178

179179
private static <T extends ToXContent> void mapValuesToXContent(ParseField field, Map<String, T> map, XContentBuilder builder,
180180
Params params) throws IOException {
181+
if (map.isEmpty()) {
182+
return;
183+
}
184+
181185
builder.startArray(field.getPreferredName());
182186
for (Map.Entry<String, T> entry : map.entrySet()) {
183187
entry.getValue().toXContent(builder, params);

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java

Lines changed: 65 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,16 @@
66

77
package org.elasticsearch.xpack.core.ml;
88

9+
import org.elasticsearch.cluster.node.DiscoveryNodes;
910
import org.elasticsearch.common.Nullable;
11+
import org.elasticsearch.persistent.PersistentTasksClusterService;
1012
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
1113
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
1214
import org.elasticsearch.xpack.core.ml.job.config.JobState;
1315
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
1416

17+
import java.util.Collection;
1518
import java.util.Collections;
16-
import java.util.List;
1719
import java.util.Set;
1820
import java.util.stream.Collectors;
1921

@@ -133,6 +135,42 @@ public static Set<String> openJobIds(@Nullable PersistentTasksCustomMetaData tas
133135
.collect(Collectors.toSet());
134136
}
135137

138+
/**
139+
* Get the job Ids of anomaly detector job tasks that do
140+
* not have an assignment.
141+
*
142+
* @param tasks Persistent tasks. If null an empty set is returned.
143+
* @param nodes The cluster nodes
144+
* @return The job Ids of tasks to do not have an assignment.
145+
*/
146+
public static Set<String> unallocatedJobIds(@Nullable PersistentTasksCustomMetaData tasks,
147+
DiscoveryNodes nodes) {
148+
return unallocatedJobTasks(tasks, nodes).stream()
149+
.map(task -> task.getId().substring(JOB_TASK_ID_PREFIX.length()))
150+
.collect(Collectors.toSet());
151+
}
152+
153+
/**
154+
* The job tasks that do not have an allocation as determined by
155+
* {@link PersistentTasksClusterService#needsReassignment(PersistentTasksCustomMetaData.Assignment, DiscoveryNodes)}
156+
*
157+
* @param tasks Persistent tasks. If null an empty set is returned.
158+
* @param nodes The cluster nodes
159+
* @return Unallocated job tasks
160+
*/
161+
public static Collection<PersistentTasksCustomMetaData.PersistentTask> unallocatedJobTasks(
162+
@Nullable PersistentTasksCustomMetaData tasks,
163+
DiscoveryNodes nodes) {
164+
if (tasks == null) {
165+
return Collections.emptyList();
166+
}
167+
168+
return tasks.findTasks(JOB_TASK_NAME, task -> true)
169+
.stream()
170+
.filter(task -> PersistentTasksClusterService.needsReassignment(task.getAssignment(), nodes))
171+
.collect(Collectors.toList());
172+
}
173+
136174
/**
137175
* The datafeed Ids of started datafeed tasks
138176
*
@@ -151,26 +189,39 @@ public static Set<String> startedDatafeedIds(@Nullable PersistentTasksCustomMeta
151189
}
152190

153191
/**
154-
* Is there an ml anomaly detector job task for the job {@code jobId}?
155-
* @param jobId The job id
156-
* @param tasks Persistent tasks
157-
* @return True if the job has a task
192+
* Get the datafeed Ids of started datafeed tasks
193+
* that do not have an assignment.
194+
*
195+
* @param tasks Persistent tasks. If null an empty set is returned.
196+
* @param nodes The cluster nodes
197+
* @return The job Ids of tasks to do not have an assignment.
158198
*/
159-
public static boolean taskExistsForJob(String jobId, PersistentTasksCustomMetaData tasks) {
160-
return openJobIds(tasks).contains(jobId);
199+
public static Set<String> unallocatedDatafeedIds(@Nullable PersistentTasksCustomMetaData tasks,
200+
DiscoveryNodes nodes) {
201+
202+
return unallocatedDatafeedTasks(tasks, nodes).stream()
203+
.map(task -> task.getId().substring(DATAFEED_TASK_ID_PREFIX.length()))
204+
.collect(Collectors.toSet());
161205
}
162206

163207
/**
164-
* Read the active anomaly detector job tasks.
165-
* Active tasks are not {@code JobState.CLOSED} or {@code JobState.FAILED}.
208+
* The datafeed tasks that do not have an allocation as determined by
209+
* {@link PersistentTasksClusterService#needsReassignment(PersistentTasksCustomMetaData.Assignment, DiscoveryNodes)}
166210
*
167-
* @param tasks Persistent tasks
168-
* @return The job tasks excluding closed and failed jobs
211+
* @param tasks Persistent tasks. If null an empty set is returned.
212+
* @param nodes The cluster nodes
213+
* @return Unallocated datafeed tasks
169214
*/
170-
public static List<PersistentTasksCustomMetaData.PersistentTask<?>> activeJobTasks(PersistentTasksCustomMetaData tasks) {
171-
return tasks.findTasks(JOB_TASK_NAME, task -> true)
215+
public static Collection<PersistentTasksCustomMetaData.PersistentTask> unallocatedDatafeedTasks(
216+
@Nullable PersistentTasksCustomMetaData tasks,
217+
DiscoveryNodes nodes) {
218+
if (tasks == null) {
219+
return Collections.emptyList();
220+
}
221+
222+
return tasks.findTasks(DATAFEED_TASK_NAME, task -> true)
172223
.stream()
173-
.filter(task -> ((JobTaskState) task.getState()).getState().isAnyOf(JobState.CLOSED, JobState.FAILED) == false)
224+
.filter(task -> PersistentTasksClusterService.needsReassignment(task.getAssignment(), nodes))
174225
.collect(Collectors.toList());
175226
}
176227
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java

Lines changed: 51 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66

77
package org.elasticsearch.xpack.core.ml;
88

9+
import org.elasticsearch.Version;
10+
import org.elasticsearch.cluster.node.DiscoveryNode;
11+
import org.elasticsearch.cluster.node.DiscoveryNodes;
12+
import org.elasticsearch.common.transport.TransportAddress;
913
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
1014
import org.elasticsearch.test.ESTestCase;
1115
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
@@ -14,12 +18,14 @@
1418
import org.elasticsearch.xpack.core.ml.job.config.JobState;
1519
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
1620

21+
import java.net.InetAddress;
22+
1723
import static org.hamcrest.Matchers.containsInAnyOrder;
1824
import static org.hamcrest.Matchers.empty;
1925

2026
public class MlTasksTests extends ESTestCase {
2127
public void testGetJobState() {
22-
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
28+
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
2329
// A missing task is a closed job
2430
assertEquals(JobState.CLOSED, MlTasks.getJobState("foo", tasksBuilder.build()));
2531
// A task with no status is opening
@@ -52,7 +58,7 @@ public void testGetDatefeedState() {
5258
public void testGetJobTask() {
5359
assertNull(MlTasks.getJobTask("foo", null));
5460

55-
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
61+
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
5662
tasksBuilder.addTask(MlTasks.jobTaskId("foo"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo"),
5763
new PersistentTasksCustomMetaData.Assignment("bar", "test assignment"));
5864

@@ -73,7 +79,7 @@ public void testGetDatafeedTask() {
7379
}
7480

7581
public void testOpenJobIds() {
76-
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
82+
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
7783
assertThat(MlTasks.openJobIds(tasksBuilder.build()), empty());
7884

7985
tasksBuilder.addTask(MlTasks.jobTaskId("foo-1"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo-1"),
@@ -92,7 +98,7 @@ public void testOpenJobIds_GivenNull() {
9298
}
9399

94100
public void testStartedDatafeedIds() {
95-
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
101+
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
96102
assertThat(MlTasks.openJobIds(tasksBuilder.build()), empty());
97103

98104
tasksBuilder.addTask(MlTasks.jobTaskId("job-1"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo-1"),
@@ -111,16 +117,48 @@ public void testStartedDatafeedIds_GivenNull() {
111117
assertThat(MlTasks.startedDatafeedIds(null), empty());
112118
}
113119

114-
public void testTaskExistsForJob() {
115-
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
116-
assertFalse(MlTasks.taskExistsForJob("job-1", tasksBuilder.build()));
117-
118-
tasksBuilder.addTask(MlTasks.jobTaskId("foo"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo"),
119-
new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment"));
120-
tasksBuilder.addTask(MlTasks.jobTaskId("bar"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("bar"),
120+
public void testUnallocatedJobIds() {
121+
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
122+
tasksBuilder.addTask(MlTasks.jobTaskId("job_with_assignment"), MlTasks.JOB_TASK_NAME,
123+
new OpenJobAction.JobParams("job_with_assignment"),
121124
new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment"));
125+
tasksBuilder.addTask(MlTasks.jobTaskId("job_without_assignment"), MlTasks.JOB_TASK_NAME,
126+
new OpenJobAction.JobParams("job_without_assignment"),
127+
new PersistentTasksCustomMetaData.Assignment(null, "test assignment"));
128+
tasksBuilder.addTask(MlTasks.jobTaskId("job_without_node"), MlTasks.JOB_TASK_NAME,
129+
new OpenJobAction.JobParams("job_without_node"),
130+
new PersistentTasksCustomMetaData.Assignment("dead-node", "expired node"));
131+
132+
DiscoveryNodes nodes = DiscoveryNodes.builder()
133+
.add(new DiscoveryNode("node-1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT))
134+
.localNodeId("node-1")
135+
.masterNodeId("node-1")
136+
.build();
137+
138+
assertThat(MlTasks.unallocatedJobIds(tasksBuilder.build(), nodes),
139+
containsInAnyOrder("job_without_assignment", "job_without_node"));
140+
}
122141

123-
assertFalse(MlTasks.taskExistsForJob("job-1", tasksBuilder.build()));
124-
assertTrue(MlTasks.taskExistsForJob("foo", tasksBuilder.build()));
142+
public void testUnallocatedDatafeedIds() {
143+
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
144+
tasksBuilder.addTask(MlTasks.datafeedTaskId("datafeed_with_assignment"), MlTasks.DATAFEED_TASK_NAME,
145+
new StartDatafeedAction.DatafeedParams("datafeed_with_assignment", 0L),
146+
new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment"));
147+
tasksBuilder.addTask(MlTasks.datafeedTaskId("datafeed_without_assignment"), MlTasks.DATAFEED_TASK_NAME,
148+
new StartDatafeedAction.DatafeedParams("datafeed_without_assignment", 0L),
149+
new PersistentTasksCustomMetaData.Assignment(null, "test assignment"));
150+
tasksBuilder.addTask(MlTasks.datafeedTaskId("datafeed_without_node"), MlTasks.DATAFEED_TASK_NAME,
151+
new StartDatafeedAction.DatafeedParams("datafeed_without_node", 0L),
152+
new PersistentTasksCustomMetaData.Assignment("dead_node", "expired node"));
153+
154+
155+
DiscoveryNodes nodes = DiscoveryNodes.builder()
156+
.add(new DiscoveryNode("node-1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT))
157+
.localNodeId("node-1")
158+
.masterNodeId("node-1")
159+
.build();
160+
161+
assertThat(MlTasks.unallocatedDatafeedIds(tasksBuilder.build(), nodes),
162+
containsInAnyOrder("datafeed_without_assignment", "datafeed_without_node"));
125163
}
126164
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public void clusterChanged(ClusterChangedEvent event) {
5757
return;
5858
}
5959

60-
mlConfigMigrator.migrateConfigsWithoutTasks(event.state(), ActionListener.wrap(
60+
mlConfigMigrator.migrateConfigs(event.state(), ActionListener.wrap(
6161
response -> threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(event)),
6262
e -> {
6363
logger.error("error migrating ml configurations", e);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,10 @@ static boolean mlConfigIndexIsAllocated(ClusterState clusterState) {
7979
* False if {@link #canStartMigration(ClusterState)} returns {@code false}
8080
* False if the job is not in the cluster state
8181
* False if the {@link Job#isDeleting()}
82-
* False if the job has a persistent task
82+
* False if the job has an allocated persistent task
8383
* True otherwise i.e. the job is present, not deleting
84-
* and does not have a persistent task.
84+
* and does not have a persistent task or its persistent
85+
* task is un-allocated
8586
*
8687
* @param jobId The job Id
8788
* @param clusterState The cluster state
@@ -100,15 +101,17 @@ public boolean jobIsEligibleForMigration(String jobId, ClusterState clusterState
100101
}
101102

102103
PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
103-
return MlTasks.openJobIds(persistentTasks).contains(jobId) == false;
104+
return MlTasks.openJobIds(persistentTasks).contains(jobId) == false ||
105+
MlTasks.unallocatedJobIds(persistentTasks, clusterState.nodes()).contains(jobId);
104106
}
105107

106108
/**
107109
* Is the datafeed a eligible for migration? Returns:
108110
* False if {@link #canStartMigration(ClusterState)} returns {@code false}
109111
* False if the datafeed is not in the cluster state
110-
* False if the datafeed has a persistent task
111-
* True otherwise i.e. the datafeed is present and does not have a persistent task.
112+
* False if the datafeed has an allocated persistent task
113+
* True otherwise i.e. the datafeed is present and does not have a persistent
114+
* task or its persistent task is un-allocated
112115
*
113116
* @param datafeedId The datafeed Id
114117
* @param clusterState The cluster state
@@ -125,6 +128,7 @@ public boolean datafeedIsEligibleForMigration(String datafeedId, ClusterState cl
125128
}
126129

127130
PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
128-
return MlTasks.startedDatafeedIds(persistentTasks).contains(datafeedId) == false;
131+
return MlTasks.startedDatafeedIds(persistentTasks).contains(datafeedId) == false
132+
|| MlTasks.unallocatedDatafeedIds(persistentTasks, clusterState.nodes()).contains(datafeedId);
129133
}
130134
}

0 commit comments

Comments
 (0)