Skip to content

Commit 92de94c

Browse files
author
David Roberts
authored
[ML] Don't treat stale FAILED jobs as OPENING in job allocation (#31800)
Job persistent tasks with stale allocation IDs used to always be considered as OPENING jobs in the ML job node allocation decision. However, FAILED jobs are not relocated to other nodes, which leads to them blocking up the nodes they failed on after node restarts. FAILED jobs should not restrict how many other jobs can open on a node, regardless of whether they are stale or not. Closes #31794
1 parent 9c11bf1 commit 92de94c

File tree

2 files changed

+69
-10
lines changed

2 files changed

+69
-10
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -210,16 +210,27 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j
210210
for (PersistentTasksCustomMetaData.PersistentTask<?> assignedTask : assignedTasks) {
211211
JobTaskState jobTaskState = (JobTaskState) assignedTask.getState();
212212
JobState jobState;
213-
if (jobTaskState == null || // executor node didn't have the chance to set job status to OPENING
214-
// previous executor node failed and current executor node didn't have the chance to set job status to OPENING
215-
jobTaskState.isStatusStale(assignedTask)) {
213+
if (jobTaskState == null) {
214+
// executor node didn't have the chance to set job status to OPENING
216215
++numberOfAllocatingJobs;
217216
jobState = JobState.OPENING;
218217
} else {
219218
jobState = jobTaskState.getState();
219+
if (jobTaskState.isStatusStale(assignedTask)) {
220+
if (jobState == JobState.CLOSING) {
221+
// previous executor node failed while the job was closing - it won't
222+
// be reopened, so consider it CLOSED for resource usage purposes
223+
jobState = JobState.CLOSED;
224+
} else if (jobState != JobState.FAILED) {
225+
// previous executor node failed and current executor node didn't
226+
// have the chance to set job status to OPENING
227+
++numberOfAllocatingJobs;
228+
jobState = JobState.OPENING;
229+
}
230+
}
220231
}
221-
// Don't count FAILED jobs, as they don't consume native memory
222-
if (jobState != JobState.FAILED) {
232+
// Don't count CLOSED or FAILED jobs, as they don't consume native memory
233+
if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) {
223234
++numberOfAssignedJobs;
224235
String assignedJobId = ((OpenJobAction.JobParams) assignedTask.getParams()).getJobId();
225236
Job assignedJob = mlMetadata.getJobs().get(assignedJobId);

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
import java.io.IOException;
5656
import java.net.InetAddress;
5757
import java.util.ArrayList;
58-
import java.util.Arrays;
5958
import java.util.Collections;
6059
import java.util.Date;
6160
import java.util.HashMap;
@@ -285,7 +284,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() {
285284
nodeAttr, Collections.emptySet(), Version.CURRENT))
286285
.build();
287286

288-
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
287+
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
289288
addJobTask("job_id1", "_node_id1", null, tasksBuilder);
290289
addJobTask("job_id2", "_node_id1", null, tasksBuilder);
291290
addJobTask("job_id3", "_node_id2", null, tasksBuilder);
@@ -340,6 +339,55 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() {
340339
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
341340
}
342341

342+
public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob() {
343+
Map<String, String> nodeAttr = new HashMap<>();
344+
nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
345+
DiscoveryNodes nodes = DiscoveryNodes.builder()
346+
.add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
347+
nodeAttr, Collections.emptySet(), Version.CURRENT))
348+
.add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301),
349+
nodeAttr, Collections.emptySet(), Version.CURRENT))
350+
.add(new DiscoveryNode("_node_name3", "_node_id3", new TransportAddress(InetAddress.getLoopbackAddress(), 9302),
351+
nodeAttr, Collections.emptySet(), Version.CURRENT))
352+
.build();
353+
354+
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
355+
addJobTask("job_id1", "_node_id1", JobState.fromString("failed"), tasksBuilder);
356+
// This will make the allocation stale for job_id1
357+
tasksBuilder.reassignTask(MlMetadata.jobTaskId("job_id1"), new Assignment("_node_id1", "test assignment"));
358+
addJobTask("job_id2", "_node_id1", null, tasksBuilder);
359+
addJobTask("job_id3", "_node_id2", null, tasksBuilder);
360+
addJobTask("job_id4", "_node_id2", null, tasksBuilder);
361+
addJobTask("job_id5", "_node_id3", null, tasksBuilder);
362+
addJobTask("job_id6", "_node_id3", null, tasksBuilder);
363+
PersistentTasksCustomMetaData tasks = tasksBuilder.build();
364+
365+
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
366+
csBuilder.nodes(nodes);
367+
MetaData.Builder metaData = MetaData.builder();
368+
RoutingTable.Builder routingTable = RoutingTable.builder();
369+
addJobAndIndices(metaData, routingTable, "job_id1", "job_id2", "job_id3", "job_id4", "job_id5", "job_id6", "job_id7", "job_id8");
370+
csBuilder.routingTable(routingTable.build());
371+
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
372+
csBuilder.metaData(metaData);
373+
374+
ClusterState cs = csBuilder.build();
375+
// Allocation won't be possible if the stale failed job is treated as opening
376+
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, 30, logger);
377+
assertEquals("_node_id1", result.getExecutorNode());
378+
379+
tasksBuilder = PersistentTasksCustomMetaData.builder(tasks);
380+
addJobTask("job_id7", "_node_id1", null, tasksBuilder);
381+
tasks = tasksBuilder.build();
382+
383+
csBuilder = ClusterState.builder(cs);
384+
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
385+
cs = csBuilder.build();
386+
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", cs, 2, 10, 30, logger);
387+
assertNull("no node selected, because OPENING state", result.getExecutorNode());
388+
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
389+
}
390+
343391
public void testSelectLeastLoadedMlNode_noCompatibleJobTypeNodes() {
344392
Map<String, String> nodeAttr = new HashMap<>();
345393
nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
@@ -710,13 +758,13 @@ private ClusterState getClusterStateWithMappingsWithMetaData(Map<String, Object>
710758

711759
private static Function<String, Job> jobWithRulesCreator() {
712760
return jobId -> {
713-
DetectionRule rule = new DetectionRule.Builder(Arrays.asList(
761+
DetectionRule rule = new DetectionRule.Builder(Collections.singletonList(
714762
new RuleCondition(RuleCondition.AppliesTo.TYPICAL, Operator.LT, 100.0)
715763
)).build();
716764

717765
Detector.Builder detector = new Detector.Builder("count", null);
718-
detector.setRules(Arrays.asList(rule));
719-
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build()));
766+
detector.setRules(Collections.singletonList(rule));
767+
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
720768
DataDescription.Builder dataDescription = new DataDescription.Builder();
721769
Job.Builder job = new Job.Builder(jobId);
722770
job.setAnalysisConfig(analysisConfig);

0 commit comments

Comments
 (0)