Skip to content

Commit bf76445

Browse files
committed
Watcher: Only trigger a watch if new or schedule/changed (#35908)
The trigger engine did always create a new schedule data structure, when the watcher indexing listener called an add. However the indexing listener also called add, when the watch status was updated. This means, that upon a watch status update the watch got retriggered, potentially waiting a defined interval from the watch status update onwards, instead of waiting from the last run. This commit only updates the schedule in the trigger engine, if it actually has changed, otherwise the existing schedule will not be touched. This has two results 1. If a watch is updated by an execution, the existing interval will not be touched (meaning the scheduled time will not move forward). 2. If a watch is updated by a user, but the schedule is not changed, it will not be reset from the update (for example starting to count from 5 minutes again, if the interval was set to 5 minutes). Furthermore some minor cleanups were applied, making variables final in the ctor, preventing double creation of variables.
1 parent e4b6eb4 commit bf76445

File tree

3 files changed

+49
-14
lines changed

3 files changed

+49
-14
lines changed

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/Schedule.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,13 @@
1111

1212
import java.io.IOException;
1313

14+
/**
15+
* This interface is used to implement watcher specific schedules, the existing implementations are either
16+
* based on a cron based or an interval based schedule
17+
*
18+
* In addition to the methods defined here, you also have to implement the equals() method to properly work
19+
* for the trigger engine implementations.
20+
*/
1421
public interface Schedule extends SchedulerEngine.Schedule, ToXContent {
1522

1623
String type();

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.time.Clock;
2323
import java.util.ArrayList;
2424
import java.util.Collection;
25+
import java.util.Collections;
2526
import java.util.HashMap;
2627
import java.util.List;
2728
import java.util.Map;
@@ -37,24 +38,23 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
3738
positiveTimeSetting("xpack.watcher.trigger.schedule.ticker.tick_interval", TimeValue.timeValueMillis(500), Property.NodeScope);
3839

3940
private final TimeValue tickInterval;
40-
private volatile Map<String, ActiveSchedule> schedules;
41-
private Ticker ticker;
41+
private final Map<String, ActiveSchedule> schedules = new ConcurrentHashMap<>();
42+
private final Ticker ticker;
4243

4344
public TickerScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock) {
4445
super(settings, scheduleRegistry, clock);
4546
this.tickInterval = TICKER_INTERVAL_SETTING.get(settings);
46-
this.schedules = new ConcurrentHashMap<>();
4747
this.ticker = new Ticker(Node.NODE_DATA_SETTING.get(settings));
4848
}
4949

5050
@Override
5151
public synchronized void start(Collection<Watch> jobs) {
5252
long startTime = clock.millis();
53-
Map<String, ActiveSchedule> schedules = new HashMap<>(jobs.size());
53+
Map<String, ActiveSchedule> startingSchedules = new HashMap<>(jobs.size());
5454
for (Watch job : jobs) {
5555
if (job.trigger() instanceof ScheduleTrigger) {
5656
ScheduleTrigger trigger = (ScheduleTrigger) job.trigger();
57-
schedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), startTime));
57+
startingSchedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), startTime));
5858
}
5959
}
6060
// why are we calling putAll() here instead of assigning a brand
@@ -66,7 +66,7 @@ public synchronized void start(Collection<Watch> jobs) {
6666
// this method. The index operation however will run at the same time
6767
// as the reload, so if we clean out the old data structure here,
6868
// that can lead to that one watch not being triggered
69-
this.schedules.putAll(schedules);
69+
this.schedules.putAll(startingSchedules);
7070
}
7171

