docs/changelog/83044.yaml (5 changes: 5 additions & 0 deletions)
@@ -0,0 +1,5 @@
pr: 83044
summary: Update running process when global calendar changes
area: Machine Learning
type: bug
issues: []
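For context, the scenario this fix covers: a calendar whose job_ids include the special _all id (a "global" calendar) gains a new scheduled event while a job is already open, and the running autodetect process should be updated immediately rather than only on the next job open. Below is a minimal sketch of that interaction using the low-level Java REST client; the host, calendar id, and event values are illustrative, and it assumes the _ml/calendars endpoints accept job_ids in the calendar body and _all as a job id, mirroring Metadata.ALL in the integration test further down.

import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;

public class GlobalCalendarSketch {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            // Create a calendar scoped to every job; "_all" is the global marker
            Request putCalendar = new Request("PUT", "/_ml/calendars/test-global-calendar");
            putCalendar.setJsonEntity("{\"job_ids\": [\"_all\"], \"description\": \"global calendar\"}");
            client.performRequest(putCalendar);

            // ... create and open an anomaly detection job here ...

            // Add a scheduled event while the job is open; with this fix the running
            // autodetect process is updated as well, not just newly opened jobs
            Request postEvents = new Request("POST", "/_ml/calendars/test-global-calendar/events");
            postEvents.setJsonEntity(
                "{\"events\": [{\"description\": \"maintenance window\","
                    + " \"start_time\": 1514764800000, \"end_time\": 1514767500000}]}"
            );
            client.performRequest(postEvents);
        }
    }
}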
@@ -7,6 +7,7 @@
package org.elasticsearch.xpack.core.ml.job.config;

import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@@ -395,6 +396,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

@Override
public String toString() {
return Strings.toString(this::toXContent);
}

public Set<String> getUpdateFields() {
Set<String> updateFields = new TreeSet<>();
if (groups != null) {
@@ -33,6 +33,7 @@
import java.util.List;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
@@ -372,7 +373,10 @@ public void testAddOpenedJobToGroupWithCalendar() throws Exception {
}

/**
* An open job that later gets added to a calendar, should take the scheduled events into account
Add a global calendar, then create a job that will pick up the calendar.
Add a new scheduled event to the calendar; the open job should pick up the new event.
*/
public void testNewJobWithGlobalCalendar() throws Exception {
String calendarId = "test-global-calendar";
@@ -381,28 +385,56 @@ public void testNewJobWithGlobalCalendar() throws Exception {
putCalendar(calendarId, Collections.singletonList(Metadata.ALL), "testNewJobWithGlobalCalendar calendar");

long startTime = 1514764800000L;
final int bucketCount = 3;
final int bucketCount = 6;
TimeValue bucketSpan = TimeValue.timeValueMinutes(30);

// Put events in the calendar
List<ScheduledEvent> events = new ArrayList<>();
List<ScheduledEvent> preOpenEvents = new ArrayList<>();
long eventStartTime = startTime;
long eventEndTime = eventStartTime + (long) (1.5 * bucketSpan.millis());
events.add(
new ScheduledEvent.Builder().description("Some Event")
preOpenEvents.add(
new ScheduledEvent.Builder().description("Pre open Event")
.startTime((Instant.ofEpochMilli(eventStartTime)))
.endTime((Instant.ofEpochMilli(eventEndTime)))
.calendarId(calendarId)
.build()
);

postScheduledEvents(calendarId, events);

Job.Builder job = createJob("scheduled-events-add-to-new-job--with-global-calendar", bucketSpan);
postScheduledEvents(calendarId, preOpenEvents);

// Open the job
Job.Builder job = createJob("scheduled-events-add-to-new-job--with-global-calendar", bucketSpan);
openJob(job.getId());

// Add another event after the job is opened
List<ScheduledEvent> postOpenJobEvents = new ArrayList<>();
eventStartTime = eventEndTime + (3 * bucketSpan.millis());
eventEndTime = eventStartTime + bucketSpan.millis();
postOpenJobEvents.add(
new ScheduledEvent.Builder().description("Event added after job is opened")
.startTime((Instant.ofEpochMilli(eventStartTime)))
.endTime((Instant.ofEpochMilli(eventEndTime)))
.calendarId(calendarId)
.build()
);
postScheduledEvents(calendarId, postOpenJobEvents);

// Wait until the notification that the job was updated is indexed
assertBusy(() -> {
SearchResponse searchResponse = client().prepareSearch(NotificationsIndex.NOTIFICATIONS_INDEX)
.setSize(1)
.addSort("timestamp", SortOrder.DESC)
.setQuery(
QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery("job_id", job.getId()))
.filter(QueryBuilders.termQuery("level", "info"))
)
.get();
SearchHit[] hits = searchResponse.getHits().getHits();
assertThat(hits.length, equalTo(1));
assertThat(hits[0].getSourceAsMap().get("message"), equalTo("Updated calendars in running process"));
});

// write some buckets of data
postData(
job.getId(),
@@ -416,12 +448,14 @@ public void testNewJobWithGlobalCalendar() throws Exception {
GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(job.getId());
List<Bucket> buckets = getBuckets(getBucketsRequest);

// 1st and 2nd buckets have the event but the last one does not
assertEquals(1, buckets.get(0).getScheduledEvents().size());
assertEquals("Some Event", buckets.get(0).getScheduledEvents().get(0));
assertEquals(1, buckets.get(1).getScheduledEvents().size());
assertEquals("Some Event", buckets.get(1).getScheduledEvents().get(0));
// 1st and 2nd buckets have the first event
// 5th and 6th buckets have the second event
assertThat(buckets.get(0).getScheduledEvents(), contains("Pre open Event"));
assertThat(buckets.get(1).getScheduledEvents(), contains("Pre open Event"));
assertEquals(0, buckets.get(2).getScheduledEvents().size());
assertEquals(0, buckets.get(3).getScheduledEvents().size());
assertThat(buckets.get(4).getScheduledEvents(), contains("Event added after job is opened"));
assertThat(buckets.get(5).getScheduledEvents(), contains("Event added after job is opened"));
}
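The bucket indices asserted above follow from the 30-minute bucket span arithmetic; here is a small standalone sketch (values copied from the test) showing why the second event lands in the 5th and 6th buckets:

public class BucketArithmeticSketch {
    public static void main(String[] args) {
        long startTime = 1514764800000L;      // 2018-01-01T00:00:00Z, start of bucket 0
        long bucketSpanMs = 30L * 60 * 1000;  // 30 minute buckets

        // First event: posted before the job is opened, covers 1.5 bucket spans
        long firstEventStart = startTime;
        long firstEventEnd = firstEventStart + (long) (1.5 * bucketSpanMs);

        // Second event: posted after the job is opened, starting 3 bucket spans after the first event ends
        long secondEventStart = firstEventEnd + 3 * bucketSpanMs;
        long secondEventEnd = secondEventStart + bucketSpanMs;

        System.out.println((firstEventStart - startTime) / bucketSpanMs);   // 0 -> first event starts in bucket 0
        System.out.println((firstEventEnd - startTime) / bucketSpanMs);     // 1 -> and ends inside bucket 1
        System.out.println((secondEventStart - startTime) / bucketSpanMs);  // 4 -> second event starts in bucket 4
        System.out.println((secondEventEnd - startTime) / bucketSpanMs);    // 5 -> and ends inside bucket 5
    }
}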

private Job.Builder createJob(String jobId, TimeValue bucketSpan) {
@@ -8,6 +8,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
@@ -27,9 +28,6 @@
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.MlConfigIndex;
import org.elasticsearch.xpack.core.ml.MlTasks;
@@ -63,6 +61,7 @@
import org.elasticsearch.xpack.ml.utils.VoidChainTaskExecutor;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
@@ -426,28 +425,28 @@ public void deleteJob(

private void postJobUpdate(UpdateJobAction.Request request, Job updatedJob, ActionListener<PutJobAction.Response> actionListener) {
// Autodetect must be updated if the fields that the C++ uses are changed
if (request.getJobUpdate().isAutodetectProcessUpdate()) {
JobUpdate jobUpdate = request.getJobUpdate();
JobUpdate jobUpdate = request.getJobUpdate();
if (jobUpdate.isAutodetectProcessUpdate()) {
if (isJobOpen(clusterService.state(), request.getJobId())) {
updateJobProcessNotifier.submitJobUpdate(UpdateParams.fromJobUpdate(jobUpdate), ActionListener.wrap(isUpdated -> {
if (isUpdated) {
auditJobUpdatedIfNotInternal(request);
} else {
logger.error("[{}] Updating autodetect failed for job update [{}]", jobUpdate.getJobId(), jobUpdate);
}
}, e -> {
// No need to do anything
logger.error(
new ParameterizedMessage(
"[{}] Updating autodetect failed with an exception, job update [{}] ",
jobUpdate.getJobId(),
jobUpdate
),
e
);
}));
}
} else {
logger.debug("[{}] No process update required for job update: {}", request::getJobId, () -> {
try {
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
request.getJobUpdate().toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
return Strings.toString(jsonBuilder);
} catch (IOException e) {
return "(unprintable due to " + e.getMessage() + ")";
}
});

logger.debug("[{}] No process update required for job update: {}", jobUpdate::getJobId, jobUpdate::toString);
auditJobUpdatedIfNotInternal(request);
}
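Note on the debug line above: both the old and the new version pass Log4j Suppliers, so the message is only rendered when debug logging is enabled; the change simply moves the XContent rendering into the new JobUpdate#toString added earlier in this diff via Strings.toString(this::toXContent). A minimal sketch of the pattern, with made-up class and method names:

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

class LazyDebugLoggingSketch {
    private static final Logger logger = LogManager.getLogger(LazyDebugLoggingSketch.class);

    void logNoProcessUpdate(String jobId, Object jobUpdate) {
        // The Supplier varargs overload of debug() defers both arguments,
        // so jobUpdate.toString() only runs when the debug level is enabled.
        logger.debug("[{}] No process update required for job update: {}", () -> jobId, jobUpdate::toString);
    }
}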

@@ -610,32 +609,40 @@ public void updateProcessOnCalendarChanged(List<String> calendarJobIds, ActionLi
return;
}

boolean appliesToAllJobs = calendarJobIds.stream().anyMatch(Metadata.ALL::equals);
if (appliesToAllJobs) {
submitJobEventUpdate(openJobIds, updateListener);
return;
}

// calendarJobIds may be a group or job
jobConfigProvider.expandGroupIds(
calendarJobIds,
ActionListener.wrap(expandedIds -> threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> {
// Merge the expended group members with the request Ids.
// Merge the expanded group members with the request Ids.
// Ids that aren't jobs will be filtered by isJobOpen()
expandedIds.addAll(calendarJobIds);

for (String jobId : expandedIds) {
if (isJobOpen(clusterState, jobId)) {
updateJobProcessNotifier.submitJobUpdate(
UpdateParams.scheduledEventsUpdate(jobId),
ActionListener.wrap(isUpdated -> {
if (isUpdated) {
auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_CALENDARS_UPDATED_ON_PROCESS));
}
}, e -> logger.error("[" + jobId + "] failed submitting process update on calendar change", e))
);
}
}

updateListener.onResponse(Boolean.TRUE);
openJobIds.retainAll(expandedIds);
submitJobEventUpdate(openJobIds, updateListener);
}), updateListener::onFailure)
);
}

private void submitJobEventUpdate(Collection<String> jobIds, ActionListener<Boolean> updateListener) {
for (String jobId : jobIds) {
updateJobProcessNotifier.submitJobUpdate(
UpdateParams.scheduledEventsUpdate(jobId),
ActionListener.wrap(
isUpdated -> { auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_CALENDARS_UPDATED_ON_PROCESS)); },
e -> logger.error("[" + jobId + "] failed submitting process update on calendar change", e)
)
);
}

updateListener.onResponse(Boolean.TRUE);
}

public void revertSnapshot(
RevertModelSnapshotAction.Request request,
ActionListener<RevertModelSnapshotAction.Response> actionListener,