Skip to content

Commit 3a464bd

Browse files
author
Christoph Büscher
authored
Add start rollup job support to HL REST Client (#34623)
This change adds support for starting a rollup job to High Level REST Client. Relates to #29827
1 parent 3ef1fa5 commit 3a464bd

File tree

14 files changed

+388
-42
lines changed

14 files changed

+388
-42
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/RollupClient.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import org.elasticsearch.client.rollup.GetRollupCapsResponse;
2929
import org.elasticsearch.client.rollup.PutRollupJobRequest;
3030
import org.elasticsearch.client.rollup.PutRollupJobResponse;
31+
import org.elasticsearch.client.rollup.StartRollupJobRequest;
32+
import org.elasticsearch.client.rollup.StartRollupJobResponse;
3133

3234
import java.io.IOException;
3335
import java.util.Collections;
@@ -80,6 +82,40 @@ public void putRollupJobAsync(PutRollupJobRequest request, RequestOptions option
8082
listener, Collections.emptySet());
8183
}
8284

85+
/**
86+
* Start a rollup job
87+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/rollup-start-job.html">
88+
* the docs</a> for more.
89+
* @param request the request
90+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
91+
* @return the response
92+
* @throws IOException in case there is a problem sending the request or parsing back the response
93+
*/
94+
public StartRollupJobResponse startRollupJob(StartRollupJobRequest request, RequestOptions options) throws IOException {
95+
return restHighLevelClient.performRequestAndParseEntity(request,
96+
RollupRequestConverters::startJob,
97+
options,
98+
StartRollupJobResponse::fromXContent,
99+
Collections.emptySet());
100+
}
101+
102+
/**
103+
* Asynchronously start a rollup job
104+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/rollup-start-job.html">
105+
* the docs</a> for more.
106+
* @param request the request
107+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
108+
* @param listener the listener to be notified upon request completion
109+
*/
110+
public void startRollupJobAsync(StartRollupJobRequest request, RequestOptions options,
111+
ActionListener<StartRollupJobResponse> listener) {
112+
restHighLevelClient.performRequestAsyncAndParseEntity(request,
113+
RollupRequestConverters::startJob,
114+
options,
115+
StartRollupJobResponse::fromXContent,
116+
listener, Collections.emptySet());
117+
}
118+
83119
/**
84120
* Delete a rollup job from the cluster
85121
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/rollup-delete-job.html">

client/rest-high-level/src/main/java/org/elasticsearch/client/RollupRequestConverters.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@
2020

2121
import org.apache.http.client.methods.HttpDelete;
2222
import org.apache.http.client.methods.HttpGet;
23+
import org.apache.http.client.methods.HttpPost;
2324
import org.apache.http.client.methods.HttpPut;
2425
import org.elasticsearch.client.rollup.DeleteRollupJobRequest;
25-
import org.elasticsearch.client.rollup.GetRollupJobRequest;
2626
import org.elasticsearch.client.rollup.GetRollupCapsRequest;
27+
import org.elasticsearch.client.rollup.GetRollupJobRequest;
2728
import org.elasticsearch.client.rollup.PutRollupJobRequest;
29+
import org.elasticsearch.client.rollup.StartRollupJobRequest;
2830

2931
import java.io.IOException;
3032

@@ -38,31 +40,35 @@ private RollupRequestConverters() {
3840

3941
static Request putJob(final PutRollupJobRequest putRollupJobRequest) throws IOException {
4042
String endpoint = new RequestConverters.EndpointBuilder()
41-
.addPathPartAsIs("_xpack")
42-
.addPathPartAsIs("rollup")
43-
.addPathPartAsIs("job")
43+
.addPathPartAsIs("_xpack", "rollup", "job")
4444
.addPathPart(putRollupJobRequest.getConfig().getId())
4545
.build();
4646
Request request = new Request(HttpPut.METHOD_NAME, endpoint);
4747
request.setEntity(createEntity(putRollupJobRequest, REQUEST_BODY_CONTENT_TYPE));
4848
return request;
4949
}
5050

51+
static Request startJob(final StartRollupJobRequest startRollupJobRequest) throws IOException {
52+
String endpoint = new RequestConverters.EndpointBuilder()
53+
.addPathPartAsIs("_xpack", "rollup", "job")
54+
.addPathPart(startRollupJobRequest.getJobId())
55+
.addPathPartAsIs("_start")
56+
.build();
57+
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
58+
return request;
59+
}
60+
5161
static Request getJob(final GetRollupJobRequest getRollupJobRequest) {
5262
String endpoint = new RequestConverters.EndpointBuilder()
53-
.addPathPartAsIs("_xpack")
54-
.addPathPartAsIs("rollup")
55-
.addPathPartAsIs("job")
63+
.addPathPartAsIs("_xpack", "rollup", "job")
5664
.addPathPart(getRollupJobRequest.getJobId())
5765
.build();
5866
return new Request(HttpGet.METHOD_NAME, endpoint);
5967
}
6068

6169
static Request deleteJob(final DeleteRollupJobRequest deleteRollupJobRequest) throws IOException {
6270
String endpoint = new RequestConverters.EndpointBuilder()
63-
.addPathPartAsIs("_xpack")
64-
.addPathPartAsIs("rollup")
65-
.addPathPartAsIs("job")
71+
.addPathPartAsIs("_xpack", "rollup", "job")
6672
.addPathPart(deleteRollupJobRequest.getId())
6773
.build();
6874
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);
@@ -72,9 +78,7 @@ static Request deleteJob(final DeleteRollupJobRequest deleteRollupJobRequest) th
7278

7379
static Request getRollupCaps(final GetRollupCapsRequest getRollupCapsRequest) throws IOException {
7480
String endpoint = new RequestConverters.EndpointBuilder()
75-
.addPathPartAsIs("_xpack")
76-
.addPathPartAsIs("rollup")
77-
.addPathPartAsIs("data")
81+
.addPathPartAsIs("_xpack", "rollup", "data")
7882
.addPathPart(getRollupCapsRequest.getIndexPattern())
7983
.build();
8084
Request request = new Request(HttpGet.METHOD_NAME, endpoint);

client/rest-high-level/src/main/java/org/elasticsearch/client/rollup/AcknowledgedResponse.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,21 @@
1919

2020
package org.elasticsearch.client.rollup;
2121

22+
import org.elasticsearch.common.ParseField;
23+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
2224
import org.elasticsearch.common.xcontent.ToXContent;
2325
import org.elasticsearch.common.xcontent.ToXContentObject;
2426
import org.elasticsearch.common.xcontent.XContentBuilder;
2527

2628
import java.io.IOException;
2729
import java.util.Objects;
30+
import java.util.function.Function;
31+
32+
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
2833

2934
public abstract class AcknowledgedResponse implements ToXContentObject {
35+
36+
protected static final String PARSE_FIELD_NAME = "acknowledged";
3037
private final boolean acknowledged;
3138

3239
public AcknowledgedResponse(final boolean acknowledged) {
@@ -37,6 +44,12 @@ public boolean isAcknowledged() {
3744
return acknowledged;
3845
}
3946

47+
protected static <T> ConstructingObjectParser<T, Void> generateParser(String name, Function<Boolean, T> ctor, String parseField) {
48+
ConstructingObjectParser<T, Void> p = new ConstructingObjectParser<>(name, true, args -> ctor.apply((boolean) args[0]));
49+
p.declareBoolean(constructorArg(), new ParseField(parseField));
50+
return p;
51+
}
52+
4053
@Override
4154
public boolean equals(Object o) {
4255
if (this == o) {
@@ -58,10 +71,16 @@ public int hashCode() {
5871
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
5972
builder.startObject();
6073
{
61-
builder.field("acknowledged", isAcknowledged());
74+
builder.field(getFieldName(), isAcknowledged());
6275
}
6376
builder.endObject();
6477
return builder;
6578
}
6679

80+
/**
81+
* @return the field name this response uses to output the acknowledged flag
82+
*/
83+
protected String getFieldName() {
84+
return PARSE_FIELD_NAME;
85+
}
6786
}

