From ebab3e08fd0bb2c627bab7aee6dcd41738464606 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 1 Oct 2018 10:47:24 +0100 Subject: [PATCH 01/13] Adjust job finalise action to work with documents --- .../xpack/core/ml/job/config/JobUpdate.java | 60 +++++++++++++++--- .../core/ml/job/config/JobUpdateTests.java | 17 ++++- .../TransportFinalizeJobExecutionAction.java | 63 ++++++++----------- .../ml/action/TransportOpenJobAction.java | 2 +- 4 files changed, 94 insertions(+), 48 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java index e7f26fa83dc21..ef1c2bafbb777 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java @@ -17,10 +17,13 @@ import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.core.ml.utils.time.TimeUtils; import java.io.IOException; import java.util.Arrays; +import java.util.Date; import java.util.List; import java.util.Map; import java.util.Objects; @@ -59,7 +62,16 @@ public class JobUpdate implements Writeable, ToXContentObject { INTERNAL_PARSER.declareString(Builder::setModelSnapshotId, Job.MODEL_SNAPSHOT_ID); INTERNAL_PARSER.declareLong(Builder::setEstablishedModelMemory, Job.ESTABLISHED_MODEL_MEMORY); INTERNAL_PARSER.declareString(Builder::setJobVersion, Job.JOB_VERSION); - INTERNAL_PARSER.declareBoolean(Builder::setClearJobFinishTime, CLEAR_JOB_FINISH_TIME); + INTERNAL_PARSER.declareBoolean(Builder::setClearFinishTime, CLEAR_JOB_FINISH_TIME); + INTERNAL_PARSER.declareField(Builder::setFinishedTime, p -> { + if (p.currentToken() == XContentParser.Token.VALUE_NUMBER) { + return new Date(p.longValue()); + } else if (p.currentToken() == XContentParser.Token.VALUE_STRING) { + return new Date(TimeUtils.dateStringToEpoch(p.text())); + } + throw new IllegalArgumentException( + "unexpected token [" + p.currentToken() + "] for [" + Job.FINISHED_TIME.getPreferredName() + "]"); + }, Job.FINISHED_TIME, ObjectParser.ValueType.VALUE); } private final String jobId; @@ -78,6 +90,7 @@ public class JobUpdate implements Writeable, ToXContentObject { private final Long establishedModelMemory; private final Version jobVersion; private final Boolean clearJobFinishTime; + private final Date finishedTime; private JobUpdate(String jobId, @Nullable List groups, @Nullable String description, @Nullable List detectorUpdates, @Nullable ModelPlotConfig modelPlotConfig, @@ -85,7 +98,8 @@ private JobUpdate(String jobId, @Nullable List groups, @Nullable String @Nullable Long renormalizationWindowDays, @Nullable Long resultsRetentionDays, @Nullable Long modelSnapshotRetentionDays, @Nullable List categorisationFilters, @Nullable Map customSettings, @Nullable String modelSnapshotId, - @Nullable Long establishedModelMemory, @Nullable Version jobVersion, @Nullable Boolean clearJobFinishTime) { + @Nullable Long establishedModelMemory, @Nullable Version jobVersion, + @Nullable Boolean clearJobFinishTime, @Nullable Date finishedTime) { this.jobId = jobId; this.groups = groups; this.description = description; @@ -102,6 +116,7 @@ private JobUpdate(String jobId, @Nullable List groups, @Nullable String this.establishedModelMemory = establishedModelMemory; this.jobVersion = jobVersion; this.clearJobFinishTime = clearJobFinishTime; + this.finishedTime = finishedTime; } public JobUpdate(StreamInput in) throws IOException { @@ -143,8 +158,10 @@ public JobUpdate(StreamInput in) throws IOException { } if (in.getVersion().onOrAfter(Version.CURRENT)) { clearJobFinishTime = in.readOptionalBoolean(); + finishedTime = in.readBoolean() ? new Date(in.readVLong()) : null; } else { clearJobFinishTime = null; + finishedTime = null; } } @@ -185,6 +202,12 @@ public void writeTo(StreamOutput out) throws IOException { } if (out.getVersion().onOrAfter(Version.CURRENT)) { out.writeOptionalBoolean(clearJobFinishTime); + if (finishedTime != null) { + out.writeBoolean(true); + out.writeVLong(finishedTime.getTime()); + } else { + out.writeBoolean(false); + } } } @@ -252,6 +275,10 @@ public Boolean getClearJobFinishTime() { return clearJobFinishTime; } + public Date getFinishedTime() { + return finishedTime; + } + public boolean isAutodetectProcessUpdate() { return modelPlotConfig != null || detectorUpdates != null || groups != null; } @@ -305,6 +332,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (clearJobFinishTime != null) { builder.field(CLEAR_JOB_FINISH_TIME.getPreferredName(), clearJobFinishTime); } + if (finishedTime != null) { + builder.timeField(Job.FINISHED_TIME.getPreferredName(), finishedTime); + } builder.endObject(); return builder; } @@ -433,10 +463,12 @@ public Job mergeWithJob(Job source, ByteSizeValue maxModelMemoryLimit) { if (jobVersion != null) { builder.setJobVersion(jobVersion); } - if (clearJobFinishTime != null && clearJobFinishTime) { builder.setFinishedTime(null); } + if (finishedTime != null) { + builder.setFinishedTime(finishedTime); + } builder.setAnalysisConfig(newAnalysisConfig); return builder.build(); @@ -458,7 +490,8 @@ && updatesDetectors(job) == false && (modelSnapshotId == null || Objects.equals(modelSnapshotId, job.getModelSnapshotId())) && (establishedModelMemory == null || Objects.equals(establishedModelMemory, job.getEstablishedModelMemory())) && (jobVersion == null || Objects.equals(jobVersion, job.getJobVersion())) - && ((clearJobFinishTime == null || clearJobFinishTime == false) || job.getFinishedTime() == null); + && ((clearJobFinishTime == null || clearJobFinishTime == false) || job.getFinishedTime() == null) + && (finishedTime == null || Objects.equals(finishedTime, job.getFinishedTime())); } boolean updatesDetectors(Job job) { @@ -506,14 +539,15 @@ public boolean equals(Object other) { && Objects.equals(this.modelSnapshotId, that.modelSnapshotId) && Objects.equals(this.establishedModelMemory, that.establishedModelMemory) && Objects.equals(this.jobVersion, that.jobVersion) - && Objects.equals(this.clearJobFinishTime, that.clearJobFinishTime); + && Objects.equals(this.clearJobFinishTime, that.clearJobFinishTime) + && Objects.equals(this.finishedTime, that.finishedTime); } @Override public int hashCode() { return Objects.hash(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, categorizationFilters, customSettings, - modelSnapshotId, establishedModelMemory, jobVersion, clearJobFinishTime); + modelSnapshotId, establishedModelMemory, jobVersion, clearJobFinishTime, finishedTime); } public static class DetectorUpdate implements Writeable, ToXContentObject { @@ -624,7 +658,8 @@ public static class Builder { private String modelSnapshotId; private Long establishedModelMemory; private Version jobVersion; - private Boolean clearJobFinishTime; + private Boolean clearFinishTime; + private Date finishedTime; public Builder(String jobId) { this.jobId = jobId; @@ -710,15 +745,20 @@ public Builder setJobVersion(String version) { return this; } - public Builder setClearJobFinishTime(boolean clearJobFinishTime) { - this.clearJobFinishTime = clearJobFinishTime; + public Builder setFinishedTime(Date finishedTime) { + this.finishedTime = finishedTime; + return this; + } + + public Builder setClearFinishTime(boolean clearFinishTime) { + this.clearFinishTime = clearFinishTime; return this; } public JobUpdate build() { return new JobUpdate(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, backgroundPersistInterval, renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings, - modelSnapshotId, establishedModelMemory, jobVersion, clearJobFinishTime); + modelSnapshotId, establishedModelMemory, jobVersion, clearFinishTime, finishedTime); } } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java index 92894de405eb4..d3b5c3af350fc 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java @@ -94,7 +94,11 @@ public JobUpdate createRandom(String jobId, @Nullable Job job) { update.setJobVersion(randomFrom(Version.CURRENT, Version.V_6_2_0, Version.V_6_1_0)); } if (useInternalParser) { - update.setClearJobFinishTime(randomBoolean()); + if (randomBoolean()) { + update.setClearFinishTime(randomBoolean()); + } else { + update.setFinishedTime(new Date()); + } } return update.build(); @@ -214,6 +218,12 @@ public void testMergeWithJob() { updateBuilder.setCustomSettings(customSettings); updateBuilder.setModelSnapshotId(randomAlphaOfLength(10)); updateBuilder.setJobVersion(Version.V_6_1_0); + boolean clearJobFinishedTime = randomBoolean(); + if (clearJobFinishedTime) { + updateBuilder.setClearFinishTime(true); + } else { + updateBuilder.setFinishedTime(new Date()); + } JobUpdate update = updateBuilder.build(); Job.Builder jobBuilder = new Job.Builder("foo"); @@ -243,6 +253,11 @@ public void testMergeWithJob() { assertEquals(update.getCustomSettings(), updatedJob.getCustomSettings()); assertEquals(update.getModelSnapshotId(), updatedJob.getModelSnapshotId()); assertEquals(update.getJobVersion(), updatedJob.getJobVersion()); + if (clearJobFinishedTime) { + assertNull(updatedJob.getFinishedTime()); + } else { + assertEquals(update.getFinishedTime(), updatedJob.getFinishedTime()); + } for (JobUpdate.DetectorUpdate detectorUpdate : update.getDetectorUpdates()) { Detector updatedDetector = updatedJob.getAnalysisConfig().getDetectors().get(detectorUpdate.getDetectorIndex()); assertNotNull(updatedDetector); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java index fb56e61983973..87f1802547f2f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java @@ -10,33 +10,36 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.XPackPlugin; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; -import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; +import org.elasticsearch.xpack.ml.utils.ChainTaskExecutor; import java.util.Date; public class TransportFinalizeJobExecutionAction extends TransportMasterNodeAction { + private final JobConfigProvider jobConfigProvider; + @Inject public TransportFinalizeJobExecutionAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver) { + IndexNameExpressionResolver indexNameExpressionResolver, + JobConfigProvider jobConfigProvider) { super(settings, FinalizeJobExecutionAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, FinalizeJobExecutionAction.Request::new); + this.jobConfigProvider = jobConfigProvider; } @Override @@ -51,41 +54,29 @@ protected AcknowledgedResponse newResponse() { @Override protected void masterOperation(FinalizeJobExecutionAction.Request request, ClusterState state, - ActionListener listener) throws Exception { + ActionListener listener) { + String jobIdString = String.join(",", request.getJobIds()); - String source = "finalize_job_execution [" + jobIdString + "]"; logger.debug("finalizing jobs [{}]", jobIdString); - clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - XPackPlugin.checkReadyForXPackCustomMetadata(currentState); - MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState); - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata); - Date finishedTime = new Date(); - for (String jobId : request.getJobIds()) { - Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId)); - jobBuilder.setFinishedTime(finishedTime); - mlMetadataBuilder.putJob(jobBuilder.build(), true); - } - ClusterState.Builder builder = ClusterState.builder(currentState); - return builder.metaData(new MetaData.Builder(currentState.metaData()) - .putCustom(MlMetadata.TYPE, mlMetadataBuilder.build())) - .build(); - } + ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(threadPool.executor( + MachineLearning.UTILITY_THREAD_POOL_NAME), false); - @Override - public void onFailure(String source, Exception e) { - listener.onFailure(e); - } + Date now = new Date(); + for (String jobId : request.getJobIds()) { + JobUpdate finishedTimeUpdate = new JobUpdate.Builder(jobId).setFinishedTime(now).build(); + chainTaskExecutor.add(updateListener -> { + jobConfigProvider.updateJob(jobId, finishedTimeUpdate, null, ActionListener.wrap( + response -> updateListener.onResponse(null), + updateListener::onFailure + ) ); + }); + } - @Override - public void clusterStateProcessed(String source, ClusterState oldState, - ClusterState newState) { - logger.debug("finalized job [{}]", jobIdString); - listener.onResponse(new AcknowledgedResponse(true)); - } - }); + chainTaskExecutor.execute(ActionListener.wrap( + response -> listener.onResponse(new AcknowledgedResponse(true)), + listener::onFailure + )); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 02ecb511fed60..34161ff34c21a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -564,7 +564,7 @@ public void onTimeout(TimeValue timeout) { } private void clearJobFinishedTime(String jobId, ActionListener listener) { - JobUpdate update = new JobUpdate.Builder(jobId).setClearJobFinishTime(true).build(); + JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build(); jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap( job -> listener.onResponse(new AcknowledgedResponse(true)), From d0da9509add38a7dcdc69133646e0e61776a1af7 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 3 Oct 2018 14:56:02 +0100 Subject: [PATCH 02/13] Set job finished time in autodetect results processor --- .../ml/action/TransportCloseJobAction.java | 9 +-- .../ml/job/persistence/JobConfigProvider.java | 5 +- .../output/AutoDetectResultProcessor.java | 70 +++++++++++-------- 3 files changed, 46 insertions(+), 38 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java index e07503a07df2d..e7610074bc0e8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java @@ -30,7 +30,6 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; -import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; @@ -50,9 +49,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; -import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; - public class TransportCloseJobAction extends TransportTasksAction { @@ -427,10 +423,7 @@ void waitForJobClosed(CloseJobAction.Request request, WaitForCloseRequest waitFo }, request.getCloseTimeout(), new ActionListener() { @Override public void onResponse(Boolean result) { - FinalizeJobExecutionAction.Request finalizeRequest = new FinalizeJobExecutionAction.Request( - waitForCloseRequest.jobsToFinalize.toArray(new String[0])); - executeAsyncWithOrigin(client, ML_ORIGIN, FinalizeJobExecutionAction.INSTANCE, finalizeRequest, - ActionListener.wrap(r -> listener.onResponse(response), listener::onFailure)); + listener.onResponse(response); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index feab1e84a0146..26e9ee3019b04 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -237,7 +237,9 @@ public void onFailure(Exception e) { * * @param jobId The Id of the job to update * @param update The job update - * @param maxModelMemoryLimit The maximum model memory allowed + * @param maxModelMemoryLimit The maximum model memory allowed. This can be {@code null} + * if the job's {@link org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits} + * are not changed. * @param updatedJobListener Updated job listener */ public void updateJob(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit, ActionListener updatedJobListener) { @@ -373,7 +375,6 @@ private void indexUpdatedJob(Job updatedJob, long version, ActionListener u } } - /** * Check a job exists. A job exists if it has a configuration document. * If the .ml-config index does not exist it is treated as a missing job diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index f30572f5eb609..ce3fb88ff5e18 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -9,6 +9,9 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.update.UpdateAction; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.Loggers; @@ -18,10 +21,10 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.xpack.core.ml.MachineLearningField; -import org.elasticsearch.xpack.core.ml.action.PutJobAction; -import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; -import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.messages.Messages; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; @@ -34,17 +37,19 @@ import org.elasticsearch.xpack.core.ml.job.results.Influencer; import org.elasticsearch.xpack.core.ml.job.results.ModelPlot; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import org.elasticsearch.xpack.ml.notifications.Auditor; import java.time.Duration; +import java.util.Collections; import java.util.Date; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; @@ -164,7 +169,7 @@ public void process(AutodetectProcess process) { } LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, bucketCount); - runEstablishedModelMemoryUpdate(true); + onAutodetectClose(); } catch (Exception e) { failed = true; @@ -334,9 +339,6 @@ private void notifyModelMemoryStatusChange(Context context, ModelSizeStats model } protected void updateModelSnapshotIdOnJob(ModelSnapshot modelSnapshot) { - JobUpdate update = new JobUpdate.Builder(jobId).setModelSnapshotId(modelSnapshot.getSnapshotId()).build(); - UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update); - try { // This blocks the main processing thread in the unlikely event // there are 2 model snapshots queued up. But it also has the @@ -348,20 +350,21 @@ protected void updateModelSnapshotIdOnJob(ModelSnapshot modelSnapshot) { return; } - executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, new ActionListener() { - @Override - public void onResponse(PutJobAction.Response response) { - updateModelSnapshotIdSemaphore.release(); - LOGGER.debug("[{}] Updated job with model snapshot id [{}]", jobId, modelSnapshot.getSnapshotId()); - } + updateJob(jobId, Collections.singletonMap(Job.MODEL_SNAPSHOT_ID.getPreferredName(), modelSnapshot.getSnapshotId()), + new ActionListener() { + @Override + public void onResponse(UpdateResponse updateResponse) { + updateModelSnapshotIdSemaphore.release(); + LOGGER.debug("[{}] Updated job with model snapshot id [{}]", jobId, modelSnapshot.getSnapshotId()); + } - @Override - public void onFailure(Exception e) { - updateModelSnapshotIdSemaphore.release(); - LOGGER.error("[" + jobId + "] Failed to update job with new model snapshot id [" + - modelSnapshot.getSnapshotId() + "]", e); - } - }); + @Override + public void onFailure(Exception e) { + updateModelSnapshotIdSemaphore.release(); + LOGGER.error("[" + jobId + "] Failed to update job with new model snapshot id [" + + modelSnapshot.getSnapshotId() + "]", e); + } + }); } /** @@ -419,6 +422,13 @@ private synchronized void runEstablishedModelMemoryUpdate(boolean cancelExisting } } + private void onAutodetectClose() { + updateJob(jobId, Collections.singletonMap(Job.FINISHED_TIME.getPreferredName(), new Date()), ActionListener.wrap( + r -> runEstablishedModelMemoryUpdate(true), + e -> LOGGER.error("[" + jobId + "] Failed to finalize job on autodetect close", e)) + ); + } + private void updateEstablishedModelMemoryOnJob() { // Copy these before committing writes, so the calculation is done based on committed documents @@ -430,14 +440,10 @@ private void updateEstablishedModelMemoryOnJob() { jobResultsProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStatsForCalc, establishedModelMemory -> { if (latestEstablishedModelMemory != establishedModelMemory) { - JobUpdate update = new JobUpdate.Builder(jobId).setEstablishedModelMemory(establishedModelMemory).build(); - UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update); - updateRequest.setWaitForAck(false); - - executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, - new ActionListener() { + updateJob(jobId, Collections.singletonMap(Job.ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory), + new ActionListener() { @Override - public void onResponse(PutJobAction.Response response) { + public void onResponse(UpdateResponse response) { latestEstablishedModelMemory = establishedModelMemory; LOGGER.debug("[{}] Updated job with established model memory [{}]", jobId, establishedModelMemory); } @@ -452,6 +458,14 @@ public void onFailure(Exception e) { }, e -> LOGGER.error("[" + jobId + "] Failed to calculate established model memory", e)); } + private void updateJob(String jobId, Map update, ActionListener listener) { + UpdateRequest updateRequest = new UpdateRequest(AnomalyDetectorsIndex.configIndexName(), + ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId)); + updateRequest.retryOnConflict(3); + updateRequest.doc(update); + executeAsyncWithOrigin(client, ML_ORIGIN, UpdateAction.INSTANCE, updateRequest, listener); + } + public void awaitCompletion() throws TimeoutException { try { // Although the results won't take 30 minutes to finish, the pipe won't be closed From e647611ccc4c8686e357418213fd89091b4eba02 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 3 Oct 2018 15:07:04 +0100 Subject: [PATCH 03/13] Remove unused internal waitForAck field from UpdateJobAction --- .../xpack/core/ml/action/UpdateJobAction.java | 20 +------------------ .../action/UpdateJobActionRequestTests.java | 1 - 2 files changed, 1 insertion(+), 20 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java index 1fb387b0b6c2a..92d2e93d317d9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java @@ -54,7 +54,6 @@ public static UpdateJobAction.Request parseRequest(String jobId, XContentParser /** Indicates an update that was not triggered by a user */ private boolean isInternal; - private boolean waitForAck = true; public Request(String jobId, JobUpdate update) { this(jobId, update, false); @@ -88,14 +87,6 @@ public boolean isInternal() { return isInternal; } - public boolean isWaitForAck() { - return waitForAck; - } - - public void setWaitForAck(boolean waitForAck) { - this.waitForAck = waitForAck; - } - @Override public ActionRequestValidationException validate() { return null; @@ -111,11 +102,6 @@ public void readFrom(StreamInput in) throws IOException { } else { isInternal = false; } - if (in.getVersion().onOrAfter(Version.V_6_3_0)) { - waitForAck = in.readBoolean(); - } else { - waitForAck = true; - } } @Override @@ -126,9 +112,6 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_6_2_2)) { out.writeBoolean(isInternal); } - if (out.getVersion().onOrAfter(Version.V_6_3_0)) { - out.writeBoolean(waitForAck); - } } @Override @@ -144,8 +127,7 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; UpdateJobAction.Request that = (UpdateJobAction.Request) o; return Objects.equals(jobId, that.jobId) && - Objects.equals(update, that.update) && - isInternal == that.isInternal; + Objects.equals(update, that.update); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/UpdateJobActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/UpdateJobActionRequestTests.java index 3b09017147886..85097100e39e0 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/UpdateJobActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/UpdateJobActionRequestTests.java @@ -19,7 +19,6 @@ protected UpdateJobAction.Request createTestInstance() { JobUpdate.Builder jobUpdate = new JobUpdate.Builder(jobId); jobUpdate.setAnalysisLimits(new AnalysisLimits(100L, 100L)); UpdateJobAction.Request request = new UpdateJobAction.Request(jobId, jobUpdate.build()); - request.setWaitForAck(randomBoolean()); return request; } From 168fd1279aca7d87284aeeac73712279895fcd04 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 3 Oct 2018 15:37:08 +0100 Subject: [PATCH 04/13] Fix get a single job --- .../xpack/ml/job/JobManager.java | 20 +++++++++++++------ .../ml/integration/JobConfigProviderIT.java | 6 ++++++ .../xpack/ml/job/JobManagerTests.java | 13 ++++++++++++ 3 files changed, 33 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 78658e5330047..d0b0504b274c5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -170,7 +170,7 @@ private void getJobFromClusterState(String jobId, ActionListener jobListene * @param jobsListener The jobs listener */ public void expandJobs(String expression, boolean allowNoJobs, ActionListener> jobsListener) { - Map clusterStateJobs = expandJobsFromClusterState(expression, allowNoJobs, clusterService.state()); + Map clusterStateJobs = expandJobsFromClusterState(expression, clusterService.state()); jobConfigProvider.expandJobs(expression, allowNoJobs, ActionListener.wrap( jobBuilders -> { @@ -197,12 +197,20 @@ public void expandJobs(String expression, boolean allowNoJobs, ActionListener expandJobsFromClusterState(String expression, boolean allowNoJobs, ClusterState clusterState) { - Set expandedJobIds = MlMetadata.getMlMetadata(clusterState).expandJobIds(expression, allowNoJobs); - MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); + private Map expandJobsFromClusterState(String expression, ClusterState clusterState) { Map jobIdToJob = new HashMap<>(); - for (String expandedJobId : expandedJobIds) { - jobIdToJob.put(expandedJobId, mlMetadata.getJobs().get(expandedJobId)); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); + try { + // This call will throw if the expression is not a wild card + // and the job does not exist. This is not the behaviour we + // want as the job may exist in the index. + // TODO jindex review the use of this function. Can it be changed not to throw in a BWC manner? + Set expandedJobIds = mlMetadata.expandJobIds(expression, true); + for (String expandedJobId : expandedJobIds) { + jobIdToJob.put(expandedJobId, mlMetadata.getJobs().get(expandedJobId)); + } + } catch (ResourceNotFoundException e) { + return jobIdToJob; } return jobIdToJob; } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java index 63f67c37dd944..303a8c3adfe41 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java @@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; @@ -342,6 +343,11 @@ public void testExpandJobs_GroupsAndJobIds() throws Exception { expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList()); assertThat(expandedJobs, containsInAnyOrder(tom, harry)); + expandedJobsBuilders = blockingCall(actionListener -> + jobConfigProvider.expandJobs("tom", false, actionListener)); + expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList()); + assertThat(expandedJobs, contains(tom)); + expandedJobsBuilders = blockingCall(actionListener -> jobConfigProvider.expandJobs("", false, actionListener)); expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index 73caeebf19b2f..f1a15df5c844a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -216,6 +216,19 @@ public void testExpandJobsFromClusterStateAndIndex() throws IOException { assertThat(jobsHolder.get().results(), hasSize(3)); jobIds = jobsHolder.get().results().stream().map(Job::getId).collect(Collectors.toList()); assertThat(jobIds, contains("foo-cs-1", "foo-cs-2", "foo-index")); + + + jobsHolder.set(null); + jobManager.expandJobs("foo-index", true, ActionListener.wrap( + jobs -> jobsHolder.set(jobs), + e -> fail(e.getMessage()) + )); + + assertNotNull(jobsHolder.get()); + assertThat(jobsHolder.get().results(), hasSize(1)); + jobIds = jobsHolder.get().results().stream().map(Job::getId).collect(Collectors.toList()); + assertThat(jobIds, contains("foo-index")); + } @SuppressWarnings("unchecked") From c0971340d420dda0b4eecaf2eb3eb0555afb9563 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 3 Oct 2018 15:59:32 +0100 Subject: [PATCH 05/13] Remove finalise job action --- .../xpack/core/XPackClientPlugin.java | 2 - .../ml/action/FinalizeJobExecutionAction.java | 79 ----------------- .../xpack/ml/MachineLearning.java | 3 - .../TransportFinalizeJobExecutionAction.java | 86 ------------------- 4 files changed, 170 deletions(-) delete mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FinalizeJobExecutionAction.java delete mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index 80a0bbf3baab6..f663c94f03156 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -54,7 +54,6 @@ import org.elasticsearch.xpack.core.ml.action.DeleteForecastAction; import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; -import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; import org.elasticsearch.xpack.core.ml.action.FindFileStructureAction; import org.elasticsearch.xpack.core.ml.action.FlushJobAction; import org.elasticsearch.xpack.core.ml.action.ForecastJobAction; @@ -236,7 +235,6 @@ public List getClientActions() { GetRecordsAction.INSTANCE, PostDataAction.INSTANCE, CloseJobAction.INSTANCE, - FinalizeJobExecutionAction.INSTANCE, FlushJobAction.INSTANCE, ValidateDetectorAction.INSTANCE, ValidateJobConfigAction.INSTANCE, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FinalizeJobExecutionAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FinalizeJobExecutionAction.java deleted file mode 100644 index cb176a397ace8..0000000000000 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FinalizeJobExecutionAction.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.core.ml.action; - -import org.elasticsearch.action.Action; -import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; -import org.elasticsearch.action.support.master.MasterNodeRequest; -import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; - -import java.io.IOException; - -public class FinalizeJobExecutionAction extends Action { - - public static final FinalizeJobExecutionAction INSTANCE = new FinalizeJobExecutionAction(); - public static final String NAME = "cluster:internal/xpack/ml/job/finalize_job_execution"; - - private FinalizeJobExecutionAction() { - super(NAME); - } - - @Override - public RequestBuilder newRequestBuilder(ElasticsearchClient client) { - return new RequestBuilder(client, INSTANCE); - } - - @Override - public AcknowledgedResponse newResponse() { - return new AcknowledgedResponse(); - } - - public static class Request extends MasterNodeRequest { - - private String[] jobIds; - - public Request(String[] jobIds) { - this.jobIds = jobIds; - } - - public Request() { - } - - public String[] getJobIds() { - return jobIds; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - jobIds = in.readStringArray(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeStringArray(jobIds); - } - - @Override - public ActionRequestValidationException validate() { - return null; - } - } - - public static class RequestBuilder - extends MasterNodeOperationRequestBuilder { - - public RequestBuilder(ElasticsearchClient client, FinalizeJobExecutionAction action) { - super(client, action, new Request()); - } - } -} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index f1a0745d58fed..f62acbf85337c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -66,7 +66,6 @@ import org.elasticsearch.xpack.core.ml.action.DeleteForecastAction; import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; -import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; import org.elasticsearch.xpack.core.ml.action.FindFileStructureAction; import org.elasticsearch.xpack.core.ml.action.FlushJobAction; import org.elasticsearch.xpack.core.ml.action.ForecastJobAction; @@ -120,7 +119,6 @@ import org.elasticsearch.xpack.ml.action.TransportDeleteForecastAction; import org.elasticsearch.xpack.ml.action.TransportDeleteJobAction; import org.elasticsearch.xpack.ml.action.TransportDeleteModelSnapshotAction; -import org.elasticsearch.xpack.ml.action.TransportFinalizeJobExecutionAction; import org.elasticsearch.xpack.ml.action.TransportFindFileStructureAction; import org.elasticsearch.xpack.ml.action.TransportFlushJobAction; import org.elasticsearch.xpack.ml.action.TransportForecastJobAction; @@ -546,7 +544,6 @@ public List getRestHandlers(Settings settings, RestController restC new ActionHandler<>(GetRecordsAction.INSTANCE, TransportGetRecordsAction.class), new ActionHandler<>(PostDataAction.INSTANCE, TransportPostDataAction.class), new ActionHandler<>(CloseJobAction.INSTANCE, TransportCloseJobAction.class), - new ActionHandler<>(FinalizeJobExecutionAction.INSTANCE, TransportFinalizeJobExecutionAction.class), new ActionHandler<>(FlushJobAction.INSTANCE, TransportFlushJobAction.class), new ActionHandler<>(ValidateDetectorAction.INSTANCE, TransportValidateDetectorAction.class), new ActionHandler<>(ValidateJobConfigAction.INSTANCE, TransportValidateJobConfigAction.class), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java deleted file mode 100644 index 87f1802547f2f..0000000000000 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.action; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.action.support.master.TransportMasterNodeAction; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; -import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; -import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; -import org.elasticsearch.xpack.ml.utils.ChainTaskExecutor; - -import java.util.Date; - -public class TransportFinalizeJobExecutionAction extends TransportMasterNodeAction { - - private final JobConfigProvider jobConfigProvider; - - @Inject - public TransportFinalizeJobExecutionAction(Settings settings, TransportService transportService, - ClusterService clusterService, ThreadPool threadPool, - ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, - JobConfigProvider jobConfigProvider) { - super(settings, FinalizeJobExecutionAction.NAME, transportService, clusterService, threadPool, actionFilters, - indexNameExpressionResolver, FinalizeJobExecutionAction.Request::new); - this.jobConfigProvider = jobConfigProvider; - } - - @Override - protected String executor() { - return ThreadPool.Names.SAME; - } - - @Override - protected AcknowledgedResponse newResponse() { - return new AcknowledgedResponse(); - } - - @Override - protected void masterOperation(FinalizeJobExecutionAction.Request request, ClusterState state, - ActionListener listener) { - - String jobIdString = String.join(",", request.getJobIds()); - logger.debug("finalizing jobs [{}]", jobIdString); - - ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(threadPool.executor( - MachineLearning.UTILITY_THREAD_POOL_NAME), false); - - Date now = new Date(); - for (String jobId : request.getJobIds()) { - JobUpdate finishedTimeUpdate = new JobUpdate.Builder(jobId).setFinishedTime(now).build(); - chainTaskExecutor.add(updateListener -> { - jobConfigProvider.updateJob(jobId, finishedTimeUpdate, null, ActionListener.wrap( - response -> updateListener.onResponse(null), - updateListener::onFailure - ) ); - }); - } - - chainTaskExecutor.execute(ActionListener.wrap( - response -> listener.onResponse(new AcknowledgedResponse(true)), - listener::onFailure - )); - } - - @Override - protected ClusterBlockException checkBlock(FinalizeJobExecutionAction.Request request, ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); - } -} From a0b45d5d0091b243dbf476105100ba4ceaa9cbd5 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 3 Oct 2018 15:59:51 +0100 Subject: [PATCH 06/13] Revert "Fix get a single job" This reverts commit 168fd1279aca7d87284aeeac73712279895fcd04. --- .../xpack/ml/job/JobManager.java | 20 ++++++------------- .../ml/integration/JobConfigProviderIT.java | 6 ------ .../xpack/ml/job/JobManagerTests.java | 13 ------------ 3 files changed, 6 insertions(+), 33 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index d0b0504b274c5..78658e5330047 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -170,7 +170,7 @@ private void getJobFromClusterState(String jobId, ActionListener jobListene * @param jobsListener The jobs listener */ public void expandJobs(String expression, boolean allowNoJobs, ActionListener> jobsListener) { - Map clusterStateJobs = expandJobsFromClusterState(expression, clusterService.state()); + Map clusterStateJobs = expandJobsFromClusterState(expression, allowNoJobs, clusterService.state()); jobConfigProvider.expandJobs(expression, allowNoJobs, ActionListener.wrap( jobBuilders -> { @@ -197,20 +197,12 @@ public void expandJobs(String expression, boolean allowNoJobs, ActionListener expandJobsFromClusterState(String expression, ClusterState clusterState) { - Map jobIdToJob = new HashMap<>(); + private Map expandJobsFromClusterState(String expression, boolean allowNoJobs, ClusterState clusterState) { + Set expandedJobIds = MlMetadata.getMlMetadata(clusterState).expandJobIds(expression, allowNoJobs); MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); - try { - // This call will throw if the expression is not a wild card - // and the job does not exist. This is not the behaviour we - // want as the job may exist in the index. - // TODO jindex review the use of this function. Can it be changed not to throw in a BWC manner? - Set expandedJobIds = mlMetadata.expandJobIds(expression, true); - for (String expandedJobId : expandedJobIds) { - jobIdToJob.put(expandedJobId, mlMetadata.getJobs().get(expandedJobId)); - } - } catch (ResourceNotFoundException e) { - return jobIdToJob; + Map jobIdToJob = new HashMap<>(); + for (String expandedJobId : expandedJobIds) { + jobIdToJob.put(expandedJobId, mlMetadata.getJobs().get(expandedJobId)); } return jobIdToJob; } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java index 303a8c3adfe41..63f67c37dd944 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java @@ -42,7 +42,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; @@ -343,11 +342,6 @@ public void testExpandJobs_GroupsAndJobIds() throws Exception { expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList()); assertThat(expandedJobs, containsInAnyOrder(tom, harry)); - expandedJobsBuilders = blockingCall(actionListener -> - jobConfigProvider.expandJobs("tom", false, actionListener)); - expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList()); - assertThat(expandedJobs, contains(tom)); - expandedJobsBuilders = blockingCall(actionListener -> jobConfigProvider.expandJobs("", false, actionListener)); expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index f1a15df5c844a..73caeebf19b2f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -216,19 +216,6 @@ public void testExpandJobsFromClusterStateAndIndex() throws IOException { assertThat(jobsHolder.get().results(), hasSize(3)); jobIds = jobsHolder.get().results().stream().map(Job::getId).collect(Collectors.toList()); assertThat(jobIds, contains("foo-cs-1", "foo-cs-2", "foo-index")); - - - jobsHolder.set(null); - jobManager.expandJobs("foo-index", true, ActionListener.wrap( - jobs -> jobsHolder.set(jobs), - e -> fail(e.getMessage()) - )); - - assertNotNull(jobsHolder.get()); - assertThat(jobsHolder.get().results(), hasSize(1)); - jobIds = jobsHolder.get().results().stream().map(Job::getId).collect(Collectors.toList()); - assertThat(jobIds, contains("foo-index")); - } @SuppressWarnings("unchecked") From c5e05ff68155dea6dca3df805c0cac890202493b Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 3 Oct 2018 16:39:23 +0100 Subject: [PATCH 07/13] Revert adding finished time field to job update and fix test --- .../xpack/core/ml/job/config/JobUpdate.java | 58 +++---------------- .../core/ml/job/config/JobUpdateTests.java | 17 +----- .../authz/store/ReservedRolesStoreTests.java | 3 - .../AutoDetectResultProcessorTests.java | 19 +++--- 4 files changed, 22 insertions(+), 75 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java index ef1c2bafbb777..3582e5aa7d8bf 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java @@ -17,13 +17,10 @@ import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; -import org.elasticsearch.xpack.core.ml.utils.time.TimeUtils; import java.io.IOException; import java.util.Arrays; -import java.util.Date; import java.util.List; import java.util.Map; import java.util.Objects; @@ -63,15 +60,6 @@ public class JobUpdate implements Writeable, ToXContentObject { INTERNAL_PARSER.declareLong(Builder::setEstablishedModelMemory, Job.ESTABLISHED_MODEL_MEMORY); INTERNAL_PARSER.declareString(Builder::setJobVersion, Job.JOB_VERSION); INTERNAL_PARSER.declareBoolean(Builder::setClearFinishTime, CLEAR_JOB_FINISH_TIME); - INTERNAL_PARSER.declareField(Builder::setFinishedTime, p -> { - if (p.currentToken() == XContentParser.Token.VALUE_NUMBER) { - return new Date(p.longValue()); - } else if (p.currentToken() == XContentParser.Token.VALUE_STRING) { - return new Date(TimeUtils.dateStringToEpoch(p.text())); - } - throw new IllegalArgumentException( - "unexpected token [" + p.currentToken() + "] for [" + Job.FINISHED_TIME.getPreferredName() + "]"); - }, Job.FINISHED_TIME, ObjectParser.ValueType.VALUE); } private final String jobId; @@ -90,7 +78,6 @@ public class JobUpdate implements Writeable, ToXContentObject { private final Long establishedModelMemory; private final Version jobVersion; private final Boolean clearJobFinishTime; - private final Date finishedTime; private JobUpdate(String jobId, @Nullable List groups, @Nullable String description, @Nullable List detectorUpdates, @Nullable ModelPlotConfig modelPlotConfig, @@ -98,8 +85,7 @@ private JobUpdate(String jobId, @Nullable List groups, @Nullable String @Nullable Long renormalizationWindowDays, @Nullable Long resultsRetentionDays, @Nullable Long modelSnapshotRetentionDays, @Nullable List categorisationFilters, @Nullable Map customSettings, @Nullable String modelSnapshotId, - @Nullable Long establishedModelMemory, @Nullable Version jobVersion, - @Nullable Boolean clearJobFinishTime, @Nullable Date finishedTime) { + @Nullable Long establishedModelMemory, @Nullable Version jobVersion, @Nullable Boolean clearJobFinishTime) { this.jobId = jobId; this.groups = groups; this.description = description; @@ -116,7 +102,6 @@ private JobUpdate(String jobId, @Nullable List groups, @Nullable String this.establishedModelMemory = establishedModelMemory; this.jobVersion = jobVersion; this.clearJobFinishTime = clearJobFinishTime; - this.finishedTime = finishedTime; } public JobUpdate(StreamInput in) throws IOException { @@ -158,10 +143,8 @@ public JobUpdate(StreamInput in) throws IOException { } if (in.getVersion().onOrAfter(Version.CURRENT)) { clearJobFinishTime = in.readOptionalBoolean(); - finishedTime = in.readBoolean() ? new Date(in.readVLong()) : null; } else { clearJobFinishTime = null; - finishedTime = null; } } @@ -202,12 +185,6 @@ public void writeTo(StreamOutput out) throws IOException { } if (out.getVersion().onOrAfter(Version.CURRENT)) { out.writeOptionalBoolean(clearJobFinishTime); - if (finishedTime != null) { - out.writeBoolean(true); - out.writeVLong(finishedTime.getTime()); - } else { - out.writeBoolean(false); - } } } @@ -275,10 +252,6 @@ public Boolean getClearJobFinishTime() { return clearJobFinishTime; } - public Date getFinishedTime() { - return finishedTime; - } - public boolean isAutodetectProcessUpdate() { return modelPlotConfig != null || detectorUpdates != null || groups != null; } @@ -332,9 +305,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (clearJobFinishTime != null) { builder.field(CLEAR_JOB_FINISH_TIME.getPreferredName(), clearJobFinishTime); } - if (finishedTime != null) { - builder.timeField(Job.FINISHED_TIME.getPreferredName(), finishedTime); - } builder.endObject(); return builder; } @@ -463,12 +433,10 @@ public Job mergeWithJob(Job source, ByteSizeValue maxModelMemoryLimit) { if (jobVersion != null) { builder.setJobVersion(jobVersion); } + if (clearJobFinishTime != null && clearJobFinishTime) { builder.setFinishedTime(null); } - if (finishedTime != null) { - builder.setFinishedTime(finishedTime); - } builder.setAnalysisConfig(newAnalysisConfig); return builder.build(); @@ -490,8 +458,7 @@ && updatesDetectors(job) == false && (modelSnapshotId == null || Objects.equals(modelSnapshotId, job.getModelSnapshotId())) && (establishedModelMemory == null || Objects.equals(establishedModelMemory, job.getEstablishedModelMemory())) && (jobVersion == null || Objects.equals(jobVersion, job.getJobVersion())) - && ((clearJobFinishTime == null || clearJobFinishTime == false) || job.getFinishedTime() == null) - && (finishedTime == null || Objects.equals(finishedTime, job.getFinishedTime())); + && ((clearJobFinishTime == null || clearJobFinishTime == false) || job.getFinishedTime() == null); } boolean updatesDetectors(Job job) { @@ -539,15 +506,14 @@ public boolean equals(Object other) { && Objects.equals(this.modelSnapshotId, that.modelSnapshotId) && Objects.equals(this.establishedModelMemory, that.establishedModelMemory) && Objects.equals(this.jobVersion, that.jobVersion) - && Objects.equals(this.clearJobFinishTime, that.clearJobFinishTime) - && Objects.equals(this.finishedTime, that.finishedTime); + && Objects.equals(this.clearJobFinishTime, that.clearJobFinishTime); } @Override public int hashCode() { return Objects.hash(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, categorizationFilters, customSettings, - modelSnapshotId, establishedModelMemory, jobVersion, clearJobFinishTime, finishedTime); + modelSnapshotId, establishedModelMemory, jobVersion, clearJobFinishTime); } public static class DetectorUpdate implements Writeable, ToXContentObject { @@ -658,8 +624,7 @@ public static class Builder { private String modelSnapshotId; private Long establishedModelMemory; private Version jobVersion; - private Boolean clearFinishTime; - private Date finishedTime; + private Boolean clearJobFinishTime; public Builder(String jobId) { this.jobId = jobId; @@ -745,20 +710,15 @@ public Builder setJobVersion(String version) { return this; } - public Builder setFinishedTime(Date finishedTime) { - this.finishedTime = finishedTime; - return this; - } - - public Builder setClearFinishTime(boolean clearFinishTime) { - this.clearFinishTime = clearFinishTime; + public Builder setClearFinishTime(boolean clearJobFinishTime) { + this.clearJobFinishTime = clearJobFinishTime; return this; } public JobUpdate build() { return new JobUpdate(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, backgroundPersistInterval, renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings, - modelSnapshotId, establishedModelMemory, jobVersion, clearFinishTime, finishedTime); + modelSnapshotId, establishedModelMemory, jobVersion, clearJobFinishTime); } } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java index d3b5c3af350fc..7c0a72cf1c6ea 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java @@ -94,11 +94,7 @@ public JobUpdate createRandom(String jobId, @Nullable Job job) { update.setJobVersion(randomFrom(Version.CURRENT, Version.V_6_2_0, Version.V_6_1_0)); } if (useInternalParser) { - if (randomBoolean()) { - update.setClearFinishTime(randomBoolean()); - } else { - update.setFinishedTime(new Date()); - } + update.setClearFinishTime(randomBoolean()); } return update.build(); @@ -218,12 +214,6 @@ public void testMergeWithJob() { updateBuilder.setCustomSettings(customSettings); updateBuilder.setModelSnapshotId(randomAlphaOfLength(10)); updateBuilder.setJobVersion(Version.V_6_1_0); - boolean clearJobFinishedTime = randomBoolean(); - if (clearJobFinishedTime) { - updateBuilder.setClearFinishTime(true); - } else { - updateBuilder.setFinishedTime(new Date()); - } JobUpdate update = updateBuilder.build(); Job.Builder jobBuilder = new Job.Builder("foo"); @@ -253,11 +243,6 @@ public void testMergeWithJob() { assertEquals(update.getCustomSettings(), updatedJob.getCustomSettings()); assertEquals(update.getModelSnapshotId(), updatedJob.getModelSnapshotId()); assertEquals(update.getJobVersion(), updatedJob.getJobVersion()); - if (clearJobFinishedTime) { - assertNull(updatedJob.getFinishedTime()); - } else { - assertEquals(update.getFinishedTime(), updatedJob.getFinishedTime()); - } for (JobUpdate.DetectorUpdate detectorUpdate : update.getDetectorUpdates()) { Detector updatedDetector = updatedJob.getAnalysisConfig().getDetectors().get(detectorUpdate.getDetectorIndex()); assertNotNull(updatedDetector); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java index f822fec67e946..f6b59af54337d 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java @@ -43,7 +43,6 @@ import org.elasticsearch.xpack.core.ml.action.DeleteFilterAction; import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; -import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; import org.elasticsearch.xpack.core.ml.action.FlushJobAction; import org.elasticsearch.xpack.core.ml.action.GetBucketsAction; import org.elasticsearch.xpack.core.ml.action.GetCategoriesAction; @@ -617,7 +616,6 @@ public void testMachineLearningAdminRole() { assertThat(role.cluster().check(DeleteFilterAction.NAME, request), is(true)); assertThat(role.cluster().check(DeleteJobAction.NAME, request), is(true)); assertThat(role.cluster().check(DeleteModelSnapshotAction.NAME, request), is(true)); - assertThat(role.cluster().check(FinalizeJobExecutionAction.NAME, request), is(false)); // internal use only assertThat(role.cluster().check(FlushJobAction.NAME, request), is(true)); assertThat(role.cluster().check(GetBucketsAction.NAME, request), is(true)); assertThat(role.cluster().check(GetCategoriesAction.NAME, request), is(true)); @@ -669,7 +667,6 @@ public void testMachineLearningUserRole() { assertThat(role.cluster().check(DeleteFilterAction.NAME, request), is(false)); assertThat(role.cluster().check(DeleteJobAction.NAME, request), is(false)); assertThat(role.cluster().check(DeleteModelSnapshotAction.NAME, request), is(false)); - assertThat(role.cluster().check(FinalizeJobExecutionAction.NAME, request), is(false)); assertThat(role.cluster().check(FlushJobAction.NAME, request), is(false)); assertThat(role.cluster().check(GetBucketsAction.NAME, request), is(true)); assertThat(role.cluster().check(GetCategoriesAction.NAME, request), is(true)); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index a3e772b3a9d83..cdd786068fa2f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -8,6 +8,8 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.update.UpdateAction; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -16,8 +18,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; -import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; @@ -28,14 +29,15 @@ import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition; import org.elasticsearch.xpack.core.ml.job.results.Influencer; import org.elasticsearch.xpack.core.ml.job.results.ModelPlot; -import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.junit.After; import org.junit.Before; +import org.mockito.ArgumentCaptor; import org.mockito.InOrder; import java.time.Duration; @@ -49,6 +51,7 @@ import java.util.concurrent.TimeoutException; import java.util.function.Consumer; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; @@ -66,7 +69,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { - private static final String JOB_ID = "_id"; + private static final String JOB_ID = "valid_id"; private static final long BUCKET_SPAN_MS = 1000; private ThreadPool threadPool; @@ -408,11 +411,13 @@ public void testProcessResult_modelSnapshot() { processorUnderTest.processResult(context, result); verify(persister, times(1)).persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE); - UpdateJobAction.Request expectedJobUpdateRequest = UpdateJobAction.Request.internal(JOB_ID, - new JobUpdate.Builder(JOB_ID).setModelSnapshotId("a_snapshot_id").build()); - verify(client).execute(same(UpdateJobAction.INSTANCE), eq(expectedJobUpdateRequest), any()); + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(UpdateRequest.class); + verify(client).execute(same(UpdateAction.INSTANCE), requestCaptor.capture(), any()); verifyNoMoreInteractions(persister); + + UpdateRequest capturedRequest = requestCaptor.getValue(); + assertThat(capturedRequest.doc().sourceAsMap().keySet(), contains(Job.MODEL_SNAPSHOT_ID.getPreferredName())); } public void testProcessResult_quantiles_givenRenormalizationIsEnabled() { From dbfaba12db3753dea8cc2951e7f767edbc88926c Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 3 Oct 2018 17:42:52 +0100 Subject: [PATCH 08/13] Check snapshot is persisted --- .../xpack/ml/job/persistence/JobResultsPersister.java | 4 ++-- .../autodetect/output/AutoDetectResultProcessor.java | 8 ++++++-- .../autodetect/output/AutoDetectResultProcessorTests.java | 4 ++++ 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java index 233a2b4078ac7..9efdbc1975716 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java @@ -242,10 +242,10 @@ public void persistQuantiles(Quantiles quantiles, WriteRequest.RefreshPolicy ref /** * Persist a model snapshot description */ - public void persistModelSnapshot(ModelSnapshot modelSnapshot, WriteRequest.RefreshPolicy refreshPolicy) { + public IndexResponse persistModelSnapshot(ModelSnapshot modelSnapshot, WriteRequest.RefreshPolicy refreshPolicy) { Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.documentId(modelSnapshot)); persistable.setRefreshPolicy(refreshPolicy); - persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(modelSnapshot.getJobId())).actionGet(); + return persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(modelSnapshot.getJobId())).actionGet(); } /** diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index ce3fb88ff5e18..de41835484d5b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -8,6 +8,8 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.update.UpdateAction; import org.elasticsearch.action.update.UpdateRequest; @@ -274,8 +276,10 @@ void processResult(Context context, AutodetectResult result) { ModelSnapshot modelSnapshot = result.getModelSnapshot(); if (modelSnapshot != null) { // We need to refresh in order for the snapshot to be available when we try to update the job with it - persister.persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE); - updateModelSnapshotIdOnJob(modelSnapshot); + IndexResponse indexResponse = persister.persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE); + if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) { + updateModelSnapshotIdOnJob(modelSnapshot); + } } Quantiles quantiles = result.getQuantiles(); if (quantiles != null) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index cdd786068fa2f..2595b96f3f78d 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -7,6 +7,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.update.UpdateAction; import org.elasticsearch.action.update.UpdateRequest; @@ -16,6 +17,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.job.config.Job; @@ -92,6 +94,8 @@ public void setUpMocks() { when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); renormalizer = mock(Renormalizer.class); persister = mock(JobResultsPersister.class); + when(persister.persistModelSnapshot(any(), any())) + .thenReturn(new IndexResponse(new ShardId("ml", "uid", 0), "doc", "1", 0L, 0L, 0L, true)); jobResultsProvider = mock(JobResultsProvider.class); flushListener = mock(FlushListener.class); processorUnderTest = new AutoDetectResultProcessor(client, auditor, JOB_ID, renormalizer, persister, jobResultsProvider, From eedbfd2578830d98ff341028c9307da12916bc09 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 5 Oct 2018 13:05:10 +0100 Subject: [PATCH 09/13] Revert "Remove finalise job action" This reverts commit c0971340d420dda0b4eecaf2eb3eb0555afb9563. --- .../xpack/core/XPackClientPlugin.java | 2 + .../ml/action/FinalizeJobExecutionAction.java | 79 +++++++++++++++++ .../xpack/ml/MachineLearning.java | 3 + .../TransportFinalizeJobExecutionAction.java | 86 +++++++++++++++++++ 4 files changed, 170 insertions(+) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FinalizeJobExecutionAction.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index f663c94f03156..80a0bbf3baab6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -54,6 +54,7 @@ import org.elasticsearch.xpack.core.ml.action.DeleteForecastAction; import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; +import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; import org.elasticsearch.xpack.core.ml.action.FindFileStructureAction; import org.elasticsearch.xpack.core.ml.action.FlushJobAction; import org.elasticsearch.xpack.core.ml.action.ForecastJobAction; @@ -235,6 +236,7 @@ public List getClientActions() { GetRecordsAction.INSTANCE, PostDataAction.INSTANCE, CloseJobAction.INSTANCE, + FinalizeJobExecutionAction.INSTANCE, FlushJobAction.INSTANCE, ValidateDetectorAction.INSTANCE, ValidateJobConfigAction.INSTANCE, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FinalizeJobExecutionAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FinalizeJobExecutionAction.java new file mode 100644 index 0000000000000..cb176a397ace8 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FinalizeJobExecutionAction.java @@ -0,0 +1,79 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +public class FinalizeJobExecutionAction extends Action { + + public static final FinalizeJobExecutionAction INSTANCE = new FinalizeJobExecutionAction(); + public static final String NAME = "cluster:internal/xpack/ml/job/finalize_job_execution"; + + private FinalizeJobExecutionAction() { + super(NAME); + } + + @Override + public RequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new RequestBuilder(client, INSTANCE); + } + + @Override + public AcknowledgedResponse newResponse() { + return new AcknowledgedResponse(); + } + + public static class Request extends MasterNodeRequest { + + private String[] jobIds; + + public Request(String[] jobIds) { + this.jobIds = jobIds; + } + + public Request() { + } + + public String[] getJobIds() { + return jobIds; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + jobIds = in.readStringArray(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(jobIds); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + } + + public static class RequestBuilder + extends MasterNodeOperationRequestBuilder { + + public RequestBuilder(ElasticsearchClient client, FinalizeJobExecutionAction action) { + super(client, action, new Request()); + } + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index f62acbf85337c..f1a0745d58fed 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -66,6 +66,7 @@ import org.elasticsearch.xpack.core.ml.action.DeleteForecastAction; import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; +import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; import org.elasticsearch.xpack.core.ml.action.FindFileStructureAction; import org.elasticsearch.xpack.core.ml.action.FlushJobAction; import org.elasticsearch.xpack.core.ml.action.ForecastJobAction; @@ -119,6 +120,7 @@ import org.elasticsearch.xpack.ml.action.TransportDeleteForecastAction; import org.elasticsearch.xpack.ml.action.TransportDeleteJobAction; import org.elasticsearch.xpack.ml.action.TransportDeleteModelSnapshotAction; +import org.elasticsearch.xpack.ml.action.TransportFinalizeJobExecutionAction; import org.elasticsearch.xpack.ml.action.TransportFindFileStructureAction; import org.elasticsearch.xpack.ml.action.TransportFlushJobAction; import org.elasticsearch.xpack.ml.action.TransportForecastJobAction; @@ -544,6 +546,7 @@ public List getRestHandlers(Settings settings, RestController restC new ActionHandler<>(GetRecordsAction.INSTANCE, TransportGetRecordsAction.class), new ActionHandler<>(PostDataAction.INSTANCE, TransportPostDataAction.class), new ActionHandler<>(CloseJobAction.INSTANCE, TransportCloseJobAction.class), + new ActionHandler<>(FinalizeJobExecutionAction.INSTANCE, TransportFinalizeJobExecutionAction.class), new ActionHandler<>(FlushJobAction.INSTANCE, TransportFlushJobAction.class), new ActionHandler<>(ValidateDetectorAction.INSTANCE, TransportValidateDetectorAction.class), new ActionHandler<>(ValidateJobConfigAction.INSTANCE, TransportValidateJobConfigAction.class), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java new file mode 100644 index 0000000000000..87f1802547f2f --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java @@ -0,0 +1,86 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; +import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; +import org.elasticsearch.xpack.ml.utils.ChainTaskExecutor; + +import java.util.Date; + +public class TransportFinalizeJobExecutionAction extends TransportMasterNodeAction { + + private final JobConfigProvider jobConfigProvider; + + @Inject + public TransportFinalizeJobExecutionAction(Settings settings, TransportService transportService, + ClusterService clusterService, ThreadPool threadPool, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + JobConfigProvider jobConfigProvider) { + super(settings, FinalizeJobExecutionAction.NAME, transportService, clusterService, threadPool, actionFilters, + indexNameExpressionResolver, FinalizeJobExecutionAction.Request::new); + this.jobConfigProvider = jobConfigProvider; + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected AcknowledgedResponse newResponse() { + return new AcknowledgedResponse(); + } + + @Override + protected void masterOperation(FinalizeJobExecutionAction.Request request, ClusterState state, + ActionListener listener) { + + String jobIdString = String.join(",", request.getJobIds()); + logger.debug("finalizing jobs [{}]", jobIdString); + + ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(threadPool.executor( + MachineLearning.UTILITY_THREAD_POOL_NAME), false); + + Date now = new Date(); + for (String jobId : request.getJobIds()) { + JobUpdate finishedTimeUpdate = new JobUpdate.Builder(jobId).setFinishedTime(now).build(); + chainTaskExecutor.add(updateListener -> { + jobConfigProvider.updateJob(jobId, finishedTimeUpdate, null, ActionListener.wrap( + response -> updateListener.onResponse(null), + updateListener::onFailure + ) ); + }); + } + + chainTaskExecutor.execute(ActionListener.wrap( + response -> listener.onResponse(new AcknowledgedResponse(true)), + listener::onFailure + )); + } + + @Override + protected ClusterBlockException checkBlock(FinalizeJobExecutionAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } +} From 99008ffc563e47edf9b73a847a702745dc8ffed8 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 12 Oct 2018 12:17:22 +0100 Subject: [PATCH 10/13] Make finalise job action a no-op --- .../elasticsearch/xpack/core/ml/MlTasks.java | 2 +- .../TransportFinalizeJobExecutionAction.java | 37 ++----------------- 2 files changed, 5 insertions(+), 34 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java index a56d3d639239d..46685001153d7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java @@ -97,7 +97,7 @@ public static Set openJobIds(PersistentTasksCustomMetaData tasks) { * Is there an ml anomaly detector job task for the job {@code jobId}? * @param jobId The job id * @param tasks Persistent tasks - * @return + * @return True if the job has a task */ public static boolean taskExistsForJob(String jobId, PersistentTasksCustomMetaData tasks) { return openJobIds(tasks).contains(jobId); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java index 87f1802547f2f..c9fdd7b18fb53 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java @@ -19,27 +19,17 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; -import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; -import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; -import org.elasticsearch.xpack.ml.utils.ChainTaskExecutor; - -import java.util.Date; public class TransportFinalizeJobExecutionAction extends TransportMasterNodeAction { - private final JobConfigProvider jobConfigProvider; - @Inject public TransportFinalizeJobExecutionAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, - JobConfigProvider jobConfigProvider) { + IndexNameExpressionResolver indexNameExpressionResolver) { super(settings, FinalizeJobExecutionAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, FinalizeJobExecutionAction.Request::new); - this.jobConfigProvider = jobConfigProvider; } @Override @@ -55,28 +45,9 @@ protected AcknowledgedResponse newResponse() { @Override protected void masterOperation(FinalizeJobExecutionAction.Request request, ClusterState state, ActionListener listener) { - - String jobIdString = String.join(",", request.getJobIds()); - logger.debug("finalizing jobs [{}]", jobIdString); - - ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(threadPool.executor( - MachineLearning.UTILITY_THREAD_POOL_NAME), false); - - Date now = new Date(); - for (String jobId : request.getJobIds()) { - JobUpdate finishedTimeUpdate = new JobUpdate.Builder(jobId).setFinishedTime(now).build(); - chainTaskExecutor.add(updateListener -> { - jobConfigProvider.updateJob(jobId, finishedTimeUpdate, null, ActionListener.wrap( - response -> updateListener.onResponse(null), - updateListener::onFailure - ) ); - }); - } - - chainTaskExecutor.execute(ActionListener.wrap( - response -> listener.onResponse(new AcknowledgedResponse(true)), - listener::onFailure - )); + // This action is no longer required but needs to be preserved + // in case it is called by an old node in a mixed cluster + listener.onResponse(new AcknowledgedResponse(true)); } @Override From a157a6ef9ff620371bc02616ead719c0d8aa6cea Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 15 Oct 2018 13:39:32 +0100 Subject: [PATCH 11/13] Fix precommit check --- .../xpack/ml/job/persistence/MockClientBuilder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java index 726b815728f52..c10af20aba79f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java @@ -274,7 +274,7 @@ public MockClientBuilder prepareSearch(String index, String type, int from, int * Creates a {@link SearchResponse} with a {@link SearchHit} for each element of {@code docs} * @param indexName Index being searched * @param docs Returned in the SearchResponse - * @return + * @return this */ @SuppressWarnings("unchecked") public MockClientBuilder prepareSearch(String indexName, List docs) { From deb30877b4974123b08dd9dc50114acd5217efab Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 15 Oct 2018 18:16:47 +0100 Subject: [PATCH 12/13] Add Finalize job action back to ReservedRolesStoreTests --- .../core/security/authz/store/ReservedRolesStoreTests.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java index f6b59af54337d..f822fec67e946 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java @@ -43,6 +43,7 @@ import org.elasticsearch.xpack.core.ml.action.DeleteFilterAction; import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; +import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; import org.elasticsearch.xpack.core.ml.action.FlushJobAction; import org.elasticsearch.xpack.core.ml.action.GetBucketsAction; import org.elasticsearch.xpack.core.ml.action.GetCategoriesAction; @@ -616,6 +617,7 @@ public void testMachineLearningAdminRole() { assertThat(role.cluster().check(DeleteFilterAction.NAME, request), is(true)); assertThat(role.cluster().check(DeleteJobAction.NAME, request), is(true)); assertThat(role.cluster().check(DeleteModelSnapshotAction.NAME, request), is(true)); + assertThat(role.cluster().check(FinalizeJobExecutionAction.NAME, request), is(false)); // internal use only assertThat(role.cluster().check(FlushJobAction.NAME, request), is(true)); assertThat(role.cluster().check(GetBucketsAction.NAME, request), is(true)); assertThat(role.cluster().check(GetCategoriesAction.NAME, request), is(true)); @@ -667,6 +669,7 @@ public void testMachineLearningUserRole() { assertThat(role.cluster().check(DeleteFilterAction.NAME, request), is(false)); assertThat(role.cluster().check(DeleteJobAction.NAME, request), is(false)); assertThat(role.cluster().check(DeleteModelSnapshotAction.NAME, request), is(false)); + assertThat(role.cluster().check(FinalizeJobExecutionAction.NAME, request), is(false)); assertThat(role.cluster().check(FlushJobAction.NAME, request), is(false)); assertThat(role.cluster().check(GetBucketsAction.NAME, request), is(true)); assertThat(role.cluster().check(GetCategoriesAction.NAME, request), is(true)); From 70348fc0f652d7c51d8627db7cb1616c43ec878b Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 15 Oct 2018 19:14:10 +0100 Subject: [PATCH 13/13] Fix BWC on UpdateJobAction request --- .../xpack/core/ml/action/UpdateJobAction.java | 11 ++++++++++- .../core/ml/action/UpdateJobActionRequestTests.java | 9 ++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java index 92d2e93d317d9..c3bb4b7e0498d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java @@ -102,6 +102,10 @@ public void readFrom(StreamInput in) throws IOException { } else { isInternal = false; } + // TODO jindex change CURRENT to specific version when feature branch is merged + if (in.getVersion().onOrAfter(Version.V_6_3_0) && in.getVersion().before(Version.CURRENT)) { + in.readBoolean(); // was waitForAck + } } @Override @@ -112,6 +116,10 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_6_2_2)) { out.writeBoolean(isInternal); } + // TODO jindex change CURRENT to specific version when feature branch is merged + if (out.getVersion().onOrAfter(Version.V_6_3_0) && out.getVersion().before(Version.CURRENT)) { + out.writeBoolean(false); // was waitForAck + } } @Override @@ -127,7 +135,8 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; UpdateJobAction.Request that = (UpdateJobAction.Request) o; return Objects.equals(jobId, that.jobId) && - Objects.equals(update, that.update); + Objects.equals(update, that.update) && + isInternal == that.isInternal; } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/UpdateJobActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/UpdateJobActionRequestTests.java index 85097100e39e0..20d27f03d0c29 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/UpdateJobActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/UpdateJobActionRequestTests.java @@ -18,7 +18,14 @@ protected UpdateJobAction.Request createTestInstance() { // no need to randomize JobUpdate this is already tested in: JobUpdateTests JobUpdate.Builder jobUpdate = new JobUpdate.Builder(jobId); jobUpdate.setAnalysisLimits(new AnalysisLimits(100L, 100L)); - UpdateJobAction.Request request = new UpdateJobAction.Request(jobId, jobUpdate.build()); + UpdateJobAction.Request request; + if (randomBoolean()) { + request = new UpdateJobAction.Request(jobId, jobUpdate.build()); + } else { + // this call sets isInternal = true + request = UpdateJobAction.Request.internal(jobId, jobUpdate.build()); + } + return request; }