From ccef6e7a8aba216fc319d84806d7cbaa2bcc5154 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Wed, 4 Jul 2018 16:30:56 +0100 Subject: [PATCH 1/2] [ML] Don't treat stale FAILED jobs as OPENING in job allocation Job persistent tasks with stale allocation IDs used to always be considered as OPENING jobs in the ML job node allocation decision. However, FAILED jobs are not relocated to other nodes, which leads to them blocking up the nodes they failed on after node restarts. FAILED jobs should not restrict how many other jobs can open on a node, regardless of whether they are stale or not. Closes #31794 --- .../ml/action/TransportOpenJobAction.java | 21 +++++-- .../action/TransportOpenJobActionTests.java | 58 +++++++++++++++++-- 2 files changed, 69 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index e7fb0fe5fb315..d394aadbe2b67 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -210,16 +210,27 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j for (PersistentTasksCustomMetaData.PersistentTask assignedTask : assignedTasks) { JobTaskState jobTaskState = (JobTaskState) assignedTask.getState(); JobState jobState; - if (jobTaskState == null || // executor node didn't have the chance to set job status to OPENING - // previous executor node failed and current executor node didn't have the chance to set job status to OPENING - jobTaskState.isStatusStale(assignedTask)) { + if (jobTaskState == null) { + // executor node didn't have the chance to set job status to OPENING ++numberOfAllocatingJobs; jobState = JobState.OPENING; } else { jobState = jobTaskState.getState(); + if (jobTaskState.isStatusStale(assignedTask)) { + if (jobState == JobState.CLOSING) { + // previous executor node failed while the job was closing - it won't + // be reopened, so consider it CLOSED for resource usage purposes + jobState = JobState.CLOSED; + } else if (jobState != JobState.FAILED) { + // previous executor node failed and current executor node didn't + // have the chance to set job status to OPENING + ++numberOfAllocatingJobs; + jobState = JobState.OPENING; + } + } } - // Don't count FAILED jobs, as they don't consume native memory - if (jobState != JobState.FAILED) { + // Don't count CLOSED or FAILED jobs, as they don't consume native memory + if (jobState != JobState.CLOSED && jobState != JobState.FAILED) { ++numberOfAssignedJobs; String assignedJobId = ((OpenJobAction.JobParams) assignedTask.getParams()).getJobId(); Job assignedJob = mlMetadata.getJobs().get(assignedJobId); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index b5a315d9687bb..dd8ddf3aa62ad 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -55,7 +55,6 @@ import java.io.IOException; import java.net.InetAddress; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -285,7 +284,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { nodeAttr, Collections.emptySet(), Version.CURRENT)) .build(); - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id1", "_node_id1", null, tasksBuilder); addJobTask("job_id2", "_node_id1", null, tasksBuilder); addJobTask("job_id3", "_node_id2", null, tasksBuilder); @@ -340,6 +339,55 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); } + public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob() { + Map nodeAttr = new HashMap<>(); + nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true"); + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), + nodeAttr, Collections.emptySet(), Version.CURRENT)) + .add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), + nodeAttr, Collections.emptySet(), Version.CURRENT)) + .add(new DiscoveryNode("_node_name3", "_node_id3", new TransportAddress(InetAddress.getLoopbackAddress(), 9302), + nodeAttr, Collections.emptySet(), Version.CURRENT)) + .build(); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id1", "_node_id1", JobState.fromString("failed"), tasksBuilder); + // This will make the allocation stale for job_id1 + tasksBuilder.reassignTask(MlMetadata.jobTaskId("job_id1"), new Assignment("_node_id1", "test assignment")); + addJobTask("job_id2", "_node_id1", null, tasksBuilder); + addJobTask("job_id3", "_node_id2", null, tasksBuilder); + addJobTask("job_id4", "_node_id2", null, tasksBuilder); + addJobTask("job_id5", "_node_id3", null, tasksBuilder); + addJobTask("job_id6", "_node_id3", null, tasksBuilder); + PersistentTasksCustomMetaData tasks = tasksBuilder.build(); + + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); + csBuilder.nodes(nodes); + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addJobAndIndices(metaData, routingTable, "job_id1", "job_id2", "job_id3", "job_id4", "job_id5", "job_id6", "job_id7", "job_id8"); + csBuilder.routingTable(routingTable.build()); + metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); + csBuilder.metaData(metaData); + + ClusterState cs = csBuilder.build(); + // Allocation won't be possible if the stale failed job is treated as opening + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, 10, 30, logger); + assertEquals("_node_id1", result.getExecutorNode()); + + tasksBuilder = PersistentTasksCustomMetaData.builder(tasks); + addJobTask("job_id7", "_node_id1", null, tasksBuilder); + tasks = tasksBuilder.build(); + + csBuilder = ClusterState.builder(cs); + csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); + cs = csBuilder.build(); + result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", cs, 2, 10, 30, logger); + assertNull("no node selected, because OPENING state", result.getExecutorNode()); + assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); + } + public void testSelectLeastLoadedMlNode_noCompatibleJobTypeNodes() { Map nodeAttr = new HashMap<>(); nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true"); @@ -710,13 +758,13 @@ private ClusterState getClusterStateWithMappingsWithMetaData(Map private static Function jobWithRulesCreator() { return jobId -> { - DetectionRule rule = new DetectionRule.Builder(Arrays.asList( + DetectionRule rule = new DetectionRule.Builder(Collections.singletonList( new RuleCondition(RuleCondition.AppliesTo.TYPICAL, Operator.LT, 100.0) )).build(); Detector.Builder detector = new Detector.Builder("count", null); - detector.setRules(Arrays.asList(rule)); - AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build())); + detector.setRules(Collections.singletonList(rule)); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())); DataDescription.Builder dataDescription = new DataDescription.Builder(); Job.Builder job = new Job.Builder(jobId); job.setAnalysisConfig(analysisConfig); From 1617e8472d479122c447e3389fa92cddc3e98787 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Thu, 5 Jul 2018 09:55:21 +0100 Subject: [PATCH 2/2] Use isAnyOf() --- .../elasticsearch/xpack/ml/action/TransportOpenJobAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index d394aadbe2b67..290e407ab664c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -230,7 +230,7 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j } } // Don't count CLOSED or FAILED jobs, as they don't consume native memory - if (jobState != JobState.CLOSED && jobState != JobState.FAILED) { + if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) { ++numberOfAssignedJobs; String assignedJobId = ((OpenJobAction.JobParams) assignedTask.getParams()).getJobId(); Job assignedJob = mlMetadata.getJobs().get(assignedJobId);