client/rest-high-level/src/main/java/org/elasticsearch/client/rollup/DeleteRollupJobResponse.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,28 +19,21 @@
1919

2020
package org.elasticsearch.client.rollup;
2121

22-
import org.elasticsearch.common.ParseField;
2322
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
2423
import org.elasticsearch.common.xcontent.XContentParser;
2524

2625
import java.io.IOException;
2726

28-
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
29-
3027
public class DeleteRollupJobResponse extends AcknowledgedResponse {
3128

3229
public DeleteRollupJobResponse(boolean acknowledged) {
3330
super(acknowledged);
3431
}
3532

33+
private static final ConstructingObjectParser<DeleteRollupJobResponse, Void> PARSER = AcknowledgedResponse
34+
.generateParser("delete_rollup_job_response", DeleteRollupJobResponse::new, AcknowledgedResponse.PARSE_FIELD_NAME);
35+
3636
public static DeleteRollupJobResponse fromXContent(final XContentParser parser) throws IOException {
3737
return PARSER.parse(parser, null);
3838
}
39-
40-
private static final ConstructingObjectParser<DeleteRollupJobResponse, Void> PARSER
41-
= new ConstructingObjectParser<>("delete_rollup_job_response", true,
42-
args -> new DeleteRollupJobResponse((boolean) args[0]));
43-
static {
44-
PARSER.declareBoolean(constructorArg(), new ParseField("acknowledged"));
45-
}
4639
}

