Skip to content

Commit 859e387

Browse files
jbonn360jakelandis
authored andcommitted
Watcher max_iterations with foreach action execution (#45715)
Prior to this commit the foreach action execution had a hard coded limit to 100 iterations. This commit allows the max number of iterations to be a configuration ('max_iterations') on the foreach action. The default remains 100.
1 parent e85dcb4 commit 859e387

File tree

7 files changed

+81
-31
lines changed

7 files changed

+81
-31
lines changed

x-pack/docs/en/watcher/actions.asciidoc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,8 +198,10 @@ image::images/action-throttling.jpg[align="center"]
198198
You can use the `foreach` field in an action to trigger the configured action
199199
for every element within that array.
200200

201-
In order to protect from long running watches, after one hundred runs with an
202-
foreach loop the execution is gracefully stopped.
201+
In order to protect from long running watches, you can use the `max_iterations`
202+
field to limit the maximum amount of runs that each watch executes. If this limit
203+
is reached, the execution is gracefully stopped. If not set, this field defaults
204+
to one hundred.
203205

204206
[source,js]
205207
--------------------------------------------------
@@ -224,6 +226,7 @@ PUT _watcher/watch/log_event_watch
224226
"actions" : {
225227
"log_hits" : {
226228
"foreach" : "ctx.payload.hits.hits", <1>
229+
"max_iterations" : 500,
227230
"logging" : {
228231
"text" : "Found id {{ctx.payload._id}} with field {{ctx.payload._source.my_field}}"
229232
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/actions/ActionWrapper.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,6 @@
4747

4848
public class ActionWrapper implements ToXContentObject {
4949

50-
private final int MAXIMUM_FOREACH_RUNS = 100;
51-
5250
private String id;
5351
@Nullable
5452
private final ExecutableCondition condition;
@@ -58,18 +56,21 @@ public class ActionWrapper implements ToXContentObject {
5856
private final ExecutableAction<? extends Action> action;
5957
@Nullable
6058
private String path;
59+
private final Integer maxIterations;
6160

6261
public ActionWrapper(String id, ActionThrottler throttler,
6362
@Nullable ExecutableCondition condition,
6463
@Nullable ExecutableTransform<Transform, Transform.Result> transform,
6564
ExecutableAction<? extends Action> action,
66-
@Nullable String path) {
65+
@Nullable String path,
66+
@Nullable Integer maxIterations) {
6767
this.id = id;
6868
this.condition = condition;
6969
this.throttler = throttler;
7070
this.transform = transform;
7171
this.action = action;
7272
this.path = path;
73+
this.maxIterations = (maxIterations != null) ? maxIterations : 100;
7374
}
7475

7576
public String id() {
@@ -177,7 +178,7 @@ public ActionWrapperResult execute(WatchExecutionContext ctx) {
177178
throw new ElasticsearchException("foreach object [{}] was an empty list, could not run any action", path);
178179
} else {
179180
for (Object o : collection) {
180-
if (runs >= MAXIMUM_FOREACH_RUNS) {
181+
if (runs >= maxIterations) {
181182
break;
182183
}
183184
if (o instanceof Map) {
@@ -216,6 +217,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
216217
builder.endObject();
217218
}
218219
builder.endArray();
220+
builder.field(WatchField.MAX_ITERATIONS.getPreferredName(), maxIterations);
219221
return builder;
220222
}
221223
});
@@ -279,7 +281,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
279281
}
280282
if (Strings.isEmpty(path) == false) {
281283
builder.field(WatchField.FOREACH.getPreferredName(), path);
284+
builder.field(WatchField.MAX_ITERATIONS.getPreferredName(), maxIterations);
282285
}
286+
283287
builder.field(action.type(), action, params);
284288
return builder.endObject();
285289
}
@@ -294,6 +298,7 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse
294298
TimeValue throttlePeriod = null;
295299
String path = null;
296300
ExecutableAction<? extends Action> action = null;
301+
Integer maxIterations = null;
297302

298303
String currentFieldName = null;
299304
XContentParser.Token token;
@@ -316,6 +321,8 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse
316321
throw new ElasticsearchParseException("could not parse action [{}/{}]. failed to parse field [{}] as time value",
317322
pe, watchId, actionId, currentFieldName);
318323
}
324+
} else if (WatchField.MAX_ITERATIONS.match(currentFieldName, parser.getDeprecationHandler())) {
325+
maxIterations = parser.intValue();
319326
} else {
320327
// it's the type of the action
321328
ActionFactory actionFactory = actionRegistry.factory(currentFieldName);
@@ -332,7 +339,7 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse
332339
}
333340

334341
ActionThrottler throttler = new ActionThrottler(clock, throttlePeriod, licenseState);
335-
return new ActionWrapper(actionId, throttler, condition, transform, action, path);
342+
return new ActionWrapper(actionId, throttler, condition, transform, action, path, maxIterations);
336343
}
337344

338345
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/WatchField.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ public final class WatchField {
1414
public static final ParseField ACTIONS = new ParseField("actions");
1515
public static final ParseField TRANSFORM = new ParseField("transform");
1616
public static final ParseField FOREACH = new ParseField("foreach");
17+
public static final ParseField MAX_ITERATIONS = new ParseField("max_iterations");
1718
public static final ParseField THROTTLE_PERIOD = new ParseField("throttle_period_in_millis");
1819
public static final ParseField THROTTLE_PERIOD_HUMAN = new ParseField("throttle_period");
1920
public static final ParseField METADATA = new ParseField("metadata");

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public class ActionWrapperTests extends ESTestCase {
5454
private Watch watch = mock(Watch.class);
5555
@SuppressWarnings("unchecked")
5656
private ExecutableAction<Action> executableAction = mock(ExecutableAction.class);
57-
private ActionWrapper actionWrapper = new ActionWrapper("_action", null, NeverCondition.INSTANCE, null, executableAction, null);
57+
private ActionWrapper actionWrapper = new ActionWrapper("_action", null, NeverCondition.INSTANCE, null, executableAction, null, null);
5858

5959
public void testThatUnmetActionConditionResetsAckStatus() throws Exception {
6060
WatchStatus watchStatus = new WatchStatus(now, Collections.singletonMap("_action", createActionStatus(State.ACKED)));
@@ -84,7 +84,7 @@ public void testThatMultipleResultsCanBeReturned() throws Exception {
8484
final ExecutableAction<LoggingAction> executableAction =
8585
new ExecutableLoggingAction(loggingAction, logger, new MockTextTemplateEngine());
8686
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
87-
"ctx.payload.my_path");
87+
"ctx.payload.my_path", null);
8888

8989
WatchExecutionContext ctx = mockExecutionContent(watch);
9090
Payload.Simple payload = new Payload.Simple(Map.of("my_path",
@@ -111,7 +111,7 @@ public void testThatMultipleResultsCanBeReturned() throws Exception {
111111

112112
public void testThatSpecifiedPathIsNotCollection() {
113113
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
114-
"ctx.payload.my_path");
114+
"ctx.payload.my_path", null);
115115
WatchExecutionContext ctx = mockExecutionContent(watch);
116116
Payload.Simple payload = new Payload.Simple(Map.of("my_path", "not a map"));
117117
when(ctx.payload()).thenReturn(payload);
@@ -127,7 +127,7 @@ public void testThatSpecifiedPathIsNotCollection() {
127127

128128
public void testEmptyCollection() {
129129
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
130-
"ctx.payload.my_path");
130+
"ctx.payload.my_path", null);
131131
WatchExecutionContext ctx = mockExecutionContent(watch);
132132
Payload.Simple payload = new Payload.Simple(Map.of("my_path", Collections.emptyList()));
133133
when(ctx.payload()).thenReturn(payload);
@@ -143,7 +143,7 @@ public void testEmptyCollection() {
143143

144144
public void testPartialFailure() throws Exception {
145145
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
146-
"ctx.payload.my_path");
146+
"ctx.payload.my_path", null);
147147
WatchExecutionContext ctx = mockExecutionContent(watch);
148148
Payload.Simple payload = new Payload.Simple(Map.of("my_path",
149149
List.of(
@@ -165,9 +165,9 @@ public void testPartialFailure() throws Exception {
165165
assertThat(result.action().status(), is(Action.Result.Status.PARTIAL_FAILURE));
166166
}
167167

168-
public void testLimitOfNumberOfActionsExecuted() throws Exception {
168+
public void testDefaultLimitOfNumberOfActionsExecuted() throws Exception {
169169
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
170-
"ctx.payload.my_path");
170+
"ctx.payload.my_path", null);
171171
WatchExecutionContext ctx = mockExecutionContent(watch);
172172
List<Map<String, String>> itemsPayload = new ArrayList<>();
173173
for (int i = 0; i < 101; i++) {
@@ -193,11 +193,49 @@ public void testLimitOfNumberOfActionsExecuted() throws Exception {
193193
assertThat(map.get("foreach"), instanceOf(List.class));
194194
List<Map<String, Object>> actions = (List) map.get("foreach");
195195
assertThat(actions, hasSize(100));
196+
assertThat(map, hasKey("max_iterations"));
197+
assertThat(map.get("max_iterations"), is(100));
196198
assertThat(map, hasKey("number_of_actions_executed"));
197199
assertThat(map.get("number_of_actions_executed"), is(100));
198200
}
199201
}
200202

203+
public void testConfiguredLimitOfNumberOfActionsExecuted() throws Exception {
204+
int randomMaxIterations = randomIntBetween(1, 1000);
205+
ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction,
206+
"ctx.payload.my_path", randomMaxIterations);
207+
WatchExecutionContext ctx = mockExecutionContent(watch);
208+
List<Map<String, String>> itemsPayload = new ArrayList<>();
209+
for (int i = 0; i < randomMaxIterations + 1; i++) {
210+
final Action.Result actionResult = new LoggingAction.Result.Success("log_message " + i);;
211+
final Payload singleItemPayload = new Payload.Simple(Map.of("key", String.valueOf(i)));
212+
itemsPayload.add(Map.of("key", String.valueOf(i)));
213+
when(executableAction.execute(eq("_action"), eq(ctx), eq(singleItemPayload))).thenReturn(actionResult);
214+
}
215+
216+
Payload.Simple payload = new Payload.Simple(Map.of("my_path", itemsPayload));
217+
when(ctx.payload()).thenReturn(payload);
218+
when(executableAction.logger()).thenReturn(logger);
219+
220+
ActionWrapperResult result = wrapper.execute(ctx);
221+
assertThat(result.action().status(), is(Action.Result.Status.SUCCESS));
222+
223+
// check that action toXContent contains all the results
224+
try (XContentBuilder builder = jsonBuilder()) {
225+
result.toXContent(builder, ToXContent.EMPTY_PARAMS);
226+
final String json = Strings.toString(builder);
227+
final Map<String, Object> map = XContentHelper.convertToMap(JsonXContent.jsonXContent, json, true);
228+
assertThat(map, hasKey("foreach"));
229+
assertThat(map.get("foreach"), instanceOf(List.class));
230+
List<Map<String, Object>> actions = (List) map.get("foreach");
231+
assertThat(actions, hasSize(randomMaxIterations));
232+
assertThat(map, hasKey("max_iterations"));
233+
assertThat(map.get("max_iterations"), is(randomMaxIterations));
234+
assertThat(map, hasKey("number_of_actions_executed"));
235+
assertThat(map.get("number_of_actions_executed"), is(randomMaxIterations));
236+
}
237+
}
238+
201239
private WatchExecutionContext mockExecutionContent(Watch watch) {
202240
WatchExecutionContext ctx = mock(WatchExecutionContext.class);
203241
when(watch.id()).thenReturn("watchId");

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ public void testExecute() throws Exception {
227227
when(action.type()).thenReturn("MY_AWESOME_TYPE");
228228
when(action.execute("_action", context, payload)).thenReturn(actionResult);
229229

230-
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
230+
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
231231

232232
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));
233233

@@ -313,7 +313,7 @@ public void testExecuteFailedInput() throws Exception {
313313
ExecutableAction action = mock(ExecutableAction.class);
314314
when(action.execute("_action", context, payload)).thenReturn(actionResult);
315315

316-
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
316+
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
317317
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));
318318

319319
when(watch.input()).thenReturn(input);
@@ -378,7 +378,7 @@ public void testExecuteFailedCondition() throws Exception {
378378
ExecutableAction action = mock(ExecutableAction.class);
379379
when(action.execute("_action", context, payload)).thenReturn(actionResult);
380380

381-
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
381+
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
382382
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));
383383

384384
when(watch.input()).thenReturn(input);
@@ -442,7 +442,7 @@ public void testExecuteFailedWatchTransform() throws Exception {
442442
ExecutableAction action = mock(ExecutableAction.class);
443443
when(action.execute("_action", context, payload)).thenReturn(actionResult);
444444

445-
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
445+
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
446446
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));
447447

448448
when(watch.input()).thenReturn(input);
@@ -520,7 +520,7 @@ public void testExecuteFailedActionTransform() throws Exception {
520520
when(action.logger()).thenReturn(logger);
521521
when(action.execute("_action", context, payload)).thenReturn(actionResult);
522522

523-
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
523+
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
524524

525525
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));
526526

@@ -600,7 +600,7 @@ public void testExecuteInner() throws Exception {
600600
ExecutableAction action = mock(ExecutableAction.class);
601601
when(action.execute("_action", context, payload)).thenReturn(actionResult);
602602

603-
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
603+
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
604604
ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC);
605605
WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now)));
606606

@@ -649,7 +649,7 @@ public void testExecuteInnerThrottled() throws Exception {
649649

650650
ExecutableAction action = mock(ExecutableAction.class);
651651
when(action.type()).thenReturn("_type");
652-
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
652+
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
653653
ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC);
654654
WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now)));
655655