7272
@Override
@@ -84,7 +84,14 @@ public synchronized void pauseExecution() {
8484
public void add(Watch watch) {
8585
assert watch.trigger() instanceof ScheduleTrigger;
8686
ScheduleTrigger trigger = (ScheduleTrigger) watch.trigger();
87-
schedules.put(watch.id(), new ActiveSchedule(watch.id(), trigger.getSchedule(), clock.millis()));
87+
ActiveSchedule currentSchedule = schedules.get(watch.id());
88+
// only update the schedules data structure if the scheduled trigger really has changed, otherwise the time would be reset again
89+
// resulting in later executions, as the time would only count after a watch has been stored, as this code is triggered by the
90+
// watcher indexing listener
91+
// this also means that updating an existing watch would not retrigger the schedule time, if it remains the same schedule
92+
if (currentSchedule == null || currentSchedule.schedule.equals(trigger.getSchedule()) == false) {
93+
schedules.put(watch.id(), new ActiveSchedule(watch.id(), trigger.getSchedule(), clock.millis()));
94+
}
8895
}
8996

9097
@Override
@@ -98,10 +105,10 @@ void checkJobs() {
98105
for (ActiveSchedule schedule : schedules.values()) {
99106
long scheduledTime = schedule.check(triggeredTime);
100107
if (scheduledTime > 0) {
101-
logger.debug("triggered job [{}] at [{}] (scheduled time was [{}])", schedule.name,
102-
new DateTime(triggeredTime, UTC), new DateTime(scheduledTime, UTC));
103-
events.add(new ScheduleTriggerEvent(schedule.name, new DateTime(triggeredTime, UTC),
104-
new DateTime(scheduledTime, UTC)));
108+
DateTime triggeredDateTime = new DateTime(triggeredTime, UTC);
109+
DateTime scheduledDateTime = new DateTime(scheduledTime, UTC);
110+
logger.debug("triggered job [{}] at [{}] (scheduled time was [{}])", schedule.name, triggeredDateTime, scheduledDateTime);
111+
events.add(new ScheduleTriggerEvent(schedule.name, triggeredDateTime, scheduledDateTime));
105112
if (events.size() >= 1000) {
106113
notifyListeners(events);
107114
events.clear();
@@ -113,6 +120,11 @@ void checkJobs() {
113120
}
114121
}
115122

123+
// visible for testing
124+
Map<String, ActiveSchedule> getSchedules() {
125+
return Collections.unmodifiableMap(schedules);
126+
}
127+
116128
protected void notifyListeners(List<TriggerEvent> events) {
117129
consumers.forEach(consumer -> consumer.accept(events));
118130
}

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.elasticsearch.xpack.core.watcher.watch.Watch;
1414
import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition;
1515
import org.elasticsearch.xpack.watcher.input.none.ExecutableNoneInput;
16-
import org.elasticsearch.xpack.watcher.trigger.TriggerEngine;
1716
import org.elasticsearch.xpack.watcher.trigger.schedule.Schedule;
1817
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry;
1918
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
@@ -36,20 +35,21 @@
3635
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
3736
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.weekly;
3837
import static org.hamcrest.Matchers.is;
38+
import static org.hamcrest.Matchers.not;
3939
import static org.joda.time.DateTimeZone.UTC;
4040
import static org.mockito.Mockito.mock;
4141

4242
public class TickerScheduleEngineTests extends ESTestCase {
4343

44-
private TriggerEngine engine;
44+
private TickerScheduleTriggerEngine engine;
4545
protected ClockMock clock = ClockMock.frozen();
4646

4747
@Before
4848
public void init() throws Exception {
4949
engine = createEngine();
5050
}
5151

52-
private TriggerEngine createEngine() {
52+
private TickerScheduleTriggerEngine createEngine() {
5353
Settings settings = Settings.EMPTY;
5454
// having a low value here speeds up the tests tremendously, we still want to run with the defaults every now and then
5555
if (usually()) {
@@ -254,6 +254,22 @@ public void accept(Iterable<TriggerEvent> events) {
254254
assertThat(counter.get(), is(2));
255255
}
256256

257+
public void testAddOnlyWithNewSchedule() {
258+
engine.start(Collections.emptySet());
259+
260+
// add watch with schedule
261+
Watch oncePerSecondWatch = createWatch("_id", interval("1s"));
262+
engine.add(oncePerSecondWatch);
263+
TickerScheduleTriggerEngine.ActiveSchedule activeSchedule = engine.getSchedules().get("_id");
264+
engine.add(oncePerSecondWatch);
265+
assertThat(engine.getSchedules().get("_id"), is(activeSchedule));
266+
267+
// add watch with same id but different watch
268+
Watch oncePerMinuteWatch = createWatch("_id", interval("1m"));
269+
engine.add(oncePerMinuteWatch);
270+
assertThat(engine.getSchedules().get("_id"), not(is(activeSchedule)));
271+
}
272+
257273
private Watch createWatch(String name, Schedule schedule) {
258274
return new Watch(name, new ScheduleTrigger(schedule), new ExecutableNoneInput(),
259275
InternalAlwaysCondition.INSTANCE, null, null,

0 commit comments

Comments
 (0)