Skip to content

Commit c6fcb60

Browse files
authored
Add support for 'ack watch' to the HLRC. (#33962)
1 parent 2d64e3d commit c6fcb60

File tree

11 files changed

+592
-0
lines changed

11 files changed

+592
-0
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherClient.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.elasticsearch.client;
2020

2121
import org.elasticsearch.action.ActionListener;
22+
import org.elasticsearch.client.watcher.AckWatchRequest;
23+
import org.elasticsearch.client.watcher.AckWatchResponse;
2224
import org.elasticsearch.protocol.xpack.watcher.DeleteWatchRequest;
2325
import org.elasticsearch.protocol.xpack.watcher.DeleteWatchResponse;
2426
import org.elasticsearch.protocol.xpack.watcher.PutWatchRequest;
@@ -91,4 +93,32 @@ public void deleteWatchAsync(DeleteWatchRequest request, RequestOptions options,
9193
restHighLevelClient.performRequestAsyncAndParseEntity(request, WatcherRequestConverters::deleteWatch, options,
9294
DeleteWatchResponse::fromXContent, listener, singleton(404));
9395
}
96+
97+
/**
98+
* Acknowledges a watch.
99+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-ack-watch.html">
100+
* the docs</a> for more information.
101+
* @param request the request
102+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
103+
* @return the response
104+
* @throws IOException if there is a problem sending the request or parsing back the response
105+
*/
106+
public AckWatchResponse ackWatch(AckWatchRequest request, RequestOptions options) throws IOException {
107+
return restHighLevelClient.performRequestAndParseEntity(request, WatcherRequestConverters::ackWatch, options,
108+
AckWatchResponse::fromXContent, emptySet());
109+
}
110+
111+
/**
112+
* Asynchronously acknowledges a watch.
113+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-ack-watch.html">
114+
* the docs</a> for more information.
115+
* @param request the request
116+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
117+
* @param listener the listener to be notified upon completion of the request
118+
*/
119+
public void ackWatchAsync(AckWatchRequest request, RequestOptions options, ActionListener<AckWatchResponse> listener) {
120+
restHighLevelClient.performRequestAsyncAndParseEntity(request, WatcherRequestConverters::ackWatch, options,
121+
AckWatchResponse::fromXContent, listener, emptySet());
122+
}
123+
94124
}

