From db12caa5f5570373d064a3bde905e24d034eca07 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Wed, 4 Jul 2018 16:04:13 +0100 Subject: [PATCH 1/2] [ML] Fix calendar and filter updates from non-master nodes Job updates or changes to calendars or filters may result into updating the job process if it has been running. To preserve the order of updates, process updates are queued through the UpdateJobProcessNotifier which is only running on the master node. All actions performing such updates must run on the master node. However, the CRUD actions for calendars and filters are not master node actions. They have been submitting the updates to the UpdateJobProcessNotifier even though it might have not been running (given the action was run on a non-master node). When that happens, the update never reaches the process. This commit fixes this problem by ensuring the notifier runs on all nodes and by ensuring the process update action gets the resources again before updating the process (instead of having those resources passed in the request). This ensures that even if the order of the updates gets messed up, the latest update will read the latest state of those resource and the process will get back in sync. This leaves us with 2 types of updates: 1. updates to the job config should happen on the master node. This is because we cannot refetch the entire job and update it. We need to know the parts that have been changed. 2. updates to resources the job uses. Those can be handled on non-master nodes but they should be re-fetched by the update process action. Closes #31803 --- .../ml/job/UpdateJobProcessNotifier.java | 55 ++++++++----- .../autodetect/AutodetectCommunicator.java | 39 ++++----- .../autodetect/AutodetectProcessManager.java | 72 ++++++++++++---- .../job/process/autodetect/UpdateParams.java | 9 ++ .../autodetect/UpdateProcessMessage.java | 82 +++++++++++++++++++ .../AutodetectCommunicatorTests.java | 5 +- .../AutodetectProcessManagerTests.java | 10 ++- 7 files changed, 211 insertions(+), 61 deletions(-) create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateProcessMessage.java diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java index b9c795df9b78c..6b871c074619e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java @@ -5,14 +5,15 @@ */ package org.elasticsearch.xpack.ml.job; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.LocalNodeMasterListener; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.LifecycleListener; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; @@ -31,9 +32,26 @@ import static org.elasticsearch.xpack.core.ml.action.UpdateProcessAction.Request; import static org.elasticsearch.xpack.core.ml.action.UpdateProcessAction.Response; -public class UpdateJobProcessNotifier extends AbstractComponent implements LocalNodeMasterListener { +/** + * This class serves as a queue for updates to the job process. + * Queueing is important for 2 reasons: first, it throttles the updates + * to the process, and second and most important, it preserves the order of the updates + * for actions that run on the master node. For preserving the order of the updates + * to the job config, it's necessary to handle the whole update chain on the master + * node. However, for updates to resources the job uses (e.g. calendars, filters), + * they can be handled on non-master nodes as long as the update process action + * is fetching the latest version of those resources from the index instead of + * using the version that existed while the handling action was at work. This makes + * sure that even if the order of updates gets reversed, the final process update + * will fetch the valid state of those external resources ensuring the process is + * in sync. + */ +public class UpdateJobProcessNotifier extends AbstractComponent { + + private static final Logger LOGGER = Loggers.getLogger(UpdateJobProcessNotifier.class); private final Client client; + private final ClusterService clusterService; private final ThreadPool threadPool; private final LinkedBlockingQueue orderedJobUpdates = new LinkedBlockingQueue<>(1000); @@ -42,9 +60,15 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local public UpdateJobProcessNotifier(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool) { super(settings); this.client = client; + this.clusterService = clusterService; this.threadPool = threadPool; - clusterService.addLocalNodeMasterListener(this); clusterService.addLifecycleListener(new LifecycleListener() { + + @Override + public void beforeStart() { + start(); + } + @Override public void beforeStop() { stop(); @@ -56,16 +80,6 @@ boolean submitJobUpdate(UpdateParams update, ActionListener listener) { return orderedJobUpdates.offer(new UpdateHolder(update, listener)); } - @Override - public void onMaster() { - start(); - } - - @Override - public void offMaster() { - stop(); - } - private void start() { cancellable = threadPool.scheduleWithFixedDelay(this::processNextUpdate, TimeValue.timeValueSeconds(1), ThreadPool.Names.GENERIC); } @@ -79,12 +93,6 @@ private void stop() { } } - @Override - public String executorName() { - // SAME is ok here, because both start() and stop() are inexpensive: - return ThreadPool.Names.SAME; - } - private void processNextUpdate() { List updates = new ArrayList<>(orderedJobUpdates.size()); try { @@ -101,6 +109,15 @@ void executeProcessUpdates(Iterator updatesIterator) { } UpdateHolder updateHolder = updatesIterator.next(); UpdateParams update = updateHolder.update; + + if (update.isJobUpdate() && clusterService.localNode().isMasterNode() == false) { + assert clusterService.localNode().isMasterNode(); + LOGGER.error("Job update was submitted to non-master node [" + clusterService.nodeName() + "]; update for job [" + + update.getJobId() + "] will be ignored"); + executeProcessUpdates(updatesIterator); + return; + } + Request request = new Request(update.getJobId(), update.getModelPlotConfig(), update.getDetectorUpdates(), update.getFilter(), update.isUpdateScheduledEvents()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index 09a7f3c11040d..82e72c90f4ca4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -16,28 +16,27 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.Environment; import org.elasticsearch.index.analysis.AnalysisRegistry; -import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; -import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.CategorizationAnalyzerConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.process.CountingInputStream; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; -import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams; -import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; -import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; -import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriter; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriterFactory; -import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import java.io.Closeable; import java.io.IOException; @@ -45,7 +44,6 @@ import java.time.Duration; import java.time.ZonedDateTime; import java.util.Collections; -import java.util.List; import java.util.Locale; import java.util.Optional; import java.util.concurrent.CountDownLatch; @@ -205,30 +203,29 @@ public void killProcess(boolean awaitCompletion, boolean finish) throws IOExcept } } - public void writeUpdateProcessMessage(UpdateParams updateParams, List scheduledEvents, - BiConsumer handler) { + public void writeUpdateProcessMessage(UpdateProcessMessage update, BiConsumer handler) { submitOperation(() -> { - if (updateParams.getModelPlotConfig() != null) { - autodetectProcess.writeUpdateModelPlotMessage(updateParams.getModelPlotConfig()); + if (update.getModelPlotConfig() != null) { + autodetectProcess.writeUpdateModelPlotMessage(update.getModelPlotConfig()); } // Filters have to be written before detectors - if (updateParams.getFilter() != null) { - autodetectProcess.writeUpdateFiltersMessage(Collections.singletonList(updateParams.getFilter())); + if (update.getFilter() != null) { + autodetectProcess.writeUpdateFiltersMessage(Collections.singletonList(update.getFilter())); } // Add detector rules - if (updateParams.getDetectorUpdates() != null) { - for (JobUpdate.DetectorUpdate update : updateParams.getDetectorUpdates()) { - if (update.getRules() != null) { - autodetectProcess.writeUpdateDetectorRulesMessage(update.getDetectorIndex(), update.getRules()); + if (update.getDetectorUpdates() != null) { + for (JobUpdate.DetectorUpdate detectorUpdate : update.getDetectorUpdates()) { + if (detectorUpdate.getRules() != null) { + autodetectProcess.writeUpdateDetectorRulesMessage(detectorUpdate.getDetectorIndex(), detectorUpdate.getRules()); } } } // Add scheduled events; null means there's no update but an empty list means we should clear any events in the process - if (scheduledEvents != null) { - autodetectProcess.writeUpdateScheduledEventsMessage(scheduledEvents, job.getAnalysisConfig().getBucketSpan()); + if (update.getScheduledEvents() != null) { + autodetectProcess.writeUpdateScheduledEventsMessage(update.getScheduledEvents(), job.getAnalysisConfig().getBucketSpan()); } return null; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index b6efb688c1797..a263b12836f13 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -5,8 +5,6 @@ */ package org.elasticsearch.xpack.ml.job.process.autodetect; -import org.elasticsearch.common.xcontent.XContentElasticsearchExtension; -import org.elasticsearch.core.internal.io.IOUtils; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; @@ -22,24 +20,26 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentElasticsearchExtension; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.Environment; import org.elasticsearch.index.analysis.AnalysisRegistry; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ml.action.GetFiltersAction; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; -import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder; +import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; -import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.action.TransportOpenJobAction.JobTask; import org.elasticsearch.xpack.ml.job.JobManager; @@ -47,10 +47,12 @@ import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; +import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.NativeStorageProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; +import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams; @@ -82,6 +84,8 @@ import java.util.function.Consumer; import static org.elasticsearch.common.settings.Setting.Property; +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; public class AutodetectProcessManager extends AbstractComponent { @@ -156,7 +160,7 @@ public void onNodeStartup() { } } - public synchronized void closeAllJobsOnThisNode(String reason) throws IOException { + public synchronized void closeAllJobsOnThisNode(String reason) { int numJobs = processByAllocation.size(); if (numJobs != 0) { logger.info("Closing [{}] jobs, because [{}]", numJobs, reason); @@ -322,8 +326,7 @@ public void forecastJob(JobTask jobTask, ForecastParams params, Consumer handler) { + public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams, Consumer handler) { AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask); if (communicator == null) { String message = "Cannot process update model debug config because job [" + jobTask.getJobId() + "] is not open"; @@ -332,25 +335,58 @@ public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams return; } + UpdateProcessMessage.Builder updateProcessMessage = new UpdateProcessMessage.Builder(); + updateProcessMessage.setModelPlotConfig(updateParams.getModelPlotConfig()); + updateProcessMessage.setDetectorUpdates(updateParams.getDetectorUpdates()); + + // Step 3. Set scheduled events on message and write update process message ActionListener> eventsListener = ActionListener.wrap( events -> { - communicator.writeUpdateProcessMessage(updateParams, events == null ? null : events.results(), (aVoid, e) -> { + updateProcessMessage.setScheduledEvents(events == null ? null : events.results()); + communicator.writeUpdateProcessMessage(updateProcessMessage.build(), (aVoid, e) -> { if (e == null) { handler.accept(null); } else { handler.accept(e); } }); - }, - handler::accept); - - if (updateParams.isUpdateScheduledEvents()) { - Job job = jobManager.getJobOrThrowIfUnknown(jobTask.getJobId()); - DataCounts dataCounts = getStatistics(jobTask).get().v1(); - ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder().start(job.earliestValidTimestamp(dataCounts)); - jobProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener); + }, handler + ); + + // Step 2. Set the filter on the message and get scheduled events + ActionListener filterListener = ActionListener.wrap( + filter -> { + updateProcessMessage.setFilter(filter); + + if (updateParams.isUpdateScheduledEvents()) { + Job job = jobManager.getJobOrThrowIfUnknown(jobTask.getJobId()); + DataCounts dataCounts = getStatistics(jobTask).get().v1(); + ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder().start(job.earliestValidTimestamp(dataCounts)); + jobProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener); + } else { + eventsListener.onResponse(null); + } + }, handler + ); + + // Step 1. Get the filter + if (updateParams.getFilter() == null) { + filterListener.onResponse(null); } else { - eventsListener.onResponse(null); + GetFiltersAction.Request getFilterRequest = new GetFiltersAction.Request(); + getFilterRequest.setFilterId(updateParams.getFilter().getId()); + executeAsyncWithOrigin(client, ML_ORIGIN, GetFiltersAction.INSTANCE, getFilterRequest, new ActionListener() { + + @Override + public void onResponse(GetFiltersAction.Response response) { + filterListener.onResponse(response.getFilters().results().get(0)); + } + + @Override + public void onFailure(Exception e) { + handler.accept(e); + } + }); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParams.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParams.java index ac41dcccbcff9..127fb18e5fff4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParams.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParams.java @@ -49,6 +49,15 @@ public MlFilter getFilter() { return filter; } + /** + * Returns true if the update params include a job update, + * ie an update to the job config directly rather than an + * update to external resources a job uses (e.g. calendars, filters). + */ + public boolean isJobUpdate() { + return modelPlotConfig != null || detectorUpdates != null; + } + public boolean isUpdateScheduledEvents() { return updateScheduledEvents; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateProcessMessage.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateProcessMessage.java new file mode 100644 index 0000000000000..4686d4ed37273 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateProcessMessage.java @@ -0,0 +1,82 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.process.autodetect; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; +import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.core.ml.job.config.MlFilter; +import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; + +import java.util.List; + +public final class UpdateProcessMessage { + + @Nullable private final ModelPlotConfig modelPlotConfig; + @Nullable private final List detectorUpdates; + @Nullable private final MlFilter filter; + @Nullable private final List scheduledEvents; + + private UpdateProcessMessage(@Nullable ModelPlotConfig modelPlotConfig, @Nullable List detectorUpdates, + @Nullable MlFilter filter, List scheduledEvents) { + this.modelPlotConfig = modelPlotConfig; + this.detectorUpdates = detectorUpdates; + this.filter = filter; + this.scheduledEvents = scheduledEvents; + } + + @Nullable + public ModelPlotConfig getModelPlotConfig() { + return modelPlotConfig; + } + + @Nullable + public List getDetectorUpdates() { + return detectorUpdates; + } + + @Nullable + public MlFilter getFilter() { + return filter; + } + + @Nullable + public List getScheduledEvents() { + return scheduledEvents; + } + + public static class Builder { + + @Nullable private ModelPlotConfig modelPlotConfig; + @Nullable private List detectorUpdates; + @Nullable private MlFilter filter; + @Nullable private List scheduledEvents; + + public Builder setModelPlotConfig(ModelPlotConfig modelPlotConfig) { + this.modelPlotConfig = modelPlotConfig; + return this; + } + + public Builder setDetectorUpdates(List detectorUpdates) { + this.detectorUpdates = detectorUpdates; + return this; + } + + public Builder setFilter(MlFilter filter) { + this.filter = filter; + return this; + } + + public Builder setScheduledEvents(List scheduledEvents) { + this.scheduledEvents = scheduledEvents; + return this; + } + + public UpdateProcessMessage build() { + return new UpdateProcessMessage(modelPlotConfig, detectorUpdates, filter, scheduledEvents); + } + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java index 57e5f6cfdb3ff..ab24aadb9dc3a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java @@ -95,11 +95,12 @@ public void testWriteUpdateProcessMessage() throws IOException { List detectorUpdates = Collections.singletonList( new JobUpdate.DetectorUpdate(0, "updated description", Collections.singletonList(updatedRule))); - UpdateParams updateParams = UpdateParams.builder("foo").detectorUpdates(detectorUpdates).build(); List events = Collections.singletonList( ScheduledEventTests.createScheduledEvent(randomAlphaOfLength(10))); + UpdateProcessMessage.Builder updateProcessMessage = new UpdateProcessMessage.Builder().setDetectorUpdates(detectorUpdates); + updateProcessMessage.setScheduledEvents(events); - communicator.writeUpdateProcessMessage(updateParams, events, ((aVoid, e) -> {})); + communicator.writeUpdateProcessMessage(updateProcessMessage.build(), ((aVoid, e) -> {})); verify(process).writeUpdateDetectorRulesMessage(eq(0), eq(Collections.singletonList(updatedRule))); verify(process).writeUpdateScheduledEventsMessage(events, AnalysisConfig.Builder.DEFAULT_BUCKET_SPAN); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index a1b9aad452b9e..313f449cadd81 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -48,6 +48,7 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.junit.Before; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import java.io.ByteArrayInputStream; @@ -489,8 +490,15 @@ public void testWriteUpdateProcessMessage() { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); UpdateParams updateParams = UpdateParams.builder("foo").modelPlotConfig(modelConfig).detectorUpdates(detectorUpdates).build(); + manager.writeUpdateProcessMessage(jobTask, updateParams, e -> {}); - verify(communicator).writeUpdateProcessMessage(same(updateParams), eq(null), any()); + + ArgumentCaptor captor = ArgumentCaptor.forClass(UpdateProcessMessage.class); + verify(communicator).writeUpdateProcessMessage(captor.capture(), any()); + + UpdateProcessMessage updateProcessMessage = captor.getValue(); + assertThat(updateProcessMessage.getModelPlotConfig(), equalTo(modelConfig)); + assertThat(updateProcessMessage.getDetectorUpdates(), equalTo(detectorUpdates)); } public void testJobHasActiveAutodetectProcess() { From facc7a838c220bf76bcfdd3fce20d7c5338c77ea Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Thu, 5 Jul 2018 10:35:37 +0100 Subject: [PATCH 2/2] Fix long line --- .../ml/job/process/autodetect/AutodetectProcessManager.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index a263b12836f13..77e7fe1471611 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -375,7 +375,8 @@ public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams } else { GetFiltersAction.Request getFilterRequest = new GetFiltersAction.Request(); getFilterRequest.setFilterId(updateParams.getFilter().getId()); - executeAsyncWithOrigin(client, ML_ORIGIN, GetFiltersAction.INSTANCE, getFilterRequest, new ActionListener() { + executeAsyncWithOrigin(client, ML_ORIGIN, GetFiltersAction.INSTANCE, getFilterRequest, + new ActionListener() { @Override public void onResponse(GetFiltersAction.Response response) {