Skip to content

Commit db5e3bd

Browse files
committed
Radix-sorted, lookup-free, typesafe, nonblocking, priority-boosting pending tasks
Today the master's pending task queue is just the `PriorityBlockingQueue<Runnable>` belonging to the underlying `ThreadPoolExecutor`. The reasons for this date back a long way but it doesn't really reflect the structure of the queue as it exists today. In particular, we must keep track of batches independently of the queue itself, and must do various bits of unchecked casting to process multiple items of the same type at once. This commit introduces an new queueing mechanism, independent of the executor's queue, which better represents the conceptual structure of the master's pending tasks: * Today we use a priority queue to allow important tasks to preempt less-important ones. However there are only a small number of priority levels, so it is simpler to maintain a queue for each priority, effectively replacing the sorting within the priority queue with a radix sort. * Today when a task is submitted we perform a map lookup to see if it can be added to an existing batch or not. With this change we allow client code to create its own dedicated queue of tasks. The entries in the per-priority-level queues are themselves queues, one for each executor, representing the batches to be run. * Today each task in the queue holds a reference to its executor, but the executor used to run a task may belong to a different task in the same batch. In practice we know they're the same executor (that's how batches are defined) but we cannot express this knowledge in the type system so we have to do a bunch of unchecked casting to work around it. With this change we associate each per-executor queue directly with its executor, avoiding the need to do all this unchecked casting. * Today the master service must block its thread while waiting for each task to complete, because otherwise the executor would start to process the next task in the queue. This makes testing using a `DeterministicTaskQueue` harder (see `FakeThreadPoolMasterService`). With this change we avoid submitting a task to the `ThreadPoolExecutor` until the previous task is complete, which means we can make the implementation avoid blocking while a task is running and therefore run the whole production implementation in deterministic tests[^1]. * Today it's possible for a steady drip of high-priority tasks to starve low-priority tasks of access to the master for an extended period of time. With this change we separate the queue of tasks from the queue which determines the execution order. This allows us to implement more intelligent execution policies. For instance, if we detect that the queue has not processed any low-priority tasks for too long then we can make the decision to boost their priorities[^2]. [^1]: Not done yet but this is a step in the right direction. [^2]: Not done yet but this is a step in the right direction.
1 parent fd3e512 commit db5e3bd

28 files changed

+982
-1130
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportDeleteDesiredNodesAction.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,15 @@
1919
import org.elasticsearch.cluster.metadata.DesiredNodesMetadata;
2020
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2121
import org.elasticsearch.cluster.service.ClusterService;
22+
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
2223
import org.elasticsearch.common.Priority;
2324
import org.elasticsearch.common.inject.Inject;
2425
import org.elasticsearch.tasks.Task;
2526
import org.elasticsearch.threadpool.ThreadPool;
2627
import org.elasticsearch.transport.TransportService;
2728