client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherRequestConverters.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.http.client.methods.HttpPut;
2424
import org.apache.http.entity.ByteArrayEntity;
2525
import org.apache.http.entity.ContentType;
26+
import org.elasticsearch.client.watcher.AckWatchRequest;
2627
import org.elasticsearch.common.bytes.BytesReference;
2728
import org.elasticsearch.protocol.xpack.watcher.DeleteWatchRequest;
2829
import org.elasticsearch.protocol.xpack.watcher.PutWatchRequest;
@@ -59,4 +60,17 @@ static Request deleteWatch(DeleteWatchRequest deleteWatchRequest) {
5960
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);
6061
return request;
6162
}
63+
64+
public static Request ackWatch(AckWatchRequest ackWatchRequest) {
65+
String endpoint = new RequestConverters.EndpointBuilder()
66+
.addPathPartAsIs("_xpack")
67+
.addPathPartAsIs("watcher")
68+
.addPathPartAsIs("watch")
69+
.addPathPart(ackWatchRequest.getWatchId())
70+
.addPathPartAsIs("_ack")
71+
.addCommaSeparatedPathParts(ackWatchRequest.getActionIds())
72+
.build();
73+
Request request = new Request(HttpPut.METHOD_NAME, endpoint);
74+
return request;
75+
}
6276
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.client.watcher;
21+
22+
import org.elasticsearch.client.Validatable;
23+
import org.elasticsearch.client.ValidationException;
24+
import org.elasticsearch.protocol.xpack.watcher.PutWatchRequest;
25+
26+
import java.util.Locale;
27+
28+
/**
29+
* A request to explicitly acknowledge a watch.
30+
*/
31+
public class AckWatchRequest implements Validatable {
32+
33+
private final String watchId;
34+
private final String[] actionIds;
35+
36+
public AckWatchRequest(String watchId, String... actionIds) {
37+
validateIds(watchId, actionIds);
38+
this.watchId = watchId;
39+
this.actionIds = actionIds;
40+
}
41+
42+
private void validateIds(String watchId, String... actionIds) {
43+
ValidationException exception = new ValidationException();
44+
if (watchId == null) {
45+
exception.addValidationError("watch id is missing");
46+
} else if (PutWatchRequest.isValidId(watchId) == false) {
47+
exception.addValidationError("watch id contains whitespace");
48+
}
49+
50+
if (actionIds != null) {
51+
for (String actionId : actionIds) {
52+
if (actionId == null) {
53+
exception.addValidationError(String.format(Locale.ROOT, "action id may not be null"));
54+
} else if (PutWatchRequest.isValidId(actionId) == false) {
55+
exception.addValidationError(
56+
String.format(Locale.ROOT, "action id [%s] contains whitespace", actionId));
57+
}
58+
}
59+
}
60+
61+
if (!exception.validationErrors().isEmpty()) {
62+
throw exception;
63+
}
64+
}
65+
66+
/**
67+
* @return The ID of the watch to be acked.
68+
*/
69+
public String getWatchId() {
70+
return watchId;
71+
}
72+
73+
/**
74+
* @return The IDs of the actions to be acked. If omitted,
75+
* all actions for the given watch will be acknowledged.
76+
*/
77+
public String[] getActionIds() {
78+
return actionIds;
79+
}
80+
81+
@Override
82+
public String toString() {
83+
StringBuilder sb = new StringBuilder("ack [").append(watchId).append("]");
84+
if (actionIds.length > 0) {
85+
sb.append("[");
86+
for (int i = 0; i < actionIds.length; i++) {
87+
if (i > 0) {
88+
sb.append(", ");
89+
}
90+
sb.append(actionIds[i]);
91+
}
92+
sb.append("]");
93+
}
94+
return sb.toString();
95+
}
96+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.client.watcher;
21+
22+
import org.elasticsearch.common.ParseField;
23+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
24+
import org.elasticsearch.common.xcontent.XContentParser;
25+
26+
import java.io.IOException;
27+
28+
/**
29+
* The response from an 'ack watch' request.
30+
*/
31+
public class AckWatchResponse {
32+
33+
private final WatchStatus status;
34+
35+
public AckWatchResponse(WatchStatus status) {
36+
this.status = status;
37+
}
38+
39+
/**
40+
* @return the status of the requested watch. If an action was
41+
* successfully acknowledged, this will be reflected in its status.
42+
*/
43+
public WatchStatus getStatus() {
44+
return status;
45+
}
46+
47+
private static final ParseField STATUS_FIELD = new ParseField("status");
48+
private static ConstructingObjectParser<AckWatchResponse, Void> PARSER =
49+
new ConstructingObjectParser<>("ack_watch_response", true,
50+
a -> new AckWatchResponse((WatchStatus) a[0]));
51+
52+
static {
53+
PARSER.declareObject(ConstructingObjectParser.constructorArg(),
54+
(parser, context) -> WatchStatus.parse(parser),
55+
STATUS_FIELD);
56+
}
57+
58+
public static AckWatchResponse fromXContent(XContentParser parser) throws IOException {
59+
return PARSER.parse(parser, null);
60+
}
61+
}

client/rest-high-level/src/test/java/org/elasticsearch/client/WatcherIT.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,19 @@
1818
*/
1919
package org.elasticsearch.client;
2020

21+
import org.elasticsearch.ElasticsearchStatusException;
22+
import org.elasticsearch.client.watcher.AckWatchRequest;
23+
import org.elasticsearch.client.watcher.AckWatchResponse;
24+
import org.elasticsearch.client.watcher.ActionStatus;
25+
import org.elasticsearch.client.watcher.ActionStatus.AckStatus;
2126
import org.elasticsearch.common.bytes.BytesArray;
2227
import org.elasticsearch.common.bytes.BytesReference;
2328
import org.elasticsearch.common.xcontent.XContentType;
2429
import org.elasticsearch.protocol.xpack.watcher.DeleteWatchRequest;
2530
import org.elasticsearch.protocol.xpack.watcher.DeleteWatchResponse;
2631
import org.elasticsearch.protocol.xpack.watcher.PutWatchRequest;
2732
import org.elasticsearch.protocol.xpack.watcher.PutWatchResponse;
33+
import org.elasticsearch.rest.RestStatus;
2834

2935
import static org.hamcrest.Matchers.is;
3036

@@ -72,4 +78,34 @@ public void testDeleteWatch() throws Exception {
7278
}
7379
}
7480

