Skip to content

Commit 6180680

Browse files
committed
Add persistent tasks
Persistent tasks are build on top of node tasks and provide functionality to restart a task to run on a different coordination node in case the coordinating node is no longer available. It is up to a persistent task implementation to keep track of status, so that in case the task is restarted, the task can continue were it left off before it was restarted.
2 parents 9500513 + 592eedb commit 6180680

30 files changed

+5317
-1
lines changed

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,9 +312,12 @@
312312
import org.elasticsearch.rest.action.search.RestMultiSearchAction;
313313
import org.elasticsearch.rest.action.search.RestSearchAction;
314314
import org.elasticsearch.rest.action.search.RestSearchScrollAction;
315-
import org.elasticsearch.tasks.TaskManager;
316315
import org.elasticsearch.threadpool.ThreadPool;
317316
import org.elasticsearch.usage.UsageService;
317+
import org.elasticsearch.persistent.CompletionPersistentTaskAction;
318+
import org.elasticsearch.persistent.RemovePersistentTaskAction;
319+
import org.elasticsearch.persistent.StartPersistentTaskAction;
320+
import org.elasticsearch.persistent.UpdatePersistentTaskStatusAction;
318321

319322
import java.util.ArrayList;
320323
import java.util.Collections;
@@ -507,6 +510,12 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
507510

508511
actionPlugins.stream().flatMap(p -> p.getActions().stream()).forEach(actions::register);
509512

513+
// Persistent tasks:
514+
actions.register(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class);
515+
actions.register(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class);
516+
actions.register(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class);
517+
actions.register(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class);
518+
510519
return unmodifiableMap(actions.getRegistry());
511520
}
512521

server/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@
117117
import org.elasticsearch.plugins.MapperPlugin;
118118
import org.elasticsearch.plugins.MetaDataUpgrader;
119119
import org.elasticsearch.plugins.NetworkPlugin;
120+
import org.elasticsearch.plugins.PersistentTaskPlugin;
120121
import org.elasticsearch.plugins.Plugin;
121122
import org.elasticsearch.plugins.PluginsService;
122123
import org.elasticsearch.plugins.RepositoryPlugin;
@@ -139,6 +140,10 @@
139140
import org.elasticsearch.transport.TransportService;
140141
import org.elasticsearch.usage.UsageService;
141142
import org.elasticsearch.watcher.ResourceWatcherService;
143+
import org.elasticsearch.persistent.PersistentTasksClusterService;
144+
import org.elasticsearch.persistent.PersistentTasksExecutor;
145+
import org.elasticsearch.persistent.PersistentTasksExecutorRegistry;
146+
import org.elasticsearch.persistent.PersistentTasksService;
142147

143148
import java.io.BufferedWriter;
144149
import java.io.Closeable;
@@ -461,6 +466,17 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
461466
threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),
462467
responseCollectorService);
463468

