diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfig.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfig.java index 7830bccb45069..d4684bde70769 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfig.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfig.java @@ -56,6 +56,7 @@ public static Builder builder() { private static final ParseField MODEL_MEMORY_LIMIT = new ParseField("model_memory_limit"); private static final ParseField CREATE_TIME = new ParseField("create_time"); private static final ParseField VERSION = new ParseField("version"); + private static final ParseField ALLOW_LAZY_START = new ParseField("allow_lazy_start"); private static ObjectParser PARSER = new ObjectParser<>("data_frame_analytics_config", true, Builder::new); @@ -86,6 +87,7 @@ public static Builder builder() { }, VERSION, ValueType.STRING); + PARSER.declareBoolean(Builder::setAllowLazyStart, ALLOW_LAZY_START); } private static DataFrameAnalysis parseAnalysis(XContentParser parser) throws IOException { @@ -105,11 +107,12 @@ private static DataFrameAnalysis parseAnalysis(XContentParser parser) throws IOE private final ByteSizeValue modelMemoryLimit; private final Instant createTime; private final Version version; + private final Boolean allowLazyStart; private DataFrameAnalyticsConfig(@Nullable String id, @Nullable String description, @Nullable DataFrameAnalyticsSource source, @Nullable DataFrameAnalyticsDest dest, @Nullable DataFrameAnalysis analysis, @Nullable FetchSourceContext analyzedFields, @Nullable ByteSizeValue modelMemoryLimit, - @Nullable Instant createTime, @Nullable Version version) { + @Nullable Instant createTime, @Nullable Version version, @Nullable Boolean allowLazyStart) { this.id = id; this.description = description; this.source = source; @@ -119,6 +122,7 @@ private DataFrameAnalyticsConfig(@Nullable String id, @Nullable String descripti this.modelMemoryLimit = modelMemoryLimit; this.createTime = createTime == null ? null : Instant.ofEpochMilli(createTime.toEpochMilli());; this.version = version; + this.allowLazyStart = allowLazyStart; } public String getId() { @@ -157,6 +161,10 @@ public Version getVersion() { return version; } + public Boolean getAllowLazyStart() { + return allowLazyStart; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -190,6 +198,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (version != null) { builder.field(VERSION.getPreferredName(), version); } + if (allowLazyStart != null) { + builder.field(ALLOW_LAZY_START.getPreferredName(), allowLazyStart); + } builder.endObject(); return builder; } @@ -208,12 +219,13 @@ public boolean equals(Object o) { && Objects.equals(analyzedFields, other.analyzedFields) && Objects.equals(modelMemoryLimit, other.modelMemoryLimit) && Objects.equals(createTime, other.createTime) - && Objects.equals(version, other.version); + && Objects.equals(version, other.version) + && Objects.equals(allowLazyStart, other.allowLazyStart); } @Override public int hashCode() { - return Objects.hash(id, description, source, dest, analysis, analyzedFields, modelMemoryLimit, createTime, version); + return Objects.hash(id, description, source, dest, analysis, analyzedFields, modelMemoryLimit, createTime, version, allowLazyStart); } @Override @@ -232,6 +244,7 @@ public static class Builder { private ByteSizeValue modelMemoryLimit; private Instant createTime; private Version version; + private Boolean allowLazyStart; private Builder() {} @@ -280,9 +293,14 @@ public Builder setVersion(Version version) { return this; } + public Builder setAllowLazyStart(Boolean allowLazyStart) { + this.allowLazyStart = allowLazyStart; + return this; + } + public DataFrameAnalyticsConfig build() { return new DataFrameAnalyticsConfig(id, description, source, dest, analysis, analyzedFields, modelMemoryLimit, createTime, - version); + version, allowLazyStart); } } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsState.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsState.java index 6ee349b8e8d38..20201a676a11c 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsState.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsState.java @@ -22,7 +22,7 @@ import java.util.Locale; public enum DataFrameAnalyticsState { - STARTED, REINDEXING, ANALYZING, STOPPING, STOPPED; + STARTED, REINDEXING, ANALYZING, STOPPING, STOPPED, STARTING; public static DataFrameAnalyticsState fromString(String name) { return valueOf(name.trim().toUpperCase(Locale.ROOT)); diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/Job.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/Job.java index c5f9f00f895ff..3b39a4bda8704 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/Job.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/Job.java @@ -67,6 +67,7 @@ public class Job implements ToXContentObject { public static final ParseField MODEL_SNAPSHOT_ID = new ParseField("model_snapshot_id"); public static final ParseField RESULTS_INDEX_NAME = new ParseField("results_index_name"); public static final ParseField DELETING = new ParseField("deleting"); + public static final ParseField ALLOW_LAZY_OPEN = new ParseField("allow_lazy_open"); public static final ObjectParser PARSER = new ObjectParser<>("job_details", true, Builder::new); @@ -96,6 +97,7 @@ public class Job implements ToXContentObject { PARSER.declareStringOrNull(Builder::setModelSnapshotId, MODEL_SNAPSHOT_ID); PARSER.declareString(Builder::setResultsIndexName, RESULTS_INDEX_NAME); PARSER.declareBoolean(Builder::setDeleting, DELETING); + PARSER.declareBoolean(Builder::setAllowLazyOpen, ALLOW_LAZY_OPEN); } private final String jobId; @@ -117,13 +119,14 @@ public class Job implements ToXContentObject { private final String modelSnapshotId; private final String resultsIndexName; private final Boolean deleting; + private final Boolean allowLazyOpen; private Job(String jobId, String jobType, List groups, String description, Date createTime, Date finishedTime, AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription, ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval, Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map customSettings, - String modelSnapshotId, String resultsIndexName, Boolean deleting) { + String modelSnapshotId, String resultsIndexName, Boolean deleting, Boolean allowLazyOpen) { this.jobId = jobId; this.jobType = jobType; @@ -143,6 +146,7 @@ private Job(String jobId, String jobType, List groups, String descriptio this.modelSnapshotId = modelSnapshotId; this.resultsIndexName = resultsIndexName; this.deleting = deleting; + this.allowLazyOpen = allowLazyOpen; } /** @@ -271,6 +275,10 @@ public Boolean getDeleting() { return deleting; } + public Boolean getAllowLazyOpen() { + return allowLazyOpen; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -326,6 +334,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (deleting != null) { builder.field(DELETING.getPreferredName(), deleting); } + if (allowLazyOpen != null) { + builder.field(ALLOW_LAZY_OPEN.getPreferredName(), allowLazyOpen); + } builder.endObject(); return builder; } @@ -358,7 +369,8 @@ public boolean equals(Object other) { && Objects.equals(this.customSettings, that.customSettings) && Objects.equals(this.modelSnapshotId, that.modelSnapshotId) && Objects.equals(this.resultsIndexName, that.resultsIndexName) - && Objects.equals(this.deleting, that.deleting); + && Objects.equals(this.deleting, that.deleting) + && Objects.equals(this.allowLazyOpen, that.allowLazyOpen); } @Override @@ -366,7 +378,7 @@ public int hashCode() { return Objects.hash(jobId, jobType, groups, description, createTime, finishedTime, analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, - modelSnapshotId, resultsIndexName, deleting); + modelSnapshotId, resultsIndexName, deleting, allowLazyOpen); } @Override @@ -398,6 +410,7 @@ public static class Builder { private String modelSnapshotId; private String resultsIndexName; private Boolean deleting; + private Boolean allowLazyOpen; private Builder() { } @@ -425,6 +438,7 @@ public Builder(Job job) { this.modelSnapshotId = job.getModelSnapshotId(); this.resultsIndexName = job.getResultsIndexNameNoPrefix(); this.deleting = job.getDeleting(); + this.allowLazyOpen = job.getAllowLazyOpen(); } public Builder setId(String id) { @@ -521,6 +535,11 @@ Builder setDeleting(Boolean deleting) { return this; } + Builder setAllowLazyOpen(Boolean allowLazyOpen) { + this.allowLazyOpen = allowLazyOpen; + return this; + } + /** * Builds a job. * @@ -533,7 +552,7 @@ public Job build() { id, jobType, groups, description, createTime, finishedTime, analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, - modelSnapshotId, resultsIndexName, deleting); + modelSnapshotId, resultsIndexName, deleting, allowLazyOpen); } } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/JobUpdate.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/JobUpdate.java index 15499a650439d..f91918d456817 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/JobUpdate.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/JobUpdate.java @@ -54,6 +54,7 @@ public class JobUpdate implements ToXContentObject { PARSER.declareLong(Builder::setModelSnapshotRetentionDays, Job.MODEL_SNAPSHOT_RETENTION_DAYS); PARSER.declareStringArray(Builder::setCategorizationFilters, AnalysisConfig.CATEGORIZATION_FILTERS); PARSER.declareField(Builder::setCustomSettings, (p, c) -> p.map(), Job.CUSTOM_SETTINGS, ObjectParser.ValueType.OBJECT); + PARSER.declareBoolean(Builder::setAllowLazyOpen, Job.ALLOW_LAZY_OPEN); } private final String jobId; @@ -68,13 +69,14 @@ public class JobUpdate implements ToXContentObject { private final Long resultsRetentionDays; private final List categorizationFilters; private final Map customSettings; + private final Boolean allowLazyOpen; private JobUpdate(String jobId, @Nullable List groups, @Nullable String description, @Nullable List detectorUpdates, @Nullable ModelPlotConfig modelPlotConfig, @Nullable AnalysisLimits analysisLimits, @Nullable TimeValue backgroundPersistInterval, @Nullable Long renormalizationWindowDays, @Nullable Long resultsRetentionDays, @Nullable Long modelSnapshotRetentionDays, @Nullable List categorisationFilters, - @Nullable Map customSettings) { + @Nullable Map customSettings, @Nullable Boolean allowLazyOpen) { this.jobId = jobId; this.groups = groups; this.description = description; @@ -87,6 +89,7 @@ private JobUpdate(String jobId, @Nullable List groups, @Nullable String this.resultsRetentionDays = resultsRetentionDays; this.categorizationFilters = categorisationFilters; this.customSettings = customSettings; + this.allowLazyOpen = allowLazyOpen; } public String getJobId() { @@ -137,6 +140,10 @@ public Map getCustomSettings() { return customSettings; } + public Boolean getAllowLazyOpen() { + return allowLazyOpen; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -174,6 +181,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (customSettings != null) { builder.field(Job.CUSTOM_SETTINGS.getPreferredName(), customSettings); } + if (allowLazyOpen != null) { + builder.field(Job.ALLOW_LAZY_OPEN.getPreferredName(), allowLazyOpen); + } builder.endObject(); return builder; } @@ -201,13 +211,15 @@ public boolean equals(Object other) { && Objects.equals(this.modelSnapshotRetentionDays, that.modelSnapshotRetentionDays) && Objects.equals(this.resultsRetentionDays, that.resultsRetentionDays) && Objects.equals(this.categorizationFilters, that.categorizationFilters) - && Objects.equals(this.customSettings, that.customSettings); + && Objects.equals(this.customSettings, that.customSettings) + && Objects.equals(this.allowLazyOpen, that.allowLazyOpen); } @Override public int hashCode() { return Objects.hash(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, renormalizationWindowDays, - backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, categorizationFilters, customSettings); + backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, categorizationFilters, customSettings, + allowLazyOpen); } public static class DetectorUpdate implements ToXContentObject { @@ -303,6 +315,7 @@ public static class Builder { private Long resultsRetentionDays; private List categorizationFilters; private Map customSettings; + private Boolean allowLazyOpen; /** * New {@link JobUpdate.Builder} object for the existing job @@ -446,9 +459,15 @@ public Builder setCustomSettings(Map customSettings) { return this; } + public Builder setAllowLazyOpen(boolean allowLazyOpen) { + this.allowLazyOpen = allowLazyOpen; + return this; + } + public JobUpdate build() { return new JobUpdate(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, backgroundPersistInterval, - renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings); + renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings, + allowLazyOpen); } } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigTests.java index 13f277ade9957..1e59aac7af293 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigTests.java @@ -66,6 +66,9 @@ public static DataFrameAnalyticsConfig randomDataFrameAnalyticsConfig() { if (randomBoolean()) { builder.setVersion(Version.CURRENT); } + if (randomBoolean()) { + builder.setAllowLazyStart(randomBoolean()); + } return builder.build(); } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java index 8794389066314..5af87c477c31e 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java @@ -159,6 +159,9 @@ public static Job.Builder createRandomizedJobBuilder() { if (randomBoolean()) { builder.setDeleting(randomBoolean()); } + if (randomBoolean()) { + builder.setAllowLazyOpen(randomBoolean()); + } return builder; } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobUpdateTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobUpdateTests.java index b159fedb95d44..8d2d02879a64f 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobUpdateTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobUpdateTests.java @@ -79,6 +79,9 @@ public static JobUpdate createRandom(String jobId) { if (randomBoolean()) { update.setCustomSettings(Collections.singletonMap(randomAlphaOfLength(10), randomAlphaOfLength(10))); } + if (randomBoolean()) { + update.setAllowLazyOpen(randomBoolean()); + } return update.build(); } diff --git a/docs/reference/ml/anomaly-detection/apis/get-job.asciidoc b/docs/reference/ml/anomaly-detection/apis/get-job.asciidoc index e32513d9fdca9..a816bcd3e1ddc 100644 --- a/docs/reference/ml/anomaly-detection/apis/get-job.asciidoc +++ b/docs/reference/ml/anomaly-detection/apis/get-job.asciidoc @@ -122,7 +122,8 @@ The API returns the following results: "time_format": "epoch_ms" }, "model_snapshot_retention_days": 1, - "results_index_name": "shared" + "results_index_name": "shared", + "allow_lazy_open": false } ] } diff --git a/docs/reference/ml/anomaly-detection/apis/jobresource.asciidoc b/docs/reference/ml/anomaly-detection/apis/jobresource.asciidoc index f650e81623a4b..3699e73ca51c3 100644 --- a/docs/reference/ml/anomaly-detection/apis/jobresource.asciidoc +++ b/docs/reference/ml/anomaly-detection/apis/jobresource.asciidoc @@ -95,6 +95,19 @@ so do not set the `background_persist_interval` value too low. deleted from Elasticsearch. The default value is null, which means results are retained. +`allow_lazy_open`:: + (boolean) Advanced configuration option. + Whether this job should be allowed to open when there is insufficient + {ml} node capacity for it to be immediately assigned to a node. + The default is `false`, which means that the <> + will return an error if a {ml} node with capacity to run the + job cannot immediately be found. (However, this is also subject to + the cluster-wide `xpack.ml.max_lazy_ml_nodes` setting - see + <>.) If this option is set to `true` then + the <> will not return an error, and the job will + wait in the `opening` state until sufficient {ml} node capacity + is available. + [[ml-analysisconfig]] ==== Analysis Configuration Objects diff --git a/docs/reference/ml/anomaly-detection/apis/put-job.asciidoc b/docs/reference/ml/anomaly-detection/apis/put-job.asciidoc index 930b12aa8313d..69230160d2715 100644 --- a/docs/reference/ml/anomaly-detection/apis/put-job.asciidoc +++ b/docs/reference/ml/anomaly-detection/apis/put-job.asciidoc @@ -149,7 +149,8 @@ When the job is created, you receive the following results: "time_format" : "epoch_ms" }, "model_snapshot_retention_days" : 1, - "results_index_name" : "shared" + "results_index_name" : "shared", + "allow_lazy_open" : false } ---- // TESTRESPONSE[s/"job_version" : "8.0.0"/"job_version" : $body.job_version/] diff --git a/docs/reference/ml/anomaly-detection/apis/update-job.asciidoc b/docs/reference/ml/anomaly-detection/apis/update-job.asciidoc index 9676f7fd34f9e..196a20cecbc27 100644 --- a/docs/reference/ml/anomaly-detection/apis/update-job.asciidoc +++ b/docs/reference/ml/anomaly-detection/apis/update-job.asciidoc @@ -74,11 +74,14 @@ See <>. | Yes |`results_retention_days` |Advanced configuration option. The number of days for which job results are retained. See <>. | Yes +|`allow_lazy_open` |Advanced configuration option. Whether to allow the job to be +opened when no {ml} node has sufficient capacity. See <>. | Yes + |======================================================================= For those properties that have `Requires Restart` set to `Yes` in this table, if the job is open when you make the update, you must stop the data feed, close -the job, then restart the data feed and open the job for the changes to take +the job, then reopen the job and restart the data feed for the changes to take effect. [NOTE] @@ -170,7 +173,8 @@ configuration information, including the updated property values. For example: } ] }, - "results_index_name": "shared" + "results_index_name": "shared", + "allow_lazy_open": false } ---- // TESTRESPONSE[s/"job_version": "7.0.0-alpha1"/"job_version": $body.job_version/] diff --git a/docs/reference/ml/df-analytics/apis/put-dfanalytics.asciidoc b/docs/reference/ml/df-analytics/apis/put-dfanalytics.asciidoc index c861d56d1af29..f17249b47d545 100644 --- a/docs/reference/ml/df-analytics/apis/put-dfanalytics.asciidoc +++ b/docs/reference/ml/df-analytics/apis/put-dfanalytics.asciidoc @@ -138,6 +138,18 @@ that don’t contain a results field are not included in the {reganalysis}. as this object is passed verbatim to {es}. By default, this property has the following value: `{"match_all": {}}`. +`allow_lazy_start`:: + (Optional, boolean) Whether this job should be allowed to start when there + is insufficient {ml} node capacity for it to be immediately assigned to a node. + The default is `false`, which means that the <> + will return an error if a {ml} node with capacity to run the + job cannot immediately be found. (However, this is also subject to + the cluster-wide `xpack.ml.max_lazy_ml_nodes` setting - see + <>.) If this option is set to `true` then + the <> will not return an error, and the job will + wait in the `starting` state until sufficient {ml} node capacity + is available. + [[ml-put-dfanalytics-example]] ==== {api-examples-title} @@ -197,7 +209,8 @@ The API returns the following result: }, "model_memory_limit": "1gb", "create_time" : 1562265491319, - "version" : "8.0.0" + "version" : "8.0.0", + "allow_lazy_start" : false } ---- // TESTRESPONSE[s/1562265491319/$body.$_path/] @@ -257,7 +270,8 @@ The API returns the following result: }, "model_memory_limit" : "1gb", "create_time" : 1567168659127, - "version" : "8.0.0" + "version" : "8.0.0", + "allow_lazy_start" : false } ---- // TESTRESPONSE[s/1567168659127/$body.$_path/] diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java index 11dc2e77c27f4..42c178aa49ec0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java @@ -143,12 +143,14 @@ public static DatafeedState getDatafeedState(String datafeedId, @Nullable Persis public static DataFrameAnalyticsState getDataFrameAnalyticsState(String analyticsId, @Nullable PersistentTasksCustomMetaData tasks) { PersistentTasksCustomMetaData.PersistentTask task = getDataFrameAnalyticsTask(analyticsId, tasks); - if (task != null && task.getState() != null) { + if (task != null) { DataFrameAnalyticsTaskState taskState = (DataFrameAnalyticsTaskState) task.getState(); + if (taskState == null) { + return DataFrameAnalyticsState.STARTING; + } return taskState.getState(); - } else { - return DataFrameAnalyticsState.STOPPED; } + return DataFrameAnalyticsState.STOPPED; } /** @@ -178,32 +180,29 @@ public static Set openJobIds(@Nullable PersistentTasksCustomMetaData tas * @param nodes The cluster nodes * @return The job Ids of tasks to do not have an assignment. */ - public static Set unallocatedJobIds(@Nullable PersistentTasksCustomMetaData tasks, - DiscoveryNodes nodes) { - return unallocatedJobTasks(tasks, nodes).stream() + public static Set unassignedJobIds(@Nullable PersistentTasksCustomMetaData tasks, + DiscoveryNodes nodes) { + return unassignedJobTasks(tasks, nodes).stream() .map(task -> task.getId().substring(JOB_TASK_ID_PREFIX.length())) .collect(Collectors.toSet()); } /** - * The job tasks that do not have an allocation as determined by + * The job tasks that do not have an assignment as determined by * {@link PersistentTasksClusterService#needsReassignment(PersistentTasksCustomMetaData.Assignment, DiscoveryNodes)} * * @param tasks Persistent tasks. If null an empty set is returned. * @param nodes The cluster nodes - * @return Unallocated job tasks + * @return Unassigned job tasks */ - public static Collection> unallocatedJobTasks( + public static Collection> unassignedJobTasks( @Nullable PersistentTasksCustomMetaData tasks, DiscoveryNodes nodes) { if (tasks == null) { return Collections.emptyList(); } - return tasks.findTasks(JOB_TASK_NAME, task -> true) - .stream() - .filter(task -> PersistentTasksClusterService.needsReassignment(task.getAssignment(), nodes)) - .collect(Collectors.toList()); + return tasks.findTasks(JOB_TASK_NAME, task -> PersistentTasksClusterService.needsReassignment(task.getAssignment(), nodes)); } /** @@ -231,32 +230,29 @@ public static Set startedDatafeedIds(@Nullable PersistentTasksCustomMeta * @param nodes The cluster nodes * @return The job Ids of tasks to do not have an assignment. */ - public static Set unallocatedDatafeedIds(@Nullable PersistentTasksCustomMetaData tasks, - DiscoveryNodes nodes) { + public static Set unassignedDatafeedIds(@Nullable PersistentTasksCustomMetaData tasks, + DiscoveryNodes nodes) { - return unallocatedDatafeedTasks(tasks, nodes).stream() + return unassignedDatafeedTasks(tasks, nodes).stream() .map(task -> task.getId().substring(DATAFEED_TASK_ID_PREFIX.length())) .collect(Collectors.toSet()); } /** - * The datafeed tasks that do not have an allocation as determined by + * The datafeed tasks that do not have an assignment as determined by * {@link PersistentTasksClusterService#needsReassignment(PersistentTasksCustomMetaData.Assignment, DiscoveryNodes)} * * @param tasks Persistent tasks. If null an empty set is returned. * @param nodes The cluster nodes - * @return Unallocated datafeed tasks + * @return Unassigned datafeed tasks */ - public static Collection> unallocatedDatafeedTasks( + public static Collection> unassignedDatafeedTasks( @Nullable PersistentTasksCustomMetaData tasks, DiscoveryNodes nodes) { if (tasks == null) { return Collections.emptyList(); } - return tasks.findTasks(DATAFEED_TASK_NAME, task -> true) - .stream() - .filter(task -> PersistentTasksClusterService.needsReassignment(task.getAssignment(), nodes)) - .collect(Collectors.toList()); + return tasks.findTasks(DATAFEED_TASK_NAME, task -> PersistentTasksClusterService.needsReassignment(task.getAssignment(), nodes)); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java index 6712c1f8ecf23..a894faf599324 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java @@ -209,6 +209,7 @@ private static List readProgressFromLegacy(DataFrameAnalyticsStat case REINDEXING: reindexingProgress = legacyProgressPercent; break; + case STARTING: case STARTED: case STOPPED: case STOPPING: @@ -235,6 +236,14 @@ public List getProgress() { return progress; } + public DiscoveryNode getNode() { + return node; + } + + public String getAssignmentExplanation() { + return assignmentExplanation; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { // TODO: Have callers wrap the content with an object as they choose rather than forcing it upon them @@ -297,6 +306,7 @@ private void writeProgressToLegacy(StreamOutput out) throws IOException { case REINDEXING: targetPhase = "reindexing"; break; + case STARTING: case STARTED: case STOPPED: case STOPPING: diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsAction.java index b4b6c21c7990a..15baa03079a39 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsAction.java @@ -156,13 +156,16 @@ public static class TaskParams implements PersistentTaskParams { private static final ParseField PROGRESS_ON_START = new ParseField("progress_on_start"); + @SuppressWarnings("unchecked") public static ConstructingObjectParser PARSER = new ConstructingObjectParser<>( - MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, true, a -> new TaskParams((String) a[0], (String) a[1], (List) a[2])); + MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, true, + a -> new TaskParams((String) a[0], (String) a[1], (List) a[2], (Boolean) a[3])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameAnalyticsConfig.ID); PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameAnalyticsConfig.VERSION); PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), PhaseProgress.PARSER, PROGRESS_ON_START); + PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), DataFrameAnalyticsConfig.ALLOW_LAZY_START); } public static TaskParams fromXContent(XContentParser parser) { @@ -172,15 +175,18 @@ public static TaskParams fromXContent(XContentParser parser) { private final String id; private final Version version; private final List progressOnStart; + private final boolean allowLazyStart; - public TaskParams(String id, Version version, List progressOnStart) { + public TaskParams(String id, Version version, List progressOnStart, boolean allowLazyStart) { this.id = Objects.requireNonNull(id); this.version = Objects.requireNonNull(version); this.progressOnStart = Collections.unmodifiableList(progressOnStart); + this.allowLazyStart = allowLazyStart; } - private TaskParams(String id, String version, @Nullable List progressOnStart) { - this(id, Version.fromString(version), progressOnStart == null ? Collections.emptyList() : progressOnStart); + private TaskParams(String id, String version, @Nullable List progressOnStart, Boolean allowLazyStart) { + this(id, Version.fromString(version), progressOnStart == null ? Collections.emptyList() : progressOnStart, + allowLazyStart != null && allowLazyStart); } public TaskParams(StreamInput in) throws IOException { @@ -191,6 +197,12 @@ public TaskParams(StreamInput in) throws IOException { } else { progressOnStart = Collections.emptyList(); } + // TODO: change version in backport + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + allowLazyStart = in.readBoolean(); + } else { + allowLazyStart = false; + } } public String getId() { @@ -201,6 +213,10 @@ public List getProgressOnStart() { return progressOnStart; } + public boolean isAllowLazyStart() { + return allowLazyStart; + } + @Override public String getWriteableName() { return MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME; @@ -218,6 +234,10 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_5_0)) { out.writeList(progressOnStart); } + // TODO: change version in backport + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeBoolean(allowLazyStart); + } } @Override @@ -226,13 +246,14 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(DataFrameAnalyticsConfig.ID.getPreferredName(), id); builder.field(DataFrameAnalyticsConfig.VERSION.getPreferredName(), version); builder.field(PROGRESS_ON_START.getPreferredName(), progressOnStart); + builder.field(DataFrameAnalyticsConfig.ALLOW_LAZY_START.getPreferredName(), allowLazyStart); builder.endObject(); return builder; } @Override public int hashCode() { - return Objects.hash(id, version, progressOnStart); + return Objects.hash(id, version, progressOnStart, allowLazyStart); } @Override @@ -243,7 +264,8 @@ public boolean equals(Object o) { TaskParams other = (TaskParams) o; return Objects.equals(id, other.id) && Objects.equals(version, other.version) - && Objects.equals(progressOnStart, other.progressOnStart); + && Objects.equals(progressOnStart, other.progressOnStart) + && Objects.equals(allowLazyStart, other.allowLazyStart); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java index b327f4ec8882c..7307fa38ed4af 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java @@ -58,6 +58,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { public static final ParseField HEADERS = new ParseField("headers"); public static final ParseField CREATE_TIME = new ParseField("create_time"); public static final ParseField VERSION = new ParseField("version"); + public static final ParseField ALLOW_LAZY_START = new ParseField("allow_lazy_start"); public static final ObjectParser STRICT_PARSER = createParser(false); public static final ObjectParser LENIENT_PARSER = createParser(true); @@ -77,6 +78,7 @@ private static ObjectParser createParser(boolean ignoreUnknownFie OBJECT_ARRAY_BOOLEAN_OR_STRING); parser.declareField(Builder::setModelMemoryLimit, (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MODEL_MEMORY_LIMIT.getPreferredName()), MODEL_MEMORY_LIMIT, VALUE); + parser.declareBoolean(Builder::setAllowLazyStart, ALLOW_LAZY_START); if (ignoreUnknownFields) { // Headers are not parsed by the strict (config) parser, so headers supplied in the _body_ of a REST request will be rejected. // (For config, headers are explicitly transferred from the auth headers by code in the put data frame actions.) @@ -123,10 +125,11 @@ private static DataFrameAnalysis parseAnalysis(XContentParser parser, boolean ig private final Map headers; private final Instant createTime; private final Version version; + private final boolean allowLazyStart; public DataFrameAnalyticsConfig(String id, String description, DataFrameAnalyticsSource source, DataFrameAnalyticsDest dest, DataFrameAnalysis analysis, Map headers, ByteSizeValue modelMemoryLimit, - FetchSourceContext analyzedFields, Instant createTime, Version version) { + FetchSourceContext analyzedFields, Instant createTime, Version version, boolean allowLazyStart) { this.id = ExceptionsHelper.requireNonNull(id, ID); this.description = description; this.source = ExceptionsHelper.requireNonNull(source, SOURCE); @@ -137,6 +140,7 @@ public DataFrameAnalyticsConfig(String id, String description, DataFrameAnalytic this.headers = Collections.unmodifiableMap(headers); this.createTime = createTime == null ? null : Instant.ofEpochMilli(createTime.toEpochMilli()); this.version = version; + this.allowLazyStart = allowLazyStart; } public DataFrameAnalyticsConfig(StreamInput in) throws IOException { @@ -159,6 +163,12 @@ public DataFrameAnalyticsConfig(StreamInput in) throws IOException { createTime = null; version = null; } + // TODO: change version in backport + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + allowLazyStart = in.readBoolean(); + } else { + allowLazyStart = false; + } } public String getId() { @@ -201,6 +211,10 @@ public Version getVersion() { return version; } + public boolean isAllowLazyStart() { + return allowLazyStart; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -231,6 +245,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (version != null) { builder.field(VERSION.getPreferredName(), version); } + builder.field(ALLOW_LAZY_START.getPreferredName(), allowLazyStart); builder.endObject(); return builder; } @@ -256,6 +271,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(false); } } + // TODO: change version in backport + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeBoolean(allowLazyStart); + } } @Override @@ -273,12 +292,14 @@ public boolean equals(Object o) { && Objects.equals(getModelMemoryLimit(), other.getModelMemoryLimit()) && Objects.equals(analyzedFields, other.analyzedFields) && Objects.equals(createTime, other.createTime) - && Objects.equals(version, other.version); + && Objects.equals(version, other.version) + && Objects.equals(allowLazyStart, other.allowLazyStart); } @Override public int hashCode() { - return Objects.hash(id, description, source, dest, analysis, headers, getModelMemoryLimit(), analyzedFields, createTime, version); + return Objects.hash(id, description, source, dest, analysis, headers, getModelMemoryLimit(), analyzedFields, createTime, version, + allowLazyStart); } @Override @@ -303,6 +324,7 @@ public static class Builder { private Map headers = Collections.emptyMap(); private Instant createTime; private Version version; + private boolean allowLazyStart; public Builder() {} @@ -324,6 +346,7 @@ public Builder(DataFrameAnalyticsConfig config, ByteSizeValue maxModelMemoryLimi } this.createTime = config.createTime; this.version = config.version; + this.allowLazyStart = config.allowLazyStart; } public String getId() { @@ -380,13 +403,18 @@ public Builder setVersion(Version version) { return this; } + public Builder setAllowLazyStart(boolean isLazyStart) { + this.allowLazyStart = isLazyStart; + return this; + } + /** * Builds {@link DataFrameAnalyticsConfig} object. */ public DataFrameAnalyticsConfig build() { applyMaxModelMemoryLimit(); return new DataFrameAnalyticsConfig(id, description, source, dest, analysis, headers, modelMemoryLimit, analyzedFields, - createTime, version); + createTime, version, allowLazyStart); } /** @@ -405,7 +433,8 @@ public DataFrameAnalyticsConfig buildForMemoryEstimation() { modelMemoryLimit, analyzedFields, createTime, - version); + version, + allowLazyStart); } private void applyMaxModelMemoryLimit() { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsState.java index 8f0a481ed9030..61279b192bacc 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsState.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.ml.dataframe; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -15,7 +16,7 @@ public enum DataFrameAnalyticsState implements Writeable { - STARTED, REINDEXING, ANALYZING, STOPPING, STOPPED, FAILED; + STARTED, REINDEXING, ANALYZING, STOPPING, STOPPED, FAILED, STARTING; public static DataFrameAnalyticsState fromString(String name) { return valueOf(name.trim().toUpperCase(Locale.ROOT)); @@ -27,7 +28,14 @@ public static DataFrameAnalyticsState fromStream(StreamInput in) throws IOExcept @Override public void writeTo(StreamOutput out) throws IOException { - out.writeEnum(this); + DataFrameAnalyticsState toWrite = this; + // TODO: change version in backport + if (out.getVersion().before(Version.V_8_0_0) && toWrite == STARTING) { + // Before 7.5.0 there was no STARTING state and jobs for which + // tasks existed but were unassigned were considered STOPPED + toWrite = STOPPED; + } + out.writeEnum(toWrite); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java index f96d498b7e8a6..28ab4778d1483 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java @@ -77,6 +77,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO public static final ParseField MODEL_SNAPSHOT_MIN_VERSION = new ParseField("model_snapshot_min_version"); public static final ParseField RESULTS_INDEX_NAME = new ParseField("results_index_name"); public static final ParseField DELETING = new ParseField("deleting"); + public static final ParseField ALLOW_LAZY_OPEN = new ParseField("allow_lazy_open"); // Used for QueryPage public static final ParseField RESULTS_FIELD = new ParseField("jobs"); @@ -127,6 +128,7 @@ private static ObjectParser createParser(boolean ignoreUnknownFie parser.declareStringOrNull(Builder::setModelSnapshotMinVersion, MODEL_SNAPSHOT_MIN_VERSION); parser.declareString(Builder::setResultsIndexName, RESULTS_INDEX_NAME); parser.declareBoolean(Builder::setDeleting, DELETING); + parser.declareBoolean(Builder::setAllowLazyOpen, ALLOW_LAZY_OPEN); return parser; } @@ -159,13 +161,14 @@ private static ObjectParser createParser(boolean ignoreUnknownFie private final Version modelSnapshotMinVersion; private final String resultsIndexName; private final boolean deleting; + private final boolean allowLazyOpen; private Job(String jobId, String jobType, Version jobVersion, List groups, String description, Date createTime, Date finishedTime, AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription, ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval, Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map customSettings, - String modelSnapshotId, Version modelSnapshotMinVersion, String resultsIndexName, boolean deleting) { + String modelSnapshotId, Version modelSnapshotMinVersion, String resultsIndexName, boolean deleting, boolean allowLazyOpen) { this.jobId = jobId; this.jobType = jobType; @@ -187,6 +190,7 @@ private Job(String jobId, String jobType, Version jobVersion, List group this.modelSnapshotMinVersion = modelSnapshotMinVersion; this.resultsIndexName = resultsIndexName; this.deleting = deleting; + this.allowLazyOpen = allowLazyOpen; } public Job(StreamInput in) throws IOException { @@ -221,6 +225,12 @@ public Job(StreamInput in) throws IOException { } resultsIndexName = in.readString(); deleting = in.readBoolean(); + // TODO: change version in backport + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + allowLazyOpen = in.readBoolean(); + } else { + allowLazyOpen = false; + } } /** @@ -390,6 +400,10 @@ public boolean isDeleting() { return deleting; } + public boolean allowLazyOpen() { + return allowLazyOpen; + } + /** * Get all input data fields mentioned in the job configuration, * namely analysis fields and the time field. @@ -481,6 +495,10 @@ public void writeTo(StreamOutput out) throws IOException { } out.writeString(resultsIndexName); out.writeBoolean(deleting); + // TODO: change version in backport + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeBoolean(allowLazyOpen); + } } @Override @@ -545,6 +563,7 @@ public XContentBuilder doXContentBody(XContentBuilder builder, Params params) th if (deleting) { builder.field(DELETING.getPreferredName(), deleting); } + builder.field(ALLOW_LAZY_OPEN.getPreferredName(), allowLazyOpen); return builder; } @@ -578,7 +597,8 @@ public boolean equals(Object other) { && Objects.equals(this.modelSnapshotId, that.modelSnapshotId) && Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion) && Objects.equals(this.resultsIndexName, that.resultsIndexName) - && Objects.equals(this.deleting, that.deleting); + && Objects.equals(this.deleting, that.deleting) + && Objects.equals(this.allowLazyOpen, that.allowLazyOpen); } @Override @@ -586,7 +606,7 @@ public int hashCode() { return Objects.hash(jobId, jobType, jobVersion, groups, description, createTime, finishedTime, analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, - modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting); + modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting, allowLazyOpen); } // Class already extends from AbstractDiffable, so copied from ToXContentToBytes#toString() @@ -634,6 +654,7 @@ public static class Builder implements Writeable, ToXContentObject { private Version modelSnapshotMinVersion; private String resultsIndexName; private boolean deleting; + private boolean allowLazyOpen; public Builder() { } @@ -663,6 +684,7 @@ public Builder(Job job) { this.modelSnapshotMinVersion = job.getModelSnapshotMinVersion(); this.resultsIndexName = job.getResultsIndexNameNoPrefix(); this.deleting = job.isDeleting(); + this.allowLazyOpen = job.allowLazyOpen(); } public Builder(StreamInput in) throws IOException { @@ -696,6 +718,10 @@ public Builder(StreamInput in) throws IOException { } resultsIndexName = in.readOptionalString(); deleting = in.readBoolean(); + // TODO: change version in backport + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + allowLazyOpen = in.readBoolean(); + } } public Builder setId(String id) { @@ -816,6 +842,11 @@ public Builder setDeleting(boolean deleting) { return this; } + public Builder setAllowLazyOpen(boolean allowLazyOpen) { + this.allowLazyOpen = allowLazyOpen; + return this; + } + /** * Return the list of fields that have been set and are invalid to * be set when the job is created e.g. model snapshot Id should not @@ -884,6 +915,10 @@ public void writeTo(StreamOutput out) throws IOException { } out.writeOptionalString(resultsIndexName); out.writeBoolean(deleting); + // TODO: change version in backport + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeBoolean(allowLazyOpen); + } } @Override @@ -944,6 +979,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (deleting) { builder.field(DELETING.getPreferredName(), deleting); } + if (allowLazyOpen) { + builder.field(ALLOW_LAZY_OPEN.getPreferredName(), allowLazyOpen); + } builder.endObject(); return builder; @@ -974,7 +1012,8 @@ public boolean equals(Object o) { && Objects.equals(this.modelSnapshotId, that.modelSnapshotId) && Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion) && Objects.equals(this.resultsIndexName, that.resultsIndexName) - && Objects.equals(this.deleting, that.deleting); + && Objects.equals(this.deleting, that.deleting) + && Objects.equals(this.allowLazyOpen, that.allowLazyOpen); } @Override @@ -982,7 +1021,7 @@ public int hashCode() { return Objects.hash(id, jobType, jobVersion, groups, description, analysisConfig, analysisLimits, dataDescription, createTime, finishedTime, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId, - modelSnapshotMinVersion, resultsIndexName, deleting); + modelSnapshotMinVersion, resultsIndexName, deleting, allowLazyOpen); } /** @@ -1109,7 +1148,7 @@ public Job build() { id, jobType, jobVersion, groups, description, createTime, finishedTime, analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, - modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting); + modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting, allowLazyOpen); } private void checkValidBackgroundPersistInterval() { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java index c1058e9c0639e..2b95d4571dd61 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java @@ -54,6 +54,7 @@ public class JobUpdate implements Writeable, ToXContentObject { parser.declareLong(Builder::setModelSnapshotRetentionDays, Job.MODEL_SNAPSHOT_RETENTION_DAYS); parser.declareStringArray(Builder::setCategorizationFilters, AnalysisConfig.CATEGORIZATION_FILTERS); parser.declareField(Builder::setCustomSettings, (p, c) -> p.map(), Job.CUSTOM_SETTINGS, ObjectParser.ValueType.OBJECT); + parser.declareBoolean(Builder::setAllowLazyOpen, Job.ALLOW_LAZY_OPEN); } // These fields should not be set by a REST request INTERNAL_PARSER.declareString(Builder::setModelSnapshotId, Job.MODEL_SNAPSHOT_ID); @@ -78,6 +79,7 @@ public class JobUpdate implements Writeable, ToXContentObject { private final Version modelSnapshotMinVersion; private final Version jobVersion; private final Boolean clearJobFinishTime; + private final Boolean allowLazyOpen; private JobUpdate(String jobId, @Nullable List groups, @Nullable String description, @Nullable List detectorUpdates, @Nullable ModelPlotConfig modelPlotConfig, @@ -85,7 +87,8 @@ private JobUpdate(String jobId, @Nullable List groups, @Nullable String @Nullable Long renormalizationWindowDays, @Nullable Long resultsRetentionDays, @Nullable Long modelSnapshotRetentionDays, @Nullable List categorisationFilters, @Nullable Map customSettings, @Nullable String modelSnapshotId, - @Nullable Version modelSnapshotMinVersion, @Nullable Version jobVersion, @Nullable Boolean clearJobFinishTime) { + @Nullable Version modelSnapshotMinVersion, @Nullable Version jobVersion, @Nullable Boolean clearJobFinishTime, + @Nullable Boolean allowLazyOpen) { this.jobId = jobId; this.groups = groups; this.description = description; @@ -102,6 +105,7 @@ private JobUpdate(String jobId, @Nullable List groups, @Nullable String this.modelSnapshotMinVersion = modelSnapshotMinVersion; this.jobVersion = jobVersion; this.clearJobFinishTime = clearJobFinishTime; + this.allowLazyOpen = allowLazyOpen; } public JobUpdate(StreamInput in) throws IOException { @@ -138,6 +142,12 @@ public JobUpdate(StreamInput in) throws IOException { } else { modelSnapshotMinVersion = null; } + // TODO: change version in backport + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + allowLazyOpen = in.readOptionalBoolean(); + } else { + allowLazyOpen = null; + } } @Override @@ -177,6 +187,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(false); } } + // TODO: change version in backport + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeOptionalBoolean(allowLazyOpen); + } } public String getJobId() { @@ -243,6 +257,10 @@ public Boolean getClearJobFinishTime() { return clearJobFinishTime; } + public Boolean getAllowLazyOpen() { + return allowLazyOpen; + } + public boolean isAutodetectProcessUpdate() { return modelPlotConfig != null || detectorUpdates != null || groups != null; } @@ -296,6 +314,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (clearJobFinishTime != null) { builder.field(CLEAR_JOB_FINISH_TIME.getPreferredName(), clearJobFinishTime); } + if (allowLazyOpen != null) { + builder.field(Job.ALLOW_LAZY_OPEN.getPreferredName(), allowLazyOpen); + } builder.endObject(); return builder; } @@ -344,6 +365,9 @@ public Set getUpdateFields() { if (jobVersion != null) { updateFields.add(Job.JOB_VERSION.getPreferredName()); } + if (allowLazyOpen != null) { + updateFields.add(Job.ALLOW_LAZY_OPEN.getPreferredName()); + } return updateFields; } @@ -419,10 +443,12 @@ public Job mergeWithJob(Job source, ByteSizeValue maxModelMemoryLimit) { if (jobVersion != null) { builder.setJobVersion(jobVersion); } - if (clearJobFinishTime != null && clearJobFinishTime) { builder.setFinishedTime(null); } + if (allowLazyOpen != null) { + builder.setAllowLazyOpen(allowLazyOpen); + } builder.setAnalysisConfig(newAnalysisConfig); return builder.build(); @@ -444,7 +470,8 @@ && updatesDetectors(job) == false && (modelSnapshotId == null || Objects.equals(modelSnapshotId, job.getModelSnapshotId())) && (modelSnapshotMinVersion == null || Objects.equals(modelSnapshotMinVersion, job.getModelSnapshotMinVersion())) && (jobVersion == null || Objects.equals(jobVersion, job.getJobVersion())) - && ((clearJobFinishTime == null || clearJobFinishTime == false) || job.getFinishedTime() == null); + && (clearJobFinishTime == null || clearJobFinishTime == false || job.getFinishedTime() == null) + && (allowLazyOpen == null || Objects.equals(allowLazyOpen, job.allowLazyOpen())); } boolean updatesDetectors(Job job) { @@ -492,14 +519,15 @@ public boolean equals(Object other) { && Objects.equals(this.modelSnapshotId, that.modelSnapshotId) && Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion) && Objects.equals(this.jobVersion, that.jobVersion) - && Objects.equals(this.clearJobFinishTime, that.clearJobFinishTime); + && Objects.equals(this.clearJobFinishTime, that.clearJobFinishTime) + && Objects.equals(this.allowLazyOpen, that.allowLazyOpen); } @Override public int hashCode() { return Objects.hash(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, categorizationFilters, customSettings, - modelSnapshotId, modelSnapshotMinVersion, jobVersion, clearJobFinishTime); + modelSnapshotId, modelSnapshotMinVersion, jobVersion, clearJobFinishTime, allowLazyOpen); } public static class DetectorUpdate implements Writeable, ToXContentObject { @@ -611,6 +639,7 @@ public static class Builder { private Version modelSnapshotMinVersion; private Version jobVersion; private Boolean clearJobFinishTime; + private Boolean allowLazyOpen; public Builder(String jobId) { this.jobId = jobId; @@ -701,6 +730,11 @@ public Builder setJobVersion(String version) { return this; } + public Builder setAllowLazyOpen(boolean allowLazyOpen) { + this.allowLazyOpen = allowLazyOpen; + return this; + } + public Builder setClearFinishTime(boolean clearJobFinishTime) { this.clearJobFinishTime = clearJobFinishTime; return this; @@ -709,7 +743,7 @@ public Builder setClearFinishTime(boolean clearJobFinishTime) { public JobUpdate build() { return new JobUpdate(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, backgroundPersistInterval, renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings, - modelSnapshotId, modelSnapshotMinVersion, jobVersion, clearJobFinishTime); + modelSnapshotId, modelSnapshotMinVersion, jobVersion, clearJobFinishTime, allowLazyOpen); } } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java index f2015b1a2bbb5..e6d621aa462a4 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java @@ -136,7 +136,7 @@ public void testUnallocatedJobIds() { .masterNodeId("node-1") .build(); - assertThat(MlTasks.unallocatedJobIds(tasksBuilder.build(), nodes), + assertThat(MlTasks.unassignedJobIds(tasksBuilder.build(), nodes), containsInAnyOrder("job_without_assignment", "job_without_node")); } @@ -159,7 +159,7 @@ public void testUnallocatedDatafeedIds() { .masterNodeId("node-1") .build(); - assertThat(MlTasks.unallocatedDatafeedIds(tasksBuilder.build(), nodes), + assertThat(MlTasks.unassignedDatafeedIds(tasksBuilder.build(), nodes), containsInAnyOrder("datafeed_without_assignment", "datafeed_without_node")); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsActionTaskParamsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsActionTaskParamsTests.java index a330860266ec8..fa8da28e133ad 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsActionTaskParamsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsActionTaskParamsTests.java @@ -30,7 +30,7 @@ protected StartDataFrameAnalyticsAction.TaskParams createTestInstance() { for (int i = 0; i < phaseCount; i++) { progressOnStart.add(new PhaseProgress(randomAlphaOfLength(10), randomIntBetween(0, 100))); } - return new StartDataFrameAnalyticsAction.TaskParams(randomAlphaOfLength(10), Version.CURRENT, progressOnStart); + return new StartDataFrameAnalyticsAction.TaskParams(randomAlphaOfLength(10), Version.CURRENT, progressOnStart, randomBoolean()); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java index e9d338eb4d188..4f6af0e5dda3c 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java @@ -124,6 +124,9 @@ public static DataFrameAnalyticsConfig.Builder createRandomBuilder(String id, bo builder.setVersion(Version.CURRENT); } } + if (randomBoolean()) { + builder.setAllowLazyStart(randomBoolean()); + } return builder; } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsStateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsStateTests.java index 1b0800c91e3f6..a6258718b4439 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsStateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsStateTests.java @@ -5,14 +5,23 @@ */ package org.elasticsearch.xpack.core.ml.dataframe; +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.test.ESTestCase; +import java.io.IOException; + import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class DataFrameAnalyticsStateTests extends ESTestCase { public void testFromString() { + assertThat(DataFrameAnalyticsState.fromString("starting"), equalTo(DataFrameAnalyticsState.STARTING)); assertThat(DataFrameAnalyticsState.fromString("started"), equalTo(DataFrameAnalyticsState.STARTED)); assertThat(DataFrameAnalyticsState.fromString("reindexing"), equalTo(DataFrameAnalyticsState.REINDEXING)); assertThat(DataFrameAnalyticsState.fromString("analyzing"), equalTo(DataFrameAnalyticsState.ANALYZING)); @@ -22,6 +31,7 @@ public void testFromString() { } public void testToString() { + assertThat(DataFrameAnalyticsState.STARTING.toString(), equalTo("starting")); assertThat(DataFrameAnalyticsState.STARTED.toString(), equalTo("started")); assertThat(DataFrameAnalyticsState.REINDEXING.toString(), equalTo("reindexing")); assertThat(DataFrameAnalyticsState.ANALYZING.toString(), equalTo("analyzing")); @@ -30,6 +40,21 @@ public void testToString() { assertThat(DataFrameAnalyticsState.FAILED.toString(), equalTo("failed")); } + public void testWriteStartingStateToPre75() throws IOException { + StreamOutput streamOutput = mock(StreamOutput.class); + when(streamOutput.getVersion()).thenReturn(Version.V_7_4_1); + DataFrameAnalyticsState.STARTING.writeTo(streamOutput); + verify(streamOutput, times(1)).writeEnum(DataFrameAnalyticsState.STOPPED); + } + + public void testWriteStartingStateToPost75() throws IOException { + StreamOutput streamOutput = mock(StreamOutput.class); + // TODO: change version in backport + when(streamOutput.getVersion()).thenReturn(Version.V_8_0_0); + DataFrameAnalyticsState.STARTING.writeTo(streamOutput); + verify(streamOutput, times(1)).writeEnum(DataFrameAnalyticsState.STARTING); + } + public void testIsAnyOf() { assertThat(DataFrameAnalyticsState.STARTED.isAnyOf(), is(false)); assertThat(DataFrameAnalyticsState.STARTED.isAnyOf(DataFrameAnalyticsState.STARTED), is(true)); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java index 47e96f8b5ada2..f8fea739a2500 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java @@ -109,6 +109,7 @@ public void testConstructor_GivenEmptyJobConfiguration() { assertNull(job.getResultsRetentionDays()); assertNotNull(job.allInputFields()); assertFalse(job.allInputFields().isEmpty()); + assertFalse(job.allowLazyOpen()); } public void testNoId() { @@ -640,6 +641,9 @@ public static Job createRandomizedJob() { if (randomBoolean()) { builder.setResultsIndexName(randomValidJobId()); } + if (randomBoolean()) { + builder.setAllowLazyOpen(randomBoolean()); + } return builder.build(); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java index eb4f2c0bbc2da..b4e50a97b1abe 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java @@ -97,6 +97,9 @@ public JobUpdate createRandom(String jobId, @Nullable Job job) { if (useInternalParser) { update.setClearFinishTime(randomBoolean()); } + if (randomBoolean()) { + update.setAllowLazyOpen(randomBoolean()); + } return update.build(); } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java index 0244c2584e0a1..0264501e60683 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java @@ -48,7 +48,7 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase { private String destIndex; @After - public void cleanup() throws Exception { + public void cleanup() { cleanUp(); } @@ -83,6 +83,7 @@ public void testSingleNumericFeatureAndMixedTrainingAndNonTrainingRows() throws assertThatAuditMessagesMatch(jobId, "Created analytics with analysis type [classification]", "Estimated memory usage for this analytics to be", + "Starting analytics on node", "Started analytics", "Creating destination index [" + destIndex + "]", "Finished reindexing to destination index [" + destIndex + "]", @@ -119,6 +120,7 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsHundred() throws Excepti assertThatAuditMessagesMatch(jobId, "Created analytics with analysis type [classification]", "Estimated memory usage for this analytics to be", + "Starting analytics on node", "Started analytics", "Creating destination index [" + destIndex + "]", "Finished reindexing to destination index [" + destIndex + "]", @@ -170,6 +172,7 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsFifty() throws Exception assertThatAuditMessagesMatch(jobId, "Created analytics with analysis type [classification]", "Estimated memory usage for this analytics to be", + "Starting analytics on node", "Started analytics", "Creating destination index [" + destIndex + "]", "Finished reindexing to destination index [" + destIndex + "]", @@ -213,6 +216,7 @@ public void testSingleNumericFeatureAndMixedTrainingAndNonTrainingRows_TopClasse assertThatAuditMessagesMatch(jobId, "Created analytics with analysis type [classification]", "Estimated memory usage for this analytics to be", + "Starting analytics on node", "Started analytics", "Creating destination index [" + destIndex + "]", "Finished reindexing to destination index [" + destIndex + "]", @@ -289,6 +293,7 @@ private static Map getMlResultsObjectFromDestDoc(Map resultsObject, int numTopClasses) { assertThat(resultsObject.containsKey("top_classes"), is(true)); + @SuppressWarnings("unchecked") List> topClasses = (List>) resultsObject.get("top_classes"); assertThat(topClasses, hasSize(numTopClasses)); List classNames = new ArrayList<>(topClasses.size()); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java index d9cb886ba28b5..d73c5df40e681 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java @@ -42,7 +42,7 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase { private String destIndex; @After - public void cleanup() throws Exception { + public void cleanup() { cleanUp(); } @@ -110,6 +110,7 @@ public void testSingleNumericFeatureAndMixedTrainingAndNonTrainingRows() throws assertThatAuditMessagesMatch(jobId, "Created analytics with analysis type [regression]", "Estimated memory usage for this analytics to be", + "Starting analytics on node", "Started analytics", "Creating destination index [" + destIndex + "]", "Finished reindexing to destination index [" + destIndex + "]", @@ -161,6 +162,7 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsHundred() throws Excepti assertThatAuditMessagesMatch(jobId, "Created analytics with analysis type [regression]", "Estimated memory usage for this analytics to be", + "Starting analytics on node", "Started analytics", "Creating destination index [" + destIndex + "]", "Finished reindexing to destination index [" + destIndex + "]", @@ -227,6 +229,7 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsFifty() throws Exception assertThatAuditMessagesMatch(jobId, "Created analytics with analysis type [regression]", "Estimated memory usage for this analytics to be", + "Starting analytics on node", "Started analytics", "Creating destination index [" + destIndex + "]", "Finished reindexing to destination index [" + destIndex + "]", diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java index 71ecacd557fb3..6a04afc0a06e0 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java @@ -21,6 +21,7 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource; @@ -32,12 +33,14 @@ import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.startsWith; public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTestCase { @@ -120,6 +123,7 @@ public void testOutlierDetectionWithFewDocuments() throws Exception { assertThatAuditMessagesMatch(id, "Created analytics with analysis type [outlier_detection]", "Estimated memory usage for this analytics to be", + "Starting analytics on node", "Started analytics", "Creating destination index [test-outlier-detection-with-few-docs-results]", "Finished reindexing to destination index [test-outlier-detection-with-few-docs-results]", @@ -174,6 +178,7 @@ public void testOutlierDetectionWithEnoughDocumentsToScroll() throws Exception { assertThatAuditMessagesMatch(id, "Created analytics with analysis type [outlier_detection]", "Estimated memory usage for this analytics to be", + "Starting analytics on node", "Started analytics", "Creating destination index [test-outlier-detection-with-enough-docs-to-scroll-results]", "Finished reindexing to destination index [test-outlier-detection-with-enough-docs-to-scroll-results]", @@ -254,6 +259,7 @@ public void testOutlierDetectionWithMoreFieldsThanDocValueFieldLimit() throws Ex assertThatAuditMessagesMatch(id, "Created analytics with analysis type [outlier_detection]", "Estimated memory usage for this analytics to be", + "Starting analytics on node", "Started analytics", "Creating destination index [test-outlier-detection-with-more-fields-than-docvalue-limit-results]", "Finished reindexing to destination index [test-outlier-detection-with-more-fields-than-docvalue-limit-results]", @@ -312,6 +318,7 @@ public void testStopOutlierDetectionWithEnoughDocumentsToScroll() throws Excepti assertThatAuditMessagesMatch(id, "Created analytics with analysis type [outlier_detection]", "Estimated memory usage for this analytics to be", + "Starting analytics on node", "Started analytics", "Creating destination index [test-stop-outlier-detection-with-enough-docs-to-scroll-results]", "Stopped analytics"); @@ -377,6 +384,7 @@ public void testOutlierDetectionWithMultipleSourceIndices() throws Exception { assertThatAuditMessagesMatch(id, "Created analytics with analysis type [outlier_detection]", "Estimated memory usage for this analytics to be", + "Starting analytics on node", "Started analytics", "Creating destination index [test-outlier-detection-with-multiple-source-indices-results]", "Finished reindexing to destination index [test-outlier-detection-with-multiple-source-indices-results]", @@ -434,6 +442,7 @@ public void testOutlierDetectionWithPreExistingDestIndex() throws Exception { assertThatAuditMessagesMatch(id, "Created analytics with analysis type [outlier_detection]", "Estimated memory usage for this analytics to be", + "Starting analytics on node", "Started analytics", "Using existing destination index [test-outlier-detection-with-pre-existing-dest-index-results]", "Finished reindexing to destination index [test-outlier-detection-with-pre-existing-dest-index-results]", @@ -485,6 +494,60 @@ public void testModelMemoryLimitLowerThanEstimatedMemoryUsage() throws Exception "Estimated memory usage for this analytics to be"); } + public void testLazyAssignmentWithModelMemoryLimitTooHighForAssignment() throws Exception { + String sourceIndex = "test-lazy-assign-model-memory-limit-too-high"; + + client().admin().indices().prepareCreate(sourceIndex) + .addMapping("_doc", "col_1", "type=double", "col_2", "type=float", "col_3", "type=keyword") + .get(); + + BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + IndexRequest indexRequest = new IndexRequest(sourceIndex) + .id("doc_1") + .source("col_1", 1.0, "col_2", 1.0, "col_3", "str"); + bulkRequestBuilder.add(indexRequest); + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } + + String id = "test_lazy_assign_model_memory_limit_too_high"; + // Assuming a 1TB job will never fit on the test machine - increase this when machines get really big! + ByteSizeValue modelMemoryLimit = new ByteSizeValue(1, ByteSizeUnit.TB); + DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() + .setId(id) + .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null)) + .setDest(new DataFrameAnalyticsDest(sourceIndex + "-results", null)) + .setAnalysis(new OutlierDetection.Builder().build()) + .setModelMemoryLimit(modelMemoryLimit) + .setAllowLazyStart(true) + .build(); + + registerAnalytics(config); + putAnalytics(config); + assertState(id, DataFrameAnalyticsState.STOPPED); + + // Due to lazy start being allowed, this should succeed even though no node currently in the cluster is big enough + startAnalytics(id); + + // Wait until state is STARTING, there is no node but there is an assignment explanation. + assertBusy(() -> { + GetDataFrameAnalyticsStatsAction.Response.Stats stats = getAnalyticsStats(id).get(0); + assertThat(stats.getState(), equalTo(DataFrameAnalyticsState.STARTING)); + assertThat(stats.getNode(), is(nullValue())); + assertThat(stats.getAssignmentExplanation(), containsString("persistent task is awaiting node assignment")); + }); + stopAnalytics(id); + waitUntilAnalyticsIsStopped(id); + + assertThatAuditMessagesMatch(id, + "Created analytics with analysis type [outlier_detection]", + "Estimated memory usage for this analytics to be", + "No node found to start analytics. Reasons [persistent task is awaiting node assignment.]", + "Started analytics", + "Stopped analytics"); + } + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/47612") public void testOutlierDetectionStopAndRestart() throws Exception { String sourceIndex = "test-outlier-detection-stop-and-restart"; @@ -634,6 +697,7 @@ public void testOutlierDetectionWithCustomParams() throws Exception { assertThatAuditMessagesMatch(id, "Created analytics with analysis type [outlier_detection]", "Estimated memory usage for this analytics to be", + "Starting analytics on node", "Started analytics", "Creating destination index [test-outlier-detection-with-custom-params-results]", "Finished reindexing to destination index [test-outlier-detection-with-custom-params-results]", diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 277d7018e8d4e..7522177b159fc 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -584,6 +584,8 @@ public Collection createComponents(Client client, ClusterService cluster this.memoryTracker.set(memoryTracker); MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(clusterService, datafeedManager, mlController, autodetectProcessManager, memoryTracker); + MlAssignmentNotifier mlAssignmentNotifier = new MlAssignmentNotifier(anomalyDetectionAuditor, dataFrameAnalyticsAuditor, threadPool, + new MlConfigMigrator(settings, client, clusterService), clusterService); // this object registers as a license state listener, and is never removed, so there's no need to retain another reference to it final InvalidLicenseEnforcer enforcer = @@ -603,12 +605,12 @@ public Collection createComponents(Client client, ClusterService cluster jobManager, jobManagerHolder, autodetectProcessManager, - new MlInitializationService(settings, threadPool, clusterService, client), + new MlInitializationService(settings, threadPool, clusterService, client, mlAssignmentNotifier), jobDataCountsPersister, datafeedManager, anomalyDetectionAuditor, dataFrameAnalyticsAuditor, - new MlAssignmentNotifier(settings, anomalyDetectionAuditor, threadPool, client, clusterService), + mlAssignmentNotifier, memoryTracker, analyticsProcessManager, memoryEstimationProcessManager, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java index 6ce313f95396f..210a1544fcdda 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java @@ -8,20 +8,21 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; +import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; +import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor; import java.util.Objects; @@ -29,21 +30,15 @@ public class MlAssignmentNotifier implements ClusterStateListener { private static final Logger logger = LogManager.getLogger(MlAssignmentNotifier.class); - private final AnomalyDetectionAuditor auditor; + private final AnomalyDetectionAuditor anomalyDetectionAuditor; + private final DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor; private final MlConfigMigrator mlConfigMigrator; private final ThreadPool threadPool; - MlAssignmentNotifier(Settings settings, AnomalyDetectionAuditor auditor, ThreadPool threadPool, Client client, - ClusterService clusterService) { - this.auditor = auditor; - this.mlConfigMigrator = new MlConfigMigrator(settings, client, clusterService); - this.threadPool = threadPool; - clusterService.addListener(this); - } - - MlAssignmentNotifier(AnomalyDetectionAuditor auditor, ThreadPool threadPool, MlConfigMigrator mlConfigMigrator, - ClusterService clusterService) { - this.auditor = auditor; + MlAssignmentNotifier(AnomalyDetectionAuditor anomalyDetectionAuditor, DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor, + ThreadPool threadPool, MlConfigMigrator mlConfigMigrator, ClusterService clusterService) { + this.anomalyDetectionAuditor = anomalyDetectionAuditor; + this.dataFrameAnalyticsAuditor = dataFrameAnalyticsAuditor; this.mlConfigMigrator = mlConfigMigrator; this.threadPool = threadPool; clusterService.addListener(this); @@ -75,44 +70,76 @@ private void auditChangesToMlTasks(ClusterChangedEvent event) { return; } - PersistentTasksCustomMetaData previous = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - PersistentTasksCustomMetaData current = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + PersistentTasksCustomMetaData previousTasks = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + PersistentTasksCustomMetaData currentTasks = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - if (Objects.equals(previous, current)) { + if (Objects.equals(previousTasks, currentTasks)) { return; } - for (PersistentTask currentTask : current.tasks()) { + auditMlTasks(event.state().nodes(), previousTasks, currentTasks, false); + } + + /** + * Creates an audit warning for all currently unassigned ML + * tasks, even if a previous audit warning has been created. + * Care must be taken not to call this method frequently. + */ + public void auditUnassignedMlTasks(DiscoveryNodes nodes, PersistentTasksCustomMetaData tasks) { + auditMlTasks(nodes, tasks, tasks, true); + } + + private void auditMlTasks(DiscoveryNodes nodes, PersistentTasksCustomMetaData previousTasks, PersistentTasksCustomMetaData currentTasks, + boolean alwaysAuditUnassigned) { + + for (PersistentTask currentTask : currentTasks.tasks()) { Assignment currentAssignment = currentTask.getAssignment(); - PersistentTask previousTask = previous != null ? previous.getTask(currentTask.getId()) : null; + PersistentTask previousTask = previousTasks != null ? previousTasks.getTask(currentTask.getId()) : null; Assignment previousAssignment = previousTask != null ? previousTask.getAssignment() : null; - if (Objects.equals(currentAssignment, previousAssignment)) { + + boolean isTaskAssigned = (currentAssignment.getExecutorNode() != null); + if (Objects.equals(currentAssignment, previousAssignment) && + (isTaskAssigned || alwaysAuditUnassigned == false)) { continue; } + if (MlTasks.JOB_TASK_NAME.equals(currentTask.getTaskName())) { String jobId = ((OpenJobAction.JobParams) currentTask.getParams()).getJobId(); - if (currentAssignment.getExecutorNode() == null) { - auditor.warning(jobId, "No node found to open job. Reasons [" + currentAssignment.getExplanation() + "]"); + if (isTaskAssigned) { + DiscoveryNode node = nodes.get(currentAssignment.getExecutorNode()); + anomalyDetectionAuditor.info(jobId, "Opening job on node [" + node.toString() + "]"); } else { - DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode()); - auditor.info(jobId, "Opening job on node [" + node.toString() + "]"); + anomalyDetectionAuditor.warning(jobId, + "No node found to open job. Reasons [" + currentAssignment.getExplanation() + "]"); } } else if (MlTasks.DATAFEED_TASK_NAME.equals(currentTask.getTaskName())) { StartDatafeedAction.DatafeedParams datafeedParams = (StartDatafeedAction.DatafeedParams) currentTask.getParams(); String jobId = datafeedParams.getJobId(); - if (currentAssignment.getExecutorNode() == null) { - String msg = "No node found to start datafeed [" + datafeedParams.getDatafeedId() +"]. Reasons [" + - currentAssignment.getExplanation() + "]"; - logger.warn("[{}] {}", jobId, msg); + if (isTaskAssigned) { + DiscoveryNode node = nodes.get(currentAssignment.getExecutorNode()); if (jobId != null) { - auditor.warning(jobId, msg); + anomalyDetectionAuditor.info(jobId, + "Starting datafeed [" + datafeedParams.getDatafeedId() + "] on node [" + node + "]"); } } else { - DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode()); + String msg = "No node found to start datafeed [" + datafeedParams.getDatafeedId() +"]. Reasons [" + + currentAssignment.getExplanation() + "]"; + if (alwaysAuditUnassigned == false) { + logger.warn("[{}] {}", jobId, msg); + } if (jobId != null) { - auditor.info(jobId, "Starting datafeed [" + datafeedParams.getDatafeedId() + "] on node [" + node + "]"); + anomalyDetectionAuditor.warning(jobId, msg); } } + } else if (MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME.equals(currentTask.getTaskName())) { + String id = ((StartDataFrameAnalyticsAction.TaskParams) currentTask.getParams()).getId(); + if (isTaskAssigned) { + DiscoveryNode node = nodes.get(currentAssignment.getExecutorNode()); + dataFrameAnalyticsAuditor.info(id, "Starting analytics on node [" + node.toString() + "]"); + } else { + dataFrameAnalyticsAuditor.warning(id, + "No node found to start analytics. Reasons [" + currentAssignment.getExplanation() + "]"); + } } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java index 1c1a28dc8943e..d5bc84c240f10 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java @@ -92,7 +92,7 @@ public boolean jobIsEligibleForMigration(String jobId, ClusterState clusterState PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); return MlTasks.openJobIds(persistentTasks).contains(jobId) == false || - MlTasks.unallocatedJobIds(persistentTasks, clusterState.nodes()).contains(jobId); + MlTasks.unassignedJobIds(persistentTasks, clusterState.nodes()).contains(jobId); } /** @@ -119,6 +119,6 @@ public boolean datafeedIsEligibleForMigration(String datafeedId, ClusterState cl PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); return MlTasks.startedDatafeedIds(persistentTasks).contains(datafeedId) == false - || MlTasks.unallocatedDatafeedIds(persistentTasks, clusterState.nodes()).contains(datafeedId); + || MlTasks.unassignedDatafeedIds(persistentTasks, clusterState.nodes()).contains(datafeedId); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java index 219619f72ce43..94b3784a1e3e7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java @@ -295,9 +295,9 @@ public static PersistentTasksCustomMetaData rewritePersistentTaskParams(Map> unallocatedJobTasks = MlTasks.unallocatedJobTasks(currentTasks, nodes); + Collection> unallocatedJobTasks = MlTasks.unassignedJobTasks(currentTasks, nodes); Collection> unallocatedDatafeedsTasks = - MlTasks.unallocatedDatafeedTasks(currentTasks, nodes); + MlTasks.unassignedDatafeedTasks(currentTasks, nodes); if (unallocatedJobTasks.isEmpty() && unallocatedDatafeedsTasks.isEmpty()) { return currentTasks; @@ -549,7 +549,7 @@ public static List nonDeletingJobs(List jobs) { public static List closedOrUnallocatedJobs(ClusterState clusterState) { PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); Set openJobIds = MlTasks.openJobIds(persistentTasks); - openJobIds.removeAll(MlTasks.unallocatedJobIds(persistentTasks, clusterState.nodes())); + openJobIds.removeAll(MlTasks.unassignedJobIds(persistentTasks, clusterState.nodes())); MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); return mlMetadata.getJobs().values().stream() @@ -569,7 +569,7 @@ public static List closedOrUnallocatedJobs(ClusterState clusterState) { public static List stopppedOrUnallocatedDatafeeds(ClusterState clusterState) { PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); Set startedDatafeedIds = MlTasks.startedDatafeedIds(persistentTasks); - startedDatafeedIds.removeAll(MlTasks.unallocatedDatafeedIds(persistentTasks, clusterState.nodes())); + startedDatafeedIds.removeAll(MlTasks.unassignedDatafeedIds(persistentTasks, clusterState.nodes())); MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); return mlMetadata.getDatafeeds().values().stream() diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java index f8cc3fd35a644..21be7b1b7cf08 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java @@ -10,9 +10,12 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction; @@ -37,6 +40,8 @@ public class MlDailyMaintenanceService implements Releasable { private final ThreadPool threadPool; private final Client client; + private final ClusterService clusterService; + private final MlAssignmentNotifier mlAssignmentNotifier; /** * An interface to abstract the calculation of the delay to the next execution. @@ -46,14 +51,18 @@ public class MlDailyMaintenanceService implements Releasable { private volatile Scheduler.Cancellable cancellable; - MlDailyMaintenanceService(ThreadPool threadPool, Client client, Supplier scheduleProvider) { + MlDailyMaintenanceService(ThreadPool threadPool, Client client, ClusterService clusterService, + MlAssignmentNotifier mlAssignmentNotifier, Supplier scheduleProvider) { this.threadPool = Objects.requireNonNull(threadPool); this.client = Objects.requireNonNull(client); + this.clusterService = Objects.requireNonNull(clusterService); + this.mlAssignmentNotifier = Objects.requireNonNull(mlAssignmentNotifier); this.schedulerProvider = Objects.requireNonNull(scheduleProvider); } - public MlDailyMaintenanceService(ClusterName clusterName, ThreadPool threadPool, Client client) { - this(threadPool, client, () -> delayToNextTime(clusterName)); + public MlDailyMaintenanceService(ClusterName clusterName, ThreadPool threadPool, Client client, ClusterService clusterService, + MlAssignmentNotifier mlAssignmentNotifier) { + this(threadPool, client, clusterService, mlAssignmentNotifier, () -> delayToNextTime(clusterName)); } /** @@ -113,17 +122,34 @@ private synchronized void scheduleNext() { } private void triggerTasks() { - LOGGER.info("triggering scheduled [ML] maintenance tasks"); - executeAsyncWithOrigin(client, ML_ORIGIN, DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request(), + try { + LOGGER.info("triggering scheduled [ML] maintenance tasks"); + executeAsyncWithOrigin(client, ML_ORIGIN, DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request(), ActionListener.wrap( - response -> { - if (response.isDeleted()) { - LOGGER.info("Successfully completed [ML] maintenance tasks"); - } else { - LOGGER.info("Halting [ML] maintenance tasks before completion as elapsed time is too great"); - } - }, - e -> LOGGER.error("An error occurred during maintenance tasks execution", e))); - scheduleNext(); + response -> { + if (response.isDeleted()) { + LOGGER.info("Successfully completed [ML] maintenance tasks"); + } else { + LOGGER.info("Halting [ML] maintenance tasks before completion as elapsed time is too great"); + } + }, + e -> LOGGER.error("An error occurred during maintenance tasks execution", e))); + auditUnassignedMlTasks(clusterService.state()); + } finally { + scheduleNext(); + } + } + + /** + * The idea of this is that if tasks are unassigned for days on end then they'll get a duplicate + * audit warning every day, and that will mean they'll permanently have a yellow triangle next + * to their entries in the UI jobs list. (This functionality may need revisiting if the condition + * for displaying a yellow triangle in the UI jobs list changes.) + */ + private void auditUnassignedMlTasks(ClusterState state) { + PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + if (tasks != null) { + mlAssignmentNotifier.auditUnassignedMlTasks(state.nodes(), tasks); + } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java index 89ee978ca79e7..ac8d9f63849e9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java @@ -19,6 +19,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; +import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; class MlInitializationService implements LocalNodeMasterListener, ClusterStateListener { @@ -29,15 +30,18 @@ class MlInitializationService implements LocalNodeMasterListener, ClusterStateLi private final ThreadPool threadPool; private final ClusterService clusterService; private final Client client; + private final MlAssignmentNotifier mlAssignmentNotifier; private final AtomicBoolean isIndexCreationInProgress = new AtomicBoolean(false); private volatile MlDailyMaintenanceService mlDailyMaintenanceService; - MlInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, Client client) { - this.settings = settings; - this.threadPool = threadPool; - this.clusterService = clusterService; - this.client = client; + MlInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, Client client, + MlAssignmentNotifier mlAssignmentNotifier) { + this.settings = Objects.requireNonNull(settings); + this.threadPool = Objects.requireNonNull(threadPool); + this.clusterService = Objects.requireNonNull(clusterService); + this.client = Objects.requireNonNull(client); + this.mlAssignmentNotifier = Objects.requireNonNull(mlAssignmentNotifier); clusterService.addListener(this); clusterService.addLocalNodeMasterListener(this); } @@ -83,7 +87,8 @@ public String executorName() { private synchronized void installDailyMaintenanceService() { if (mlDailyMaintenanceService == null) { - mlDailyMaintenanceService = new MlDailyMaintenanceService(clusterService.getClusterName(), threadPool, client); + mlDailyMaintenanceService = + new MlDailyMaintenanceService(clusterService.getClusterName(), threadPool, client, clusterService, mlAssignmentNotifier); mlDailyMaintenanceService.start(); clusterService.addLifecycleListener(new LifecycleListener() { @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java index 9ed0210aa9a9a..bd3f2d205d32a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java @@ -116,30 +116,43 @@ protected void doExecute(Task task, CloseJobAction.Request request, ActionListen return; } - if (request.isForce() == false) { + if (request.isForce()) { + List jobIdsToForceClose = new ArrayList<>(response.openJobIds); + jobIdsToForceClose.addAll(response.closingJobIds); + forceCloseJob(state, request, jobIdsToForceClose, listener); + } else { Set executorNodes = new HashSet<>(); PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE); for (String resolvedJobId : request.getOpenJobIds()) { PersistentTasksCustomMetaData.PersistentTask jobTask = - MlTasks.getJobTask(resolvedJobId, tasks); - - if (jobTask == null || jobTask.isAssigned() == false) { - String message = "Cannot close job [" + resolvedJobId + "] because the job does not have " - + "an assigned node. Use force close to close the job"; - listener.onFailure(ExceptionsHelper.conflictStatusException(message)); - return; - } else { + MlTasks.getJobTask(resolvedJobId, tasks); + + if (jobTask == null) { + // This should not happen, because openJobIds was + // derived from the same tasks metadata as jobTask + String msg = "Requested job [" + resolvedJobId + + "] be stopped, but job's task could not be found."; + assert jobTask != null : msg; + logger.error(msg); + } else if (jobTask.isAssigned()) { executorNodes.add(jobTask.getExecutorNode()); + } else { + // This is the easy case - the job is not currently assigned to a node, so can + // be gracefully stopped simply by removing its persistent task. (Usually a + // graceful stop cannot be achieved by simply removing the persistent task, but + // if the job has no running code then graceful/forceful are basically the same.) + // The listener here can be a no-op, as waitForJobClosed() already waits for + // these persistent tasks to disappear. + persistentTasksService.sendRemoveRequest(jobTask.getId(), + ActionListener.wrap( + r -> logger.trace("[{}] removed task to close unassigned job", resolvedJobId), + e -> logger.error("[" + resolvedJobId + + "] failed to remove task to close unassigned job", e) + )); } } - request.setNodes(executorNodes.toArray(new String[executorNodes.size()])); - } + request.setNodes(executorNodes.toArray(new String[0])); - if (request.isForce()) { - List jobIdsToForceClose = new ArrayList<>(response.openJobIds); - jobIdsToForceClose.addAll(response.closingJobIds); - forceCloseJob(state, request, jobIdsToForceClose, listener); - } else { normalCloseJob(state, task, request, response.openJobIds, response.closingJobIds, listener); } }, @@ -148,7 +161,6 @@ protected void doExecute(Task task, CloseJobAction.Request request, ActionListen }, listener::onFailure )); - } } @@ -353,21 +365,20 @@ public void onFailure(Exception e) { private void sendResponseOrFailure(String jobId, ActionListener listener, AtomicArray failures) { - List catchedExceptions = failures.asList(); - if (catchedExceptions.size() == 0) { + List caughtExceptions = failures.asList(); + if (caughtExceptions.size() == 0) { listener.onResponse(new CloseJobAction.Response(true)); return; } String msg = "Failed to force close job [" + jobId + "] with [" - + catchedExceptions.size() + + caughtExceptions.size() + "] failures, rethrowing last, all Exceptions: [" - + catchedExceptions.stream().map(Exception::getMessage) + + caughtExceptions.stream().map(Exception::getMessage) .collect(Collectors.joining(", ")) + "]"; - ElasticsearchException e = new ElasticsearchException(msg, - catchedExceptions.get(0)); + ElasticsearchException e = new ElasticsearchException(msg, caughtExceptions.get(0)); listener.onFailure(e); } }); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 332bcb79bebdb..c9b07fadb2408 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -376,7 +376,8 @@ public PersistentTasksCustomMetaData.Assignment getAssignment(OpenJobAction.JobP // If the task parameters do not have a job field then the job // was first opened on a pre v6.6 node and has not been migrated - if (params.getJob() == null) { + Job job = params.getJob(); + if (job == null) { return AWAITING_MIGRATION; } @@ -405,9 +406,8 @@ public PersistentTasksCustomMetaData.Assignment getAssignment(OpenJobAction.JobP } } - Job job = params.getJob(); JobNodeSelector jobNodeSelector = new JobNodeSelector(clusterState, jobId, MlTasks.JOB_TASK_NAME, memoryTracker, - maxLazyMLNodes, node -> nodeFilter(node, job)); + job.allowLazyOpen() ? Integer.MAX_VALUE : maxLazyMLNodes, node -> nodeFilter(node, job)); return jobNodeSelector.selectNode( maxOpenJobs, maxConcurrentJobAllocations, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed); } @@ -558,6 +558,11 @@ public boolean test(PersistentTasksCustomMetaData.PersistentTask persistentTa } } switch (jobState) { + // The OPENING case here is expected to be incredibly short-lived, just occurring during the + // time period when a job has successfully been assigned to a node but the request to update + // its task state is still in-flight. (The long-lived OPENING case when a lazy node needs to + // be added to the cluster to accommodate the job was dealt with higher up this method when the + // magic AWAITING_LAZY_ASSIGNMENT assignment was checked for.) case OPENING: case CLOSED: return false; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java index b0e8bdd2810ae..46bec44ea562a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java @@ -166,7 +166,8 @@ public void onFailure(Exception e) { ActionListener memoryUsageHandledListener = ActionListener.wrap( startContext -> { StartDataFrameAnalyticsAction.TaskParams taskParams = new StartDataFrameAnalyticsAction.TaskParams( - request.getId(), startContext.config.getVersion(), startContext.progressOnStart); + request.getId(), startContext.config.getVersion(), startContext.progressOnStart, + startContext.config.isAllowLazyStart()); persistentTasksService.sendStartRequest(MlTasks.dataFrameAnalyticsTaskId(request.getId()), MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, taskParams, waitForAnalyticsToStart); }, @@ -432,6 +433,12 @@ public boolean test(PersistentTasksCustomMetaData.PersistentTask persistentTa case STOPPING: exception = ExceptionsHelper.conflictStatusException("the task has been stopped while waiting to be started"); return true; + // The STARTING case here is expected to be incredibly short-lived, just occurring during the + // time period when a job has successfully been assigned to a node but the request to update + // its task state is still in-flight. (The long-lived STARTING case when a lazy node needs to + // be added to the cluster to accommodate the job was dealt with higher up this method when the + // magic AWAITING_LAZY_ASSIGNMENT assignment was checked for.) + case STARTING: case STOPPED: return false; case FAILED: @@ -548,7 +555,7 @@ public PersistentTasksCustomMetaData.Assignment getAssignment(StartDataFrameAnal } JobNodeSelector jobNodeSelector = new JobNodeSelector(clusterState, id, MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, memoryTracker, - maxLazyMLNodes, node -> nodeFilter(node, id)); + params.isAllowLazyStart() ? Integer.MAX_VALUE : maxLazyMLNodes, node -> nodeFilter(node, id)); // Pass an effectively infinite value for max concurrent opening jobs, because data frame analytics jobs do // not have an "opening" state so would never be rejected for causing too many jobs in the "opening" state return jobNodeSelector.selectNode( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java index 58b63413871a8..2d0f3adfd14ab 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java @@ -140,6 +140,7 @@ private static void sortAnalyticsByTaskState(Set analyticsIds, Persisten Set failedAnalytics) { for (String analyticsId : analyticsIds) { switch (MlTasks.getDataFrameAnalyticsState(analyticsId, tasks)) { + case STARTING: case STARTED: case REINDEXING: case ANALYZING: diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java index e4384138056f1..45a9090b0b856 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; +import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor; import org.junit.Before; import java.net.InetAddress; @@ -38,7 +39,8 @@ public class MlAssignmentNotifierTests extends ESTestCase { - private AnomalyDetectionAuditor auditor; + private AnomalyDetectionAuditor anomalyDetectionAuditor; + private DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor; private ClusterService clusterService; private ThreadPool threadPool; private MlConfigMigrator configMigrator; @@ -46,7 +48,8 @@ public class MlAssignmentNotifierTests extends ESTestCase { @Before @SuppressWarnings("unchecked") private void setupMocks() { - auditor = mock(AnomalyDetectionAuditor.class); + anomalyDetectionAuditor = mock(AnomalyDetectionAuditor.class); + dataFrameAnalyticsAuditor = mock(DataFrameAnalyticsAuditor.class); clusterService = mock(ClusterService.class); threadPool = mock(ThreadPool.class); configMigrator = mock(MlConfigMigrator.class); @@ -67,7 +70,8 @@ private void setupMocks() { } public void testClusterChanged_info() { - MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService); + MlAssignmentNotifier notifier = new MlAssignmentNotifier(anomalyDetectionAuditor, dataFrameAnalyticsAuditor, threadPool, + configMigrator, clusterService); ClusterState previous = ClusterState.builder(new ClusterName("_name")) .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, @@ -86,7 +90,7 @@ public void testClusterChanged_info() { .masterNodeId("_node_id")) .build(); notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous)); - verify(auditor, times(1)).info(eq("job_id"), any()); + verify(anomalyDetectionAuditor, times(1)).info(eq("job_id"), any()); verify(configMigrator, times(1)).migrateConfigs(eq(newState), any()); // no longer master @@ -96,11 +100,12 @@ public void testClusterChanged_info() { .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT))) .build(); notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous)); - verifyNoMoreInteractions(auditor); + verifyNoMoreInteractions(anomalyDetectionAuditor); } public void testClusterChanged_warning() { - MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService); + MlAssignmentNotifier notifier = new MlAssignmentNotifier(anomalyDetectionAuditor, dataFrameAnalyticsAuditor, threadPool, + configMigrator, clusterService); ClusterState previous = ClusterState.builder(new ClusterName("_name")) .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, @@ -119,7 +124,7 @@ public void testClusterChanged_warning() { .masterNodeId("_node_id")) .build(); notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous)); - verify(auditor, times(1)).warning(eq("job_id"), any()); + verify(anomalyDetectionAuditor, times(1)).warning(eq("job_id"), any()); verify(configMigrator, times(1)).migrateConfigs(eq(newState), any()); // no longer master @@ -130,11 +135,12 @@ public void testClusterChanged_warning() { .build(); notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous)); - verifyNoMoreInteractions(auditor); + verifyNoMoreInteractions(anomalyDetectionAuditor); } public void testClusterChanged_noPersistentTaskChanges() { - MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService); + MlAssignmentNotifier notifier = new MlAssignmentNotifier(anomalyDetectionAuditor, dataFrameAnalyticsAuditor, threadPool, + configMigrator, clusterService); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", null, null, tasksBuilder); @@ -154,7 +160,7 @@ public void testClusterChanged_noPersistentTaskChanges() { notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous)); verify(configMigrator, times(1)).migrateConfigs(any(), any()); - verifyNoMoreInteractions(auditor); + verifyNoMoreInteractions(anomalyDetectionAuditor); // no longer master newState = ClusterState.builder(new ClusterName("_name")) diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyManagementServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java similarity index 61% rename from x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyManagementServiceTests.java rename to x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java index 4d2b77561296e..da753fff94eff 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyManagementServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java @@ -6,7 +6,13 @@ package org.elasticsearch.xpack.ml; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -24,20 +30,29 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class MlDailyManagementServiceTests extends ESTestCase { +public class MlDailyMaintenanceServiceTests extends ESTestCase { private ThreadPool threadPool; private Client client; + private ClusterService clusterService; + private MlAssignmentNotifier mlAssignmentNotifier; @Before public void setUpTests() { - threadPool = new TestThreadPool("MlDailyManagementServiceTests"); + threadPool = new TestThreadPool("MlDailyMaintenanceServiceTests"); client = mock(Client.class); when(client.threadPool()).thenReturn(threadPool); + clusterService = mock(ClusterService.class); + ClusterState state = ClusterState.builder(new ClusterName("MlDailyMaintenanceServiceTests")) + .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData.builder().build())) + .nodes(DiscoveryNodes.builder().build()) + .build(); + when(clusterService.state()).thenReturn(state); + mlAssignmentNotifier = mock(MlAssignmentNotifier.class); } @After - public void stop() throws InterruptedException { + public void stop() { terminate(threadPool); } @@ -50,12 +65,13 @@ public void testScheduledTriggering() throws InterruptedException { } verify(client, Mockito.atLeast(triggerCount - 1)).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any()); + verify(mlAssignmentNotifier, Mockito.atLeast(triggerCount - 1)).auditUnassignedMlTasks(any(), any()); } private MlDailyMaintenanceService createService(CountDownLatch latch, Client client) { - return new MlDailyMaintenanceService(threadPool, client, () -> { + return new MlDailyMaintenanceService(threadPool, client, clusterService, mlAssignmentNotifier, () -> { latch.countDown(); return TimeValue.timeValueMillis(100); }); } -} \ No newline at end of file +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java index a71e06bddef67..860f461327537 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java @@ -32,6 +32,7 @@ public class MlInitializationServiceTests extends ESTestCase { private ExecutorService executorService; private ClusterService clusterService; private Client client; + private MlAssignmentNotifier mlAssignmentNotifier; @Before public void setUpMocks() { @@ -39,6 +40,7 @@ public void setUpMocks() { executorService = mock(ExecutorService.class); clusterService = mock(ClusterService.class); client = mock(Client.class); + mlAssignmentNotifier = mock(MlAssignmentNotifier.class); doAnswer(invocation -> { ((Runnable) invocation.getArguments()[0]).run(); @@ -53,19 +55,22 @@ public void setUpMocks() { } public void testInitialize() { - MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client); + MlInitializationService initializationService = + new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, mlAssignmentNotifier); initializationService.onMaster(); assertThat(initializationService.getDailyMaintenanceService().isStarted(), is(true)); } public void testInitialize_noMasterNode() { - MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client); + MlInitializationService initializationService = + new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, mlAssignmentNotifier); initializationService.offMaster(); assertThat(initializationService.getDailyMaintenanceService(), is(nullValue())); } public void testInitialize_alreadyInitialized() { - MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client); + MlInitializationService initializationService = + new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, mlAssignmentNotifier); MlDailyMaintenanceService initialDailyMaintenanceService = mock(MlDailyMaintenanceService.class); initializationService.setDailyMaintenanceService(initialDailyMaintenanceService); initializationService.onMaster(); @@ -74,7 +79,8 @@ public void testInitialize_alreadyInitialized() { } public void testNodeGoesFromMasterToNonMasterAndBack() { - MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client); + MlInitializationService initializationService = + new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, mlAssignmentNotifier); MlDailyMaintenanceService initialDailyMaintenanceService = mock(MlDailyMaintenanceService.class); initializationService.setDailyMaintenanceService(initialDailyMaintenanceService); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index b4bad2e894953..2809d7e57efc1 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -49,6 +49,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields; import org.elasticsearch.xpack.core.ml.notifications.AuditorField; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.job.JobNodeSelector; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.process.MlMemoryTracker; @@ -186,6 +187,35 @@ public void testGetAssignment_GivenUnavailableIndicesWithLazyNode() { executor.getAssignment(params, csBuilder.build()).getExplanation()); } + public void testGetAssignment_GivenLazyJobAndNoGlobalLazyNodes() { + Settings settings = Settings.builder().put(MachineLearning.MAX_LAZY_ML_NODES.getKey(), 0).build(); + ClusterService clusterService = mock(ClusterService.class); + ClusterSettings clusterSettings = new ClusterSettings(settings, + Sets.newHashSet(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT, + MachineLearning.MAX_LAZY_ML_NODES, MachineLearning.MAX_OPEN_JOBS_PER_NODE) + ); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addIndices(metaData, routingTable); + csBuilder.metaData(metaData); + csBuilder.routingTable(routingTable.build()); + + TransportOpenJobAction.OpenJobPersistentTasksExecutor executor = new TransportOpenJobAction.OpenJobPersistentTasksExecutor( + settings, clusterService, mock(AutodetectProcessManager.class), mock(MlMemoryTracker.class), mock(Client.class)); + + Job job = mock(Job.class); + when(job.allowLazyOpen()).thenReturn(true); + OpenJobAction.JobParams params = new OpenJobAction.JobParams("lazy_job"); + params.setJob(job); + Assignment assignment = executor.getAssignment(params, csBuilder.build()); + assertNotNull(assignment); + assertNull(assignment.getExecutorNode()); + assertEquals(JobNodeSelector.AWAITING_LAZY_ASSIGNMENT.getExplanation(), assignment.getExplanation()); + } + public static void addJobTask(String jobId, String nodeId, JobState jobState, PersistentTasksCustomMetaData.Builder builder) { addJobTask(jobId, nodeId, jobState, builder, false); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsActionTests.java index bff0c83726ac2..1fd2ddc7b9913 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsActionTests.java @@ -27,6 +27,7 @@ public class TransportStopDataFrameAnalyticsActionTests extends ESTestCase { public void testFindAnalyticsToStop_GivenOneFailedTaskAndNotForce() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addAnalyticsTask(tasksBuilder, "starting", "foo-node", null); addAnalyticsTask(tasksBuilder, "started", "foo-node", DataFrameAnalyticsState.STARTED); addAnalyticsTask(tasksBuilder, "reindexing", "foo-node", DataFrameAnalyticsState.REINDEXING); addAnalyticsTask(tasksBuilder, "analyzing", "foo-node", DataFrameAnalyticsState.ANALYZING); @@ -34,7 +35,7 @@ public void testFindAnalyticsToStop_GivenOneFailedTaskAndNotForce() { addAnalyticsTask(tasksBuilder, "stopped", "foo-node", DataFrameAnalyticsState.STOPPED); addAnalyticsTask(tasksBuilder, "failed", "foo-node", DataFrameAnalyticsState.FAILED); - Set ids = new HashSet<>(Arrays.asList("started", "reindexing", "analyzing", "stopping", "stopped", "failed")); + Set ids = new HashSet<>(Arrays.asList("starting", "started", "reindexing", "analyzing", "stopping", "stopped", "failed")); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> TransportStopDataFrameAnalyticsAction.findAnalyticsToStop(tasksBuilder.build(), ids, false)); @@ -59,6 +60,7 @@ public void testFindAnalyticsToStop_GivenTwoFailedTasksAndNotForce() { public void testFindAnalyticsToStop_GivenFailedTaskAndForce() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addAnalyticsTask(tasksBuilder, "starting", "foo-node", null); addAnalyticsTask(tasksBuilder, "started", "foo-node", DataFrameAnalyticsState.STARTED); addAnalyticsTask(tasksBuilder, "reindexing", "foo-node", DataFrameAnalyticsState.REINDEXING); addAnalyticsTask(tasksBuilder, "analyzing", "foo-node", DataFrameAnalyticsState.ANALYZING); @@ -66,21 +68,27 @@ public void testFindAnalyticsToStop_GivenFailedTaskAndForce() { addAnalyticsTask(tasksBuilder, "stopped", "foo-node", DataFrameAnalyticsState.STOPPED); addAnalyticsTask(tasksBuilder, "failed", "foo-node", DataFrameAnalyticsState.FAILED); - Set ids = new HashSet<>(Arrays.asList("started", "reindexing", "analyzing", "stopping", "stopped", "failed")); + Set ids = new HashSet<>(Arrays.asList("starting", "started", "reindexing", "analyzing", "stopping", "stopped", "failed")); Set analyticsToStop = TransportStopDataFrameAnalyticsAction.findAnalyticsToStop(tasksBuilder.build(), ids, true); - assertThat(analyticsToStop, containsInAnyOrder("started", "reindexing", "analyzing", "failed")); + assertThat(analyticsToStop, containsInAnyOrder("starting", "started", "reindexing", "analyzing", "failed")); } private static void addAnalyticsTask(PersistentTasksCustomMetaData.Builder builder, String analyticsId, String nodeId, DataFrameAnalyticsState state) { + addAnalyticsTask(builder, analyticsId, nodeId, state, false); + } + + private static void addAnalyticsTask(PersistentTasksCustomMetaData.Builder builder, String analyticsId, String nodeId, + DataFrameAnalyticsState state, boolean allowLazyStart) { builder.addTask(MlTasks.dataFrameAnalyticsTaskId(analyticsId), MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, - new StartDataFrameAnalyticsAction.TaskParams(analyticsId, Version.CURRENT, Collections.emptyList()), + new StartDataFrameAnalyticsAction.TaskParams(analyticsId, Version.CURRENT, Collections.emptyList(), allowLazyStart), new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment")); - builder.updateTaskState(MlTasks.dataFrameAnalyticsTaskId(analyticsId), - new DataFrameAnalyticsTaskState(state, builder.getLastAllocationId(), null)); - + if (state != null) { + builder.updateTaskState(MlTasks.dataFrameAnalyticsTaskId(analyticsId), + new DataFrameAnalyticsTaskState(state, builder.getLastAllocationId(), null)); + } } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java index ab65f8d8c7938..296139b01d2a9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java @@ -611,13 +611,13 @@ private ClusterState.Builder fillNodesWithRunningJobs(Map nodeAt static void addDataFrameAnalyticsJobTask(String id, String nodeId, DataFrameAnalyticsState state, PersistentTasksCustomMetaData.Builder builder) { - addDataFrameAnalyticsJobTask(id, nodeId, state, builder, false); + addDataFrameAnalyticsJobTask(id, nodeId, state, builder, false, false); } static void addDataFrameAnalyticsJobTask(String id, String nodeId, DataFrameAnalyticsState state, - PersistentTasksCustomMetaData.Builder builder, boolean isStale) { + PersistentTasksCustomMetaData.Builder builder, boolean isStale, boolean allowLazyStart) { builder.addTask(MlTasks.dataFrameAnalyticsTaskId(id), MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, - new StartDataFrameAnalyticsAction.TaskParams(id, Version.CURRENT, Collections.emptyList()), + new StartDataFrameAnalyticsAction.TaskParams(id, Version.CURRENT, Collections.emptyList(), allowLazyStart), new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment")); if (state != null) { builder.updateTaskState(MlTasks.dataFrameAnalyticsTaskId(id), diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java index 7a37ba2fa60ce..3ca9bd0c54ca9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java @@ -101,7 +101,7 @@ public void testRefreshAll() { for (int i = 1; i <= numDataFrameAnalyticsTasks; ++i) { String id = "analytics" + i; allIds.add(id); - PersistentTasksCustomMetaData.PersistentTask task = makeTestDataFrameAnalyticsTask(id); + PersistentTasksCustomMetaData.PersistentTask task = makeTestDataFrameAnalyticsTask(id, false); tasks.put(task.getId(), task); } @@ -142,7 +142,7 @@ public void testRefreshAllFailure() { int numDataFrameAnalyticsTasks = randomIntBetween(2, 5); for (int i = 1; i <= numDataFrameAnalyticsTasks; ++i) { String id = "analytics" + i; - PersistentTasksCustomMetaData.PersistentTask task = makeTestDataFrameAnalyticsTask(id); + PersistentTasksCustomMetaData.PersistentTask task = makeTestDataFrameAnalyticsTask(id, false); tasks.put(task.getId(), task); } @@ -261,9 +261,10 @@ private PersistentTasksCustomMetaData.PersistentTask ma } private - PersistentTasksCustomMetaData.PersistentTask makeTestDataFrameAnalyticsTask(String id) { + PersistentTasksCustomMetaData.PersistentTask + makeTestDataFrameAnalyticsTask(String id, boolean allowLazyStart) { return new PersistentTasksCustomMetaData.PersistentTask<>(MlTasks.dataFrameAnalyticsTaskId(id), MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, new StartDataFrameAnalyticsAction.TaskParams(id, Version.CURRENT, - Collections.emptyList()), 0, PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT); + Collections.emptyList(), allowLazyStart), 0, PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT); } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml index eb6186c3a64d6..9f32ff22dedf2 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml @@ -113,8 +113,9 @@ - match: { analysis_limits.model_memory_limit: "2048mb" } --- -"Test put job with model_memory_limit as string": - +"Test put job with model_memory_limit as string and lazy open": + - skip: + features: headers - do: ml.put_job: job_id: job-model-memory-limit-as-string @@ -126,11 +127,42 @@ "data_description" : { }, "analysis_limits": { - "model_memory_limit": "3g" - } + "model_memory_limit": "3000g" + }, + "allow_lazy_open": true } - match: { job_id: "job-model-memory-limit-as-string" } - - match: { analysis_limits.model_memory_limit: "3072mb" } + - match: { analysis_limits.model_memory_limit: "3072000mb" } + + # The assumption here is that a 3000GB job will not fit on the test + # node - increase in future if the test ever fails because of this! + # But because the job is allowed to open lazily, opening it shouldn't + # throw an exception - it should wait for a big enough node to be + # added to the cluster. + - do: + ml.open_job: + job_id: job-model-memory-limit-as-string + - match: { opened: false } + + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser + ml.get_job_stats: + job_id: job-model-memory-limit-as-string + - match: {"jobs.0.state": opening} + + # Despite never being assigned to a node the job should close gracefully + - do: + ml.close_job: + job_id: job-model-memory-limit-as-string + - match: { closed: true } + + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser + ml.get_job_stats: + job_id: job-model-memory-limit-as-string + - match: {"jobs.0.state": closed} --- "Test get job API with non existing job id":