From 54efef595065456be3b3be10d3983236126db8fd Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Thu, 9 May 2019 11:53:38 +0200 Subject: [PATCH 01/11] Watcher: Allow to execute actions for each element in array This adds the ability to execute an action for each element that occurs in an array, for example you could sent a dedicated slack action for each search hit returned from a search. Relates #34546 --- x-pack/docs/en/watcher/actions.asciidoc | 42 ++++++ .../core/watcher/actions/ActionWrapper.java | 86 ++++++++++-- .../watcher/client/WatchSourceBuilder.java | 17 ++- .../WatcherIndexTemplateRegistryField.java | 3 +- .../xpack/core/watcher/watch/WatchField.java | 1 + .../src/main/resources/watch-history.json | 4 + .../test/watcher/execute_watch/80_foreach.yml | 55 ++++++++ .../watcher/actions/ActionWrapperTests.java | 124 +++++++++++++++++- .../execution/ExecutionServiceTests.java | 22 ++-- .../xpack/watcher/test/WatcherTestUtils.java | 4 +- .../xpack/watcher/watch/WatchTests.java | 8 +- 11 files changed, 336 insertions(+), 30 deletions(-) create mode 100644 x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/execute_watch/80_foreach.yml diff --git a/x-pack/docs/en/watcher/actions.asciidoc b/x-pack/docs/en/watcher/actions.asciidoc index 34bdd2513a4d4..68a106cf66844 100644 --- a/x-pack/docs/en/watcher/actions.asciidoc +++ b/x-pack/docs/en/watcher/actions.asciidoc @@ -192,6 +192,48 @@ of a watch during its execution: image::images/action-throttling.jpg[align="center"] +[[action-foreach]] +=== Running an action for each element in an array + +You can use the `foreach` field in an action to trigger the configured action +for every element within that array. Each element within that array has to be a +map. + +[source,js] +-------------------------------------------------- +PUT _watcher/watch/log_event_watch +{ + "trigger" : { + "schedule" : { "interval" : "5m" } + }, + "input" : { + "search" : { + "request" : { + "indices" : "log-events", + "body" : { + "size" : 0, + "query" : { "match" : { "status" : "error" } } + } + } + } + }, + "condition" : { + "compare" : { "ctx.payload.hits.total.value" : { "gt" : 0 } } + }, + "actions" : { + "log_hits" : { + "foreach" : "ctx.payload.hits.hits", <1> + "logging" : { + "text" : "Found id {{_id}} with field {{_source.my_field}}" + } + } + } +} +-------------------------------------------------- +// CONSOLE + +<1> The logging statement will be executed for each of the returned search hits. + [[action-conditions]] === Adding conditions to actions diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/actions/ActionWrapper.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/actions/ActionWrapper.java index eec08453498ab..50aea85009011 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/actions/ActionWrapper.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/actions/ActionWrapper.java @@ -7,9 +7,12 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ObjectPath; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; @@ -30,7 +33,13 @@ import java.time.Clock; import java.time.ZoneOffset; import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; @@ -43,16 +52,20 @@ public class ActionWrapper implements ToXContentObject { private final ExecutableTransform transform; private final ActionThrottler throttler; private final ExecutableAction action; + @Nullable + private String path; public ActionWrapper(String id, ActionThrottler throttler, @Nullable ExecutableCondition condition, @Nullable ExecutableTransform transform, - ExecutableAction action) { + ExecutableAction action, + @Nullable String path) { this.id = id; this.condition = condition; this.throttler = throttler; this.transform = transform; this.action = action; + this.path = path; } public String id() { @@ -140,13 +153,64 @@ public ActionWrapperResult execute(WatchExecutionContext ctx) { return new ActionWrapperResult(id, conditionResult, null, new Action.Result.FailureWithException(action.type(), e)); } } - try { - Action.Result actionResult = action.execute(id, ctx, payload); - return new ActionWrapperResult(id, conditionResult, transformResult, actionResult); - } catch (Exception e) { - action.logger().error( + if (Strings.isEmpty(path)) { + try { + Action.Result actionResult = action.execute(id, ctx, payload); + return new ActionWrapperResult(id, conditionResult, transformResult, actionResult); + } catch (Exception e) { + action.logger().error( + (Supplier) () -> new ParameterizedMessage("failed to execute action [{}/{}]", ctx.watch().id(), id), e); + return new ActionWrapperResult(id, new Action.Result.FailureWithException(action.type(), e)); + } + } else { + try { + List results = new ArrayList<>(); + Object object = ObjectPath.eval(path, ctx.payload().data()); + if (object instanceof Collection) { + Collection collection = Collection.class.cast(object); + if (collection.isEmpty()) { + throw new ElasticsearchException("foreach object [{}] was an empty list, could not run any action", path); + } else { + for (Object o : collection) { + if (o instanceof Map) { + results.add(action.execute(id, ctx, new Payload.Simple((Map) o))); + } else { + throw new ElasticsearchException("item in foreach [{}] object was not a map", path); + } + } + } + } else { + throw new ElasticsearchException("specified foreach object was not a an array/collection: [{}]", path); + } + + // check if we have mixed results, then set to partial failure + final Set statuses = results.stream().map(Action.Result::status).collect(Collectors.toSet()); + Action.Result.Status status; + if (statuses.size() == 1) { + status = statuses.iterator().next(); + } else { + status = Action.Result.Status.PARTIAL_FAILURE; + } + + return new ActionWrapperResult(id, conditionResult, transformResult, + new Action.Result(action.type(), status) { + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startArray(WatchField.FOREACH.getPreferredName()); + for (Action.Result result : results) { + builder.startObject(); + result.toXContent(builder, params); + builder.endObject(); + } + builder.endArray(); + return builder; + } + }); + } catch (Exception e) { + action.logger().error( (Supplier) () -> new ParameterizedMessage("failed to execute action [{}/{}]", ctx.watch().id(), id), e); - return new ActionWrapperResult(id, new Action.Result.FailureWithException(action.type(), e)); + return new ActionWrapperResult(id, new Action.Result.FailureWithException(action.type(), e)); + } } } @@ -186,6 +250,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws .field(transform.type(), transform, params) .endObject(); } + if (Strings.isEmpty(path) == false) { + builder.field(WatchField.FOREACH.getPreferredName(), path); + } builder.field(action.type(), action, params); return builder.endObject(); } @@ -198,6 +265,7 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse ExecutableCondition condition = null; ExecutableTransform transform = null; TimeValue throttlePeriod = null; + String path = null; ExecutableAction action = null; String currentFieldName = null; @@ -208,6 +276,8 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse } else { if (WatchField.CONDITION.match(currentFieldName, parser.getDeprecationHandler())) { condition = actionRegistry.getConditionRegistry().parseExecutable(watchId, parser); + } else if (WatchField.FOREACH.match(currentFieldName, parser.getDeprecationHandler())) { + path = parser.text(); } else if (Transform.TRANSFORM.match(currentFieldName, parser.getDeprecationHandler())) { transform = actionRegistry.getTransformRegistry().parse(watchId, parser); } else if (ThrottlerField.THROTTLE_PERIOD.match(currentFieldName, parser.getDeprecationHandler())) { @@ -235,7 +305,7 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse } ActionThrottler throttler = new ActionThrottler(clock, throttlePeriod, licenseState); - return new ActionWrapper(actionId, throttler, condition, transform, action); + return new ActionWrapper(actionId, throttler, condition, transform, action, path); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/client/WatchSourceBuilder.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/client/WatchSourceBuilder.java index 001a430ddb1e6..12eb6a12ee2ad 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/client/WatchSourceBuilder.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/client/WatchSourceBuilder.java @@ -101,7 +101,7 @@ public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Transfo } public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Transform transform, Action action) { - actions.put(id, new TransformedAction(id, action, throttlePeriod, null, transform)); + actions.put(id, new TransformedAction(id, action, throttlePeriod, null, transform, null)); return this; } @@ -111,7 +111,13 @@ public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Conditi } public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Condition condition, Transform transform, Action action) { - actions.put(id, new TransformedAction(id, action, throttlePeriod, condition, transform)); + actions.put(id, new TransformedAction(id, action, throttlePeriod, condition, transform, null)); + return this; + } + + public WatchSourceBuilder addAction(String id, TimeValue throttlePeriod, Condition condition, Transform transform, String path, + Action action) { + actions.put(id, new TransformedAction(id, action, throttlePeriod, condition, transform, path)); return this; } @@ -186,16 +192,18 @@ public final BytesReference buildAsBytes(XContentType contentType) { static class TransformedAction implements ToXContentObject { private final Action action; + @Nullable private String path; @Nullable private final TimeValue throttlePeriod; @Nullable private final Condition condition; @Nullable private final Transform transform; TransformedAction(String id, Action action, @Nullable TimeValue throttlePeriod, - @Nullable Condition condition, @Nullable Transform transform) { + @Nullable Condition condition, @Nullable Transform transform, @Nullable String path) { this.throttlePeriod = throttlePeriod; this.condition = condition; this.transform = transform; this.action = action; + this.path = path; } @Override @@ -215,6 +223,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws .field(transform.type(), transform, params) .endObject(); } + if (path != null) { + builder.field("foreach", path); + } builder.field(action.type(), action, params); return builder.endObject(); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/support/WatcherIndexTemplateRegistryField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/support/WatcherIndexTemplateRegistryField.java index 4007b06ee7eca..821ea9a433524 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/support/WatcherIndexTemplateRegistryField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/support/WatcherIndexTemplateRegistryField.java @@ -14,8 +14,9 @@ public final class WatcherIndexTemplateRegistryField { // version 7: add full exception stack traces for better debugging // version 8: fix slack attachment property not to be dynamic, causing field type issues // version 9: add a user field defining which user executed the watch + // version 10: add support for foreach path in actions // Note: if you change this, also inform the kibana team around the watcher-ui - public static final String INDEX_TEMPLATE_VERSION = "9"; + public static final String INDEX_TEMPLATE_VERSION = "10"; public static final String HISTORY_TEMPLATE_NAME = ".watch-history-" + INDEX_TEMPLATE_VERSION; public static final String HISTORY_TEMPLATE_NAME_NO_ILM = ".watch-history-no-ilm-" + INDEX_TEMPLATE_VERSION; public static final String TRIGGERED_TEMPLATE_NAME = ".triggered_watches"; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/WatchField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/WatchField.java index 6f6a1955927d9..1bcb62447bf76 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/WatchField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/WatchField.java @@ -13,6 +13,7 @@ public final class WatchField { public static final ParseField CONDITION = new ParseField("condition"); public static final ParseField ACTIONS = new ParseField("actions"); public static final ParseField TRANSFORM = new ParseField("transform"); + public static final ParseField FOREACH = new ParseField("foreach"); public static final ParseField THROTTLE_PERIOD = new ParseField("throttle_period_in_millis"); public static final ParseField THROTTLE_PERIOD_HUMAN = new ParseField("throttle_period"); public static final ParseField METADATA = new ParseField("metadata"); diff --git a/x-pack/plugin/core/src/main/resources/watch-history.json b/x-pack/plugin/core/src/main/resources/watch-history.json index 9c5919f13a171..02a2a789df7bd 100644 --- a/x-pack/plugin/core/src/main/resources/watch-history.json +++ b/x-pack/plugin/core/src/main/resources/watch-history.json @@ -264,6 +264,10 @@ "reason" : { "type" : "keyword" }, + "foreach" : { + "type": "object", + "enabled" : false + }, "email": { "type": "object", "dynamic": true, diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/execute_watch/80_foreach.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/execute_watch/80_foreach.yml new file mode 100644 index 0000000000000..74500a9722c24 --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/execute_watch/80_foreach.yml @@ -0,0 +1,55 @@ +--- +setup: + - do: + cluster.health: + wait_for_status: yellow + +--- +teardown: + - do: + watcher.delete_watch: + id: "test_watch" + ignore: 404 + +--- +"Test execute watch api with foreach action": + - do: + watcher.execute_watch: + body: > + { + "watch" : { + "trigger": { + "schedule" : { "cron" : "0 0 0 1 * ? 2099" } + }, + "input": { + "simple": { + "my_array" : [ + { "_doc" : { "_index" : "my_test_index", "_id" : "first", "key" : "first" } }, + { "_doc" : { "_index" : "my_test_index", "_id" : "second", "key" : "second" } } + ] + } + }, + "actions": { + "indexme" : { + "foreach" : "my_array", + "index" : {} + } + } + } + } + + - match: { watch_record.trigger_event.type: "manual" } + - match: { watch_record.state: "executed" } + - match: { watch_record.status.execution_state: "executed" } + + - do: + mget: + body: + docs: + - { _index: my_test_index, _id: "first"} + - { _index: my_test_index, _id: "second"} + + - is_true: docs.0.found + - match: { docs.0._source: { "key": "first" }} + - is_true: docs.1.found + - match: { docs.1._source: { "key": "second" }} diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java index e3a41062caee5..dbd1ffa4154c2 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java @@ -5,6 +5,11 @@ */ package org.elasticsearch.xpack.watcher.actions; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.watcher.actions.Action; import org.elasticsearch.xpack.core.watcher.actions.ActionStatus; @@ -13,16 +18,24 @@ import org.elasticsearch.xpack.core.watcher.actions.ActionWrapperResult; import org.elasticsearch.xpack.core.watcher.actions.ExecutableAction; import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext; +import org.elasticsearch.xpack.core.watcher.watch.Payload; import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.elasticsearch.xpack.core.watcher.watch.WatchStatus; +import org.elasticsearch.xpack.watcher.actions.logging.LoggingAction; +import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition; import org.elasticsearch.xpack.watcher.condition.NeverCondition; import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -34,7 +47,7 @@ public class ActionWrapperTests extends ESTestCase { private Watch watch = mock(Watch.class); @SuppressWarnings("unchecked") private ExecutableAction executableAction = mock(ExecutableAction.class); - private ActionWrapper actionWrapper = new ActionWrapper("_action", null, NeverCondition.INSTANCE, null, executableAction); + private ActionWrapper actionWrapper = new ActionWrapper("_action", null, NeverCondition.INSTANCE, null, executableAction, null); public void testThatUnmetActionConditionResetsAckStatus() throws Exception { WatchStatus watchStatus = new WatchStatus(now, Collections.singletonMap("_action", createActionStatus(State.ACKED))); @@ -59,6 +72,115 @@ public void testOtherActionsAreNotAffectedOnActionConditionReset() throws Except assertThat(watch.status().actionStatus("other").ackStatus().state(), is(otherState)); } + public void testThatMultipleResultsCanBeReturned() throws Exception { + ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, "my_path"); + WatchExecutionContext ctx = mockExecutionContent(watch); + Payload.Simple payload = new Payload.Simple(Map.of("my_path", + List.of( + Map.of("key", "first"), + Map.of("key", "second"), + Map.of("key", "third") + ))); + when(ctx.payload()).thenReturn(payload); + when(executableAction.logger()).thenReturn(logger); + + final Action.Result firstResult = new Action.Result.Failure("MY_TYPE", "first reason"); + final Payload firstPayload = new Payload.Simple(Map.of("key", "first")); + when(executableAction.execute(eq("_action"), eq(ctx), eq(firstPayload))).thenReturn(firstResult); + + final Action.Result secondResult = new Action.Result.Failure("MY_TYPE", "second reason"); + final Payload secondPayload = new Payload.Simple(Map.of("key", "second")); + when(executableAction.execute(eq("_action"), eq(ctx), eq(secondPayload))).thenReturn(secondResult); + + final Action.Result thirdResult = new Action.Result.Failure("MY_TYPE", "third reason"); + final Payload thirdPayload = new Payload.Simple(Map.of("key", "third")); + when(executableAction.execute(eq("_action"), eq(ctx), eq(thirdPayload))).thenReturn(thirdResult); + + ActionWrapperResult result = wrapper.execute(ctx); + assertThat(result.action().status(), is(Action.Result.Status.SUCCESS)); + // check that action toXContent contains all the results + try (XContentBuilder builder = jsonBuilder()) { + result.toXContent(builder, ToXContent.EMPTY_PARAMS); + final String json = Strings.toString(builder); + final Map map = XContentHelper.convertToMap(JsonXContent.jsonXContent, json, true); + assertThat(map, hasKey("actions")); + assertThat(map.get("actions"), instanceOf(List.class)); + List> actions = (List) map.get("actions"); + assertThat(actions, hasSize(3)); + } + } + + public void testThatPathElementIsntInstanceOfMap() throws Exception { + ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, "my_path"); + WatchExecutionContext ctx = mockExecutionContent(watch); + Payload.Simple payload = new Payload.Simple(Map.of("my_path", List.of("first", "second", "third"))); + when(ctx.payload()).thenReturn(payload); + when(executableAction.logger()).thenReturn(logger); + + final Action.Result actionResult = new Action.Result.Failure("MY_TYPE", "first reason"); + final Payload actionPayload = new Payload.Simple(Map.of("key", "first")); + when(executableAction.execute(eq("_action"), eq(ctx), eq(actionPayload))).thenReturn(actionResult); + + ActionWrapperResult result = wrapper.execute(ctx); + assertThat(result.action().status(), is(Action.Result.Status.FAILURE)); + assertThat(result.action(), instanceOf(Action.Result.FailureWithException.class)); + Action.Result.FailureWithException failureWithException = (Action.Result.FailureWithException) result.action(); + assertThat(failureWithException.getException().getMessage(), is("item in foreach [my_path] object was not a map")); + } + + public void testThatSpecifiedPathIsACollection() { + ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, "my_path"); + WatchExecutionContext ctx = mockExecutionContent(watch); + Payload.Simple payload = new Payload.Simple(Map.of("my_path", "not a map")); + when(ctx.payload()).thenReturn(payload); + when(executableAction.logger()).thenReturn(logger); + + ActionWrapperResult result = wrapper.execute(ctx); + assertThat(result.action().status(), is(Action.Result.Status.FAILURE)); + assertThat(result.action(), instanceOf(Action.Result.FailureWithException.class)); + Action.Result.FailureWithException failureWithException = (Action.Result.FailureWithException) result.action(); + assertThat(failureWithException.getException().getMessage(), + is("specified foreach object was not a an array/collection: [my_path]")); + } + + public void testEmptyCollection() { + ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, "my_path"); + WatchExecutionContext ctx = mockExecutionContent(watch); + Payload.Simple payload = new Payload.Simple(Map.of("my_path", Collections.emptyList())); + when(ctx.payload()).thenReturn(payload); + when(executableAction.logger()).thenReturn(logger); + + ActionWrapperResult result = wrapper.execute(ctx); + assertThat(result.action().status(), is(Action.Result.Status.FAILURE)); + assertThat(result.action(), instanceOf(Action.Result.FailureWithException.class)); + Action.Result.FailureWithException failureWithException = (Action.Result.FailureWithException) result.action(); + assertThat(failureWithException.getException().getMessage(), + is("foreach object [my_path] was an empty list, could not run any action")); + } + + public void testPartialFailure() throws Exception { + ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, "my_path"); + WatchExecutionContext ctx = mockExecutionContent(watch); + Payload.Simple payload = new Payload.Simple(Map.of("my_path", + List.of( + Map.of("key", "first"), + Map.of("key", "second") + ))); + when(ctx.payload()).thenReturn(payload); + when(executableAction.logger()).thenReturn(logger); + + final Action.Result firstResult = new LoggingAction.Result.Success("log_message");; + final Payload firstPayload = new Payload.Simple(Map.of("key", "first")); + when(executableAction.execute(eq("_action"), eq(ctx), eq(firstPayload))).thenReturn(firstResult); + + final Action.Result secondResult = new Action.Result.Failure("MY_TYPE", "second reason"); + final Payload secondPayload = new Payload.Simple(Map.of("key", "second")); + when(executableAction.execute(eq("_action"), eq(ctx), eq(secondPayload))).thenReturn(secondResult); + + ActionWrapperResult result = wrapper.execute(ctx); + assertThat(result.action().status(), is(Action.Result.Status.PARTIAL_FAILURE)); + } + private WatchExecutionContext mockExecutionContent(Watch watch) { WatchExecutionContext ctx = mock(WatchExecutionContext.class); when(watch.id()).thenReturn("watchId"); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java index 80cb657a5762e..9f12bb1b1081c 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java @@ -228,7 +228,7 @@ public void testExecute() throws Exception { when(action.type()).thenReturn("MY_AWESOME_TYPE"); when(action.execute("_action", context, payload)).thenReturn(actionResult); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now))); @@ -314,7 +314,7 @@ public void testExecuteFailedInput() throws Exception { ExecutableAction action = mock(ExecutableAction.class); when(action.execute("_action", context, payload)).thenReturn(actionResult); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now))); when(watch.input()).thenReturn(input); @@ -379,7 +379,7 @@ public void testExecuteFailedCondition() throws Exception { ExecutableAction action = mock(ExecutableAction.class); when(action.execute("_action", context, payload)).thenReturn(actionResult); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now))); when(watch.input()).thenReturn(input); @@ -443,7 +443,7 @@ public void testExecuteFailedWatchTransform() throws Exception { ExecutableAction action = mock(ExecutableAction.class); when(action.execute("_action", context, payload)).thenReturn(actionResult); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now))); when(watch.input()).thenReturn(input); @@ -521,7 +521,7 @@ public void testExecuteFailedActionTransform() throws Exception { when(action.logger()).thenReturn(logger); when(action.execute("_action", context, payload)).thenReturn(actionResult); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now))); @@ -601,7 +601,7 @@ public void testExecuteInner() throws Exception { ExecutableAction action = mock(ExecutableAction.class); when(action.execute("_action", context, payload)).thenReturn(actionResult); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC); WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now))); @@ -650,7 +650,7 @@ public void testExecuteInnerThrottled() throws Exception { ExecutableAction action = mock(ExecutableAction.class); when(action.type()).thenReturn("_type"); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC); WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now))); @@ -713,7 +713,7 @@ public void testExecuteInnerConditionNotMet() throws Exception { ExecutableAction action = mock(ExecutableAction.class); when(action.type()).thenReturn("_type"); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC); WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now))); @@ -770,7 +770,7 @@ public void testExecuteInnerConditionNotMetDueToException() throws Exception { ExecutableAction action = mock(ExecutableAction.class); when(action.type()).thenReturn("_type"); when(action.logger()).thenReturn(logger); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC); WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now))); @@ -818,7 +818,7 @@ public void testExecuteConditionNotMet() throws Exception { ExecutableCondition actionCondition = mock(ExecutableCondition.class); ExecutableTransform actionTransform = mock(ExecutableTransform.class); ExecutableAction action = mock(ExecutableAction.class); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action, null); ZonedDateTime time = clock.instant().atZone(ZoneOffset.UTC); WatchStatus watchStatus = new WatchStatus(time, singletonMap("_action", new ActionStatus(now))); @@ -947,7 +947,7 @@ public void testThatTriggeredWatchDeletionHappensOnlyIfWatchExists() throws Exce when(action.type()).thenReturn("MY_AWESOME_TYPE"); when(action.execute("_action", context, payload)).thenReturn(actionResult); - ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, null, null, action); + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, null, null, action, null); WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now))); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java index 9636d159b52b4..aa1231baa17a4 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java @@ -158,7 +158,7 @@ public static Watch createTestWatch(String watchName, Client client, HttpClient httpRequest.path(new TextTemplate("/foobarbaz/{{ctx.watch_id}}")); httpRequest.body(new TextTemplate("{{ctx.watch_id}} executed with {{ctx.payload.response.hits.total_hits}} hits")); actions.add(new ActionWrapper("_webhook", null, null, null, new ExecutableWebhookAction(new WebhookAction(httpRequest.build()), - logger, httpClient, engine))); + logger, httpClient, engine), null)); EmailTemplate email = EmailTemplate.builder().from("from@test.com").to("to@test.com").build(); @@ -166,7 +166,7 @@ public static Watch createTestWatch(String watchName, Client client, HttpClient EmailAction action = new EmailAction(email, "testaccount", auth, Profile.STANDARD, null, null); ExecutableEmailAction executale = new ExecutableEmailAction(action, logger, emailService, engine, new HtmlSanitizer(Settings.EMPTY), Collections.emptyMap()); - actions.add(new ActionWrapper("_email", null, null, null, executale)); + actions.add(new ActionWrapper("_email", null, null, null, executale, null)); ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC); Map statuses = new HashMap<>(); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java index 84144b2866b32..feadeba084d3e 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java @@ -441,7 +441,7 @@ public void testParseWatchWithoutTriggerDoesNotWork() throws Exception { private WatchParser createWatchparser() throws Exception { LoggingAction loggingAction = new LoggingAction(new TextTemplate("foo"), null, null); List actions = Collections.singletonList(new ActionWrapper("_logging_", randomThrottler(), null, null, - new ExecutableLoggingAction(loggingAction, logger, new MockTextTemplateEngine()))); + new ExecutableLoggingAction(loggingAction, logger, new MockTextTemplateEngine()), null)); ScheduleRegistry scheduleRegistry = registry(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.SECONDS))); @@ -585,7 +585,7 @@ private List randomActions() { randomFrom(DataAttachment.JSON, DataAttachment.YAML), EmailAttachments.EMPTY_ATTACHMENTS); list.add(new ActionWrapper("_email_" + randomAlphaOfLength(8), randomThrottler(), AlwaysConditionTests.randomCondition(scriptService), randomTransform(), - new ExecutableEmailAction(action, logger, emailService, templateEngine, htmlSanitizer, Collections.emptyMap()))); + new ExecutableEmailAction(action, logger, emailService, templateEngine, htmlSanitizer, Collections.emptyMap()), null)); } if (randomBoolean()) { ZoneOffset timeZone = randomBoolean() ? ZoneOffset.UTC : null; @@ -596,7 +596,7 @@ private List randomActions() { list.add(new ActionWrapper("_index_" + randomAlphaOfLength(8), randomThrottler(), AlwaysConditionTests.randomCondition(scriptService), randomTransform(), new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30), - TimeValue.timeValueSeconds(30)))); + TimeValue.timeValueSeconds(30)), null)); } if (randomBoolean()) { HttpRequestTemplate httpRequest = HttpRequestTemplate.builder("test.host", randomIntBetween(8000, 9000)) @@ -606,7 +606,7 @@ private List randomActions() { WebhookAction action = new WebhookAction(httpRequest); list.add(new ActionWrapper("_webhook_" + randomAlphaOfLength(8), randomThrottler(), AlwaysConditionTests.randomCondition(scriptService), randomTransform(), - new ExecutableWebhookAction(action, logger, httpClient, templateEngine))); + new ExecutableWebhookAction(action, logger, httpClient, templateEngine), null)); } return list; } From c3cb96d472c5f1067b271ec4cf502ab232168d2e Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Fri, 10 May 2019 14:21:44 +0200 Subject: [PATCH 02/11] fix test --- .../xpack/watcher/actions/ActionWrapperTests.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java index dbd1ffa4154c2..49f21493b08e3 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java @@ -97,15 +97,15 @@ public void testThatMultipleResultsCanBeReturned() throws Exception { when(executableAction.execute(eq("_action"), eq(ctx), eq(thirdPayload))).thenReturn(thirdResult); ActionWrapperResult result = wrapper.execute(ctx); - assertThat(result.action().status(), is(Action.Result.Status.SUCCESS)); + assertThat(result.action().status(), is(Action.Result.Status.FAILURE)); // check that action toXContent contains all the results try (XContentBuilder builder = jsonBuilder()) { result.toXContent(builder, ToXContent.EMPTY_PARAMS); final String json = Strings.toString(builder); final Map map = XContentHelper.convertToMap(JsonXContent.jsonXContent, json, true); - assertThat(map, hasKey("actions")); - assertThat(map.get("actions"), instanceOf(List.class)); - List> actions = (List) map.get("actions"); + assertThat(map, hasKey("foreach")); + assertThat(map.get("foreach"), instanceOf(List.class)); + List> actions = (List) map.get("foreach"); assertThat(actions, hasSize(3)); } } From 3d0ce814c25b50a4dc4d894c0bffec8110bc6662 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Wed, 22 May 2019 11:49:08 +0200 Subject: [PATCH 03/11] fix path to use ctx by converting context to map --- .../core/watcher/actions/ActionWrapper.java | 20 +++++++++++- .../test/watcher/execute_watch/80_foreach.yml | 2 +- .../watcher/actions/ActionWrapperTests.java | 32 +++++++++++++------ 3 files changed, 43 insertions(+), 11 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/actions/ActionWrapper.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/actions/ActionWrapper.java index 50aea85009011..328b535080384 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/actions/ActionWrapper.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/actions/ActionWrapper.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.script.JodaCompatibleZonedDateTime; import org.elasticsearch.xpack.core.watcher.actions.throttler.ActionThrottler; import org.elasticsearch.xpack.core.watcher.actions.throttler.Throttler; import org.elasticsearch.xpack.core.watcher.actions.throttler.ThrottlerField; @@ -35,6 +36,7 @@ import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -165,7 +167,7 @@ public ActionWrapperResult execute(WatchExecutionContext ctx) { } else { try { List results = new ArrayList<>(); - Object object = ObjectPath.eval(path, ctx.payload().data()); + Object object = ObjectPath.eval(path, toMap(ctx)); if (object instanceof Collection) { Collection collection = Collection.class.cast(object); if (collection.isEmpty()) { @@ -179,6 +181,8 @@ public ActionWrapperResult execute(WatchExecutionContext ctx) { } } } + } else if (object == null) { + throw new ElasticsearchException("specified foreach object was null: [{}]", path); } else { throw new ElasticsearchException("specified foreach object was not a an array/collection: [{}]", path); } @@ -214,6 +218,20 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } } + private Map toMap(WatchExecutionContext ctx) { + Map model = new HashMap<>(); + model.put("id", ctx.id().value()); + model.put("watch_id", ctx.id().watchId()); + model.put("execution_time", new JodaCompatibleZonedDateTime(ctx.executionTime().toInstant(), ZoneOffset.UTC)); + model.put("trigger", ctx.triggerEvent().data()); + model.put("metadata", ctx.watch().metadata()); + model.put("vars", ctx.vars()); + if (ctx.payload().data() != null) { + model.put("payload", ctx.payload().data()); + } + return Map.of("ctx", model); + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/execute_watch/80_foreach.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/execute_watch/80_foreach.yml index 74500a9722c24..b586b50bcf889 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/execute_watch/80_foreach.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/execute_watch/80_foreach.yml @@ -31,7 +31,7 @@ teardown: }, "actions": { "indexme" : { - "foreach" : "my_array", + "foreach" : "ctx.payload.my_array", "index" : {} } } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java index 49f21493b08e3..a7c4b594298f1 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java @@ -18,12 +18,15 @@ import org.elasticsearch.xpack.core.watcher.actions.ActionWrapperResult; import org.elasticsearch.xpack.core.watcher.actions.ExecutableAction; import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext; +import org.elasticsearch.xpack.core.watcher.execution.Wid; +import org.elasticsearch.xpack.core.watcher.trigger.TriggerEvent; import org.elasticsearch.xpack.core.watcher.watch.Payload; import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.elasticsearch.xpack.core.watcher.watch.WatchStatus; import org.elasticsearch.xpack.watcher.actions.logging.LoggingAction; import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition; import org.elasticsearch.xpack.watcher.condition.NeverCondition; +import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent; import java.time.ZoneOffset; import java.time.ZonedDateTime; @@ -73,7 +76,8 @@ public void testOtherActionsAreNotAffectedOnActionConditionReset() throws Except } public void testThatMultipleResultsCanBeReturned() throws Exception { - ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, "my_path"); + ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, + "ctx.payload.my_path"); WatchExecutionContext ctx = mockExecutionContent(watch); Payload.Simple payload = new Payload.Simple(Map.of("my_path", List.of( @@ -111,7 +115,8 @@ public void testThatMultipleResultsCanBeReturned() throws Exception { } public void testThatPathElementIsntInstanceOfMap() throws Exception { - ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, "my_path"); + ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, "" + + "ctx.payload.my_path"); WatchExecutionContext ctx = mockExecutionContent(watch); Payload.Simple payload = new Payload.Simple(Map.of("my_path", List.of("first", "second", "third"))); when(ctx.payload()).thenReturn(payload); @@ -125,11 +130,12 @@ public void testThatPathElementIsntInstanceOfMap() throws Exception { assertThat(result.action().status(), is(Action.Result.Status.FAILURE)); assertThat(result.action(), instanceOf(Action.Result.FailureWithException.class)); Action.Result.FailureWithException failureWithException = (Action.Result.FailureWithException) result.action(); - assertThat(failureWithException.getException().getMessage(), is("item in foreach [my_path] object was not a map")); + assertThat(failureWithException.getException().getMessage(), is("item in foreach [ctx.payload.my_path] object was not a map")); } - public void testThatSpecifiedPathIsACollection() { - ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, "my_path"); + public void testThatSpecifiedPathIsNotCollection() { + ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, + "ctx.payload.my_path"); WatchExecutionContext ctx = mockExecutionContent(watch); Payload.Simple payload = new Payload.Simple(Map.of("my_path", "not a map")); when(ctx.payload()).thenReturn(payload); @@ -140,11 +146,12 @@ public void testThatSpecifiedPathIsACollection() { assertThat(result.action(), instanceOf(Action.Result.FailureWithException.class)); Action.Result.FailureWithException failureWithException = (Action.Result.FailureWithException) result.action(); assertThat(failureWithException.getException().getMessage(), - is("specified foreach object was not a an array/collection: [my_path]")); + is("specified foreach object was not a an array/collection: [ctx.payload.my_path]")); } public void testEmptyCollection() { - ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, "my_path"); + ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, + "ctx.payload.my_path"); WatchExecutionContext ctx = mockExecutionContent(watch); Payload.Simple payload = new Payload.Simple(Map.of("my_path", Collections.emptyList())); when(ctx.payload()).thenReturn(payload); @@ -155,11 +162,12 @@ public void testEmptyCollection() { assertThat(result.action(), instanceOf(Action.Result.FailureWithException.class)); Action.Result.FailureWithException failureWithException = (Action.Result.FailureWithException) result.action(); assertThat(failureWithException.getException().getMessage(), - is("foreach object [my_path] was an empty list, could not run any action")); + is("foreach object [ctx.payload.my_path] was an empty list, could not run any action")); } public void testPartialFailure() throws Exception { - ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, "my_path"); + ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, + "ctx.payload.my_path"); WatchExecutionContext ctx = mockExecutionContent(watch); Payload.Simple payload = new Payload.Simple(Map.of("my_path", List.of( @@ -185,6 +193,12 @@ private WatchExecutionContext mockExecutionContent(Watch watch) { WatchExecutionContext ctx = mock(WatchExecutionContext.class); when(watch.id()).thenReturn("watchId"); when(ctx.watch()).thenReturn(watch); + final ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC); + final Wid wid = new Wid("watchId", now); + when(ctx.id()).thenReturn(wid); + when(ctx.executionTime()).thenReturn(now); + final TriggerEvent triggerEvent = new ScheduleTriggerEvent("watchId", now, now); + when(ctx.triggerEvent()).thenReturn(triggerEvent); when(ctx.skipThrottling(eq("_action"))).thenReturn(true); return ctx; } From 46901c8e689bf0801907458309d4f5066e38a1c8 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Fri, 31 May 2019 17:49:56 +0200 Subject: [PATCH 04/11] WIP --- .../test/watcher/execute_watch/80_foreach.yml | 39 +++++++++++++------ .../watcher/actions/ActionWrapperTests.java | 15 +++++-- 2 files changed, 38 insertions(+), 16 deletions(-) diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/execute_watch/80_foreach.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/execute_watch/80_foreach.yml index b586b50bcf889..f585157b483cf 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/execute_watch/80_foreach.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/execute_watch/80_foreach.yml @@ -13,6 +13,15 @@ teardown: --- "Test execute watch api with foreach action": + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "my_test_index", "_id": "first"}}' + - '{ "title" : "first" }' + - '{"index": {"_index": "my_test_index", "_id": "second"}}' + - '{ "title" : "second" }' + - do: watcher.execute_watch: body: > @@ -22,17 +31,23 @@ teardown: "schedule" : { "cron" : "0 0 0 1 * ? 2099" } }, "input": { - "simple": { - "my_array" : [ - { "_doc" : { "_index" : "my_test_index", "_id" : "first", "key" : "first" } }, - { "_doc" : { "_index" : "my_test_index", "_id" : "second", "key" : "second" } } - ] + "search" : { + "request" : { + "indices" : [ "my_test_index" ], + "body" : { + "query": { + "match_all" : {} + } + } + } } }, "actions": { - "indexme" : { - "foreach" : "ctx.payload.my_array", - "index" : {} + "log_hits" : { + "foreach" : "ctx.payload.hits.hits", + "logging" : { + "text" : "Found error in document {{_id}} with timestamp {{_source.timestamp}}" + } } } } @@ -46,10 +61,10 @@ teardown: mget: body: docs: - - { _index: my_test_index, _id: "first"} - - { _index: my_test_index, _id: "second"} + - { _index: my_test_index_2, _id: "first"} + - { _index: my_test_index_2, _id: "second"} - is_true: docs.0.found - - match: { docs.0._source: { "key": "first" }} + - match: { docs.0._source: { "title": "first" }} - is_true: docs.1.found - - match: { docs.1._source: { "key": "second" }} + - match: { docs.1._source: { "title": "second" }} diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java index a7c4b594298f1..73b22f32fe8d9 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java @@ -23,10 +23,14 @@ import org.elasticsearch.xpack.core.watcher.watch.Payload; import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.elasticsearch.xpack.core.watcher.watch.WatchStatus; +import org.elasticsearch.xpack.watcher.actions.logging.ExecutableLoggingAction; import org.elasticsearch.xpack.watcher.actions.logging.LoggingAction; +import org.elasticsearch.xpack.watcher.common.text.TextTemplate; import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition; import org.elasticsearch.xpack.watcher.condition.NeverCondition; +import org.elasticsearch.xpack.watcher.test.MockTextTemplateEngine; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent; +import org.mockito.Mock; import java.time.ZoneOffset; import java.time.ZonedDateTime; @@ -76,8 +80,11 @@ public void testOtherActionsAreNotAffectedOnActionConditionReset() throws Except } public void testThatMultipleResultsCanBeReturned() throws Exception { + final LoggingAction loggingAction = new LoggingAction(new TextTemplate("foo"), null, null); + ExecutableAction executableAction = new ExecutableLoggingAction(loggingAction, logger, new MockTextTemplateEngine()); ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, "ctx.payload.my_path"); + WatchExecutionContext ctx = mockExecutionContent(watch); Payload.Simple payload = new Payload.Simple(Map.of("my_path", List.of( @@ -86,19 +93,19 @@ public void testThatMultipleResultsCanBeReturned() throws Exception { Map.of("key", "third") ))); when(ctx.payload()).thenReturn(payload); - when(executableAction.logger()).thenReturn(logger); +// when(executableAction.logger()).thenReturn(logger); final Action.Result firstResult = new Action.Result.Failure("MY_TYPE", "first reason"); final Payload firstPayload = new Payload.Simple(Map.of("key", "first")); - when(executableAction.execute(eq("_action"), eq(ctx), eq(firstPayload))).thenReturn(firstResult); +// when(executableAction.execute(eq("_action"), eq(ctx), eq(firstPayload))).thenReturn(firstResult); final Action.Result secondResult = new Action.Result.Failure("MY_TYPE", "second reason"); final Payload secondPayload = new Payload.Simple(Map.of("key", "second")); - when(executableAction.execute(eq("_action"), eq(ctx), eq(secondPayload))).thenReturn(secondResult); +// when(executableAction.execute(eq("_action"), eq(ctx), eq(secondPayload))).thenReturn(secondResult); final Action.Result thirdResult = new Action.Result.Failure("MY_TYPE", "third reason"); final Payload thirdPayload = new Payload.Simple(Map.of("key", "third")); - when(executableAction.execute(eq("_action"), eq(ctx), eq(thirdPayload))).thenReturn(thirdResult); +// when(executableAction.execute(eq("_action"), eq(ctx), eq(thirdPayload))).thenReturn(thirdResult); ActionWrapperResult result = wrapper.execute(ctx); assertThat(result.action().status(), is(Action.Result.Status.FAILURE)); From db479d72698f341229b91780a347180cf8f6bfec Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Wed, 19 Jun 2019 16:09:49 +0200 Subject: [PATCH 05/11] fix test --- .../test/watcher/execute_watch/80_foreach.yml | 48 +++++-------------- .../watcher/actions/ActionWrapperTests.java | 21 ++------ 2 files changed, 15 insertions(+), 54 deletions(-) diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/execute_watch/80_foreach.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/execute_watch/80_foreach.yml index f585157b483cf..590ad38ea84de 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/execute_watch/80_foreach.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/execute_watch/80_foreach.yml @@ -4,24 +4,8 @@ setup: cluster.health: wait_for_status: yellow ---- -teardown: - - do: - watcher.delete_watch: - id: "test_watch" - ignore: 404 - --- "Test execute watch api with foreach action": - - do: - bulk: - refresh: true - body: - - '{"index": {"_index": "my_test_index", "_id": "first"}}' - - '{ "title" : "first" }' - - '{"index": {"_index": "my_test_index", "_id": "second"}}' - - '{ "title" : "second" }' - - do: watcher.execute_watch: body: > @@ -31,14 +15,13 @@ teardown: "schedule" : { "cron" : "0 0 0 1 * ? 2099" } }, "input": { - "search" : { - "request" : { - "indices" : [ "my_test_index" ], - "body" : { - "query": { - "match_all" : {} - } - } + "simple" : { + "hits" : { + "hits" : [ + { "key" : "first" }, + { "key" : "second" }, + { "key" : "third" } + ] } } }, @@ -46,7 +29,7 @@ teardown: "log_hits" : { "foreach" : "ctx.payload.hits.hits", "logging" : { - "text" : "Found error in document {{_id}} with timestamp {{_source.timestamp}}" + "text" : "Logging {{ctx.payload.key}}" } } } @@ -56,15 +39,6 @@ teardown: - match: { watch_record.trigger_event.type: "manual" } - match: { watch_record.state: "executed" } - match: { watch_record.status.execution_state: "executed" } - - - do: - mget: - body: - docs: - - { _index: my_test_index_2, _id: "first"} - - { _index: my_test_index_2, _id: "second"} - - - is_true: docs.0.found - - match: { docs.0._source: { "title": "first" }} - - is_true: docs.1.found - - match: { docs.1._source: { "title": "second" }} + - match: { watch_record.result.actions.0.foreach.0.logging.logged_text: "Logging first" } + - match: { watch_record.result.actions.0.foreach.1.logging.logged_text: "Logging second" } + - match: { watch_record.result.actions.0.foreach.2.logging.logged_text: "Logging third" } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java index 73b22f32fe8d9..d38c959cc8b60 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java @@ -30,7 +30,6 @@ import org.elasticsearch.xpack.watcher.condition.NeverCondition; import org.elasticsearch.xpack.watcher.test.MockTextTemplateEngine; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent; -import org.mockito.Mock; import java.time.ZoneOffset; import java.time.ZonedDateTime; @@ -80,8 +79,9 @@ public void testOtherActionsAreNotAffectedOnActionConditionReset() throws Except } public void testThatMultipleResultsCanBeReturned() throws Exception { - final LoggingAction loggingAction = new LoggingAction(new TextTemplate("foo"), null, null); - ExecutableAction executableAction = new ExecutableLoggingAction(loggingAction, logger, new MockTextTemplateEngine()); + final LoggingAction loggingAction = new LoggingAction(new TextTemplate("{{key}}"), null, null); + final ExecutableAction executableAction = + new ExecutableLoggingAction(loggingAction, logger, new MockTextTemplateEngine()); ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, "ctx.payload.my_path"); @@ -93,22 +93,9 @@ public void testThatMultipleResultsCanBeReturned() throws Exception { Map.of("key", "third") ))); when(ctx.payload()).thenReturn(payload); -// when(executableAction.logger()).thenReturn(logger); - - final Action.Result firstResult = new Action.Result.Failure("MY_TYPE", "first reason"); - final Payload firstPayload = new Payload.Simple(Map.of("key", "first")); -// when(executableAction.execute(eq("_action"), eq(ctx), eq(firstPayload))).thenReturn(firstResult); - - final Action.Result secondResult = new Action.Result.Failure("MY_TYPE", "second reason"); - final Payload secondPayload = new Payload.Simple(Map.of("key", "second")); -// when(executableAction.execute(eq("_action"), eq(ctx), eq(secondPayload))).thenReturn(secondResult); - - final Action.Result thirdResult = new Action.Result.Failure("MY_TYPE", "third reason"); - final Payload thirdPayload = new Payload.Simple(Map.of("key", "third")); -// when(executableAction.execute(eq("_action"), eq(ctx), eq(thirdPayload))).thenReturn(thirdResult); ActionWrapperResult result = wrapper.execute(ctx); - assertThat(result.action().status(), is(Action.Result.Status.FAILURE)); + assertThat(result.action().status(), is(Action.Result.Status.SUCCESS)); // check that action toXContent contains all the results try (XContentBuilder builder = jsonBuilder()) { result.toXContent(builder, ToXContent.EMPTY_PARAMS); From d4d0490a59a6db7a8a89bc461cb062c4df50f2db Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Wed, 19 Jun 2019 16:23:13 +0200 Subject: [PATCH 06/11] fix docs --- x-pack/docs/en/watcher/actions.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/docs/en/watcher/actions.asciidoc b/x-pack/docs/en/watcher/actions.asciidoc index 68a106cf66844..729fdfda5abc4 100644 --- a/x-pack/docs/en/watcher/actions.asciidoc +++ b/x-pack/docs/en/watcher/actions.asciidoc @@ -224,7 +224,7 @@ PUT _watcher/watch/log_event_watch "log_hits" : { "foreach" : "ctx.payload.hits.hits", <1> "logging" : { - "text" : "Found id {{_id}} with field {{_source.my_field}}" + "text" : "Found id {{ctx.payload._id}} with field {{ctx.payload._source.my_field}}" } } } From 14dcc8d12b3134d73d0f9004beaa3ffaa91e5661 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Wed, 26 Jun 2019 09:23:23 +0200 Subject: [PATCH 07/11] review comments: limit runs to 100, doc updates --- x-pack/docs/en/watcher/actions.asciidoc | 6 ++-- .../core/watcher/actions/ActionWrapper.java | 9 +++++ .../watcher/actions/ActionWrapperTests.java | 34 +++++++++++++++++++ 3 files changed, 47 insertions(+), 2 deletions(-) diff --git a/x-pack/docs/en/watcher/actions.asciidoc b/x-pack/docs/en/watcher/actions.asciidoc index 729fdfda5abc4..c9f4e97a8ec3c 100644 --- a/x-pack/docs/en/watcher/actions.asciidoc +++ b/x-pack/docs/en/watcher/actions.asciidoc @@ -199,6 +199,9 @@ You can use the `foreach` field in an action to trigger the configured action for every element within that array. Each element within that array has to be a map. +In order to protect from long running watches, after one hundred runs with an +foreach loop the execution is gracefully stopped. + [source,js] -------------------------------------------------- PUT _watcher/watch/log_event_watch @@ -211,14 +214,13 @@ PUT _watcher/watch/log_event_watch "request" : { "indices" : "log-events", "body" : { - "size" : 0, "query" : { "match" : { "status" : "error" } } } } } }, "condition" : { - "compare" : { "ctx.payload.hits.total.value" : { "gt" : 0 } } + "compare" : { "ctx.payload.hits.total" : { "gt" : 0 } } }, "actions" : { "log_hits" : { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/actions/ActionWrapper.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/actions/ActionWrapper.java index 328b535080384..c15f4d4439d06 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/actions/ActionWrapper.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/actions/ActionWrapper.java @@ -47,6 +47,8 @@ public class ActionWrapper implements ToXContentObject { + private final int MAXIMUM_FOREACH_RUNS = 100; + private String id; @Nullable private final ExecutableCondition condition; @@ -168,17 +170,22 @@ public ActionWrapperResult execute(WatchExecutionContext ctx) { try { List results = new ArrayList<>(); Object object = ObjectPath.eval(path, toMap(ctx)); + int runs = 0; if (object instanceof Collection) { Collection collection = Collection.class.cast(object); if (collection.isEmpty()) { throw new ElasticsearchException("foreach object [{}] was an empty list, could not run any action", path); } else { for (Object o : collection) { + if (runs >= MAXIMUM_FOREACH_RUNS) { + break; + } if (o instanceof Map) { results.add(action.execute(id, ctx, new Payload.Simple((Map) o))); } else { throw new ElasticsearchException("item in foreach [{}] object was not a map", path); } + runs++; } } } else if (object == null) { @@ -196,10 +203,12 @@ public ActionWrapperResult execute(WatchExecutionContext ctx) { status = Action.Result.Status.PARTIAL_FAILURE; } + final int numberOfActionsExecuted = runs; return new ActionWrapperResult(id, conditionResult, transformResult, new Action.Result(action.type(), status) { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field("number_of_actions_executed", numberOfActionsExecuted); builder.startArray(WatchField.FOREACH.getPreferredName()); for (Action.Result result : results) { builder.startObject(); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java index d38c959cc8b60..c99fe7cded546 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java @@ -33,6 +33,7 @@ import java.time.ZoneOffset; import java.time.ZonedDateTime; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -183,6 +184,39 @@ public void testPartialFailure() throws Exception { assertThat(result.action().status(), is(Action.Result.Status.PARTIAL_FAILURE)); } + public void testLimitOfNumberOfActionsExecuted() throws Exception { + ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, + "ctx.payload.my_path"); + WatchExecutionContext ctx = mockExecutionContent(watch); + List> itemsPayload = new ArrayList<>(); + for (int i = 0; i < 101; i++) { + final Action.Result actionResult = new LoggingAction.Result.Success("log_message " + i);; + final Payload singleItemPayload = new Payload.Simple(Map.of("key", String.valueOf(i))); + itemsPayload.add(Map.of("key", String.valueOf(i))); + when(executableAction.execute(eq("_action"), eq(ctx), eq(singleItemPayload))).thenReturn(actionResult); + } + + Payload.Simple payload = new Payload.Simple(Map.of("my_path", itemsPayload)); + when(ctx.payload()).thenReturn(payload); + when(executableAction.logger()).thenReturn(logger); + + ActionWrapperResult result = wrapper.execute(ctx); + assertThat(result.action().status(), is(Action.Result.Status.SUCCESS)); + + // check that action toXContent contains all the results + try (XContentBuilder builder = jsonBuilder()) { + result.toXContent(builder, ToXContent.EMPTY_PARAMS); + final String json = Strings.toString(builder); + final Map map = XContentHelper.convertToMap(JsonXContent.jsonXContent, json, true); + assertThat(map, hasKey("foreach")); + assertThat(map.get("foreach"), instanceOf(List.class)); + List> actions = (List) map.get("foreach"); + assertThat(actions, hasSize(100)); + assertThat(map, hasKey("number_of_actions_executed")); + assertThat(map.get("number_of_actions_executed"), is(100)); + } + } + private WatchExecutionContext mockExecutionContent(Watch watch) { WatchExecutionContext ctx = mock(WatchExecutionContext.class); when(watch.id()).thenReturn("watchId"); From 2073d460aa46a499986e0c1f2df8e27cb2534734 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Wed, 26 Jun 2019 10:01:19 +0200 Subject: [PATCH 08/11] review comment: lift restriction on passing a map --- x-pack/docs/en/watcher/actions.asciidoc | 3 +- .../core/watcher/actions/ActionWrapper.java | 2 +- .../test/watcher/execute_watch/80_foreach.yml | 67 +++++++++++++++++++ .../watcher/actions/ActionWrapperTests.java | 19 ------ 4 files changed, 69 insertions(+), 22 deletions(-) diff --git a/x-pack/docs/en/watcher/actions.asciidoc b/x-pack/docs/en/watcher/actions.asciidoc index c9f4e97a8ec3c..7f5cd3218b5d3 100644 --- a/x-pack/docs/en/watcher/actions.asciidoc +++ b/x-pack/docs/en/watcher/actions.asciidoc @@ -196,8 +196,7 @@ image::images/action-throttling.jpg[align="center"] === Running an action for each element in an array You can use the `foreach` field in an action to trigger the configured action -for every element within that array. Each element within that array has to be a -map. +for every element within that array. In order to protect from long running watches, after one hundred runs with an foreach loop the execution is gracefully stopped. diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/actions/ActionWrapper.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/actions/ActionWrapper.java index c15f4d4439d06..951f305edf5bc 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/actions/ActionWrapper.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/actions/ActionWrapper.java @@ -183,7 +183,7 @@ public ActionWrapperResult execute(WatchExecutionContext ctx) { if (o instanceof Map) { results.add(action.execute(id, ctx, new Payload.Simple((Map) o))); } else { - throw new ElasticsearchException("item in foreach [{}] object was not a map", path); + results.add(action.execute(id, ctx, new Payload.Simple("_value", o))); } runs++; } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/execute_watch/80_foreach.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/execute_watch/80_foreach.yml index 590ad38ea84de..0c5ec6fbcbacc 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/execute_watch/80_foreach.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/execute_watch/80_foreach.yml @@ -42,3 +42,70 @@ setup: - match: { watch_record.result.actions.0.foreach.0.logging.logged_text: "Logging first" } - match: { watch_record.result.actions.0.foreach.1.logging.logged_text: "Logging second" } - match: { watch_record.result.actions.0.foreach.2.logging.logged_text: "Logging third" } + + +--- +"Test execute watch api with foreach action using an array": + - do: + watcher.execute_watch: + body: > + { + "watch" : { + "trigger": { + "schedule" : { "cron" : "0 0 0 1 * ? 2099" } + }, + "input": { + "simple" : { + "values" : [1, 2, 3] + } + }, + "actions": { + "log_hits" : { + "foreach" : "ctx.payload.values", + "logging" : { + "text" : "Logging {{ctx.payload._value}}" + } + } + } + } + } + + - match: { watch_record.trigger_event.type: "manual" } + - match: { watch_record.state: "executed" } + - match: { watch_record.status.execution_state: "executed" } + - match: { watch_record.result.actions.0.foreach.0.logging.logged_text: "Logging 1" } + - match: { watch_record.result.actions.0.foreach.1.logging.logged_text: "Logging 2" } + - match: { watch_record.result.actions.0.foreach.2.logging.logged_text: "Logging 3" } + +--- +"Test execute watch api with foreach action using an array of arrays": + - do: + watcher.execute_watch: + body: > + { + "watch" : { + "trigger": { + "schedule" : { "cron" : "0 0 0 1 * ? 2099" } + }, + "input": { + "simple" : { + "values" : [[1, 2], [2, 3], [3, 4]] + } + }, + "actions": { + "log_hits" : { + "foreach" : "ctx.payload.values", + "logging" : { + "text" : "Logging {{ctx.payload._value.1}}" + } + } + } + } + } + + - match: { watch_record.trigger_event.type: "manual" } + - match: { watch_record.state: "executed" } + - match: { watch_record.status.execution_state: "executed" } + - match: { watch_record.result.actions.0.foreach.0.logging.logged_text: "Logging 2" } + - match: { watch_record.result.actions.0.foreach.1.logging.logged_text: "Logging 3" } + - match: { watch_record.result.actions.0.foreach.2.logging.logged_text: "Logging 4" } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java index c99fe7cded546..8931319501b6d 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/ActionWrapperTests.java @@ -109,25 +109,6 @@ public void testThatMultipleResultsCanBeReturned() throws Exception { } } - public void testThatPathElementIsntInstanceOfMap() throws Exception { - ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, "" + - "ctx.payload.my_path"); - WatchExecutionContext ctx = mockExecutionContent(watch); - Payload.Simple payload = new Payload.Simple(Map.of("my_path", List.of("first", "second", "third"))); - when(ctx.payload()).thenReturn(payload); - when(executableAction.logger()).thenReturn(logger); - - final Action.Result actionResult = new Action.Result.Failure("MY_TYPE", "first reason"); - final Payload actionPayload = new Payload.Simple(Map.of("key", "first")); - when(executableAction.execute(eq("_action"), eq(ctx), eq(actionPayload))).thenReturn(actionResult); - - ActionWrapperResult result = wrapper.execute(ctx); - assertThat(result.action().status(), is(Action.Result.Status.FAILURE)); - assertThat(result.action(), instanceOf(Action.Result.FailureWithException.class)); - Action.Result.FailureWithException failureWithException = (Action.Result.FailureWithException) result.action(); - assertThat(failureWithException.getException().getMessage(), is("item in foreach [ctx.payload.my_path] object was not a map")); - } - public void testThatSpecifiedPathIsNotCollection() { ActionWrapper wrapper = new ActionWrapper("_action", null, InternalAlwaysCondition.INSTANCE, null, executableAction, "ctx.payload.my_path"); From 80295288693478479e153cd0b1a935f427e7b54b Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Mon, 1 Jul 2019 16:18:14 +0200 Subject: [PATCH 09/11] Fix tests --- .../elasticsearch/xpack/test/rest/XPackRestTestConstants.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestConstants.java b/x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestConstants.java index bfdf051a29235..6c36da31d98bd 100644 --- a/x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestConstants.java +++ b/x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestConstants.java @@ -10,7 +10,7 @@ public final class XPackRestTestConstants { // Watcher constants: - public static final String INDEX_TEMPLATE_VERSION = "9"; + public static final String INDEX_TEMPLATE_VERSION = "10"; public static final String HISTORY_TEMPLATE_NAME_NO_ILM = ".watch-history-no-ilm-" + INDEX_TEMPLATE_VERSION; public static final String TRIGGERED_TEMPLATE_NAME = ".triggered_watches"; public static final String WATCHES_TEMPLATE_NAME = ".watches"; From 9c0a0ba95ff43c81e2f1faa9c66d89e6017da2f9 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Mon, 1 Jul 2019 16:23:19 +0200 Subject: [PATCH 10/11] update template --- x-pack/plugin/core/src/main/resources/watch-history.json | 3 +++ 1 file changed, 3 insertions(+) diff --git a/x-pack/plugin/core/src/main/resources/watch-history.json b/x-pack/plugin/core/src/main/resources/watch-history.json index 02a2a789df7bd..1cc8d67f9ce27 100644 --- a/x-pack/plugin/core/src/main/resources/watch-history.json +++ b/x-pack/plugin/core/src/main/resources/watch-history.json @@ -264,6 +264,9 @@ "reason" : { "type" : "keyword" }, + "number_of_actions_executed": { + "type": "integer" + }, "foreach" : { "type": "object", "enabled" : false From b989cfb83d61e05ba59c22aad4da14a29c95f6c2 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Tue, 2 Jul 2019 09:43:56 +0200 Subject: [PATCH 11/11] remove tabs --- x-pack/plugin/core/src/main/resources/watch-history.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/core/src/main/resources/watch-history.json b/x-pack/plugin/core/src/main/resources/watch-history.json index 1cc8d67f9ce27..8b6bc435d2e1d 100644 --- a/x-pack/plugin/core/src/main/resources/watch-history.json +++ b/x-pack/plugin/core/src/main/resources/watch-history.json @@ -264,9 +264,9 @@ "reason" : { "type" : "keyword" }, - "number_of_actions_executed": { - "type": "integer" - }, + "number_of_actions_executed": { + "type": "integer" + }, "foreach" : { "type": "object", "enabled" : false