From e4da2726a0a7e438f71d5a9e1214b899dbf73fac Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 25 Jan 2022 13:33:27 +0000 Subject: [PATCH 1/2] Update running process when global calendar changes --- .../xpack/core/ml/job/config/JobUpdate.java | 6 ++ .../ml/integration/ScheduledEventsIT.java | 60 ++++++++++++---- .../xpack/ml/job/JobManager.java | 69 ++++++++++--------- 3 files changed, 91 insertions(+), 44 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java index 0ce7366d4a281..0152e234c4078 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.ml.job.config; import org.elasticsearch.Version; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -395,6 +396,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } + @Override + public String toString() { + return Strings.toString(this::toXContent); + } + public Set getUpdateFields() { Set updateFields = new TreeSet<>(); if (groups != null) { diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java index 0a69500e51d5e..d7778bf570430 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.stream.Collectors; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -372,7 +373,10 @@ public void testAddOpenedJobToGroupWithCalendar() throws Exception { } /** - * An open job that later gets added to a calendar, should take the scheduled events into account + * Add a global calendar then create a job that will pick + * up the calendar. + * Add a new scheduled event to the calendar, the open + * job should pick up the new event */ public void testNewJobWithGlobalCalendar() throws Exception { String calendarId = "test-global-calendar"; @@ -381,28 +385,56 @@ public void testNewJobWithGlobalCalendar() throws Exception { putCalendar(calendarId, Collections.singletonList(Metadata.ALL), "testNewJobWithGlobalCalendar calendar"); long startTime = 1514764800000L; - final int bucketCount = 3; + final int bucketCount = 6; TimeValue bucketSpan = TimeValue.timeValueMinutes(30); // Put events in the calendar - List events = new ArrayList<>(); + List preOpenEvents = new ArrayList<>(); long eventStartTime = startTime; long eventEndTime = eventStartTime + (long) (1.5 * bucketSpan.millis()); - events.add( - new ScheduledEvent.Builder().description("Some Event") + preOpenEvents.add( + new ScheduledEvent.Builder().description("Pre open Event") .startTime((Instant.ofEpochMilli(eventStartTime))) .endTime((Instant.ofEpochMilli(eventEndTime))) .calendarId(calendarId) .build() ); - postScheduledEvents(calendarId, events); - - Job.Builder job = createJob("scheduled-events-add-to-new-job--with-global-calendar", bucketSpan); + postScheduledEvents(calendarId, preOpenEvents); // Open the job + Job.Builder job = createJob("scheduled-events-add-to-new-job--with-global-calendar", bucketSpan); openJob(job.getId()); + // Add another event after the job is opened + List postOpenJobEvents = new ArrayList<>(); + eventStartTime = eventEndTime + (3 * bucketSpan.millis()); + eventEndTime = eventStartTime + bucketSpan.millis(); + postOpenJobEvents.add( + new ScheduledEvent.Builder().description("Event added after job is opened") + .startTime((Instant.ofEpochMilli(eventStartTime))) + .endTime((Instant.ofEpochMilli(eventEndTime))) + .calendarId(calendarId) + .build() + ); + postScheduledEvents(calendarId, postOpenJobEvents); + + // Wait until the notification that the job was updated is indexed + assertBusy(() -> { + SearchResponse searchResponse = client().prepareSearch(NotificationsIndex.NOTIFICATIONS_INDEX) + .setSize(1) + .addSort("timestamp", SortOrder.DESC) + .setQuery( + QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery("job_id", job.getId())) + .filter(QueryBuilders.termQuery("level", "info")) + ) + .get(); + SearchHit[] hits = searchResponse.getHits().getHits(); + assertThat(hits.length, equalTo(1)); + assertThat(hits[0].getSourceAsMap().get("message"), equalTo("Updated calendars in running process")); + }); + // write some buckets of data postData( job.getId(), @@ -416,12 +448,14 @@ public void testNewJobWithGlobalCalendar() throws Exception { GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(job.getId()); List buckets = getBuckets(getBucketsRequest); - // 1st and 2nd buckets have the event but the last one does not - assertEquals(1, buckets.get(0).getScheduledEvents().size()); - assertEquals("Some Event", buckets.get(0).getScheduledEvents().get(0)); - assertEquals(1, buckets.get(1).getScheduledEvents().size()); - assertEquals("Some Event", buckets.get(1).getScheduledEvents().get(0)); + // 1st and 2nd buckets have the first event + // 5th and 6th buckets have the second event + assertThat(buckets.get(0).getScheduledEvents(), contains("Pre open Event")); + assertThat(buckets.get(1).getScheduledEvents(), contains("Pre open Event")); assertEquals(0, buckets.get(2).getScheduledEvents().size()); + assertEquals(0, buckets.get(3).getScheduledEvents().size()); + assertThat(buckets.get(4).getScheduledEvents(), contains("Event added after job is opened")); + assertThat(buckets.get(5).getScheduledEvents(), contains("Event added after job is opened")); } private Job.Builder createJob(String jobId, TimeValue bucketSpan) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index be859dd02a211..ab63c05df7908 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; @@ -27,9 +28,6 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.NamedXContentRegistry; -import org.elasticsearch.xcontent.ToXContent; -import org.elasticsearch.xcontent.XContentBuilder; -import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xpack.core.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.MlConfigIndex; import org.elasticsearch.xpack.core.ml.MlTasks; @@ -63,6 +61,7 @@ import org.elasticsearch.xpack.ml.utils.VoidChainTaskExecutor; import java.io.IOException; +import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.List; @@ -426,28 +425,28 @@ public void deleteJob( private void postJobUpdate(UpdateJobAction.Request request, Job updatedJob, ActionListener actionListener) { // Autodetect must be updated if the fields that the C++ uses are changed - if (request.getJobUpdate().isAutodetectProcessUpdate()) { - JobUpdate jobUpdate = request.getJobUpdate(); + JobUpdate jobUpdate = request.getJobUpdate(); + if (jobUpdate.isAutodetectProcessUpdate()) { if (isJobOpen(clusterService.state(), request.getJobId())) { updateJobProcessNotifier.submitJobUpdate(UpdateParams.fromJobUpdate(jobUpdate), ActionListener.wrap(isUpdated -> { if (isUpdated) { auditJobUpdatedIfNotInternal(request); + } else { + logger.error("[{}] Updating autodetect failed for job update [{}]", jobUpdate.getJobId(), jobUpdate); } }, e -> { - // No need to do anything + logger.error( + new ParameterizedMessage( + "[{}] Updating autodetect failed with an exception, job update [{}] ", + jobUpdate.getJobId(), + jobUpdate + ), + e + ); })); } } else { - logger.debug("[{}] No process update required for job update: {}", request::getJobId, () -> { - try { - XContentBuilder jsonBuilder = XContentFactory.jsonBuilder(); - request.getJobUpdate().toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS); - return Strings.toString(jsonBuilder); - } catch (IOException e) { - return "(unprintable due to " + e.getMessage() + ")"; - } - }); - + logger.debug("[{}] No process update required for job update: {}", jobUpdate::getJobId, jobUpdate::toString); auditJobUpdatedIfNotInternal(request); } @@ -610,32 +609,40 @@ public void updateProcessOnCalendarChanged(List calendarJobIds, ActionLi return; } + boolean appliesToAllJobs = calendarJobIds.stream().anyMatch(Metadata.ALL::equals); + if (appliesToAllJobs) { + submitJobEventUpdate(openJobIds, updateListener); + return; + } + // calendarJobIds may be a group or job jobConfigProvider.expandGroupIds( calendarJobIds, ActionListener.wrap(expandedIds -> threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> { - // Merge the expended group members with the request Ids. + // Merge the expanded group members with the request Ids. // Ids that aren't jobs will be filtered by isJobOpen() expandedIds.addAll(calendarJobIds); - for (String jobId : expandedIds) { - if (isJobOpen(clusterState, jobId)) { - updateJobProcessNotifier.submitJobUpdate( - UpdateParams.scheduledEventsUpdate(jobId), - ActionListener.wrap(isUpdated -> { - if (isUpdated) { - auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_CALENDARS_UPDATED_ON_PROCESS)); - } - }, e -> logger.error("[" + jobId + "] failed submitting process update on calendar change", e)) - ); - } - } - - updateListener.onResponse(Boolean.TRUE); + openJobIds.retainAll(expandedIds); + submitJobEventUpdate(openJobIds, updateListener); }), updateListener::onFailure) ); } + private void submitJobEventUpdate(Collection jobIds, ActionListener updateListener) { + for (String jobId : jobIds) { + updateJobProcessNotifier.submitJobUpdate( + UpdateParams.scheduledEventsUpdate(jobId), + ActionListener.wrap( + isUpdated -> { auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_CALENDARS_UPDATED_ON_PROCESS)); }, + e -> logger.error("[" + jobId + "] failed submitting process update on calendar change", e) + ) + ); + } + + updateListener.onResponse(Boolean.TRUE); + } + public void revertSnapshot( RevertModelSnapshotAction.Request request, ActionListener actionListener, From 18557702ef1a3f547eac65d8083475c1490d3207 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 25 Jan 2022 13:46:32 +0000 Subject: [PATCH 2/2] Update docs/changelog/83044.yaml --- docs/changelog/83044.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/83044.yaml diff --git a/docs/changelog/83044.yaml b/docs/changelog/83044.yaml new file mode 100644 index 0000000000000..4ba59ff68d073 --- /dev/null +++ b/docs/changelog/83044.yaml @@ -0,0 +1,5 @@ +pr: 83044 +summary: Update running process when global calendar changes +area: Machine Learning +type: bug +issues: []