From 807ce2a83b8095819f28dbdba913582019eb42c3 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 31 May 2018 13:33:39 +0200 Subject: [PATCH 1/6] Add PersistentTaskState and rename methods in framework --- .../persistent/AllocatedPersistentTask.java | 8 +- .../NodePersistentTasksExecutor.java | 16 ++-- .../persistent/PersistentTaskState.java | 29 +++++++ .../PersistentTasksClusterService.java | 31 ++++--- .../PersistentTasksCustomMetaData.java | 87 +++++++++---------- .../persistent/PersistentTasksExecutor.java | 2 +- .../PersistentTasksNodeService.java | 10 +-- .../persistent/PersistentTasksService.java | 14 +-- .../UpdatePersistentTaskStatusAction.java | 31 +++---- 9 files changed, 123 insertions(+), 105 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/persistent/PersistentTaskState.java diff --git a/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java b/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java index d4d299b7e4af1..54dcffab6e366 100644 --- a/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java +++ b/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java @@ -25,7 +25,6 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.tasks.CancellableTask; -import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskManager; @@ -77,8 +76,9 @@ public Status getStatus() { *

* This doesn't affect the status of this allocated task. */ - public void updatePersistentStatus(Task.Status status, ActionListener> listener) { - persistentTasksService.updateStatus(persistentTaskId, allocationId, status, listener); + public void updatePersistentTaskState(final PersistentTaskState state, + final ActionListener> listener) { + persistentTasksService.sendUpdateStateRequest(persistentTaskId, allocationId, state, listener); } public String getPersistentTaskId() { @@ -116,7 +116,7 @@ public void waitForPersistentTask(final Predicate void executeTask(Params params, - @Nullable Task.Status status, - AllocatedPersistentTask task, - PersistentTasksExecutor executor) { + public void executeTask(final Params params, + final @Nullable PersistentTaskState state, + final AllocatedPersistentTask task, + final PersistentTasksExecutor executor) { threadPool.executor(executor.getExecutor()).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { @@ -49,14 +49,12 @@ public void onFailure(Exception e) { @Override protected void doRun() throws Exception { try { - executor.nodeOperation(task, params, status); + executor.nodeOperation(task, params, state); } catch (Exception ex) { task.markAsFailed(ex); } } }); - } - } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTaskState.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTaskState.java new file mode 100644 index 0000000000000..57c913f51bb88 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTaskState.java @@ -0,0 +1,29 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.persistent; + +import org.elasticsearch.common.io.stream.NamedWriteable; +import org.elasticsearch.common.xcontent.ToXContentObject; + +/** + * {@link PersistentTaskState} represents the state of the persistent tasks, as it + * is persisted in the cluster state. + */ +public interface PersistentTaskState extends ToXContentObject, NamedWriteable { +} diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index 1464279a814d5..c030d3658b373 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -178,27 +178,30 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } /** - * Update task status + * Update the state of a persistent task * - * @param id the id of a persistent task - * @param allocationId the expected allocation id of the persistent task - * @param status new status - * @param listener the listener that will be called when task is removed + * @param taskId the id of a persistent task + * @param taskAllocationId the expected allocation id of the persistent task + * @param taskState new state + * @param listener the listener that will be called when task is removed */ - public void updatePersistentTaskStatus(String id, long allocationId, Task.Status status, ActionListener> listener) { - clusterService.submitStateUpdateTask("update task status", new ClusterStateUpdateTask() { + public void updatePersistentTaskState(final String taskId, + final long taskAllocationId, + final PersistentTaskState taskState, + final ActionListener> listener) { + clusterService.submitStateUpdateTask("update task state", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState); - if (tasksInProgress.hasTask(id, allocationId)) { - return update(currentState, tasksInProgress.updateTaskStatus(id, status)); + if (tasksInProgress.hasTask(taskId, taskAllocationId)) { + return update(currentState, tasksInProgress.updateTaskState(taskId, taskState)); } else { - if (tasksInProgress.hasTask(id)) { - logger.warn("trying to update status on task {} with unexpected allocation id {}", id, allocationId); + if (tasksInProgress.hasTask(taskId)) { + logger.warn("trying to update state on task {} with unexpected allocation id {}", taskId, taskAllocationId); } else { - logger.warn("trying to update status on non-existing task {}", id); + logger.warn("trying to update state on non-existing task {}", taskId); } - throw new ResourceNotFoundException("the task with id {} and allocation id {} doesn't exist", id, allocationId); + throw new ResourceNotFoundException("the task with id {} and allocation id {} doesn't exist", taskId, taskAllocationId); } } @@ -209,7 +212,7 @@ public void onFailure(String source, Exception e) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(newState, id)); + listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(newState, taskId)); } }); } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java index 09346704a801d..df7c808a9758c 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java @@ -38,7 +38,6 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task.Status; import java.io.IOException; @@ -61,13 +60,12 @@ * A cluster state record that contains a list of all running persistent tasks */ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable implements MetaData.Custom { - public static final String TYPE = "persistent_tasks"; + public static final String TYPE = "persistent_tasks"; private static final String API_CONTEXT = MetaData.XContentContext.API.toString(); // TODO: Implement custom Diff for tasks private final Map> tasks; - private final long lastAllocationId; public PersistentTasksCustomMetaData(long lastAllocationId, Map> tasks) { @@ -94,8 +92,8 @@ public PersistentTasksCustomMetaData(long lastAllocationId, Map, String> parser = new ObjectParser<>("named"); parser.declareObject(TaskDescriptionBuilder::setParams, (p, c) -> p.namedObject(PersistentTaskParams.class, c, null), new ParseField("params")); - parser.declareObject(TaskDescriptionBuilder::setStatus, - (p, c) -> p.namedObject(Status.class, c, null), new ParseField("status")); + parser.declareObject(TaskDescriptionBuilder::setState, + (p, c) -> p.namedObject(PersistentTaskState.class, c, null), new ParseField("status")); TASK_DESCRIPTION_PARSER = (XContentParser p, Void c, String name) -> parser.parse(p, new TaskDescriptionBuilder<>(name), name); // Assignment parser @@ -115,7 +113,7 @@ public PersistentTasksCustomMetaData(long lastAllocationId, Map builder = objects.get(0); taskBuilder.setTaskName(builder.taskName); taskBuilder.setParams(builder.params); - taskBuilder.setStatus(builder.status); + taskBuilder.setState(builder.state); }, TASK_DESCRIPTION_PARSER, new ParseField("task")); PERSISTENT_TASK_PARSER.declareObject(TaskBuilder::setAssignment, ASSIGNMENT_PARSER, new ParseField("assignment")); PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationIdOnLastStatusUpdate, @@ -123,12 +121,13 @@ public PersistentTasksCustomMetaData(long lastAllocationId, Map { + private final String taskName; private Params params; - private Status status; + private PersistentTaskState state; private TaskDescriptionBuilder(String taskName) { this.taskName = taskName; @@ -139,8 +138,8 @@ private TaskDescriptionBuilder setParams(Params params) { return this; } - private TaskDescriptionBuilder setStatus(Status status) { - this.status = status; + private TaskDescriptionBuilder setState(PersistentTaskState state) { + this.state = state; return this; } } @@ -261,37 +260,34 @@ public String toString() { * A record that represents a single running persistent task */ public static class PersistentTask

implements Writeable, ToXContentObject { + private final String id; private final long allocationId; private final String taskName; private final P params; - @Nullable - private final Status status; + private final @Nullable PersistentTaskState state; private final Assignment assignment; - @Nullable - private final Long allocationIdOnLastStatusUpdate; + private final @Nullable Long allocationIdOnLastStatusUpdate; - public PersistentTask(String id, String taskName, P params, long allocationId, Assignment assignment) { - this(id, allocationId, taskName, params, null, assignment, null); + public PersistentTask(final String id, final String name, final P params, final long allocationId, final Assignment assignment) { + this(id, allocationId, name, params, null, assignment, null); } - public PersistentTask(PersistentTask

task, long allocationId, Assignment assignment) { - this(task.id, allocationId, task.taskName, task.params, task.status, - assignment, task.allocationId); + public PersistentTask(final PersistentTask

task, final long allocationId, final Assignment assignment) { + this(task.id, allocationId, task.taskName, task.params, task.state, assignment, task.allocationId); } - public PersistentTask(PersistentTask

task, Status status) { - this(task.id, task.allocationId, task.taskName, task.params, status, - task.assignment, task.allocationId); + public PersistentTask(final PersistentTask

task, final PersistentTaskState state) { + this(task.id, task.allocationId, task.taskName, task.params, state, task.assignment, task.allocationId); } - private PersistentTask(String id, long allocationId, String taskName, P params, - Status status, Assignment assignment, Long allocationIdOnLastStatusUpdate) { + private PersistentTask(final String id, final long allocationId, final String name, final P params, + final PersistentTaskState state, final Assignment assignment, final Long allocationIdOnLastStatusUpdate) { this.id = id; this.allocationId = allocationId; - this.taskName = taskName; + this.taskName = name; this.params = params; - this.status = status; + this.state = state; this.assignment = assignment; this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate; if (params != null) { @@ -300,10 +296,10 @@ private PersistentTask(String id, long allocationId, String taskName, P params, params.getWriteableName() + " task: " + taskName); } } - if (status != null) { - if (status.getWriteableName().equals(taskName) == false) { + if (state != null) { + if (state.getWriteableName().equals(taskName) == false) { throw new IllegalArgumentException("status has to have the same writeable name as task. status: " + - status.getWriteableName() + " task: " + taskName); + state.getWriteableName() + " task: " + taskName); } } } @@ -318,7 +314,7 @@ public PersistentTask(StreamInput in) throws IOException { } else { params = (P) in.readOptionalNamedWriteable(PersistentTaskParams.class); } - status = in.readOptionalNamedWriteable(Task.Status.class); + state = in.readOptionalNamedWriteable(PersistentTaskState.class); assignment = new Assignment(in.readOptionalString(), in.readString()); allocationIdOnLastStatusUpdate = in.readOptionalLong(); } @@ -333,7 +329,7 @@ public void writeTo(StreamOutput out) throws IOException { } else { out.writeOptionalNamedWriteable(params); } - out.writeOptionalNamedWriteable(status); + out.writeOptionalNamedWriteable(state); out.writeOptionalString(assignment.executorNode); out.writeString(assignment.explanation); out.writeOptionalLong(allocationIdOnLastStatusUpdate); @@ -348,15 +344,14 @@ public boolean equals(Object o) { allocationId == that.allocationId && Objects.equals(taskName, that.taskName) && Objects.equals(params, that.params) && - Objects.equals(status, that.status) && + Objects.equals(state, that.state) && Objects.equals(assignment, that.assignment) && Objects.equals(allocationIdOnLastStatusUpdate, that.allocationIdOnLastStatusUpdate); } @Override public int hashCode() { - return Objects.hash(id, allocationId, taskName, params, status, assignment, - allocationIdOnLastStatusUpdate); + return Objects.hash(id, allocationId, taskName, params, state, assignment, allocationIdOnLastStatusUpdate); } @Override @@ -395,8 +390,8 @@ public boolean isAssigned() { } @Nullable - public Status getStatus() { - return status; + public PersistentTaskState getState() { + return state; } @Override @@ -411,8 +406,9 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params xPa if (params != null) { builder.field("params", params, xParams); } - if (status != null) { - builder.field("status", status, xParams); + if (state != null) { + // The field is names "status" instead of "state" for bwc reason + builder.field("status", state, xParams); } } builder.endObject(); @@ -448,7 +444,7 @@ private static class TaskBuilder { private long allocationId; private String taskName; private Params params; - private Status status; + private PersistentTaskState state; private Assignment assignment = INITIAL_ASSIGNMENT; private Long allocationIdOnLastStatusUpdate; @@ -472,8 +468,8 @@ public TaskBuilder setParams(Params params) { return this; } - public TaskBuilder setStatus(Status status) { - this.status = status; + public TaskBuilder setState(PersistentTaskState state) { + this.state = state; return this; } @@ -489,8 +485,7 @@ public TaskBuilder setAllocationIdOnLastStatusUpdate(Long allocationIdOn } public PersistentTask build() { - return new PersistentTask<>(id, allocationId, taskName, params, status, - assignment, allocationIdOnLastStatusUpdate); + return new PersistentTask<>(id, allocationId, taskName, params, state, assignment, allocationIdOnLastStatusUpdate); } } @@ -608,13 +603,13 @@ public Builder reassignTask(String taskId, Assignment assignment) { } /** - * Updates the task status + * Updates the task state */ - public Builder updateTaskStatus(String taskId, Status status) { + public Builder updateTaskState(final String taskId, final PersistentTaskState taskState) { PersistentTask taskInProgress = tasks.get(taskId); if (taskInProgress != null) { changed = true; - tasks.put(taskId, new PersistentTask<>(taskInProgress, status)); + tasks.put(taskId, new PersistentTask<>(taskInProgress, taskState)); } else { throw new ResourceNotFoundException("cannot update task with id {" + taskId + "}, the task no longer exists"); } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java index de75b1ff54085..d72a3aa466790 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java @@ -118,7 +118,7 @@ protected String getDescription(PersistentTask taskInProgress) { * NOTE: The nodeOperation has to throw an exception, trigger task.markAsCompleted() or task.completeAndNotifyIfNeeded() methods to * indicate that the persistent task has finished. */ - protected abstract void nodeOperation(AllocatedPersistentTask task, Params params, @Nullable Task.Status status); + protected abstract void nodeOperation(AllocatedPersistentTask task, Params params, @Nullable PersistentTaskState state); public String getExecutor() { return executor; diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java index 724e10c2c9030..91cdb400aa0d4 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java @@ -50,13 +50,13 @@ * non-transport client nodes in the cluster and monitors cluster state changes to detect started commands. */ public class PersistentTasksNodeService extends AbstractComponent implements ClusterStateListener { + private final Map runningTasks = new HashMap<>(); private final PersistentTasksService persistentTasksService; private final PersistentTasksExecutorRegistry persistentTasksExecutorRegistry; private final TaskManager taskManager; private final NodePersistentTasksExecutor nodePersistentTasksExecutor; - public PersistentTasksNodeService(Settings settings, PersistentTasksService persistentTasksService, PersistentTasksExecutorRegistry persistentTasksExecutorRegistry, @@ -172,7 +172,7 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId, task.getPersistentTaskId(), task.getAllocationId()); try { runningTasks.put(taskInProgress.getAllocationId(), task); - nodePersistentTasksExecutor.executeTask(taskInProgress.getParams(), taskInProgress.getStatus(), task, executor); + nodePersistentTasksExecutor.executeTask(taskInProgress.getParams(), taskInProgress.getState(), task, executor); } catch (Exception e) { // Submit task failure task.markAsFailed(e); @@ -215,8 +215,8 @@ public void onFailure(Exception e) { } } - public static class Status implements Task.Status { + public static final String NAME = "persistent_executor"; private final AllocatedPersistentTask.State state; @@ -252,10 +252,6 @@ public String toString() { return Strings.toString(this); } - public AllocatedPersistentTask.State getState() { - return state; - } - @Override public boolean isFragment() { return false; diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java index 01c28dd5cd634..d0c791e3df046 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java @@ -35,7 +35,6 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; -import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; @@ -113,13 +112,14 @@ void sendCancelRequest(final long taskId, final String reason, final ActionListe * Notifies the master node that the state of a persistent task has changed. *

* Persistent task implementers shouldn't call this method directly and use - * {@link AllocatedPersistentTask#updatePersistentStatus} instead + * {@link AllocatedPersistentTask#updatePersistentTaskState} instead */ - void updateStatus(final String taskId, - final long taskAllocationID, - final Task.Status status, - final ActionListener> listener) { - UpdatePersistentTaskStatusAction.Request request = new UpdatePersistentTaskStatusAction.Request(taskId, taskAllocationID, status); + void sendUpdateStateRequest(final String taskId, + final long taskAllocationID, + final PersistentTaskState taskState, + final ActionListener> listener) { + UpdatePersistentTaskStatusAction.Request request = + new UpdatePersistentTaskStatusAction.Request(taskId, taskAllocationID, taskState); execute(request, UpdatePersistentTaskStatusAction.INSTANCE, listener); } diff --git a/server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java b/server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java index a898558fc2668..a639e4bde5360 100644 --- a/server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java @@ -39,7 +39,6 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; -import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -63,16 +62,15 @@ public static class Request extends MasterNodeRequest { private String taskId; private long allocationId = -1L; - private Task.Status status; + private PersistentTaskState state; public Request() { - } - public Request(String taskId, long allocationId, Task.Status status) { + public Request(String taskId, long allocationId, PersistentTaskState state) { this.taskId = taskId; this.allocationId = allocationId; - this.status = status; + this.state = state; } public void setTaskId(String taskId) { @@ -83,8 +81,8 @@ public void setAllocationId(long allocationId) { this.allocationId = allocationId; } - public void setStatus(Task.Status status) { - this.status = status; + public void setState(PersistentTaskState state) { + this.state = state; } @Override @@ -92,7 +90,7 @@ public void readFrom(StreamInput in) throws IOException { super.readFrom(in); taskId = in.readString(); allocationId = in.readLong(); - status = in.readOptionalNamedWriteable(Task.Status.class); + state = in.readOptionalNamedWriteable(PersistentTaskState.class); } @Override @@ -100,7 +98,7 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(taskId); out.writeLong(allocationId); - out.writeOptionalNamedWriteable(status); + out.writeOptionalNamedWriteable(state); } @Override @@ -122,13 +120,12 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Request request = (Request) o; - return Objects.equals(taskId, request.taskId) && allocationId == request.allocationId && - Objects.equals(status, request.status); + return Objects.equals(taskId, request.taskId) && allocationId == request.allocationId && Objects.equals(state, request.state); } @Override public int hashCode() { - return Objects.hash(taskId, allocationId, status); + return Objects.hash(taskId, allocationId, state); } } @@ -144,11 +141,10 @@ public final RequestBuilder setTaskId(String taskId) { return this; } - public final RequestBuilder setStatus(Task.Status status) { - request.setStatus(status); + public final RequestBuilder setState(PersistentTaskState state) { + request.setState(state); return this; } - } public static class TransportAction extends TransportMasterNodeAction { @@ -182,9 +178,10 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) } @Override - protected final void masterOperation(final Request request, ClusterState state, + protected final void masterOperation(final Request request, + final ClusterState state, final ActionListener listener) { - persistentTasksClusterService.updatePersistentTaskStatus(request.taskId, request.allocationId, request.status, + persistentTasksClusterService.updatePersistentTaskState(request.taskId, request.allocationId, request.state, new ActionListener>() { @Override public void onResponse(PersistentTask task) { From 86266e9bf5fa5e889a8532ac987ac3746b78e97f Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 31 May 2018 13:43:48 +0200 Subject: [PATCH 2/6] Fix persistent task framework tests --- .../PersistentTasksClusterServiceTests.java | 2 +- .../PersistentTasksCustomMetaDataTests.java | 21 +++++----- .../PersistentTasksDecidersTestCase.java | 3 +- .../persistent/PersistentTasksExecutorIT.java | 12 +++--- .../PersistentTasksNodeServiceTests.java | 26 ++++++------ .../persistent/TestPersistentTasksPlugin.java | 40 +++++++++---------- .../UpdatePersistentTaskRequestTests.java | 6 +-- 7 files changed, 54 insertions(+), 56 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java index 916fdee213695..8da18625f9905 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java @@ -649,7 +649,7 @@ public Assignment getAssignment(P params, ClusterState clusterState) { } @Override - protected void nodeOperation(AllocatedPersistentTask task, P params, Task.Status status) { + protected void nodeOperation(AllocatedPersistentTask task, P params, PersistentTaskState state) { throw new UnsupportedOperationException(); } })); diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java index 72e74359d3016..67cd341748da9 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java @@ -42,7 +42,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Builder; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; -import org.elasticsearch.persistent.TestPersistentTasksPlugin.Status; +import org.elasticsearch.persistent.TestPersistentTasksPlugin.State; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; import org.elasticsearch.tasks.Task; @@ -79,7 +79,7 @@ protected PersistentTasksCustomMetaData createTestInstance() { randomAssignment()); if (randomBoolean()) { // From time to time update status - tasks.updateTaskStatus(taskId, new Status(randomAlphaOfLength(10))); + tasks.updateTaskState(taskId, new State(randomAlphaOfLength(10))); } } return tasks.build(); @@ -96,7 +96,7 @@ protected NamedWriteableRegistry getNamedWriteableRegistry() { new Entry(MetaData.Custom.class, PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData::new), new Entry(NamedDiff.class, PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData::readDiffFrom), new Entry(PersistentTaskParams.class, TestPersistentTasksExecutor.NAME, TestParams::new), - new Entry(Task.Status.class, TestPersistentTasksExecutor.NAME, Status::new) + new Entry(PersistentTaskState.class, TestPersistentTasksExecutor.NAME, State::new) )); } @@ -118,7 +118,7 @@ protected Custom makeTestChanges(Custom testInstance) { if (builder.getCurrentTaskIds().isEmpty()) { addRandomTask(builder); } else { - builder.updateTaskStatus(pickRandomTask(builder), randomBoolean() ? new Status(randomAlphaOfLength(10)) : null); + builder.updateTaskState(pickRandomTask(builder), randomBoolean() ? new State(randomAlphaOfLength(10)) : null); } break; case 3: @@ -155,9 +155,10 @@ private String pickRandomTask(PersistentTasksCustomMetaData.Builder testInstance @Override protected NamedXContentRegistry xContentRegistry() { return new NamedXContentRegistry(Arrays.asList( - new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(TestPersistentTasksExecutor.NAME), - TestParams::fromXContent), - new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(TestPersistentTasksExecutor.NAME), Status::fromXContent) + new NamedXContentRegistry.Entry(PersistentTaskParams.class, + new ParseField(TestPersistentTasksExecutor.NAME), TestParams::fromXContent), + new NamedXContentRegistry.Entry(PersistentTaskState.class, + new ParseField(TestPersistentTasksExecutor.NAME), State::fromXContent) )); } @@ -186,7 +187,7 @@ public void testSerializationContext() throws Exception { // Things that should be serialized assertEquals(testTask.getTaskName(), newTask.getTaskName()); assertEquals(testTask.getId(), newTask.getId()); - assertEquals(testTask.getStatus(), newTask.getStatus()); + assertEquals(testTask.getState(), newTask.getState()); assertEquals(testTask.getParams(), newTask.getParams()); // Things that shouldn't be serialized @@ -224,10 +225,10 @@ public void testBuilder() { case 2: if (builder.hasTask(lastKnownTask)) { changed = true; - builder.updateTaskStatus(lastKnownTask, randomBoolean() ? new Status(randomAlphaOfLength(10)) : null); + builder.updateTaskState(lastKnownTask, randomBoolean() ? new State(randomAlphaOfLength(10)) : null); } else { String fLastKnownTask = lastKnownTask; - expectThrows(ResourceNotFoundException.class, () -> builder.updateTaskStatus(fLastKnownTask, null)); + expectThrows(ResourceNotFoundException.class, () -> builder.updateTaskState(fLastKnownTask, null)); } break; case 3: diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java index 356e518198c52..655a21a5f5390 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java @@ -27,7 +27,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -64,7 +63,7 @@ public void setUp() throws Exception { public PersistentTasksExecutor getPersistentTaskExecutorSafe(String taskName) { return new PersistentTasksExecutor(clusterService.getSettings(), taskName, null) { @Override - protected void nodeOperation(AllocatedPersistentTask task, Params params, Task.Status status) { + protected void nodeOperation(AllocatedPersistentTask task, Params params, PersistentTaskState state) { logger.debug("Executing task {}", task); } }; diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java index 8f37a2412ef5a..e746ff71627cd 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java @@ -31,7 +31,7 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.PersistentTasksService.WaitForPersistentTaskListener; -import org.elasticsearch.persistent.TestPersistentTasksPlugin.Status; +import org.elasticsearch.persistent.TestPersistentTasksPlugin.State; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestTasksRequestBuilder; @@ -190,11 +190,11 @@ public void testPersistentActionStatusUpdate() throws Exception { PersistentTasksCustomMetaData tasksInProgress = internalCluster().clusterService().state().getMetaData() .custom(PersistentTasksCustomMetaData.TYPE); assertThat(tasksInProgress.tasks().size(), equalTo(1)); - assertThat(tasksInProgress.tasks().iterator().next().getStatus(), nullValue()); + assertThat(tasksInProgress.tasks().iterator().next().getState(), nullValue()); int numberOfUpdates = randomIntBetween(1, 10); for (int i = 0; i < numberOfUpdates; i++) { - logger.info("Updating the task status"); + logger.info("Updating the task states"); // Complete the running task and make sure it finishes properly assertThat(new TestTasksRequestBuilder(client()).setOperation("update_status").setTaskId(firstRunningTask.getTaskId()) .get().getTasks().size(), equalTo(1)); @@ -202,8 +202,8 @@ public void testPersistentActionStatusUpdate() throws Exception { int finalI = i; WaitForPersistentTaskFuture future1 = new WaitForPersistentTaskFuture<>(); persistentTasksService.waitForPersistentTaskCondition(taskId, - task -> task != null && task.getStatus() != null && task.getStatus().toString() != null && - task.getStatus().toString().equals("{\"phase\":\"phase " + (finalI + 1) + "\"}"), + task -> task != null && task.getState() != null && task.getState().toString() != null && + task.getState().toString().equals("{\"phase\":\"phase " + (finalI + 1) + "\"}"), TimeValue.timeValueSeconds(10), future1); assertThat(future1.get().getId(), equalTo(taskId)); } @@ -215,7 +215,7 @@ public void testPersistentActionStatusUpdate() throws Exception { assertThrows(future1, IllegalStateException.class, "timed out after 10ms"); PlainActionFuture> failedUpdateFuture = new PlainActionFuture<>(); - persistentTasksService.updateStatus(taskId, -2, new Status("should fail"), failedUpdateFuture); + persistentTasksService.sendUpdateStateRequest(taskId, -2, new State("should fail"), failedUpdateFuture); assertThrows(failedUpdateFuture, ResourceNotFoundException.class, "the task with id " + taskId + " and allocation id -2 doesn't exist"); diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java index 5000f73445b0c..906ecf232053d 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java @@ -210,13 +210,12 @@ public void testParamsStatusAndNodeTaskAreDelegated() throws Exception { ClusterState state = createInitialClusterState(1, Settings.EMPTY); - Task.Status status = new TestPersistentTasksPlugin.Status("_test_phase"); + PersistentTaskState taskState = new TestPersistentTasksPlugin.State("_test_phase"); PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder(); String taskId = UUIDs.base64UUID(); TestParams taskParams = new TestParams("other_0"); - tasks.addTask(taskId, TestPersistentTasksExecutor.NAME, taskParams, - new Assignment("this_node", "test assignment on other node")); - tasks.updateTaskStatus(taskId, status); + tasks.addTask(taskId, TestPersistentTasksExecutor.NAME, taskParams, new Assignment("this_node", "test assignment on other node")); + tasks.updateTaskState(taskId, taskState); MetaData.Builder metaData = MetaData.builder(state.metaData()); metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build()); ClusterState newClusterState = ClusterState.builder(state).metaData(metaData).build(); @@ -225,7 +224,7 @@ public void testParamsStatusAndNodeTaskAreDelegated() throws Exception { assertThat(executor.size(), equalTo(1)); assertThat(executor.get(0).params, sameInstance(taskParams)); - assertThat(executor.get(0).status, sameInstance(status)); + assertThat(executor.get(0).state, sameInstance(taskState)); assertThat(executor.get(0).task, sameInstance(nodeTask)); } @@ -331,15 +330,16 @@ private ClusterState removeTask(ClusterState state, String taskId) { } private class Execution { + private final PersistentTaskParams params; private final AllocatedPersistentTask task; - private final Task.Status status; + private final PersistentTaskState state; private final PersistentTasksExecutor holder; - Execution(PersistentTaskParams params, AllocatedPersistentTask task, Task.Status status, PersistentTasksExecutor holder) { + Execution(PersistentTaskParams params, AllocatedPersistentTask task, PersistentTaskState state, PersistentTasksExecutor holder) { this.params = params; this.task = task; - this.status = status; + this.state = state; this.holder = holder; } } @@ -352,11 +352,11 @@ private class MockExecutor extends NodePersistentTasksExecutor { } @Override - public void executeTask(Params params, - Task.Status status, - AllocatedPersistentTask task, - PersistentTasksExecutor executor) { - executions.add(new Execution(params, task, status, executor)); + public void executeTask(final Params params, + final PersistentTaskState state, + final AllocatedPersistentTask task, + final PersistentTasksExecutor executor) { + executions.add(new Execution(params, task, state, executor)); } public Execution get(int i) { diff --git a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java index 97b3407938768..462894e9ac831 100644 --- a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java +++ b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java @@ -100,16 +100,17 @@ public List> getPersistentTasksExecutor(ClusterServic public List getNamedWriteables() { return Arrays.asList( new NamedWriteableRegistry.Entry(PersistentTaskParams.class, TestPersistentTasksExecutor.NAME, TestParams::new), - new NamedWriteableRegistry.Entry(Task.Status.class, TestPersistentTasksExecutor.NAME, Status::new) + new NamedWriteableRegistry.Entry(PersistentTaskState.class, TestPersistentTasksExecutor.NAME, State::new) ); } @Override public List getNamedXContent() { return Arrays.asList( - new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(TestPersistentTasksExecutor.NAME), - TestParams::fromXContent), - new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(TestPersistentTasksExecutor.NAME), Status::fromXContent) + new NamedXContentRegistry.Entry(PersistentTaskParams.class, + new ParseField(TestPersistentTasksExecutor.NAME), TestParams::fromXContent), + new NamedXContentRegistry.Entry(PersistentTaskState.class, + new ParseField(TestPersistentTasksExecutor.NAME), State::fromXContent) ); } @@ -221,22 +222,22 @@ public Optional getRequiredFeature() { } } - public static class Status implements Task.Status { + public static class State implements PersistentTaskState { private final String phase; - public static final ConstructingObjectParser STATUS_PARSER = - new ConstructingObjectParser<>(TestPersistentTasksExecutor.NAME, args -> new Status((String) args[0])); + public static final ConstructingObjectParser STATE_PARSER = + new ConstructingObjectParser<>(TestPersistentTasksExecutor.NAME, args -> new State((String) args[0])); static { - STATUS_PARSER.declareString(constructorArg(), new ParseField("phase")); + STATE_PARSER.declareString(constructorArg(), new ParseField("phase")); } - public Status(String phase) { + public State(String phase) { this.phase = requireNonNull(phase, "Phase cannot be null"); } - public Status(StreamInput in) throws IOException { + public State(StreamInput in) throws IOException { phase = in.readString(); } @@ -253,11 +254,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } - public static Task.Status fromXContent(XContentParser parser) throws IOException { - return STATUS_PARSER.parse(parser, null); + public static PersistentTaskState fromXContent(XContentParser parser) throws IOException { + return STATE_PARSER.parse(parser, null); } - @Override public boolean isFragment() { return false; @@ -276,10 +276,10 @@ public String toString() { // Implements equals and hashcode for testing @Override public boolean equals(Object obj) { - if (obj == null || obj.getClass() != Status.class) { + if (obj == null || obj.getClass() != State.class) { return false; } - Status other = (Status) obj; + State other = (State) obj; return phase.equals(other.phase); } @@ -289,7 +289,6 @@ public int hashCode() { } } - public static class TestPersistentTasksExecutor extends PersistentTasksExecutor { public static final String NAME = "cluster:admin/persistent/test"; @@ -317,7 +316,7 @@ public Assignment getAssignment(TestParams params, ClusterState clusterState) { } @Override - protected void nodeOperation(AllocatedPersistentTask task, TestParams params, Task.Status status) { + protected void nodeOperation(AllocatedPersistentTask task, TestParams params, PersistentTaskState state) { logger.info("started node operation for the task {}", task); try { TestTask testTask = (TestTask) task; @@ -340,9 +339,9 @@ protected void nodeOperation(AllocatedPersistentTask task, TestParams params, Ta } else if ("update_status".equals(testTask.getOperation())) { testTask.setOperation(null); CountDownLatch latch = new CountDownLatch(1); - Status newStatus = new Status("phase " + phase.incrementAndGet()); - logger.info("updating the task status to {}", newStatus); - task.updatePersistentStatus(newStatus, new ActionListener>() { + State newState = new State("phase " + phase.incrementAndGet()); + logger.info("updating the task state to {}", newState); + task.updatePersistentTaskState(newState, new ActionListener>() { @Override public void onResponse(PersistentTask persistentTask) { logger.info("updating was successful"); @@ -540,5 +539,4 @@ protected void taskOperation(TestTasksRequest request, TestTask task, ActionList } - } diff --git a/server/src/test/java/org/elasticsearch/persistent/UpdatePersistentTaskRequestTests.java b/server/src/test/java/org/elasticsearch/persistent/UpdatePersistentTaskRequestTests.java index 6e20bb0009732..e23cb77ff2033 100644 --- a/server/src/test/java/org/elasticsearch/persistent/UpdatePersistentTaskRequestTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/UpdatePersistentTaskRequestTests.java @@ -22,7 +22,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.AbstractStreamableTestCase; -import org.elasticsearch.persistent.TestPersistentTasksPlugin.Status; +import org.elasticsearch.persistent.TestPersistentTasksPlugin.State; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; import org.elasticsearch.persistent.UpdatePersistentTaskStatusAction.Request; @@ -32,7 +32,7 @@ public class UpdatePersistentTaskRequestTests extends AbstractStreamableTestCase @Override protected Request createTestInstance() { - return new Request(UUIDs.base64UUID(), randomLong(), new Status(randomAlphaOfLength(10))); + return new Request(UUIDs.base64UUID(), randomLong(), new State(randomAlphaOfLength(10))); } @Override @@ -43,7 +43,7 @@ protected Request createBlankInstance() { @Override protected NamedWriteableRegistry getNamedWriteableRegistry() { return new NamedWriteableRegistry(Collections.singletonList( - new NamedWriteableRegistry.Entry(Task.Status.class, TestPersistentTasksExecutor.NAME, Status::new) + new NamedWriteableRegistry.Entry(PersistentTaskState.class, TestPersistentTasksExecutor.NAME, State::new) )); } } From a09ba28332c26c015e4837ca99972c72155c868d Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 31 May 2018 14:21:49 +0200 Subject: [PATCH 3/6] Adapt ML --- .../xpack/core/XPackClientPlugin.java | 15 +++++++------ .../xpack/core/ml/MlMetadata.java | 12 +++++----- .../xpack/core/ml/datafeed/DatafeedState.java | 4 ++-- .../{JobTaskStatus.java => JobTaskState.java} | 18 +++++++-------- .../ml/action/TransportCloseJobAction.java | 6 ++--- .../ml/action/TransportOpenJobAction.java | 15 +++++++------ .../action/TransportStartDatafeedAction.java | 8 ++++--- .../action/TransportStopDatafeedAction.java | 8 +++---- .../xpack/ml/datafeed/DatafeedManager.java | 2 +- .../ml/datafeed/DatafeedNodeSelector.java | 14 ++++++------ .../autodetect/AutodetectProcessManager.java | 10 ++++----- .../xpack/ml/MlMetadataTests.java | 4 ++-- .../action/TransportCloseJobActionTests.java | 2 +- .../action/TransportOpenJobActionTests.java | 6 ++--- .../TransportStopDatafeedActionTests.java | 4 ++-- .../ml/datafeed/DatafeedManagerTests.java | 4 ++-- .../datafeed/DatafeedNodeSelectorTests.java | 8 +++---- .../integration/BasicDistributedJobsIT.java | 22 +++++++++---------- .../xpack/ml/integration/TooManyJobsIT.java | 4 ++-- ...tatusTests.java => JobTaskStateTests.java} | 16 +++++++------- .../AutodetectProcessManagerTests.java | 4 ++-- .../MlNativeAutodetectIntegTestCase.java | 8 +++---- 22 files changed, 99 insertions(+), 95 deletions(-) rename x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/{JobTaskStatus.java => JobTaskState.java} (86%) rename x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/config/{JobTaskStatusTests.java => JobTaskStateTests.java} (53%) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index a96de96fd4f44..2dab9a3096ae8 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -28,6 +28,7 @@ import org.elasticsearch.license.PostStartTrialAction; import org.elasticsearch.license.PutLicenseAction; import org.elasticsearch.persistent.PersistentTaskParams; +import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.NetworkPlugin; import org.elasticsearch.plugins.Plugin; @@ -89,7 +90,7 @@ import org.elasticsearch.xpack.core.ml.action.ValidateDetectorAction; import org.elasticsearch.xpack.core.ml.action.ValidateJobConfigAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.monitoring.MonitoringFeatureSetUsage; import org.elasticsearch.xpack.core.rollup.RollupFeatureSetUsage; import org.elasticsearch.xpack.core.rollup.RollupField; @@ -325,9 +326,9 @@ public List getNamedWriteables() { StartDatafeedAction.DatafeedParams::new), new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.TASK_NAME, OpenJobAction.JobParams::new), - // ML - Task statuses - new NamedWriteableRegistry.Entry(Task.Status.class, JobTaskStatus.NAME, JobTaskStatus::new), - new NamedWriteableRegistry.Entry(Task.Status.class, DatafeedState.NAME, DatafeedState::fromStream), + // ML - Task states + new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new), + new NamedWriteableRegistry.Entry(PersistentTaskState.class, DatafeedState.NAME, DatafeedState::fromStream), new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.MACHINE_LEARNING, MachineLearningFeatureSetUsage::new), // monitoring @@ -365,9 +366,9 @@ public List getNamedXContent() { StartDatafeedAction.DatafeedParams::fromXContent), new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(OpenJobAction.TASK_NAME), OpenJobAction.JobParams::fromXContent), - // ML - Task statuses - new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(DatafeedState.NAME), DatafeedState::fromXContent), - new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(JobTaskStatus.NAME), JobTaskStatus::fromXContent), + // ML - Task states + new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(DatafeedState.NAME), DatafeedState::fromXContent), + new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(JobTaskState.NAME), JobTaskState::fromXContent), // watcher new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(WatcherMetaData.TYPE), WatcherMetaData::fromXContent), diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index 861f386a90966..5e145306f8c1f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -34,7 +34,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.groups.GroupOrJobLookup; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; @@ -402,9 +402,9 @@ public void markJobAsDeleted(String jobId, PersistentTasksCustomMetaData tasks, if (allowDeleteOpenJob == false) { PersistentTask jobTask = getJobTask(jobId, tasks); if (jobTask != null) { - JobTaskStatus jobTaskStatus = (JobTaskStatus) jobTask.getStatus(); + JobTaskState jobTaskState = (JobTaskState) jobTask.getState(); throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because the job is " - + ((jobTaskStatus == null) ? JobState.OPENING : jobTaskStatus.getState())); + + ((jobTaskState == null) ? JobState.OPENING : jobTaskState.getState())); } } Job.Builder jobBuilder = new Job.Builder(job); @@ -448,7 +448,7 @@ public static PersistentTask getDatafeedTask(String datafeedId, @Nullable Per public static JobState getJobState(String jobId, @Nullable PersistentTasksCustomMetaData tasks) { PersistentTask task = getJobTask(jobId, tasks); if (task != null) { - JobTaskStatus jobTaskState = (JobTaskStatus) task.getStatus(); + JobTaskState jobTaskState = (JobTaskState) task.getState(); if (jobTaskState == null) { return JobState.OPENING; } @@ -460,8 +460,8 @@ public static JobState getJobState(String jobId, @Nullable PersistentTasksCustom public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksCustomMetaData tasks) { PersistentTask task = getDatafeedTask(datafeedId, tasks); - if (task != null && task.getStatus() != null) { - return (DatafeedState) task.getStatus(); + if (task != null && task.getState() != null) { + return (DatafeedState) task.getState(); } else { // If we haven't started a datafeed then there will be no persistent task, // which is the same as if the datafeed was't started diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedState.java index 7343600a6ee37..d894f7b339fe5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedState.java @@ -12,7 +12,7 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.tasks.Task; +import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import java.io.IOException; @@ -20,7 +20,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; -public enum DatafeedState implements Task.Status { +public enum DatafeedState implements PersistentTaskState { STARTED, STOPPED, STARTING, STOPPING; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskState.java similarity index 86% rename from x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskStatus.java rename to x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskState.java index de102798d1ca6..d9ab3357319c6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskStatus.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskState.java @@ -12,25 +12,25 @@ import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.xpack.core.ml.action.OpenJobAction; +import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; +import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import java.io.IOException; import java.util.Objects; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; -public class JobTaskStatus implements Task.Status { +public class JobTaskState implements PersistentTaskState { public static final String NAME = OpenJobAction.TASK_NAME; private static ParseField STATE = new ParseField("state"); private static ParseField ALLOCATION_ID = new ParseField("allocation_id"); - private static final ConstructingObjectParser PARSER = + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, - args -> new JobTaskStatus((JobState) args[0], (Long) args[1])); + args -> new JobTaskState((JobState) args[0], (Long) args[1])); static { PARSER.declareField(constructorArg(), p -> { @@ -42,7 +42,7 @@ public class JobTaskStatus implements Task.Status { PARSER.declareLong(constructorArg(), ALLOCATION_ID); } - public static JobTaskStatus fromXContent(XContentParser parser) { + public static JobTaskState fromXContent(XContentParser parser) { try { return PARSER.parse(parser, null); } catch (IOException e) { @@ -53,12 +53,12 @@ public static JobTaskStatus fromXContent(XContentParser parser) { private final JobState state; private final long allocationId; - public JobTaskStatus(JobState state, long allocationId) { + public JobTaskState(JobState state, long allocationId) { this.state = Objects.requireNonNull(state); this.allocationId = allocationId; } - public JobTaskStatus(StreamInput in) throws IOException { + public JobTaskState(StreamInput in) throws IOException { state = JobState.fromStream(in); allocationId = in.readLong(); } @@ -100,7 +100,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - JobTaskStatus that = (JobTaskStatus) o; + JobTaskState that = (JobTaskState) o; return state == that.state && Objects.equals(allocationId, that.allocationId); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java index 36bcfe92f0075..083d4ce5b1514 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java @@ -34,7 +34,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; @@ -256,8 +256,8 @@ protected void doExecute(Task task, CloseJobAction.Request request, ActionListen @Override protected void taskOperation(CloseJobAction.Request request, TransportOpenJobAction.JobTask jobTask, ActionListener listener) { - JobTaskStatus taskStatus = new JobTaskStatus(JobState.CLOSING, jobTask.getAllocationId()); - jobTask.updatePersistentStatus(taskStatus, ActionListener.wrap(task -> { + JobTaskState taskState = new JobTaskState(JobState.CLOSING, jobTask.getAllocationId()); + jobTask.updatePersistentTaskState(taskState, ActionListener.wrap(task -> { // we need to fork because we are now on a network threadpool and closeJob method may take a while to complete: threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() { @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 5de7962169279..ceb841731d123 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -39,6 +39,7 @@ import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.persistent.AllocatedPersistentTask; +import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.persistent.PersistentTasksService; @@ -57,7 +58,7 @@ import org.elasticsearch.xpack.core.ml.job.config.DetectionRule; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; @@ -208,7 +209,7 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j persistentTasks.findTasks(OpenJobAction.TASK_NAME, task -> node.getId().equals(task.getExecutorNode())); for (PersistentTasksCustomMetaData.PersistentTask assignedTask : assignedTasks) { - JobTaskStatus jobTaskState = (JobTaskStatus) assignedTask.getStatus(); + JobTaskState jobTaskState = (JobTaskState) assignedTask.getState(); JobState jobState; if (jobTaskState == null || // executor node didn't have the chance to set job status to OPENING // previous executor node failed and current executor node didn't have the chance to set job status to OPENING @@ -675,14 +676,14 @@ public void validate(OpenJobAction.JobParams params, ClusterState clusterState) } @Override - protected void nodeOperation(AllocatedPersistentTask task, OpenJobAction.JobParams params, Task.Status status) { + protected void nodeOperation(AllocatedPersistentTask task, OpenJobAction.JobParams params, PersistentTaskState state) { JobTask jobTask = (JobTask) task; jobTask.autodetectProcessManager = autodetectProcessManager; - JobTaskStatus jobStateStatus = (JobTaskStatus) status; + JobTaskState jobTaskState = (JobTaskState) state; // If the job is failed then the Persistent Task Service will // try to restart it on a node restart. Exiting here leaves the // job in the failed state and it must be force closed. - if (jobStateStatus != null && jobStateStatus.getState().isAnyOf(JobState.FAILED, JobState.CLOSING)) { + if (jobTaskState != null && jobTaskState.getState().isAnyOf(JobState.FAILED, JobState.CLOSING)) { return; } @@ -766,8 +767,8 @@ private class JobPredicate implements Predicate persistentTask) { JobState jobState = JobState.CLOSED; if (persistentTask != null) { - JobTaskStatus jobStateStatus = (JobTaskStatus) persistentTask.getStatus(); - jobState = jobStateStatus == null ? JobState.OPENING : jobStateStatus.getState(); + JobTaskState jobTaskState = (JobTaskState) persistentTask.getState(); + jobState = jobTaskState == null ? JobState.OPENING : jobTaskState.getState(); PersistentTasksCustomMetaData.Assignment assignment = persistentTask.getAssignment(); // This logic is only appropriate when opening a job, not when reallocating following a failure, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 3d261864ab409..5105625296e58 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; @@ -274,8 +275,9 @@ public void validate(StartDatafeedAction.DatafeedParams params, ClusterState clu } @Override - protected void nodeOperation(AllocatedPersistentTask allocatedPersistentTask, StartDatafeedAction.DatafeedParams params, - Task.Status status) { + protected void nodeOperation(final AllocatedPersistentTask allocatedPersistentTask, + final StartDatafeedAction.DatafeedParams params, + final PersistentTaskState state) { DatafeedTask datafeedTask = (DatafeedTask) allocatedPersistentTask; datafeedTask.datafeedManager = datafeedManager; datafeedManager.run(datafeedTask, @@ -373,7 +375,7 @@ public boolean test(PersistentTasksCustomMetaData.PersistentTask persistentTa assignment.getExplanation() + "]", RestStatus.TOO_MANY_REQUESTS); return true; } - DatafeedState datafeedState = (DatafeedState) persistentTask.getStatus(); + DatafeedState datafeedState = (DatafeedState) persistentTask.getState(); return datafeedState == DatafeedState.STARTED; } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java index 4b68f74eb1702..faf6aa80b7a6f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java @@ -222,10 +222,10 @@ public void onFailure(Exception e) { } @Override - protected void taskOperation(StopDatafeedAction.Request request, TransportStartDatafeedAction.DatafeedTask datafeedTaskTask, + protected void taskOperation(StopDatafeedAction.Request request, TransportStartDatafeedAction.DatafeedTask datafeedTask, ActionListener listener) { - DatafeedState taskStatus = DatafeedState.STOPPING; - datafeedTaskTask.updatePersistentStatus(taskStatus, ActionListener.wrap(task -> { + DatafeedState taskState = DatafeedState.STOPPING; + datafeedTask.updatePersistentTaskState(taskState, ActionListener.wrap(task -> { // we need to fork because we are now on a network threadpool threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() { @Override @@ -235,7 +235,7 @@ public void onFailure(Exception e) { @Override protected void doRun() throws Exception { - datafeedTaskTask.stop("stop_datafeed (api)", request.getStopTimeout()); + datafeedTask.stop("stop_datafeed (api)", request.getStopTimeout()); listener.onResponse(new StopDatafeedAction.Response(true)); } }); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 69acbad20fb2d..338c111401acf 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -88,7 +88,7 @@ public void run(TransportStartDatafeedAction.DatafeedTask task, Consumer { Holder holder = new Holder(task, datafeed, datafeedJob, new ProblemTracker(auditor, job.getId()), taskHandler); runningDatafeedsOnThisNode.put(task.getAllocationId(), holder); - task.updatePersistentStatus(DatafeedState.STARTED, new ActionListener>() { + task.updatePersistentTaskState(DatafeedState.STARTED, new ActionListener>() { @Override public void onResponse(PersistentTask persistentTask) { taskRunner.runWhenJobIsOpened(task); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java index 0eb57ab79be5d..bebf0f3935d92 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java @@ -12,12 +12,12 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import java.util.List; import java.util.Objects; @@ -64,11 +64,11 @@ private AssignmentFailure checkAssignment() { PriorityFailureCollector priorityFailureCollector = new PriorityFailureCollector(); priorityFailureCollector.add(verifyIndicesActive(datafeed)); - JobTaskStatus taskStatus = null; + JobTaskState jobTaskState = null; JobState jobState = JobState.CLOSED; if (jobTask != null) { - taskStatus = (JobTaskStatus) jobTask.getStatus(); - jobState = taskStatus == null ? JobState.OPENING : taskStatus.getState(); + jobTaskState = (JobTaskState) jobTask.getState(); + jobState = jobTaskState == null ? JobState.OPENING : jobTaskState.getState(); } if (jobState.isAnyOf(JobState.OPENING, JobState.OPENED) == false) { @@ -78,8 +78,8 @@ private AssignmentFailure checkAssignment() { priorityFailureCollector.add(new AssignmentFailure(reason, true)); } - if (taskStatus != null && taskStatus.isStatusStale(jobTask)) { - String reason = "cannot start datafeed [" + datafeed.getId() + "], job [" + datafeed.getJobId() + "] status is stale"; + if (jobTaskState != null && jobTaskState.isStatusStale(jobTask)) { + String reason = "cannot start datafeed [" + datafeed.getId() + "], job [" + datafeed.getJobId() + "] state is stale"; priorityFailureCollector.add(new AssignmentFailure(reason, true)); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index d3a848ef3821f..b6efb688c1797 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -31,7 +31,7 @@ import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder; import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; @@ -623,8 +623,8 @@ public Optional jobOpenTime(JobTask jobTask) { } void setJobState(JobTask jobTask, JobState state) { - JobTaskStatus taskStatus = new JobTaskStatus(state, jobTask.getAllocationId()); - jobTask.updatePersistentStatus(taskStatus, new ActionListener>() { + JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId()); + jobTask.updatePersistentTaskState(jobTaskState, new ActionListener>() { @Override public void onResponse(PersistentTask persistentTask) { logger.info("Successfully set job state to [{}] for job [{}]", state, jobTask.getJobId()); @@ -638,8 +638,8 @@ public void onFailure(Exception e) { } void setJobState(JobTask jobTask, JobState state, CheckedConsumer handler) { - JobTaskStatus taskStatus = new JobTaskStatus(state, jobTask.getAllocationId()); - jobTask.updatePersistentStatus(taskStatus, new ActionListener>() { + JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId()); + jobTask.updatePersistentTaskState(jobTaskState, new ActionListener>() { @Override public void onResponse(PersistentTask persistentTask) { try { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java index 8049b5655d63b..f6fb2db3c9bb9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java @@ -27,7 +27,7 @@ import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.config.JobTests; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; @@ -363,7 +363,7 @@ public void testGetJobState() { new PersistentTasksCustomMetaData.Assignment("bar", "test assignment")); assertEquals(JobState.OPENING, MlMetadata.getJobState("foo", tasksBuilder.build())); - tasksBuilder.updateTaskStatus(MlMetadata.jobTaskId("foo"), new JobTaskStatus(JobState.OPENED, tasksBuilder.getLastAllocationId())); + tasksBuilder.updateTaskState(MlMetadata.jobTaskId("foo"), new JobTaskState(JobState.OPENED, tasksBuilder.getLastAllocationId())); assertEquals(JobState.OPENED, MlMetadata.getJobState("foo", tasksBuilder.build())); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java index f1679b8b0b9d1..d65fc1476e75e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java @@ -314,7 +314,7 @@ public static void addTask(String datafeedId, long startTime, String nodeId, Dat PersistentTasksCustomMetaData.Builder tasks) { tasks.addTask(MLMetadataField.datafeedTaskId(datafeedId), StartDatafeedAction.TASK_NAME, new StartDatafeedAction.DatafeedParams(datafeedId, startTime), new Assignment(nodeId, "test assignment")); - tasks.updateTaskStatus(MLMetadataField.datafeedTaskId(datafeedId), state); + tasks.updateTaskState(MLMetadataField.datafeedTaskId(datafeedId), state); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index 6ef2d92d9c7c6..b5a315d9687bb 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -42,7 +42,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.config.Operator; import org.elasticsearch.xpack.core.ml.job.config.RuleCondition; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; @@ -329,7 +329,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); tasksBuilder = PersistentTasksCustomMetaData.builder(tasks); - tasksBuilder.updateTaskStatus(MlMetadata.jobTaskId("job_id6"), null); + tasksBuilder.updateTaskState(MlMetadata.jobTaskId("job_id6"), null); tasks = tasksBuilder.build(); csBuilder = ClusterState.builder(cs); @@ -630,7 +630,7 @@ public static void addJobTask(String jobId, String nodeId, JobState jobState, Pe builder.addTask(MlMetadata.jobTaskId(jobId), OpenJobAction.TASK_NAME, new OpenJobAction.JobParams(jobId), new Assignment(nodeId, "test assignment")); if (jobState != null) { - builder.updateTaskStatus(MlMetadata.jobTaskId(jobId), new JobTaskStatus(jobState, builder.getLastAllocationId())); + builder.updateTaskState(MlMetadata.jobTaskId(jobId), new JobTaskState(jobState, builder.getLastAllocationId())); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java index a61709be424e8..55a0f4006bcdd 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java @@ -31,7 +31,7 @@ public void testValidate() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); tasksBuilder.addTask(MLMetadataField.datafeedTaskId("foo"), StartDatafeedAction.TASK_NAME, new StartDatafeedAction.DatafeedParams("foo", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", "")); - tasksBuilder.updateTaskStatus(MLMetadataField.datafeedTaskId("foo"), DatafeedState.STARTED); + tasksBuilder.updateTaskState(MLMetadataField.datafeedTaskId("foo"), DatafeedState.STARTED); tasksBuilder.build(); Job job = createDatafeedJob().build(new Date()); @@ -121,6 +121,6 @@ public static void addTask(String datafeedId, long startTime, String nodeId, Dat taskBuilder.addTask(MLMetadataField.datafeedTaskId(datafeedId), StartDatafeedAction.TASK_NAME, new StartDatafeedAction.DatafeedParams(datafeedId, startTime), new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment")); - taskBuilder.updateTaskStatus(MLMetadataField.datafeedTaskId(datafeedId), state); + taskBuilder.updateTaskState(MLMetadataField.datafeedTaskId(datafeedId), state); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java index bd722ebf8ef9a..f609f0c8c5ed9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java @@ -378,7 +378,7 @@ private static DatafeedTask createDatafeedTask(String datafeedId, long startTime ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; listener.onResponse(mock(PersistentTask.class)); return null; - }).when(task).updatePersistentStatus(any(), any()); + }).when(task).updatePersistentTaskState(any(), any()); return task; } @@ -394,7 +394,7 @@ private DatafeedTask spyDatafeedTask(DatafeedTask task) { ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; listener.onResponse(mock(PersistentTask.class)); return null; - }).when(task).updatePersistentStatus(any(), any()); + }).when(task).updatePersistentTaskState(any(), any()); return task; } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java index 0fee78611a7bf..96ae3b5ef38b6 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java @@ -31,7 +31,7 @@ import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.junit.Before; @@ -255,20 +255,20 @@ public void testSelectNode_jobTaskStale() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), nodeId, JobState.OPENED, tasksBuilder); // Set to lower allocationId, so job task is stale: - tasksBuilder.updateTaskStatus(MlMetadata.jobTaskId(job.getId()), new JobTaskStatus(JobState.OPENED, 0)); + tasksBuilder.updateTaskState(MlMetadata.jobTaskId(job.getId()), new JobTaskState(JobState.OPENED, 0)); tasks = tasksBuilder.build(); givenClusterState("foo", 1, 0); PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); assertNull(result.getExecutorNode()); - assertEquals("cannot start datafeed [datafeed_id], job [job_id] status is stale", + assertEquals("cannot start datafeed [datafeed_id], job [job_id] state is stale", result.getExplanation()); ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " - + "[cannot start datafeed [datafeed_id], job [job_id] status is stale]")); + + "[cannot start datafeed [datafeed_id], job [job_id] state is stale]")); tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), "node_id1", JobState.OPENED, tasksBuilder); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java index ce47fb0adf80c..e3d67bb0bdb71 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java @@ -39,7 +39,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; @@ -211,9 +211,9 @@ public void testDedicatedMlNode() throws Exception { DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode()); assertThat(node.getAttributes(), hasEntry(MachineLearning.ML_ENABLED_NODE_ATTR, "true")); assertThat(node.getAttributes(), hasEntry(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "20")); - JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus(); - assertNotNull(jobTaskStatus); - assertEquals(JobState.OPENED, jobTaskStatus.getState()); + JobTaskState jobTaskState = (JobTaskState) task.getState(); + assertNotNull(jobTaskState); + assertEquals(JobState.OPENED, jobTaskState.getState()); }); logger.info("stop the only running ml node"); @@ -264,7 +264,7 @@ public void testMaxConcurrentJobAllocations() throws Exception { for (DiscoveryNode node : event.state().nodes()) { Collection> foundTasks = tasks.findTasks(OpenJobAction.TASK_NAME, task -> { - JobTaskStatus jobTaskState = (JobTaskStatus) task.getStatus(); + JobTaskState jobTaskState = (JobTaskState) task.getState(); return node.getId().equals(task.getExecutorNode()) && (jobTaskState == null || jobTaskState.isStatusStale(task)); }); @@ -396,9 +396,9 @@ private void assertJobTask(String jobId, JobState expectedState, boolean hasExec assertThat(node.getAttributes(), hasEntry(MachineLearning.ML_ENABLED_NODE_ATTR, "true")); assertThat(node.getAttributes(), hasEntry(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "20")); - JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus(); - assertNotNull(jobTaskStatus); - assertEquals(expectedState, jobTaskStatus.getState()); + JobTaskState jobTaskState = (JobTaskState) task.getState(); + assertNotNull(jobTaskState); + assertEquals(expectedState, jobTaskState.getState()); } else { assertNull(task.getExecutorNode()); } @@ -411,9 +411,9 @@ private CheckedRunnable checkAllJobsAreAssignedAndOpened(int numJobs) assertEquals(numJobs, tasks.taskMap().size()); for (PersistentTask task : tasks.taskMap().values()) { assertNotNull(task.getExecutorNode()); - JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus(); - assertNotNull(jobTaskStatus); - assertEquals(JobState.OPENED, jobTaskStatus.getState()); + JobTaskState jobTaskState = (JobTaskState) task.getState(); + assertNotNull(jobTaskState); + assertEquals(JobState.OPENED, jobTaskState.getState()); } }; } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java index 17e7b89978e16..f06b73fcd40aa 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java @@ -20,7 +20,7 @@ import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; @@ -58,7 +58,7 @@ public void testCloseFailedJob() throws Exception { assertEquals(1, tasks.taskMap().size()); // now just double check that the first job is still opened: PersistentTasksCustomMetaData.PersistentTask task = tasks.getTask(MlMetadata.jobTaskId("close-failed-job-1")); - assertEquals(JobState.OPENED, ((JobTaskStatus) task.getStatus()).getState()); + assertEquals(JobState.OPENED, ((JobTaskState) task.getState()).getState()); } public void testSingleNode() throws Exception { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTaskStatusTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTaskStateTests.java similarity index 53% rename from x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTaskStatusTests.java rename to x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTaskStateTests.java index 7183235b6ff68..4dfd1965804e5 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTaskStatusTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTaskStateTests.java @@ -9,22 +9,22 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.AbstractSerializingTestCase; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; -public class JobTaskStatusTests extends AbstractSerializingTestCase { +public class JobTaskStateTests extends AbstractSerializingTestCase { @Override - protected JobTaskStatus createTestInstance() { - return new JobTaskStatus(randomFrom(JobState.values()), randomLong()); + protected JobTaskState createTestInstance() { + return new JobTaskState(randomFrom(JobState.values()), randomLong()); } @Override - protected Writeable.Reader instanceReader() { - return JobTaskStatus::new; + protected Writeable.Reader instanceReader() { + return JobTaskState::new; } @Override - protected JobTaskStatus doParseInstance(XContentParser parser) { - return JobTaskStatus.fromXContent(parser); + protected JobTaskState doParseInstance(XContentParser parser) { + return JobTaskState.fromXContent(parser); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index c3e830553a237..fa41cf0918f71 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -27,7 +27,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; @@ -199,7 +199,7 @@ public void testOpenJob() { manager.openJob(jobTask, e -> {}); assertEquals(1, manager.numberOfOpenJobs()); assertTrue(manager.jobHasActiveAutodetectProcess(jobTask)); - verify(jobTask).updatePersistentStatus(eq(new JobTaskStatus(JobState.OPENED, 1L)), any()); + verify(jobTask).updatePersistentTaskState(eq(new JobTaskState(JobState.OPENED, 1L)), any()); } public void testOpenJob_exceedMaxNumJobs() { diff --git a/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java b/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java index f70efc72506d3..9057db476ad77 100644 --- a/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java +++ b/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java @@ -26,13 +26,13 @@ import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.persistent.PersistentTaskParams; +import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; -import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.SecuritySettingsSourceField; import org.elasticsearch.transport.Netty4Plugin; @@ -70,7 +70,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; @@ -449,8 +449,8 @@ protected void ensureClusterStateConsistency() throws IOException { StartDatafeedAction.DatafeedParams::new)); entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.TASK_NAME, OpenJobAction.JobParams::new)); - entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, JobTaskStatus.NAME, JobTaskStatus::new)); - entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, DatafeedState.NAME, DatafeedState::fromStream)); + entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new)); + entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, DatafeedState.NAME, DatafeedState::fromStream)); entries.add(new NamedWriteableRegistry.Entry(ClusterState.Custom.class, TokenMetaData.TYPE, TokenMetaData::new)); final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries); ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState(); From 78ab6bb9937369c95a0dad72d2c0b92ebe4cb6bc Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 31 May 2018 17:49:07 +0200 Subject: [PATCH 4/6] Adapt Rollup --- .../xpack/core/XPackClientPlugin.java | 11 +- .../core/rollup/job/RollupJobStatus.java | 5 +- .../xpack/rollup/job/RollupJobTask.java | 60 ++++----- .../elasticsearch/xpack/rollup/RollupIT.java | 62 ++++------ .../xpack/rollup/job/RollupJobTaskTests.java | 114 ++++++++++-------- 5 files changed, 129 insertions(+), 123 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index 2dab9a3096ae8..049089e62cf26 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -351,7 +351,8 @@ public List getNamedWriteables() { // rollup new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.ROLLUP, RollupFeatureSetUsage::new), new NamedWriteableRegistry.Entry(PersistentTaskParams.class, RollupJob.NAME, RollupJob::new), - new NamedWriteableRegistry.Entry(Task.Status.class, RollupJobStatus.NAME, RollupJobStatus::new) + new NamedWriteableRegistry.Entry(Task.Status.class, RollupJobStatus.NAME, RollupJobStatus::new), + new NamedWriteableRegistry.Entry(PersistentTaskState.class, RollupJobStatus.NAME, RollupJobStatus::new) ); } @@ -376,8 +377,12 @@ public List getNamedXContent() { new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(LicensesMetaData.TYPE), LicensesMetaData::fromXContent), //rollup - new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(RollupField.TASK_NAME), RollupJob::fromXContent), - new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(RollupJobStatus.NAME), RollupJobStatus::fromXContent) + new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(RollupField.TASK_NAME), + RollupJob::fromXContent), + new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(RollupJobStatus.NAME), + RollupJobStatus::fromXContent), + new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(RollupJobStatus.NAME), + RollupJobStatus::fromXContent) ); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStatus.java index 86bc95e092ca3..4cbd5a3b4559a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStatus.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStatus.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.tasks.Task; import java.io.IOException; @@ -30,7 +31,7 @@ * indexer's current position. When the allocated task updates its status, * it is providing a new version of this. */ -public class RollupJobStatus implements Task.Status { +public class RollupJobStatus implements Task.Status, PersistentTaskState { public static final String NAME = "xpack/rollup/job"; private final IndexerState state; @@ -73,7 +74,7 @@ public RollupJobStatus(StreamInput in) throws IOException { currentPosition = in.readBoolean() ? new TreeMap<>(in.readMap()) : null; } - public IndexerState getState() { + public IndexerState getIndexerState() { return state; } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java index 425629c248c9c..50b3f21800d06 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.persistent.AllocatedPersistentTask; +import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.tasks.TaskId; @@ -62,7 +63,7 @@ public RollupJobPersistentTasksExecutor(Settings settings, Client client, Schedu } @Override - protected void nodeOperation(AllocatedPersistentTask task, @Nullable RollupJob params, Status status) { + protected void nodeOperation(AllocatedPersistentTask task, @Nullable RollupJob params, PersistentTaskState state) { RollupJobTask rollupJobTask = (RollupJobTask) task; SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job(SCHEDULE_NAME + "_" + params.getConfig().getId(), new CronSchedule(params.getConfig().getCron())); @@ -80,7 +81,7 @@ protected AllocatedPersistentTask createTask(long id, String type, String action PersistentTasksCustomMetaData.PersistentTask persistentTask, Map headers) { return new RollupJobTask(id, type, action, parentTaskId, persistentTask.getParams(), - (RollupJobStatus) persistentTask.getStatus(), client, schedulerEngine, threadPool, headers); + (RollupJobStatus) persistentTask.getState(), client, schedulerEngine, threadPool, headers); } } @@ -115,15 +116,15 @@ protected void doNextBulk(BulkRequest request, ActionListener next } @Override - protected void doSaveState(IndexerState state, Map position, Runnable next) { - if (state.equals(IndexerState.ABORTING)) { + protected void doSaveState(IndexerState indexerState, Map position, Runnable next) { + if (indexerState.equals(IndexerState.ABORTING)) { // If we're aborting, just invoke `next` (which is likely an onFailure handler) next.run(); } else { // Otherwise, attempt to persist our state - final RollupJobStatus status = new RollupJobStatus(state, getPosition()); - logger.debug("Updating persistent status of job [" + job.getConfig().getId() + "] to [" + state.toString() + "]"); - updatePersistentStatus(status, ActionListener.wrap(task -> next.run(), exc -> next.run())); + final RollupJobStatus state = new RollupJobStatus(indexerState, getPosition()); + logger.debug("Updating persistent state of job [" + job.getConfig().getId() + "] to [" + indexerState.toString() + "]"); + updatePersistentTaskState(state, ActionListener.wrap(task -> next.run(), exc -> next.run())); } } @@ -148,7 +149,7 @@ protected void onAbort() { private final ThreadPool threadPool; private final RollupIndexer indexer; - RollupJobTask(long id, String type, String action, TaskId parentTask, RollupJob job, RollupJobStatus status, + RollupJobTask(long id, String type, String action, TaskId parentTask, RollupJob job, RollupJobStatus state, Client client, SchedulerEngine schedulerEngine, ThreadPool threadPool, Map headers) { super(id, type, action, RollupField.NAME + "_" + job.getConfig().getId(), parentTask, headers); this.job = job; @@ -158,16 +159,17 @@ protected void onAbort() { // If status is not null, we are resuming rather than starting fresh. Map initialPosition = null; IndexerState initialState = IndexerState.STOPPED; - if (status != null) { - logger.debug("We have existing status, setting state to [" + status.getState() + "] " + - "and current position to [" + status.getPosition() + "] for job [" + job.getConfig().getId() + "]"); - if (status.getState().equals(IndexerState.INDEXING)) { + if (state != null) { + final IndexerState existingState = state.getIndexerState(); + logger.debug("We have existing state, setting state to [" + existingState + "] " + + "and current position to [" + state.getPosition() + "] for job [" + job.getConfig().getId() + "]"); + if (existingState.equals(IndexerState.INDEXING)) { /* * If we were indexing, we have to reset back to STARTED otherwise the indexer will be "stuck" thinking * it is indexing but without the actual indexing thread running. */ initialState = IndexerState.STARTED; - } else if (status.getState().equals(IndexerState.ABORTING) || status.getState().equals(IndexerState.STOPPING)) { + } else if (existingState.equals(IndexerState.ABORTING) || existingState.equals(IndexerState.STOPPING)) { // It shouldn't be possible to persist ABORTING, but if for some reason it does, // play it safe and restore the job as STOPPED. An admin will have to clean it up, // but it won't be running, and won't delete itself either. Safest option. @@ -175,9 +177,9 @@ protected void onAbort() { // to restore as STOPEPD initialState = IndexerState.STOPPED; } else { - initialState = status.getState(); + initialState = existingState; } - initialPosition = status.getPosition(); + initialPosition = state.getPosition(); } this.indexer = new ClientRollupPageManager(job, initialState, initialPosition, new ParentTaskAssigningClient(client, new TaskId(getPersistentTaskId()))); @@ -227,20 +229,20 @@ public synchronized void start(ActionListener lis + " state was [" + newState + "]")); return; } - final RollupJobStatus status = new RollupJobStatus(IndexerState.STARTED, indexer.getPosition()); - logger.debug("Updating status for rollup job [" + job.getConfig().getId() + "] to [" + status.getState() + "][" + - status.getPosition() + "]"); - updatePersistentStatus(status, + final RollupJobStatus state = new RollupJobStatus(IndexerState.STARTED, indexer.getPosition()); + logger.debug("Updating state for rollup job [" + job.getConfig().getId() + "] to [" + state.getIndexerState() + "][" + + state.getPosition() + "]"); + updatePersistentTaskState(state, ActionListener.wrap( (task) -> { - logger.debug("Succesfully updated status for rollup job [" + job.getConfig().getId() + "] to [" - + status.getState() + "][" + status.getPosition() + "]"); + logger.debug("Succesfully updated state for rollup job [" + job.getConfig().getId() + "] to [" + + state.getIndexerState() + "][" + state.getPosition() + "]"); listener.onResponse(new StartRollupJobAction.Response(true)); }, (exc) -> { listener.onFailure( - new ElasticsearchException("Error while updating status for rollup job [" + job.getConfig().getId() - + "] to [" + status.getState() + "].", exc) + new ElasticsearchException("Error while updating state for rollup job [" + job.getConfig().getId() + + "] to [" + state.getIndexerState() + "].", exc) ); } ) @@ -268,17 +270,17 @@ public synchronized void stop(ActionListener liste case STOPPING: // update the persistent state only if there is no background job running, // otherwise the state is updated by the indexer when the background job detects the STOPPING state. - RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, indexer.getPosition()); - updatePersistentStatus(status, + RollupJobStatus state = new RollupJobStatus(IndexerState.STOPPED, indexer.getPosition()); + updatePersistentTaskState(state, ActionListener.wrap( (task) -> { - logger.debug("Succesfully updated status for rollup job [" + job.getConfig().getId() - + "] to [" + status.getState() + "]"); + logger.debug("Succesfully updated state for rollup job [" + job.getConfig().getId() + + "] to [" + state.getIndexerState() + "]"); listener.onResponse(new StopRollupJobAction.Response(true)); }, (exc) -> { - listener.onFailure(new ElasticsearchException("Error while updating status for rollup job [" - + job.getConfig().getId() + "] to [" + status.getState() + "].", exc)); + listener.onFailure(new ElasticsearchException("Error while updating state for rollup job [" + + job.getConfig().getId() + "] to [" + state.getIndexerState() + "].", exc)); }) ); break; diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupIT.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupIT.java index ce8bf936d9768..a33fbb042d8c6 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupIT.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupIT.java @@ -104,7 +104,7 @@ protected Settings transportClientSettings() { } @Before - public void createIndex() throws Exception { + public void createIndex() { client().admin().indices().prepareCreate("test-1").addMapping("doc", "{\"doc\": {\"properties\": {" + "\"date_histo\": {\"type\": \"date\"}, " + "\"histo\": {\"type\": \"integer\"}, " + @@ -125,7 +125,7 @@ public void createIndex() throws Exception { } } } - BulkResponse response = bulk.get(); + bulk.get(); client().admin().indices().prepareRefresh("test-1").get(); } @@ -195,27 +195,23 @@ public void testIndexPattern() throws Exception { // Make sure it started ESTestCase.assertBusy(() -> { - ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get(); - - RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, "testIndexPattern"); - if (rollupJobStatus == null) {; + RollupJobStatus rollupJobStatus = getRollupJobStatus("testIndexPattern"); + if (rollupJobStatus == null) { fail("null"); } - IndexerState state = rollupJobStatus.getState(); + IndexerState state = rollupJobStatus.getIndexerState(); assertTrue(state.equals(IndexerState.STARTED) || state.equals(IndexerState.INDEXING)); }, 60, TimeUnit.SECONDS); // And wait for it to finish ESTestCase.assertBusy(() -> { - ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get(); - - RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, "testIndexPattern"); + RollupJobStatus rollupJobStatus = getRollupJobStatus("testIndexPattern"); if (rollupJobStatus == null) { fail("null"); } - IndexerState state = rollupJobStatus.getState(); + IndexerState state = rollupJobStatus.getIndexerState(); assertTrue(state.equals(IndexerState.STARTED) && rollupJobStatus.getPosition() != null); }, 60, TimeUnit.SECONDS); @@ -274,23 +270,20 @@ public void testTwoJobsStartStopDeleteOne() throws Exception { // Make sure it started ESTestCase.assertBusy(() -> { - ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get(); - - RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, "job1"); + RollupJobStatus rollupJobStatus = getRollupJobStatus("job1"); if (rollupJobStatus == null) { fail("null"); } - IndexerState state = rollupJobStatus.getState(); + IndexerState state = rollupJobStatus.getIndexerState(); assertTrue(state.equals(IndexerState.STARTED) || state.equals(IndexerState.INDEXING)); }, 60, TimeUnit.SECONDS); //but not the other task ESTestCase.assertBusy(() -> { - ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get(); - RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, "job2"); + RollupJobStatus rollupJobStatus = getRollupJobStatus("job2"); - IndexerState state = rollupJobStatus.getState(); + IndexerState state = rollupJobStatus.getIndexerState(); assertTrue(state.equals(IndexerState.STOPPED)); }, 60, TimeUnit.SECONDS); @@ -301,9 +294,7 @@ public void testTwoJobsStartStopDeleteOne() throws Exception { // Make sure the first job's task is gone ESTestCase.assertBusy(() -> { - ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get(); - - RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, "job1"); + RollupJobStatus rollupJobStatus = getRollupJobStatus("job1"); assertTrue(rollupJobStatus == null); }, 60, TimeUnit.SECONDS); @@ -320,10 +311,9 @@ public void testTwoJobsStartStopDeleteOne() throws Exception { // and still STOPPED ESTestCase.assertBusy(() -> { - ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get(); - RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, "job2"); + RollupJobStatus rollupJobStatus = getRollupJobStatus("job2"); - IndexerState state = rollupJobStatus.getState(); + IndexerState state = rollupJobStatus.getIndexerState(); assertTrue(state.equals(IndexerState.STOPPED)); }, 60, TimeUnit.SECONDS); } @@ -404,19 +394,17 @@ public void testBig() throws Exception { Assert.assertThat(response.isStarted(), equalTo(true)); ESTestCase.assertBusy(() -> { - ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get(); - RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, taskId); + RollupJobStatus rollupJobStatus = getRollupJobStatus(taskId); if (rollupJobStatus == null) { fail("null"); } - IndexerState state = rollupJobStatus.getState(); + IndexerState state = rollupJobStatus.getIndexerState(); logger.error("state: [" + state + "]"); assertTrue(state.equals(IndexerState.STARTED) && rollupJobStatus.getPosition() != null); }, 60, TimeUnit.SECONDS); - ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get(); - RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, taskId); + RollupJobStatus rollupJobStatus = getRollupJobStatus(taskId); if (rollupJobStatus == null) { Assert.fail("rollup job status should not be null"); } @@ -481,11 +469,13 @@ private void verifyAgg(InternalDateHistogram verify, InternalDateHistogram rollu } } - private RollupJobStatus getRollupJobStatus(ListTasksResponse tasksResponse, String taskId) { - for (TaskInfo task : tasksResponse.getTasks()) { - if (task.getDescription().equals("rollup_" + taskId)) { - return ((RollupJobStatus) task.getStatus()); - } + private RollupJobStatus getRollupJobStatus(final String taskId) { + final GetRollupJobsAction.Request request = new GetRollupJobsAction.Request(taskId); + final GetRollupJobsAction.Response response = client().execute(GetRollupJobsAction.INSTANCE, request).actionGet(); + + if (response.getJobs() != null && response.getJobs().isEmpty() == false) { + assertThat("Expect 1 rollup job with id " + taskId, response.getJobs().size(), equalTo(1)); + return response.getJobs().iterator().next().getStatus(); } return null; } @@ -498,13 +488,13 @@ public void cleanup() throws ExecutionException, InterruptedException { for (GetRollupJobsAction.JobWrapper job : response.getJobs()) { StopRollupJobAction.Request stopRequest = new StopRollupJobAction.Request(job.getJob().getId()); try { - StopRollupJobAction.Response stopResponse = client().execute(StopRollupJobAction.INSTANCE, stopRequest).get(); + client().execute(StopRollupJobAction.INSTANCE, stopRequest).get(); } catch (ElasticsearchException e) { // } DeleteRollupJobAction.Request deleteRequest = new DeleteRollupJobAction.Request(job.getJob().getId()); - DeleteRollupJobAction.Response deleteResponse = client().execute(DeleteRollupJobAction.INSTANCE, deleteRequest).get(); + client().execute(DeleteRollupJobAction.INSTANCE, deleteRequest).get(); } } } diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java index d12be5e6fc196..ffcae267340f8 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; @@ -64,7 +65,7 @@ public void testInitialStatusStopped() { SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); } @@ -77,7 +78,7 @@ public void testInitialStatusAborting() { SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); } @@ -90,7 +91,7 @@ public void testInitialStatusStopping() { SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); } @@ -103,7 +104,7 @@ public void testInitialStatusStarted() { SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); } @@ -116,7 +117,7 @@ public void testInitialStatusIndexing() { SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); } @@ -128,7 +129,7 @@ public void testNoInitialStatus() { SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, null, client, schedulerEngine, pool, Collections.emptyMap()); - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertNull(((RollupJobStatus)task.getStatus()).getPosition()); } @@ -140,7 +141,7 @@ public void testStartWhenStarted() throws InterruptedException { SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); @@ -172,13 +173,14 @@ public void testStartWhenStopping() throws InterruptedException { RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, null, client, schedulerEngine, pool, Collections.emptyMap()) { @Override - public void updatePersistentStatus(Status status, ActionListener> listener) { - assertThat(status, instanceOf(RollupJobStatus.class)); + public void updatePersistentTaskState(PersistentTaskState taskState, + ActionListener> listener) { + assertThat(taskState, instanceOf(RollupJobStatus.class)); int c = counter.get(); if (c == 0) { - assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STARTED)); + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); } else if (c == 1) { - assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STOPPED)); + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED)); } else { fail("Should not have updated persistent statuse > 2 times"); } @@ -187,7 +189,7 @@ public void updatePersistentStatus(Status status, ActionListener() { @@ -248,14 +250,15 @@ public void testStartWhenStopped() throws InterruptedException { RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()) { @Override - public void updatePersistentStatus(Status status, ActionListener> listener) { - assertThat(status, instanceOf(RollupJobStatus.class)); - assertThat(((RollupJobStatus)status).getState(), equalTo(IndexerState.STARTED)); + public void updatePersistentTaskState(PersistentTaskState taskState, + ActionListener> listener) { + assertThat(taskState, instanceOf(RollupJobStatus.class)); + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1, new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); } }; - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); @@ -264,7 +267,7 @@ public void updatePersistentStatus(Status status, ActionListener> listener) { - assertThat(status, instanceOf(RollupJobStatus.class)); - assertThat(((RollupJobStatus)status).getState(), equalTo(IndexerState.STARTED)); + public void updatePersistentTaskState(PersistentTaskState taskState, + ActionListener> listener) { + assertThat(taskState, instanceOf(RollupJobStatus.class)); + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1, new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); } }; - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); @@ -301,7 +305,7 @@ public void updatePersistentStatus(Status status, ActionListener> listener) { - assertThat(status, instanceOf(RollupJobStatus.class)); - assertThat(((RollupJobStatus)status).getState(), equalTo(IndexerState.STARTED)); + public void updatePersistentTaskState(PersistentTaskState taskState, + ActionListener> listener) { + assertThat(taskState, instanceOf(RollupJobStatus.class)); + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1, new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); } }; - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertNull(((RollupJobStatus)task.getStatus()).getPosition()); CountDownLatch latch = new CountDownLatch(1); @@ -340,7 +345,7 @@ public void updatePersistentStatus(Status status, ActionListener> listener) { + public void updatePersistentTaskState(PersistentTaskState taskState, + ActionListener> listener) { Integer counterValue = counter.getAndIncrement(); if (counterValue == 0) { - assertThat(status, instanceOf(RollupJobStatus.class)); - assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STARTED)); + assertThat(taskState, instanceOf(RollupJobStatus.class)); + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1, new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); } else if (counterValue == 1) { @@ -405,14 +411,14 @@ public void updatePersistentStatus(Status status, ActionListener() { @Override public void onResponse(StartRollupJobAction.Response response) { assertTrue(response.isStarted()); - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); started.set(true); } @@ -424,7 +430,7 @@ public void onFailure(Exception e) { ESTestCase.awaitBusy(started::get); task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123)); - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.INDEXING)); // Should still be started, not INDEXING + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING)); assertThat(task.getStats().getNumInvocations(), equalTo(1L)); // Allow search response to return now latch.countDown(); @@ -475,11 +481,12 @@ public void testTriggerWithHeaders() throws InterruptedException { RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, null, client, schedulerEngine, pool, Collections.emptyMap()) { @Override - public void updatePersistentStatus(Status status, ActionListener> listener) { + public void updatePersistentTaskState(PersistentTaskState taskState, + ActionListener> listener) { Integer counterValue = counter.getAndIncrement(); if (counterValue == 0) { - assertThat(status, instanceOf(RollupJobStatus.class)); - assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STARTED)); + assertThat(taskState, instanceOf(RollupJobStatus.class)); + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1, new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); } else if (counterValue == 1) { @@ -488,14 +495,14 @@ public void updatePersistentStatus(Status status, ActionListener() { @Override public void onResponse(StartRollupJobAction.Response response) { assertTrue(response.isStarted()); - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); started.set(true); } @@ -507,7 +514,7 @@ public void onFailure(Exception e) { ESTestCase.awaitBusy(started::get); task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123)); - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.INDEXING)); // Should still be started, not INDEXING + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING)); assertThat(task.getStats().getNumInvocations(), equalTo(1L)); // Allow search response to return now latch.countDown(); @@ -524,7 +531,7 @@ public void testStopWhenStopped() throws InterruptedException { SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); CountDownLatch latch = new CountDownLatch(1); task.stop(new ActionListener() { @@ -553,15 +560,16 @@ public void testStopWhenStopping() throws InterruptedException { RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, null, client, schedulerEngine, pool, Collections.emptyMap()) { @Override - public void updatePersistentStatus(Status status, ActionListener> listener) { - assertThat(status, instanceOf(RollupJobStatus.class)); + public void updatePersistentTaskState(PersistentTaskState taskState, + ActionListener> listener) { + assertThat(taskState, instanceOf(RollupJobStatus.class)); int c = counter.get(); if (c == 0) { - assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STARTED)); + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); } else if (c == 1) { - assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STOPPED)); + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED)); } else if (c == 2) { - assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STOPPED)); + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED)); } else { fail("Should not have updated persistent statuse > 3 times"); } @@ -571,7 +579,7 @@ public void updatePersistentStatus(Status status, ActionListener() { @@ -642,7 +650,7 @@ public void markAsCompleted() { latch.countDown(); } }; - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); task.onCancelled(); task.stop(new ActionListener() { From 8fb4004854be272528b6f47a54abe477dbcea725 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 11 Jun 2018 14:49:00 +0200 Subject: [PATCH 5/6] Change status field to state --- .../persistent/PersistentTasksCustomMetaData.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java index df7c808a9758c..c232eebbc26eb 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java @@ -93,7 +93,7 @@ public PersistentTasksCustomMetaData(long lastAllocationId, Map p.namedObject(PersistentTaskParams.class, c, null), new ParseField("params")); parser.declareObject(TaskDescriptionBuilder::setState, - (p, c) -> p.namedObject(PersistentTaskState.class, c, null), new ParseField("status")); + (p, c) -> p.namedObject(PersistentTaskState.class, c, null), new ParseField("state", "status")); TASK_DESCRIPTION_PARSER = (XContentParser p, Void c, String name) -> parser.parse(p, new TaskDescriptionBuilder<>(name), name); // Assignment parser @@ -407,8 +407,7 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params xPa builder.field("params", params, xParams); } if (state != null) { - // The field is names "status" instead of "state" for bwc reason - builder.field("status", state, xParams); + builder.field("state", state, xParams); } } builder.endObject(); From 5b7bc08f1d88b0c62ee124cedd2f46169ece0b96 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 14 Jun 2018 15:45:18 +0200 Subject: [PATCH 6/6] Fix unused imports --- .../persistent/PersistentTasksClusterService.java | 1 - .../persistent/PersistentTasksCustomMetaData.java | 1 - .../org/elasticsearch/persistent/PersistentTasksExecutor.java | 1 - .../persistent/PersistentTasksClusterServiceTests.java | 1 - .../persistent/PersistentTasksCustomMetaDataTests.java | 1 - .../elasticsearch/persistent/TestPersistentTasksPlugin.java | 1 - .../persistent/UpdatePersistentTaskRequestTests.java | 1 - .../elasticsearch/xpack/ml/action/TransportOpenJobAction.java | 1 - .../xpack/ml/action/TransportStartDatafeedAction.java | 1 - .../src/test/java/org/elasticsearch/xpack/rollup/RollupIT.java | 3 --- 10 files changed, 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index c030d3658b373..9ed0af010b530 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -35,7 +35,6 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.decider.AssignmentDecision; import org.elasticsearch.persistent.decider.EnableAssignmentDecider; -import org.elasticsearch.tasks.Task; import java.util.Objects; diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java index c232eebbc26eb..f81b7c770e56c 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java @@ -38,7 +38,6 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.tasks.Task.Status; import java.io.IOException; import java.util.Collection; diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java index d72a3aa466790..758ffbe69a04d 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java @@ -26,7 +26,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; -import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import java.util.Map; diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java index 8da18625f9905..f13a35613d530 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java @@ -37,7 +37,6 @@ import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; import org.elasticsearch.persistent.decider.EnableAssignmentDecider; -import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.TestThreadPool; diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java index 67cd341748da9..5b1f74d6cdfa5 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java @@ -45,7 +45,6 @@ import org.elasticsearch.persistent.TestPersistentTasksPlugin.State; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; -import org.elasticsearch.tasks.Task; import org.elasticsearch.test.AbstractDiffableSerializationTestCase; import java.io.IOException; diff --git a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java index 462894e9ac831..063a861b5c315 100644 --- a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java +++ b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java @@ -55,7 +55,6 @@ import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.PersistentTaskPlugin; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; diff --git a/server/src/test/java/org/elasticsearch/persistent/UpdatePersistentTaskRequestTests.java b/server/src/test/java/org/elasticsearch/persistent/UpdatePersistentTaskRequestTests.java index e23cb77ff2033..5ae54640f8e31 100644 --- a/server/src/test/java/org/elasticsearch/persistent/UpdatePersistentTaskRequestTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/UpdatePersistentTaskRequestTests.java @@ -20,7 +20,6 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.tasks.Task; import org.elasticsearch.test.AbstractStreamableTestCase; import org.elasticsearch.persistent.TestPersistentTasksPlugin.State; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index ceb841731d123..e7fb0fe5fb315 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -45,7 +45,6 @@ import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.plugins.MapperPlugin; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 5105625296e58..b13ed6d698451 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -25,7 +25,6 @@ import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupIT.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupIT.java index a33fbb042d8c6..3f930cb42981d 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupIT.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupIT.java @@ -7,10 +7,8 @@ import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; -import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; @@ -27,7 +25,6 @@ import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram; import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.Netty4Plugin;