diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java index b9c795df9b78c..6b871c074619e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java @@ -5,14 +5,15 @@ */ package org.elasticsearch.xpack.ml.job; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.LocalNodeMasterListener; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.LifecycleListener; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; @@ -31,9 +32,26 @@ import static org.elasticsearch.xpack.core.ml.action.UpdateProcessAction.Request; import static org.elasticsearch.xpack.core.ml.action.UpdateProcessAction.Response; -public class UpdateJobProcessNotifier extends AbstractComponent implements LocalNodeMasterListener { +/** + * This class serves as a queue for updates to the job process. + * Queueing is important for 2 reasons: first, it throttles the updates + * to the process, and second and most important, it preserves the order of the updates + * for actions that run on the master node. For preserving the order of the updates + * to the job config, it's necessary to handle the whole update chain on the master + * node. However, for updates to resources the job uses (e.g. calendars, filters), + * they can be handled on non-master nodes as long as the update process action + * is fetching the latest version of those resources from the index instead of + * using the version that existed while the handling action was at work. This makes + * sure that even if the order of updates gets reversed, the final process update + * will fetch the valid state of those external resources ensuring the process is + * in sync. + */ +public class UpdateJobProcessNotifier extends AbstractComponent { + + private static final Logger LOGGER = Loggers.getLogger(UpdateJobProcessNotifier.class); private final Client client; + private final ClusterService clusterService; private final ThreadPool threadPool; private final LinkedBlockingQueue orderedJobUpdates = new LinkedBlockingQueue<>(1000); @@ -42,9 +60,15 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local public UpdateJobProcessNotifier(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool) { super(settings); this.client = client; + this.clusterService = clusterService; this.threadPool = threadPool; - clusterService.addLocalNodeMasterListener(this); clusterService.addLifecycleListener(new LifecycleListener() { + + @Override + public void beforeStart() { + start(); + } + @Override public void beforeStop() { stop(); @@ -56,16 +80,6 @@ boolean submitJobUpdate(UpdateParams update, ActionListener listener) { return orderedJobUpdates.offer(new UpdateHolder(update, listener)); } - @Override - public void onMaster() { - start(); - } - - @Override - public void offMaster() { - stop(); - } - private void start() { cancellable = threadPool.scheduleWithFixedDelay(this::processNextUpdate, TimeValue.timeValueSeconds(1), ThreadPool.Names.GENERIC); } @@ -79,12 +93,6 @@ private void stop() { } } - @Override - public String executorName() { - // SAME is ok here, because both start() and stop() are inexpensive: - return ThreadPool.Names.SAME; - } - private void processNextUpdate() { List updates = new ArrayList<>(orderedJobUpdates.size()); try { @@ -101,6 +109,15 @@ void executeProcessUpdates(Iterator updatesIterator) { } UpdateHolder updateHolder = updatesIterator.next(); UpdateParams update = updateHolder.update; + + if (update.isJobUpdate() && clusterService.localNode().isMasterNode() == false) { + assert clusterService.localNode().isMasterNode(); + LOGGER.error("Job update was submitted to non-master node [" + clusterService.nodeName() + "]; update for job [" + + update.getJobId() + "] will be ignored"); + executeProcessUpdates(updatesIterator); + return; + } + Request request = new Request(update.getJobId(), update.getModelPlotConfig(), update.getDetectorUpdates(), update.getFilter(), update.isUpdateScheduledEvents()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index 09a7f3c11040d..82e72c90f4ca4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -16,28 +16,27 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.Environment; import org.elasticsearch.index.analysis.AnalysisRegistry; -import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; -import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.CategorizationAnalyzerConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.process.CountingInputStream; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; -import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams; -import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; -import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; -import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriter; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriterFactory; -import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import java.io.Closeable; import java.io.IOException; @@ -45,7 +44,6 @@ import java.time.Duration; import java.time.ZonedDateTime; import java.util.Collections; -import java.util.List; import java.util.Locale; import java.util.Optional; import java.util.concurrent.CountDownLatch; @@ -205,30 +203,29 @@ public void killProcess(boolean awaitCompletion, boolean finish) throws IOExcept } } - public void writeUpdateProcessMessage(UpdateParams updateParams, List scheduledEvents, - BiConsumer handler) { + public void writeUpdateProcessMessage(UpdateProcessMessage update, BiConsumer handler) { submitOperation(() -> { - if (updateParams.getModelPlotConfig() != null) { - autodetectProcess.writeUpdateModelPlotMessage(updateParams.getModelPlotConfig()); + if (update.getModelPlotConfig() != null) { + autodetectProcess.writeUpdateModelPlotMessage(update.getModelPlotConfig()); } // Filters have to be written before detectors - if (updateParams.getFilter() != null) { - autodetectProcess.writeUpdateFiltersMessage(Collections.singletonList(updateParams.getFilter())); + if (update.getFilter() != null) { + autodetectProcess.writeUpdateFiltersMessage(Collections.singletonList(update.getFilter())); } // Add detector rules - if (updateParams.getDetectorUpdates() != null) { - for (JobUpdate.DetectorUpdate update : updateParams.getDetectorUpdates()) { - if (update.getRules() != null) { - autodetectProcess.writeUpdateDetectorRulesMessage(update.getDetectorIndex(), update.getRules()); + if (update.getDetectorUpdates() != null) { + for (JobUpdate.DetectorUpdate detectorUpdate : update.getDetectorUpdates()) { + if (detectorUpdate.getRules() != null) { + autodetectProcess.writeUpdateDetectorRulesMessage(detectorUpdate.getDetectorIndex(), detectorUpdate.getRules()); } } } // Add scheduled events; null means there's no update but an empty list means we should clear any events in the process - if (scheduledEvents != null) { - autodetectProcess.writeUpdateScheduledEventsMessage(scheduledEvents, job.getAnalysisConfig().getBucketSpan()); + if (update.getScheduledEvents() != null) { + autodetectProcess.writeUpdateScheduledEventsMessage(update.getScheduledEvents(), job.getAnalysisConfig().getBucketSpan()); } return null; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index b6efb688c1797..77e7fe1471611 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -5,8 +5,6 @@ */ package org.elasticsearch.xpack.ml.job.process.autodetect; -import org.elasticsearch.common.xcontent.XContentElasticsearchExtension; -import org.elasticsearch.core.internal.io.IOUtils; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; @@ -22,24 +20,26 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentElasticsearchExtension; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.Environment; import org.elasticsearch.index.analysis.AnalysisRegistry; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ml.action.GetFiltersAction; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; -import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder; +import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; -import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.action.TransportOpenJobAction.JobTask; import org.elasticsearch.xpack.ml.job.JobManager; @@ -47,10 +47,12 @@ import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; +import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.NativeStorageProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; +import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams; @@ -82,6 +84,8 @@ import java.util.function.Consumer; import static org.elasticsearch.common.settings.Setting.Property; +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; public class AutodetectProcessManager extends AbstractComponent { @@ -156,7 +160,7 @@ public void onNodeStartup() { } } - public synchronized void closeAllJobsOnThisNode(String reason) throws IOException { + public synchronized void closeAllJobsOnThisNode(String reason) { int numJobs = processByAllocation.size(); if (numJobs != 0) { logger.info("Closing [{}] jobs, because [{}]", numJobs, reason); @@ -322,8 +326,7 @@ public void forecastJob(JobTask jobTask, ForecastParams params, Consumer handler) { + public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams, Consumer handler) { AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask); if (communicator == null) { String message = "Cannot process update model debug config because job [" + jobTask.getJobId() + "] is not open"; @@ -332,25 +335,59 @@ public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams return; } + UpdateProcessMessage.Builder updateProcessMessage = new UpdateProcessMessage.Builder(); + updateProcessMessage.setModelPlotConfig(updateParams.getModelPlotConfig()); + updateProcessMessage.setDetectorUpdates(updateParams.getDetectorUpdates()); + + // Step 3. Set scheduled events on message and write update process message ActionListener> eventsListener = ActionListener.wrap( events -> { - communicator.writeUpdateProcessMessage(updateParams, events == null ? null : events.results(), (aVoid, e) -> { + updateProcessMessage.setScheduledEvents(events == null ? null : events.results()); + communicator.writeUpdateProcessMessage(updateProcessMessage.build(), (aVoid, e) -> { if (e == null) { handler.accept(null); } else { handler.accept(e); } }); - }, - handler::accept); - - if (updateParams.isUpdateScheduledEvents()) { - Job job = jobManager.getJobOrThrowIfUnknown(jobTask.getJobId()); - DataCounts dataCounts = getStatistics(jobTask).get().v1(); - ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder().start(job.earliestValidTimestamp(dataCounts)); - jobProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener); + }, handler + ); + + // Step 2. Set the filter on the message and get scheduled events + ActionListener filterListener = ActionListener.wrap( + filter -> { + updateProcessMessage.setFilter(filter); + + if (updateParams.isUpdateScheduledEvents()) { + Job job = jobManager.getJobOrThrowIfUnknown(jobTask.getJobId()); + DataCounts dataCounts = getStatistics(jobTask).get().v1(); + ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder().start(job.earliestValidTimestamp(dataCounts)); + jobProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener); + } else { + eventsListener.onResponse(null); + } + }, handler + ); + + // Step 1. Get the filter + if (updateParams.getFilter() == null) { + filterListener.onResponse(null); } else { - eventsListener.onResponse(null); + GetFiltersAction.Request getFilterRequest = new GetFiltersAction.Request(); + getFilterRequest.setFilterId(updateParams.getFilter().getId()); + executeAsyncWithOrigin(client, ML_ORIGIN, GetFiltersAction.INSTANCE, getFilterRequest, + new ActionListener() { + + @Override + public void onResponse(GetFiltersAction.Response response) { + filterListener.onResponse(response.getFilters().results().get(0)); + } + + @Override + public void onFailure(Exception e) { + handler.accept(e); + } + }); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParams.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParams.java index ac41dcccbcff9..127fb18e5fff4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParams.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParams.java @@ -49,6 +49,15 @@ public MlFilter getFilter() { return filter; } + /** + * Returns true if the update params include a job update, + * ie an update to the job config directly rather than an + * update to external resources a job uses (e.g. calendars, filters). + */ + public boolean isJobUpdate() { + return modelPlotConfig != null || detectorUpdates != null; + } + public boolean isUpdateScheduledEvents() { return updateScheduledEvents; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateProcessMessage.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateProcessMessage.java new file mode 100644 index 0000000000000..4686d4ed37273 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateProcessMessage.java @@ -0,0 +1,82 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.process.autodetect; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; +import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.core.ml.job.config.MlFilter; +import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; + +import java.util.List; + +public final class UpdateProcessMessage { + + @Nullable private final ModelPlotConfig modelPlotConfig; + @Nullable private final List detectorUpdates; + @Nullable private final MlFilter filter; + @Nullable private final List scheduledEvents; + + private UpdateProcessMessage(@Nullable ModelPlotConfig modelPlotConfig, @Nullable List detectorUpdates, + @Nullable MlFilter filter, List scheduledEvents) { + this.modelPlotConfig = modelPlotConfig; + this.detectorUpdates = detectorUpdates; + this.filter = filter; + this.scheduledEvents = scheduledEvents; + } + + @Nullable + public ModelPlotConfig getModelPlotConfig() { + return modelPlotConfig; + } + + @Nullable + public List getDetectorUpdates() { + return detectorUpdates; + } + + @Nullable + public MlFilter getFilter() { + return filter; + } + + @Nullable + public List getScheduledEvents() { + return scheduledEvents; + } + + public static class Builder { + + @Nullable private ModelPlotConfig modelPlotConfig; + @Nullable private List detectorUpdates; + @Nullable private MlFilter filter; + @Nullable private List scheduledEvents; + + public Builder setModelPlotConfig(ModelPlotConfig modelPlotConfig) { + this.modelPlotConfig = modelPlotConfig; + return this; + } + + public Builder setDetectorUpdates(List detectorUpdates) { + this.detectorUpdates = detectorUpdates; + return this; + } + + public Builder setFilter(MlFilter filter) { + this.filter = filter; + return this; + } + + public Builder setScheduledEvents(List scheduledEvents) { + this.scheduledEvents = scheduledEvents; + return this; + } + + public UpdateProcessMessage build() { + return new UpdateProcessMessage(modelPlotConfig, detectorUpdates, filter, scheduledEvents); + } + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java index 57e5f6cfdb3ff..ab24aadb9dc3a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java @@ -95,11 +95,12 @@ public void testWriteUpdateProcessMessage() throws IOException { List detectorUpdates = Collections.singletonList( new JobUpdate.DetectorUpdate(0, "updated description", Collections.singletonList(updatedRule))); - UpdateParams updateParams = UpdateParams.builder("foo").detectorUpdates(detectorUpdates).build(); List events = Collections.singletonList( ScheduledEventTests.createScheduledEvent(randomAlphaOfLength(10))); + UpdateProcessMessage.Builder updateProcessMessage = new UpdateProcessMessage.Builder().setDetectorUpdates(detectorUpdates); + updateProcessMessage.setScheduledEvents(events); - communicator.writeUpdateProcessMessage(updateParams, events, ((aVoid, e) -> {})); + communicator.writeUpdateProcessMessage(updateProcessMessage.build(), ((aVoid, e) -> {})); verify(process).writeUpdateDetectorRulesMessage(eq(0), eq(Collections.singletonList(updatedRule))); verify(process).writeUpdateScheduledEventsMessage(events, AnalysisConfig.Builder.DEFAULT_BUCKET_SPAN); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index a1b9aad452b9e..313f449cadd81 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -48,6 +48,7 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.junit.Before; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import java.io.ByteArrayInputStream; @@ -489,8 +490,15 @@ public void testWriteUpdateProcessMessage() { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); UpdateParams updateParams = UpdateParams.builder("foo").modelPlotConfig(modelConfig).detectorUpdates(detectorUpdates).build(); + manager.writeUpdateProcessMessage(jobTask, updateParams, e -> {}); - verify(communicator).writeUpdateProcessMessage(same(updateParams), eq(null), any()); + + ArgumentCaptor captor = ArgumentCaptor.forClass(UpdateProcessMessage.class); + verify(communicator).writeUpdateProcessMessage(captor.capture(), any()); + + UpdateProcessMessage updateProcessMessage = captor.getValue(); + assertThat(updateProcessMessage.getModelPlotConfig(), equalTo(modelConfig)); + assertThat(updateProcessMessage.getDetectorUpdates(), equalTo(detectorUpdates)); } public void testJobHasActiveAutodetectProcess() {