From 5d85418f4835557f82144271cb0f2709d6e6efe9 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 1 Oct 2018 17:15:57 +0100 Subject: [PATCH 1/7] Add job and datafeed to datafeed task params --- .../core/ml/action/StartDatafeedAction.java | 33 +++- .../xpack/ml/MachineLearning.java | 3 + .../action/TransportStartDatafeedAction.java | 159 ++++++++++-------- .../ml/datafeed/DatafeedNodeSelector.java | 23 +-- .../TransportStartDatafeedActionTests.java | 36 +--- .../datafeed/DatafeedNodeSelectorTests.java | 113 +++++-------- 6 files changed, 181 insertions(+), 186 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java index 16e5810b9a465..46692bfb876d0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java @@ -28,6 +28,7 @@ import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; @@ -194,6 +195,10 @@ public DatafeedParams(StreamInput in) throws IOException { startTime = in.readVLong(); endTime = in.readOptionalLong(); timeout = TimeValue.timeValueMillis(in.readVLong()); + if (in.getVersion().onOrAfter(Version.CURRENT)) { + datafeedConfig = in.readOptionalWriteable(DatafeedConfig::new); + job = in.readOptionalWriteable(Job::new); + } } DatafeedParams() { @@ -203,6 +208,8 @@ public DatafeedParams(StreamInput in) throws IOException { private long startTime; private Long endTime; private TimeValue timeout = TimeValue.timeValueSeconds(20); + private DatafeedConfig datafeedConfig; + private Job job; public String getDatafeedId() { return datafeedId; @@ -232,6 +239,22 @@ public void setTimeout(TimeValue timeout) { this.timeout = timeout; } + public DatafeedConfig getDatafeedConfig() { + return datafeedConfig; + } + + public void setDatafeedConfig(DatafeedConfig datafeedConfig) { + this.datafeedConfig = datafeedConfig; + } + + public Job getJob() { + return job; + } + + public void setJob(Job job) { + this.job = job; + } + @Override public String getWriteableName() { return MlTasks.DATAFEED_TASK_NAME; @@ -248,6 +271,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(startTime); out.writeOptionalLong(endTime); out.writeVLong(timeout.millis()); + if (out.getVersion().onOrAfter(Version.CURRENT)) { + out.writeOptionalWriteable(datafeedConfig); + out.writeOptionalWriteable(job); + } } @Override @@ -265,7 +292,7 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par @Override public int hashCode() { - return Objects.hash(datafeedId, startTime, endTime, timeout); + return Objects.hash(datafeedId, startTime, endTime, timeout, datafeedConfig, job); } @Override @@ -280,7 +307,9 @@ public boolean equals(Object obj) { return Objects.equals(datafeedId, other.datafeedId) && Objects.equals(startTime, other.startTime) && Objects.equals(endTime, other.endTime) && - Objects.equals(timeout, other.timeout); + Objects.equals(timeout, other.timeout) && + Objects.equals(datafeedConfig, other.datafeedConfig) && + Objects.equals(job, other.job); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 219096f128b05..f1a0745d58fed 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -162,6 +162,7 @@ import org.elasticsearch.xpack.ml.action.TransportValidateJobConfigAction; import org.elasticsearch.xpack.ml.datafeed.DatafeedJobBuilder; import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.UpdateJobProcessNotifier; import org.elasticsearch.xpack.ml.job.categorization.MlClassicTokenizer; @@ -369,6 +370,7 @@ public Collection createComponents(Client client, ClusterService cluster Auditor auditor = new Auditor(client, clusterService.nodeName()); JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings); JobConfigProvider jobConfigProvider = new JobConfigProvider(client, settings); + DatafeedConfigProvider datafeedConfigProvider = new DatafeedConfigProvider(client, settings, xContentRegistry); UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(settings, client, clusterService, threadPool); JobManager jobManager = new JobManager(env, settings, jobResultsProvider, clusterService, auditor, threadPool, client, notifier); @@ -426,6 +428,7 @@ public Collection createComponents(Client client, ClusterService cluster mlLifeCycleService, jobResultsProvider, jobConfigProvider, + datafeedConfigProvider, jobManager, autodetectProcessManager, new MlInitializationService(settings, threadPool, clusterService, client), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 578f4ee5f983b..912ca9551dff7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -35,7 +35,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.XPackField; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; @@ -48,10 +47,13 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; import org.elasticsearch.xpack.ml.datafeed.DatafeedNodeSelector; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.function.Consumer; import java.util.function.Predicate; /* This class extends from TransportMasterNodeAction for cluster state observing purposes. @@ -67,34 +69,30 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction listener) { StartDatafeedAction.DatafeedParams params = request.getParams(); - if (licenseState.isMachineLearningAllowed()) { - - ActionListener> waitForTaskListener = - new ActionListener>() { - @Override - public void onResponse(PersistentTasksCustomMetaData.PersistentTask - persistentTask) { - waitForDatafeedStarted(persistentTask.getId(), params, listener); - } - - @Override - public void onFailure(Exception e) { - if (e instanceof ResourceAlreadyExistsException) { - logger.debug("datafeed already started", e); - e = new ElasticsearchStatusException("cannot start datafeed [" + params.getDatafeedId() + - "] because it has already been started", RestStatus.CONFLICT); - } - listener.onFailure(e); - } - }; - - // Verify data extractor factory can be created, then start persistent task - MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); - PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - validate(params.getDatafeedId(), mlMetadata, tasks); - DatafeedConfig datafeed = mlMetadata.getDatafeed(params.getDatafeedId()); - Job job = mlMetadata.getJobs().get(datafeed.getJobId()); - - if (RemoteClusterLicenseChecker.containsRemoteIndex(datafeed.getIndices())) { - final RemoteClusterLicenseChecker remoteClusterLicenseChecker = - new RemoteClusterLicenseChecker(client, XPackLicenseState::isMachineLearningAllowedForOperationMode); - remoteClusterLicenseChecker.checkRemoteClusterLicenses( - RemoteClusterLicenseChecker.remoteClusterAliases(datafeed.getIndices()), - ActionListener.wrap( - response -> { - if (response.isSuccess() == false) { - listener.onFailure(createUnlicensedError(datafeed.getId(), response)); - } else { - createDataExtractor(job, datafeed, params, waitForTaskListener); - } - }, - e -> listener.onFailure( - createUnknownLicenseError( - datafeed.getId(), RemoteClusterLicenseChecker.remoteIndices(datafeed.getIndices()), e)) - )); - } else { - createDataExtractor(job, datafeed, params, waitForTaskListener); - } - } else { + if (licenseState.isMachineLearningAllowed() == false) { listener.onFailure(LicenseUtils.newComplianceException(XPackField.MACHINE_LEARNING)); + return; } + + PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + + ActionListener> waitForTaskListener = + new ActionListener>() { + @Override + public void onResponse(PersistentTasksCustomMetaData.PersistentTask + persistentTask) { + waitForDatafeedStarted(persistentTask.getId(), params, listener); + } + + @Override + public void onFailure(Exception e) { + if (e instanceof ResourceAlreadyExistsException) { + logger.debug("datafeed already started", e); + e = new ElasticsearchStatusException("cannot start datafeed [" + params.getDatafeedId() + + "] because it has already been started", RestStatus.CONFLICT); + } + listener.onFailure(e); + } + }; + + // Verify data extractor factory can be created, then start persistent task + Consumer createDataExtrator = ok -> { + if (RemoteClusterLicenseChecker.containsRemoteIndex(params.getDatafeedConfig().getIndices())) { + final RemoteClusterLicenseChecker remoteClusterLicenseChecker = + new RemoteClusterLicenseChecker(client, XPackLicenseState::isMachineLearningAllowedForOperationMode); + remoteClusterLicenseChecker.checkRemoteClusterLicenses( + RemoteClusterLicenseChecker.remoteClusterAliases(params.getDatafeedConfig().getIndices()), + ActionListener.wrap( + response -> { + if (response.isSuccess() == false) { + listener.onFailure(createUnlicensedError(params.getDatafeedConfig().getId(), response)); + } else { + createDataExtractor(params.getJob(), params.getDatafeedConfig(), params, waitForTaskListener); + } + }, + e -> listener.onFailure( + createUnknownLicenseError( + params.getDatafeedConfig().getId(), + RemoteClusterLicenseChecker.remoteIndices(params.getDatafeedConfig().getIndices()), e)) + )); + } else { + createDataExtractor(params.getJob(), params.getDatafeedConfig(), params, waitForTaskListener); + } + }; + + ActionListener jobListener = ActionListener.wrap( + jobBuilder -> { + try { + params.setJob(jobBuilder.build()); + validate(params.getJob(), params.getDatafeedConfig(), tasks); + createDataExtrator.accept(Boolean.TRUE); + } catch (Exception e) { + listener.onFailure(e); + } + }, + listener::onFailure + ); + + ActionListener datafeedListener = ActionListener.wrap( + datafeedConfig -> { + try { + params.setDatafeedConfig(datafeedConfig.build()); + jobConfigProvider.getJob(params.getDatafeedConfig().getJobId(), jobListener); + } catch (Exception e) { + listener.onFailure(e); + } + }, + listener::onFailure + ); + + datafeedConfigProvider.getDatafeedConfig(params.getDatafeedId(), datafeedListener); } private void createDataExtractor(Job job, DatafeedConfig datafeed, StartDatafeedAction.DatafeedParams params, @@ -280,14 +304,15 @@ public StartDatafeedPersistentTasksExecutor(Settings settings, DatafeedManager d @Override public PersistentTasksCustomMetaData.Assignment getAssignment(StartDatafeedAction.DatafeedParams params, ClusterState clusterState) { - return new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedId()).selectNode(); + return new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedId(), params.getDatafeedConfig()).selectNode(); } @Override public void validate(StartDatafeedAction.DatafeedParams params, ClusterState clusterState) { PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - TransportStartDatafeedAction.validate(params.getDatafeedId(), MlMetadata.getMlMetadata(clusterState), tasks); - new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedId()).checkDatafeedTaskCanBeCreated(); + TransportStartDatafeedAction.validate(params.getJob(), params.getDatafeedConfig(), tasks); + new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedId(), params.getDatafeedConfig()) + .checkDatafeedTaskCanBeCreated(); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java index ce3f611b2227a..39c123c119c6a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java @@ -14,7 +14,6 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.license.RemoteClusterLicenseChecker; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.JobState; @@ -29,14 +28,18 @@ public class DatafeedNodeSelector { private static final Logger LOGGER = Loggers.getLogger(DatafeedNodeSelector.class); private final DatafeedConfig datafeed; + private final String datafeedId; private final PersistentTasksCustomMetaData.PersistentTask jobTask; private final ClusterState clusterState; private final IndexNameExpressionResolver resolver; - public DatafeedNodeSelector(ClusterState clusterState, IndexNameExpressionResolver resolver, String datafeedId) { - MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); + // TODO The datafeed config may be null because of streaming the + // DatafeedParams in a mixed cluster state + public DatafeedNodeSelector(ClusterState clusterState, IndexNameExpressionResolver resolver, String datafeedId, + @Nullable DatafeedConfig datafeed) { PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - this.datafeed = mlMetadata.getDatafeed(datafeedId); + this.datafeed = datafeed; + this.datafeedId = datafeedId; this.jobTask = MlTasks.getJobTask(datafeed.getJobId(), tasks); this.clusterState = Objects.requireNonNull(clusterState); this.resolver = Objects.requireNonNull(resolver); @@ -45,8 +48,7 @@ public DatafeedNodeSelector(ClusterState clusterState, IndexNameExpressionResolv public void checkDatafeedTaskCanBeCreated() { AssignmentFailure assignmentFailure = checkAssignment(); if (assignmentFailure != null && assignmentFailure.isCriticalForTaskCreation) { - String msg = "No node found to start datafeed [" + datafeed.getId() + "], allocation explanation [" + assignmentFailure.reason - + "]"; + String msg = "No node found to start datafeed [" + datafeedId + "], allocation explanation [" + assignmentFailure.reason + "]"; LOGGER.debug(msg); throw ExceptionsHelper.conflictStatusException(msg); } @@ -64,7 +66,7 @@ public PersistentTasksCustomMetaData.Assignment selectNode() { @Nullable private AssignmentFailure checkAssignment() { PriorityFailureCollector priorityFailureCollector = new PriorityFailureCollector(); - priorityFailureCollector.add(verifyIndicesActive(datafeed)); + priorityFailureCollector.add(verifyIndicesActive()); JobTaskState jobTaskState = null; JobState jobState = JobState.CLOSED; @@ -75,13 +77,14 @@ private AssignmentFailure checkAssignment() { if (jobState.isAnyOf(JobState.OPENING, JobState.OPENED) == false) { // lets try again later when the job has been opened: - String reason = "cannot start datafeed [" + datafeed.getId() + "], because job's [" + datafeed.getJobId() + + String reason = "cannot start datafeed [" + datafeed.getId() + "], because the job's [" + datafeed.getJobId() + "] state is [" + jobState + "] while state [" + JobState.OPENED + "] is required"; priorityFailureCollector.add(new AssignmentFailure(reason, true)); } if (jobTaskState != null && jobTaskState.isStatusStale(jobTask)) { - String reason = "cannot start datafeed [" + datafeed.getId() + "], job [" + datafeed.getJobId() + "] state is stale"; + String reason = "cannot start datafeed [" + datafeed.getId() + "], because the job's [" + datafeed.getJobId() + + "] state is stale"; priorityFailureCollector.add(new AssignmentFailure(reason, true)); } @@ -89,7 +92,7 @@ private AssignmentFailure checkAssignment() { } @Nullable - private AssignmentFailure verifyIndicesActive(DatafeedConfig datafeed) { + private AssignmentFailure verifyIndicesActive() { List indices = datafeed.getIndices(); for (String index : indices) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java index 610a5c1b92fb6..7afb048f7aedb 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java @@ -7,11 +7,9 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.ElasticsearchStatusException; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.Job; @@ -28,61 +26,33 @@ public class TransportStartDatafeedActionTests extends ESTestCase { - public void testValidate_GivenDatafeedIsMissing() { - Job job = DatafeedManagerTests.createDatafeedJob().build(new Date()); - MlMetadata mlMetadata = new MlMetadata.Builder() - .putJob(job, false) - .build(); - Exception e = expectThrows(ResourceNotFoundException.class, - () -> TransportStartDatafeedAction.validate("some-datafeed", mlMetadata, null)); - assertThat(e.getMessage(), equalTo("No datafeed with id [some-datafeed] exists")); - } - public void testValidate_jobClosed() { Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date()); - MlMetadata mlMetadata1 = new MlMetadata.Builder() - .putJob(job1, false) - .build(); PersistentTasksCustomMetaData tasks = PersistentTasksCustomMetaData.builder().build(); DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build(); - MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) - .putDatafeed(datafeedConfig1, Collections.emptyMap()) - .build(); Exception e = expectThrows(ElasticsearchStatusException.class, - () -> TransportStartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks)); + () -> TransportStartDatafeedAction.validate(job1, datafeedConfig1, tasks)); assertThat(e.getMessage(), equalTo("cannot start datafeed [foo-datafeed] because job [job_id] is closed")); } public void testValidate_jobOpening() { Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date()); - MlMetadata mlMetadata1 = new MlMetadata.Builder() - .putJob(job1, false) - .build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", INITIAL_ASSIGNMENT.getExecutorNode(), null, tasksBuilder); PersistentTasksCustomMetaData tasks = tasksBuilder.build(); DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build(); - MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) - .putDatafeed(datafeedConfig1, Collections.emptyMap()) - .build(); - TransportStartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks); + TransportStartDatafeedAction.validate(job1, datafeedConfig1, tasks); } public void testValidate_jobOpened() { Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date()); - MlMetadata mlMetadata1 = new MlMetadata.Builder() - .putJob(job1, false) - .build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", INITIAL_ASSIGNMENT.getExecutorNode(), JobState.OPENED, tasksBuilder); PersistentTasksCustomMetaData tasks = tasksBuilder.build(); DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build(); - MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) - .putDatafeed(datafeedConfig1, Collections.emptyMap()) - .build(); - TransportStartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks); + TransportStartDatafeedAction.validate(job1, datafeedConfig1, tasks); } public static TransportStartDatafeedAction.DatafeedTask createDatafeedTask(long id, String type, String action, diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java index 4b8ad1d08aed3..cb10de0801d59 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java @@ -26,13 +26,13 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.junit.Before; import java.net.InetAddress; @@ -52,7 +52,6 @@ public class DatafeedNodeSelectorTests extends ESTestCase { private IndexNameExpressionResolver resolver; private DiscoveryNodes nodes; private ClusterState clusterState; - private MlMetadata mlMetadata; private PersistentTasksCustomMetaData tasks; @Before @@ -65,11 +64,8 @@ public void init() { } public void testSelectNode_GivenJobIsOpened() { - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); - mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")), Collections.emptyMap()); - mlMetadata = mlMetadataBuilder.build(); + DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); @@ -77,17 +73,14 @@ public void testSelectNode_GivenJobIsOpened() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).selectNode(); assertEquals("node_id", result.getExecutorNode()); - new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).checkDatafeedTaskCanBeCreated(); } public void testSelectNode_GivenJobIsOpening() { - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); - mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")), Collections.emptyMap()); - mlMetadata = mlMetadataBuilder.build(); + DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), "node_id", null, tasksBuilder); @@ -95,41 +88,35 @@ public void testSelectNode_GivenJobIsOpening() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).selectNode(); assertEquals("node_id", result.getExecutorNode()); - new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).checkDatafeedTaskCanBeCreated(); } public void testNoJobTask() { - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); - mlMetadataBuilder.putJob(job, false); // Using wildcard index name to test for index resolving as well - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")), Collections.emptyMap()); - mlMetadata = mlMetadataBuilder.build(); + DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")); tasks = PersistentTasksCustomMetaData.builder().build(); givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).selectNode(); assertNull(result.getExecutorNode()); - assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id], because job's [job_id] state is " + + assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id], because the job's [job_id] state is " + "[closed] while state [opened] is required")); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " - + "[cannot start datafeed [datafeed_id], because job's [job_id] state is [closed] while state [opened] is required]")); + + "[cannot start datafeed [datafeed_id], because the job's [job_id] state is [closed] while state [opened] is required]")); } public void testSelectNode_GivenJobFailedOrClosed() { - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); - mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")), Collections.emptyMap()); - mlMetadata = mlMetadataBuilder.build(); + DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); JobState jobState = randomFrom(JobState.FAILED, JobState.CLOSED); @@ -138,26 +125,23 @@ public void testSelectNode_GivenJobFailedOrClosed() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).selectNode(); assertNull(result.getExecutorNode()); - assertEquals("cannot start datafeed [datafeed_id], because job's [job_id] state is [" + jobState + + assertEquals("cannot start datafeed [datafeed_id], because the job's [job_id] state is [" + jobState + "] while state [opened] is required", result.getExplanation()); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " - + "[cannot start datafeed [datafeed_id], because job's [job_id] state is [" + jobState + + "[cannot start datafeed [datafeed_id], because the job's [job_id] state is [" + jobState + "] while state [opened] is required]")); } public void testShardUnassigned() { - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); - mlMetadataBuilder.putJob(job, false); // Using wildcard index name to test for index resolving as well - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")), Collections.emptyMap()); - mlMetadata = mlMetadataBuilder.build(); + DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); @@ -168,22 +152,19 @@ public void testShardUnassigned() { givenClusterState("foo", 1, 0, states); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).selectNode(); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " + "does not have all primary shards active yet.")); - new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).checkDatafeedTaskCanBeCreated(); } public void testShardNotAllActive() { - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); - mlMetadataBuilder.putJob(job, false); // Using wildcard index name to test for index resolving as well - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")), Collections.emptyMap()); - mlMetadata = mlMetadataBuilder.build(); + DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("fo*")); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); @@ -195,21 +176,17 @@ public void testShardNotAllActive() { givenClusterState("foo", 2, 0, states); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).selectNode(); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " + "does not have all primary shards active yet.")); - new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).checkDatafeedTaskCanBeCreated(); } public void testIndexDoesntExist() { - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); - mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("not_foo")), - Collections.emptyMap()); - mlMetadata = mlMetadataBuilder.build(); + DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("not_foo")); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); @@ -217,24 +194,20 @@ public void testIndexDoesntExist() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).selectNode(); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [not_foo] " + "does not exist, is closed, or is still initializing.")); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " + "[cannot start datafeed [datafeed_id] because index [not_foo] does not exist, is closed, or is still initializing.]")); } public void testRemoteIndex() { - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); - mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("remote:foo")), - Collections.emptyMap()); - mlMetadata = mlMetadataBuilder.build(); + DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("remote:foo")); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); @@ -242,16 +215,13 @@ public void testRemoteIndex() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).selectNode(); assertNotNull(result.getExecutorNode()); } public void testSelectNode_jobTaskStale() { - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); - mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")), Collections.emptyMap()); - mlMetadata = mlMetadataBuilder.build(); + DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")); String nodeId = randomBoolean() ? "node_id2" : null; PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); @@ -262,44 +232,40 @@ public void testSelectNode_jobTaskStale() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).selectNode(); assertNull(result.getExecutorNode()); - assertEquals("cannot start datafeed [datafeed_id], job [job_id] state is stale", + assertEquals("cannot start datafeed [datafeed_id], because the job's [job_id] state is stale", result.getExplanation()); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " - + "[cannot start datafeed [datafeed_id], job [job_id] state is stale]")); + + "[cannot start datafeed [datafeed_id], because the job's [job_id] state is stale]")); tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), "node_id1", JobState.OPENED, tasksBuilder); tasks = tasksBuilder.build(); givenClusterState("foo", 1, 0); - result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); + result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).selectNode(); assertEquals("node_id1", result.getExecutorNode()); - new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).checkDatafeedTaskCanBeCreated(); } public void testSelectNode_GivenJobOpeningAndIndexDoesNotExist() { // Here we test that when there are 2 problems, the most critical gets reported first. // In this case job is Opening (non-critical) and the index does not exist (critical) - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); Job job = createScheduledJob("job_id").build(new Date()); - mlMetadataBuilder.putJob(job, false); - mlMetadataBuilder.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("not_foo")), - Collections.emptyMap()); - mlMetadata = mlMetadataBuilder.build(); + DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("not_foo")); - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), "node_id", JobState.OPENING, tasksBuilder); tasks = tasksBuilder.build(); givenClusterState("foo", 1, 0); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " + "[cannot start datafeed [datafeed_id] because index [not_foo] does not exist, is closed, or is still initializing.]")); } @@ -319,7 +285,6 @@ private void givenClusterState(String index, int numberOfShards, int numberOfRep clusterState = ClusterState.builder(new ClusterName("cluster_name")) .metaData(new MetaData.Builder() - .putCustom(MlMetadata.TYPE, mlMetadata) .putCustom(PersistentTasksCustomMetaData.TYPE, tasks) .put(indexMetaData, false)) .nodes(nodes) From 4c5bf8756cdb3f29efcfbac8e1828361159b07bf Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 1 Oct 2018 17:50:23 +0100 Subject: [PATCH 2/7] Pass job & datafeed config to datafeed manager --- .../action/TransportStartDatafeedAction.java | 2 +- .../xpack/ml/datafeed/DatafeedManager.java | 15 ++--- .../ml/datafeed/DatafeedManagerTests.java | 60 ++++++++----------- 3 files changed, 30 insertions(+), 47 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 912ca9551dff7..3ac497f557902 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -321,7 +321,7 @@ protected void nodeOperation(final AllocatedPersistentTask allocatedPersistentTa final PersistentTaskState state) { DatafeedTask datafeedTask = (DatafeedTask) allocatedPersistentTask; datafeedTask.datafeedManager = datafeedManager; - datafeedManager.run(datafeedTask, + datafeedManager.run(datafeedTask, params.getJob(), params.getDatafeedConfig(), (error) -> { if (error != null) { datafeedTask.markAsFailed(error); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 3d4d66eba92a3..c0702af90e1f3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -18,9 +18,10 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; @@ -29,8 +30,6 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.messages.Messages; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction; import org.elasticsearch.xpack.ml.notifications.Auditor; @@ -48,9 +47,9 @@ import java.util.function.Consumer; import java.util.function.Supplier; +import static org.elasticsearch.persistent.PersistentTasksService.WaitForPersistentTaskListener; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; -import static org.elasticsearch.persistent.PersistentTasksService.WaitForPersistentTaskListener; public class DatafeedManager extends AbstractComponent { @@ -77,13 +76,7 @@ public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clus clusterService.addListener(taskRunner); } - public void run(TransportStartDatafeedAction.DatafeedTask task, Consumer taskHandler) { - String datafeedId = task.getDatafeedId(); - ClusterState state = clusterService.state(); - MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); - - DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId); - Job job = mlMetadata.getJobs().get(datafeed.getJobId()); + public void run(TransportStartDatafeedAction.DatafeedTask task, Job job, DatafeedConfig datafeed, Consumer taskHandler) { ActionListener datafeedJobHandler = ActionListener.wrap( datafeedJob -> { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java index a9dec7c66d4b6..26d35a3dc1615 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java @@ -21,9 +21,10 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; @@ -34,11 +35,9 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.notifications.AuditMessage; import org.elasticsearch.xpack.core.ml.notifications.AuditorField; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.action.TransportStartDatafeedActionTests; import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction.DatafeedTask; +import org.elasticsearch.xpack.ml.action.TransportStartDatafeedActionTests; import org.elasticsearch.xpack.ml.job.persistence.MockClientBuilder; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.junit.Before; @@ -74,16 +73,15 @@ public class DatafeedManagerTests extends ESTestCase { private DatafeedManager datafeedManager; private long currentTime = 120000; private Auditor auditor; + private Job job; + private DatafeedConfig datafeed; private ArgumentCaptor capturedClusterStateListener = ArgumentCaptor.forClass(ClusterStateListener.class); @Before @SuppressWarnings("unchecked") public void setUpTests() { - MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); - Job job = createDatafeedJob().build(new Date()); - mlMetadata.putJob(job, false); - DatafeedConfig datafeed = createDatafeedConfig("datafeed_id", job.getId()).build(); - mlMetadata.putDatafeed(datafeed, Collections.emptyMap()); + job = createDatafeedJob().build(new Date()); + datafeed = createDatafeedConfig("datafeed_id", job.getId()).build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); PersistentTasksCustomMetaData tasks = tasksBuilder.build(); @@ -92,8 +90,7 @@ public void setUpTests() { Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) .build(); ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name")) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasks)) + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasks)) .nodes(nodes); clusterService = mock(ClusterService.class); @@ -141,7 +138,7 @@ public void testLookbackOnly_WarnsWhenNoDataIsRetrieved() throws Exception { when(datafeedJob.runLookBack(0L, 60000L)).thenThrow(new DatafeedJob.EmptyDataCountException(0L)); Consumer handler = mockConsumer(); DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); - datafeedManager.run(task, handler); + datafeedManager.run(task, job, datafeed, handler); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); verify(threadPool, never()).schedule(any(), any(), any()); @@ -152,7 +149,7 @@ public void testStart_GivenNewlyCreatedJobLookback() throws Exception { when(datafeedJob.runLookBack(0L, 60000L)).thenReturn(null); Consumer handler = mockConsumer(); DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); - datafeedManager.run(task, handler); + datafeedManager.run(task, job, datafeed, handler); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); verify(threadPool, never()).schedule(any(), any(), any()); @@ -162,7 +159,7 @@ public void testStart_extractionProblem() throws Exception { when(datafeedJob.runLookBack(0, 60000L)).thenThrow(new DatafeedJob.ExtractionProblemException(0L, new RuntimeException("dummy"))); Consumer handler = mockConsumer(); DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); - datafeedManager.run(task, handler); + datafeedManager.run(task, job, datafeed, handler); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); verify(threadPool, never()).schedule(any(), any(), any()); @@ -186,7 +183,7 @@ public void testStart_emptyDataCountException() throws Exception { Consumer handler = mockConsumer(); DatafeedTask task = createDatafeedTask("datafeed_id", 0L, null); - datafeedManager.run(task, handler); + datafeedManager.run(task, job, datafeed, handler); verify(threadPool, times(11)).schedule(any(), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME), any()); verify(auditor, times(1)).warning(eq("job_id"), anyString()); @@ -201,7 +198,7 @@ public void testRealTime_GivenStoppingAnalysisProblem() throws Exception { DatafeedTask task = TransportStartDatafeedActionTests.createDatafeedTask(1, "type", "action", null, params, datafeedManager); task = spyDatafeedTask(task); - datafeedManager.run(task, handler); + datafeedManager.run(task, job, datafeed, handler); ArgumentCaptor analysisProblemCaptor = ArgumentCaptor.forClass(DatafeedJob.AnalysisProblemException.class); @@ -220,7 +217,7 @@ public void testRealTime_GivenNonStoppingAnalysisProblem() throws Exception { DatafeedTask task = TransportStartDatafeedActionTests.createDatafeedTask(1, "type", "action", null, params, datafeedManager); task = spyDatafeedTask(task); - datafeedManager.run(task, handler); + datafeedManager.run(task, job, datafeed, handler); verify(auditor).error("job_id", "Datafeed is encountering errors submitting data for analysis: non-stopping"); assertThat(datafeedManager.isRunning(task.getAllocationId()), is(true)); @@ -236,7 +233,7 @@ public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws Exception DatafeedTask task = TransportStartDatafeedActionTests.createDatafeedTask(1, "type", "action", null, params, datafeedManager); task = spyDatafeedTask(task); - datafeedManager.run(task, handler); + datafeedManager.run(task, job, datafeed, handler); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); if (cancelled) { @@ -253,13 +250,12 @@ public void testDatafeedTaskWaitsUntilJobIsOpened() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); ClusterState.Builder cs = ClusterState.builder(clusterService.state()) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, MlMetadata.getMlMetadata(clusterService.state())) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); when(clusterService.state()).thenReturn(cs.build()); Consumer handler = mockConsumer(); DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); - datafeedManager.run(task, handler); + datafeedManager.run(task, job, datafeed, handler); // Verify datafeed has not started running yet as job is still opening verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); @@ -268,8 +264,7 @@ public void testDatafeedTaskWaitsUntilJobIsOpened() { addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); addJobTask("another_job", "node_id", JobState.OPENED, tasksBuilder); ClusterState.Builder anotherJobCs = ClusterState.builder(clusterService.state()) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, MlMetadata.getMlMetadata(clusterService.state())) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", anotherJobCs.build(), cs.build())); @@ -279,8 +274,7 @@ public void testDatafeedTaskWaitsUntilJobIsOpened() { tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder); ClusterState.Builder jobOpenedCs = ClusterState.builder(clusterService.state()) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, MlMetadata.getMlMetadata(clusterService.state())) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); capturedClusterStateListener.getValue().clusterChanged( new ClusterChangedEvent("_source", jobOpenedCs.build(), anotherJobCs.build())); @@ -293,13 +287,12 @@ public void testDatafeedTaskStopsBecauseJobFailedWhileOpening() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); ClusterState.Builder cs = ClusterState.builder(clusterService.state()) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, MlMetadata.getMlMetadata(clusterService.state())) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); when(clusterService.state()).thenReturn(cs.build()); Consumer handler = mockConsumer(); DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); - datafeedManager.run(task, handler); + datafeedManager.run(task, job, datafeed, handler); // Verify datafeed has not started running yet as job is still opening verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); @@ -307,8 +300,7 @@ public void testDatafeedTaskStopsBecauseJobFailedWhileOpening() { tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.FAILED, tasksBuilder); ClusterState.Builder updatedCs = ClusterState.builder(clusterService.state()) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, MlMetadata.getMlMetadata(clusterService.state())) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", updatedCs.build(), cs.build())); @@ -321,13 +313,12 @@ public void testDatafeedGetsStoppedWhileWaitingForJobToOpen() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); ClusterState.Builder cs = ClusterState.builder(clusterService.state()) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, MlMetadata.getMlMetadata(clusterService.state())) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); when(clusterService.state()).thenReturn(cs.build()); Consumer handler = mockConsumer(); DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); - datafeedManager.run(task, handler); + datafeedManager.run(task, job, datafeed, handler); // Verify datafeed has not started running yet as job is still opening verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); @@ -339,8 +330,7 @@ public void testDatafeedGetsStoppedWhileWaitingForJobToOpen() { tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder); ClusterState.Builder updatedCs = ClusterState.builder(clusterService.state()) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, MlMetadata.getMlMetadata(clusterService.state())) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", cs.build(), updatedCs.build())); From b75ebd128abee5390e4d83622b241858e0542719 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 1 Oct 2018 18:03:25 +0100 Subject: [PATCH 3/7] datafeed config cannot be null --- .../action/TransportStartDatafeedAction.java | 4 +- .../ml/datafeed/DatafeedNodeSelector.java | 10 ++--- .../datafeed/DatafeedNodeSelectorTests.java | 40 +++++++++---------- 3 files changed, 25 insertions(+), 29 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 3ac497f557902..67ec5db65a269 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -304,14 +304,14 @@ public StartDatafeedPersistentTasksExecutor(Settings settings, DatafeedManager d @Override public PersistentTasksCustomMetaData.Assignment getAssignment(StartDatafeedAction.DatafeedParams params, ClusterState clusterState) { - return new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedId(), params.getDatafeedConfig()).selectNode(); + return new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedConfig()).selectNode(); } @Override public void validate(StartDatafeedAction.DatafeedParams params, ClusterState clusterState) { PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); TransportStartDatafeedAction.validate(params.getJob(), params.getDatafeedConfig(), tasks); - new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedId(), params.getDatafeedConfig()) + new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedConfig()) .checkDatafeedTaskCanBeCreated(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java index 39c123c119c6a..b0b9ca0ba28b0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java @@ -28,18 +28,13 @@ public class DatafeedNodeSelector { private static final Logger LOGGER = Loggers.getLogger(DatafeedNodeSelector.class); private final DatafeedConfig datafeed; - private final String datafeedId; private final PersistentTasksCustomMetaData.PersistentTask jobTask; private final ClusterState clusterState; private final IndexNameExpressionResolver resolver; - // TODO The datafeed config may be null because of streaming the - // DatafeedParams in a mixed cluster state - public DatafeedNodeSelector(ClusterState clusterState, IndexNameExpressionResolver resolver, String datafeedId, - @Nullable DatafeedConfig datafeed) { + public DatafeedNodeSelector(ClusterState clusterState, IndexNameExpressionResolver resolver, DatafeedConfig datafeed) { PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); this.datafeed = datafeed; - this.datafeedId = datafeedId; this.jobTask = MlTasks.getJobTask(datafeed.getJobId(), tasks); this.clusterState = Objects.requireNonNull(clusterState); this.resolver = Objects.requireNonNull(resolver); @@ -48,7 +43,8 @@ public DatafeedNodeSelector(ClusterState clusterState, IndexNameExpressionResolv public void checkDatafeedTaskCanBeCreated() { AssignmentFailure assignmentFailure = checkAssignment(); if (assignmentFailure != null && assignmentFailure.isCriticalForTaskCreation) { - String msg = "No node found to start datafeed [" + datafeedId + "], allocation explanation [" + assignmentFailure.reason + "]"; + String msg = "No node found to start datafeed [" + datafeed.getId() + "], " + + "allocation explanation [" + assignmentFailure.reason + "]"; LOGGER.debug(msg); throw ExceptionsHelper.conflictStatusException(msg); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java index cb10de0801d59..fcdade93ba49e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java @@ -73,9 +73,9 @@ public void testSelectNode_GivenJobIsOpened() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, df).selectNode(); assertEquals("node_id", result.getExecutorNode()); - new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, resolver, df).checkDatafeedTaskCanBeCreated(); } public void testSelectNode_GivenJobIsOpening() { @@ -88,9 +88,9 @@ public void testSelectNode_GivenJobIsOpening() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, df).selectNode(); assertEquals("node_id", result.getExecutorNode()); - new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, resolver, df).checkDatafeedTaskCanBeCreated(); } public void testNoJobTask() { @@ -103,13 +103,13 @@ public void testNoJobTask() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, df).selectNode(); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id], because the job's [job_id] state is " + "[closed] while state [opened] is required")); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, resolver, df).checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " + "[cannot start datafeed [datafeed_id], because the job's [job_id] state is [closed] while state [opened] is required]")); } @@ -125,13 +125,13 @@ public void testSelectNode_GivenJobFailedOrClosed() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, df).selectNode(); assertNull(result.getExecutorNode()); assertEquals("cannot start datafeed [datafeed_id], because the job's [job_id] state is [" + jobState + "] while state [opened] is required", result.getExplanation()); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, resolver, df).checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " + "[cannot start datafeed [datafeed_id], because the job's [job_id] state is [" + jobState + "] while state [opened] is required]")); @@ -152,12 +152,12 @@ public void testShardUnassigned() { givenClusterState("foo", 1, 0, states); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, df).selectNode(); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " + "does not have all primary shards active yet.")); - new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, resolver, df).checkDatafeedTaskCanBeCreated(); } public void testShardNotAllActive() { @@ -176,12 +176,12 @@ public void testShardNotAllActive() { givenClusterState("foo", 2, 0, states); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, df).selectNode(); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " + "does not have all primary shards active yet.")); - new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, resolver, df).checkDatafeedTaskCanBeCreated(); } public void testIndexDoesntExist() { @@ -194,13 +194,13 @@ public void testIndexDoesntExist() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, df).selectNode(); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [not_foo] " + "does not exist, is closed, or is still initializing.")); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, resolver, df).checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " + "[cannot start datafeed [datafeed_id] because index [not_foo] does not exist, is closed, or is still initializing.]")); } @@ -215,7 +215,7 @@ public void testRemoteIndex() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, df).selectNode(); assertNotNull(result.getExecutorNode()); } @@ -232,13 +232,13 @@ public void testSelectNode_jobTaskStale() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, df).selectNode(); assertNull(result.getExecutorNode()); assertEquals("cannot start datafeed [datafeed_id], because the job's [job_id] state is stale", result.getExplanation()); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, resolver, df).checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " + "[cannot start datafeed [datafeed_id], because the job's [job_id] state is stale]")); @@ -246,9 +246,9 @@ public void testSelectNode_jobTaskStale() { addJobTask(job.getId(), "node_id1", JobState.OPENED, tasksBuilder); tasks = tasksBuilder.build(); givenClusterState("foo", 1, 0); - result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).selectNode(); + result = new DatafeedNodeSelector(clusterState, resolver, df).selectNode(); assertEquals("node_id1", result.getExecutorNode()); - new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, resolver, df).checkDatafeedTaskCanBeCreated(); } public void testSelectNode_GivenJobOpeningAndIndexDoesNotExist() { @@ -265,7 +265,7 @@ public void testSelectNode_GivenJobOpeningAndIndexDoesNotExist() { givenClusterState("foo", 1, 0); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id", df).checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, resolver, df).checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " + "[cannot start datafeed [datafeed_id] because index [not_foo] does not exist, is closed, or is still initializing.]")); } From df6857c45563c82b38ad9ea6f8e6725aea753a00 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 2 Oct 2018 11:28:40 +0100 Subject: [PATCH 4/7] MlAssignmentNotifier cannot read config from state --- .../xpack/ml/MlAssignmentNotifier.java | 24 ++++++++++--------- .../xpack/ml/MlAssignmentNotifierTests.java | 4 ++-- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java index 5df11f02a3610..f3eebffe33669 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java @@ -12,15 +12,13 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.ml.notifications.Auditor; import java.util.Objects; @@ -89,16 +87,20 @@ public void clusterChanged(ClusterChangedEvent event) { auditor.info(jobId, "Opening job on node [" + node.toString() + "]"); } } else if (MlTasks.DATAFEED_TASK_NAME.equals(currentTask.getTaskName())) { - String datafeedId = ((StartDatafeedAction.DatafeedParams) currentTask.getParams()).getDatafeedId(); - DatafeedConfig datafeedConfig = MlMetadata.getMlMetadata(event.state()).getDatafeed(datafeedId); + StartDatafeedAction.DatafeedParams datafeedParams = (StartDatafeedAction.DatafeedParams) currentTask.getParams(); + String jobId = datafeedParams.getJob() != null ? datafeedParams.getJob().getId() : null; if (currentAssignment.getExecutorNode() == null) { - String msg = "No node found to start datafeed [" + datafeedId +"]. Reasons [" + + String msg = "No node found to start datafeed [" + datafeedParams.getDatafeedId() +"]. Reasons [" + currentAssignment.getExplanation() + "]"; - logger.warn("[{}] {}", datafeedConfig.getJobId(), msg); - auditor.warning(datafeedConfig.getJobId(), msg); + logger.warn("[{}] {}", jobId, msg); + if (jobId != null) { + auditor.warning(jobId, msg); + } } else { DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode()); - auditor.info(datafeedConfig.getJobId(), "Starting datafeed [" + datafeedId + "] on node [" + node + "]"); + if (jobId != null) { + auditor.info(jobId, "Starting datafeed [" + datafeedParams.getDatafeedId() + "] on node [" + node + "]"); + } } } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java index 3055dc2bb37f9..2defead100774 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java @@ -32,7 +32,7 @@ public class MlAssignmentNotifierTests extends ESTestCase { - public void testClusterChanged_info() throws Exception { + public void testClusterChanged_info() { Auditor auditor = mock(Auditor.class); ClusterService clusterService = mock(ClusterService.class); MlAssignmentNotifier notifier = new MlAssignmentNotifier(Settings.EMPTY, auditor, clusterService); @@ -60,7 +60,7 @@ public void testClusterChanged_info() throws Exception { verifyNoMoreInteractions(auditor); } - public void testClusterChanged_warning() throws Exception { + public void testClusterChanged_warning() { Auditor auditor = mock(Auditor.class); ClusterService clusterService = mock(ClusterService.class); MlAssignmentNotifier notifier = new MlAssignmentNotifier(Settings.EMPTY, auditor, clusterService); From 5a5f111fcf05cba814eed73103bb918de4fad16f Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 4 Oct 2018 17:09:38 +0100 Subject: [PATCH 5/7] Read config in datafeed manager --- .../elasticsearch/xpack/core/ml/MlTasks.java | 2 +- .../core/ml/action/StartDatafeedAction.java | 38 +++--- .../xpack/ml/MachineLearning.java | 3 +- .../xpack/ml/MlAssignmentNotifier.java | 2 +- .../action/TransportStartDatafeedAction.java | 45 +++---- .../xpack/ml/datafeed/DatafeedJob.java | 4 + .../xpack/ml/datafeed/DatafeedJobBuilder.java | 111 +++++++++++++----- .../xpack/ml/datafeed/DatafeedManager.java | 57 ++++----- .../ml/datafeed/DatafeedNodeSelector.java | 31 ++--- .../ml/datafeed/DatafeedJobBuilderTests.java | 66 +++++++++-- .../ml/datafeed/DatafeedManagerTests.java | 31 +++-- .../datafeed/DatafeedNodeSelectorTests.java | 54 +++++---- .../ml/job/persistence/MockClientBuilder.java | 2 +- 13 files changed, 288 insertions(+), 158 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java index a56d3d639239d..46685001153d7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java @@ -97,7 +97,7 @@ public static Set openJobIds(PersistentTasksCustomMetaData tasks) { * Is there an ml anomaly detector job task for the job {@code jobId}? * @param jobId The job id * @param tasks Persistent tasks - * @return + * @return True if the job has a task */ public static boolean taskExistsForJob(String jobId, PersistentTasksCustomMetaData tasks) { return openJobIds(tasks).contains(jobId); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java index 46692bfb876d0..06c2abd0f0fa9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java @@ -28,11 +28,12 @@ import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; -import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import java.io.IOException; +import java.util.Collections; +import java.util.List; import java.util.Objects; import java.util.function.LongSupplier; @@ -196,8 +197,8 @@ public DatafeedParams(StreamInput in) throws IOException { endTime = in.readOptionalLong(); timeout = TimeValue.timeValueMillis(in.readVLong()); if (in.getVersion().onOrAfter(Version.CURRENT)) { - datafeedConfig = in.readOptionalWriteable(DatafeedConfig::new); - job = in.readOptionalWriteable(Job::new); + jobId = in.readOptionalString(); + datafeedIndices = in.readList(StreamInput::readString); } } @@ -208,8 +209,9 @@ public DatafeedParams(StreamInput in) throws IOException { private long startTime; private Long endTime; private TimeValue timeout = TimeValue.timeValueSeconds(20); - private DatafeedConfig datafeedConfig; - private Job job; + private List datafeedIndices = Collections.emptyList(); + private String jobId; + public String getDatafeedId() { return datafeedId; @@ -239,20 +241,20 @@ public void setTimeout(TimeValue timeout) { this.timeout = timeout; } - public DatafeedConfig getDatafeedConfig() { - return datafeedConfig; + public String getJobId() { + return jobId; } - public void setDatafeedConfig(DatafeedConfig datafeedConfig) { - this.datafeedConfig = datafeedConfig; + public void setJobId(String jobId) { + this.jobId = jobId; } - public Job getJob() { - return job; + public List getDatafeedIndices() { + return datafeedIndices; } - public void setJob(Job job) { - this.job = job; + public void setDatafeedIndices(List datafeedIndices) { + this.datafeedIndices = datafeedIndices; } @Override @@ -272,8 +274,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalLong(endTime); out.writeVLong(timeout.millis()); if (out.getVersion().onOrAfter(Version.CURRENT)) { - out.writeOptionalWriteable(datafeedConfig); - out.writeOptionalWriteable(job); + out.writeOptionalString(jobId); + out.writeStringList(datafeedIndices); } } @@ -292,7 +294,7 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par @Override public int hashCode() { - return Objects.hash(datafeedId, startTime, endTime, timeout, datafeedConfig, job); + return Objects.hash(datafeedId, startTime, endTime, timeout, jobId, datafeedIndices); } @Override @@ -308,8 +310,8 @@ public boolean equals(Object obj) { Objects.equals(startTime, other.startTime) && Objects.equals(endTime, other.endTime) && Objects.equals(timeout, other.timeout) && - Objects.equals(datafeedConfig, other.datafeedConfig) && - Objects.equals(job, other.job); + Objects.equals(jobId, other.jobId) && + Objects.equals(datafeedIndices, other.datafeedIndices); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index f1a0745d58fed..b2b68dfeb9f75 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -411,7 +411,8 @@ public Collection createComponents(Client client, ClusterService cluster jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, xContentRegistry, auditor); this.autodetectProcessManager.set(autodetectProcessManager); - DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, jobResultsProvider, auditor, System::currentTimeMillis); + DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, settings, xContentRegistry, + auditor, System::currentTimeMillis); DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, System::currentTimeMillis, auditor); this.datafeedManager.set(datafeedManager); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java index f3eebffe33669..c7e867bb7de70 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java @@ -88,7 +88,7 @@ public void clusterChanged(ClusterChangedEvent event) { } } else if (MlTasks.DATAFEED_TASK_NAME.equals(currentTask.getTaskName())) { StartDatafeedAction.DatafeedParams datafeedParams = (StartDatafeedAction.DatafeedParams) currentTask.getParams(); - String jobId = datafeedParams.getJob() != null ? datafeedParams.getJob().getId() : null; + String jobId = datafeedParams.getJobId(); if (currentAssignment.getExecutorNode() == null) { String msg = "No node found to start datafeed [" + datafeedParams.getDatafeedId() +"]. Reasons [" + currentAssignment.getExplanation() + "]"; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 67ec5db65a269..aa416b6fa1d54 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -53,6 +53,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Predicate; @@ -117,6 +118,7 @@ protected void masterOperation(StartDatafeedAction.Request request, ClusterState return; } + AtomicReference datafeedConfigHolder = new AtomicReference<>(); PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); ActionListener> waitForTaskListener = @@ -139,36 +141,37 @@ public void onFailure(Exception e) { }; // Verify data extractor factory can be created, then start persistent task - Consumer createDataExtrator = ok -> { - if (RemoteClusterLicenseChecker.containsRemoteIndex(params.getDatafeedConfig().getIndices())) { + Consumer createDataExtrator = job -> { + if (RemoteClusterLicenseChecker.containsRemoteIndex(params.getDatafeedIndices())) { final RemoteClusterLicenseChecker remoteClusterLicenseChecker = new RemoteClusterLicenseChecker(client, XPackLicenseState::isMachineLearningAllowedForOperationMode); remoteClusterLicenseChecker.checkRemoteClusterLicenses( - RemoteClusterLicenseChecker.remoteClusterAliases(params.getDatafeedConfig().getIndices()), + RemoteClusterLicenseChecker.remoteClusterAliases(params.getDatafeedIndices()), ActionListener.wrap( response -> { if (response.isSuccess() == false) { - listener.onFailure(createUnlicensedError(params.getDatafeedConfig().getId(), response)); + listener.onFailure(createUnlicensedError(params.getDatafeedId(), response)); } else { - createDataExtractor(params.getJob(), params.getDatafeedConfig(), params, waitForTaskListener); + createDataExtractor(job, datafeedConfigHolder.get(), params, waitForTaskListener); } }, e -> listener.onFailure( createUnknownLicenseError( - params.getDatafeedConfig().getId(), - RemoteClusterLicenseChecker.remoteIndices(params.getDatafeedConfig().getIndices()), e)) - )); + params.getDatafeedId(), + RemoteClusterLicenseChecker.remoteIndices(params.getDatafeedIndices()), e)) + ) + ); } else { - createDataExtractor(params.getJob(), params.getDatafeedConfig(), params, waitForTaskListener); + createDataExtractor(job, datafeedConfigHolder.get(), params, waitForTaskListener); } }; ActionListener jobListener = ActionListener.wrap( jobBuilder -> { try { - params.setJob(jobBuilder.build()); - validate(params.getJob(), params.getDatafeedConfig(), tasks); - createDataExtrator.accept(Boolean.TRUE); + Job job = jobBuilder.build(); + validate(job, datafeedConfigHolder.get(), tasks); + createDataExtrator.accept(job); } catch (Exception e) { listener.onFailure(e); } @@ -177,10 +180,13 @@ public void onFailure(Exception e) { ); ActionListener datafeedListener = ActionListener.wrap( - datafeedConfig -> { + datafeedBuilder -> { try { - params.setDatafeedConfig(datafeedConfig.build()); - jobConfigProvider.getJob(params.getDatafeedConfig().getJobId(), jobListener); + DatafeedConfig datafeedConfig = datafeedBuilder.build(); + params.setDatafeedIndices(datafeedConfig.getIndices()); + params.setJobId(datafeedConfig.getJobId()); + datafeedConfigHolder.set(datafeedConfig); + jobConfigProvider.getJob(datafeedConfig.getJobId(), jobListener); } catch (Exception e) { listener.onFailure(e); } @@ -304,14 +310,13 @@ public StartDatafeedPersistentTasksExecutor(Settings settings, DatafeedManager d @Override public PersistentTasksCustomMetaData.Assignment getAssignment(StartDatafeedAction.DatafeedParams params, ClusterState clusterState) { - return new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedConfig()).selectNode(); + return new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedId(), params.getJobId(), + params.getDatafeedIndices()).selectNode(); } @Override public void validate(StartDatafeedAction.DatafeedParams params, ClusterState clusterState) { - PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - TransportStartDatafeedAction.validate(params.getJob(), params.getDatafeedConfig(), tasks); - new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedConfig()) + new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedId(), params.getJobId(), params.getDatafeedIndices()) .checkDatafeedTaskCanBeCreated(); } @@ -321,7 +326,7 @@ protected void nodeOperation(final AllocatedPersistentTask allocatedPersistentTa final PersistentTaskState state) { DatafeedTask datafeedTask = (DatafeedTask) allocatedPersistentTask; datafeedTask.datafeedManager = datafeedManager; - datafeedManager.run(datafeedTask, params.getJob(), params.getDatafeedConfig(), + datafeedManager.run(datafeedTask, (error) -> { if (error != null) { datafeedTask.markAsFailed(error); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index 1fa402f4e2485..20900c3f0d8e7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -82,6 +82,10 @@ boolean isIsolated() { return isIsolated; } + public String getJobId() { + return jobId; + } + Long runLookBack(long startTime, Long endTime) throws Exception { lookbackStartTimeMs = skipToStartTime(startTime); Optional endMs = Optional.ofNullable(endTime); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java index efe332346efec..3b76b05639ea2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java @@ -8,93 +8,150 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.Result; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; +import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.notifications.Auditor; import java.util.Collections; import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Supplier; public class DatafeedJobBuilder { private final Client client; - private final JobResultsProvider jobResultsProvider; + private final Settings settings; + private final NamedXContentRegistry xContentRegistry; private final Auditor auditor; private final Supplier currentTimeSupplier; - public DatafeedJobBuilder(Client client, JobResultsProvider jobResultsProvider, Auditor auditor, Supplier currentTimeSupplier) { + public DatafeedJobBuilder(Client client, Settings settings, NamedXContentRegistry xContentRegistry, + Auditor auditor, Supplier currentTimeSupplier) { this.client = client; - this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider); + this.settings = Objects.requireNonNull(settings); + this.xContentRegistry = Objects.requireNonNull(xContentRegistry); this.auditor = Objects.requireNonNull(auditor); this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier); } - void build(Job job, DatafeedConfig datafeed, ActionListener listener) { + void build(String datafeedId, ActionListener listener) { + + JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings); + JobConfigProvider jobConfigProvider = new JobConfigProvider(client, settings); + DatafeedConfigProvider datafeedConfigProvider = new DatafeedConfigProvider(client, settings, xContentRegistry); + + build(datafeedId, jobResultsProvider, jobConfigProvider, datafeedConfigProvider, listener); + } + + /** + * For testing only. + * Use {@link #build(String, ActionListener)} instead + */ + void build(String datafeedId, JobResultsProvider jobResultsProvider, JobConfigProvider jobConfigProvider, + DatafeedConfigProvider datafeedConfigProvider, ActionListener listener) { + + AtomicReference jobHolder = new AtomicReference<>(); + AtomicReference datafeedConfigHolder = new AtomicReference<>(); // Step 5. Build datafeed job object Consumer contextHanlder = context -> { - TimeValue frequency = getFrequencyOrDefault(datafeed, job); - TimeValue queryDelay = datafeed.getQueryDelay(); - DatafeedJob datafeedJob = new DatafeedJob(job.getId(), buildDataDescription(job), frequency.millis(), queryDelay.millis(), + TimeValue frequency = getFrequencyOrDefault(datafeedConfigHolder.get(), jobHolder.get()); + TimeValue queryDelay = datafeedConfigHolder.get().getQueryDelay(); + DatafeedJob datafeedJob = new DatafeedJob(jobHolder.get().getId(), buildDataDescription(jobHolder.get()), + frequency.millis(), queryDelay.millis(), context.dataExtractorFactory, client, auditor, currentTimeSupplier, context.latestFinalBucketEndMs, context.latestRecordTimeMs); + listener.onResponse(datafeedJob); }; final Context context = new Context(); - // Step 4. Context building complete - invoke final listener + // Context building complete - invoke final listener ActionListener dataExtractorFactoryHandler = ActionListener.wrap( dataExtractorFactory -> { context.dataExtractorFactory = dataExtractorFactory; contextHanlder.accept(context); }, e -> { - auditor.error(job.getId(), e.getMessage()); + auditor.error(jobHolder.get().getId(), e.getMessage()); listener.onFailure(e); } ); - // Step 3. Create data extractor factory + // Create data extractor factory Consumer dataCountsHandler = dataCounts -> { if (dataCounts.getLatestRecordTimeStamp() != null) { context.latestRecordTimeMs = dataCounts.getLatestRecordTimeStamp().getTime(); } - DataExtractorFactory.create(client, datafeed, job, dataExtractorFactoryHandler); + DataExtractorFactory.create(client, datafeedConfigHolder.get(), jobHolder.get(), dataExtractorFactoryHandler); }; - // Step 2. Collect data counts + // Collect data counts Consumer> bucketsHandler = buckets -> { if (buckets.results().size() == 1) { - TimeValue bucketSpan = job.getAnalysisConfig().getBucketSpan(); + TimeValue bucketSpan = jobHolder.get().getAnalysisConfig().getBucketSpan(); context.latestFinalBucketEndMs = buckets.results().get(0).getTimestamp().getTime() + bucketSpan.millis() - 1; } - jobResultsProvider.dataCounts(job.getId(), dataCountsHandler, listener::onFailure); + jobResultsProvider.dataCounts(jobHolder.get().getId(), dataCountsHandler, listener::onFailure); }; - // Step 1. Collect latest bucket - BucketsQueryBuilder latestBucketQuery = new BucketsQueryBuilder() - .sortField(Result.TIMESTAMP.getPreferredName()) - .sortDescending(true).size(1) - .includeInterim(false); - jobResultsProvider.bucketsViaInternalClient(job.getId(), latestBucketQuery, bucketsHandler, e -> { - if (e instanceof ResourceNotFoundException) { - QueryPage empty = new QueryPage<>(Collections.emptyList(), 0, Bucket.RESULT_TYPE_FIELD); - bucketsHandler.accept(empty); - } else { - listener.onFailure(e); - } - }); + // Collect latest bucket + Consumer jobIdConsumer = jobId -> { + BucketsQueryBuilder latestBucketQuery = new BucketsQueryBuilder() + .sortField(Result.TIMESTAMP.getPreferredName()) + .sortDescending(true).size(1) + .includeInterim(false); + jobResultsProvider.bucketsViaInternalClient(jobId, latestBucketQuery, bucketsHandler, e -> { + if (e instanceof ResourceNotFoundException) { + QueryPage empty = new QueryPage<>(Collections.emptyList(), 0, Bucket.RESULT_TYPE_FIELD); + bucketsHandler.accept(empty); + } else { + listener.onFailure(e); + } + }); + }; + + // Get the job config + ActionListener jobConfigListener = ActionListener.wrap( + jobBuilder -> { + try { + jobHolder.set(jobBuilder.build()); + jobIdConsumer.accept(jobHolder.get().getId()); + } catch (Exception e) { + listener.onFailure(e); + } + }, + listener::onFailure + ); + + // Get the datafeed config + ActionListener datafeedConfigListener = ActionListener.wrap( + configBuilder -> { + try { + datafeedConfigHolder.set(configBuilder.build()); + jobConfigProvider.getJob(datafeedConfigHolder.get().getJobId(), jobConfigListener); + } catch (Exception e) { + listener.onFailure(e); + } + }, + listener::onFailure + ); + + datafeedConfigProvider.getDatafeedConfig(datafeedId, datafeedConfigListener); } private static TimeValue getFrequencyOrDefault(DatafeedConfig datafeed, Job job) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index c0702af90e1f3..c801118fe0fa7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -25,9 +25,7 @@ import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; -import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.MachineLearning; @@ -76,11 +74,14 @@ public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clus clusterService.addListener(taskRunner); } - public void run(TransportStartDatafeedAction.DatafeedTask task, Job job, DatafeedConfig datafeed, Consumer taskHandler) { + + public void run(TransportStartDatafeedAction.DatafeedTask task, Consumer finishHandler) { + String datafeedId = task.getDatafeedId(); ActionListener datafeedJobHandler = ActionListener.wrap( datafeedJob -> { - Holder holder = new Holder(task, datafeed, datafeedJob, new ProblemTracker(auditor, job.getId()), taskHandler); + Holder holder = new Holder(task, datafeedId, datafeedJob, + new ProblemTracker(auditor, datafeedJob.getJobId()), finishHandler); runningDatafeedsOnThisNode.put(task.getAllocationId(), holder); task.updatePersistentTaskState(DatafeedState.STARTED, new ActionListener>() { @Override @@ -90,13 +91,13 @@ public void onResponse(PersistentTask persistentTask) { @Override public void onFailure(Exception e) { - taskHandler.accept(e); + finishHandler.accept(e); } }); - }, taskHandler::accept + }, finishHandler::accept ); - datafeedJobBuilder.build(job, datafeed, datafeedJobHandler); + datafeedJobBuilder.build(datafeedId, datafeedJobHandler); } public void stopDatafeed(TransportStartDatafeedAction.DatafeedTask task, String reason, TimeValue timeout) { @@ -151,7 +152,7 @@ private void innerRun(Holder holder, long startTime, Long endTime) { @Override public void onFailure(Exception e) { - logger.error("Failed lookback import for job [" + holder.datafeed.getJobId() + "]", e); + logger.error("Failed lookback import for job [" + holder.datafeedJob.getJobId() + "]", e); holder.stop("general_lookback_failure", TimeValue.timeValueSeconds(20), e); } @@ -181,17 +182,17 @@ protected void doRun() { } else { // Notify that a lookback-only run found no data String lookbackNoDataMsg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_LOOKBACK_NO_DATA); - logger.warn("[{}] {}", holder.datafeed.getJobId(), lookbackNoDataMsg); - auditor.warning(holder.datafeed.getJobId(), lookbackNoDataMsg); + logger.warn("[{}] {}", holder.datafeedJob.getJobId(), lookbackNoDataMsg); + auditor.warning(holder.datafeedJob.getJobId(), lookbackNoDataMsg); } } catch (Exception e) { - logger.error("Failed lookback import for job [" + holder.datafeed.getJobId() + "]", e); + logger.error("Failed lookback import for job [" + holder.datafeedJob.getJobId() + "]", e); holder.stop("general_lookback_failure", TimeValue.timeValueSeconds(20), e); return; } if (isolated == false) { if (next != null) { - doDatafeedRealtime(next, holder.datafeed.getJobId(), holder); + doDatafeedRealtime(next, holder.datafeedJob.getJobId(), holder); } else { holder.stop("no_realtime", TimeValue.timeValueSeconds(20), null); holder.problemTracker.finishReport(); @@ -269,29 +270,29 @@ public class Holder { private final TransportStartDatafeedAction.DatafeedTask task; private final long allocationId; - private final DatafeedConfig datafeed; + private final String datafeedId; // To ensure that we wait until loopback / realtime search has completed before we stop the datafeed private final ReentrantLock datafeedJobLock = new ReentrantLock(true); private final DatafeedJob datafeedJob; private final boolean autoCloseJob; private final ProblemTracker problemTracker; - private final Consumer handler; + private final Consumer finishHandler; volatile Future future; private volatile boolean isRelocating; - Holder(TransportStartDatafeedAction.DatafeedTask task, DatafeedConfig datafeed, DatafeedJob datafeedJob, - ProblemTracker problemTracker, Consumer handler) { + Holder(TransportStartDatafeedAction.DatafeedTask task, String datafeedId, DatafeedJob datafeedJob, + ProblemTracker problemTracker, Consumer finishHandler) { this.task = task; this.allocationId = task.getAllocationId(); - this.datafeed = datafeed; + this.datafeedId = datafeedId; this.datafeedJob = datafeedJob; this.autoCloseJob = task.isLookbackOnly(); this.problemTracker = problemTracker; - this.handler = handler; + this.finishHandler = finishHandler; } String getJobId() { - return datafeed.getJobId(); + return datafeedJob.getJobId(); } boolean isRunning() { @@ -307,23 +308,23 @@ public void stop(String source, TimeValue timeout, Exception e) { return; } - logger.info("[{}] attempt to stop datafeed [{}] for job [{}]", source, datafeed.getId(), datafeed.getJobId()); + logger.info("[{}] attempt to stop datafeed [{}] for job [{}]", source, datafeedId, datafeedJob.getJobId()); if (datafeedJob.stop()) { boolean acquired = false; try { - logger.info("[{}] try lock [{}] to stop datafeed [{}] for job [{}]...", source, timeout, datafeed.getId(), - datafeed.getJobId()); + logger.info("[{}] try lock [{}] to stop datafeed [{}] for job [{}]...", source, timeout, datafeedId, + datafeedJob.getJobId()); acquired = datafeedJobLock.tryLock(timeout.millis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e1) { Thread.currentThread().interrupt(); } finally { - logger.info("[{}] stopping datafeed [{}] for job [{}], acquired [{}]...", source, datafeed.getId(), - datafeed.getJobId(), acquired); + logger.info("[{}] stopping datafeed [{}] for job [{}], acquired [{}]...", source, datafeedId, + datafeedJob.getJobId(), acquired); runningDatafeedsOnThisNode.remove(allocationId); FutureUtils.cancel(future); - auditor.info(datafeed.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED)); - handler.accept(e); - logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", source, datafeed.getId(), datafeed.getJobId(), + auditor.info(datafeedJob.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED)); + finishHandler.accept(e); + logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", source, datafeedId, datafeedJob.getJobId(), acquired ? "" : ", but there may be pending tasks as the timeout [" + timeout.getStringRep() + "] expired"); if (autoCloseJob) { closeJob(); @@ -333,7 +334,7 @@ public void stop(String source, TimeValue timeout, Exception e) { } } } else { - logger.info("[{}] datafeed [{}] for job [{}] was already stopped", source, datafeed.getId(), datafeed.getJobId()); + logger.info("[{}] datafeed [{}] for job [{}] was already stopped", source, datafeedId, datafeedJob.getJobId()); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java index b0b9ca0ba28b0..a45bfc7822071 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java @@ -15,7 +15,6 @@ import org.elasticsearch.license.RemoteClusterLicenseChecker; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.core.ml.MlTasks; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; @@ -27,15 +26,20 @@ public class DatafeedNodeSelector { private static final Logger LOGGER = Loggers.getLogger(DatafeedNodeSelector.class); - private final DatafeedConfig datafeed; + private final String datafeedId; + private final String jobId; + private final List datafeedIndices; private final PersistentTasksCustomMetaData.PersistentTask jobTask; private final ClusterState clusterState; private final IndexNameExpressionResolver resolver; - public DatafeedNodeSelector(ClusterState clusterState, IndexNameExpressionResolver resolver, DatafeedConfig datafeed) { + public DatafeedNodeSelector(ClusterState clusterState, IndexNameExpressionResolver resolver, String datafeedId, + String jobId, List datafeedIndices) { PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - this.datafeed = datafeed; - this.jobTask = MlTasks.getJobTask(datafeed.getJobId(), tasks); + this.datafeedId = datafeedId; + this.jobId = jobId; + this.datafeedIndices = datafeedIndices; + this.jobTask = MlTasks.getJobTask(jobId, tasks); this.clusterState = Objects.requireNonNull(clusterState); this.resolver = Objects.requireNonNull(resolver); } @@ -43,7 +47,7 @@ public DatafeedNodeSelector(ClusterState clusterState, IndexNameExpressionResolv public void checkDatafeedTaskCanBeCreated() { AssignmentFailure assignmentFailure = checkAssignment(); if (assignmentFailure != null && assignmentFailure.isCriticalForTaskCreation) { - String msg = "No node found to start datafeed [" + datafeed.getId() + "], " + + String msg = "No node found to start datafeed [" + datafeedId + "], " + "allocation explanation [" + assignmentFailure.reason + "]"; LOGGER.debug(msg); throw ExceptionsHelper.conflictStatusException(msg); @@ -73,14 +77,14 @@ private AssignmentFailure checkAssignment() { if (jobState.isAnyOf(JobState.OPENING, JobState.OPENED) == false) { // lets try again later when the job has been opened: - String reason = "cannot start datafeed [" + datafeed.getId() + "], because the job's [" + datafeed.getJobId() + - "] state is [" + jobState + "] while state [" + JobState.OPENED + "] is required"; + String reason = "cannot start datafeed [" + datafeedId + "], because the job's [" + jobId + + "] state is [" + jobState + "] while state [" + JobState.OPENED + "] is required"; priorityFailureCollector.add(new AssignmentFailure(reason, true)); } if (jobTaskState != null && jobTaskState.isStatusStale(jobTask)) { - String reason = "cannot start datafeed [" + datafeed.getId() + "], because the job's [" + datafeed.getJobId() + - "] state is stale"; + String reason = "cannot start datafeed [" + datafeedId + "], because the job's [" + jobId + + "] state is stale"; priorityFailureCollector.add(new AssignmentFailure(reason, true)); } @@ -89,8 +93,7 @@ private AssignmentFailure checkAssignment() { @Nullable private AssignmentFailure verifyIndicesActive() { - List indices = datafeed.getIndices(); - for (String index : indices) { + for (String index : datafeedIndices) { if (RemoteClusterLicenseChecker.isRemoteIndex(index)) { // We cannot verify remote indices @@ -98,7 +101,7 @@ private AssignmentFailure verifyIndicesActive() { } String[] concreteIndices; - String reason = "cannot start datafeed [" + datafeed.getId() + "] because index [" + String reason = "cannot start datafeed [" + datafeedId + "] because index [" + index + "] does not exist, is closed, or is still initializing."; try { @@ -114,7 +117,7 @@ private AssignmentFailure verifyIndicesActive() { for (String concreteIndex : concreteIndices) { IndexRoutingTable routingTable = clusterState.getRoutingTable().index(concreteIndex); if (routingTable == null || !routingTable.allPrimaryShardsActive()) { - reason = "cannot start datafeed [" + datafeed.getId() + "] because index [" + reason = "cannot start datafeed [" + datafeedId + "] because index [" + concreteIndex + "] does not have all primary shards active yet."; return new AssignmentFailure(reason, false); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java index 3d9ee17bac0c9..d13e311531da2 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java @@ -19,6 +19,8 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.results.Bucket; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.junit.Before; @@ -32,6 +34,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -41,8 +44,10 @@ public class DatafeedJobBuilderTests extends ESTestCase { private Client client; private Auditor auditor; - private JobResultsProvider jobResultsProvider; private Consumer taskHandler; + private JobResultsProvider jobResultsProvider; + private JobConfigProvider jobConfigProvider; + private DatafeedConfigProvider datafeedConfigProvider; private DatafeedJobBuilder datafeedJobBuilder; @@ -54,10 +59,10 @@ public void init() { when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); when(client.settings()).thenReturn(Settings.EMPTY); auditor = mock(Auditor.class); - jobResultsProvider = mock(JobResultsProvider.class); taskHandler = mock(Consumer.class); - datafeedJobBuilder = new DatafeedJobBuilder(client, jobResultsProvider, auditor, System::currentTimeMillis); + datafeedJobBuilder = new DatafeedJobBuilder(client, Settings.EMPTY, xContentRegistry(), auditor, System::currentTimeMillis); + jobResultsProvider = mock(JobResultsProvider.class); Mockito.doAnswer(invocationOnMock -> { String jobId = (String) invocationOnMock.getArguments()[0]; @SuppressWarnings("unchecked") @@ -72,6 +77,9 @@ public void init() { consumer.accept(new ResourceNotFoundException("dummy")); return null; }).when(jobResultsProvider).bucketsViaInternalClient(any(), any(), any(), any()); + + jobConfigProvider = mock(JobConfigProvider.class); + datafeedConfigProvider = mock(DatafeedConfigProvider.class); } public void testBuild_GivenScrollDatafeedAndNewJob() throws Exception { @@ -79,7 +87,8 @@ public void testBuild_GivenScrollDatafeedAndNewJob() throws Exception { dataDescription.setTimeField("time"); Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob(); jobBuilder.setDataDescription(dataDescription); - DatafeedConfig datafeed = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo").build(); + jobBuilder.setCreateTime(new Date()); + DatafeedConfig.Builder datafeed = DatafeedManagerTests.createDatafeedConfig("datafeed1", jobBuilder.getId()); AtomicBoolean wasHandlerCalled = new AtomicBoolean(false); ActionListener datafeedJobHandler = ActionListener.wrap( @@ -91,7 +100,10 @@ public void testBuild_GivenScrollDatafeedAndNewJob() throws Exception { }, e -> fail() ); - datafeedJobBuilder.build(jobBuilder.build(new Date()), datafeed, datafeedJobHandler); + givenJob(jobBuilder); + givenDatafeed(datafeed); + + datafeedJobBuilder.build("datafeed1", jobResultsProvider, jobConfigProvider, datafeedConfigProvider, datafeedJobHandler); assertBusy(() -> wasHandlerCalled.get()); } @@ -101,7 +113,8 @@ public void testBuild_GivenScrollDatafeedAndOldJobWithLatestRecordTimestampAfter dataDescription.setTimeField("time"); Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob(); jobBuilder.setDataDescription(dataDescription); - DatafeedConfig datafeed = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo").build(); + jobBuilder.setCreateTime(new Date()); + DatafeedConfig.Builder datafeed = DatafeedManagerTests.createDatafeedConfig("datafeed1", jobBuilder.getId()); givenLatestTimes(7_200_000L, 3_600_000L); @@ -115,7 +128,10 @@ public void testBuild_GivenScrollDatafeedAndOldJobWithLatestRecordTimestampAfter }, e -> fail() ); - datafeedJobBuilder.build(jobBuilder.build(new Date()), datafeed, datafeedJobHandler); + givenJob(jobBuilder); + givenDatafeed(datafeed); + + datafeedJobBuilder.build("datafeed1", jobResultsProvider, jobConfigProvider, datafeedConfigProvider, datafeedJobHandler); assertBusy(() -> wasHandlerCalled.get()); } @@ -125,7 +141,8 @@ public void testBuild_GivenScrollDatafeedAndOldJobWithLatestBucketAfterLatestRec dataDescription.setTimeField("time"); Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob(); jobBuilder.setDataDescription(dataDescription); - DatafeedConfig datafeed = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo").build(); + jobBuilder.setCreateTime(new Date()); + DatafeedConfig.Builder datafeed = DatafeedManagerTests.createDatafeedConfig("datafeed1", jobBuilder.getId()); givenLatestTimes(3_800_000L, 3_600_000L); @@ -139,7 +156,10 @@ public void testBuild_GivenScrollDatafeedAndOldJobWithLatestBucketAfterLatestRec }, e -> fail() ); - datafeedJobBuilder.build(jobBuilder.build(new Date()), datafeed, datafeedJobHandler); + givenJob(jobBuilder); + givenDatafeed(datafeed); + + datafeedJobBuilder.build("datafeed1", jobResultsProvider, jobConfigProvider, datafeedConfigProvider, datafeedJobHandler); assertBusy(() -> wasHandlerCalled.get()); } @@ -149,7 +169,8 @@ public void testBuild_GivenBucketsRequestFails() { dataDescription.setTimeField("time"); Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob(); jobBuilder.setDataDescription(dataDescription); - DatafeedConfig datafeed = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo").build(); + jobBuilder.setCreateTime(new Date()); + DatafeedConfig.Builder datafeed = DatafeedManagerTests.createDatafeedConfig("datafeed1", jobBuilder.getId()); Exception error = new RuntimeException("error"); doAnswer(invocationOnMock -> { @@ -159,11 +180,34 @@ public void testBuild_GivenBucketsRequestFails() { return null; }).when(jobResultsProvider).bucketsViaInternalClient(any(), any(), any(), any()); - datafeedJobBuilder.build(jobBuilder.build(new Date()), datafeed, ActionListener.wrap(datafeedJob -> fail(), taskHandler)); + + givenJob(jobBuilder); + givenDatafeed(datafeed); + + datafeedJobBuilder.build("datafeed1", jobResultsProvider, jobConfigProvider, datafeedConfigProvider, + ActionListener.wrap(datafeedJob -> fail(), taskHandler)); verify(taskHandler).accept(error); } + private void givenJob(Job.Builder job) { + Mockito.doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener handler = (ActionListener) invocationOnMock.getArguments()[1]; + handler.onResponse(job); + return null; + }).when(jobConfigProvider).getJob(eq(job.getId()), any()); + } + + private void givenDatafeed(DatafeedConfig.Builder datafeed) { + Mockito.doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener handler = (ActionListener) invocationOnMock.getArguments()[1]; + handler.onResponse(datafeed); + return null; + }).when(datafeedConfigProvider).getDatafeedConfig(eq(datafeed.getId()), any()); + } + private void givenLatestTimes(long latestRecordTimestamp, long latestBucketTimestamp) { Mockito.doAnswer(invocationOnMock -> { String jobId = (String) invocationOnMock.getArguments()[0]; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java index 26d35a3dc1615..54aa3ade8e1b9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java @@ -73,15 +73,13 @@ public class DatafeedManagerTests extends ESTestCase { private DatafeedManager datafeedManager; private long currentTime = 120000; private Auditor auditor; - private Job job; - private DatafeedConfig datafeed; private ArgumentCaptor capturedClusterStateListener = ArgumentCaptor.forClass(ClusterStateListener.class); @Before @SuppressWarnings("unchecked") public void setUpTests() { - job = createDatafeedJob().build(new Date()); - datafeed = createDatafeedConfig("datafeed_id", job.getId()).build(); + Job.Builder job = createDatafeedJob().setCreateTime(new Date()); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); PersistentTasksCustomMetaData tasks = tasksBuilder.build(); @@ -121,13 +119,14 @@ public void setUpTests() { datafeedJob = mock(DatafeedJob.class); when(datafeedJob.isRunning()).thenReturn(true); when(datafeedJob.stop()).thenReturn(true); + when(datafeedJob.getJobId()).thenReturn(job.getId()); DatafeedJobBuilder datafeedJobBuilder = mock(DatafeedJobBuilder.class); doAnswer(invocationOnMock -> { @SuppressWarnings("rawtypes") - ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; listener.onResponse(datafeedJob); return null; - }).when(datafeedJobBuilder).build(any(), any(), any()); + }).when(datafeedJobBuilder).build(any(), any()); datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, () -> currentTime, auditor); @@ -138,7 +137,7 @@ public void testLookbackOnly_WarnsWhenNoDataIsRetrieved() throws Exception { when(datafeedJob.runLookBack(0L, 60000L)).thenThrow(new DatafeedJob.EmptyDataCountException(0L)); Consumer handler = mockConsumer(); DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); - datafeedManager.run(task, job, datafeed, handler); + datafeedManager.run(task, handler); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); verify(threadPool, never()).schedule(any(), any(), any()); @@ -149,7 +148,7 @@ public void testStart_GivenNewlyCreatedJobLookback() throws Exception { when(datafeedJob.runLookBack(0L, 60000L)).thenReturn(null); Consumer handler = mockConsumer(); DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); - datafeedManager.run(task, job, datafeed, handler); + datafeedManager.run(task, handler); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); verify(threadPool, never()).schedule(any(), any(), any()); @@ -159,7 +158,7 @@ public void testStart_extractionProblem() throws Exception { when(datafeedJob.runLookBack(0, 60000L)).thenThrow(new DatafeedJob.ExtractionProblemException(0L, new RuntimeException("dummy"))); Consumer handler = mockConsumer(); DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); - datafeedManager.run(task, job, datafeed, handler); + datafeedManager.run(task, handler); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); verify(threadPool, never()).schedule(any(), any(), any()); @@ -183,7 +182,7 @@ public void testStart_emptyDataCountException() throws Exception { Consumer handler = mockConsumer(); DatafeedTask task = createDatafeedTask("datafeed_id", 0L, null); - datafeedManager.run(task, job, datafeed, handler); + datafeedManager.run(task, handler); verify(threadPool, times(11)).schedule(any(), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME), any()); verify(auditor, times(1)).warning(eq("job_id"), anyString()); @@ -198,7 +197,7 @@ public void testRealTime_GivenStoppingAnalysisProblem() throws Exception { DatafeedTask task = TransportStartDatafeedActionTests.createDatafeedTask(1, "type", "action", null, params, datafeedManager); task = spyDatafeedTask(task); - datafeedManager.run(task, job, datafeed, handler); + datafeedManager.run(task, handler); ArgumentCaptor analysisProblemCaptor = ArgumentCaptor.forClass(DatafeedJob.AnalysisProblemException.class); @@ -217,7 +216,7 @@ public void testRealTime_GivenNonStoppingAnalysisProblem() throws Exception { DatafeedTask task = TransportStartDatafeedActionTests.createDatafeedTask(1, "type", "action", null, params, datafeedManager); task = spyDatafeedTask(task); - datafeedManager.run(task, job, datafeed, handler); + datafeedManager.run(task, handler); verify(auditor).error("job_id", "Datafeed is encountering errors submitting data for analysis: non-stopping"); assertThat(datafeedManager.isRunning(task.getAllocationId()), is(true)); @@ -233,7 +232,7 @@ public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws Exception DatafeedTask task = TransportStartDatafeedActionTests.createDatafeedTask(1, "type", "action", null, params, datafeedManager); task = spyDatafeedTask(task); - datafeedManager.run(task, job, datafeed, handler); + datafeedManager.run(task, handler); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); if (cancelled) { @@ -255,7 +254,7 @@ public void testDatafeedTaskWaitsUntilJobIsOpened() { Consumer handler = mockConsumer(); DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); - datafeedManager.run(task, job, datafeed, handler); + datafeedManager.run(task, handler); // Verify datafeed has not started running yet as job is still opening verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); @@ -292,7 +291,7 @@ public void testDatafeedTaskStopsBecauseJobFailedWhileOpening() { Consumer handler = mockConsumer(); DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); - datafeedManager.run(task, job, datafeed, handler); + datafeedManager.run(task, handler); // Verify datafeed has not started running yet as job is still opening verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); @@ -318,7 +317,7 @@ public void testDatafeedGetsStoppedWhileWaitingForJobToOpen() { Consumer handler = mockConsumer(); DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); - datafeedManager.run(task, job, datafeed, handler); + datafeedManager.run(task, handler); // Verify datafeed has not started running yet as job is still opening verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java index fcdade93ba49e..dfaf9f03c0dec 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java @@ -73,9 +73,10 @@ public void testSelectNode_GivenJobIsOpened() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, df).selectNode(); + PersistentTasksCustomMetaData.Assignment result = + new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); assertEquals("node_id", result.getExecutorNode()); - new DatafeedNodeSelector(clusterState, resolver, df).checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated(); } public void testSelectNode_GivenJobIsOpening() { @@ -88,9 +89,10 @@ public void testSelectNode_GivenJobIsOpening() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, df).selectNode(); + PersistentTasksCustomMetaData.Assignment result = + new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); assertEquals("node_id", result.getExecutorNode()); - new DatafeedNodeSelector(clusterState, resolver, df).checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated(); } public void testNoJobTask() { @@ -103,13 +105,15 @@ public void testNoJobTask() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, df).selectNode(); + PersistentTasksCustomMetaData.Assignment result = + new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id], because the job's [job_id] state is " + "[closed] while state [opened] is required")); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, df).checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()) + .checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " + "[cannot start datafeed [datafeed_id], because the job's [job_id] state is [closed] while state [opened] is required]")); } @@ -125,13 +129,15 @@ public void testSelectNode_GivenJobFailedOrClosed() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, df).selectNode(); + PersistentTasksCustomMetaData.Assignment result = + new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); assertNull(result.getExecutorNode()); assertEquals("cannot start datafeed [datafeed_id], because the job's [job_id] state is [" + jobState + "] while state [opened] is required", result.getExplanation()); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, df).checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()) + .checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " + "[cannot start datafeed [datafeed_id], because the job's [job_id] state is [" + jobState + "] while state [opened] is required]")); @@ -152,12 +158,13 @@ public void testShardUnassigned() { givenClusterState("foo", 1, 0, states); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, df).selectNode(); + PersistentTasksCustomMetaData.Assignment result = + new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " + "does not have all primary shards active yet.")); - new DatafeedNodeSelector(clusterState, resolver, df).checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated(); } public void testShardNotAllActive() { @@ -176,12 +183,13 @@ public void testShardNotAllActive() { givenClusterState("foo", 2, 0, states); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, df).selectNode(); + PersistentTasksCustomMetaData.Assignment result = + new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " + "does not have all primary shards active yet.")); - new DatafeedNodeSelector(clusterState, resolver, df).checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated(); } public void testIndexDoesntExist() { @@ -194,13 +202,15 @@ public void testIndexDoesntExist() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, df).selectNode(); + PersistentTasksCustomMetaData.Assignment result = + new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [not_foo] " + "does not exist, is closed, or is still initializing.")); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, df).checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()) + .checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " + "[cannot start datafeed [datafeed_id] because index [not_foo] does not exist, is closed, or is still initializing.]")); } @@ -215,7 +225,8 @@ public void testRemoteIndex() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, df).selectNode(); + PersistentTasksCustomMetaData.Assignment result = + new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); assertNotNull(result.getExecutorNode()); } @@ -232,13 +243,15 @@ public void testSelectNode_jobTaskStale() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, df).selectNode(); + PersistentTasksCustomMetaData.Assignment result = + new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); assertNull(result.getExecutorNode()); assertEquals("cannot start datafeed [datafeed_id], because the job's [job_id] state is stale", result.getExplanation()); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, df).checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()) + .checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " + "[cannot start datafeed [datafeed_id], because the job's [job_id] state is stale]")); @@ -246,9 +259,9 @@ public void testSelectNode_jobTaskStale() { addJobTask(job.getId(), "node_id1", JobState.OPENED, tasksBuilder); tasks = tasksBuilder.build(); givenClusterState("foo", 1, 0); - result = new DatafeedNodeSelector(clusterState, resolver, df).selectNode(); + result = new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); assertEquals("node_id1", result.getExecutorNode()); - new DatafeedNodeSelector(clusterState, resolver, df).checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated(); } public void testSelectNode_GivenJobOpeningAndIndexDoesNotExist() { @@ -265,7 +278,8 @@ public void testSelectNode_GivenJobOpeningAndIndexDoesNotExist() { givenClusterState("foo", 1, 0); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, df).checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()) + .checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " + "[cannot start datafeed [datafeed_id] because index [not_foo] does not exist, is closed, or is still initializing.]")); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java index 726b815728f52..c10af20aba79f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java @@ -274,7 +274,7 @@ public MockClientBuilder prepareSearch(String index, String type, int from, int * Creates a {@link SearchResponse} with a {@link SearchHit} for each element of {@code docs} * @param indexName Index being searched * @param docs Returned in the SearchResponse - * @return + * @return this */ @SuppressWarnings("unchecked") public MockClientBuilder prepareSearch(String indexName, List docs) { From 2db0fe47afe7b4aa50aa7ca6eb7da458ec3346f5 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 15 Oct 2018 15:41:23 +0100 Subject: [PATCH 6/7] Re-validate after read --- .../elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java index 3b76b05639ea2..2f3c93be92b3f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; @@ -125,11 +126,14 @@ void build(String datafeedId, JobResultsProvider jobResultsProvider, JobConfigPr }); }; - // Get the job config + // Get the job config and re-validate + // Re-validation is required as the config has been re-read since + // the previous validation ActionListener jobConfigListener = ActionListener.wrap( jobBuilder -> { try { jobHolder.set(jobBuilder.build()); + DatafeedJobValidator.validate(datafeedConfigHolder.get(), jobHolder.get()); jobIdConsumer.accept(jobHolder.get().getId()); } catch (Exception e) { listener.onFailure(e); From 04e3bafd03e191169a365ad8d367448cbc0c5b9c Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 17 Oct 2018 13:27:35 +0100 Subject: [PATCH 7/7] Write job_id and indices fields in toXContent --- .../xpack/core/ml/action/StartDatafeedAction.java | 11 +++++++++++ .../xpack/core/ml/action/DatafeedParamsTests.java | 8 ++++++++ 2 files changed, 19 insertions(+) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java index 06c2abd0f0fa9..f30706c5cbfc1 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java @@ -28,6 +28,7 @@ import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; @@ -149,6 +150,8 @@ public boolean equals(Object obj) { public static class DatafeedParams implements XPackPlugin.XPackPersistentTaskParams { + public static final ParseField INDICES = new ParseField("indices"); + public static ObjectParser PARSER = new ObjectParser<>(MlTasks.DATAFEED_TASK_NAME, true, DatafeedParams::new); static { PARSER.declareString((params, datafeedId) -> params.datafeedId = datafeedId, DatafeedConfig.ID); @@ -157,6 +160,8 @@ public static class DatafeedParams implements XPackPlugin.XPackPersistentTaskPar PARSER.declareString(DatafeedParams::setEndTime, END_TIME); PARSER.declareString((params, val) -> params.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT); + PARSER.declareString(DatafeedParams::setJobId, Job.ID); + PARSER.declareStringArray(DatafeedParams::setDatafeedIndices, INDICES); } static long parseDateOrThrow(String date, ParseField paramName, LongSupplier now) { @@ -288,6 +293,12 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par builder.field(END_TIME.getPreferredName(), String.valueOf(endTime)); } builder.field(TIMEOUT.getPreferredName(), timeout.getStringRep()); + if (jobId != null) { + builder.field(Job.ID.getPreferredName(), jobId); + } + if (datafeedIndices.isEmpty() == false) { + builder.field(INDICES.getPreferredName(), datafeedIndices); + } builder.endObject(); return builder; } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/DatafeedParamsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/DatafeedParamsTests.java index 24a6dbacfada5..79bfcde76e067 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/DatafeedParamsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/DatafeedParamsTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.test.AbstractSerializingTestCase; import java.io.IOException; +import java.util.Arrays; public class DatafeedParamsTests extends AbstractSerializingTestCase { @Override @@ -28,6 +29,13 @@ public static StartDatafeedAction.DatafeedParams createDatafeedParams() { if (randomBoolean()) { params.setTimeout(TimeValue.timeValueMillis(randomNonNegativeLong())); } + if (randomBoolean()) { + params.setJobId(randomAlphaOfLength(10)); + } + if (randomBoolean()) { + params.setDatafeedIndices(Arrays.asList(randomAlphaOfLength(10), randomAlphaOfLength(10))); + } + return params; }