From a54cef87cb1699b0b28c6d0fcb5b27208ee7744c Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 28 Nov 2018 18:12:53 +0000 Subject: [PATCH 1/4] Revert "Prefer index config documents to cluster state config" This reverts commit 417624fae822fe5ced91746877edceeae0105045. --- .../action/TransportDeleteDatafeedAction.java | 24 +- .../ml/action/TransportOpenJobAction.java | 81 +++---- .../action/TransportUpdateDatafeedAction.java | 32 ++- .../ml/datafeed/DatafeedConfigReader.java | 42 ++-- .../xpack/ml/job/JobManager.java | 216 ++++++++---------- .../datafeed/DatafeedConfigReaderTests.java | 32 --- .../xpack/ml/job/JobManagerTests.java | 21 +- 7 files changed, 177 insertions(+), 271 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java index 510d7f411a9e3..a7dbb9d4f93b6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java @@ -131,21 +131,15 @@ private void deleteDatafeedConfig(DeleteDatafeedAction.Request request, ClusterS return; } - - datafeedConfigProvider.deleteDatafeedConfig(request.getDatafeedId(), ActionListener.wrap( - deleteResponse -> listener.onResponse(new AcknowledgedResponse(true)), - e -> { - if (e.getClass() == ResourceNotFoundException.class) { - // is the datafeed in the clusterstate - MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); - if (mlMetadata.getDatafeed(request.getDatafeedId()) != null) { - deleteDatafeedFromMetadata(request, listener); - return; - } - } - listener.onFailure(e); - } - )); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); + if (mlMetadata.getDatafeed(request.getDatafeedId()) != null) { + deleteDatafeedFromMetadata(request, listener); + } else { + datafeedConfigProvider.deleteDatafeedConfig(request.getDatafeedId(), ActionListener.wrap( + deleteResponse -> listener.onResponse(new AcknowledgedResponse(true)), + listener::onFailure + )); + } } private void deleteDatafeedFromMetadata(DeleteDatafeedAction.Request request, ActionListener listener) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index b676abe1a1fad..c9315a178148d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -11,7 +11,6 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction; @@ -693,53 +692,47 @@ public void onTimeout(TimeValue timeout) { private void clearJobFinishedTime(String jobId, ActionListener listener) { - JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build(); + boolean jobIsInClusterState = ClusterStateJobUpdate.jobIsInClusterState(clusterService.state(), jobId); + if (jobIsInClusterState) { + clusterService.submitStateUpdateTask("clearing-job-finish-time-for-" + jobId, new 
ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState); + MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata); + Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId)); + jobBuilder.setFinishedTime(null); + + mlMetadataBuilder.putJob(jobBuilder.build(), true); + ClusterState.Builder builder = ClusterState.builder(currentState); + return builder.metaData(new MetaData.Builder(currentState.metaData()) + .putCustom(MlMetadata.TYPE, mlMetadataBuilder.build())) + .build(); + } - jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap( - job -> listener.onResponse(new AcknowledgedResponse(true)), - e -> { - if (e.getClass() == ResourceNotFoundException.class) { - // Maybe the config is in the clusterstate - if (ClusterStateJobUpdate.jobIsInClusterState(clusterService.state(), jobId)) { - clearJobFinishedTimeClusterState(jobId, listener); - return; - } - } - logger.error("[" + jobId + "] Failed to clear finished_time", e); - // Not a critical error so continue + @Override + public void onFailure(String source, Exception e) { + logger.error("[" + jobId + "] Failed to clear finished_time; source [" + source + "]", e); listener.onResponse(new AcknowledgedResponse(true)); } - )); - } - - private void clearJobFinishedTimeClusterState(String jobId, ActionListener listener) { - clusterService.submitStateUpdateTask("clearing-job-finish-time-for-" + jobId, new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState); - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata); - Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId)); - jobBuilder.setFinishedTime(null); - - mlMetadataBuilder.putJob(jobBuilder.build(), true); - ClusterState.Builder builder = ClusterState.builder(currentState); - return builder.metaData(new MetaData.Builder(currentState.metaData()) - .putCustom(MlMetadata.TYPE, mlMetadataBuilder.build())) - .build(); - } - - @Override - public void onFailure(String source, Exception e) { - logger.error("[" + jobId + "] Failed to clear finished_time; source [" + source + "]", e); - listener.onResponse(new AcknowledgedResponse(true)); - } - @Override - public void clusterStateProcessed(String source, ClusterState oldState, - ClusterState newState) { - listener.onResponse(new AcknowledgedResponse(true)); - } - }); + @Override + public void clusterStateProcessed(String source, ClusterState oldState, + ClusterState newState) { + listener.onResponse(new AcknowledgedResponse(true)); + } + }); + } else { + JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build(); + + jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap( + job -> listener.onResponse(new AcknowledgedResponse(true)), + e -> { + logger.error("[" + jobId + "] Failed to clear finished_time", e); + // Not a critical error so continue + listener.onResponse(new AcknowledgedResponse(true)); + } + )); + } } private void cancelJobStart(PersistentTasksCustomMetaData.PersistentTask persistentTask, Exception exception, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java index 115774cd351d9..8ae6afdb0578c 100644 --- 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.ml.action; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; @@ -70,6 +69,19 @@ protected PutDatafeedAction.Response newResponse() { protected void masterOperation(UpdateDatafeedAction.Request request, ClusterState state, ActionListener listener) throws Exception { + MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); + boolean datafeedConfigIsInClusterState = mlMetadata.getDatafeed(request.getUpdate().getId()) != null; + if (datafeedConfigIsInClusterState) { + updateDatafeedInClusterState(request, listener); + } else { + updateDatafeedInIndex(request, state, listener); + } + } + + private void updateDatafeedInIndex(UpdateDatafeedAction.Request request, ClusterState state, + ActionListener listener) throws Exception { + final Map headers = threadPool.getThreadContext().getHeaders(); + // Check datafeed is stopped PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); if (MlTasks.getDatafeedTask(request.getUpdate().getId(), tasks) != null) { @@ -79,12 +91,6 @@ protected void masterOperation(UpdateDatafeedAction.Request request, ClusterStat return; } - updateDatafeedInIndex(request, state, listener); - } - - private void updateDatafeedInIndex(UpdateDatafeedAction.Request request, ClusterState state, - ActionListener listener) throws Exception { - final Map headers = threadPool.getThreadContext().getHeaders(); String datafeedId = request.getUpdate().getId(); CheckedConsumer updateConsumer = ok -> { @@ -92,17 +98,7 @@ private void updateDatafeedInIndex(UpdateDatafeedAction.Request request, Cluster jobConfigProvider::validateDatafeedJob, ActionListener.wrap( updatedConfig -> listener.onResponse(new PutDatafeedAction.Response(updatedConfig)), - e -> { - if (e.getClass() == ResourceNotFoundException.class) { - // try the clusterstate - MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); - if (mlMetadata.getDatafeed(request.getUpdate().getId()) != null) { - updateDatafeedInClusterState(request, listener); - return; - } - } - listener.onFailure(e); - } + listener::onFailure )); }; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java index 959a9ca3c3a78..3602147be3b2b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.ml.datafeed; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; @@ -51,22 +50,17 @@ public DatafeedConfigReader(DatafeedConfigProvider datafeedConfigProvider) { * @param listener DatafeedConfig listener */ public void datafeedConfig(String datafeedId, ClusterState state, ActionListener listener) { - - datafeedConfigProvider.getDatafeedConfig(datafeedId, ActionListener.wrap( - builder -> 
listener.onResponse(builder.build()), - e -> { - if (e.getClass() == ResourceNotFoundException.class) { - // look in the clusterstate - MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); - DatafeedConfig config = mlMetadata.getDatafeed(datafeedId); - if (config != null) { - listener.onResponse(config); - return; - } - } - listener.onFailure(e); - } - )); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); + DatafeedConfig config = mlMetadata.getDatafeed(datafeedId); + + if (config != null) { + listener.onResponse(config); + } else { + datafeedConfigProvider.getDatafeedConfig(datafeedId, ActionListener.wrap( + builder -> listener.onResponse(builder.build()), + listener::onFailure + )); + } } /** @@ -76,16 +70,14 @@ public void datafeedConfig(String datafeedId, ClusterState state, ActionListener public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, ClusterState clusterState, ActionListener> listener) { + Set clusterStateDatafeedIds = MlMetadata.getMlMetadata(clusterState).expandDatafeedIds(expression); ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(expression, allowNoDatafeeds); + requiredMatches.filterMatchedIds(clusterStateDatafeedIds); datafeedConfigProvider.expandDatafeedIdsWithoutMissingCheck(expression, ActionListener.wrap( expandedDatafeedIds -> { requiredMatches.filterMatchedIds(expandedDatafeedIds); - // now read from the clusterstate - Set clusterStateDatafeedIds = MlMetadata.getMlMetadata(clusterState).expandDatafeedIds(expression); - requiredMatches.filterMatchedIds(clusterStateDatafeedIds); - if (requiredMatches.hasUnmatchedIds()) { listener.onFailure(ExceptionsHelper.missingDatafeedException(requiredMatches.unmatchedIdsString())); } else { @@ -104,7 +96,10 @@ public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, Clust public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, ClusterState clusterState, ActionListener> listener) { + Map clusterStateConfigs = expandClusterStateDatafeeds(expression, clusterState); + ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(expression, allowNoDatafeeds); + requiredMatches.filterMatchedIds(clusterStateConfigs.keySet()); datafeedConfigProvider.expandDatafeedConfigsWithoutMissingCheck(expression, ActionListener.wrap( datafeedBuilders -> { @@ -113,11 +108,9 @@ public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, C datafeedConfigs.add(builder.build()); } - Map clusterStateConfigs = expandClusterStateDatafeeds(expression, clusterState); - // Duplicate configs existing in both the clusterstate and index documents are ok // this may occur during migration of configs. - // Prefer the index configs and filter duplicates from the clusterstate configs. 
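// Illustration (not part of this patch): both sides of this hunk implement the
// same duplicate-resolving merge; the revert only flips which source wins when
// a datafeed id exists both in the cluster state and in the ml-config index,
// which can happen mid-migration. A minimal self-contained sketch of that
// merge, with a hypothetical generic config type T:
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PreferredSourceMergeSketch {
    // Returns all preferred configs plus fallback configs whose ids are not taken.
    static <T> List<T> merge(Map<String, T> preferred, Map<String, T> fallback) {
        Map<String, T> merged = new LinkedHashMap<>(preferred);
        fallback.forEach(merged::putIfAbsent); // duplicate ids resolve to 'preferred'
        return new ArrayList<>(merged.values());
    }
}
// After this revert the cluster-state configs act as 'preferred'; before it,
// the index documents did.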
+ // Prefer the clusterstate configs and filter duplicates from the index Set indexConfigIds = datafeedConfigs.stream().map(DatafeedConfig::getId).collect(Collectors.toSet()); for (String clusterStateDatafeedId : clusterStateConfigs.keySet()) { if (indexConfigIds.contains(clusterStateDatafeedId) == false) { @@ -130,6 +123,7 @@ public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, C if (requiredMatches.hasUnmatchedIds()) { listener.onFailure(ExceptionsHelper.missingDatafeedException(requiredMatches.unmatchedIdsString())); } else { + datafeedConfigs.addAll(clusterStateConfigs.values()); Collections.sort(datafeedConfigs, Comparator.comparing(DatafeedConfig::getId)); listener.onResponse(datafeedConfigs); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 92337c47e4b21..0aa334b43173f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -213,35 +213,30 @@ private void getJobFromClusterState(String jobId, ActionListener jobListene * @param jobsListener The jobs listener */ public void expandJobs(String expression, boolean allowNoJobs, ActionListener> jobsListener) { + Map clusterStateJobs = expandJobsFromClusterState(expression, clusterService.state()); ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(expression, allowNoJobs); + requiredMatches.filterMatchedIds(clusterStateJobs.keySet()); + + // If expression contains a group Id it has been expanded to its + // constituent job Ids but Ids matcher needs to know the group + // has been matched + requiredMatches.filterMatchedIds(MlMetadata.getMlMetadata(clusterService.state()).expandGroupIds(expression)); jobConfigProvider.expandJobsWithoutMissingcheck(expression, false, ActionListener.wrap( jobBuilders -> { Set jobAndGroupIds = new HashSet<>(); - // Merge cluster state and index jobs - List jobs = new ArrayList<>(); - for (Job.Builder jb : jobBuilders) { - Job job = jb.build(); - jobAndGroupIds.add(job.getId()); - // If expression contains a group Id it has been expanded to its - // constituent job Ids but Ids matcher needs to know the group - // has been matched - jobAndGroupIds.addAll(job.getGroups()); - jobs.add(job); - } + List jobs = new ArrayList<>(clusterStateJobs.values()); // Duplicate configs existing in both the clusterstate and index documents are ok // this may occur during migration of configs. 
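// Illustration (not part of this patch): expandJobs() above threads every id
// source through an ExpandedIdsMatcher so that each id or wildcard in the
// request expression must be matched somewhere, or a missing-job error is
// raised. A rough self-contained sketch of those semantics (the real class
// also handles wildcard patterns and the allowNoJobs flag):
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;

class RequiredIdsMatcherSketch {
    private final Set<String> unmatched;

    RequiredIdsMatcherSketch(String expression) {
        unmatched = new LinkedHashSet<>(Arrays.asList(expression.split(",")));
    }

    void filterMatchedIds(Collection<String> foundIds) {
        unmatched.removeAll(foundIds); // ids found in this batch are no longer missing
    }

    boolean hasUnmatchedIds() {
        return unmatched.isEmpty() == false; // style used throughout this codebase
    }
}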
- // Prefer the index configs and filter duplicates from the clusterstate configs - Map clusterStateJobs = expandJobsFromClusterState(expression, clusterService.state()); - for (String clusterStateJobId : clusterStateJobs.keySet()) { - boolean isDuplicate = jobAndGroupIds.contains(clusterStateJobId); - if (isDuplicate == false) { - Job csJob = clusterStateJobs.get(clusterStateJobId); - jobs.add(csJob); - jobAndGroupIds.add(csJob.getId()); - jobAndGroupIds.addAll(csJob.getGroups()); + // Prefer the clusterstate configs and filter duplicates from the index + for (Job.Builder jb : jobBuilders) { + if (clusterStateJobs.containsKey(jb.getId()) == false) { + Job job = jb.build(); + jobAndGroupIds.add(job.getId()); + jobAndGroupIds.addAll(job.getGroups()); + jobs.add(job); } } @@ -276,21 +271,18 @@ private Map expandJobsFromClusterState(String expression, ClusterSt * @param jobsListener The jobs listener */ public void expandJobIds(String expression, boolean allowNoJobs, ActionListener> jobsListener) { + Set clusterStateJobIds = MlMetadata.getMlMetadata(clusterService.state()).expandJobIds(expression); ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(expression, allowNoJobs); - + requiredMatches.filterMatchedIds(clusterStateJobIds); + // If expression contains a group Id it has been expanded to its + // constituent job Ids but Ids matcher needs to know the group + // has been matched + requiredMatches.filterMatchedIds(MlMetadata.getMlMetadata(clusterService.state()).expandGroupIds(expression)); jobConfigProvider.expandJobsIdsWithoutMissingCheck(expression, false, ActionListener.wrap( jobIdsAndGroups -> { requiredMatches.filterMatchedIds(jobIdsAndGroups.getJobs()); - - Set clusterStateJobIds = MlMetadata.getMlMetadata(clusterService.state()).expandJobIds(expression); - requiredMatches.filterMatchedIds(clusterStateJobIds); - // If expression contains a group Id it has been expanded to its - // constituent job Ids but Ids matcher needs to know the group - // has been matched requiredMatches.filterMatchedIds(jobIdsAndGroups.getGroups()); - requiredMatches.filterMatchedIds(MlMetadata.getMlMetadata(clusterService.state()).expandGroupIds(expression)); - if (requiredMatches.hasUnmatchedIds()) { jobsListener.onFailure(ExceptionsHelper.missingJobException(requiredMatches.unmatchedIdsString())); } else { @@ -304,50 +296,37 @@ public void expandJobIds(String expression, boolean allowNoJobs, ActionListener< } /** - * Mark the job as being deleted. First looks in the ml-config index - * for the job configuration then the clusterstate + * Mark the job as being deleted. 
First looks in the cluster state for the + * job configuration then the index * * @param jobId To to mark * @param force Allows an open job to be marked * @param listener listener */ public void markJobAsDeleting(String jobId, boolean force, ActionListener listener) { - jobConfigProvider.markJobAsDeleting(jobId, ActionListener.wrap( - listener::onResponse, - e -> { - if (e.getClass() == ResourceNotFoundException.class) { - // is the config in the cluster state - if (ClusterStateJobUpdate.jobIsInClusterState(clusterService.state(), jobId)) { - ClusterStateJobUpdate.markJobAsDeleting(jobId, force, clusterService, listener); - return; - } - } - listener.onFailure(e); - } - )); - + if (ClusterStateJobUpdate.jobIsInClusterState(clusterService.state(), jobId)) { + ClusterStateJobUpdate.markJobAsDeleting(jobId, force, clusterService, listener); + } else { + jobConfigProvider.markJobAsDeleting(jobId, listener); + } } /** - * First try to delete the job document from ml-config, if it does not exist - * there try to the clusterstate. + * First try to delete the job from the cluster state, if it does not exist + * there try to delete the index job. * * @param request The delete job request * @param listener Delete listener */ public void deleteJob(DeleteJobAction.Request request, ActionListener listener) { - jobConfigProvider.deleteJob(request.getJobId(), false, ActionListener.wrap( - deleteResponse -> { - if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { - if (ClusterStateJobUpdate.jobIsInClusterState(clusterService.state(), request.getJobId())) { - ClusterStateJobUpdate.deleteJob(request, clusterService, listener); - } - } else { - listener.onResponse(deleteResponse.getResult() == DocWriteResponse.Result.DELETED); - } - }, - listener::onFailure - )); + if (ClusterStateJobUpdate.jobIsInClusterState(clusterService.state(), request.getJobId())) { + ClusterStateJobUpdate.deleteJob(request, clusterService, listener); + } else { + jobConfigProvider.deleteJob(request.getJobId(), false, ActionListener.wrap( + deleteResponse -> listener.onResponse(Boolean.TRUE), + listener::onFailure + )); + } } /** @@ -467,21 +446,18 @@ public void onFailure(Exception e) { } public void updateJob(UpdateJobAction.Request request, ActionListener actionListener) { - updateJobIndex(request, ActionListener.wrap( - updatedJob -> { - postJobUpdate(clusterService.state(), request); - actionListener.onResponse(new PutJobAction.Response(updatedJob)); - }, - e -> { - if (e.getClass() == ResourceNotFoundException.class) { - MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state()); - if (ClusterStateJobUpdate.jobIsInMlMetadata(mlMetadata, request.getJobId())) { - updateJobClusterState(request, actionListener); - return; - } - } - actionListener.onFailure(e); - })); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state()); + if (ClusterStateJobUpdate.jobIsInMlMetadata(mlMetadata, request.getJobId())) { + updateJobClusterState(request, actionListener); + } else { + updateJobIndex(request, ActionListener.wrap( + updatedJob -> { + postJobUpdate(clusterService.state(), request); + actionListener.onResponse(new PutJobAction.Response(updatedJob)); + }, + actionListener::onFailure + )); + } } private void postJobUpdate(ClusterState clusterState, UpdateJobAction.Request request) { @@ -666,14 +642,14 @@ public void notifyFilterChanged(MlFilter filter, Set addedItems, Set { threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> { - List allJobs = new 
ArrayList<>(indexJobs); + List allJobs = new ArrayList<>(clusterStateJobs.values()); + // Duplicate configs existing in both the clusterstate and index documents are ok // this may occur during migration of configs. // Filter the duplicates so we don't update twice for duplicated jobs - for (String clusterStateJobId : clusterStateJobs.keySet()) { - boolean isDuplicate = allJobs.stream().anyMatch(job -> job.getId().equals(clusterStateJobId)); - if (isDuplicate == false) { - allJobs.add(clusterStateJobs.get(clusterStateJobId)); + for (Job indexJob : indexJobs) { + if (clusterStateJobs.containsKey(indexJob.getId()) == false) { + allJobs.add(indexJob); } } @@ -748,7 +724,7 @@ public void updateProcessOnCalendarChanged(List calendarJobIds, ActionLi jobConfigProvider.expandGroupIds(calendarJobIds, ActionListener.wrap( expandedIds -> { threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> { - // Merge the expanded group members with the request Ids + // Merge the expanded group members with the request Ids // which are job ids rather than group Ids. expandedIds.addAll(calendarJobIds); @@ -806,10 +782,39 @@ public void revertSnapshot(RevertModelSnapshotAction.Request request, ActionList // Step 1. update the job // ------- - Consumer establishedMemoryHandler = modelMem -> { + + Consumer updateJobHandler; + + if (ClusterStateJobUpdate.jobIsInClusterState(clusterService.state(), request.getJobId())) { + updateJobHandler = response -> clusterService.submitStateUpdateTask("revert-snapshot-" + request.getJobId(), + new AckedClusterStateUpdateTask(request, ActionListener.wrap(updateHandler, actionListener::onFailure)) { + + @Override + protected Boolean newResponse(boolean acknowledged) { + if (acknowledged) { + auditor.info(request.getJobId(), + Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription())); + return true; + } + actionListener.onFailure(new IllegalStateException("Could not revert modelSnapshot on job [" + + request.getJobId() + "], not acknowledged by master.")); + return false; + } + + @Override + public ClusterState execute(ClusterState currentState) { + Job job = MlMetadata.getMlMetadata(currentState).getJobs().get(request.getJobId()); + Job.Builder builder = new Job.Builder(job); + builder.setModelSnapshotId(modelSnapshot.getSnapshotId()); + builder.setEstablishedModelMemory(response); + return ClusterStateJobUpdate.putJobInClusterState(builder.build(), true, currentState); + } + }); + } else { + updateJobHandler = response -> { JobUpdate update = new JobUpdate.Builder(request.getJobId()) .setModelSnapshotId(modelSnapshot.getSnapshotId()) - .setEstablishedModelMemory(modelMem) + .setEstablishedModelMemory(response) .build(); jobConfigProvider.updateJob(request.getJobId(), update, maxModelMemoryLimit, ActionListener.wrap( @@ -818,53 +823,14 @@ public void revertSnapshot(RevertModelSnapshotAction.Request request, ActionList Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription())); updateHandler.accept(Boolean.TRUE); }, - e -> { - if (e.getClass() == ResourceNotFoundException.class) { - // Not found? maybe it's in the index - if (ClusterStateJobUpdate.jobIsInClusterState(clusterService.state(), request.getJobId())) { - revertModelSnapshotClusterState(request, modelSnapshot, updateHandler, modelMem, actionListener); - return; - } - } - actionListener.onFailure(e); - } + actionListener::onFailure )); }; + } // Step 0. 
Find the appropriate established model memory for the reverted job // ------- - jobResultsProvider.getEstablishedMemoryUsage(request.getJobId(), modelSizeStats.getTimestamp(), modelSizeStats, - establishedMemoryHandler, actionListener::onFailure); - } - - private void revertModelSnapshotClusterState(RevertModelSnapshotAction.Request request, - ModelSnapshot modelSnapshot, - CheckedConsumer updateHandler, Long modelMem, - ActionListener actionListener) { - clusterService.submitStateUpdateTask("revert-snapshot-" + request.getJobId(), - new AckedClusterStateUpdateTask(request, ActionListener.wrap(updateHandler, actionListener::onFailure)) { - - @Override - protected Boolean newResponse(boolean acknowledged) { - if (acknowledged) { - auditor.info(request.getJobId(), - Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription())); - return true; - } - // TODO is this an error? can actionListener.onFailure be called twice? - actionListener.onFailure(new IllegalStateException("Could not revert modelSnapshot on job [" - + request.getJobId() + "], not acknowledged by master.")); - return false; - } - - @Override - public ClusterState execute(ClusterState currentState) { - Job job = MlMetadata.getMlMetadata(currentState).getJobs().get(request.getJobId()); - Job.Builder builder = new Job.Builder(job); - builder.setModelSnapshotId(modelSnapshot.getSnapshotId()); - builder.setEstablishedModelMemory(modelMem); - return ClusterStateJobUpdate.putJobInClusterState(builder.build(), true, currentState); - } - }); + jobResultsProvider.getEstablishedMemoryUsage(request.getJobId(), modelSizeStats.getTimestamp(), modelSizeStats, updateJobHandler, + actionListener::onFailure); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReaderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReaderTests.java index 929c957695470..986b0d0cf46ae 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReaderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReaderTests.java @@ -14,7 +14,6 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.SortedSet; @@ -23,7 +22,6 @@ import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder; import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.hasSize; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; @@ -134,41 +132,11 @@ public void testExpandDatafeedConfigs_SplitBetweenClusterStateAndIndex() { e -> fail(e.getMessage()) )); - assertThat(configHolder.get(), hasSize(3)); assertEquals("cs-df", configHolder.get().get(0).getId()); assertEquals("index-df", configHolder.get().get(1).getId()); assertEquals("ll-df", configHolder.get().get(2).getId()); } - public void testExpandDatafeedConfigs_DuplicateConfigReturnsIndexDocument() { - MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); - mlMetadata.putJob(buildJobBuilder("datafeed-in-clusterstate").build(), false); - mlMetadata.putDatafeed(createDatafeedConfig("df1", "datafeed-in-clusterstate"), Collections.emptyMap()); - - ClusterState clusterState = ClusterState.builder(new ClusterName("datafeedconfigreadertests")) - 
.metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build())) - .build(); - - DatafeedConfig.Builder indexConfig1 = createDatafeedConfigBuilder("df1", "datafeed-in-index"); - DatafeedConfig.Builder indexConfig2 = createDatafeedConfigBuilder("df2", "job-c"); - DatafeedConfigProvider provider = mock(DatafeedConfigProvider.class); - mockProviderWithExpectedConfig(provider, "_all", Arrays.asList(indexConfig1, indexConfig2)); - - DatafeedConfigReader reader = new DatafeedConfigReader(provider); - - AtomicReference> configHolder = new AtomicReference<>(); - reader.expandDatafeedConfigs("_all", true, clusterState, ActionListener.wrap( - configHolder::set, - e -> fail(e.getMessage()) - )); - - assertThat(configHolder.get(), hasSize(2)); - assertEquals("df1", configHolder.get().get(0).getId()); - assertEquals("datafeed-in-index", configHolder.get().get(0).getJobId()); - assertEquals("df2", configHolder.get().get(1).getId()); - } - private ClusterState buildClusterStateWithJob(DatafeedConfig datafeed) { MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); mlMetadata.putJob(buildJobBuilder(JOB_ID_FOO).build(), false); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index 76fc990995e95..4aba53d2614cb 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -86,6 +86,7 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -242,7 +243,6 @@ public void testExpandJob_GivenDuplicateConfig() throws IOException { List docsAsBytes = new ArrayList<>(); Job.Builder indexJob = buildJobBuilder("dupe"); - indexJob.setCustomSettings(Collections.singletonMap("job-saved-in-index", Boolean.TRUE)); docsAsBytes.add(toBytesReference(indexJob.build())); MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); @@ -256,10 +256,9 @@ public void testExpandJob_GivenDuplicateConfig() throws IOException { exceptionHolder::set )); - assertThat(jobsHolder.get().results(), hasSize(1)); - Job foundJob = jobsHolder.get().results().get(0); - assertTrue((Boolean)foundJob.getCustomSettings().get("job-saved-in-index")); - assertNull(exceptionHolder.get()); + assertNull(jobsHolder.get()); + assertThat(exceptionHolder.get(), instanceOf(IllegalStateException.class)); + assertEquals("Job [dupe] configuration exists in both clusterstate and index", exceptionHolder.get().getMessage()); } public void testExpandJobs_SplitBetweenClusterStateAndIndex() throws IOException { @@ -372,8 +371,9 @@ public void testExpandJobIds_GivenDuplicateConfig() { exceptionHolder::set )); - assertThat(jobIdsHolder.get(), contains("dupe")); - assertNull(exceptionHolder.get()); + assertNull(jobIdsHolder.get()); + assertThat(exceptionHolder.get(), instanceOf(IllegalStateException.class)); + assertEquals("Job [dupe] configuration exists in both clusterstate and index", exceptionHolder.get().getMessage()); } public void testExpandJobIdsFromClusterStateAndIndex_GivenAll() { @@ -870,12 +870,6 @@ public void testRevertSnapshot_GivenJobInClusterState() { when(clusterService.getClusterSettings()).thenReturn(clusterSettings); 
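// Illustration (not part of this patch): the doAnswer(...) stub removed just
// below existed only to push the old index-first code down its
// ResourceNotFoundException fallback into the cluster state. The general
// Mockito pattern for completing an asynchronous ActionListener argument is
// (assuming the test file's static imports of doAnswer, any and anyString,
// and a mocked provider):
doAnswer(invocationOnMock -> {
    @SuppressWarnings("unchecked")
    ActionListener<Job> listener = (ActionListener<Job>) invocationOnMock.getArguments()[3];
    listener.onFailure(new ResourceNotFoundException("missing job"));
    return null; // updateJob is void; the listener carries the result
}).when(jobConfigProvider).updateJob(anyString(), any(), any(), any(ActionListener.class));
// With the cluster-state-first ordering restored here, updateJob() should not
// be reached for a cluster-state job, so the test can drop the stub.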
JobConfigProvider jobConfigProvider = mock(JobConfigProvider.class); - doAnswer(invocationOnMock -> { - ActionListener listener = (ActionListener) invocationOnMock.getArguments()[3]; - listener.onFailure(new ResourceNotFoundException("missing job")); - return null; - }).when(jobConfigProvider).updateJob(anyString(), any(), any(), any(ActionListener.class)); - JobManager jobManager = new JobManager(environment, environment.settings(), jobResultsProvider, clusterService, auditor, threadPool, mock(Client.class), updateJobProcessNotifier, jobConfigProvider); @@ -894,6 +888,7 @@ public void testRevertSnapshot_GivenJobInClusterState() { jobManager.revertSnapshot(request, mock(ActionListener.class), modelSnapshot); verify(clusterService, times(1)).submitStateUpdateTask(eq("revert-snapshot-cs-revert"), any(AckedClusterStateUpdateTask.class)); + verify(jobConfigProvider, never()).updateJob(any(), any(), any(), any()); } private Job.Builder createJob() { From a893ef74ade4ea9c63cd0d2efa794d48fd331861 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 29 Nov 2018 11:14:08 +0000 Subject: [PATCH 2/4] Reinstate reverted tests that are valid --- .../ml/datafeed/DatafeedConfigReader.java | 16 ++++------- .../xpack/ml/job/JobManager.java | 3 +- .../datafeed/DatafeedConfigReaderTests.java | 28 +++++++++++++++++++ .../xpack/ml/job/JobManagerTests.java | 23 ++++++++++----- 4 files changed, 50 insertions(+), 20 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java index 3602147be3b2b..35913bcad04e2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java @@ -99,22 +99,17 @@ public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, C Map clusterStateConfigs = expandClusterStateDatafeeds(expression, clusterState); ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(expression, allowNoDatafeeds); - requiredMatches.filterMatchedIds(clusterStateConfigs.keySet()); datafeedConfigProvider.expandDatafeedConfigsWithoutMissingCheck(expression, ActionListener.wrap( - datafeedBuilders -> { - List datafeedConfigs = new ArrayList<>(); - for (DatafeedConfig.Builder builder : datafeedBuilders) { - datafeedConfigs.add(builder.build()); - } + indexDatafeeds -> { + List datafeedConfigs = new ArrayList<>(clusterStateConfigs.values()); // Duplicate configs existing in both the clusterstate and index documents are ok // this may occur during migration of configs. 
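// Illustration (not part of this patch): after patch 1, this reader's
// datafeedConfig() follows the same shape as the transport actions, namely
// consult the cluster-state MlMetadata first and only fall through to the
// index-backed provider when the id is absent there. The skeleton, using the
// types already imported by this file:
public void datafeedConfigSketch(String datafeedId, ClusterState state,
                                 ActionListener<DatafeedConfig> listener) {
    DatafeedConfig csConfig = MlMetadata.getMlMetadata(state).getDatafeed(datafeedId);
    if (csConfig != null) {
        listener.onResponse(csConfig); // un-migrated config still held in cluster state
    } else {
        datafeedConfigProvider.getDatafeedConfig(datafeedId, ActionListener.wrap(
            builder -> listener.onResponse(builder.build()), // ml-config index document
            listener::onFailure));
    }
}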
// Prefer the clusterstate configs and filter duplicates from the index - Set indexConfigIds = datafeedConfigs.stream().map(DatafeedConfig::getId).collect(Collectors.toSet()); - for (String clusterStateDatafeedId : clusterStateConfigs.keySet()) { - if (indexConfigIds.contains(clusterStateDatafeedId) == false) { - datafeedConfigs.add(clusterStateConfigs.get(clusterStateDatafeedId)); + for (DatafeedConfig.Builder builder : indexDatafeeds) { + if (clusterStateConfigs.containsKey(builder.getId()) == false) { + datafeedConfigs.add(builder.build()); } } @@ -123,7 +118,6 @@ public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, C if (requiredMatches.hasUnmatchedIds()) { listener.onFailure(ExceptionsHelper.missingDatafeedException(requiredMatches.unmatchedIdsString())); } else { - datafeedConfigs.addAll(clusterStateConfigs.values()); Collections.sort(datafeedConfigs, Comparator.comparing(DatafeedConfig::getId)); listener.onResponse(datafeedConfigs); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 0aa334b43173f..462f0886774c6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -10,7 +10,6 @@ import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; @@ -724,7 +723,7 @@ public void updateProcessOnCalendarChanged(List calendarJobIds, ActionLi jobConfigProvider.expandGroupIds(calendarJobIds, ActionListener.wrap( expandedIds -> { threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> { - // Merge the expanded group members with the request Ids + // Merge the expanded group members with the request Ids // which are job ids rather than group Ids. 
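// Illustration (not part of this patch): the hunk above only strips a trailing
// space from the comment. For context, the enclosing method follows a pattern
// used throughout this module: expand ids asynchronously via the config
// provider, then hop onto the ML utility thread pool before doing the heavier
// follow-up work (the addAll below and the per-job process updates), so the
// transport thread is never blocked. In outline, with the failure handler
// name assumed:
jobConfigProvider.expandGroupIds(calendarJobIds, ActionListener.wrap(
    expandedIds -> threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> {
        expandedIds.addAll(calendarJobIds); // request ids may be job ids rather than groups
        // ... push the calendar change to each open job's autodetect process ...
    }),
    listener::onFailure));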
expandedIds.addAll(calendarJobIds); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReaderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReaderTests.java index 986b0d0cf46ae..3317cf6c72965 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReaderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReaderTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.SortedSet; @@ -22,6 +23,7 @@ import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.hasSize; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; @@ -132,11 +134,37 @@ public void testExpandDatafeedConfigs_SplitBetweenClusterStateAndIndex() { e -> fail(e.getMessage()) )); + assertThat(configHolder.get(), hasSize(3)); assertEquals("cs-df", configHolder.get().get(0).getId()); assertEquals("index-df", configHolder.get().get(1).getId()); assertEquals("ll-df", configHolder.get().get(2).getId()); } + public void testExpandDatafeedConfigs_DuplicateConfigReturnsClusterStateConfig() { + // TODO + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(buildJobBuilder("datafeed-in-clusterstate").build(), false); + mlMetadata.putDatafeed(createDatafeedConfig("df1", "datafeed-in-clusterstate"), Collections.emptyMap()); + ClusterState clusterState = ClusterState.builder(new ClusterName("datafeedconfigreadertests")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + DatafeedConfig.Builder indexConfig1 = createDatafeedConfigBuilder("df1", "datafeed-in-index"); + DatafeedConfig.Builder indexConfig2 = createDatafeedConfigBuilder("df2", "job-c"); + DatafeedConfigProvider provider = mock(DatafeedConfigProvider.class); + mockProviderWithExpectedConfig(provider, "_all", Arrays.asList(indexConfig1, indexConfig2)); + DatafeedConfigReader reader = new DatafeedConfigReader(provider); + AtomicReference> configHolder = new AtomicReference<>(); + reader.expandDatafeedConfigs("_all", true, clusterState, ActionListener.wrap( + configHolder::set, + e -> fail(e.getMessage()) + )); + assertThat(configHolder.get(), hasSize(2)); + assertEquals("df1", configHolder.get().get(0).getId()); + assertEquals("datafeed-in-clusterstate", configHolder.get().get(0).getJobId()); + assertEquals("df2", configHolder.get().get(1).getId()); + } + private ClusterState buildClusterStateWithJob(DatafeedConfig datafeed) { MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); mlMetadata.putJob(buildJobBuilder(JOB_ID_FOO).build(), false); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index 4aba53d2614cb..d3a4071f79dba 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -230,7 +230,9 @@ public void testExpandJobsFromClusterStateAndIndex_GivenAll() throws IOException } public void 
testExpandJob_GivenDuplicateConfig() throws IOException { - Job csJob = buildJobBuilder("dupe").build(); + Job csJob = buildJobBuilder("dupe") + .setCustomSettings(Collections.singletonMap("job-saved-in-clusterstate", Boolean.TRUE)) + .build(); MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); mlMetadata.putJob(csJob, false); @@ -243,6 +245,7 @@ public void testExpandJob_GivenDuplicateConfig() throws IOException { List docsAsBytes = new ArrayList<>(); Job.Builder indexJob = buildJobBuilder("dupe"); + indexJob.setCustomSettings(Collections.singletonMap("job-saved-in-index", Boolean.TRUE)); docsAsBytes.add(toBytesReference(indexJob.build())); MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); @@ -256,9 +259,10 @@ public void testExpandJob_GivenDuplicateConfig() throws IOException { exceptionHolder::set )); - assertNull(jobsHolder.get()); - assertThat(exceptionHolder.get(), instanceOf(IllegalStateException.class)); - assertEquals("Job [dupe] configuration exists in both clusterstate and index", exceptionHolder.get().getMessage()); + assertThat(jobsHolder.get().results(), hasSize(1)); + Job foundJob = jobsHolder.get().results().get(0); + assertTrue((Boolean)foundJob.getCustomSettings().get("job-saved-in-clusterstate")); + assertNull(exceptionHolder.get()); } public void testExpandJobs_SplitBetweenClusterStateAndIndex() throws IOException { @@ -371,9 +375,8 @@ public void testExpandJobIds_GivenDuplicateConfig() { exceptionHolder::set )); - assertNull(jobIdsHolder.get()); - assertThat(exceptionHolder.get(), instanceOf(IllegalStateException.class)); - assertEquals("Job [dupe] configuration exists in both clusterstate and index", exceptionHolder.get().getMessage()); + assertThat(jobIdsHolder.get(), contains("dupe")); + assertNull(exceptionHolder.get()); } public void testExpandJobIdsFromClusterStateAndIndex_GivenAll() { @@ -870,6 +873,12 @@ public void testRevertSnapshot_GivenJobInClusterState() { when(clusterService.getClusterSettings()).thenReturn(clusterSettings); JobConfigProvider jobConfigProvider = mock(JobConfigProvider.class); + doAnswer(invocationOnMock -> { + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[3]; + listener.onFailure(new ResourceNotFoundException("missing job")); + return null; + }).when(jobConfigProvider).updateJob(anyString(), any(), any(), any(ActionListener.class)); + JobManager jobManager = new JobManager(environment, environment.settings(), jobResultsProvider, clusterService, auditor, threadPool, mock(Client.class), updateJobProcessNotifier, jobConfigProvider); From 61f2ae2600bf4ef649a8b4b741488c444bc6c5f3 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 29 Nov 2018 11:57:30 +0000 Subject: [PATCH 3/4] Fix finalization and test --- .../TransportFinalizeJobExecutionAction.java | 16 +++--- ...nsportFinalizeJobExecutionActionTests.java | 52 +++++++++++++++---- 2 files changed, 53 insertions(+), 15 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java index 2512462610ec4..a5fcd5e86881d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java @@ -73,7 +73,6 @@ protected AcknowledgedResponse newResponse() { @Override protected void 
masterOperation(FinalizeJobExecutionAction.Request request, ClusterState state, ActionListener listener) { - MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); Set jobsInClusterState = Arrays.stream(request.getJobIds()) .filter(id -> mlMetadata.getJobs().containsKey(id)) @@ -83,14 +82,19 @@ protected void masterOperation(FinalizeJobExecutionAction.Request request, Clust finalizeIndexJobs(Arrays.asList(request.getJobIds()), listener); } else { ActionListener finalizeClusterStateJobsListener = ActionListener.wrap( - ack -> finalizeClusterStateJobs(jobsInClusterState, listener), + ack -> { + Set jobsInIndex = new HashSet<>(Arrays.asList(request.getJobIds())); + jobsInIndex.removeAll(jobsInClusterState); + if (jobsInIndex.isEmpty()) { + listener.onResponse(ack); + } else { + finalizeIndexJobs(jobsInIndex, listener); + } + }, listener::onFailure ); - Set jobsInIndex = new HashSet<>(Arrays.asList(request.getJobIds())); - jobsInIndex.removeAll(jobsInClusterState); - - finalizeIndexJobs(jobsInIndex, finalizeClusterStateJobsListener); + finalizeClusterStateJobs(jobsInClusterState, finalizeClusterStateJobsListener); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionActionTests.java index d575d913038ea..bd5560a48b1a7 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionActionTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; @@ -31,8 +32,9 @@ import java.util.concurrent.atomic.AtomicReference; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doAnswer; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -43,17 +45,19 @@ public class TransportFinalizeJobExecutionActionTests extends ESTestCase { private ThreadPool threadPool; private Client client; + private ClusterService clusterService; @Before @SuppressWarnings("unchecked") private void setupMocks() { ExecutorService executorService = mock(ExecutorService.class); threadPool = mock(ThreadPool.class); - org.elasticsearch.mock.orig.Mockito.doAnswer(invocation -> { + doAnswer(invocation -> { ((Runnable) invocation.getArguments()[0]).run(); return null; }).when(executorService).execute(any(Runnable.class)); when(threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)).thenReturn(executorService); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); client = mock(Client.class); doAnswer( invocationOnMock -> { @@ -61,14 +65,19 @@ private void setupMocks() { listener.onResponse(null); return null; }).when(client).execute(eq(UpdateAction.INSTANCE), any(), any()); - when(client.threadPool()).thenReturn(threadPool); - when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + + 
clusterService = mock(ClusterService.class); + + doAnswer( invocationOnMock -> { + ClusterStateUpdateTask updateTask = (ClusterStateUpdateTask)invocationOnMock.getArguments()[1]; + updateTask.clusterStateProcessed(null, null, null); + return null; + }).when(clusterService).submitStateUpdateTask(anyString(), any()); } public void testOperation_noJobsInClusterState() { - ClusterService clusterService = mock(ClusterService.class); - TransportFinalizeJobExecutionAction action = createAction(clusterService); + TransportFinalizeJobExecutionAction action = createAction(); ClusterState clusterState = ClusterState.builder(new ClusterName("finalize-job-action-tests")).build(); @@ -85,8 +94,7 @@ public void testOperation_noJobsInClusterState() { } public void testOperation_jobInClusterState() { - ClusterService clusterService = mock(ClusterService.class); - TransportFinalizeJobExecutionAction action = createAction(clusterService); + TransportFinalizeJobExecutionAction action = createAction(); MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("cs-job").build(new Date()), false); @@ -103,11 +111,37 @@ public void testOperation_jobInClusterState() { e -> fail(e.getMessage()) )); + assertTrue(ack.get().isAcknowledged()); verify(client, never()).execute(eq(UpdateAction.INSTANCE), any(), any()); verify(clusterService, times(1)).submitStateUpdateTask(any(), any()); } - private TransportFinalizeJobExecutionAction createAction(ClusterService clusterService) { + public void testOperation_jobsInBothClusterAndIndex() { + TransportFinalizeJobExecutionAction action = createAction(); + + MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); + mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("cs-job").build(new Date()), false); + + ClusterState clusterState = ClusterState.builder(new ClusterName("finalize-job-action-tests")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlBuilder.build())) + .build(); + + FinalizeJobExecutionAction.Request request = + new FinalizeJobExecutionAction.Request(new String[]{"index-job", "cs-job"}); + AtomicReference ack = new AtomicReference<>(); + action.masterOperation(request, clusterState, ActionListener.wrap( + ack::set, + e -> assertNull(e.getMessage()) + )); + + assertTrue(ack.get().isAcknowledged()); + // The job in the clusterstate should not be updated in the index + verify(client, times(1)).execute(eq(UpdateAction.INSTANCE), any(), any()); + verify(clusterService, times(1)).submitStateUpdateTask(any(), any()); + } + + private TransportFinalizeJobExecutionAction createAction() { return new TransportFinalizeJobExecutionAction(Settings.EMPTY, mock(TransportService.class), clusterService, threadPool, mock(ActionFilters.class), mock(IndexNameExpressionResolver.class), client); From 38b996a6b2f6d698df48dc34d39b4b6be4696cc9 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 29 Nov 2018 14:09:45 +0000 Subject: [PATCH 4/4] Check cluster state config for running datafeed --- .../xpack/core/ml/job/messages/Messages.java | 2 + .../ml/action/TransportCloseJobAction.java | 31 +++++++--- .../action/TransportCloseJobActionTests.java | 56 ++++++++++++++++--- 3 files changed, 74 insertions(+), 15 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index 4039d0be59e0f..f1a0c959bb935 100644 --- 
a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -91,6 +91,8 @@ public final class Messages { public static final String JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT = "Job memory status changed to hard_limit at {0}; adjust the " + "analysis_limits.model_memory_limit setting to ensure all data is analyzed"; + public static final String JOB_CANNOT_CLOSE_BECAUSE_DATAFEED = "cannot close job datafeed [{0}] hasn''t been stopped"; + public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_DUPLICATES = "categorization_filters contain duplicates"; public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_EMPTY = "categorization_filters are not allowed to contain empty strings"; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java index 60e1d58a2d3ef..d6c93d713e554 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java @@ -28,9 +28,11 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; @@ -46,12 +48,13 @@ import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; public class TransportCloseJobAction extends TransportTasksAction { @@ -112,7 +115,7 @@ protected void doExecute(Task task, CloseJobAction.Request request, ActionListen PersistentTasksCustomMetaData tasksMetaData = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); jobManager.expandJobIds(request.getJobId(), request.allowNoJobs(), ActionListener.wrap( expandedJobIds -> { - validate(expandedJobIds, request.isForce(), tasksMetaData, ActionListener.wrap( + validate(expandedJobIds, request.isForce(), MlMetadata.getMlMetadata(state), tasksMetaData, ActionListener.wrap( response -> { request.setOpenJobIds(response.openJobIds.toArray(new String[0])); if (response.openJobIds.isEmpty() && response.closingJobIds.isEmpty()) { @@ -173,13 +176,14 @@ class OpenAndClosingIds { * * @param expandedJobIds The job ids * @param forceClose Force close the job(s) + * @param mlMetadata The ML metadata for un-migrated jobs * @param tasksMetaData Persistent tasks * @param listener Resolved job Ids listener */ - void validate(Collection expandedJobIds, boolean forceClose, PersistentTasksCustomMetaData tasksMetaData, - ActionListener listener) { + void 
validate(Collection expandedJobIds, boolean forceClose, MlMetadata mlMetadata, + PersistentTasksCustomMetaData tasksMetaData, ActionListener listener) { - checkDatafeedsHaveStopped(expandedJobIds, tasksMetaData, ActionListener.wrap( + checkDatafeedsHaveStopped(expandedJobIds, tasksMetaData, mlMetadata, ActionListener.wrap( response -> { OpenAndClosingIds ids = new OpenAndClosingIds(); List failedJobs = new ArrayList<>(); @@ -209,14 +213,27 @@ void validate(Collection expandedJobIds, boolean forceClose, PersistentT } void checkDatafeedsHaveStopped(Collection jobIds, PersistentTasksCustomMetaData tasksMetaData, - ActionListener listener) { + MlMetadata mlMetadata, ActionListener listener) { + + for (String jobId: jobIds) { + Optional datafeed = mlMetadata.getDatafeedByJobId(jobId); + if (datafeed.isPresent()) { + DatafeedState datafeedState = MlTasks.getDatafeedState(datafeed.get().getId(), tasksMetaData); + if (datafeedState != DatafeedState.STOPPED) { + listener.onFailure( + ExceptionsHelper.conflictStatusException( + Messages.getMessage(Messages.JOB_CANNOT_CLOSE_BECAUSE_DATAFEED, datafeed.get().getId()))); + return; + } + } + } datafeedConfigProvider.findDatafeedsForJobIds(jobIds, ActionListener.wrap( datafeedIds -> { for (String datafeedId : datafeedIds) { DatafeedState datafeedState = MlTasks.getDatafeedState(datafeedId, tasksMetaData); if (datafeedState != DatafeedState.STOPPED) { listener.onFailure(ExceptionsHelper.conflictStatusException( - "cannot close job datafeed [{}] hasn't been stopped", datafeedId)); + Messages.getMessage(Messages.JOB_CANNOT_CLOSE_BECAUSE_DATAFEED, datafeedId))); return; } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java index db2be13b75e0f..e41fa6669d9af 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.CloseJobAction.Request; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; @@ -109,7 +110,8 @@ public void testValidate_datafeedState() { exceptionHolder::set ); - closeJobAction.validate(Collections.singletonList(jobId), false, startDataFeedTaskBuilder.build(), listener); + closeJobAction.validate(Collections.singletonList(jobId), false, MlMetadata.EMPTY_METADATA, + startDataFeedTaskBuilder.build(), listener); assertNull(responseHolder.get()); assertNotNull(exceptionHolder.get()); @@ -125,13 +127,48 @@ public void testValidate_datafeedState() { } exceptionHolder.set(null); - closeJobAction.validate(Collections.singletonList(jobId), false, dataFeedNotStartedTaskBuilder.build(), listener); + closeJobAction.validate(Collections.singletonList(jobId), false, MlMetadata.EMPTY_METADATA, + dataFeedNotStartedTaskBuilder.build(), listener); assertNull(exceptionHolder.get()); assertNotNull(responseHolder.get()); assertThat(responseHolder.get().openJobIds, contains(jobId)); assertThat(responseHolder.get().closingJobIds, empty()); } + public 
void testValidate_datafeedNotStoppedAndConfigInClusterState() { + final PersistentTasksCustomMetaData.Builder startDataFeedTaskBuilder = PersistentTasksCustomMetaData.builder(); + String jobId = "job-with-started-df"; + String datafeedId = "df1"; + addJobTask(jobId, null, JobState.OPENED, startDataFeedTaskBuilder); + addTask(datafeedId, 0L, null, DatafeedState.STARTED, startDataFeedTaskBuilder); + + mockDatafeedConfigFindDatafeeds(Collections.emptySet()); + DatafeedConfig.Builder dfBuilder = new DatafeedConfig.Builder(datafeedId, jobId); + dfBuilder.setIndices(Collections.singletonList("beats*")); + MlMetadata.Builder mlBuilder = new MlMetadata.Builder() + .putJob(BaseMlIntegTestCase.createFareQuoteJob(jobId).build(new Date()), false) + .putDatafeed(dfBuilder.build(), Collections.emptyMap()); + + AtomicReference exceptionHolder = new AtomicReference<>(); + AtomicReference responseHolder = new AtomicReference<>(); + ActionListener listener = ActionListener.wrap( + responseHolder::set, + exceptionHolder::set + ); + + TransportCloseJobAction closeJobAction = createAction(); + + closeJobAction.validate(Collections.singletonList(jobId), false, mlBuilder.build(), + startDataFeedTaskBuilder.build(), listener); + + assertNull(responseHolder.get()); + assertNotNull(exceptionHolder.get()); + assertThat(exceptionHolder.get(), instanceOf(ElasticsearchStatusException.class)); + ElasticsearchStatusException esException = (ElasticsearchStatusException) exceptionHolder.get(); + assertEquals(RestStatus.CONFLICT, esException.status()); + assertEquals("cannot close job datafeed [df1] hasn't been stopped", esException.getMessage()); + } + public void testValidate_givenFailedJob() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id_failed", null, JobState.FAILED, tasksBuilder); @@ -148,7 +185,8 @@ public void testValidate_givenFailedJob() { ); // force close so not an error for the failed job - closeJobAction.validate(Collections.singletonList("job_id_failed"), true, tasksBuilder.build(), listener); + closeJobAction.validate(Collections.singletonList("job_id_failed"), true, MlMetadata.EMPTY_METADATA, + tasksBuilder.build(), listener); assertNull(exceptionHolder.get()); assertNotNull(responseHolder.get()); assertThat(responseHolder.get().openJobIds, contains("job_id_failed")); @@ -156,7 +194,8 @@ public void testValidate_givenFailedJob() { // not a force close so is an error responseHolder.set(null); - closeJobAction.validate(Collections.singletonList("job_id_failed"), false, tasksBuilder.build(), listener); + closeJobAction.validate(Collections.singletonList("job_id_failed"), false, MlMetadata.EMPTY_METADATA, + tasksBuilder.build(), listener); assertNull(responseHolder.get()); assertNotNull(exceptionHolder.get()); assertThat(exceptionHolder.get(), instanceOf(ElasticsearchStatusException.class)); @@ -182,25 +221,26 @@ public void testValidate_withSpecificJobIds() { ); TransportCloseJobAction closeJobAction = createAction(); - closeJobAction.validate(Arrays.asList("job_id_closing", "job_id_open-1", "job_id_open-2"), false, tasks, listener); + closeJobAction.validate(Arrays.asList("job_id_closing", "job_id_open-1", "job_id_open-2"), false, MlMetadata.EMPTY_METADATA, + tasks, listener); assertNull(exceptionHolder.get()); assertNotNull(responseHolder.get()); assertEquals(Arrays.asList("job_id_open-1", "job_id_open-2"), responseHolder.get().openJobIds); assertEquals(Collections.singletonList("job_id_closing"), responseHolder.get().closingJobIds); - 
closeJobAction.validate(Arrays.asList("job_id_open-1", "job_id_open-2"), false, tasks, listener); + closeJobAction.validate(Arrays.asList("job_id_open-1", "job_id_open-2"), false, MlMetadata.EMPTY_METADATA, tasks, listener); assertNull(exceptionHolder.get()); assertNotNull(responseHolder.get()); assertEquals(Arrays.asList("job_id_open-1", "job_id_open-2"), responseHolder.get().openJobIds); assertEquals(Collections.emptyList(), responseHolder.get().closingJobIds); - closeJobAction.validate(Collections.singletonList("job_id_closing"), false, tasks, listener); + closeJobAction.validate(Collections.singletonList("job_id_closing"), false, MlMetadata.EMPTY_METADATA, tasks, listener); assertNull(exceptionHolder.get()); assertNotNull(responseHolder.get()); assertEquals(Collections.emptyList(), responseHolder.get().openJobIds); assertEquals(Collections.singletonList("job_id_closing"), responseHolder.get().closingJobIds); - closeJobAction.validate(Collections.singletonList("job_id_open-1"), false, tasks, listener); + closeJobAction.validate(Collections.singletonList("job_id_open-1"), false, MlMetadata.EMPTY_METADATA, tasks, listener); assertNull(exceptionHolder.get()); assertNotNull(responseHolder.get()); assertEquals(Collections.singletonList("job_id_open-1"), responseHolder.get().openJobIds);
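// Illustration (not part of this patch): the new message template in
// Messages.java reads "hasn''t" while the assertion earlier in this test class
// expects "hasn't". That is intentional: Messages.getMessage() renders through
// java.text.MessageFormat, where a single quote opens a quoted section and ''
// escapes a literal apostrophe. A self-contained demonstration:
import java.text.MessageFormat;

class MessageFormatQuotingDemo {
    public static void main(String[] args) {
        String template = "cannot close job datafeed [{0}] hasn''t been stopped";
        // Prints: cannot close job datafeed [df1] hasn't been stopped
        System.out.println(MessageFormat.format(template, "df1"));
    }
}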