Skip to content

Commit 541af93

Browse files
committed
HLRC: ML start data feed API (#33898)
* HLRC: ML start data feed API
1 parent 251b0ba commit 541af93

File tree

11 files changed

+647
-0
lines changed

11 files changed

+647
-0
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.elasticsearch.client.ml.PutCalendarRequest;
4949
import org.elasticsearch.client.ml.PutDatafeedRequest;
5050
import org.elasticsearch.client.ml.PutJobRequest;
51+
import org.elasticsearch.client.ml.StartDatafeedRequest;
5152
import org.elasticsearch.client.ml.UpdateJobRequest;
5253
import org.elasticsearch.common.Strings;
5354
import org.elasticsearch.common.bytes.BytesReference;
@@ -231,6 +232,19 @@ static Request deleteDatafeed(DeleteDatafeedRequest deleteDatafeedRequest) {
231232
return request;
232233
}
233234

235+
static Request startDatafeed(StartDatafeedRequest startDatafeedRequest) throws IOException {
236+
String endpoint = new EndpointBuilder()
237+
.addPathPartAsIs("_xpack")
238+
.addPathPartAsIs("ml")
239+
.addPathPartAsIs("datafeeds")
240+
.addPathPart(startDatafeedRequest.getDatafeedId())
241+
.addPathPartAsIs("_start")
242+
.build();
243+
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
244+
request.setEntity(createEntity(startDatafeedRequest, REQUEST_BODY_CONTENT_TYPE));
245+
return request;
246+
}
247+
234248
static Request deleteForecast(DeleteForecastRequest deleteForecastRequest) {
235249
String endpoint = new EndpointBuilder()
236250
.addPathPartAsIs("_xpack")

client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@
5858
import org.elasticsearch.client.ml.PutDatafeedResponse;
5959
import org.elasticsearch.client.ml.PutJobRequest;
6060
import org.elasticsearch.client.ml.PutJobResponse;
61+
import org.elasticsearch.client.ml.StartDatafeedRequest;
62+
import org.elasticsearch.client.ml.StartDatafeedResponse;
6163
import org.elasticsearch.client.ml.UpdateJobRequest;
6264
import org.elasticsearch.client.ml.job.stats.JobStats;
6365

@@ -565,6 +567,46 @@ public void deleteDatafeedAsync(DeleteDatafeedRequest request, RequestOptions op
565567
Collections.emptySet());
566568
}
567569

570+
/**
571+
* Starts the given Machine Learning Datafeed
572+
* <p>
573+
* For additional info
574+
* see <a href="http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-start-datafeed.html">
575+
* ML Start Datafeed documentation</a>
576+
*
577+
* @param request The request to start the datafeed
578+
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
579+
* @return action acknowledgement
580+
* @throws IOException when there is a serialization issue sending the request or receiving the response
581+
*/
582+
public StartDatafeedResponse startDatafeed(StartDatafeedRequest request, RequestOptions options) throws IOException {
583+
return restHighLevelClient.performRequestAndParseEntity(request,
584+
MLRequestConverters::startDatafeed,
585+
options,
586+
StartDatafeedResponse::fromXContent,
587+
Collections.emptySet());
588+
}
589+
590+
/**
591+
* Starts the given Machine Learning Datafeed asynchronously and notifies the listener on completion
592+
* <p>
593+
* For additional info
594+
* see <a href="http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-start-datafeed.html">
595+
* ML Start Datafeed documentation</a>
596+
*
597+
* @param request The request to start the datafeed
598+
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
599+
* @param listener Listener to be notified upon request completion
600+
*/
601+
public void startDatafeedAsync(StartDatafeedRequest request, RequestOptions options, ActionListener<StartDatafeedResponse> listener) {
602+
restHighLevelClient.performRequestAsyncAndParseEntity(request,
603+
MLRequestConverters::startDatafeed,
604+
options,
605+
StartDatafeedResponse::fromXContent,
606+
listener,
607+
Collections.emptySet());
608+
}
609+
568610
/**
569611
* Updates a Machine Learning {@link org.elasticsearch.client.ml.job.config.Job}
570612
* <p>
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.client.ml;
20+
21+
import org.elasticsearch.action.ActionRequest;
22+
import org.elasticsearch.action.ActionRequestValidationException;
23+
import org.elasticsearch.client.ml.datafeed.DatafeedConfig;
24+
import org.elasticsearch.common.ParseField;
25+
import org.elasticsearch.common.unit.TimeValue;
26+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
27+
import org.elasticsearch.common.xcontent.ToXContentObject;
28+
import org.elasticsearch.common.xcontent.XContentBuilder;
29+
30+
import java.io.IOException;
31+
import java.util.Objects;
32+
33+
/**
34+
* Request to start a Datafeed
35+
*/
36+
public class StartDatafeedRequest extends ActionRequest implements ToXContentObject {
37+
38+
public static final ParseField START = new ParseField("start");
39+
public static final ParseField END = new ParseField("end");
40+
public static final ParseField TIMEOUT = new ParseField("timeout");
41+
42+
public static ConstructingObjectParser<StartDatafeedRequest, Void> PARSER =
43+
new ConstructingObjectParser<>("start_datafeed_request", a -> new StartDatafeedRequest((String)a[0]));
44+
45+
static {
46+
PARSER.declareString(ConstructingObjectParser.constructorArg(), DatafeedConfig.ID);
47+
PARSER.declareString(StartDatafeedRequest::setStart, START);
48+
PARSER.declareString(StartDatafeedRequest::setEnd, END);
49+
PARSER.declareString((params, val) ->
50+
params.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
51+
}
52+
53+
private final String datafeedId;
54+
private String start;
55+
private String end;
56+
private TimeValue timeout;
57+
58+
/**
59+
* Create a new StartDatafeedRequest for the given DatafeedId
60+
*
61+
* @param datafeedId non-null existing Datafeed ID
62+
*/
63+
public StartDatafeedRequest(String datafeedId) {
64+
this.datafeedId = Objects.requireNonNull(datafeedId, "[datafeed_id] must not be null");
65+
}
66+
67+
public String getDatafeedId() {
68+
return datafeedId;
69+
}
70+
71+
public String getStart() {
72+
return start;
73+
}
74+
75+
/**
76+
* The time that the datafeed should begin. This value is inclusive.
77+
*
78+
* If you specify a start value that is earlier than the timestamp of the latest processed record,
79+
* the datafeed continues from 1 millisecond after the timestamp of the latest processed record.
80+
*
81+
* If you do not specify a start time and the datafeed is associated with a new job,
82+
* the analysis starts from the earliest time for which data is available.
83+
*
84+
* @param start String representation of a timestamp; may be an epoch seconds, epoch millis or an ISO 8601 string
85+
*/
86+
public void setStart(String start) {
87+
this.start = start;
88+
}
89+
90+
public String getEnd() {
91+
return end;
92+
}
93+
94+
/**
95+
* The time that the datafeed should end. This value is exclusive.
96+
* If you do not specify an end time, the datafeed runs continuously.
97+
*
98+
* @param end String representation of a timestamp; may be an epoch seconds, epoch millis or an ISO 8601 string
99+
*/
100+
public void setEnd(String end) {
101+
this.end = end;
102+
}
103+
104+
public TimeValue getTimeout() {
105+
return timeout;
106+
}
107+
108+
/**
109+
* Indicates how long to wait for the cluster to respond to the request.
110+
*
111+
* @param timeout TimeValue for how long to wait for a response from the cluster
112+
*/
113+
public void setTimeout(TimeValue timeout) {
114+
this.timeout = timeout;
115+
}
116+
117+
@Override
118+
public ActionRequestValidationException validate() {
119+
return null;
120+
}
121+
122+
@Override
123+
public int hashCode() {
124+
return Objects.hash(datafeedId, start, end, timeout);
125+
}
126+
127+
@Override
128+
public boolean equals(Object obj) {
129+
if (this == obj) {
130+
return true;
131+
}
132+
133+
if (obj == null || obj.getClass() != getClass()) {
134+
return false;
135+
}
136+
137+
StartDatafeedRequest other = (StartDatafeedRequest) obj;
138+
return Objects.equals(datafeedId, other.datafeedId) &&
139+
Objects.equals(start, other.start) &&
140+
Objects.equals(end, other.end) &&
141+
Objects.equals(timeout, other.timeout);
142+
}
143+
144+
@Override
145+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
146+
builder.startObject();
147+
builder.field(DatafeedConfig.ID.getPreferredName(), datafeedId);
148+
if (start != null) {
149+
builder.field(START.getPreferredName(), start);
150+
}
151+
if (end != null) {
152+
builder.field(END.getPreferredName(), end);
153+
}
154+
if (timeout != null) {
155+
builder.field(TIMEOUT.getPreferredName(), timeout.getStringRep());
156+
}
157+
builder.endObject();
158+
return builder;
159+
}
160+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.client.ml;
20+
21+
import org.elasticsearch.action.ActionResponse;
22+
import org.elasticsearch.common.ParseField;
23+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
24+
import org.elasticsearch.common.xcontent.ToXContentObject;
25+
import org.elasticsearch.common.xcontent.XContentBuilder;
26+
import org.elasticsearch.common.xcontent.XContentParser;
27+
28+
import java.io.IOException;
29+
import java.util.Objects;
30+
31+
/**
32+
* Response indicating if the Machine Learning Datafeed is now started or not
33+
*/
34+
public class StartDatafeedResponse extends ActionResponse implements ToXContentObject {
35+
36+
private static final ParseField STARTED = new ParseField("started");
37+
38+
public static final ConstructingObjectParser<StartDatafeedResponse, Void> PARSER =
39+
new ConstructingObjectParser<>(
40+
"start_datafeed_response",
41+
true,
42+
(a) -> new StartDatafeedResponse((Boolean)a[0]));
43+
44+
static {
45+
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), STARTED);
46+
}
47+
48+
private final boolean started;
49+
50+
public StartDatafeedResponse(boolean started) {
51+
this.started = started;
52+
}
53+
54+
public static StartDatafeedResponse fromXContent(XContentParser parser) throws IOException {
55+
return PARSER.parse(parser, null);
56+
}
57+
58+
/**
59+
* Has the Datafeed started or not
60+
*
61+
* @return boolean value indicating the Datafeed started status
62+
*/
63+
public boolean isStarted() {
64+
return started;
65+
}
66+
67+
@Override
68+
public boolean equals(Object other) {
69+
if (this == other) {
70+
return true;
71+
}
72+
73+
if (other == null || getClass() != other.getClass()) {
74+
return false;
75+
}
76+
77+
StartDatafeedResponse that = (StartDatafeedResponse) other;
78+
return isStarted() == that.isStarted();
79+
}
80+
81+
@Override
82+
public int hashCode() {
83+
return Objects.hash(isStarted());
84+
}
85+
86+
@Override
87+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
88+
builder.startObject();
89+
builder.field(STARTED.getPreferredName(), started);
90+
builder.endObject();
91+
return builder;
92+
}
93+
}