81+
public void testAckWatch() throws Exception {
82+
String watchId = randomAlphaOfLength(10);
83+
String actionId = "logme";
84+
85+
PutWatchResponse putWatchResponse = createWatch(watchId);
86+
assertThat(putWatchResponse.isCreated(), is(true));
87+
88+
AckWatchResponse response = highLevelClient().watcher().ackWatch(
89+
new AckWatchRequest(watchId, actionId), RequestOptions.DEFAULT);
90+
91+
ActionStatus actionStatus = response.getStatus().actionStatus(actionId);
92+
assertEquals(AckStatus.State.AWAITS_SUCCESSFUL_EXECUTION, actionStatus.ackStatus().state());
93+
94+
// TODO: use the high-level REST client here once it supports 'execute watch'.
95+
Request executeWatchRequest = new Request("POST", "_xpack/watcher/watch/" + watchId + "/_execute");
96+
executeWatchRequest.setJsonEntity("{ \"record_execution\": true }");
97+
Response executeResponse = client().performRequest(executeWatchRequest);
98+
assertEquals(RestStatus.OK.getStatus(), executeResponse.getStatusLine().getStatusCode());
99+
100+
response = highLevelClient().watcher().ackWatch(
101+
new AckWatchRequest(watchId, actionId), RequestOptions.DEFAULT);
102+
103+
actionStatus = response.getStatus().actionStatus(actionId);
104+
assertEquals(AckStatus.State.ACKED, actionStatus.ackStatus().state());
105+
106+
ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class,
107+
() -> highLevelClient().watcher().ackWatch(
108+
new AckWatchRequest("nonexistent"), RequestOptions.DEFAULT));
109+
assertEquals(RestStatus.NOT_FOUND, exception.status());
110+
}
75111
}

client/rest-high-level/src/test/java/org/elasticsearch/client/WatcherRequestConvertersTests.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.http.client.methods.HttpDelete;
2323
import org.apache.http.client.methods.HttpPut;
24+
import org.elasticsearch.client.watcher.AckWatchRequest;
2425
import org.elasticsearch.common.bytes.BytesArray;
2526
import org.elasticsearch.common.xcontent.XContentType;
2627
import org.elasticsearch.protocol.xpack.watcher.DeleteWatchRequest;
@@ -30,6 +31,7 @@
3031
import java.io.ByteArrayOutputStream;
3132
import java.util.HashMap;
3233
import java.util.Map;
34+
import java.util.StringJoiner;
3335

3436
import static org.hamcrest.Matchers.is;
3537
import static org.hamcrest.Matchers.nullValue;
@@ -75,4 +77,24 @@ public void testDeleteWatch() {
7577
assertEquals("/_xpack/watcher/watch/" + watchId, request.getEndpoint());
7678
assertThat(request.getEntity(), nullValue());
7779
}
80+
81+
public void testAckWatch() {
82+
String watchId = randomAlphaOfLength(10);
83+
String[] actionIds = generateRandomStringArray(5, 10, false, true);
84+
85+
AckWatchRequest ackWatchRequest = new AckWatchRequest(watchId, actionIds);
86+
Request request = WatcherRequestConverters.ackWatch(ackWatchRequest);
87+
88+
assertEquals(HttpPut.METHOD_NAME, request.getMethod());
89+
90+
StringJoiner expectedEndpoint = new StringJoiner("/", "/", "")
91+
.add("_xpack").add("watcher").add("watch").add(watchId).add("_ack");
92+
if (ackWatchRequest.getActionIds().length > 0) {
93+
String actionsParam = String.join(",", ackWatchRequest.getActionIds());
94+
expectedEndpoint.add(actionsParam);
95+
}
96+
97+
assertEquals(expectedEndpoint.toString(), request.getEndpoint());
98+
assertThat(request.getEntity(), nullValue());
99+
}
78100
}

0 commit comments

Comments
 (0)