469+
final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService
470+
.filterPlugins(PersistentTaskPlugin.class).stream()
471+
.map(p -> p.getPersistentTasksExecutor(clusterService))
472+
.flatMap(List::stream)
473+
.collect(toList());
474+
475+
final PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(settings, tasksExecutors);
476+
final PersistentTasksClusterService persistentTasksClusterService =
477+
new PersistentTasksClusterService(settings, registry, clusterService);
478+
final PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, client);
479+
464480
modules.add(b -> {
465481
b.bind(Node.class).toInstance(this);
466482
b.bind(NodeService.class).toInstance(nodeService);
@@ -504,6 +520,9 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
504520
}
505521
httpBind.accept(b);
506522
pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
523+
b.bind(PersistentTasksService.class).toInstance(persistentTasksService);
524+
b.bind(PersistentTasksClusterService.class).toInstance(persistentTasksClusterService);
525+
b.bind(PersistentTasksExecutorRegistry.class).toInstance(registry);
507526
}
508527
);
509528
injector = modules.createInjector();
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.persistent;
20+
21+
import org.apache.logging.log4j.Logger;
22+
import org.apache.logging.log4j.message.ParameterizedMessage;
23+
import org.apache.logging.log4j.util.Supplier;
24+
import org.elasticsearch.action.ActionListener;
25+
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
26+
import org.elasticsearch.common.Nullable;
27+
import org.elasticsearch.common.unit.TimeValue;
28+
import org.elasticsearch.tasks.CancellableTask;
29+
import org.elasticsearch.tasks.Task;
30+
import org.elasticsearch.tasks.TaskCancelledException;
31+
import org.elasticsearch.tasks.TaskId;
32+
import org.elasticsearch.tasks.TaskManager;
33+
34+
import java.util.Map;
35+
import java.util.concurrent.atomic.AtomicReference;
36+
import java.util.function.Predicate;
37+
38+
/**
39+
* Represents a executor node operation that corresponds to a persistent task
40+
*/
41+
public class AllocatedPersistentTask extends CancellableTask {
42+
private volatile String persistentTaskId;
43+
private volatile long allocationId;
44+
45+
private final AtomicReference<State> state;
46+
@Nullable
47+
private volatile Exception failure;
48+
49+
private volatile PersistentTasksService persistentTasksService;
50+
private volatile Logger logger;
51+
private volatile TaskManager taskManager;
52+
53+
54+
public AllocatedPersistentTask(long id, String type, String action, String description, TaskId parentTask,
55+
Map<String, String> headers) {
56+
super(id, type, action, description, parentTask, headers);
57+
this.state = new AtomicReference<>(State.STARTED);
58+
}
59+
60+
@Override
61+
public boolean shouldCancelChildrenOnCancellation() {
62+
return true;
63+
}
64+
65+
// In case of persistent tasks we always need to return: `false`
66+
// because in case of persistent task the parent task isn't a task in the task manager, but in cluster state.
67+
// This instructs the task manager not to try to kill this persistent task when the task manager cannot find
68+
// a fake parent node id "cluster" in the cluster state
69+
@Override
70+
public final boolean cancelOnParentLeaving() {
71+
return false;
72+
}
73+
74+
@Override
75+
public Status getStatus() {
76+
return new PersistentTasksNodeService.Status(state.get());
77+
}
78+
79+
/**
80+
* Updates the persistent state for the corresponding persistent task.
81+
* <p>
82+
* This doesn't affect the status of this allocated task.
83+
*/
84+
public void updatePersistentStatus(Task.Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
85+
persistentTasksService.updateStatus(persistentTaskId, allocationId, status, listener);
86+
}
87+
88+
public String getPersistentTaskId() {
89+
return persistentTaskId;
90+
}
91+
92+
void init(PersistentTasksService persistentTasksService, TaskManager taskManager, Logger logger, String persistentTaskId, long
93+
allocationId) {
94+
this.persistentTasksService = persistentTasksService;
95+
this.logger = logger;
96+
this.taskManager = taskManager;
97+
this.persistentTaskId = persistentTaskId;
98+
this.allocationId = allocationId;
99+
}
100+
101+
public Exception getFailure() {
102+
return failure;
103+
}
104+
105+
boolean markAsCancelled() {
106+
return state.compareAndSet(AllocatedPersistentTask.State.STARTED, AllocatedPersistentTask.State.PENDING_CANCEL);
107+
}
108+
109+
public State getState() {
110+
return state.get();
111+
}
112+
113+
public long getAllocationId() {
114+
return allocationId;
115+
}
116+
117+
public enum State {
118+
STARTED, // the task is currently running
119+
PENDING_CANCEL, // the task is cancelled on master, cancelling it locally
120+
COMPLETED // the task is done running and trying to notify caller
121+
}
122+
123+
/**
124+
* Waits for this persistent task to have the desired state.
125+
*/
126+
public void waitForPersistentTaskStatus(Predicate<PersistentTasksCustomMetaData.PersistentTask<?>> predicate,
127+
@Nullable TimeValue timeout,
128+
PersistentTasksService.WaitForPersistentTaskStatusListener<?> listener) {
129+
persistentTasksService.waitForPersistentTaskStatus(persistentTaskId, predicate, timeout, listener);
130+
}
131+
132+
public void markAsCompleted() {
133+
completeAndNotifyIfNeeded(null);
134+
}
135+
136+
public void markAsFailed(Exception e) {
137+
if (CancelTasksRequest.DEFAULT_REASON.equals(getReasonCancelled())) {
138+
completeAndNotifyIfNeeded(null);
139+
} else {
140+
completeAndNotifyIfNeeded(e);
141+
}
142+
143+
}
144+
145+
private void completeAndNotifyIfNeeded(@Nullable Exception failure) {
146+
State prevState = state.getAndSet(AllocatedPersistentTask.State.COMPLETED);
147+
if (prevState == State.COMPLETED) {
148+
logger.warn("attempt to complete task [{}] with id [{}] in the [{}] state", getAction(), getPersistentTaskId(), prevState);
149+
} else {
150+
if (failure != null) {
151+
logger.warn((Supplier<?>) () -> new ParameterizedMessage(
152+
"task {} failed with an exception", getPersistentTaskId()), failure);
153+
}
154+
try {
155+
this.failure = failure;
156+
if (prevState == State.STARTED) {
157+
logger.trace("sending notification for completed task [{}] with id [{}]", getAction(), getPersistentTaskId());
158+
persistentTasksService.sendCompletionNotification(getPersistentTaskId(), getAllocationId(), failure, new
159+
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
160+
@Override
161+
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
162+
logger.trace("notification for task [{}] with id [{}] was successful", getAction(),
163+
getPersistentTaskId());
164+
}
165+
166+
@Override
167+
public void onFailure(Exception e) {
168+
logger.warn((Supplier<?>) () ->
169+
new ParameterizedMessage("notification for task [{}] with id [{}] failed",
170+
getAction(), getPersistentTaskId()), e);
171+
}
172+
});
173+
}
174+
} finally {
175+
taskManager.unregister(this);
176+
}
177+
}
178+
}
179+
}

0 commit comments

Comments
 (0)