client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@
4444
import org.elasticsearch.client.ml.PutCalendarRequest;
4545
import org.elasticsearch.client.ml.PutDatafeedRequest;
4646
import org.elasticsearch.client.ml.PutJobRequest;
47+
import org.elasticsearch.client.ml.StartDatafeedRequest;
48+
import org.elasticsearch.client.ml.StartDatafeedRequestTests;
4749
import org.elasticsearch.client.ml.UpdateJobRequest;
4850
import org.elasticsearch.client.ml.calendars.Calendar;
4951
import org.elasticsearch.client.ml.calendars.CalendarTests;
@@ -261,6 +263,19 @@ public void testDeleteDatafeed() {
261263
assertEquals(Boolean.toString(true), request.getParameters().get("force"));
262264
}
263265

266+
public void testStartDatafeed() throws Exception {
267+
String datafeedId = DatafeedConfigTests.randomValidDatafeedId();
268+
StartDatafeedRequest datafeedRequest = StartDatafeedRequestTests.createRandomInstance(datafeedId);
269+
270+
Request request = MLRequestConverters.startDatafeed(datafeedRequest);
271+
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
272+
assertEquals("/_xpack/ml/datafeeds/" + datafeedId + "/_start", request.getEndpoint());
273+
try (XContentParser parser = createParser(JsonXContent.jsonXContent, request.getEntity().getContent())) {
274+
StartDatafeedRequest parsedDatafeedRequest = StartDatafeedRequest.PARSER.apply(parser, null);
275+
assertThat(parsedDatafeedRequest, equalTo(datafeedRequest));
276+
}
277+
}
278+
264279
public void testDeleteForecast() {
265280
String jobId = randomAlphaOfLength(10);
266281
DeleteForecastRequest deleteForecastRequest = new DeleteForecastRequest(jobId);

0 commit comments

Comments
 (0)