client/rest-high-level/src/main/java/org/elasticsearch/client/rollup/PutRollupJobResponse.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,28 +18,21 @@
1818
*/
1919
package org.elasticsearch.client.rollup;
2020

21-
import org.elasticsearch.common.ParseField;
2221
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
2322
import org.elasticsearch.common.xcontent.XContentParser;
2423

2524
import java.io.IOException;
2625

27-
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
28-
2926
public class PutRollupJobResponse extends AcknowledgedResponse {
3027

31-
3228
public PutRollupJobResponse(boolean acknowledged) {
3329
super(acknowledged);
3430
}
3531

32+
private static final ConstructingObjectParser<PutRollupJobResponse, Void> PARSER = AcknowledgedResponse
33+
.generateParser("delete_rollup_job_response", PutRollupJobResponse::new, AcknowledgedResponse.PARSE_FIELD_NAME);
34+
3635
public static PutRollupJobResponse fromXContent(final XContentParser parser) throws IOException {
3736
return PARSER.parse(parser, null);
3837
}
39-
40-
private static final ConstructingObjectParser<PutRollupJobResponse, Void> PARSER
41-
= new ConstructingObjectParser<>("put_rollup_job_response", true, args -> new PutRollupJobResponse((boolean) args[0]));
42-
static {
43-
PARSER.declareBoolean(constructorArg(), new ParseField("acknowledged"));
44-
}
4538
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.client.rollup;
20+
21+
import org.elasticsearch.client.Validatable;
22+
23+
import java.util.Objects;
24+
25+
public class StartRollupJobRequest implements Validatable {
26+
27+
private final String jobId;
28+
29+
public StartRollupJobRequest(final String jobId) {
30+
this.jobId = Objects.requireNonNull(jobId, "id parameter must not be null");
31+
}
32+
33+
public String getJobId() {
34+
return jobId;
35+
}
36+
37+
@Override
38+
public boolean equals(Object o) {
39+
if (this == o) return true;
40+
if (o == null || getClass() != o.getClass()) return false;
41+
final StartRollupJobRequest that = (StartRollupJobRequest) o;
42+
return Objects.equals(jobId, that.jobId);
43+
}
44+
45+
@Override
46+
public int hashCode() {
47+
return Objects.hash(jobId);
48+
}
49+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.client.rollup;
21+
22+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
23+
import org.elasticsearch.common.xcontent.XContentParser;
24+
25+
import java.io.IOException;
26+
27+
public class StartRollupJobResponse extends AcknowledgedResponse {
28+
29+
private static final String PARSE_FIELD_NAME = "started";
30+
31+
private static final ConstructingObjectParser<StartRollupJobResponse, Void> PARSER = AcknowledgedResponse
32+
.generateParser("delete_rollup_job_response", StartRollupJobResponse::new, PARSE_FIELD_NAME);
33+
34+
public StartRollupJobResponse(boolean acknowledged) {
35+
super(acknowledged);
36+
}
37+
38+
public static StartRollupJobResponse fromXContent(final XContentParser parser) throws IOException {
39+
return PARSER.parse(parser, null);
40+
}
41+
42+
@Override
43+
protected String getFieldName() {
44+
return PARSE_FIELD_NAME;
45+
}
46+
}

client/rest-high-level/src/test/java/org/elasticsearch/client/RollupIT.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import org.elasticsearch.client.rollup.GetRollupJobResponse.JobWrapper;
4040
import org.elasticsearch.client.rollup.PutRollupJobRequest;
4141
import org.elasticsearch.client.rollup.PutRollupJobResponse;
42+
import org.elasticsearch.client.rollup.StartRollupJobRequest;
43+
import org.elasticsearch.client.rollup.StartRollupJobResponse;
4244
import org.elasticsearch.client.rollup.RollableIndexCaps;
4345
import org.elasticsearch.client.rollup.RollupJobCaps;
4446
import org.elasticsearch.client.rollup.job.config.DateHistogramGroupConfig;
@@ -150,7 +152,7 @@ public void testDeleteRollupJob() throws Exception {
150152
PutRollupJobRequest putRollupJobRequest =
151153
new PutRollupJobRequest(new RollupJobConfig(id, indexPattern, rollupIndex, cron, pageSize, groups, metrics, timeout));
152154
final RollupClient rollupClient = highLevelClient().rollup();
153-
PutRollupJobResponse response = execute(putRollupJobRequest, rollupClient::putRollupJob, rollupClient::putRollupJobAsync);
155+
execute(putRollupJobRequest, rollupClient::putRollupJob, rollupClient::putRollupJobAsync);
154156
DeleteRollupJobRequest deleteRollupJobRequest = new DeleteRollupJobRequest(id);
155157
DeleteRollupJobResponse deleteRollupJobResponse = highLevelClient().rollup()
156158
.deleteRollupJob(deleteRollupJobRequest, RequestOptions.DEFAULT);
@@ -164,8 +166,7 @@ public void testDeleteMissingRollupJob() {
164166
assertThat(responseException.status().getStatus(), is(404));
165167
}
166168

167-
@SuppressWarnings("unchecked")
168-
public void testPutAndGetRollupJob() throws Exception {
169+
public void testPutStartAndGetRollupJob() throws Exception {
169170
// TODO expand this to also test with histogram and terms?
170171
final GroupConfig groups = new GroupConfig(new DateHistogramGroupConfig("date", DateHistogramInterval.DAY));
171172
final List<MetricConfig> metrics = Collections.singletonList(new MetricConfig("value", SUPPORTED_METRICS));
@@ -178,9 +179,9 @@ public void testPutAndGetRollupJob() throws Exception {
178179
PutRollupJobResponse response = execute(putRollupJobRequest, rollupClient::putRollupJob, rollupClient::putRollupJobAsync);
179180
assertTrue(response.isAcknowledged());
180181

181-
// TODO Replace this with the Rollup Start Job API
182-
Response startResponse = client().performRequest(new Request("POST", "/_xpack/rollup/job/" + id + "/_start"));
183-
assertEquals(RestStatus.OK.getStatus(), startResponse.getHttpResponse().getStatusLine().getStatusCode());
182+
StartRollupJobRequest startRequest = new StartRollupJobRequest(id);
183+
StartRollupJobResponse startResponse = execute(startRequest, rollupClient::startRollupJob, rollupClient::startRollupJobAsync);
184+
assertTrue(startResponse.isAcknowledged());
184185

185186
assertBusy(() -> {
186187
SearchResponse searchResponse = highLevelClient().search(new SearchRequest(rollupIndex), RequestOptions.DEFAULT);

0 commit comments

Comments
 (0)