Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.watcher.test;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
Expand Down Expand Up @@ -70,10 +71,12 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
Expand Down Expand Up @@ -177,7 +180,7 @@ protected boolean timeWarped() {
public void _setup() throws Exception {
if (timeWarped()) {
timeWarp = new TimeWarp(internalCluster().getInstances(ScheduleTriggerEngineMock.class),
(ClockMock)getInstanceFromMaster(Clock.class));
(ClockMock)getInstanceFromMaster(Clock.class), logger);
}

if (internalCluster().size() > 0) {
Expand Down Expand Up @@ -536,24 +539,28 @@ public EmailSent send(Email email, Authentication auth, Profile profile, String

protected static class TimeWarp {

protected final Iterable<ScheduleTriggerEngineMock> schedulers;
protected final ClockMock clock;
private final List<ScheduleTriggerEngineMock> schedulers;
private final ClockMock clock;
private final Logger logger;

public TimeWarp(Iterable<ScheduleTriggerEngineMock> schedulers, ClockMock clock) {
this.schedulers = schedulers;
TimeWarp(Iterable<ScheduleTriggerEngineMock> schedulers, ClockMock clock, Logger logger) {
this.schedulers = StreamSupport.stream(schedulers.spliterator(), false).collect(Collectors.toList());
this.clock = clock;
this.logger = logger;
}

public void trigger(String jobName) {
schedulers.forEach(scheduler -> scheduler.trigger(jobName));
trigger(jobName, 1, null);
}

public ClockMock clock() {
return clock;
}

public void trigger(String id, int times, TimeValue timeValue) {
schedulers.forEach(scheduler -> scheduler.trigger(id, times, timeValue));
public void trigger(String watchId, int times, TimeValue timeValue) {
boolean isTriggered = schedulers.stream().anyMatch(scheduler -> scheduler.trigger(watchId, times, timeValue));
String msg = String.format(Locale.ROOT, "could not find watch [%s] to trigger", watchId);
assertThat(msg, isTriggered, is(true));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,13 @@ public boolean remove(String jobId) {
return watches.remove(jobId) != null;
}

public void trigger(String jobName) {
trigger(jobName, 1, null);
public boolean trigger(String jobName) {
return trigger(jobName, 1, null);
}

public void trigger(String jobName, int times) {
trigger(jobName, times, null);
}

public void trigger(String jobName, int times, TimeValue interval) {
public boolean trigger(String jobName, int times, TimeValue interval) {
if (watches.containsKey(jobName) == false) {
logger.trace("not executing job [{}], not found", jobName);
return;
return false;
}

for (int i = 0; i < times; i++) {
Expand All @@ -108,5 +103,7 @@ public void trigger(String jobName, int times, TimeValue interval) {
}
}
}

return true;
}
}