Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.elasticsearch.client.rollup.GetRollupCapsResponse;
import org.elasticsearch.client.rollup.PutRollupJobRequest;
import org.elasticsearch.client.rollup.PutRollupJobResponse;
import org.elasticsearch.client.rollup.StartRollupJobRequest;
import org.elasticsearch.client.rollup.StartRollupJobResponse;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -80,6 +82,40 @@ public void putRollupJobAsync(PutRollupJobRequest request, RequestOptions option
listener, Collections.emptySet());
}

/**
* Start a rollup job
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/rollup-start-job.html">
* the docs</a> for more.
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public StartRollupJobResponse startRollupJob(StartRollupJobRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request,
RollupRequestConverters::startJob,
options,
StartRollupJobResponse::fromXContent,
Collections.emptySet());
}

/**
* Asynchronously start a rollup job
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/rollup-start-job.html">
* the docs</a> for more.
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
*/
public void startRollupJobAsync(StartRollupJobRequest request, RequestOptions options,
ActionListener<StartRollupJobResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(request,
RollupRequestConverters::startJob,
options,
StartRollupJobResponse::fromXContent,
listener, Collections.emptySet());
}

/**
* Delete a rollup job from the cluster
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/rollup-delete-job.html">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@

import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.client.rollup.DeleteRollupJobRequest;
import org.elasticsearch.client.rollup.GetRollupJobRequest;
import org.elasticsearch.client.rollup.GetRollupCapsRequest;
import org.elasticsearch.client.rollup.GetRollupJobRequest;
import org.elasticsearch.client.rollup.PutRollupJobRequest;
import org.elasticsearch.client.rollup.StartRollupJobRequest;

import java.io.IOException;

Expand All @@ -38,31 +40,35 @@ private RollupRequestConverters() {

static Request putJob(final PutRollupJobRequest putRollupJobRequest) throws IOException {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_xpack")
.addPathPartAsIs("rollup")
.addPathPartAsIs("job")
.addPathPartAsIs("_xpack", "rollup", "job")
.addPathPart(putRollupJobRequest.getConfig().getId())
.build();
Request request = new Request(HttpPut.METHOD_NAME, endpoint);
request.setEntity(createEntity(putRollupJobRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}

static Request startJob(final StartRollupJobRequest startRollupJobRequest) throws IOException {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_xpack", "rollup", "job")
.addPathPart(startRollupJobRequest.getJobId())
.addPathPartAsIs("_start")
.build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
return request;
}

static Request getJob(final GetRollupJobRequest getRollupJobRequest) {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_xpack")
.addPathPartAsIs("rollup")
.addPathPartAsIs("job")
.addPathPartAsIs("_xpack", "rollup", "job")
.addPathPart(getRollupJobRequest.getJobId())
.build();
return new Request(HttpGet.METHOD_NAME, endpoint);
}

static Request deleteJob(final DeleteRollupJobRequest deleteRollupJobRequest) throws IOException {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_xpack")
.addPathPartAsIs("rollup")
.addPathPartAsIs("job")
.addPathPartAsIs("_xpack", "rollup", "job")
.addPathPart(deleteRollupJobRequest.getId())
.build();
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);
Expand All @@ -72,9 +78,7 @@ static Request deleteJob(final DeleteRollupJobRequest deleteRollupJobRequest) th

static Request getRollupCaps(final GetRollupCapsRequest getRollupCapsRequest) throws IOException {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_xpack")
.addPathPartAsIs("rollup")
.addPathPartAsIs("data")
.addPathPartAsIs("_xpack", "rollup", "data")
.addPathPart(getRollupCapsRequest.getIndexPattern())
.build();
Request request = new Request(HttpGet.METHOD_NAME, endpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,21 @@

package org.elasticsearch.client.rollup;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;
import java.util.function.Function;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;

public abstract class AcknowledgedResponse implements ToXContentObject {

protected static final String PARSE_FIELD_NAME = "acknowledged";
private final boolean acknowledged;

public AcknowledgedResponse(final boolean acknowledged) {
Expand All @@ -37,6 +44,12 @@ public boolean isAcknowledged() {
return acknowledged;
}

protected static <T> ConstructingObjectParser<T, Void> generateParser(String name, Function<Boolean, T> ctor, String parseField) {
ConstructingObjectParser<T, Void> p = new ConstructingObjectParser<>(name, true, args -> ctor.apply((boolean) args[0]));
p.declareBoolean(constructorArg(), new ParseField(parseField));
return p;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -58,10 +71,16 @@ public int hashCode() {
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
{
builder.field("acknowledged", isAcknowledged());
builder.field(getFieldName(), isAcknowledged());
}
builder.endObject();
return builder;
}

/**
* @return the field name this response uses to output the acknowledged flag
*/
protected String getFieldName() {
return PARSE_FIELD_NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,21 @@

package org.elasticsearch.client.rollup;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;

public class DeleteRollupJobResponse extends AcknowledgedResponse {

public DeleteRollupJobResponse(boolean acknowledged) {
super(acknowledged);
}

private static final ConstructingObjectParser<DeleteRollupJobResponse, Void> PARSER = AcknowledgedResponse
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super bueno!

.generateParser("delete_rollup_job_response", DeleteRollupJobResponse::new, AcknowledgedResponse.PARSE_FIELD_NAME);

public static DeleteRollupJobResponse fromXContent(final XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

private static final ConstructingObjectParser<DeleteRollupJobResponse, Void> PARSER
= new ConstructingObjectParser<>("delete_rollup_job_response", true,
args -> new DeleteRollupJobResponse((boolean) args[0]));
static {
PARSER.declareBoolean(constructorArg(), new ParseField("acknowledged"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,21 @@
*/
package org.elasticsearch.client.rollup;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;

public class PutRollupJobResponse extends AcknowledgedResponse {


public PutRollupJobResponse(boolean acknowledged) {
super(acknowledged);
}

private static final ConstructingObjectParser<PutRollupJobResponse, Void> PARSER = AcknowledgedResponse
.generateParser("delete_rollup_job_response", PutRollupJobResponse::new, AcknowledgedResponse.PARSE_FIELD_NAME);

public static PutRollupJobResponse fromXContent(final XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

private static final ConstructingObjectParser<PutRollupJobResponse, Void> PARSER
= new ConstructingObjectParser<>("put_rollup_job_response", true, args -> new PutRollupJobResponse((boolean) args[0]));
static {
PARSER.declareBoolean(constructorArg(), new ParseField("acknowledged"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.rollup;

import org.elasticsearch.client.Validatable;

import java.util.Objects;

public class StartRollupJobRequest implements Validatable {

private final String jobId;

public StartRollupJobRequest(final String jobId) {
this.jobId = Objects.requireNonNull(jobId, "id parameter must not be null");
}

public String getJobId() {
return jobId;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final StartRollupJobRequest that = (StartRollupJobRequest) o;
return Objects.equals(jobId, that.jobId);
}

@Override
public int hashCode() {
return Objects.hash(jobId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client.rollup;

import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;

public class StartRollupJobResponse extends AcknowledgedResponse {

private static final String PARSE_FIELD_NAME = "started";

private static final ConstructingObjectParser<StartRollupJobResponse, Void> PARSER = AcknowledgedResponse
.generateParser("delete_rollup_job_response", StartRollupJobResponse::new, PARSE_FIELD_NAME);

public StartRollupJobResponse(boolean acknowledged) {
super(acknowledged);
}

public static StartRollupJobResponse fromXContent(final XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

@Override
protected String getFieldName() {
return PARSE_FIELD_NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.elasticsearch.client.rollup.GetRollupJobResponse.JobWrapper;
import org.elasticsearch.client.rollup.PutRollupJobRequest;
import org.elasticsearch.client.rollup.PutRollupJobResponse;
import org.elasticsearch.client.rollup.StartRollupJobRequest;
import org.elasticsearch.client.rollup.StartRollupJobResponse;
import org.elasticsearch.client.rollup.RollableIndexCaps;
import org.elasticsearch.client.rollup.RollupJobCaps;
import org.elasticsearch.client.rollup.job.config.DateHistogramGroupConfig;
Expand Down Expand Up @@ -150,7 +152,7 @@ public void testDeleteRollupJob() throws Exception {
PutRollupJobRequest putRollupJobRequest =
new PutRollupJobRequest(new RollupJobConfig(id, indexPattern, rollupIndex, cron, pageSize, groups, metrics, timeout));
final RollupClient rollupClient = highLevelClient().rollup();
PutRollupJobResponse response = execute(putRollupJobRequest, rollupClient::putRollupJob, rollupClient::putRollupJobAsync);
execute(putRollupJobRequest, rollupClient::putRollupJob, rollupClient::putRollupJobAsync);
DeleteRollupJobRequest deleteRollupJobRequest = new DeleteRollupJobRequest(id);
DeleteRollupJobResponse deleteRollupJobResponse = highLevelClient().rollup()
.deleteRollupJob(deleteRollupJobRequest, RequestOptions.DEFAULT);
Expand All @@ -164,8 +166,7 @@ public void testDeleteMissingRollupJob() {
assertThat(responseException.status().getStatus(), is(404));
}

@SuppressWarnings("unchecked")
public void testPutAndGetRollupJob() throws Exception {
public void testPutStartAndGetRollupJob() throws Exception {
// TODO expand this to also test with histogram and terms?
final GroupConfig groups = new GroupConfig(new DateHistogramGroupConfig("date", DateHistogramInterval.DAY));
final List<MetricConfig> metrics = Collections.singletonList(new MetricConfig("value", SUPPORTED_METRICS));
Expand All @@ -178,9 +179,9 @@ public void testPutAndGetRollupJob() throws Exception {
PutRollupJobResponse response = execute(putRollupJobRequest, rollupClient::putRollupJob, rollupClient::putRollupJobAsync);
assertTrue(response.isAcknowledged());

// TODO Replace this with the Rollup Start Job API
Response startResponse = client().performRequest(new Request("POST", "/_xpack/rollup/job/" + id + "/_start"));
assertEquals(RestStatus.OK.getStatus(), startResponse.getHttpResponse().getStatusLine().getStatusCode());
StartRollupJobRequest startRequest = new StartRollupJobRequest(id);
StartRollupJobResponse startResponse = execute(startRequest, rollupClient::startRollupJob, rollupClient::startRollupJobAsync);
assertTrue(startResponse.isAcknowledged());

assertBusy(() -> {
SearchResponse searchResponse = highLevelClient().search(new SearchRequest(rollupIndex), RequestOptions.DEFAULT);
Expand Down
Loading