@@ -712,7 +712,7 @@ public void testExecuteInnerConditionNotMet() throws Exception {
712712

713713
ExecutableAction action = mock(ExecutableAction.class);
714714
when(action.type()).thenReturn("_type");
715-
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
715+
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
716716
ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC);
717717
WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now)));
718718

@@ -769,7 +769,7 @@ public void testExecuteInnerConditionNotMetDueToException() throws Exception {
769769
ExecutableAction action = mock(ExecutableAction.class);
770770
when(action.type()).thenReturn("_type");
771771
when(action.logger()).thenReturn(logger);
772-
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
772+
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
773773
ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC);
774774
WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now)));
775775

@@ -817,7 +817,7 @@ public void testExecuteConditionNotMet() throws Exception {
817817
ExecutableCondition actionCondition = mock(ExecutableCondition.class);
818818
ExecutableTransform actionTransform = mock(ExecutableTransform.class);
819819
ExecutableAction action = mock(ExecutableAction.class);
820-
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null);
820+
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null, null);
821821

822822
ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC);
823823
WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now)));
@@ -946,7 +946,7 @@ public void testThatTriggeredWatchDeletionHappensOnlyIfWatchExists() throws Exce
946946
when(action.type()).thenReturn("MY_AWESOME_TYPE");
947947
when(action.execute("_action", context, payload)).thenReturn(actionResult);
948948

949-
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, null, null, action, null);
949+
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, null, null, action, null, null);
950950

951951
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));
952952

0 commit comments

Comments
 (0)