Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/reference/rollup/apis/delete-job.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@

experimental[]

This API deletes an existing rollup job. The job can be started or stopped, in both cases it will be deleted. Attempting
to delete a non-existing job will throw an exception
This API deletes an existing rollup job. A job must be *stopped* first before it can be deleted. Attempting to delete
a started job will result in an error. Similarly, attempting to delete a nonexistent job will throw an exception.

.Deleting the job does not delete rolled up data
**********************************
Expand Down Expand Up @@ -99,12 +99,12 @@ A 404 `resource_not_found` exception will be thrown:
"root_cause" : [
{
"type" : "resource_not_found_exception",
"reason" : "the task with id does_not_exist doesn't exist",
"reason" : "the task with id [does_not_exist] doesn't exist",
"stack_trace": ...
}
],
"type" : "resource_not_found_exception",
"reason" : "the task with id does_not_exist doesn't exist",
"reason" : "the task with id [does_not_exist] doesn't exist",
"stack_trace": ...
},
"status": 404
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@
*/
public class ListTasksResponse extends BaseTasksResponse implements ToXContentObject {
private static final String TASKS = "tasks";
private static final String TASK_FAILURES = "task_failures";
private static final String NODE_FAILURES = "node_failures";

private List<TaskInfo> tasks;

Expand Down Expand Up @@ -246,28 +244,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

private void toXContentCommon(XContentBuilder builder, Params params) throws IOException {
if (getTaskFailures() != null && getTaskFailures().size() > 0) {
builder.startArray(TASK_FAILURES);
for (TaskOperationFailure ex : getTaskFailures()){
builder.startObject();
builder.value(ex);
builder.endObject();
}
builder.endArray();
}

if (getNodeFailures() != null && getNodeFailures().size() > 0) {
builder.startArray(NODE_FAILURES);
for (ElasticsearchException ex : getNodeFailures()) {
builder.startObject();
ex.toXContent(builder, params);
builder.endObject();
}
builder.endArray();
}
}

public static ListTasksResponse fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toList;
Expand All @@ -41,6 +44,9 @@
* Base class for responses of task-related operations
*/
public class BaseTasksResponse extends ActionResponse {
protected static final String TASK_FAILURES = "task_failures";
protected static final String NODE_FAILURES = "node_failures";

private List<TaskOperationFailure> taskFailures;
private List<ElasticsearchException> nodeFailures;

Expand Down Expand Up @@ -103,4 +109,44 @@ public void writeTo(StreamOutput out) throws IOException {
exp.writeTo(out);
}
}

protected void toXContentCommon(XContentBuilder builder, ToXContent.Params params) throws IOException {
if (getTaskFailures() != null && getTaskFailures().size() > 0) {
builder.startArray(TASK_FAILURES);
for (TaskOperationFailure ex : getTaskFailures()){
builder.startObject();
builder.value(ex);
builder.endObject();
}
builder.endArray();
}

if (getNodeFailures() != null && getNodeFailures().size() > 0) {
builder.startArray(NODE_FAILURES);
for (ElasticsearchException ex : getNodeFailures()) {
builder.startObject();
ex.toXContent(builder, params);
builder.endObject();
}
builder.endArray();
}
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
BaseTasksResponse response = (BaseTasksResponse) o;
return taskFailures.equals(response.taskFailures)
&& nodeFailures.equals(response.nodeFailures);
}

@Override
public int hashCode() {
return Objects.hash(taskFailures, nodeFailures);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -449,7 +450,7 @@ private void wipeClusterSettings() throws IOException {
}
}

private void wipeRollupJobs() throws IOException {
private void wipeRollupJobs() throws IOException, InterruptedException {
Response response = adminClient().performRequest(new Request("GET", "/_xpack/rollup/job/_all"));
Map<String, Object> jobs = entityAsMap(response);
@SuppressWarnings("unchecked")
Expand All @@ -460,6 +461,29 @@ private void wipeRollupJobs() throws IOException {
return;
}

for (Map<String, Object> jobConfig : jobConfigs) {
@SuppressWarnings("unchecked")
String jobId = (String) ((Map<String, Object>) jobConfig.get("config")).get("id");
Request request = new Request("POST", "/_xpack/rollup/job/" + jobId + "/_stop");
request.addParameter("ignore", "404");
logger.debug("stopping rollup job [{}]", jobId);
adminClient().performRequest(request);
}

// TODO this is temporary until StopJob API gains the ability to block until stopped
awaitBusy(() -> {
Request request = new Request("GET", "/_xpack/rollup/job/_all");
try {
Response jobsResponse = adminClient().performRequest(request);
String body = EntityUtils.toString(jobsResponse.getEntity());
logger.error(body);
// If the body contains any of the non-stopped states, at least one job is not finished yet
return Arrays.stream(new String[]{"started", "aborting", "stopping", "indexing"}).noneMatch(body::contains);
} catch (IOException e) {
return false;
}
}, 10, TimeUnit.SECONDS);

for (Map<String, Object> jobConfig : jobConfigs) {
@SuppressWarnings("unchecked")
String jobId = (String) ((Map<String, Object>) jobConfig.get("config")).get("id");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,29 @@


import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.rollup.RollupField;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class DeleteRollupJobAction extends Action<AcknowledgedResponse> {
public class DeleteRollupJobAction extends Action<DeleteRollupJobAction.Response> {

public static final DeleteRollupJobAction INSTANCE = new DeleteRollupJobAction();
public static final String NAME = "cluster:admin/xpack/rollup/delete";
Expand All @@ -32,11 +39,11 @@ private DeleteRollupJobAction() {
}

@Override
public AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
public Response newResponse() {
return new Response();
}

public static class Request extends AcknowledgedRequest<Request> implements ToXContent {
public static class Request extends BaseTasksRequest<Request> implements ToXContentFragment {
private String id;

public Request(String id) {
Expand All @@ -45,6 +52,11 @@ public Request(String id) {

public Request() {}

@Override
public boolean match(Task task) {
return task.getDescription().equals(RollupField.NAME + "_" + id);
}

public String getId() {
return id;
}
Expand Down Expand Up @@ -90,10 +102,74 @@ public boolean equals(Object obj) {
}
}

public static class RequestBuilder extends MasterNodeOperationRequestBuilder<Request, AcknowledgedResponse, RequestBuilder> {

public static class RequestBuilder extends ActionRequestBuilder<DeleteRollupJobAction.Request, DeleteRollupJobAction.Response> {
protected RequestBuilder(ElasticsearchClient client, DeleteRollupJobAction action) {
super(client, action, new Request());
super(client, action, new DeleteRollupJobAction.Request());
}
}

public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {

private boolean acknowledged;

public Response(StreamInput in) throws IOException {
super(Collections.emptyList(), Collections.emptyList());
readFrom(in);
}

public Response(boolean acknowledged, List<TaskOperationFailure> taskFailures, List<FailedNodeException> nodeFailures) {
super(taskFailures, nodeFailures);
this.acknowledged = acknowledged;
}

public Response(boolean acknowledged) {
super(Collections.emptyList(), Collections.emptyList());
this.acknowledged = acknowledged;
}

public Response() {
super(Collections.emptyList(), Collections.emptyList());
this.acknowledged = false;
}

public boolean isDeleted() {
return acknowledged;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
acknowledged = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(acknowledged);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
toXContentCommon(builder, params);
builder.field("acknowledged", acknowledged);
}
builder.endObject();
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DeleteRollupJobAction.Response response = (DeleteRollupJobAction.Response) o;
return super.equals(o) && acknowledged == response.acknowledged;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), acknowledged);
}
}
}
Loading