Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions x-pack/docs/en/watcher/actions.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,10 @@ image::images/action-throttling.jpg[align="center"]
You can use the `foreach` field in an action to trigger the configured action
for every element within that array.

In order to protect from long running watches, after one hundred runs with an
foreach loop the execution is gracefully stopped.
In order to protect from long running watches, you can use the `max_iterations`
field to limit the maximum amount of runs that each watch executes. If this limit
is reached, the execution is gracefully stopped. If not set, this field defaults
to one hundred.

[source,js]
--------------------------------------------------
Expand All @@ -224,6 +226,7 @@ PUT _watcher/watch/log_event_watch
"actions" : {
"log_hits" : {
"foreach" : "ctx.payload.hits.hits", <1>
"max_iterations" : 500,
"logging" : {
"text" : "Found id {{ctx.payload._id}} with field {{ctx.payload._source.my_field}}"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@

public class ActionWrapper implements ToXContentObject {

private final int MAXIMUM_FOREACH_RUNS = 100;

private String id;
@Nullable
private final ExecutableCondition condition;
Expand All @@ -58,18 +56,21 @@ public class ActionWrapper implements ToXContentObject {
private final ExecutableAction<? extends Action> action;
@Nullable
private String path;
private final Integer maxIterations;

public ActionWrapper(String id, ActionThrottler throttler,
@Nullable ExecutableCondition condition,
@Nullable ExecutableTransform<Transform, Transform.Result> transform,
ExecutableAction<? extends Action> action,
@Nullable String path) {
@Nullable String path,
@Nullable Integer maxIterations) {
this.id = id;
this.condition = condition;
this.throttler = throttler;
this.transform = transform;
this.action = action;
this.path = path;
this.maxIterations = (maxIterations != null) ? maxIterations : 100;
}

public String id() {
Expand Down Expand Up @@ -177,7 +178,7 @@ public ActionWrapperResult execute(WatchExecutionContext ctx) {
throw new ElasticsearchException("foreach object [{}] was an empty list, could not run any action", path);
} else {
for (Object o : collection) {
if (runs >= MAXIMUM_FOREACH_RUNS) {
if (runs >= maxIterations) {
break;
}
if (o instanceof Map) {
Expand Down Expand Up @@ -216,6 +217,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.endObject();
}
builder.endArray();
builder.field(WatchField.MAX_ITERATIONS.getPreferredName(), maxIterations);
return builder;
}
});
Expand Down Expand Up @@ -279,7 +281,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
if (Strings.isEmpty(path) == false) {
builder.field(WatchField.FOREACH.getPreferredName(), path);
builder.field(WatchField.MAX_ITERATIONS.getPreferredName(), maxIterations);
}

builder.field(action.type(), action, params);
return builder.endObject();
}
Expand All @@ -294,6 +298,7 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse
TimeValue throttlePeriod = null;
String path = null;
ExecutableAction<? extends Action> action = null;
Integer maxIterations = null;

String currentFieldName = null;
XContentParser.Token token;
Expand All @@ -316,6 +321,8 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse
throw new ElasticsearchParseException("could not parse action [{}/{}]. failed to parse field [{}] as time value",
pe, watchId, actionId, currentFieldName);
}
} else if (WatchField.MAX_ITERATIONS.match(currentFieldName, parser.getDeprecationHandler())) {
maxIterations = parser.intValue();
} else {
// it's the type of the action
ActionFactory actionFactory = actionRegistry.factory(currentFieldName);
Expand All @@ -332,7 +339,7 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse
}

ActionThrottler throttler = new ActionThrottler(clock, throttlePeriod, licenseState);
return new ActionWrapper(actionId, throttler, condition, transform, action, path);
return new ActionWrapper(actionId, throttler, condition, transform, action, path, maxIterations);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public final class WatchField {
public static final ParseField ACTIONS = new ParseField("actions");
public static final ParseField TRANSFORM = new ParseField("transform");
public static final ParseField FOREACH = new ParseField("foreach");
public static final ParseField MAX_ITERATIONS = new ParseField("max_iterations");
public static final ParseField THROTTLE_PERIOD = new ParseField("throttle_period_in_millis");
public static final ParseField THROTTLE_PERIOD_HUMAN = new ParseField("throttle_period");
public static final ParseField METADATA = new ParseField("metadata");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class ActionWrapperTests extends ESTestCase {
private Watch watch = mock(Watch.class);
@SuppressWarnings("unchecked")
private ExecutableAction<Action> executableAction = mock(ExecutableAction.class);
private ActionWrapper actionWrapper = new ActionWrapper("_action", null, NeverCondition.INSTANCE, null, executableAction, null);
private ActionWrapper actionWrapper = new ActionWrapper("_action", null, NeverCondition.INSTANCE, null, executableAction, null, null);

public void testThatUnmetActionConditionResetsAckStatus() throws Exception {
WatchStatus watchStatus = new WatchStatus(now, Collections.singletonMap("_action", createActionStatus(State.ACKED)));
Expand Down Expand Up @@ -84,7 +84,7 @@ public void testThatMultipleResultsCanBeReturned() throws Exception {
final ExecutableAction<LoggingAction> executableAction =
new ExecutableLoggingAction(loggingAction, logger, new MockTextTemplateEngine());
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
"ctx.payload.my_path");
"ctx.payload.my_path", null);

WatchExecutionContext ctx = mockExecutionContent(watch);
Payload.Simple payload = new Payload.Simple(Map.of("my_path",
Expand All @@ -111,7 +111,7 @@ public void testThatMultipleResultsCanBeReturned() throws Exception {

public void testThatSpecifiedPathIsNotCollection() {
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
"ctx.payload.my_path");
"ctx.payload.my_path", null);
WatchExecutionContext ctx = mockExecutionContent(watch);
Payload.Simple payload = new Payload.Simple(Map.of("my_path", "not a map"));
when(ctx.payload()).thenReturn(payload);
Expand All @@ -127,7 +127,7 @@ public void testThatSpecifiedPathIsNotCollection() {

public void testEmptyCollection() {
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
"ctx.payload.my_path");
"ctx.payload.my_path", null);
WatchExecutionContext ctx = mockExecutionContent(watch);
Payload.Simple payload = new Payload.Simple(Map.of("my_path", Collections.emptyList()));
when(ctx.payload()).thenReturn(payload);
Expand All @@ -143,7 +143,7 @@ public void testEmptyCollection() {

public void testPartialFailure() throws Exception {
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
"ctx.payload.my_path");
"ctx.payload.my_path", null);
WatchExecutionContext ctx = mockExecutionContent(watch);
Payload.Simple payload = new Payload.Simple(Map.of("my_path",
List.of(
Expand All @@ -165,9 +165,9 @@ public void testPartialFailure() throws Exception {
assertThat(result.action().status(), is(Action.Result.Status.PARTIAL_FAILURE));
}

public void testLimitOfNumberOfActionsExecuted() throws Exception {
public void testDefaultLimitOfNumberOfActionsExecuted() throws Exception {
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
"ctx.payload.my_path");
"ctx.payload.my_path", null);
WatchExecutionContext ctx = mockExecutionContent(watch);
List<Map<String, String>> itemsPayload = new ArrayList<>();
for (int i = 0; i < 101; i++) {
Expand All @@ -193,11 +193,49 @@ public void testLimitOfNumberOfActionsExecuted() throws Exception {
assertThat(map.get("foreach"), instanceOf(List.class));
List<Map<String, Object>> actions = (List) map.get("foreach");
assertThat(actions, hasSize(100));
assertThat(map, hasKey("max_iterations"));
assertThat(map.get("max_iterations"), is(100));
assertThat(map, hasKey("number_of_actions_executed"));
assertThat(map.get("number_of_actions_executed"), is(100));
}
}

public void testConfiguredLimitOfNumberOfActionsExecuted() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly this test should not need to changes to any assertions if your max_iterations is 1 or 50 or 1000. Could you grab a random value between 1 and 1000 (org.elasticsearch.test.ESTestCase#randomIntBetween) ? and use that instead of 50 ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback @jakelandis, 50 was just an arbitrary number. I'll be pushing the changes soon 🙂

int randomMaxIterations = randomIntBetween(1, 1000);
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
"ctx.payload.my_path", randomMaxIterations);
WatchExecutionContext ctx = mockExecutionContent(watch);
List<Map<String, String>> itemsPayload = new ArrayList<>();
for (int i = 0; i < randomMaxIterations + 1; i++) {
final Action.Result actionResult = new LoggingAction.Result.Success("log_message " + i);;
final Payload singleItemPayload = new Payload.Simple(Map.of("key", String.valueOf(i)));
itemsPayload.add(Map.of("key", String.valueOf(i)));
when(executableAction.execute(eq("_action"), eq(ctx), eq(singleItemPayload))).thenReturn(actionResult);
}

Payload.Simple payload = new Payload.Simple(Map.of("my_path", itemsPayload));
when(ctx.payload()).thenReturn(payload);
when(executableAction.logger()).thenReturn(logger);

ActionWrapperResult result = wrapper.execute(ctx);
assertThat(result.action().status(), is(Action.Result.Status.SUCCESS));

// check that action toXContent contains all the results
try (XContentBuilder builder = jsonBuilder()) {
result.toXContent(builder, ToXContent.EMPTY_PARAMS);
final String json = Strings.toString(builder);
final Map<String, Object> map = XContentHelper.convertToMap(JsonXContent.jsonXContent, json, true);
assertThat(map, hasKey("foreach"));
assertThat(map.get("foreach"), instanceOf(List.class));
List<Map<String, Object>> actions = (List) map.get("foreach");
assertThat(actions, hasSize(randomMaxIterations));
assertThat(map, hasKey("max_iterations"));
assertThat(map.get("max_iterations"), is(randomMaxIterations));
assertThat(map, hasKey("number_of_actions_executed"));
assertThat(map.get("number_of_actions_executed"), is(randomMaxIterations));
}
}

private WatchExecutionContext mockExecutionContent(Watch watch) {
WatchExecutionContext ctx = mock(WatchExecutionContext.class);
when(watch.id()).thenReturn("watchId");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public void testExecute() throws Exception {
when(action.type()).thenReturn("MY_AWESOME_TYPE");
when(action.execute("_action", context, payload)).thenReturn(actionResult);

ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);

WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));

Expand Down Expand Up @@ -313,7 +313,7 @@ public void testExecuteFailedInput() throws Exception {
ExecutableAction action = mock(ExecutableAction.class);
when(action.execute("_action", context, payload)).thenReturn(actionResult);

ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));

when(watch.input()).thenReturn(input);
Expand Down Expand Up @@ -378,7 +378,7 @@ public void testExecuteFailedCondition() throws Exception {
ExecutableAction action = mock(ExecutableAction.class);
when(action.execute("_action", context, payload)).thenReturn(actionResult);

ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));

when(watch.input()).thenReturn(input);
Expand Down Expand Up @@ -442,7 +442,7 @@ public void testExecuteFailedWatchTransform() throws Exception {
ExecutableAction action = mock(ExecutableAction.class);
when(action.execute("_action", context, payload)).thenReturn(actionResult);

ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));

when(watch.input()).thenReturn(input);
Expand Down Expand Up @@ -520,7 +520,7 @@ public void testExecuteFailedActionTransform() throws Exception {
when(action.logger()).thenReturn(logger);
when(action.execute("_action", context, payload)).thenReturn(actionResult);

ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);

WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));

Expand Down Expand Up @@ -600,7 +600,7 @@ public void testExecuteInner() throws Exception {
ExecutableAction action = mock(ExecutableAction.class);
when(action.execute("_action", context, payload)).thenReturn(actionResult);

ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC);
WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now)));

