diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 019715b30853e..c4e5793b45171 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -18,12 +18,14 @@ import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.AliasOrIndex; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; @@ -471,12 +473,25 @@ protected ClusterBlockException checkBlock(OpenJobAction.Request request, Cluste protected void masterOperation(OpenJobAction.Request request, ClusterState state, ActionListener listener) { OpenJobAction.JobParams jobParams = request.getJobParams(); if (licenseState.isMachineLearningAllowed()) { - // Step 5. Wait for job to be started and respond - ActionListener> finalListener = + + // Step 6. Clear job finished time once the job is started and respond + ActionListener clearJobFinishTime = ActionListener.wrap( + response -> { + if (response.isAcknowledged()) { + clearJobFinishedTime(jobParams.getJobId(), listener); + } else { + listener.onResponse(response); + } + }, + listener::onFailure + ); + + // Step 5. Wait for job to be started + ActionListener> waitForJobToStart = new ActionListener>() { @Override public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { - waitForJobStarted(task.getId(), jobParams, listener); + waitForJobStarted(task.getId(), jobParams, clearJobFinishTime); } @Override @@ -492,7 +507,7 @@ public void onFailure(Exception e) { // Step 4. Start job task ActionListener establishedMemoryUpdateListener = ActionListener.wrap( response -> persistentTasksService.sendStartRequest(MlTasks.jobTaskId(jobParams.getJobId()), - OpenJobAction.TASK_NAME, jobParams, finalListener), + OpenJobAction.TASK_NAME, jobParams, waitForJobToStart), listener::onFailure ); @@ -574,6 +589,35 @@ public void onTimeout(TimeValue timeout) { }); } + private void clearJobFinishedTime(String jobId, ActionListener listener) { + clusterService.submitStateUpdateTask("clearing-job-finish-time-for-" + jobId, new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState); + MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata); + Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId)); + jobBuilder.setFinishedTime(null); + + mlMetadataBuilder.putJob(jobBuilder.build(), true); + ClusterState.Builder builder = ClusterState.builder(currentState); + return builder.metaData(new MetaData.Builder(currentState.metaData()) + .putCustom(MlMetadata.TYPE, mlMetadataBuilder.build())) + .build(); + } + + @Override + public void onFailure(String source, Exception e) { + logger.error("[" + jobId + "] Failed to clear finished_time; source [" + source + "]", e); + listener.onResponse(new OpenJobAction.Response(true)); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, + ClusterState newState) { + listener.onResponse(new OpenJobAction.Response(true)); + } + }); + } private void cancelJobStart(PersistentTasksCustomMetaData.PersistentTask persistentTask, Exception exception, ActionListener listener) { persistentTasksService.sendRemoveRequest(persistentTask.getId(), diff --git a/x-pack/qa/ml-native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ReopenJobResetsFinishedTimeIT.java b/x-pack/qa/ml-native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ReopenJobResetsFinishedTimeIT.java new file mode 100644 index 0000000000000..325b1370315ca --- /dev/null +++ b/x-pack/qa/ml-native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ReopenJobResetsFinishedTimeIT.java @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.DataDescription; +import org.elasticsearch.xpack.core.ml.job.config.Detector; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.junit.After; + +import java.util.Collections; + +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.Matchers.is; + +public class ReopenJobResetsFinishedTimeIT extends MlNativeAutodetectIntegTestCase { + + @After + public void cleanUpTest() { + cleanUp(); + } + + public void test() { + final String jobId = "reset-finished-time-test"; + Job.Builder job = createJob(jobId); + + registerJob(job); + putJob(job); + openJob(job.getId()); + + assertThat(getSingleJob(jobId).getFinishedTime(), is(nullValue())); + + closeJob(jobId); + assertThat(getSingleJob(jobId).getFinishedTime(), is(notNullValue())); + + openJob(jobId); + assertThat(getSingleJob(jobId).getFinishedTime(), is(nullValue())); + } + + private Job getSingleJob(String jobId) { + return getJob(jobId).get(0); + } + + private Job.Builder createJob(String id) { + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setFormat(DataDescription.DataFormat.XCONTENT); + dataDescription.setTimeFormat(DataDescription.EPOCH_MS); + + Detector.Builder d = new Detector.Builder("count", null); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build())); + + Job.Builder builder = new Job.Builder(); + builder.setId(id); + builder.setAnalysisConfig(analysisConfig); + builder.setDataDescription(dataDescription); + return builder; + } +}