diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 5f09b896fd00b..cdd3af133f6dc 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -261,6 +261,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu Setting.intSetting("xpack.ml.node_concurrent_job_allocations", 2, 0, Property.Dynamic, Property.NodeScope); public static final Setting MAX_MACHINE_MEMORY_PERCENT = Setting.intSetting("xpack.ml.max_machine_memory_percent", 30, 5, 90, Property.Dynamic, Property.NodeScope); + public static final Setting MAX_LAZY_ML_NODES = + Setting.intSetting("xpack.ml.max_lazy_ml_nodes", 0, 0, 3, Property.Dynamic, Property.NodeScope); private static final Logger logger = Loggers.getLogger(XPackPlugin.class); @@ -288,6 +290,7 @@ public List> getSettings() { ML_ENABLED, CONCURRENT_JOB_ALLOCATIONS, MachineLearningField.MAX_MODEL_MEMORY_LIMIT, + MAX_LAZY_ML_NODES, MAX_MACHINE_MEMORY_PERCENT, AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING, AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 42b67b2917387..b620816cc8252 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -99,6 +99,9 @@ public class TransportOpenJobAction extends TransportMasterNodeAction persistentTa jobState = jobTaskState == null ? JobState.OPENING : jobTaskState.getState(); PersistentTasksCustomMetaData.Assignment assignment = persistentTask.getAssignment(); + + // This means we are awaiting a new node to be spun up, ok to return back to the user to await node creation + if (assignment != null && assignment.equals(AWAITING_LAZY_ASSIGNMENT)) { + return true; + } + // This logic is only appropriate when opening a job, not when reallocating following a failure, // and this is why this class must only be used when opening a job if (assignment != null && assignment.equals(PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT) == false && diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java index 1b1c39c3bcf3c..c4150d633a8f0 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java @@ -61,6 +61,68 @@ public void testCloseFailedJob() throws Exception { assertEquals(JobState.OPENED, ((JobTaskState) task.getState()).getState()); } + public void testLazyNodeValidation() throws Exception { + int numNodes = 1; + int maxNumberOfJobsPerNode = 1; + int maxNumberOfLazyNodes = 2; + internalCluster().ensureAtMostNumDataNodes(0); + logger.info("[{}] is [{}]", AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode); + for (int i = 0; i < numNodes; i++) { + internalCluster().startNode(Settings.builder() + .put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode)); + } + logger.info("Started [{}] nodes", numNodes); + ensureStableCluster(numNodes); + logger.info("[{}] is [{}]", MachineLearning.MAX_LAZY_ML_NODES.getKey(), maxNumberOfLazyNodes); + // Set our lazy node number + assertTrue(client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder() + .put(MachineLearning.MAX_LAZY_ML_NODES.getKey(), maxNumberOfLazyNodes)) + .get() + .isAcknowledged()); + // create and open first job, which succeeds: + Job.Builder job = createJob("lazy-node-validation-job-1", new ByteSizeValue(2, ByteSizeUnit.MB)); + PutJobAction.Request putJobRequest = new PutJobAction.Request(job); + client().execute(PutJobAction.INSTANCE, putJobRequest).get(); + client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())).get(); + assertBusy(() -> { + GetJobsStatsAction.Response statsResponse = + client().execute(GetJobsStatsAction.INSTANCE, + new GetJobsStatsAction.Request("lazy-node-validation-job-1")).actionGet(); + assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.OPENED); + }); + + // create and try to open second job, which succeeds due to lazy node number: + job = createJob("lazy-node-validation-job-2", new ByteSizeValue(2, ByteSizeUnit.MB)); + putJobRequest = new PutJobAction.Request(job); + client().execute(PutJobAction.INSTANCE, putJobRequest).get(); + client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())).get(); // Should return while job is opening + + assertBusy(() -> { + GetJobsStatsAction.Response statsResponse = + client().execute(GetJobsStatsAction.INSTANCE, + new GetJobsStatsAction.Request("lazy-node-validation-job-2")).actionGet(); + // Should get to opening state w/o a node + assertEquals(JobState.OPENING, statsResponse.getResponse().results().get(0).getState()); + }); + + // Add another Node so we can get allocated + internalCluster().startNode(Settings.builder() + .put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode)); + ensureStableCluster(numNodes+1); + + // We should automatically get allocated and opened to new node + assertBusy(() -> { + GetJobsStatsAction.Response statsResponse = + client().execute(GetJobsStatsAction.INSTANCE, + new GetJobsStatsAction.Request("lazy-node-validation-job-2")).actionGet(); + assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState()); + }); + } + public void testSingleNode() throws Exception { verifyMaxNumberOfJobsLimit(1, randomIntBetween(1, 100)); }