From 31f02a9c0cab9cf33bb554ff8d55066194332e8d Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 15 Nov 2018 15:11:08 +0000 Subject: [PATCH 1/4] Small fixes to read from all locations --- .../ml/action/TransportGetBucketsAction.java | 4 +-- .../action/TransportGetInfluencersAction.java | 4 +-- .../ml/action/TransportGetRecordsAction.java | 4 +-- .../TransportPreviewDatafeedAction.java | 31 ++++++++++--------- .../action/TransportStartDatafeedAction.java | 12 +++---- .../ml/datafeed/DatafeedConfigReader.java | 22 +++++++++++++ .../xpack/ml/job/JobManager.java | 2 +- 7 files changed, 52 insertions(+), 27 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetBucketsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetBucketsAction.java index fd770fa88c5cf..22d9a3566c92f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetBucketsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetBucketsAction.java @@ -38,8 +38,8 @@ public TransportGetBucketsAction(Settings settings, ThreadPool threadPool, Trans @Override protected void doExecute(GetBucketsAction.Request request, ActionListener listener) { - jobManager.getJob(request.getJobId(), ActionListener.wrap( - job -> { + jobManager.jobExists(request.getJobId(), ActionListener.wrap( + jobFound -> { BucketsQueryBuilder query = new BucketsQueryBuilder().expand(request.isExpand()) .includeInterim(request.isExcludeInterim() == false) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetInfluencersAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetInfluencersAction.java index 32ad802a204b4..7fed3b4223a46 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetInfluencersAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetInfluencersAction.java @@ -39,8 +39,8 @@ public TransportGetInfluencersAction(Settings settings, ThreadPool threadPool, T @Override protected void doExecute(GetInfluencersAction.Request request, ActionListener listener) { - jobManager.getJob(request.getJobId(), ActionListener.wrap( - job -> { + jobManager.jobExists(request.getJobId(), ActionListener.wrap( + jobFound -> { InfluencersQueryBuilder.InfluencersQuery query = new InfluencersQueryBuilder() .includeInterim(request.isExcludeInterim() == false) .start(request.getStart()) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetRecordsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetRecordsAction.java index f88011d1ea0cf..a29be412be487 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetRecordsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetRecordsAction.java @@ -39,8 +39,8 @@ public TransportGetRecordsAction(Settings settings, ThreadPool threadPool, Trans @Override protected void doExecute(GetRecordsAction.Request request, ActionListener listener) { - jobManager.getJob(request.getJobId(), ActionListener.wrap( - job -> { + jobManager.jobExists(request.getJobId(), ActionListener.wrap( + jobFound -> { RecordsQueryBuilder query = new RecordsQueryBuilder() .includeInterim(request.isExcludeInterim() == false) .epochStart(request.getStart()) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java index 1a928a8bdba11..911eb847ff3e4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java @@ -10,9 +10,11 @@ import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ClientHelper; @@ -20,9 +22,9 @@ import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigReader; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; -import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; -import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; +import org.elasticsearch.xpack.ml.job.JobManager; import java.io.BufferedReader; import java.io.InputStream; @@ -35,29 +37,30 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction { private final Client client; - private final JobConfigProvider jobConfigProvider; - private final DatafeedConfigProvider datafeedConfigProvider; + private final ClusterService clusterService; + private final JobManager jobManager; + private final DatafeedConfigReader datafeedConfigReader; @Inject public TransportPreviewDatafeedAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - Client client, JobConfigProvider jobConfigProvider, - DatafeedConfigProvider datafeedConfigProvider) { + Client client, JobManager jobManager, NamedXContentRegistry xContentRegistry, + ClusterService clusterService) { super(settings, PreviewDatafeedAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, PreviewDatafeedAction.Request::new); this.client = client; - this.jobConfigProvider = jobConfigProvider; - this.datafeedConfigProvider = datafeedConfigProvider; + this.clusterService = clusterService; + this.jobManager = jobManager; + this.datafeedConfigReader = new DatafeedConfigReader(client, xContentRegistry); } @Override protected void doExecute(PreviewDatafeedAction.Request request, ActionListener listener) { - datafeedConfigProvider.getDatafeedConfig(request.getDatafeedId(), ActionListener.wrap( - datafeedConfigBuilder -> { - DatafeedConfig datafeedConfig = datafeedConfigBuilder.build(); - jobConfigProvider.getJob(datafeedConfig.getJobId(), ActionListener.wrap( - jobBuilder -> { + datafeedConfigReader.datafeedConfig(request.getDatafeedId(), clusterService.state(), ActionListener.wrap( + datafeedConfig -> { + jobManager.getJob(datafeedConfig.getJobId(), ActionListener.wrap( + job -> { DatafeedConfig.Builder previewDatafeed = buildPreviewDatafeed(datafeedConfig); Map headers = threadPool.getThreadContext().getHeaders().entrySet().stream() .filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) @@ -66,7 +69,7 @@ protected void doExecute(PreviewDatafeedAction.Request request, ActionListener

() { @Override public void onResponse(DataExtractorFactory dataExtractorFactory) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 95103216b8691..c8bb9831abc0d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -44,6 +44,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigReader; import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; import org.elasticsearch.xpack.ml.datafeed.DatafeedNodeSelector; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; @@ -71,7 +72,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction datafeedListener = ActionListener.wrap( - datafeedBuilder -> { + ActionListener datafeedListener = ActionListener.wrap( + datafeedConfig -> { try { - DatafeedConfig datafeedConfig = datafeedBuilder.build(); params.setDatafeedIndices(datafeedConfig.getIndices()); params.setJobId(datafeedConfig.getJobId()); datafeedConfigHolder.set(datafeedConfig); @@ -194,7 +194,7 @@ public void onFailure(Exception e) { listener::onFailure ); - datafeedConfigProvider.getDatafeedConfig(params.getDatafeedId(), datafeedListener); + datafeedConfigReader.datafeedConfig(params.getDatafeedId(), state, datafeedListener); } private void createDataExtractor(Job job, DatafeedConfig datafeed, StartDatafeedAction.DatafeedParams params, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java index 52b07ee3b929f..6e55552fb6b24 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java @@ -41,6 +41,28 @@ public DatafeedConfigReader(DatafeedConfigProvider datafeedConfigProvider) { this.datafeedConfigProvider = datafeedConfigProvider; } + /** + * Read the datafeed config from {@code state} and if not found + * look for the index document + * + * @param datafeedId Id of datafeed to get + * @param state Cluster state + * @param listener DatafeedConfig listener + */ + public void datafeedConfig(String datafeedId, ClusterState state, ActionListener listener) { + MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); + DatafeedConfig config = mlMetadata.getDatafeed(datafeedId); + + if (config != null) { + listener.onResponse(config); + } else { + datafeedConfigProvider.getDatafeedConfig(datafeedId, ActionListener.wrap( + builder -> listener.onResponse(builder.build()), + listener::onFailure + )); + } + } + /** * Merges the results of {@link MlMetadata#expandDatafeedIds} * and {@link DatafeedConfigProvider#expandDatafeedIds(String, boolean, ActionListener)} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 8287e7b72944b..42d9f27bd380d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -138,7 +138,7 @@ private void setMaxModelMemoryLimit(ByteSizeValue maxModelMemoryLimit) { } public void jobExists(String jobId, ActionListener listener) { - jobConfigProvider.jobExists(jobId, true, ActionListener.wrap( + jobConfigProvider.jobExists(jobId, false, ActionListener.wrap( jobFound -> { if (jobFound) { listener.onResponse(Boolean.TRUE); From d21032953aa271c16d9ad9108467e5382d9a9e5b Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 15 Nov 2018 16:56:49 +0000 Subject: [PATCH 2/4] More updates --- .../ml/job/groups/GroupOrJobLookupTests.java | 3 + .../TransportGetCalendarEventsAction.java | 17 ++--- .../ml/action/TransportOpenJobAction.java | 65 +++++++++++++++---- .../action/TransportUpdateDatafeedAction.java | 48 ++++++++++++++ .../xpack/ml/job/JobManager.java | 10 +++ .../job/persistence/CalendarQueryBuilder.java | 5 +- 6 files changed, 123 insertions(+), 25 deletions(-) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookupTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookupTests.java index 37472584c5aab..98eabf2917c98 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookupTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookupTests.java @@ -15,6 +15,7 @@ import java.util.List; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.mockito.Mockito.mock; @@ -103,6 +104,8 @@ public void testExpandGroupIds() { assertThat(groupOrJobLookup.expandGroupIds("foo*"), contains("foo-group")); assertThat(groupOrJobLookup.expandGroupIds("bar-group,nogroup"), contains("bar-group")); assertThat(groupOrJobLookup.expandGroupIds("*"), contains("bar-group", "foo-group")); + assertThat(groupOrJobLookup.expandGroupIds("foo-group"), contains("foo-group")); + assertThat(groupOrJobLookup.expandGroupIds("no-group"), empty()); } private static Job mockJob(String jobId, List groups) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarEventsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarEventsAction.java index b37b25fcacfbc..fbcaa09dbad7f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarEventsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarEventsAction.java @@ -17,9 +17,8 @@ import org.elasticsearch.xpack.core.ml.action.GetCalendarsAction; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; -import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; -import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; +import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder; @@ -29,17 +28,17 @@ public class TransportGetCalendarEventsAction extends HandledTransportAction { private final JobResultsProvider jobResultsProvider; - private final JobConfigProvider jobConfigProvider; + private final JobManager jobManager; @Inject public TransportGetCalendarEventsAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - JobResultsProvider jobResultsProvider, JobConfigProvider jobConfigProvider) { + JobResultsProvider jobResultsProvider, JobManager jobManager) { super(settings, GetCalendarEventsAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, GetCalendarEventsAction.Request::new); this.jobResultsProvider = jobResultsProvider; - this.jobConfigProvider = jobConfigProvider; + this.jobManager = jobManager; } @Override @@ -66,15 +65,13 @@ protected void doExecute(GetCalendarEventsAction.Request request, if (request.getJobId() != null) { - jobConfigProvider.getJob(request.getJobId(), ActionListener.wrap( - jobBuiler -> { - Job job = jobBuiler.build(); + jobManager.getJob(request.getJobId(), ActionListener.wrap( + job -> { jobResultsProvider.scheduledEventsForJob(request.getJobId(), job.getGroups(), query, eventsListener); - }, jobNotFound -> { // is the request Id a group? - jobConfigProvider.groupExists(request.getJobId(), ActionListener.wrap( + jobManager.groupExists(request.getJobId(), ActionListener.wrap( groupExists -> { if (groupExists) { jobResultsProvider.scheduledEventsForJob( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index ce820db0babec..7f84e5d96c85a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -20,12 +20,14 @@ import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.AliasOrIndex; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; @@ -52,6 +54,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.ml.MlMetaIndex; +import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; @@ -66,6 +69,8 @@ import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.job.ClusterStateJobUpdate; +import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; @@ -102,8 +107,9 @@ public class TransportOpenJobAction extends TransportMasterNodeAction { + jobManager.getJob(jobParams.getJobId(), ActionListener.wrap( + job -> { try { - jobParams.setJob(builder.build()); + jobParams.setJob(job); // Try adding results doc mapping addDocMappingIfMissing(AnomalyDetectorsIndex.jobResultsAliasedName(jobParams.getJobId()), @@ -670,16 +677,48 @@ public void onTimeout(TimeValue timeout) { } private void clearJobFinishedTime(String jobId, ActionListener listener) { - JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build(); - jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap( - job -> listener.onResponse(new AcknowledgedResponse(true)), - e -> { - logger.error("[" + jobId + "] Failed to clear finished_time", e); - // Not a critical error so continue + boolean jobIsInClusterState = ClusterStateJobUpdate.jobIsInClusterState(clusterService.state(), jobId); + if (jobIsInClusterState) { + clusterService.submitStateUpdateTask("clearing-job-finish-time-for-" + jobId, new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState); + MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata); + Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId)); + jobBuilder.setFinishedTime(null); + + mlMetadataBuilder.putJob(jobBuilder.build(), true); + ClusterState.Builder builder = ClusterState.builder(currentState); + return builder.metaData(new MetaData.Builder(currentState.metaData()) + .putCustom(MlMetadata.TYPE, mlMetadataBuilder.build())) + .build(); + } + + @Override + public void onFailure(String source, Exception e) { + logger.error("[" + jobId + "] Failed to clear finished_time; source [" + source + "]", e); listener.onResponse(new AcknowledgedResponse(true)); } - )); + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, + ClusterState newState) { + listener.onResponse(new AcknowledgedResponse(true)); + } + }); + } else { + JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build(); + + jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap( + job -> listener.onResponse(new AcknowledgedResponse(true)), + e -> { + logger.error("[" + jobId + "] Failed to clear finished_time", e); + // Not a critical error so continue + listener.onResponse(new AcknowledgedResponse(true)); + } + )); + } } private void cancelJobStart(PersistentTasksCustomMetaData.PersistentTask persistentTask, Exception exception, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java index 904cc1ec7a321..8ae6afdb0578c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java @@ -9,10 +9,12 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.inject.Inject; @@ -21,10 +23,13 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; @@ -63,6 +68,18 @@ protected PutDatafeedAction.Response newResponse() { @Override protected void masterOperation(UpdateDatafeedAction.Request request, ClusterState state, ActionListener listener) throws Exception { + + MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); + boolean datafeedConfigIsInClusterState = mlMetadata.getDatafeed(request.getUpdate().getId()) != null; + if (datafeedConfigIsInClusterState) { + updateDatafeedInClusterState(request, listener); + } else { + updateDatafeedInIndex(request, state, listener); + } + } + + private void updateDatafeedInIndex(UpdateDatafeedAction.Request request, ClusterState state, + ActionListener listener) throws Exception { final Map headers = threadPool.getThreadContext().getHeaders(); // Check datafeed is stopped @@ -118,6 +135,37 @@ private void checkJobDoesNotHaveADifferentDatafeed(String jobId, String datafeed )); } + private void updateDatafeedInClusterState(UpdateDatafeedAction.Request request, + ActionListener listener) { + final Map headers = threadPool.getThreadContext().getHeaders(); + + clusterService.submitStateUpdateTask("update-datafeed-" + request.getUpdate().getId(), + new AckedClusterStateUpdateTask(request, listener) { + private volatile DatafeedConfig updatedDatafeed; + + @Override + protected PutDatafeedAction.Response newResponse(boolean acknowledged) { + if (acknowledged) { + logger.info("Updated datafeed [{}]", request.getUpdate().getId()); + } + return new PutDatafeedAction.Response(updatedDatafeed); + } + + @Override + public ClusterState execute(ClusterState currentState) { + DatafeedUpdate update = request.getUpdate(); + MlMetadata currentMetadata = MlMetadata.getMlMetadata(currentState); + PersistentTasksCustomMetaData persistentTasks = + currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata) + .updateDatafeed(update, persistentTasks, headers).build(); + updatedDatafeed = newMetadata.getDatafeed(update.getId()); + return ClusterState.builder(currentState).metaData( + MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, newMetadata).build()).build(); + } + }); + } + @Override protected ClusterBlockException checkBlock(UpdateDatafeedAction.Request request, ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 42d9f27bd380d..f403eb8f41d93 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -137,6 +137,16 @@ private void setMaxModelMemoryLimit(ByteSizeValue maxModelMemoryLimit) { this.maxModelMemoryLimit = maxModelMemoryLimit; } + public void groupExists(String groupId, ActionListener listener) { + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state()); + if (mlMetadata.expandGroupIds(groupId).isEmpty() == false) { + listener.onResponse(Boolean.TRUE); + return; + } else { + jobConfigProvider.groupExists(groupId, listener); + } + } + public void jobExists(String jobId, ActionListener listener) { jobConfigProvider.jobExists(jobId, false, ActionListener.wrap( jobFound -> { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/CalendarQueryBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/CalendarQueryBuilder.java index 2674d29e49def..e10f2503b3337 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/CalendarQueryBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/CalendarQueryBuilder.java @@ -7,6 +7,7 @@ import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.index.query.TermsQueryBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.core.ml.action.util.PageParams; @@ -67,10 +68,10 @@ public SearchSourceBuilder build() { if (jobIdAndGroups.isEmpty() == false) { qb = new BoolQueryBuilder() - .filter(new TermsQueryBuilder(Calendar.TYPE.getPreferredName(), Calendar.CALENDAR_TYPE)) + .filter(new TermQueryBuilder(Calendar.TYPE.getPreferredName(), Calendar.CALENDAR_TYPE)) .filter(new TermsQueryBuilder(Calendar.JOB_IDS.getPreferredName(), jobIdAndGroups)); } else { - qb = new TermsQueryBuilder(Calendar.TYPE.getPreferredName(), Calendar.CALENDAR_TYPE); + qb = new TermQueryBuilder(Calendar.TYPE.getPreferredName(), Calendar.CALENDAR_TYPE); } SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(qb); From dfe94e6a030fc3a63b406f5de29cd387b6519787 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 16 Nov 2018 10:16:21 +0000 Subject: [PATCH 3/4] Improve readability --- .../main/java/org/elasticsearch/xpack/ml/job/JobManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index f403eb8f41d93..be7acda15e8d9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -139,9 +139,9 @@ private void setMaxModelMemoryLimit(ByteSizeValue maxModelMemoryLimit) { public void groupExists(String groupId, ActionListener listener) { MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state()); - if (mlMetadata.expandGroupIds(groupId).isEmpty() == false) { + boolean groupExistsInMlMetadata = mlMetadata.expandGroupIds(groupId).isEmpty() == false; + if (groupExistsInMlMetadata) { listener.onResponse(Boolean.TRUE); - return; } else { jobConfigProvider.groupExists(groupId, listener); } From 98625b5b9a58cbf39e332cf8269fa752ecdd6da3 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 16 Nov 2018 14:48:27 +0000 Subject: [PATCH 4/4] Add index mappings for DelayedDataCheckConfig --- .../ml/job/persistence/ElasticsearchMappings.java | 11 +++++++++++ .../xpack/core/ml/job/results/ReservedFieldNames.java | 4 ++++ 2 files changed, 15 insertions(+) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java index 73817e43f7945..7308ef1cc30c3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java @@ -9,6 +9,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.datafeed.DelayedDataCheckConfig; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; @@ -358,6 +359,16 @@ public static void addDatafeedConfigFields(XContentBuilder builder) throws IOExc .endObject() .endObject() .endObject() + .startObject(DatafeedConfig.DELAYED_DATA_CHECK_CONFIG.getPreferredName()) + .startObject(PROPERTIES) + .startObject(DelayedDataCheckConfig.ENABLED.getPreferredName()) + .field(TYPE, BOOLEAN) + .endObject() + .startObject(DelayedDataCheckConfig.CHECK_WINDOW.getPreferredName()) + .field(TYPE, KEYWORD) + .endObject() + .endObject() + .endObject() .startObject(DatafeedConfig.HEADERS.getPreferredName()) .field(ENABLED, false) .endObject(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java index 0f344d11e8e93..fa0532f382b96 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java @@ -7,6 +7,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.datafeed.DelayedDataCheckConfig; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; @@ -251,6 +252,9 @@ public final class ReservedFieldNames { DatafeedConfig.SCRIPT_FIELDS.getPreferredName(), DatafeedConfig.CHUNKING_CONFIG.getPreferredName(), DatafeedConfig.HEADERS.getPreferredName(), + DatafeedConfig.DELAYED_DATA_CHECK_CONFIG.getPreferredName(), + DelayedDataCheckConfig.ENABLED.getPreferredName(), + DelayedDataCheckConfig.CHECK_WINDOW.getPreferredName(), ChunkingConfig.MODE_FIELD.getPreferredName(), ChunkingConfig.TIME_SPAN_FIELD.getPreferredName(),