Expand Down Expand Up @@ -649,7 +649,7 @@ public void testExecuteInnerThrottled() throws Exception {

ExecutableAction action = mock(ExecutableAction.class);
when(action.type()).thenReturn("_type");
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC);
WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now)));

Expand Down Expand Up @@ -712,7 +712,7 @@ public void testExecuteInnerConditionNotMet() throws Exception {

ExecutableAction action = mock(ExecutableAction.class);
when(action.type()).thenReturn("_type");
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC);
WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now)));

Expand Down Expand Up @@ -769,7 +769,7 @@ public void testExecuteInnerConditionNotMetDueToException() throws Exception {
ExecutableAction action = mock(ExecutableAction.class);
when(action.type()).thenReturn("_type");
when(action.logger()).thenReturn(logger);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC);
WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now)));

Expand Down Expand Up @@ -817,7 +817,7 @@ public void testExecuteConditionNotMet() throws Exception {
ExecutableCondition actionCondition = mock(ExecutableCondition.class);
ExecutableTransform actionTransform = mock(ExecutableTransform.class);
ExecutableAction action = mock(ExecutableAction.class);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);

ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC);
WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now)));
Expand Down Expand Up @@ -946,7 +946,7 @@ public void testThatTriggeredWatchDeletionHappensOnlyIfWatchExists() throws Exce
when(action.type()).thenReturn("MY_AWESOME_TYPE");
when(action.execute("_action", context, payload)).thenReturn(actionResult);

ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, null, null, action, null);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, null, null, action, null, null);

WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));

Expand Down
Loading