2829
public class TransportDeleteDesiredNodesAction extends TransportMasterNodeAction<DeleteDesiredNodesAction.Request, ActionResponse.Empty> {
29-
private final DesiredNodesClusterStateTaskExecutor taskExecutor;
30+
private final MasterServiceTaskQueue<ClusterStateUpdateTask> taskQueue;
3031

3132
@Inject
3233
public TransportDeleteDesiredNodesAction(
@@ -47,7 +48,7 @@ public TransportDeleteDesiredNodesAction(
4748
in -> ActionResponse.Empty.INSTANCE,
4849
ThreadPool.Names.SAME
4950
);
50-
this.taskExecutor = new DesiredNodesClusterStateTaskExecutor();
51+
this.taskQueue = clusterService.getTaskQueue("delete-desired-nodes", Priority.HIGH, new DesiredNodesClusterStateTaskExecutor());
5152
}
5253

5354
@Override
@@ -57,7 +58,7 @@ protected void masterOperation(
5758
ClusterState state,
5859
ActionListener<ActionResponse.Empty> listener
5960
) throws Exception {
60-
final var clusterStateUpdateTask = new ClusterStateUpdateTask(Priority.HIGH) {
61+
taskQueue.submitTask("delete-desired-nodes", new ClusterStateUpdateTask(Priority.HIGH) {
6162
@Override
6263
public ClusterState execute(ClusterState currentState) {
6364
return currentState.copyAndUpdateMetadata(metadata -> metadata.removeCustom(DesiredNodesMetadata.TYPE));
@@ -72,8 +73,7 @@ public void onFailure(Exception e) {
7273
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
7374
listener.onResponse(ActionResponse.Empty.INSTANCE);
7475
}
75-
};
76-
clusterService.submitStateUpdateTask("delete-desired-nodes", clusterStateUpdateTask, clusterStateUpdateTask, taskExecutor);
76+
}, null);
7777
}
7878

7979
@Override

server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesAction.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.elasticsearch.action.support.ActionFilters;
1313
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
1414
import org.elasticsearch.cluster.ClusterState;
15-
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
1615
import org.elasticsearch.cluster.ClusterStateUpdateTask;
1716
import org.elasticsearch.cluster.block.ClusterBlockException;
1817
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@@ -22,6 +21,7 @@
2221
import org.elasticsearch.cluster.metadata.DesiredNodesMetadata;
2322
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2423
import org.elasticsearch.cluster.service.ClusterService;
24+
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
2525
import org.elasticsearch.common.Priority;
2626
import org.elasticsearch.common.inject.Inject;
2727
import org.elasticsearch.tasks.Task;
@@ -34,7 +34,7 @@
3434

3535
public class TransportUpdateDesiredNodesAction extends TransportMasterNodeAction<UpdateDesiredNodesRequest, UpdateDesiredNodesResponse> {
3636
private final DesiredNodesSettingsValidator settingsValidator;
37-
private final ClusterStateTaskExecutor<ClusterStateUpdateTask> taskExecutor;
37+
private final MasterServiceTaskQueue<ClusterStateUpdateTask> taskQueue;
3838

3939
@Inject
4040
public TransportUpdateDesiredNodesAction(
@@ -58,7 +58,7 @@ public TransportUpdateDesiredNodesAction(
5858
ThreadPool.Names.SAME
5959
);
6060
this.settingsValidator = settingsValidator;
61-
this.taskExecutor = new DesiredNodesClusterStateTaskExecutor();
61+
this.taskQueue = clusterService.getTaskQueue("delete-desired-nodes", Priority.URGENT, new DesiredNodesClusterStateTaskExecutor());
6262
}
6363

6464
@Override
@@ -77,7 +77,7 @@ protected void masterOperation(
7777
DesiredNodes proposedDesiredNodes = new DesiredNodes(request.getHistoryID(), request.getVersion(), request.getNodes());
7878
settingsValidator.validate(proposedDesiredNodes);
7979

80-
final var clusterStateUpdateTask = new ClusterStateUpdateTask(Priority.URGENT, request.masterNodeTimeout()) {
80+
taskQueue.submitTask("update-desired-nodes", new ClusterStateUpdateTask(Priority.URGENT, request.masterNodeTimeout()) {
8181
volatile boolean replacedExistingHistoryId = false;
8282

8383
@Override
@@ -99,8 +99,7 @@ public void onFailure(Exception e) {
9999
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
100100
listener.onResponse(new UpdateDesiredNodesResponse(replacedExistingHistoryId));
101101
}
102-
};
103-
clusterService.submitStateUpdateTask("update-desired-nodes", clusterStateUpdateTask, clusterStateUpdateTask, taskExecutor);
102+
}, request.masterNodeTimeout());
104103
} catch (Exception e) {
105104
listener.onFailure(e);
106105
}

server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
2121
import org.elasticsearch.cluster.ClusterState;
2222
import org.elasticsearch.cluster.ClusterStateAckListener;
23-
import org.elasticsearch.cluster.ClusterStateTaskConfig;
2423
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
2524
import org.elasticsearch.cluster.ClusterStateTaskListener;
2625
import org.elasticsearch.cluster.block.ClusterBlockException;
@@ -35,6 +34,7 @@
3534
import org.elasticsearch.cluster.node.DiscoveryNode;
3635
import org.elasticsearch.cluster.routing.allocation.AllocationService;
3736
import org.elasticsearch.cluster.service.ClusterService;
37+
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
3838
import org.elasticsearch.common.Priority;
3939
import org.elasticsearch.common.inject.Inject;
4040
import org.elasticsearch.common.settings.Settings;
@@ -76,7 +76,7 @@ public static final class TransportAction extends TransportMasterNodeAction<Crea
7676
private final AutoCreateIndex autoCreateIndex;
7777
private final SystemIndices systemIndices;
7878

79-
private final ClusterStateTaskExecutor<CreateIndexTask> executor;
79+
private final MasterServiceTaskQueue<CreateIndexTask> taskQueue;
8080

8181
@Inject
8282
public TransportAction(
@@ -107,7 +107,7 @@ public TransportAction(
107107
this.createIndexService = createIndexService;
108108
this.metadataCreateDataStreamService = metadataCreateDataStreamService;
109109
this.autoCreateIndex = autoCreateIndex;
110-
executor = (currentState, taskContexts) -> {
110+
this.taskQueue = clusterService.getTaskQueue("auto-create", Priority.URGENT, (currentState, taskContexts) -> {
111111
ClusterState state = currentState;
112112
final Map<CreateIndexRequest, String> successfulRequests = Maps.newMapWithExpectedSize(taskContexts.size());
113113
for (final var taskContext : taskContexts) {
@@ -123,7 +123,7 @@ public TransportAction(
123123
state = allocationService.reroute(state, "auto-create");
124124
}
125125
return state;
126-
};
126+
});
127127
}
128128

129129
@Override
@@ -133,11 +133,10 @@ protected void masterOperation(
133133
ClusterState state,
134134
ActionListener<CreateIndexResponse> listener
135135
) {
136-
clusterService.submitStateUpdateTask(
136+
taskQueue.submitTask(
137137
"auto create [" + request.index() + "]",
138138
new CreateIndexTask(request, listener),
139-
ClusterStateTaskConfig.build(Priority.URGENT, request.masterNodeTimeout()),
140-
executor
139+
request.masterNodeTimeout()
141140
);
142141
}
143142

server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
2323
import org.elasticsearch.client.internal.Client;
2424
import org.elasticsearch.cluster.ClusterState;
25-
import org.elasticsearch.cluster.ClusterStateTaskConfig;
2625
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
2726
import org.elasticsearch.cluster.ClusterStateTaskListener;
2827
import org.elasticsearch.cluster.block.ClusterBlockException;
@@ -32,6 +31,7 @@
3231
import org.elasticsearch.cluster.metadata.Metadata;
3332
import org.elasticsearch.cluster.routing.allocation.AllocationService;
3433
import org.elasticsearch.cluster.service.ClusterService;
34+
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
3535
import org.elasticsearch.common.Priority;
3636
import org.elasticsearch.common.Strings;
3737
import org.elasticsearch.common.inject.Inject;
@@ -61,7 +61,7 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
6161
private static final Logger logger = LogManager.getLogger(TransportRolloverAction.class);
6262

6363
private final Client client;
64-
private final RolloverExecutor rolloverTaskExecutor;
64+
private final MasterServiceTaskQueue<RolloverTask> rolloverTaskQueue;
6565

6666
@Inject
6767
public TransportRolloverAction(
@@ -86,10 +86,10 @@ public TransportRolloverAction(
8686
ThreadPool.Names.SAME
8787
);
8888
this.client = client;
89-
this.rolloverTaskExecutor = new RolloverExecutor(
90-
allocationService,
91-
rolloverService,
92-
new ActiveShardsObserver(clusterService, threadPool)
89+
this.rolloverTaskQueue = clusterService.getTaskQueue(
90+
"rollover",
91+
Priority.NORMAL,
92+
new RolloverExecutor(allocationService, rolloverService, new ActiveShardsObserver(clusterService, threadPool))
9393
);
9494
}
9595

@@ -181,8 +181,7 @@ protected void masterOperation(
181181
if (trialConditionResults.size() == 0 || trialMetConditions.size() > 0) {
182182
String source = "rollover_index source [" + trialRolloverIndexName + "] to target [" + trialRolloverIndexName + "]";
183183
RolloverTask rolloverTask = new RolloverTask(rolloverRequest, statsResponse, trialRolloverResponse, listener);
184-
ClusterStateTaskConfig config = ClusterStateTaskConfig.build(Priority.NORMAL, rolloverRequest.masterNodeTimeout());
185-
clusterService.submitStateUpdateTask(source, rolloverTask, config, rolloverTaskExecutor);
184+
rolloverTaskQueue.submitTask(source, rolloverTask, rolloverRequest.masterNodeTimeout());
186185
} else {
187186
// conditions not met
188187
listener.onResponse(trialRolloverResponse);

server/src/main/java/org/elasticsearch/cluster/ClusterState.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@
6767
* across master elections (and therefore is preserved in a rolling restart).
6868
* <p>
6969
* Updates are triggered by submitting tasks to the {@link MasterService} on the elected master, typically using a {@link
70-
* TransportMasterNodeAction} to route a request to the master on which the task is submitted with {@link
71-
* ClusterService#submitStateUpdateTask}. Submitted tasks have an associated {@link ClusterStateTaskConfig} which defines a priority and a
70+
* TransportMasterNodeAction} to route a request to the master on which the task is submitted via a queue obtained with {@link
71+
* ClusterService#getTaskQueue}, which has an associated priority. Submitted tasks have an associated
7272
* timeout. Tasks are processed in priority order, so a flood of higher-priority tasks can starve lower-priority ones from running.
7373
* Therefore, avoid priorities other than {@link Priority#NORMAL} where possible. Tasks associated with client actions should typically have
7474
* a timeout, or otherwise be sensitive to client cancellations, to avoid surprises caused by the execution of stale tasks long after they

server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskConfig.java

Lines changed: 0 additions & 60 deletions
This file was deleted.

server/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
/**
1616
* A task that can update the cluster state.
1717
*/
18-
public abstract class ClusterStateUpdateTask implements ClusterStateTaskConfig, ClusterStateTaskListener {
18+
public abstract class ClusterStateUpdateTask implements ClusterStateTaskListener {
1919

2020
private final Priority priority;
2121

@@ -64,7 +64,6 @@ public final TimeValue timeout() {
6464
return timeout;
6565
}
6666

67-
@Override
6867
public final Priority priority() {
6968
return priority;
7069
}

server/src/main/java/org/elasticsearch/cluster/LocalMasterServiceTask.java

Lines changed: 30 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -34,45 +34,40 @@ public final void clusterStateProcessed(ClusterState oldState, ClusterState newS
3434
protected void onPublicationComplete() {}
3535

3636
public void submit(MasterService masterService, String source) {
37-
masterService.submitStateUpdateTask(
38-
source,
39-
this,
40-
ClusterStateTaskConfig.build(priority),
41-
// Uses a new executor each time so that these tasks are not batched, but they never change the cluster state anyway so they
42-
// don't trigger the publication process and hence batching isn't really needed.
43-
new ClusterStateTaskExecutor<>() {
37+
// Uses a new queue each time so that these tasks are not batched, but they never change the cluster state anyway so they
38+
// don't trigger the publication process and hence batching isn't really needed.
39+
masterService.getTaskQueue("local-master-service-task", priority, new ClusterStateTaskExecutor<LocalMasterServiceTask>() {
4440

45-
@Override
46-
public boolean runOnlyOnMaster() {
47-
return false;
48-
}
41+
@Override
42+
public boolean runOnlyOnMaster() {
43+
return false;
44+
}
4945

50-
@Override
51-
public String describeTasks(List<LocalMasterServiceTask> tasks) {
52-
return ""; // only one task in the batch so the source is enough
53-
}
46+
@Override
47+
public String describeTasks(List<LocalMasterServiceTask> tasks) {
48+
return ""; // only one task in the batch so the source is enough
49+
}
5450

55-
@Override
56-
public ClusterState execute(ClusterState currentState, List<TaskContext<LocalMasterServiceTask>> taskContexts)
57-
throws Exception {
58-
final LocalMasterServiceTask thisTask = LocalMasterServiceTask.this;
59-
assert taskContexts.size() == 1 && taskContexts.get(0).getTask() == thisTask
60-
: "expected one-element task list containing current object but was " + taskContexts;
61-
thisTask.execute(currentState);
62-
taskContexts.get(0).success(new ActionListener<>() {
63-
@Override
64-
public void onResponse(ClusterState clusterState) {
65-
onPublicationComplete();
66-
}
51+
@Override
52+
public ClusterState execute(ClusterState currentState, List<TaskContext<LocalMasterServiceTask>> taskContexts)
53+
throws Exception {
54+
final LocalMasterServiceTask thisTask = LocalMasterServiceTask.this;
55+
assert taskContexts.size() == 1 && taskContexts.get(0).getTask() == thisTask
56+
: "expected one-element task list containing current object but was " + taskContexts;
57+
thisTask.execute(currentState);
58+
taskContexts.get(0).success(new ActionListener<>() {
59+
@Override
60+
public void onResponse(ClusterState clusterState) {
61+
onPublicationComplete();
62+
}
6763

68-
@Override
69-
public void onFailure(Exception e) {
70-
LocalMasterServiceTask.this.onFailure(e);
71-
}
72-
});
73-
return currentState;
74-
}
64+
@Override
65+
public void onFailure(Exception e) {
66+
LocalMasterServiceTask.this.onFailure(e);
67+
}
68+
});
69+
return currentState;
7570
}
76-
);
71+
}).submitTask(source, this, null);
7772
}
7873
}

0 commit comments

Comments
 (0)